[UP] v 0.1.1

This commit is contained in:
Evgeny (Krymmy) Momotov 2025-11-17 09:35:46 +03:00
parent d375911150
commit ee9d6e0255
3 changed files with 121 additions and 56 deletions

View file

@ -1,6 +1,6 @@
[project]
name = "yandexapimanager"
version = "0.1.0"
version = "0.1.1"
description = ""
authors = [
{name = "Evgeny (Krymmy) Momotov",email = "evgeny.momotov@gmail.com"}

View file

@ -48,141 +48,206 @@ class YandexManagerClient:
self.token = access_response.token
return access_response
async def create_call(self, data: CallIn) -> Call:
response = await self._request('POST', f'/calls/', data=data.model_dump())
async def create_call(self, data: CallIn) -> Call | None:
response = await self._request('POST', '/calls/', data=data.model_dump())
if not response:
return None
return Call(**response)
async def list_yandex_fleets(self, **params):
response = await self._request('GET', f'/yandex_fleet/', params=params)
async def list_yandex_fleets(self, **params) -> list[YandexFleet] | None:
response = await self._request('GET', '/yandex_fleet/', params=params)
if not response:
return []
return [YandexFleet(**fleet) for fleet in response]
async def get_yandex_fleet(self, fleet_id: str):
async def get_yandex_fleet(self, fleet_id: str) -> YandexFleet | None:
response = await self._request('GET', f'/yandex_fleet/{fleet_id}')
if not response:
return None
return YandexFleet(**response)
async def get_yandex_fleet_api_key(self, fleet_id: str):
async def get_yandex_fleet_api_key(self, fleet_id: str) -> YandexFleetApiKey | None:
response = await self._request('GET', f'/yandex_fleet/{fleet_id}/api_key')
if not response:
return None
return YandexFleetApiKey(**response)
async def list_drivers(self, **params):
response = await self._request('GET', f'/yandex_fleet/drivers', params=params)
async def list_drivers(self, **params) -> list[Driver] | None:
response = await self._request('GET', '/yandex_fleet/drivers', params=params)
if not response:
return []
return [Driver(**driver) for driver in response]
async def get_driver(self, driver_id: str):
async def get_driver(self, driver_id: str) -> Driver | None:
response = await self._request('GET', f'/yandex_fleet/drivers/{driver_id}')
if not response:
return None
return Driver(**response)
async def get_driver_profile(self, driver_id: str):
async def get_driver_profile(self, driver_id: str) -> DriverProfile | None:
response = await self._request('GET', f'/yandex_fleet/drivers/{driver_id}/profile')
if not response:
return None
return DriverProfile(**response)
async def list_driver_groups(self, **params):
response = await self._request('GET', f'/yandex_fleet/drivers_groups', params=params)
async def list_driver_groups(self, **params) -> list[DriversGroup] | None:
response = await self._request('GET', '/yandex_fleet/drivers_groups', params=params)
if not response:
return None
return [DriversGroup(**group) for group in response]
async def get_driver_group(self, group_id: str):
async def get_driver_group(self, group_id: str) -> DriversGroup | None:
response = await self._request('GET', f'/yandex_fleet/drivers_groups/{group_id}')
if not response:
return None
return DriversGroup(**response)
async def _inspection_rout(self, method: str, endpoint: str, **kwargs):
async def _inspection_rout(self, method: str, endpoint: str, **kwargs) -> dict | list | None:
response = await self._request(method, f'/inspections/{endpoint.strip("/")}', **kwargs)
if not response:
return None
return response
async def list_inspection_worktime(self, **params):
async def list_inspection_worktime(self, **params) -> list[InspectionWorkTime] | None:
response = await self._inspection_rout('GET', 'worktime', params=params)
if not response:
return []
return [InspectionWorkTime(**worktime) for worktime in response]
async def get_inspection_worktime(self, worktime_id: str):
async def get_inspection_worktime(self, worktime_id: str) -> InspectionWorkTime | None:
response = await self._inspection_rout('GET', f'worktime/{worktime_id}')
if not response:
return None
return InspectionWorkTime(**response)
async def list_actions(self, **params):
async def list_actions(self, **params) -> list[Action] | None:
response = await self._inspection_rout('GET', 'actions', params=params)
if not response:
return []
return [Action(**action) for action in response]
async def get_action(self, action_id: str):
async def get_action(self, action_id: str) -> Action | None:
response = await self._inspection_rout('GET', f'actions/{action_id}')
if not response:
return None
return Action(**response)
async def get_check(self, check_id: str):
async def get_check(self, check_id: str) -> Check | None:
response = await self._inspection_rout('GET', f'checks/{check_id}')
if not response:
return None
return Check(**response)
async def list_conditions(self, **params):
async def list_conditions(self, **params) -> list[Condition] | None:
response = await self._inspection_rout('GET', 'conditions', params=params)
if not response:
return []
return [Condition(**condition) for condition in response]
async def get_condition(self, condition_id: str):
async def get_condition(self, condition_id: str) -> Condition | None:
response = await self._inspection_rout('GET', f'conditions/{condition_id}')
if not response:
return None
return Condition(**response)
async def list_expressions(self, **params):
async def list_expressions(self, **params) -> list[ConditionalExpression] | None:
response = await self._inspection_rout('GET', 'expressions', params=params)
if not response:
return []
return [ConditionalExpression(**expression) for expression in response]
async def get_expression(self, expression_id: str):
async def get_expression(self, expression_id: str) -> ConditionalExpression | None:
response = await self._inspection_rout('GET', f'expressions/{expression_id}')
if not response:
return None
return ConditionalExpression(**response)
async def list_yandex_fleet_inspections(self, **params):
async def list_yandex_fleet_inspections(self, **params) -> list[YandexFleetInspection] | None:
response = await self._inspection_rout('GET', 'fleets', params=params)
if not response:
return []
return [YandexFleetInspection(**inspection) for inspection in response]
async def get_yandex_fleet_inspection(self, inspection_id: str):
async def get_yandex_fleet_inspection(self, inspection_id: str) -> YandexFleetInspection | None:
response = await self._inspection_rout('GET', f'fleets/{inspection_id}')
if not response:
return None
return YandexFleetInspection(**response)
async def list_fleet_checks(self, **params):
async def list_fleet_checks(self, **params) -> list[YandexFleetInspectionCheeks] | None:
response = await self._inspection_rout('GET', 'fleet_checks', params=params)
if not response:
return []
return [YandexFleetInspectionCheeks(**check) for check in response]
async def get_fleet_check(self, check_id: str):
async def get_fleet_check(self, check_id: str) -> YandexFleetInspectionCheeks | None:
response = await self._inspection_rout('GET', f'fleet_checks/{check_id}')
if not response:
return None
return YandexFleetInspectionCheeks(**response)
async def list_group_inspections(self, **params):
response = await self._inspection_rout('GET', f'groups/', params=params)
async def list_group_inspections(self, **params) -> list[GroupInspection] | None:
response = await self._inspection_rout('GET', 'groups/', params=params)
if not response:
return []
return [GroupInspection(**inspection) for inspection in response]
async def get_group_inspection(self, group_id: str):
response = await self._inspection_rout('GET', f'groups/{item_id}')
async def get_group_inspection(self, group_id: str) -> GroupInspection | None:
response = await self._inspection_rout('GET', f'groups/{group_id}')
if not response:
return None
return GroupInspection(**response)
async def list_group_checks(self, **params):
response = await self._inspection_rout('GET', f'group_checks/', params=params)
async def list_group_checks(self, **params) -> list[GroupInspectionCheek] | None:
response = await self._inspection_rout('GET', 'group_checks/', params=params)
if not response:
return []
return [GroupInspectionCheek(**check) for check in response]
async def get_group_check(self, check_id: str):
async def get_group_check(self, check_id: str) -> GroupInspectionCheek | None:
response = await self._inspection_rout('GET', f'group_checks/{check_id}')
if not response:
return None
return GroupInspectionCheek(**response)
async def list_driver_inspections(self, **params):
response = await self._inspection_rout('GET', f'drivers/', params=params)
async def list_driver_inspections(self, **params) -> list[DriverInspection] | None:
response = await self._inspection_rout('GET', 'drivers/', params=params)
if not response:
return []
return [DriverInspection(**inspection) for inspection in response]
async def get_driver_inspection(self, driver_id: str):
async def get_driver_inspection(self, driver_id: str) -> DriverInspection | None:
response = await self._inspection_rout('GET', f'drivers/{driver_id}')
if not response:
return None
return DriverInspection(**response)
async def list_driver_checks(self, **params):
response = await self._inspection_rout('GET', f'driver_checks/', params=params)
async def list_driver_checks(self, **params) -> list[DriverInspectionCheek] | None:
response = await self._inspection_rout('GET', 'driver_checks/', params=params)
if not response:
return []
return [DriverInspectionCheek(**check) for check in response]
async def get_driver_check(self, check_id: str):
async def get_driver_check(self, check_id: str) -> DriverInspectionCheek | None:
response = await self._inspection_rout('GET', f'driver_checks/{check_id}')
if not response:
return None
return DriverInspectionCheek(**response)
async def update_driver_check_time(self, check_id: str):
response = await self._inspection_rout('patch', f'driver_checks/{check_id}/last_check')
async def update_driver_check_time(self, check_id: str) -> DriverInspectionCheek | None:
response = await self._inspection_rout('PATCH', f'driver_checks/{check_id}/last_check')
if not response:
return None
return DriverInspectionCheek(**response)
async def update_group_check_time(self, check_id: str):
response = await self._inspection_rout('patch', f'group_checks/{check_id}/last_check')
async def update_group_check_time(self, check_id: str) -> GroupInspectionCheek | None:
response = await self._inspection_rout('PATCH', f'group_checks/{check_id}/last_check')
if not response:
return None
return GroupInspectionCheek(**response)
async def update_fleet_check_time(self, check_id: str):
response = await self._inspection_rout('patch', f'fleet_checks/{check_id}/last_check')
return YandexFleetInspectionCheeks(**response)
async def update_fleet_check_time(self, check_id: str) -> YandexFleetInspectionCheeks | None:
response = await self._inspection_rout('PATCH', f'fleet_checks/{check_id}/last_check')
if not response:
return None
return YandexFleetInspectionCheeks(**response)

View file

@ -179,8 +179,8 @@ class InspectionWorkTime(BasicModel, UUIDModel):
class CallDriver(BasicModel, UUIDModel):
owner: User = Field(description="Владелец (пользователь)")
prompt: CallPrompt = Field(description="Промпт для звонка")
owner: Optional[User] = Field(default=None, description="Владелец (пользователь)")
prompt: Optional[CallPrompt] = Field(default=None, description="Промпт для звонка")
class Config:
extra = "ignore"
@ -251,7 +251,7 @@ class Check(BasicModel, UUIDModel):
class BaseInspectionModel(UUIDModel):
owner: Optional[User] = Field(..., description="Владелец проверки", default=None)
name : str = Field(..., description="Название проверки")
work_time: InspectionWorkTime = Field(..., description="Рабочее время")
work_time: Optional[InspectionWorkTime] = Field(None, description="Рабочее время")
class BaseInspectionsCheckModel(UUIDModel):