import asyncio
import base64
import json
import ssl
import uuid
from typing import Any, Optional

import aiohttp


def generate_uuid() -> str:
    """Return a freshly generated random (version 4) UUID as a string."""
    return str(uuid.uuid4())


class PanelInteraction:
    """Async HTTP client for a panel that issues a ``3x-ui`` session cookie.

    The client logs in lazily on the first API call, caches the session
    cookie value, and sends it with every subsequent request. TLS
    verification uses the default trust store plus the certificate supplied
    to the constructor.
    """

    # Total time budget for every HTTP request made by this client.
    REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=10)

    # Failures treated as "the request did not go through". A timeout
    # surfaces as asyncio.TimeoutError, which is not an aiohttp.ClientError,
    # so both have to be listed.
    _REQUEST_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError)

    def __init__(self, base_url, login_data, logger, certificate=None, is_encoded=True):
        """Store connection settings and build the SSL context.

        :param base_url: Base URL of the panel; endpoint paths are appended to it.
        :param login_data: Form data posted to ``<base_url>/login``.
        :param logger: Logger-like object providing ``info``, ``error`` and
            ``exception``.
        :param certificate: Certificate data to trust (PEM text, or
            base64-encoded PEM when ``is_encoded`` is true). Required.
        :param is_encoded: Whether ``certificate`` is base64-encoded.
        :raises ValueError: If the certificate is missing or cannot be loaded.
        """
        self.base_url = base_url
        self.login_data = login_data
        self.logger = logger
        self.ssl_context = self._create_ssl_context(certificate, is_encoded)
        self.session_id = None
        self.headers = None

    def _create_ssl_context(self, certificate, is_encoded):
        """Build an SSL context that additionally trusts ``certificate``.

        :param certificate: Certificate data; base64-decoded first when
            ``is_encoded`` is true.
        :param is_encoded: Whether ``certificate`` is base64-encoded.
        :return: The configured ``ssl.SSLContext``.
        :raises ValueError: If no certificate is given, or if decoding or
            loading it fails (the original error is chained as the cause).
        """
        if not certificate:
            raise ValueError("Certificate is required.")
        try:
            ssl_context = ssl.create_default_context()
            if is_encoded:
                certificate = base64.b64decode(certificate).decode()
            ssl_context.load_verify_locations(cadata=certificate)
            return ssl_context
        except Exception as e:
            self.logger.error(f"Error creating SSL context: {e}")
            raise ValueError("Invalid certificate format.") from e

    async def _ensure_logged_in(self):
        """Log in once and cache the session cookie and request headers.

        Does nothing when a session ID is already cached.

        :raises ValueError: If the login completed without yielding a session ID.
        :raises Exception: Any error raised by :meth:`login` is logged and
            re-raised unchanged.
        """
        if self.session_id:
            return
        try:
            self.session_id = await self.login()
            if not self.session_id:
                self.logger.error("Login failed: Unable to retrieve session ID.")
                raise ValueError("Login failed: No session ID.")
            self.headers = {
                'Accept': 'application/json',
                'Cookie': f'3x-ui={self.session_id}',
                'Content-Type': 'application/json'
            }
        except Exception:
            self.logger.exception("Unexpected error during login.")
            raise

    async def login(self) -> Optional[str]:
        """Post the login form and return the ``3x-ui`` cookie value.

        :return: The session cookie value, or ``None`` when the response is
            not HTTP 200 or carries no ``3x-ui`` cookie.
        :raises aiohttp.ClientError: If the request fails.
        :raises asyncio.TimeoutError: If the request exceeds the timeout.
        """
        login_url = f"{self.base_url}/login"
        self.logger.info(f"Attempting to login at: {login_url}")

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    login_url,
                    data=self.login_data,
                    ssl=self.ssl_context,
                    timeout=self.REQUEST_TIMEOUT
                ) as response:
                    if response.status != 200:
                        error_details = await response.text()
                        self.logger.error(f"Login failed with status {response.status}: {error_details}")
                        return None
                    session_cookie = response.cookies.get("3x-ui")
                    if session_cookie:
                        return session_cookie.value
                    self.logger.error("Login failed: No session ID received.")
                    return None
            except self._REQUEST_ERRORS as e:
                self.logger.exception(f"Login request failed: {e}")
                raise

    async def get_inbound_info(self, inbound_id: int = 1) -> Optional[Any]:
        """Fetch inbound information by ID.

        :param inbound_id: ID of the inbound.
        :return: Parsed JSON response, or ``None`` on a non-200 status or a
            failed/timed-out request.
        """
        await self._ensure_logged_in()
        url = f"{self.base_url}/panel/api/inbounds/get/{inbound_id}"

        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(
                    url,
                    headers=self.headers,
                    ssl=self.ssl_context,
                    timeout=self.REQUEST_TIMEOUT
                ) as response:
                    # Read the raw body first so it can be logged verbatim.
                    response_text = await response.text()
                    self.logger.info(f"Inbound Info (raw): {response_text}")
                    if response.status == 200:
                        return await response.json()
                    self.logger.error(f"Failed to get inbound info: {response.status}")
                    return None
            except self._REQUEST_ERRORS as e:
                self.logger.error(f"Get inbound info request failed: {e}")
                return None

    async def get_client_traffic(self, email) -> Optional[Any]:
        """Fetch traffic information for a specific client.

        :param email: Client's email, used as the last URL path segment.
        :return: Parsed JSON response, or ``None`` on a non-200 status or a
            failed/timed-out request.
        """
        await self._ensure_logged_in()
        url = f"{self.base_url}/panel/api/inbounds/getClientTraffics/{email}"

        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(
                    url,
                    headers=self.headers,
                    ssl=self.ssl_context,
                    timeout=self.REQUEST_TIMEOUT
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    self.logger.error(f"Failed to get client traffic: {response.status}")
                    return None
            except self._REQUEST_ERRORS as e:
                self.logger.error(f"Get client traffic request failed: {e}")
                return None

    async def update_client_expiry(self, client_uuid, new_expiry_time, client_email, inbound_id=1):
        """Update the expiry time of a specific client.

        :param client_uuid: UUID of the client.
        :param new_expiry_time: New expiry value, sent unchanged as
            ``expiryTime``. NOTE(review): the panel appears to expect a
            millisecond Unix timestamp rather than an ISO string -- confirm.
        :param client_email: Client's email.
        :param inbound_id: ID of the inbound the client belongs to
            (defaults to 1, the value that was previously hard-coded).
        :return: None. The outcome is only reported through the logger.
        """
        await self._ensure_logged_in()
        # Per the 3x-ui API documentation the client UUID is part of the route.
        url = f"{self.base_url}/panel/api/inbounds/updateClient/{client_uuid}"

        client_info = {
            "id": client_uuid,
            "alterId": 0,
            "email": client_email,
            "limitIp": 2,
            "totalGB": 0,
            "expiryTime": new_expiry_time,
            "enable": True,
            "tgId": "",
            "subId": ""
        }
        update_data = {
            "id": int(inbound_id),
            # `settings` travels as a JSON string, exactly as in add_client.
            "settings": json.dumps({"clients": [client_info]})
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    url,
                    headers=self.headers,
                    json=update_data,
                    ssl=self.ssl_context,
                    timeout=self.REQUEST_TIMEOUT
                ) as response:
                    if response.status == 200:
                        self.logger.info("Client expiry updated successfully.")
                    else:
                        self.logger.error(f"Failed to update client expiry: {response.status}")
            except self._REQUEST_ERRORS as e:
                self.logger.error(f"Update client expiry request failed: {e}")

    async def add_client(self, inbound_id, expiry_date, email) -> Optional[str]:
        """Add a new client to an inbound.

        :param inbound_id: ID of the inbound; converted with ``int()``.
        :param expiry_date: Expiry value, sent unchanged as ``expiryTime``.
            NOTE(review): the panel appears to expect a millisecond Unix
            timestamp rather than an ISO string -- confirm.
        :param email: Client's email.
        :return: ``"OK"`` when the panel answers HTTP 200 with a truthy
            ``success`` field; ``None`` otherwise or when the request fails.
        """
        await self._ensure_logged_in()
        url = f"{self.base_url}/panel/api/inbounds/addClient"

        client_info = {
            "id": generate_uuid(),
            "flow": "xtls-rprx-vision",
            "email": email,
            "limitIp": 2,
            "totalGB": 0,
            "expiryTime": expiry_date,
            "enable": True,
            "tgId": "",
            "subId": "",
            "reset": 0
        }
        payload = {
            "id": int(inbound_id),  # the inbound ID is sent as a number
            # `settings` is sent as a JSON string, not as a nested object.
            "settings": json.dumps({"clients": [client_info]})
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    url,
                    headers=self.headers,
                    json=payload,
                    ssl=self.ssl_context,
                    timeout=self.REQUEST_TIMEOUT
                ) as response:
                    response_json = await response.json()
                    if response.status == 200 and response_json.get('success'):
                        self.logger.info(f"Клиент успешно добавлен: {response_json}")
                        return "OK"
                    error_msg = response_json.get('msg', 'Причина не указана')
                    self.logger.error(f"Не удалось добавить клиента: {error_msg}")
                    return None
            except self._REQUEST_ERRORS as e:
                self.logger.error(f"Add client request failed: {e}")
                return None

    async def delete_depleted_clients(self, inbound_id=None) -> Optional[Any]:
        """Delete depleted clients.

        :param inbound_id: ID of the inbound. When falsy (``None``, ``0``,
            ``""``) no ID segment is appended to the URL; the original note
            says this deletes for all inbounds.
        :return: Parsed JSON response, or ``None`` on a non-200 status or a
            failed/timed-out request.
        """
        await self._ensure_logged_in()
        # With no inbound ID the URL would end in "/", so it is stripped.
        url = f"{self.base_url}/panel/api/inbounds/delDepletedClients/{inbound_id or ''}".rstrip('/')

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    url,
                    headers=self.headers,
                    ssl=self.ssl_context,
                    timeout=self.REQUEST_TIMEOUT
                ) as response:
                    if response.status == 200:
                        self.logger.info(f"Depleted clients deleted successfully for inbound_id: {inbound_id}")
                        return await response.json()
                    error_details = await response.text()
                    self.logger.error(f"Failed to delete depleted clients: {response.status} - {error_details}")
                    return None
            except self._REQUEST_ERRORS as e:
                self.logger.error(f"Delete depleted clients request failed: {e}")
                return None