"""Client for the Marzban HTTP API: token login plus async user management."""

from typing import Any, Dict, Optional, Literal
from datetime import date, datetime, time, timezone
import logging

import aiohttp
import requests

from instance import User, Subscription

# Timeout (seconds) shared by the blocking login call and the aiohttp session.
_REQUEST_TIMEOUT_SECONDS = 30

# Data limit sent for newly created users: 100 * 2**30 bytes.
_DEFAULT_DATA_LIMIT_BYTES = 100 * 1073741824


class UserAlreadyExistsError(Exception):
    """Raised when the API answers a request with HTTP 409."""
    pass


class MarzbanUser:
    """Attribute view over a user payload returned by the Marzban API.

    Every attribute is read with ``dict.get``; keys missing from *data*
    become ``None``, except ``proxies`` and ``inbounds`` which default to
    an empty dict.
    """

    def __init__(self, data: Dict[str, Any]):
        self.username = data.get('username')
        self.status = data.get('status')
        self.expire = data.get('expire')
        self.data_limit = data.get('data_limit')
        self.data_limit_reset_strategy = data.get('data_limit_reset_strategy')
        self.used_traffic = data.get('used_traffic')
        self.lifetime_used_traffic = data.get('lifetime_used_traffic')
        self.subscription_url = data.get('subscription_url')
        self.online_at = data.get('online_at')
        self.created_at = data.get('created_at')
        self.proxies = data.get('proxies', {})
        self.inbounds = data.get('inbounds', {})
        self.note = data.get('note')


class UserStatus:
    """Usage/status subset of a user payload.

    Traffic counters default to ``0`` when absent; the remaining fields
    default to ``None``.
    """

    def __init__(self, data: Dict[str, Any]):
        self.used_traffic = data.get('used_traffic', 0)
        self.lifetime_used_traffic = data.get('lifetime_used_traffic', 0)
        self.online_at = data.get('online_at')
        self.status = data.get('status')
        self.expire = data.get('expire')
        self.data_limit = data.get('data_limit')


