from decimal import Decimal import json from instance.model import User, Subscription, Transaction, SupportTicket, TicketMessage, TicketStatus from .xui_rep import PanelInteraction from .postgres_rep import PostgresRepository from .mongo_rep import MongoDBRepository from instance.model import Transaction from dateutil.relativedelta import relativedelta from datetime import datetime import random import string import logging from uuid import UUID class DatabaseManager: def __init__(self, session_generator): """ Инициализация с асинхронным генератором сессий (например, get_postgres_session). """ self.logger = logging.getLogger(__name__) self.mongo_repo = MongoDBRepository() self.postgres_repo = PostgresRepository(session_generator, self.logger) async def create_user(self, telegram_id: int): """ Создаёт пользователя. """ username = self.generate_string(6) return await self.postgres_repo.create_user(telegram_id, username) async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ return await self.postgres_repo.get_user_by_telegram_id(telegram_id) async def add_transaction(self, user_id: UUID, amount: float): """ Добавляет транзакцию. """ tran = Transaction( user_id=user_id, amount=Decimal(amount), transaction_type="default" ) return await self.postgres_repo.add_record(tran) async def get_transaction(self, user_id: UUID, limit: int = 10): """ Возвращает транзакции. """ return await self.postgres_repo.get_last_transactions(user_id, limit) async def update_balance(self, telegram_id: int, amount: float): """ Обновляет баланс пользователя и добавляет транзакцию. """ self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}") user = await self.get_user_by_telegram_id(telegram_id) if not user: self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" updated = await self.postgres_repo.update_balance(user, amount) if not updated: self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}") return "ERROR" self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции") await self.add_transaction(user.id, amount) return "OK" async def get_active_subscription(self, telegram_id: int): """ Проверяет наличие активной подписки. """ return await self.postgres_repo.get_active_subscription(telegram_id) async def get_last_subscriptions(self, user_id: UUID, limit: int ): """ Возвращает список последних подписок. """ return await self.postgres_repo.get_last_subscription_by_user_id(user_id, limit) async def buy_sub(self, telegram_id: str, plan_id: str): """ Покупает подписку. """ active_subscription = await self.get_active_subscription(telegram_id) self.logger.info(f"{active_subscription}") if active_subscription: self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.") return "ACTIVE_SUBSCRIPTION_EXISTS" result = await self._initialize_user_and_plan(telegram_id, plan_id) if isinstance(result, str): return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS" user, plan = result await self.postgres_repo.update_balance(user,-plan['price']) new_subscription, server = await self._create_subscription_and_add_client(user, plan) if not new_subscription: return "ERROR" self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.") return "OK" async def _initialize_user_and_plan(self, telegram_id, plan_id): """ Инициализирует пользователя и план подписки. """ user = await self.get_user_by_telegram_id(telegram_id) if not user: self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" plan = await self.mongo_repo.get_subscription_plan(plan_id) if not plan: self.logger.error(f"Тарифный план {plan_id} не найден.") return "TARIFF_NOT_FOUND" cost = int(plan["price"]) if user.balance < cost: self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.") return "INSUFFICIENT_FUNDS" return user, plan async def _create_subscription_and_add_client(self, user, plan): """ Создаёт подписку и добавляет клиента на сервер. """ expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"]) server = await self.mongo_repo.get_server_with_least_clients() if not server: self.logger.error("Нет доступных серверов для подписки.") return None, None new_subscription = Subscription( user_id=user.id, vpn_server_id=str(server["server"]["name"]), plan=plan["name"], expiry_date=expiry_date, ) panel = PanelInteraction( base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}", login_data={"username": server["server"]["login"], "password": server["server"]["password"]}, logger=self.logger, certificate=server["server"]["certificate"]["data"], ) response = await panel.add_client( inbound_id=1, expiry_date=expiry_date.isoformat(), email=user.username, ) if response != "OK": self.logger.error(f"Ошибка при добавлении клиента: {response}") return None, None await self.postgres_repo.add_record(new_subscription) return new_subscription, server async def generate_uri(self, telegram_id: int): """ Генерация URI для пользователя. :param telegram_id: Telegram ID пользователя. :return: Строка URI или None в случае ошибки. """ try: # Извлечение данных subscription = await self.postgres_repo.get_active_subscription(telegram_id) if not subscription: self.logger.error(f"Подписки для пользователя {telegram_id} не найдены.") return "SUB_ERROR" server = await self.mongo_repo.get_server(subscription.vpn_server_id) if not server: self.logger.error(f"Сервер с ID {subscription.vpn_server_id} не найден в MongoDB.") return None user = await self.postgres_repo.get_user_by_telegram_id(telegram_id) if not user: self.logger.error(f"Пользователь с telegram_id {telegram_id} не найден.") return None email = user.username # Используем email из данных пользователя panel = PanelInteraction( base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}", login_data={"username": server["server"]["login"], "password": server["server"]["password"]}, logger=self.logger, certificate=server["server"]["certificate"]["data"], ) inbound_info = await panel.get_inbound_info(inbound_id=1) # Используем фиксированный ID if not inbound_info: self.logger.error(f"Не удалось получить информацию об инбаунде для ID {subscription.vpn_server_id}.") return None # Логируем полученные данные self.logger.info(f"Inbound Info: {inbound_info}") # Разбор JSON-строк try: stream_settings = json.loads(inbound_info["obj"]["streamSettings"]) except KeyError as e: self.logger.error(f"Ключ 'streamSettings' отсутствует: {e}") return None except json.JSONDecodeError as e: self.logger.error(f"Ошибка разбора JSON для 'streamSettings': {e}") return None settings = json.loads(inbound_info["obj"]["settings"]) # Разбираем JSON # Находим клиента по email client = next((c for c in settings["clients"] if c["email"] == email), None) if not client: self.logger.error(f"Клиент с email {email} не найден среди клиентов.") return None server_info = server["server"] # Преобразование данных в формат URI uri = ( f"vless://{client['id']}@{server_info['ip']}:443?" f"type={stream_settings['network']}&security={stream_settings['security']}" f"&pbk={stream_settings['realitySettings']['settings']['publicKey']}" f"&fp={stream_settings['realitySettings']['settings']['fingerprint']}" f"&sni={stream_settings['realitySettings']['serverNames'][0]}" f"&sid={stream_settings['realitySettings']['shortIds'][0]}" f"&spx=%2F&flow={client['flow']}" f"#{inbound_info['obj']['remark']}-{client['email']}" ) self.logger.info(f"Сформирован URI для пользователя {telegram_id}: {uri}") return uri except Exception as e: self.logger.error(f"Ошибка при генерации URI для пользователя {telegram_id}: {e}") return None async def create_ticket(self, user_id: UUID, subject: str, message: str): """ Создаёт тикет """ ticket = SupportTicket(user_id=user_id,subject=subject,message=message) return await self.postgres_repo.add_record(ticket) @staticmethod def generate_string(length): """ Генерирует случайную строку заданной длины. """ return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))