409 lines
16 KiB
Python
409 lines
16 KiB
Python
import aiohttp
|
|
from pydantic import BaseModel, Field
|
|
from typing import Optional, List, Union, Dict, Any
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
import uuid
|
|
|
|
# Enums
|
|
class DriverStatus(str, Enum):
    """String-valued states a driver can be in.

    Members are also ``str`` instances, so they compare equal to their
    raw wire values.
    """

    FREE = "free"
    IN_SYSTEM_RESERVE = "in_system_reserve"
    # NOTE(review): "on_ordrer" looks like a misspelling of "on_order". It is
    # kept verbatim because it is the value sent to / parsed from the API;
    # confirm against the server schema before correcting it.
    ON_ORDER = "on_ordrer"
    OFFLINE = "offline"
|
|
|
|
class OrderStatus(str, Enum):
    """String-valued states an order can be in.

    Members are also ``str`` instances, so they compare equal to their
    raw wire values.
    """

    NEW = "new"
    SEARCHING_TAXI = "searching_taxi"
    IN_PROCESS = "in_process"
    COMPLETED = "completed"
    # Two distinct cancellation states, depending on who cancelled.
    CUSTOMER_CANCELED = "customer_canceled"
    DRIVER_CANCELED = "driver_canceled"
|
|
|
|
class PromptFor(str, Enum):
    """Audience a prompt is written for (upper-case wire values)."""

    TAXI = "TAXI"
    CLIENT = "CLIENT"
|
|
|
|
# Models
|
|
class ApiKeyOut(BaseModel):
    """Login response carrying the API key (see ``TaxisClient.login``)."""

    # Required; used by the client as the Bearer token on later requests.
    api_key: str
|
|
|
|
class CarIn(BaseModel):
    """Request body for creating or updating a car.

    All four fields are required strings of at most 255 characters.
    """

    mark: str = Field(..., max_length=255)
    model: str = Field(..., max_length=255)
    color: str = Field(..., max_length=255)
    number: str = Field(..., max_length=255)
|
|
|
|
class CarOut(BaseModel):
    """Car record as parsed from API responses."""

    id: int
    created_at: datetime
    updated_at: datetime

    # Same required, length-limited fields as CarIn.
    mark: str = Field(..., max_length=255)
    model: str = Field(..., max_length=255)
    color: str = Field(..., max_length=255)
    number: str = Field(..., max_length=255)

    # Forward reference: DriverOut is declared later in the module.
    driver: Optional['DriverOut'] = None
|
|
|
|
class ClientIn(BaseModel):
    """Request body for creating or updating a client.

    Every field is optional and defaults to ``None``.
    """

    name: Optional[str] = Field(None, max_length=255)
    lastname: Optional[str] = Field(None, max_length=255)
    surname: Optional[str] = Field(None, max_length=255)
    # Phone numbers are capped at 20 characters.
    phone: Optional[str] = Field(None, max_length=20)
|
|
|
|
class ClientOut(BaseModel):
    """Client record as parsed from API responses."""

    id: int

    name: Optional[str] = Field(None, max_length=255)
    lastname: Optional[str] = Field(None, max_length=255)
    surname: Optional[str] = Field(None, max_length=255)
    phone: Optional[str] = Field(None, max_length=20)

    created_at: datetime
    updated_at: datetime

    # Forward reference: OrderOut is declared later in the module.
    orders: Optional[List['OrderOut']] = None
|
|
|
|
class DriverAddressUpdate(BaseModel):
    """Request body for changing a driver's address."""

    # Required; no length limit is declared on this model.
    address: str
|
|
|
|
class DriverIn(BaseModel):
    """Request body for creating or updating a driver.

    Every field is optional; ``status`` defaults to ``DriverStatus.OFFLINE``
    and everything else to ``None``.
    """

    name: Optional[str] = Field(None, max_length=255)
    lastname: Optional[str] = Field(None, max_length=255)
    surname: Optional[str] = Field(None, max_length=255)
    phone: Optional[str] = Field(None, max_length=20)
    status: Optional[DriverStatus] = DriverStatus.OFFLINE
    address: Optional[str] = Field(None, max_length=255)

    # Coordinates are accepted as either a number or a string.
    latitude: Union[float, str, None] = None
    longitude: Union[float, str, None] = None

    car_id: Optional[int] = None
