from instance.model import User, Subscription, Transaction, Administrators from sqlalchemy.future import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import desc from dateutil.relativedelta import relativedelta from datetime import datetime from .xui_rep import PanelInteraction from .mongo_rep import MongoDBRepository import random import string import logging import asyncio class DatabaseManager: def __init__(self, session_generator): """ Инициализация с асинхронным генератором сессий (например, get_postgres_session). """ self.session_generator = session_generator self.logger = logging.getLogger(__name__) self.mongo_repo = MongoDBRepository() async def create_user(self, telegram_id: int): """ Создаёт нового пользователя, если его нет. """ async for session in self.session_generator(): try: username = self.generate_string(6) result = await session.execute(select(User).where(User.telegram_id == int(telegram_id))) user = result.scalars().first() if not user: new_user = User(telegram_id=int(telegram_id), username=username) session.add(new_user) await session.commit() return new_user return user except SQLAlchemyError as e: self.logger.error(f"Ошибка при создании пользователя {telegram_id}: {e}") await session.rollback() return "ERROR" async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) return result.scalars().first() except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}") return None async def add_transaction(self, user_id: int, amount: float): """ Добавляет транзакцию для пользователя. """ async for session in self.session_generator(): try: transaction = Transaction(user_id=user_id, amount=amount) session.add(transaction) await session.commit() except SQLAlchemyError as e: self.logger.error(f"Ошибка добавления транзакции для пользователя {user_id}: {e}") await session.rollback() async def update_balance(self, telegram_id: int, amount: float): """ Обновляет баланс пользователя и добавляет транзакцию. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) user = result.scalars().first() if user: user.balance += int(amount) await self.add_transaction(user.id, amount) await session.commit() else: self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" except SQLAlchemyError as e: self.logger.error(f"Ошибка при обновлении баланса: {e}") await session.rollback() return "ERROR" async def last_subscription(self, user_id: int): """ Возвращает список подписок пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Subscription) .where(Subscription.user_id == user_id) .order_by(desc(Subscription.created_at)) ) return result.scalars().all() except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении последней подписки пользователя {user_id}: {e}") return "ERROR" async def last_transaction(self, user_id: int): """ Возвращает список транзакций пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Transaction) .where(Transaction.user_id == user_id) .order_by(desc(Transaction.created_at)) ) transactions = result.scalars().all() return transactions except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении транзакций пользователя {user_id}: {e}") return "ERROR" async def buy_sub(self, telegram_id: str, plan_id: str): async for session in self.session_generator(): try: result = await self.create_user(telegram_id) if not result: self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" # Получение тарифного плана из MongoDB plan = await self.mongo_repo.get_subscription_plan(plan_id) if not plan: self.logger.error(f"Тарифный план {plan_id} не найден.") return "ERROR" # Проверка достаточности средств cost = int(plan["price"]) if result.balance < cost: self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.") return "INSUFFICIENT_FUNDS" # Списываем средства result.balance -= cost # Создаем подписку expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"]) server = await self.mongo_repo.get_server_with_least_clients() self.logger.info(f"Выбран сервер для подписки: {server}") new_subscription = Subscription( user_id=result.id, vpn_server_id=str(server['server']["name"]), plan=plan_id, expiry_date=expiry_date ) session.add(new_subscription) # Попытка добавить пользователя на сервер # Получаем информацию о пользователе user = result # так как result уже содержит пользователя if not user: self.logger.error(f"Не удалось найти пользователя для добавления на сервер.") await session.rollback() return "ERROR" # Получаем сервер из MongoDB server_data = await self.mongo_repo.get_server(new_subscription.vpn_server_id) if not server_data: self.logger.error(f"Не удалось найти сервер с ID {new_subscription.vpn_server_id}.") await session.rollback() return "ERROR" server_info = server_data['server'] url_base = f"https://{server_info['ip']}:{server_info['port']}/{server_info['secretKey']}" login_data = { 'username': server_info['login'], 'password': server_info['password'], } panel = PanelInteraction(url_base, login_data, self.logger,server_info['certificate']['data']) expiry_date_iso = new_subscription.expiry_date.isoformat() # Добавляем на сервер response = await panel.add_client(user.id, expiry_date_iso, user.username) if response != "OK": self.logger.error(f"Ошибка при добавлении клиента {telegram_id} на сервер: {response}") # Если не получилось добавить на сервер, откатываем транзакцию await session.rollback() return "ERROR" # Если мы здесь - значит и подписка, и добавление на сервер успешны await session.commit() self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id} на план {plan_id} и клиент добавлен на сервер.") return "OK" except SQLAlchemyError as e: self.logger.error(f"Ошибка при покупке подписки {plan_id} для пользователя {telegram_id}: {e}") await session.rollback() return "ERROR" except Exception as e: self.logger.error(f"Непредвиденная ошибка: {e}") await session.rollback() return "ERROR" @staticmethod def generate_string(length): """ Генерирует случайную строку заданной длины. """ characters = string.ascii_lowercase + string.digits return ''.join(random.choices(characters, k=length))