Files
backend/app/services/db_manager.py

264 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from decimal import Decimal
import json
from instance.model import User, Subscription, Transaction
from app.services.marzban import MarzbanService
from .postgres_rep import PostgresRepository
from instance.model import Transaction,TransactionType
from dateutil.relativedelta import relativedelta
from datetime import datetime
import random
import string
from typing import Optional
import logging
from uuid import UUID
class DatabaseManager:
def __init__(self, session_generator):
"""
Инициализация с асинхронным генератором сессий (например, get_postgres_session).
"""
self.logger = logging.getLogger(__name__)
self.postgres_repo = PostgresRepository(session_generator, self.logger)
async def create_user(self, telegram_id: int, invented_by: Optional[int]= None):
"""
Создаёт пользователя.
"""
try:
username = self.generate_string(6)
return await self.postgres_repo.create_user(telegram_id, username, invented_by)
except Exception as e:
self.logger.error(f"Ошибка при создании пользователя:{e}")
async def get_user_by_telegram_id(self, telegram_id: int):
"""
Возвращает пользователя по Telegram ID.
"""
return await self.postgres_repo.get_user_by_telegram_id(telegram_id)
async def add_transaction(self, telegram_id: int, amount: float):
"""
Добавляет транзакцию.
"""
tran = Transaction(
user_id=telegram_id,
amount=Decimal(amount),
type=TransactionType.DEPOSIT
)
return await self.postgres_repo.add_record(tran)
async def add_referal(self,referrer_id: int, new_user_telegram_id: int):
"""
Добавление рефералу пользователей
"""
return await self.postgres_repo.add_referral(referrer_id,new_user_telegram_id)
async def get_transaction(self, telegram_id: int, limit: int = 10):
"""
Возвращает транзакции.
"""
return await self.postgres_repo.get_last_transactions(telegram_id, limit)
async def update_balance(self, telegram_id: int, amount: float):
"""
Обновляет баланс пользователя и добавляет транзакцию.
"""
self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
updated = await self.postgres_repo.update_balance(user, amount)
if not updated:
self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
return "ERROR"
self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
await self.add_transaction(user.telegram_id, amount)
return "OK"
async def get_active_subscription(self, telegram_id: int):
"""
Проверяет наличие активной подписки.
"""
return await self.postgres_repo.get_active_subscription(telegram_id)
async def get_last_subscriptions(self, telegram_id: int, limit: int = 1):
"""
Возвращает список последних подписок.
"""
return await self.postgres_repo.get_last_subscription_by_user_id(telegram_id, limit)
async def buy_sub(self, telegram_id: int, plan_id: str):
"""
Покупает подписку.
"""
# active_subscription = await self.get_active_subscription(telegram_id)
# self.logger.info(f"{active_subscription}")
# if active_subscription:
# self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.")
# return "ACTIVE_SUBSCRIPTION_EXISTS"
# result = await self._initialize_user_and_plan(telegram_id, plan_id)
# if isinstance(result, str):
# return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS"
# user, plan = result
# await self.postgres_repo.update_balance(user,-plan['price'])
# new_subscription, server = await self._create_subscription_and_add_client(user, plan)
# if not new_subscription:
# return "ERROR"
# self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.")
# return "OK"
pass
async def _initialize_user_and_plan(self, telegram_id, plan_id):
"""
Инициализирует пользователя и план подписки.
"""
# user = await self.get_user_by_telegram_id(telegram_id)
# if not user:
# self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.")
# return "ERROR"
# plan = await self.mongo_repo.get_subscription_plan(plan_id)
# if not plan:
# self.logger.error(f"Тарифный план {plan_id} не найден.")
# return "TARIFF_NOT_FOUND"
# cost = int(plan["price"])
# if user.balance < cost:
# self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.")
# return "INSUFFICIENT_FUNDS"
# return user, plan
pass
async def _create_subscription_and_add_client(self, user, plan):
"""
Создаёт подписку и добавляет клиента на сервер.
"""
# expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"])
# server = await self.mongo_repo.get_server_with_least_clients()
# if not server:
# self.logger.error("Нет доступных серверов для подписки.")
# return None, None
# new_subscription = Subscription(
# user_id=user.id,
# vpn_server_id=str(server["server"]["name"]),
# plan=plan["name"],
# expiry_date=expiry_date,
# )
# panel = PanelInteraction(
# base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}",
# login_data={"username": server["server"]["login"], "password": server["server"]["password"]},
# logger=self.logger,
# certificate=server["server"]["certificate"]["data"],
# )
# response = await panel.add_client(
# inbound_id=1,
# expiry_date=expiry_date.isoformat(),
# email=user.username,
# )
# if response != "OK":
# self.logger.error(f"Ошибка при добавлении клиента: {response}")
# return None, None
# await self.postgres_repo.add_record(new_subscription)
# return new_subscription, server
pass
async def generate_uri(self, telegram_id: int):
"""
Генерация URI для пользователя.
:param telegram_id: Telegram ID пользователя.
:return: Строка URI или None в случае ошибки.
"""
# try:
# # Извлечение данных
# subscription = await self.postgres_repo.get_active_subscription(telegram_id)
# if not subscription:
# self.logger.error(f"Подписки для пользователя {telegram_id} не найдены.")
# return "SUB_ERROR"
# server = await self.mongo_repo.get_server(subscription.vpn_server_id)
# if not server:
# self.logger.error(f"Сервер с ID {subscription.vpn_server_id} не найден в MongoDB.")
# return None
# user = await self.postgres_repo.get_user_by_telegram_id(telegram_id)
# if not user:
# self.logger.error(f"Пользователь с telegram_id {telegram_id} не найден.")
# return None
# email = user.username # Используем email из данных пользователя
# panel = PanelInteraction(
# base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}",
# login_data={"username": server["server"]["login"], "password": server["server"]["password"]},
# logger=self.logger,
# certificate=server["server"]["certificate"]["data"],
# )
# inbound_info = await panel.get_inbound_info(inbound_id=1) # Используем фиксированный ID
# if not inbound_info:
# self.logger.error(f"Не удалось получить информацию об инбаунде для ID {subscription.vpn_server_id}.")
# return None
# # Логируем полученные данные
# self.logger.info(f"Inbound Info: {inbound_info}")
# # Разбор JSON-строк
# try:
# stream_settings = json.loads(inbound_info["obj"]["streamSettings"])
# except KeyError as e:
# self.logger.error(f"Ключ 'streamSettings' отсутствует: {e}")
# return None
# except json.JSONDecodeError as e:
# self.logger.error(f"Ошибка разбора JSON для 'streamSettings': {e}")
# return None
# settings = json.loads(inbound_info["obj"]["settings"]) # Разбираем JSON
# # Находим клиента по email
# client = next((c for c in settings["clients"] if c["email"] == email), None)
# if not client:
# self.logger.error(f"Клиент с email {email} не найден среди клиентов.")
# return None
# server_info = server["server"]
# # Преобразование данных в формат URI
# uri = (
# f"vless://{client['id']}@{server_info['ip']}:443?"
# f"type={stream_settings['network']}&security={stream_settings['security']}"
# f"&pbk={stream_settings['realitySettings']['settings']['publicKey']}"
# f"&fp={stream_settings['realitySettings']['settings']['fingerprint']}"
# f"&sni={stream_settings['realitySettings']['serverNames'][0]}"
# f"&sid={stream_settings['realitySettings']['shortIds'][0]}"
# f"&spx=%2F&flow={client['flow']}"
# f"#{inbound_info['obj']['remark']}-{client['email']}"
# )
# self.logger.info(f"Сформирован URI для пользователя {telegram_id}: {uri}")
# return uri
# except Exception as e:
# self.logger.error(f"Ошибка при генерации URI для пользователя {telegram_id}: {e}")
# return None
pass
@staticmethod
def generate_string(length):
"""
Генерирует случайную строку заданной длины.
"""
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))