|
|
|
|
class DriverOut(BaseModel):
    """Driver record as parsed from API responses."""

    id: int
    created_at: datetime
    updated_at: datetime

    name: Optional[str] = Field(None, max_length=255)
    lastname: Optional[str] = Field(None, max_length=255)
    surname: Optional[str] = Field(None, max_length=255)
    phone: Optional[str] = Field(None, max_length=20)
    status: Optional[DriverStatus] = DriverStatus.OFFLINE
    address: Optional[str] = Field(None, max_length=255)

    # Unlike DriverIn, coordinates here are typed as strings only.
    latitude: Optional[str] = None
    longitude: Optional[str] = None

    # Forward references, resolved after all models are defined.
    car: Optional['CarOut'] = None
    orders: Optional[List['OrderOut']] = None
|
|
|
|
class Geocode(BaseModel):
    """A coordinate pair; both values are required floats."""

    # Declared longitude-first; the order is kept so serialized output matches.
    longitude: float
    latitude: float
|
|
|
|
class GlobalSettingsOut(BaseModel):
    """Global settings record as parsed from API responses."""

    created_at: datetime
    updated_at: datetime

    # Defaults to 1 when the response omits it.
    id: int = Field(default=1)

    # Defaults to 3000. NOTE(review): unit is not stated here -- presumably
    # metres; confirm against the server.
    nearby_radius: Optional[int] = 3000
|
|
|
|
class GlobalSettingsUpdate(BaseModel):
    """Request body for updating the global settings.

    Every field is optional and defaults to ``None``.
    """

    nearby_radius: Optional[int] = Field(default=None)
    nominatium_id: Optional[int] = Field(default=None)
    openstreetmap_id: Optional[int] = Field(default=None)
|
|
|
|
class NominatiumSettingsOut(BaseModel):
    """Nominatium settings record as parsed from API responses."""

    id: int
    created_at: datetime
    updated_at: datetime

    # Both optional, each limited to 255 characters.
    url: Optional[str] = Field(None, max_length=255)
    token: Optional[str] = Field(None, max_length=255)
|
|
|
|
class OpenStreetMapSettingsOut(BaseModel):
    """OpenStreetMap settings record as parsed from API responses."""

    id: int
    created_at: datetime
    updated_at: datetime

    # Both optional, each limited to 255 characters.
    url: Optional[str] = Field(None, max_length=255)
    token: Optional[str] = Field(None, max_length=255)
|
|
|
|
class OrderIn(BaseModel):
    """Request body for creating an order.

    Only ``client_id`` is required; ``status`` defaults to
    ``OrderStatus.NEW`` and the remaining fields to ``None``.
    """

    status: Optional[OrderStatus] = OrderStatus.NEW

    # Pick-up point; coordinates are accepted as a number or a string.
    from_address: Optional[str] = Field(None, max_length=255)
    from_latitude: Union[float, str, None] = None
    from_longitude: Union[float, str, None] = None

    # Destination; same typing as the pick-up point.
    to_address: Optional[str] = Field(None, max_length=255)
    to_latitude: Union[float, str, None] = None
    to_longitude: Union[float, str, None] = None

    client_id: int
    driver_id: Optional[int] = None
|
|
|
|
class OrderInUpdate(BaseModel):
    """Request body for updating an order.

    Same fields as ``OrderIn`` except that ``client_id`` is optional too.
    """

    # NOTE(review): status defaults to NEW rather than None, so serializing an
    # update that never set it still includes status="new" -- confirm that
    # this is what the server expects.
    status: Optional[OrderStatus] = OrderStatus.NEW

    # Pick-up point; coordinates are accepted as a number or a string.
    from_address: Optional[str] = Field(None, max_length=255)
    from_latitude: Union[float, str, None] = None
    from_longitude: Union[float, str, None] = None

    # Destination; same typing as the pick-up point.
    to_address: Optional[str] = Field(None, max_length=255)
    to_latitude: Union[float, str, None] = None
    to_longitude: Union[float, str, None] = None

    client_id: Optional[int] = None
    driver_id: Optional[int] = None
