Compare commits
3 commits
a9f5ba24fd
...
9b8a0f611c
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b8a0f611c | |||
| 6430bee645 | |||
| e2f7a9925f |
3 changed files with 192 additions and 1 deletions
|
|
@ -6,7 +6,7 @@ authors = [
|
|||
{name = "Evgeny (Krymmy) Momotov",email = "evgeny.momotov@gmail.com"}
|
||||
]
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
requires-python = ">=3.11"
|
||||
dependencies = [
|
||||
"pydantic (==2.11.7)",
|
||||
"aiohttp (==3.12.15)"
|
||||
|
|
|
|||
60
src/AsteriskAPIManager/api_manager.py
Normal file
60
src/AsteriskAPIManager/api_manager.py
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
import uuid
|
||||
from enum import Enum
|
||||
from datetime import datetime
|
||||
|
||||
import aiohttp
|
||||
|
||||
from data_models import *
|
||||
|
||||
|
||||
class AsteriskManagerClient:
|
||||
def __init__(self, base_url: str, auth_key: Optional[str] = None, token: Optional[str] = None):
|
||||
self.base_url = base_url.rstrip('/')
|
||||
self.token: Optional[str] = None
|
||||
if token:
|
||||
self.token = token
|
||||
|
||||
if auth_key and not token:
|
||||
self.auth_key = auth_key
|
||||
else:
|
||||
self.auth_key = None
|
||||
|
||||
if not self.token and not self.auth_key:
|
||||
raise ValueError("Either token or auth_key must be provided")
|
||||
|
||||
|
||||
async def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
|
||||
headers = kwargs.get('headers', {})
|
||||
if self.token:
|
||||
headers['Authorization'] = f'Bearer {self.token}'
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.request(method, f"{self.base_url}{endpoint}", **kwargs) as response:
|
||||
if response.status >= 400:
|
||||
error_data = await response.json()
|
||||
raise Exception(f"API Error {response.status}: {error_data}")
|
||||
return await response.json()
|
||||
|
||||
|
||||
async def register_manager(self, auth_key: Optional[str] = None) -> AccessResponse:
|
||||
if auth_key is None and self.auth_key is None:
|
||||
raise ValueError("Auth key is required for registration")
|
||||
auth_key = auth_key or self.auth_key
|
||||
data = Registration(auth_key=auth_key).dict()
|
||||
response = await self._request('POST', '/asterisk_manager/register', json=data)
|
||||
access_response = AccessResponse(**response)
|
||||
self.token = access_response.token
|
||||
return access_response
|
||||
|
||||
|
||||
async def get_sip_accounts(self, **params) -> List[SIPAccount]:
|
||||
response = await self._request('GET', '/sip/', params=params)
|
||||
return [SIPAccount(**account) for account in response]
|
||||
|
||||
async def get_sip_account(self, account_id: int) -> SIPAccount:
|
||||
response = await self._request('GET', f'/sip/{account_id}')
|
||||
return SIPAccount(**response)
|
||||
|
||||
async def set_sip_register_status(self, account_id: int, is_registered: bool):
|
||||
data = {"is_registered": is_registered}
|
||||
response = await self._request('PATCH', f'/sip/{account_id}', json=data)
|
||||
return SIPAccount(**response)
|
||||
131
src/AsteriskAPIManager/data_models.py
Normal file
131
src/AsteriskAPIManager/data_models.py
Normal file
|
|
@ -0,0 +1,131 @@
|
|||
import re
|
||||
from pydantic import BaseModel, Field, validator
|
||||
from typing import Optional
|
||||
|
||||
class User(BaseModel):
|
||||
id: Optional[int] = Field(description="Идентификатор (автоматический)", default=None)
|
||||
username: str = Field(max_length=150, description="Пользовательское имя", example="john_doe")
|
||||
#password: str = Field(max_length=255, description="Пароль", example="secret123")
|
||||
email: str = Field(max_length=254, default="", description="Электронная почта", example="john@example.com")
|
||||
first_name: str = Field(max_length=150, default="", description="Имя", example="John")
|
||||
last_name: str = Field(max_length=150, default="", description="Фамилия", example="Doe")
|
||||
#is_staff: bool = Field(default=False, description="Статус администратора")
|
||||
#is_active: bool = Field(default=False, description="Активен")
|
||||
#is_superuser: bool = Field(default=False, description="Суперпользователь")
|
||||
#date_joined: str = Field(description="Дата создания", example="2024-01-01T12:00:00")
|
||||
#last_login: Optional[str] = Field(null=True, default=None, description="Последний вход")
|
||||
|
||||
@validator("email")
|
||||
def validate_email(cls, v):
|
||||
if v and "@" not in v or "." not in v:
|
||||
raise ValueError("Некорректный email")
|
||||
return v
|
||||
|
||||
@validator("username")
|
||||
def validate_username(cls, v):
|
||||
if len(v) > 150:
|
||||
raise ValueError("Имя пользователя слишком длинное (максимум 150 символов)")
|
||||
return v
|
||||
|
||||
@validator("first_name")
|
||||
def validate_first_name(cls, v):
|
||||
if v and len(v) > 150:
|
||||
raise ValueError("Имя слишком длинное (максимум 150 символов)")
|
||||
return v
|
||||
|
||||
@validator("last_name")
|
||||
def validate_last_name(cls, v):
|
||||
if v and len(v) > 150:
|
||||
raise ValueError("Фамилия слишком длинная (максимум 150 символов)")
|
||||
return v
|
||||
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
from_attributes = True
|
||||
|
||||
|
||||
|
||||
class SIPAccount(BaseModel):
|
||||
"""
|
||||
SIP-аккаунт.
|
||||
"""
|
||||
unique_id: str = Field(description="Уникальный идентификатор", example="00000000-0000-0000-0000-000000000000")
|
||||
id: Optional[int] = Field(description="Идентификатор (автоматический)", default=None)
|
||||
created_at: Optional[str] = Field(description="Дата создания", example="2024-01-01T12:00:00")
|
||||
updated_at: Optional[str] = Field(description="Дата обновления", example="2024-01-01T12:00:00")
|
||||
|
||||
owner: Optional[User] = Field(description="Владелец")
|
||||
name: str = Field(max_length=255, description="Название", example="SIP-аккаунт-1")
|
||||
is_registered: bool = Field(default=False, description="Зарегистрирован")
|
||||
username: str = Field(max_length=40, description="Имя SIP клиента", example="user123")
|
||||
password: Optional[str] = Field(max_length=80, null=True, blank=True, description="Пароль (открытый, временно)")
|
||||
|
||||
server_url: str = Field(
|
||||
max_length=215,
|
||||
description="URL сервера",
|
||||
example="https://192.168.1.100:5060"
|
||||
)
|
||||
port: Optional[int] = Field(null=True, blank=True, description="Порт", ge=0, le=65535)
|
||||
internal_number: Optional[str] = Field(max_length=255, null=True, blank=True, description="Внутренний номер")
|
||||
endpoint_name: Optional[str] = Field(max_length=255, null=True, blank=True, description="Имя конечной точки")
|
||||
|
||||
@validator("server_url")
|
||||
def validate_server_url(cls, v):
|
||||
if not v:
|
||||
return v
|
||||
# Проверяем, содержит ли строка URL (в формате http:// или https://)
|
||||
if v.startswith("http://") or v.startswith("https://"):
|
||||
return v
|
||||
# Проверяем, является ли это IPv4 (простая проверкаDetails)
|
||||
parts = v.strip().split(':')
|
||||
if len(parts) == 1:
|
||||
ip_part = parts[0]
|
||||
if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip_part):
|
||||
# Проверяем, что каждая часть от 0 до 255
|
||||
ip_parts = [int(x) for x in ip_part.split('.')]
|
||||
if all(0 <= part <= 255 for part in ip_parts):
|
||||
return v
|
||||
raise ValueError("Invalid server URL format (must be http(s):// or IPv4 address)")
|
||||
|
||||
@validator("username")
|
||||
def validate_username(cls, v):
|
||||
if len(v) > 40:
|
||||
raise ValueError("Username too long (max 40 characters)")
|
||||
return v
|
||||
|
||||
@validator("name")
|
||||
def validate_name(cls, v):
|
||||
if len(v) > 255:
|
||||
raise ValueError("Name too long (max 255 characters)")
|
||||
return v
|
||||
|
||||
@validator("internal_number")
|
||||
def validate_internal_number(cls, v):
|
||||
if v and len(v) > 255:
|
||||
raise ValueError("Internal number too long (max 255 characters)")
|
||||
return v
|
||||
|
||||
@validator("endpoint_name")
|
||||
def validate_endpoint_name(cls, v):
|
||||
if v and len(v) > 255:
|
||||
raise ValueError("Endpoint name too long (max 255 characters)")
|
||||
return v
|
||||
|
||||
@validator("port")
|
||||
def validate_port(cls, v):
|
||||
if v is not None and not (0 <= v <= 65535):
|
||||
raise ValueError("Port must be between 0 and 65535")
|
||||
return v
|
||||
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
from_attributes = True
|
||||
|
||||
|
||||
|
||||
class AccessResponse(BaseModel):
|
||||
success: bool
|
||||
token: str
|
||||
|
||||
class Registration(BaseModel):
|
||||
auth_key: str
|
||||
Loading…
Add table
Reference in a new issue