from instance.model import User, Subscription, Transaction, Administrators from sqlalchemy.future import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import desc from dateutil.relativedelta import relativedelta from datetime import datetime from .xui_rep import PanelInteraction from .mongo_rep import MongoDBRepository import random import string import logging from uuid import UUID class DatabaseManager: def __init__(self, session_generator): """ Инициализация с асинхронным генератором сессий (например, get_postgres_session). """ self.session_generator = session_generator self.logger = logging.getLogger(__name__) self.mongo_repo = MongoDBRepository() async def create_user(self, telegram_id: int): """ Создаёт нового пользователя, если его нет. """ async for session in self.session_generator(): try: username = self.generate_string(6) result = await session.execute(select(User).where(User.telegram_id == int(telegram_id))) user = result.scalars().first() if not user: new_user = User(telegram_id=int(telegram_id), username=username) session.add(new_user) await session.commit() return new_user return user except SQLAlchemyError as e: self.logger.error(f"Ошибка при создании пользователя {telegram_id}: {e}") await session.rollback() return "ERROR" async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) return result.scalars().first() except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}") return None async def add_transaction(self, user_id: int, amount: float): """ Добавляет транзакцию для пользователя. """ async for session in self.session_generator(): try: transaction = Transaction(user_id=user_id, amount=amount) session.add(transaction) await session.commit() except SQLAlchemyError as e: self.logger.error(f"Ошибка добавления транзакции для пользователя {user_id}: {e}") await session.rollback() async def update_balance(self, telegram_id: int, amount: float): """ Обновляет баланс пользователя и добавляет транзакцию. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) user = result.scalars().first() if user: user.balance += int(amount) await self.add_transaction(user.id, amount) await session.commit() else: self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" except SQLAlchemyError as e: self.logger.error(f"Ошибка при обновлении баланса: {e}") await session.rollback() return "ERROR" async def last_subscription(self, user_id: str): """ Возвращает последнюю подписку пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Subscription) .where(Subscription.user_id == str(user_id)) .order_by(desc(Subscription.created_at)) .limit(1) # Применяем limit правильно ) subscription = result.scalar_one_or_none() if subscription: return subscription else: return None except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении подписки для пользователя {user_id}: {e}") return "ERROR" async def last_transaction(self, user_id: UUID): """ Возвращает список транзакций пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Transaction) .where(Transaction.user_id == str(user_id)) .order_by(desc(Transaction.created_at)) ) transactions = result.scalars().all() return transactions except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении транзакций пользователя {user_id}: {e}") return "ERROR" async def buy_sub(self, telegram_id: str, plan_id: str): async for session in self.session_generator(): try: active_subscription = await self._check_active_subscription(telegram_id, session) if active_subscription: self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.") return "ACTIVE_SUBSCRIPTION_EXISTS" result = await self._initialize_user_and_plan(telegram_id, plan_id) if isinstance(result, str): return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS" user, plan = result user.balance -= int(plan["price"]) session.add(user) new_subscription, server = await self._create_subscription_and_add_client(user, plan, session) if not new_subscription: await session.rollback() return "ERROR" await session.commit() self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id} на план {plan_id} и клиент добавлен на сервер.") return "OK" except SQLAlchemyError as e: self.logger.error(f"Ошибка при покупке подписки {plan_id} для пользователя {telegram_id}: {e}") await session.rollback() return "ERROR" except Exception as e: self.logger.error(f"Непредвиденная ошибка: {e}") await session.rollback() return "ERROR" async def _initialize_user_and_plan(self, telegram_id, plan_id): user = await self.create_user(telegram_id) if not user: self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" plan = await self.mongo_repo.get_subscription_plan(plan_id) if not plan: self.logger.error(f"Тарифный план {plan_id} не найден.") return "TARIFF_NOT_FOUND" cost = int(plan["price"]) if user.balance < cost: self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.") return "INSUFFICIENT_FUNDS" return user, plan async def _create_subscription_and_add_client(self, user, plan, session): expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"]) server = await self.mongo_repo.get_server_with_least_clients() self.logger.info(f"Выбран сервер для подписки: {server}") new_subscription = Subscription( user_id=user.id, vpn_server_id=str(server['server']["name"]), plan=plan["name"], expiry_date=expiry_date ) session.add(new_subscription) server_data = await self.mongo_repo.get_server(new_subscription.vpn_server_id) if not server_data: self.logger.error(f"Не удалось найти сервер с ID {new_subscription.vpn_server_id}.") return None, None server_info = server_data['server'] url_base = f"https://{server_info['ip']}:{server_info['port']}/{server_info['secretKey']}" login_data = { 'username': server_info['login'], 'password': server_info['password'], } panel = PanelInteraction(url_base, login_data, self.logger, server_info['certificate']['data']) expiry_date_iso = new_subscription.expiry_date.isoformat() response = await panel.add_client(1, expiry_date_iso, user.username) if response != "OK": self.logger.error(f"Ошибка при добавлении клиента {user.telegram_id} на сервер: {response}") return None, None return new_subscription, server async def _check_active_subscription(self, telegram_id, session): """ Проверяет наличие активной подписки у пользователя. :param telegram_id: Telegram ID пользователя. :param session: Текущая сессия базы данных. :return: Объект подписки или None. """ try: result = await session.execute( select(Subscription) .join(User, Subscription.user_id == User.id) .where(User.telegram_id == telegram_id, Subscription.expiry_date > datetime.utcnow()) ) return result.scalars().first() except Exception as e: self.logger.error(f"Ошибка проверки активной подписки для пользователя {telegram_id}: {e}") return None @staticmethod def generate_string(length): """ Генерирует случайную строку заданной длины. """ characters = string.ascii_lowercase + string.digits return ''.join(random.choices(characters, k=length))