|
|
|
|
class OrderOut(BaseModel):
    """Order record as parsed from API responses."""

    id: int
    created_at: datetime
    updated_at: datetime

    status: Optional[OrderStatus] = OrderStatus.NEW

    # Nested records instead of the *_id fields used on input; forward
    # references are resolved after all models are defined.
    client: Optional['ClientOut'] = None
    driver: Optional['DriverOut'] = None

    # Pick-up point; coordinates are typed as strings on output.
    from_address: Optional[str] = Field(None, max_length=255)
    from_latitude: Optional[str] = None
    from_longitude: Optional[str] = None

    # Destination; same typing as the pick-up point.
    to_address: Optional[str] = Field(None, max_length=255)
    to_latitude: Optional[str] = None
    to_longitude: Optional[str] = None
|
|
|
|
class PromptIn(BaseModel):
    """Request body for creating or updating a prompt."""

    # Required, at most 255 characters.
    name: str = Field(..., max_length=255)
    # Required; no length limit is declared.
    content: str
    # Defaults to PromptFor.TAXI.
    prompt_for: Optional[PromptFor] = PromptFor.TAXI
|
|
|
|
class PromptOut(PromptIn):
    """Prompt record as parsed from API responses.

    Inherits every field of ``PromptIn`` and adds the identifier and
    timestamps below.
    """

    id: int
    created_at: datetime
    updated_at: datetime
|
|
|
|
class AddressRequest(BaseModel):
    """Request body for computing a distance between two addresses."""

    # Both required, given as free-form strings.
    address_from: str
    address_to: str
|
|
|
|
class DistanceRequest(BaseModel):
    """Request body for computing a distance between two coordinates."""

    # Both required, each a Geocode (longitude/latitude) pair.
    coord_from: Geocode
    coord_to: Geocode
|
|
|
|
class ValidationError(BaseModel):
    """One validation problem reported by the API.

    NOTE(review): presumably mirrors the server's standard validation-error
    schema; nothing in this module uses it directly -- confirm.
    """

    # Path to the offending value: a mix of string keys and integer indexes.
    loc: List[Union[str, int]]
    msg: str
    type: str
|
|
|
|
class HTTPValidationError(BaseModel):
    """Validation-error response body: a required list of ``ValidationError``."""

    detail: List[ValidationError]
|
|
|
|
# Resolve forward references
|
|
# The *Out models refer to each other through quoted annotations
# ('DriverOut', 'OrderOut', ...). Now that all of them exist, bind those
# strings to the real classes, in the same order as before.
for _model in (CarOut, ClientOut, DriverOut, OrderOut):
    _model.update_forward_refs()
