from instance.model import User, Subscription, Transaction, Administrators, SupportTicket,TicketMessage,TicketStatus from sqlalchemy.future import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import desc from dateutil.relativedelta import relativedelta from datetime import datetime from .xui_rep import PanelInteraction from .mongo_rep import MongoDBRepository import random import string import logging from uuid import UUID class DatabaseManager: def __init__(self, session_generator): """ Инициализация с асинхронным генератором сессий (например, get_postgres_session). """ self.session_generator = session_generator self.logger = logging.getLogger(__name__) self.mongo_repo = MongoDBRepository() async def create_user(self, telegram_id: int): """ Создаёт нового пользователя, если его нет. """ async for session in self.session_generator(): try: username = self.generate_string(6) result = await session.execute(select(User).where(User.telegram_id == int(telegram_id))) user = result.scalars().first() if not user: new_user = User(telegram_id=int(telegram_id), username=username) session.add(new_user) await session.commit() return new_user return user except SQLAlchemyError as e: self.logger.error(f"Ошибка при создании пользователя {telegram_id}: {e}") await session.rollback() return "ERROR" async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) return result.scalars().first() except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}") return None async def add_transaction(self, user_id: int, amount: float): """ Добавляет транзакцию для пользователя. """ async for session in self.session_generator(): try: transaction = Transaction(user_id=user_id, amount=amount) session.add(transaction) await session.commit() except SQLAlchemyError as e: self.logger.error(f"Ошибка добавления транзакции для пользователя {user_id}: {e}") await session.rollback() async def update_balance(self, telegram_id: int, amount: float): """ Обновляет баланс пользователя и добавляет транзакцию. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) user = result.scalars().first() if user: user.balance += int(amount) await self.add_transaction(user.id, amount) await session.commit() else: self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" except SQLAlchemyError as e: self.logger.error(f"Ошибка при обновлении баланса: {e}") await session.rollback() return "ERROR" async def last_subscriptions(self, user_id: str, limit: int = 10): """ Возвращает список последних подписок пользователя, ограниченный заданным количеством. :param user_id: ID пользователя :param limit: Максимальное количество подписок для возврата """ async for session in self.session_generator(): try: result = await session.execute( select(Subscription) .where(Subscription.user_id == str(user_id)) .order_by(desc(Subscription.created_at)) .limit(limit) # Ограничиваем количество результатов ) subscriptions = result.scalars().all() # Получаем все результаты до лимита if subscriptions: return subscriptions else: self.logger.info(f"Для пользователя {user_id} подписки не найдены.") return [] # Возвращаем пустой список, если подписок нет except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении подписок для пользователя {user_id}: {e}") return "ERROR" async def last_transaction(self, user_id: UUID): """ Возвращает список транзакций пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Transaction) .where(Transaction.user_id == str(user_id)) .order_by(desc(Transaction.created_at)) ) transactions = result.scalars().all() return transactions except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении транзакций пользователя {user_id}: {e}") return "ERROR" async def buy_sub(self, telegram_id: str, plan_id: str): async for session in self.session_generator(): try: active_subscription = await self._check_active_subscription(telegram_id, session) if active_subscription: self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.") return "ACTIVE_SUBSCRIPTION_EXISTS" result = await self._initialize_user_and_plan(telegram_id, plan_id) if isinstance(result, str): return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS" user, plan = result user.balance -= int(plan["price"]) session.add(user) new_subscription, server = await self._create_subscription_and_add_client(user, plan, session) if not new_subscription: await session.rollback() return "ERROR" await session.commit() self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id} на план {plan_id} и клиент добавлен на сервер.") return "OK" except SQLAlchemyError as e: self.logger.error(f"Ошибка при покупке подписки {plan_id} для пользователя {telegram_id}: {e}") await session.rollback() return "ERROR" except Exception as e: self.logger.error(f"Непредвиденная ошибка: {e}") await session.rollback() return "ERROR" async def _initialize_user_and_plan(self, telegram_id, plan_id): user = await self.create_user(telegram_id) if not user: self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" plan = await self.mongo_repo.get_subscription_plan(plan_id) if not plan: self.logger.error(f"Тарифный план {plan_id} не найден.") return "TARIFF_NOT_FOUND" cost = int(plan["price"]) if user.balance < cost: self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.") return "INSUFFICIENT_FUNDS" return user, plan async def _create_subscription_and_add_client(self, user, plan, session): expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"]) server = await self.mongo_repo.get_server_with_least_clients() self.logger.info(f"Выбран сервер для подписки: {server}") new_subscription = Subscription( user_id=user.id, vpn_server_id=str(server['server']["name"]), plan=plan["name"], expiry_date=expiry_date ) session.add(new_subscription) server_data = await self.mongo_repo.get_server(new_subscription.vpn_server_id) if not server_data: self.logger.error(f"Не удалось найти сервер с ID {new_subscription.vpn_server_id}.") return None, None server_info = server_data['server'] url_base = f"https://{server_info['ip']}:{server_info['port']}/{server_info['secretKey']}" login_data = { 'username': server_info['login'], 'password': server_info['password'], } panel = PanelInteraction(url_base, login_data, self.logger, server_info['certificate']['data']) expiry_date_iso = new_subscription.expiry_date.isoformat() response = await panel.add_client(1, expiry_date_iso, user.username) if response != "OK": self.logger.error(f"Ошибка при добавлении клиента {user.telegram_id} на сервер: {response}") return None, None return new_subscription, server async def _check_active_subscription(self, telegram_id, session): """ Проверяет наличие активной подписки у пользователя. :param telegram_id: Telegram ID пользователя. :param session: Текущая сессия базы данных. :return: Объект подписки или None. """ try: result = await session.execute( select(Subscription) .join(User, Subscription.user_id == User.id) .where(User.telegram_id == telegram_id, Subscription.expiry_date > datetime.utcnow()) ) return result.scalars().first() except Exception as e: self.logger.error(f"Ошибка проверки активной подписки для пользователя {telegram_id}: {e}") return None async def add_ticket_message(self, ticket_id: int, sender: str, message: str): """ Добавляет сообщение к тикету. """ async for session in self.session_generator(): try: self.logger.info(f"Попытка добавления сообщения в тикет {ticket_id} от {sender}") ticket_message = TicketMessage(ticket_id=ticket_id, sender=sender, message=message) session.add(ticket_message) await session.commit() self.logger.info(f"Сообщение добавлено к тикету {ticket_id}: {message}") return ticket_message except SQLAlchemyError as e: self.logger.error(f"Ошибка при добавлении сообщения в тикет {ticket_id}: {e}") await session.rollback() return None async def get_ticket_messages(self, ticket_id: int): """ Возвращает список сообщений для указанного тикета. """ async for session in self.session_generator(): try: self.logger.info(f"Получение сообщений для тикета {ticket_id}") result = await session.execute( select(TicketMessage).where(TicketMessage.ticket_id == ticket_id).order_by(TicketMessage.created_at) ) messages = result.scalars().all() self.logger.info(f"Найдено {len(messages)} сообщений для тикета {ticket_id}") return messages except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении сообщений для тикета {ticket_id}: {e}") return None async def create_ticket(self, user_id: int, subject: str, message: str): """ Создаёт новый тикет. """ async for session in self.session_generator(): try: self.logger.info(f"Создание тикета для пользователя {user_id}: {subject}") ticket = SupportTicket(user_id=user_id, subject=subject, message=message) session.add(ticket) await session.commit() self.logger.info(f"Тикет создан с ID {ticket.id} для пользователя {user_id}") return ticket except SQLAlchemyError as e: self.logger.error(f"Ошибка при создании тикета: {e}") await session.rollback() return None async def list_tickets(self, user_id: int): """ Возвращает список тикетов пользователя. """ async for session in self.session_generator(): try: self.logger.info(f"Получение списка тикетов для пользователя {user_id}") result = await session.execute( select(SupportTicket).where(SupportTicket.user_id == user_id) ) tickets = result.scalars().all() self.logger.info(f"Найдено {len(tickets)} тикетов для пользователя {user_id}") return tickets except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении тикетов для пользователя {user_id}: {e}") return None async def update_ticket_status(self, ticket_id: int, status: TicketStatus): """ Обновляет статус тикета. """ async for session in self.session_generator(): try: self.logger.info(f"Попытка обновления статуса тикета {ticket_id} на {status}") result = await session.execute( select(SupportTicket).where(SupportTicket.id == ticket_id) ) ticket = result.scalars().first() if ticket: ticket.status = status await session.commit() self.logger.info(f"Статус тикета {ticket_id} обновлён на {status}") return ticket self.logger.warning(f"Тикет с ID {ticket_id} не найден для обновления статуса") return None except SQLAlchemyError as e: self.logger.error(f"Ошибка при обновлении статуса тикета {ticket_id}: {e}") await session.rollback() return None @staticmethod def generate_string(length): """ Генерирует случайную строку заданной длины. """ characters = string.ascii_lowercase + string.digits return ''.join(random.choices(characters, k=length))