class MarzbanService:
    """Async wrapper around the Marzban user-management endpoints.

    The constructor performs a *blocking* login request to obtain a bearer
    token; all other calls go through a lazily created ``aiohttp`` session.
    The instance can be used as an async context manager, which closes the
    session on exit.
    """

    def __init__(self, baseURL: str, username: str, password: str) -> None:
        """Log in and prepare the authorization headers.

        Args:
            baseURL: Root URL of the panel; trailing slashes are stripped.
            username: Admin login sent to ``/api/admin/token``.
            password: Admin password sent to ``/api/admin/token``.

        Raises:
            Exception: If the token request fails (see ``_get_token``).
        """
        self.base_url = baseURL.rstrip('/')
        self.token = self._get_token(username, password)
        self.headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }
        self._session: Optional[aiohttp.ClientSession] = None

    def _get_token(self, username: str, password: str) -> str:
        """Fetch an access token with a synchronous form-encoded POST.

        Raises:
            Exception: Wrapping any ``requests.RequestException`` (network
                failure, timeout, non-2xx status).
        """
        try:
            response = requests.post(
                f"{self.base_url}/api/admin/token",
                data={'username': username, 'password': password},
                # Without a timeout `requests` waits indefinitely, which
                # would hang construction if the panel is unreachable.
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            return response.json()['access_token']
        except requests.RequestException as e:
            logging.error(f"Failed to get token: {e}")
            raise Exception(f"Authentication failed: {e}") from e

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            timeout = aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SECONDS)
            self._session = aiohttp.ClientSession(timeout=timeout)
        return self._session

    async def _make_request(
        self,
        endpoint: str,
        method: Literal["get", "post", "put", "delete"],
        data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send an authorized JSON request and return the decoded body.

        Args:
            endpoint: Path appended to the base URL (e.g. ``/api/user``).
            method: HTTP verb in lower case; upper-cased before sending.
            data: Optional JSON payload.

        Returns:
            The decoded JSON body, or ``{}`` when the response body is empty.

        Raises:
            UserAlreadyExistsError: On HTTP 409.
            aiohttp.ClientError: On transport-level failures.
            Exception: On any status other than 200/201.
        """
        url = f"{self.base_url}{endpoint}"

        try:
            session = await self._get_session()
            async with session.request(
                method=method.upper(),
                url=url,
                headers=self.headers,
                json=data
            ) as response:
                # Decide on the actual body rather than the Content-Length
                # header: the header is absent (None) for chunked responses,
                # which previously caused a valid body to be discarded.
                raw_body = await response.read()
                response_data = await response.json() if raw_body else {}

                if response.status == 409:
                    raise UserAlreadyExistsError(f"User already exists: {response_data}")

                if response.status not in (200, 201):
                    raise Exception(f"HTTP {response.status}: {response_data}")

                return response_data

        except UserAlreadyExistsError:
            # Propagate untouched so callers can handle the conflict.
            raise
        except aiohttp.ClientError as e:
            logging.error(f"Network error during {method.upper()} to {url}: {e}")
            raise
        except Exception as e:
            logging.error(f"Unexpected error during request to {url}: {e}")
            raise

    @staticmethod
    def _to_expire_timestamp(end_date: Any) -> int:
        """Convert a subscription end date to a Unix timestamp.

        * ``datetime``: naive values are treated as UTC, aware values are
          used as-is.
        * ``date``: expanded to 23:59:59 UTC on that day.
        * anything else (including ``None``): ``0``.
        """
        # `datetime` must be tested first because it is a subclass of `date`.
        if isinstance(end_date, datetime):
            if end_date.tzinfo is None:
                end_date = end_date.replace(tzinfo=timezone.utc)
            return int(end_date.timestamp())
        if isinstance(end_date, date):
            end_of_day = datetime.combine(
                end_date,
                time(23, 59, 59),
                tzinfo=timezone.utc
            )
            return int(end_of_day.timestamp())
        return 0

    async def create_user(self, user: User, subscription: Subscription) -> str | MarzbanUser:
        """Create a user in Marzban.

        Returns:
            The created ``MarzbanUser``, or the literal string
            ``"USER_ALREADY_EXISTS"`` when the API reports a conflict.

        Raises:
            Exception: Wrapping any other failure.
        """
        logging.info(f"Конец подписки пользователя {user.telegram_id} {subscription.end_date}")

        expire_timestamp = self._to_expire_timestamp(subscription.end_date)

        data = {
            "username": user.username,
            "status": "active",
            "expire": expire_timestamp,
            "data_limit": _DEFAULT_DATA_LIMIT_BYTES,
            "data_limit_reset_strategy": "no_reset",
            "proxies": {
                "trojan": {}
            },
            "inbounds": {
                "trojan": ["TROJAN WS NOTLS"]
            },
            "note": f"Telegram: {user.telegram_id}",
            "on_hold_timeout": None,
            "on_hold_expire_duration": 0,
            "next_plan": {
                "add_remaining_traffic": False,
                "data_limit": 0,
                "expire": 0,
                "fire_on_either": True
            }
        }

        try:
            response_data = await self._make_request("/api/user", "post", data)
            marzban_user = MarzbanUser(response_data)
            logging.info(f"Пользователь {user.username} успешно создан в Marzban")
            return marzban_user
        except UserAlreadyExistsError:
            logging.warning(f"Пользователь {user.telegram_id} уже существует в Marzban")
            return "USER_ALREADY_EXISTS"
        except Exception as e:
            logging.error(f"Failed to create user {user.username}: {e}")
            raise Exception(f"Failed to create user: {e}") from e

    async def update_user(self, user: User, subscription: Subscription) -> MarzbanUser:
        """Set an existing user to ``active`` with a new expiry.

        Raises:
            Exception: Wrapping any request failure.
        """
        username = user.username
        data = {
            "status": "active",
            "expire": self._to_expire_timestamp(subscription.end_date)
        }

        try:
            response_data = await self._make_request(f"/api/user/{username}", "put", data)
            marzban_user = MarzbanUser(response_data)
            logging.info(f"User {username} updated successfully")
            return marzban_user
        except Exception as e:
            logging.error(f"Failed to update user {username}: {e}")
            raise Exception(f"Failed to update user: {e}") from e

    async def disable_user(self, user: User) -> bool:
        """Set the user's status to ``disabled``; return ``False`` on failure."""
        username = user.username
        data = {
            "status": "disabled"
        }

        try:
            await self._make_request(f"/api/user/{username}", "put", data)
            logging.info(f"User {username} disabled successfully")
            return True
        except Exception as e:
            logging.error(f"Failed to disable user {username}: {e}")
            return False

    async def enable_user(self, user: User) -> bool:
        """Set the user's status to ``active``; return ``False`` on failure."""
        username = user.username
        data = {
            "status": "active"
        }

        try:
            await self._make_request(f"/api/user/{username}", "put", data)
            logging.info(f"User {username} enabled successfully")
            return True
        except Exception as e:
            logging.error(f"Failed to enable user {username}: {e}")
            return False

    async def delete_user(self, user: User) -> bool:
        """Delete the user via the API; return ``False`` on failure."""
        try:
            await self._make_request(f"/api/user/{user.username}", "delete")
            logging.info(f"User {user.username} deleted successfully")
            return True
        except Exception as e:
            logging.error(f"Failed to delete user {user.username}: {e}")
            return False

    async def get_user_status(self, user: User) -> UserStatus:
        """Fetch the user and wrap the payload in ``UserStatus``.

        Raises:
            Exception: Wrapping any request failure.
        """
        try:
            response_data = await self._make_request(f"/api/user/{user.username}", "get")
            return UserStatus(response_data)
        except Exception as e:
            logging.error(f"Failed to get status for user {user.username}: {e}")
            raise Exception(f"Failed to get user status: {e}") from e

    async def get_subscription_url(self, user: User) -> str | None:
        """Return the payload's ``subscription_url``.

        Returns ``''`` when the key is absent and ``None`` when the request
        fails.
        """
        try:
            response_data = await self._make_request(f"/api/user/{user.username}", "get")
            return response_data.get('subscription_url', '')
        except Exception as e:
            logging.error(f"Failed to get subscription URL for user {user.username}: {e}")
            return None

    async def get_config_links(self, user: User) -> str:
        """Return the payload's ``links`` value, or ``''`` if absent or on error.

        NOTE(review): the value is passed through untouched; if the API
        returns a list here the ``str`` annotation is inaccurate — confirm.
        """
        username = user.username
        try:
            response_data = await self._make_request(f"/api/user/{username}", "get")
            return response_data.get('links', '')
        except Exception as e:
            logging.error(f"Failed to get configurations URL's for user {username}: {e}")
            return ""

    async def check_marzban_health(self) -> bool:
        """Return ``True`` if an authorized GET to ``/api/admin`` succeeds."""
        try:
            await self._make_request("/api/admin", "get")
            return True
        except Exception as e:
            logging.error(f"Marzban health check failed: {e}")
            return False

    async def close(self):
        """Close the aiohttp session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()