from databases.model import User, Subscription, Transaction, Administrators from sqlalchemy.future import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import desc from dateutil.relativedelta import relativedelta from datetime import datetime from utils.panel import PanelInteraction from databases.mongodb import MongoDBRepository import random import string import logging import asyncio class DatabaseManager: def __init__(self, session_generator): """ Инициализация с асинхронным генератором сессий (например, get_postgres_session). """ self.session_generator = session_generator self.logger = logging.getLogger(__name__) self.mongo_repo = MongoDBRepository() async def create_user(self, telegram_id: int): """ Создаёт нового пользователя, если его нет. """ async for session in self.session_generator(): try: username = self.generate_string(6) result = await session.execute(select(User).where(User.telegram_id == int(telegram_id))) user = result.scalars().first() if not user: new_user = User(telegram_id=int(telegram_id), username=username) session.add(new_user) await session.commit() return new_user return user except SQLAlchemyError as e: self.logger.error(f"Ошибка при создании пользователя {telegram_id}: {e}") await session.rollback() return "ERROR" async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) return result.scalars().first() except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}") return None async def add_transaction(self, user_id: int, amount: float): """ Добавляет транзакцию для пользователя. """ async for session in self.session_generator(): try: transaction = Transaction(user_id=user_id, amount=amount) session.add(transaction) await session.commit() except SQLAlchemyError as e: self.logger.error(f"Ошибка добавления транзакции для пользователя {user_id}: {e}") await session.rollback() async def update_balance(self, telegram_id: int, amount: float): """ Обновляет баланс пользователя и добавляет транзакцию. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) user = result.scalars().first() if user: user.balance += int(amount) await self.add_transaction(user.id, amount) await session.commit() else: self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" except SQLAlchemyError as e: self.logger.error(f"Ошибка при обновлении баланса: {e}") await session.rollback() return "ERROR" async def last_subscription(self, user_id: int): """ Возвращает список подписок пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Subscription) .where(Subscription.user_id == user_id) .order_by(desc(Subscription.created_at)) ) return result.scalars().all() except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении последней подписки пользователя {user_id}: {e}") return "ERROR" async def last_transaction(self, user_id: int): """ Возвращает список транзакций пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Transaction) .where(Transaction.user_id == user_id) .order_by(desc(Transaction.created_at)) ) transactions = result.scalars().all() return transactions except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении транзакций пользователя {user_id}: {e}") return "ERROR" async def buy_sub(self, telegram_id: str, plan_id: str): async for session in self.session_generator(): try: result = await self.create_user(telegram_id) if not result: self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" # Получение тарифного плана из MongoDB plan = await self.mongo_repo.get_subscription_plan(plan_id) if not plan: self.logger.error(f"Тарифный план {plan_id} не найден.") return "ERROR" # Проверка достаточности средств cost = int(plan["price"]) if result.balance < cost: self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.") return "INSUFFICIENT_FUNDS" # Списываем средства result.balance -= cost # Создаем подписку expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"]) server = await self.mongo_repo.get_server_with_least_clients() self.logger.info(f"Выбран сервер для подписки: {server}") new_subscription = Subscription( user_id=result.id, vpn_server_id=str(server['server']["name"]), plan=plan_id, expiry_date=expiry_date ) session.add(new_subscription) # Попытка добавить пользователя на сервер # Получаем информацию о пользователе user = result # так как result уже содержит пользователя if not user: self.logger.error(f"Не удалось найти пользователя для добавления на сервер.") await session.rollback() return "ERROR" # Получаем сервер из MongoDB server_data = await self.mongo_repo.get_server(new_subscription.vpn_server_id) if not server_data: self.logger.error(f"Не удалось найти сервер с ID {new_subscription.vpn_server_id}.") await session.rollback() return "ERROR" server_info = server_data['server'] url_base = f"https://{server_info['ip']}:{server_info['port']}/{server_info['secretKey']}" login_data = { 'username': server_info['login'], 'password': server_info['password'], } panel = PanelInteraction(url_base, login_data, self.logger,server_info['certificate']['data']) expiry_date_iso = new_subscription.expiry_date.isoformat() # Добавляем на сервер response = await panel.add_client(user.id, expiry_date_iso, user.username) if response != "OK": self.logger.error(f"Ошибка при добавлении клиента {telegram_id} на сервер: {response}") # Если не получилось добавить на сервер, откатываем транзакцию await session.rollback() return "ERROR" # Если мы здесь - значит и подписка, и добавление на сервер успешны await session.commit() self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id} на план {plan_id} и клиент добавлен на сервер.") return "OK" except SQLAlchemyError as e: self.logger.error(f"Ошибка при покупке подписки {plan_id} для пользователя {telegram_id}: {e}") await session.rollback() return "ERROR" except Exception as e: self.logger.error(f"Непредвиденная ошибка: {e}") await session.rollback() return "ERROR" @staticmethod def generate_string(length): """ Генерирует случайную строку заданной длины. """ characters = string.ascii_lowercase + string.digits return ''.join(random.choices(characters, k=length))