from decimal import Decimal import json from instance.model import User, Subscription, Transaction from app.services.marzban import MarzbanService, MarzbanUser from .postgres_rep import PostgresRepository from instance.model import Transaction,TransactionType, Plan from dateutil.relativedelta import relativedelta from datetime import datetime, timezone import random import string from typing import Optional import logging from uuid import UUID class DatabaseManager: def __init__(self, session_generator,marzban_username,marzban_password,marzban_url): """ Инициализация с асинхронным генератором сессий (например, get_postgres_session). """ self.logger = logging.getLogger(__name__) self.postgres_repo = PostgresRepository(session_generator, self.logger) self.marzban_service = MarzbanService(marzban_url,marzban_username,marzban_password) async def create_user(self, telegram_id: int, invented_by: Optional[int]= None): """ Создаёт пользователя. """ try: username = self.generate_string(6) return await self.postgres_repo.create_user(telegram_id, username, invented_by) except Exception as e: self.logger.error(f"Ошибка при создании пользователя:{e}") async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ return await self.postgres_repo.get_user_by_telegram_id(telegram_id) async def add_transaction(self, telegram_id: int, amount: float): """ Добавляет транзакцию. """ tran = Transaction( user_id=telegram_id, amount=Decimal(amount), type=TransactionType.DEPOSIT ) return await self.postgres_repo.add_record(tran) async def add_referal(self,referrer_id: int, new_user_telegram_id: int): """ Добавление рефералу пользователей """ return await self.postgres_repo.add_referral(referrer_id,new_user_telegram_id) async def get_transaction(self, telegram_id: int, limit: int = 10): """ Возвращает транзакции. """ return await self.postgres_repo.get_last_transactions(telegram_id, limit) async def update_balance(self, telegram_id: int, amount: float): """ Обновляет баланс пользователя и добавляет транзакцию. """ self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}") user = await self.get_user_by_telegram_id(telegram_id) if not user: self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" updated = await self.postgres_repo.update_balance(user, amount) if not updated: self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}") return "ERROR" self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции") await self.add_transaction(user.telegram_id, amount) return "OK" async def get_active_subscription(self, telegram_id: int): """ Проверяет наличие активной подписки. """ try: return await self.postgres_repo.get_active_subscription(telegram_id) except Exception as e: self.logger.error(f"Неожиданная ошибка в get_active_subscription: {str(e)}") return "ERROR" async def get_plan_by_id(self, plan_id): """ Ищет по названию плана. """ try: return await self.postgres_repo.get_plan_by_id(plan_id) except Exception as e: self.logger.error(f"Неожиданная ошибка в get_plan_by_name: {str(e)}") return None async def get_last_subscriptions(self, telegram_id: int, limit: int = 1): """ Возвращает список последних подписок. """ return await self.postgres_repo.get_last_subscription_by_user_id(telegram_id) async def buy_sub(self, telegram_id: int, plan_name: str): """ Покупает подписку. """ try: self.logger.info(f"Начало покупки подписки для пользователя {telegram_id}, план: {plan_name}") active_subscription = await self.get_active_subscription(telegram_id) self.logger.info(f"Активная подписка: {active_subscription}") if active_subscription: self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.") return "ACTIVE_SUBSCRIPTION_EXISTS" result = await self._initialize_user_and_plan(telegram_id, plan_name) if isinstance(result, str): return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS" user, plan = result self.logger.info(f"Пользователь и план найдены: user_id={user.telegram_id}, plan_price={plan.price}") new_subscription = await self._create_subscription_and_add_client(user, plan) if not new_subscription: self.logger.error(f"Не удалось создать подписку для пользователя {telegram_id}") return "ERROR" updated = await self.postgres_repo.update_balance(user,-plan.price) if updated == False: self.logger.error(f"Не удалось обновить баланс для пользователя {telegram_id}") return "ERROR" self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.") return {"status": "OK", "subscription_id": str(new_subscription.id)} except Exception as e: self.logger.error(f"Неожиданная ошибка в buy_sub: {str(e)}") return "ERROR" async def _initialize_user_and_plan(self, telegram_id, plan_name): """ Инициализирует пользователя и план подписки. """ try: user = await self.get_user_by_telegram_id(telegram_id) if not user: self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.") return "ERROR" plan = await self.postgres_repo.get_subscription_plan(plan_name) if not plan: self.logger.error(f"Тарифный план {plan_name} не найден.") return "TARIFF_NOT_FOUND" cost = plan.price if user.balance < cost: self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_name}.") return "INSUFFICIENT_FUNDS" return user, plan except Exception as e: self.logger.error(f"Неожиданная ошибка в _initialize_user_and_plan: {str(e)}") return "ERROR" async def _create_subscription_and_add_client(self, user: User, plan: Plan): """Создаёт подписку и добавляет клиента на сервер.""" try: self.logger.info(f"Создание подписки для user_id={user.telegram_id}, plan={plan.name}") # Проверяем типы объектов self.logger.info(f"Тип user: {type(user)}, тип plan: {type(plan)}") expiry_date = datetime.utcnow() + relativedelta(days=plan.duration_days) new_subscription = Subscription( user_id=user.telegram_id, vpn_server_id="BASE SERVER NEED TO UPDATE", plan_id=plan.id, end_date=expiry_date, start_date=datetime.utcnow() ) self.logger.info(f"Создан объект подписки: {new_subscription}") response = await self.marzban_service.create_user(user, new_subscription) self.logger.info(f"Ответ от Marzban: {response}") if response == "USER_ALREADY_EXISTS": response = await self.marzban_service.get_user_status(user) result = await self.marzban_service.update_user(user, new_subscription) # if not isinstance(response,MarzbanUser) or not isinstance(result,MarzbanUser): # self.logger.error(f"Ошибка при добавлении клиента: {response}, {result}") # return None await self.postgres_repo.add_record(new_subscription) self.logger.info(f"Подписка сохранена в БД с ID: {new_subscription.id}") return new_subscription except Exception as e: self.logger.error(f"Неожиданная ошибка в _create_subscription_and_add_client: {str(e)}") import traceback self.logger.error(f"Трассировка: {traceback.format_exc()}") return None async def generate_uri(self, telegram_id: int): """ Генерация URI для пользователя. :param telegram_id: Telegram ID пользователя. :return: Строка URI или None в случае ошибки. """ try: user = await self.get_user_by_telegram_id(telegram_id) if user == False or user == None: self.logger.error(f"Ошибка при получении клиента: user = {user}") return "ERROR" result = await self.marzban_service.get_config_links(user) if result == None: self.logger.error(f"Ошибка при получении ссылки клиента: result = {user}") return "ERROR" self.logger.info(f"Итог generate_uri: result = {result}") return result except Exception as e: self.logger.error(f"Неожиданная ошибка в generate_uri: {str(e)}") return "ERROR" @staticmethod def generate_string(length): """ Генерирует случайную строку заданной длины. """ return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) @staticmethod def _is_subscription_expired(expire_timestamp: int) -> bool: """Проверяет, истекла ли подписка""" current_time = datetime.now(timezone.utc) expire_time = datetime.fromtimestamp(expire_timestamp, tz=timezone.utc) return expire_time < current_time