del _model  # do not leave the loop variable in the module namespace
|
|
|
|
# Client
|
|
class TaxisClient:
    """Asynchronous client for the taxi service HTTP API.

    Each public method issues one HTTP request through ``_request`` and
    parses the JSON response into the matching pydantic model. A new
    ``aiohttp.ClientSession`` is opened and closed per request, so the
    client holds no persistent connection; ``async with`` is supported
    for convenience only.

    Args:
        base_url: Root URL of the API; a trailing slash is stripped.
        api_key: Optional key sent as ``Authorization: Bearer <key>``.
            ``login`` fills it in when it succeeds.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key

    async def __aenter__(self):
        """Return the client itself; nothing needs to be opened."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Do nothing: sessions are per-request, so nothing needs closing."""
        pass

    def _auth_headers(self) -> Dict[str, str]:
        """Build the auth headers for the current ``api_key``.

        Returns an empty dict when no key is set. aiohttp rejects header
        values that are not strings, so the header is left out entirely
        instead of being sent as ``None``.
        """
        if self.api_key:
            return {"Authorization": f"Bearer {self.api_key}"}
        return {}

    async def _request(self, method: str, endpoint: str,
                       params: Optional[Dict[str, Any]] = None,
                       json_data: Optional[Dict[str, Any]] = None) -> Any:
        """Send one HTTP request and return the decoded response body.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            endpoint: Path relative to ``base_url``; leading and trailing
                slashes are stripped before joining.
            params: Query-string parameters. Entries whose value is
                ``None`` are dropped, because they cannot be encoded.
            json_data: JSON body. When ``None`` no body is sent, so GET
                and DELETE requests no longer carry a stray ``{}``.

        Returns:
            The parsed JSON when the response content type is
            ``application/json``, otherwise the response text.

        Raises:
            Exception: With message ``"Unauthorized"`` on HTTP 401.
            aiohttp.ClientResponseError: On any other 4xx/5xx status
                (raised by ``raise_for_status``).
        """
        url = f"{self.base_url}/{endpoint.strip('/')}"
        query = {key: value for key, value in (params or {}).items()
                 if value is not None}

        async with aiohttp.ClientSession(headers=self._auth_headers()) as session:
            async with session.request(method, url, params=query, json=json_data) as resp:
                if resp.status == 401:
                    raise Exception("Unauthorized")
                resp.raise_for_status()

                if resp.content_type == "application/json":
                    return await resp.json()
                return await resp.text()

    async def login(self, username: str, password: str, grant_type: str = "password",
                    scope: str = "", client_id: Optional[str] = None,
                    client_secret: Optional[str] = None) -> 'ApiKeyOut':
        """Log in and remember the returned API key.

        POSTs the credentials to ``/auth/login`` as a JSON body and stores
        ``result["api_key"]`` on the client. Later requests pick the key
        up through ``_auth_headers``, so no session needs patching.

        NOTE(review): the field set matches an OAuth2 password form, which
        servers often expect form-encoded rather than as JSON -- confirm.

        Returns:
            The login response parsed as ``ApiKeyOut``.
        """
        data = {
            "grant_type": grant_type,
            "username": username,
            "password": password,
            "scope": scope,
            "client_id": client_id,
            "client_secret": client_secret,
        }
        result = await self._request("POST", "/auth/login", json_data=data)
        self.api_key = result["api_key"]
        return ApiKeyOut(**result)

    # Prompts

    async def list_prompts(self, **params) -> List['PromptOut']:
        """GET ``/prompts/``; ``params`` become the query string."""
        result = await self._request("GET", "/prompts/", params=params)
        return [PromptOut(**item) for item in result]

    async def create_prompt(self, prompt: 'PromptIn') -> 'PromptOut':
        """POST ``/prompts/`` with the prompt as the JSON body."""
        result = await self._request("POST", "/prompts/", json_data=prompt.dict())
        return PromptOut(**result)

    async def get_prompt(self, prompt_id: int, **params) -> 'PromptOut':
        """GET ``/prompts/{prompt_id}``."""
        result = await self._request("GET", f"/prompts/{prompt_id}", params=params)
        return PromptOut(**result)

    async def update_prompt(self, prompt_id: int, prompt: 'PromptIn') -> 'PromptOut':
        """PUT ``/prompts/{prompt_id}`` with the prompt as the JSON body."""
        result = await self._request("PUT", f"/prompts/{prompt_id}", json_data=prompt.dict())
        return PromptOut(**result)

    async def delete_prompt(self, prompt_id: int) -> None:
        """DELETE ``/prompts/{prompt_id}``; the response body is ignored."""
        await self._request("DELETE", f"/prompts/{prompt_id}")

    # Cars

    async def list_cars(self, **params) -> List['CarOut']:
        """GET ``/cars/``; ``params`` become the query string."""
        result = await self._request("GET", "/cars/", params=params)
        return [CarOut(**item) for item in result]

    async def create_car(self, car: 'CarIn') -> 'CarOut':
        """POST ``/cars/`` with the car as the JSON body."""
        result = await self._request("POST", "/cars/", json_data=car.dict())
        return CarOut(**result)

    async def get_car(self, car_id: int, **params) -> 'CarOut':
        """GET ``/cars/{car_id}``."""
        result = await self._request("GET", f"/cars/{car_id}", params=params)
        return CarOut(**result)

    async def update_car(self, car_id: int, car: 'CarIn') -> 'CarOut':
        """PUT ``/cars/{car_id}`` with the car as the JSON body."""
        result = await self._request("PUT", f"/cars/{car_id}", json_data=car.dict())
        return CarOut(**result)

    async def delete_car(self, car_id: int) -> None:
        """DELETE ``/cars/{car_id}``; the response body is ignored."""
        await self._request("DELETE", f"/cars/{car_id}")

    # Drivers

    async def list_drivers(self, **params) -> List['DriverOut']:
        """GET ``/drivers/``; ``params`` become the query string."""
        result = await self._request("GET", "/drivers/", params=params)
        return [DriverOut(**item) for item in result]

    async def create_driver(self, driver: 'DriverIn') -> 'DriverOut':
        """POST ``/drivers/`` with the driver as the JSON body."""
        result = await self._request("POST", "/drivers/", json_data=driver.dict())
        return DriverOut(**result)

    async def get_driver(self, driver_id: int, **params) -> 'DriverOut':
        """GET ``/drivers/{driver_id}``."""
        result = await self._request("GET", f"/drivers/{driver_id}", params=params)
        return DriverOut(**result)

    async def get_driver_by_phone(self, phone: str, **params) -> 'DriverOut':
        """GET ``/drivers/phone/{phone}``."""
        result = await self._request("GET", f"/drivers/phone/{phone}", params=params)
        return DriverOut(**result)

    async def update_driver(self, driver_id: int, driver: 'DriverIn') -> 'DriverOut':
        """PUT ``/drivers/{driver_id}`` with the driver as the JSON body."""
        result = await self._request("PUT", f"/drivers/{driver_id}", json_data=driver.dict())
        return DriverOut(**result)

    async def delete_driver(self, driver_id: int) -> None:
        """DELETE ``/drivers/{driver_id}``; the response body is ignored."""
        await self._request("DELETE", f"/drivers/{driver_id}")

    async def update_driver_address(self, driver_id: int, address: 'DriverAddressUpdate') -> 'DriverOut':
        """PUT ``/drivers/{driver_id}/address`` with the address as the JSON body."""
        result = await self._request("PUT", f"/drivers/{driver_id}/address", json_data=address.dict())
        return DriverOut(**result)

    async def update_driver_status(self, driver_id: int, status: 'DriverStatus') -> 'DriverOut':
        """PATCH ``/drivers/{driver_id}/status``; the status value goes in the query string."""
        result = await self._request("PATCH", f"/drivers/{driver_id}/status", params={"status": status.value})
        return DriverOut(**result)

    async def get_nearest_driver(self, lat: float, lon: float, exclude_ids: Optional[List[int]] = None, **params) -> 'DriverOut':
        """GET ``/drivers/nearest`` for the given coordinates.

        ``exclude_ids`` is added to the query only when it is non-empty.
        """
        query = {**params, "lat": lat, "lon": lon}
        if exclude_ids:
            query["exclude_ids"] = exclude_ids
        result = await self._request("GET", "/drivers/nearest", params=query)
        return DriverOut(**result)

    async def get_nearby_drivers(self, lat: float, lon: float, limit: int = 5, exclude_ids: Optional[List[int]] = None, **params) -> List['DriverOut']:
        """GET ``/drivers/nearest_list`` for the given coordinates.

        ``limit`` is always sent (default 5); ``exclude_ids`` is added to
        the query only when it is non-empty.
        """
        query = {**params, "lat": lat, "lon": lon, "limit": limit}
        if exclude_ids:
            query["exclude_ids"] = exclude_ids
        result = await self._request("GET", "/drivers/nearest_list", params=query)
        return [DriverOut(**item) for item in result]

    # Clients

    async def list_clients(self, **params) -> List['ClientOut']:
        """GET ``/clients/``; ``params`` become the query string."""
        result = await self._request("GET", "/clients/", params=params)
        return [ClientOut(**item) for item in result]

    async def create_client(self, client: 'ClientIn') -> 'ClientOut':
        """POST ``/clients/`` with the client as the JSON body."""
        result = await self._request("POST", "/clients/", json_data=client.dict())
        return ClientOut(**result)

    async def get_client(self, client_id: int, **params) -> 'ClientOut':
        """GET ``/clients/{client_id}``."""
        result = await self._request("GET", f"/clients/{client_id}", params=params)
        return ClientOut(**result)

    async def get_client_by_phone(self, phone: str, **params) -> 'ClientOut':
        """GET ``/clients/phone/{phone}``."""
        result = await self._request("GET", f"/clients/phone/{phone}", params=params)
        return ClientOut(**result)

    async def update_client(self, client_id: int, client: 'ClientIn') -> 'ClientOut':
        """PUT ``/clients/{client_id}`` with the client as the JSON body."""
        result = await self._request("PUT", f"/clients/{client_id}", json_data=client.dict())
        return ClientOut(**result)

    async def delete_client(self, client_id: int) -> None:
        """DELETE ``/clients/{client_id}``; the response body is ignored."""
        await self._request("DELETE", f"/clients/{client_id}")

    # Orders

    async def list_orders(self, **params) -> List['OrderOut']:
        """GET ``/orders/``; ``params`` become the query string."""
        result = await self._request("GET", "/orders/", params=params)
        return [OrderOut(**item) for item in result]

    async def create_order(self, order: 'OrderIn') -> 'OrderOut':
        """POST ``/orders/`` with the order as the JSON body."""
        result = await self._request("POST", "/orders/", json_data=order.dict())
        return OrderOut(**result)

    async def get_order(self, order_id: int, **params) -> 'OrderOut':
        """GET ``/orders/{order_id}``."""
        result = await self._request("GET", f"/orders/{order_id}", params=params)
        return OrderOut(**result)

    async def update_order(self, order_id: int, order: 'OrderInUpdate') -> 'OrderOut':
        """PUT ``/orders/{order_id}`` with the order as the JSON body.

        NOTE(review): ``order.dict()`` includes fields the caller never
        set, so the model's default status is sent on every update --
        confirm whether ``exclude_unset=True`` is wanted here.
        """
        result = await self._request("PUT", f"/orders/{order_id}", json_data=order.dict())
        return OrderOut(**result)

    async def delete_order(self, order_id: int) -> None:
        """DELETE ``/orders/{order_id}``; the response body is ignored."""
        await self._request("DELETE", f"/orders/{order_id}")

    async def update_order_status(self, order_id: int, status: 'OrderStatus') -> 'OrderOut':
        """PATCH ``/orders/{order_id}/status``; the status value goes in the query string."""
        result = await self._request("PATCH", f"/orders/{order_id}/status", params={"status": status.value})
        return OrderOut(**result)

    async def update_order_driver(self, order_id: int, driver_id: int) -> 'OrderOut':
        """PATCH ``/orders/{order_id}/driver``; ``driver_id`` goes in the query string."""
        result = await self._request("PATCH", f"/orders/{order_id}/driver", params={"driver_id": driver_id})
        return OrderOut(**result)

    async def delete_order_driver(self, order_id: int) -> 'OrderOut':
        """DELETE ``/orders/{order_id}/driver`` and return the parsed order."""
        result = await self._request("DELETE", f"/orders/{order_id}/driver")
        return OrderOut(**result)

    # Settings

    async def get_global_settings(self, **params) -> 'GlobalSettingsOut':
        """GET ``/settings/global``."""
        result = await self._request("GET", "/settings/global", params=params)
        return GlobalSettingsOut(**result)

    async def update_global_settings(self, settings: 'GlobalSettingsUpdate') -> 'GlobalSettingsOut':
        """PUT ``/settings/global`` with the settings as the JSON body."""
        result = await self._request("PUT", "/settings/global", json_data=settings.dict())
        return GlobalSettingsOut(**result)

    async def get_nominatium_settings(self, nominatium_id: int, **params) -> 'NominatiumSettingsOut':
        """GET ``/settings/nominatium/{nominatium_id}``."""
        result = await self._request("GET", f"/settings/nominatium/{nominatium_id}", params=params)
        return NominatiumSettingsOut(**result)

    async def get_openstreetmap_settings(self, openstreetmap_id: int, **params) -> 'OpenStreetMapSettingsOut':
        """GET ``/settings/openstreetmap/{openstreetmap_id}``."""
        result = await self._request("GET", f"/settings/openstreetmap/{openstreetmap_id}", params=params)
        return OpenStreetMapSettingsOut(**result)

    # Union

    async def geocode_address(self, address: str, **params) -> 'Geocode':
        """GET ``/union/geocode`` with ``address`` in the query string."""
        query = {**params, "address": address}
        result = await self._request("GET", "/union/geocode", params=query)
        return Geocode(**result)

    async def get_distance(self, request: 'DistanceRequest') -> float:
        """POST ``/union/distance`` and return the response converted to ``float``."""
        result = await self._request("POST", "/union/distance", json_data=request.dict())
        return float(result)

    async def get_distance_by_address(self, request: 'AddressRequest') -> float:
        """POST ``/union/distance_by_address`` and return the response converted to ``float``."""
        result = await self._request("POST", "/union/distance_by_address", json_data=request.dict())
        return float(result)
|