Files
backend/app/services/db_manager.py
2025-01-18 17:33:44 +03:00

308 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from decimal import Decimal
import json
from instance.model import User, Subscription, Transaction, SupportTicket, TicketMessage, TicketStatus
from .xui_rep import PanelInteraction
from .postgres_rep import PostgresRepository
from .mongo_rep import MongoDBRepository
from instance.model import Transaction
from dateutil.relativedelta import relativedelta
from datetime import datetime
import random
import string
from typing import Optional
import logging
from uuid import UUID
class DatabaseManager:
def __init__(self, session_generator):
"""
Инициализация с асинхронным генератором сессий (например, get_postgres_session).
"""
self.logger = logging.getLogger(__name__)
self.mongo_repo = MongoDBRepository()
self.postgres_repo = PostgresRepository(session_generator, self.logger)
async def get_active_tickets(self, user_id: UUID):
"""
Получает активные подписки пользователя
"""
return await self.postgres_repo.list_active_tickets(user_id)
async def create_user(self, telegram_id: str, referrer_id: Optional[str]= None):
"""
Создаёт пользователя.
"""
try:
username = self.generate_string(6)
return await self.postgres_repo.create_user(telegram_id, username, referrer_id)
except Exception as e:
self.logger.error(f"Ошибка при создании пользователя:{e}")
async def get_user_by_telegram_id(self, telegram_id: str):
"""
Возвращает пользователя по Telegram ID.
"""
return await self.postgres_repo.get_user_by_telegram_id(telegram_id)
async def add_transaction(self, user_id: UUID, amount: float):
"""
Добавляет транзакцию.
"""
tran = Transaction(
user_id=user_id,
amount=Decimal(amount),
transaction_type="default"
)
return await self.postgres_repo.add_record(tran)
async def get_transaction(self, user_id: UUID, limit: int = 10):
"""
Возвращает транзакции.
"""
return await self.postgres_repo.get_last_transactions(user_id, limit)
async def update_balance(self, telegram_id: str, amount: float):
"""
Обновляет баланс пользователя и добавляет транзакцию.
"""
self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
updated = await self.postgres_repo.update_balance(user, amount)
if not updated:
self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
return "ERROR"
self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
await self.add_transaction(user.id, amount)
return "OK"
async def get_active_subscription(self, telegram_id: str):
"""
Проверяет наличие активной подписки.
"""
return await self.postgres_repo.get_active_subscription(telegram_id)
async def get_last_subscriptions(self, user_id: UUID, limit: int ):
"""
Возвращает список последних подписок.
"""
return await self.postgres_repo.get_last_subscription_by_user_id(user_id, limit)
async def buy_sub(self, telegram_id: str, plan_id: str):
"""
Покупает подписку.
"""
active_subscription = await self.get_active_subscription(telegram_id)
self.logger.info(f"{active_subscription}")
if active_subscription:
self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.")
return "ACTIVE_SUBSCRIPTION_EXISTS"
result = await self._initialize_user_and_plan(telegram_id, plan_id)
if isinstance(result, str):
return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS"
user, plan = result
await self.postgres_repo.update_balance(user,-plan['price'])
new_subscription, server = await self._create_subscription_and_add_client(user, plan)
if not new_subscription:
return "ERROR"
self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.")
return "OK"
async def _initialize_user_and_plan(self, telegram_id, plan_id):
"""
Инициализирует пользователя и план подписки.
"""
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
plan = await self.mongo_repo.get_subscription_plan(plan_id)
if not plan:
self.logger.error(f"Тарифный план {plan_id} не найден.")
return "TARIFF_NOT_FOUND"
cost = int(plan["price"])
if user.balance < cost:
self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.")
return "INSUFFICIENT_FUNDS"
return user, plan
async def _create_subscription_and_add_client(self, user, plan):
"""
Создаёт подписку и добавляет клиента на сервер.
"""
expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"])
server = await self.mongo_repo.get_server_with_least_clients()
if not server:
self.logger.error("Нет доступных серверов для подписки.")
return None, None
new_subscription = Subscription(
user_id=user.id,
vpn_server_id=str(server["server"]["name"]),
plan=plan["name"],
expiry_date=expiry_date,
)
panel = PanelInteraction(
base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}",
login_data={"username": server["server"]["login"], "password": server["server"]["password"]},
logger=self.logger,
certificate=server["server"]["certificate"]["data"],
)
response = await panel.add_client(
inbound_id=1,
expiry_date=expiry_date.isoformat(),
email=user.username,
)
if response != "OK":
self.logger.error(f"Ошибка при добавлении клиента: {response}")
return None, None
await self.postgres_repo.add_record(new_subscription)
return new_subscription, server
async def generate_uri(self, telegram_id: str):
"""
Генерация URI для пользователя.
:param telegram_id: Telegram ID пользователя.
:return: Строка URI или None в случае ошибки.
"""
try:
# Извлечение данных
subscription = await self.postgres_repo.get_active_subscription(telegram_id)
if not subscription:
self.logger.error(f"Подписки для пользователя {telegram_id} не найдены.")
return "SUB_ERROR"
server = await self.mongo_repo.get_server(subscription.vpn_server_id)
if not server:
self.logger.error(f"Сервер с ID {subscription.vpn_server_id} не найден в MongoDB.")
return None
user = await self.postgres_repo.get_user_by_telegram_id(telegram_id)
if not user:
self.logger.error(f"Пользователь с telegram_id {telegram_id} не найден.")
return None
email = user.username # Используем email из данных пользователя
panel = PanelInteraction(
base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}",
login_data={"username": server["server"]["login"], "password": server["server"]["password"]},
logger=self.logger,
certificate=server["server"]["certificate"]["data"],
)
inbound_info = await panel.get_inbound_info(inbound_id=1) # Используем фиксированный ID
if not inbound_info:
self.logger.error(f"Не удалось получить информацию об инбаунде для ID {subscription.vpn_server_id}.")
return None
# Логируем полученные данные
self.logger.info(f"Inbound Info: {inbound_info}")
# Разбор JSON-строк
try:
stream_settings = json.loads(inbound_info["obj"]["streamSettings"])
except KeyError as e:
self.logger.error(f"Ключ 'streamSettings' отсутствует: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"Ошибка разбора JSON для 'streamSettings': {e}")
return None
settings = json.loads(inbound_info["obj"]["settings"]) # Разбираем JSON
# Находим клиента по email
client = next((c for c in settings["clients"] if c["email"] == email), None)
if not client:
self.logger.error(f"Клиент с email {email} не найден среди клиентов.")
return None
server_info = server["server"]
# Преобразование данных в формат URI
uri = (
f"vless://{client['id']}@{server_info['ip']}:443?"
f"type={stream_settings['network']}&security={stream_settings['security']}"
f"&pbk={stream_settings['realitySettings']['settings']['publicKey']}"
f"&fp={stream_settings['realitySettings']['settings']['fingerprint']}"
f"&sni={stream_settings['realitySettings']['serverNames'][0]}"
f"&sid={stream_settings['realitySettings']['shortIds'][0]}"
f"&spx=%2F&flow={client['flow']}"
f"#{inbound_info['obj']['remark']}-{client['email']}"
)
self.logger.info(f"Сформирован URI для пользователя {telegram_id}: {uri}")
return uri
except Exception as e:
self.logger.error(f"Ошибка при генерации URI для пользователя {telegram_id}: {e}")
return None
async def get_ticket(self,ticket_id: int):
"""
Ищет тикет по айди
"""
return await self.postgres_repo.get_ticket(ticket_id)
async def create_ticket(self, user_id: UUID, subject: str, message: str):
"""
Создаёт тикет
"""
ticket = SupportTicket(user_id=user_id,subject=subject,message=message)
return await self.postgres_repo.add_record(ticket)
async def add_message_to_ticket(self,ticket_id : int,sender: str,message: str):
"""
Добавляет сообщения к тикету
"""
message = TicketMessage(ticket_id=ticket_id, sender=sender, message=message)
result = await self.postgres_repo.add_record(message)
if result == None:
return "ERROR"
return "OK"
async def get_ticket_messages(self,ticket_id: int):
"""
Получает сообщения тикета
"""
return await self.postgres_repo.get_ticket_messages(ticket_id)
async def update_ticket_status(self, ticket_id: int, new_status: str):
"""
Обновляет статус тикета.
Args:
ticket_id (int): ID тикета, статус которого нужно обновить.
new_status (str): Новый статус тикета.
Returns:
dict: Словарь с ID тикета и обновлённым статусом.
Raises:
ValueError: Если тикет не найден.
"""
return await self.postgres_repo.set_new_status(ticket_id,new_status)
@staticmethod
def generate_string(length):
"""
Генерирует случайную строку заданной длины.
"""
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))