Files
Telegram-bot-old/databases/postgresql.py
Disledg 72d7fdd751 Короче опять дохуя изменений:
1. Оно перво на перво работает
2.Реализовано почти всё вроде из кнопок, осталось ток оплату подписок, выдачу URI и пополнение конечно.
3.Убрал .json конфиг и сделал всё через переменные окружения
2024-12-07 18:05:27 +03:00

226 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from databases.model import User, Subscription, Transaction, Administrators
from sqlalchemy.future import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import desc
from dateutil.relativedelta import relativedelta
from datetime import datetime
from utils.panel import PanelInteraction
from databases.mongodb import MongoDBRepository
import random
import string
import logging
import asyncio
class DatabaseManager:
def __init__(self, session_generator):
"""
Инициализация с асинхронным генератором сессий (например, get_postgres_session).
"""
self.session_generator = session_generator
self.logger = logging.getLogger(__name__)
self.mongo_repo = MongoDBRepository()
async def create_user(self, telegram_id: int):
"""
Создаёт нового пользователя, если его нет.
"""
async for session in self.session_generator():
try:
username = self.generate_string(6)
result = await session.execute(select(User).where(User.telegram_id == int(telegram_id)))
user = result.scalars().first()
if not user:
new_user = User(telegram_id=int(telegram_id), username=username)
session.add(new_user)
await session.commit()
return new_user
return user
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при создании пользователя {telegram_id}: {e}")
await session.rollback()
return "ERROR"
async def get_user_by_telegram_id(self, telegram_id: int):
"""
Возвращает пользователя по Telegram ID.
"""
async for session in self.session_generator():
try:
result = await session.execute(select(User).where(User.telegram_id == telegram_id))
return result.scalars().first()
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}")
return None
async def add_transaction(self, user_id: int, amount: float):
"""
Добавляет транзакцию для пользователя.
"""
async for session in self.session_generator():
try:
transaction = Transaction(user_id=user_id, amount=amount)
session.add(transaction)
await session.commit()
except SQLAlchemyError as e:
self.logger.error(f"Ошибка добавления транзакции для пользователя {user_id}: {e}")
await session.rollback()
async def update_balance(self, telegram_id: int, amount: float):
"""
Обновляет баланс пользователя и добавляет транзакцию.
"""
async for session in self.session_generator():
try:
result = await session.execute(select(User).where(User.telegram_id == telegram_id))
user = result.scalars().first()
if user:
user.balance = amount
await self.add_transaction(user.id, amount)
await session.commit()
else:
self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при обновлении баланса: {e}")
await session.rollback()
async def last_subscription(self, user_id: int):
"""
Возвращает список подписок пользователя.
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Subscription)
.where(Subscription.user_id == user_id)
.order_by(desc(Subscription.created_at))
)
return result.scalars().all()
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении последней подписки пользователя {user_id}: {e}")
return "ERROR"
async def last_transaction(self, user_id: int):
"""
Возвращает список транзакций пользователя.
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Transaction)
.where(Transaction.user_id == user_id)
.order_by(desc(Transaction.created_at))
)
transactions = result.scalars().all()
return transactions
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении транзакций пользователя {user_id}: {e}")
return "ERROR"
async def buy_sub(self, telegram_id: str, plan_id: str):
async for session in self.session_generator():
try:
result = await self.create_user(telegram_id)
if not result:
self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
# Получение тарифного плана из MongoDB
plan = await self.mongo_repo.get_subscription_plan(plan_id)
if not plan:
self.logger.error(f"Тарифный план {plan_id} не найден.")
return "ERROR"
# Проверка достаточности средств для покупки подписки
cost = plan["cost"]
if result.balance >= cost:
result.balance -= cost
await session.commit()
# Создание подписки для пользователя
expiry_date = datetime.now(datetime.timezone.utc) + relativedelta(months=plan["duration_months"])
new_subscription = Subscription(user_id=result.id, vpn_server_id=None, plan=plan_id, expiry_date=expiry_date)
session.add(new_subscription)
await session.commit()
self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id} на план {plan_id}.")
return "OK"
else:
self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.")
return "INSUFFICIENT_FUNDS"
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при покупке подписки {plan_id} для пользователя {telegram_id}: {e}")
await session.rollback()
return "ERROR"
async def add_to_server(self, telegram_id: int):
"""
Метод для добавления пользователя на сервер.
"""
async for session in self.session_generator():
try:
# Получаем подписку пользователя по telegram_id
result = await session.execute(select(Subscription).join(User).where(User.telegram_id == int(telegram_id)))
user_sub = result.scalars().first()
if not user_sub:
self.logger.error(f"Не удалось найти подписку для пользователя с Telegram ID {telegram_id}.")
return "ERROR"
# Получаем информацию о пользователе
user_result = await session.execute(select(User).where(User.telegram_id == telegram_id))
user = user_result.scalars().first()
# Получаем сервер с MongoDB
server = await self.mongo_repo.get_server(user_sub.vpn_server_id)
if not server:
self.logger.error(f"Не удалось найти сервер с ID {user_sub.vpn_server_id}.")
return "ERROR"
# Найдем клиента на сервере по telegram_id
client = None
for client_info in server['clients']:
if client_info['subscriptions']['tgId'] == telegram_id:
client = client_info
break
if not client:
self.logger.error(f"Не удалось найти клиента с Telegram ID {telegram_id} на сервере.")
return "ERROR"
# Доступ к данным сервера для добавления клиента
server_info = server['server']
url_base = f"https://{server_info['ip']}:{server_info['port']}/{server_info['secretKey']}"
login_data = {
'username': server_info['login'],
'password': server_info['password'],
}
# Инициализируем взаимодействие с панелью управления
panel = PanelInteraction(url_base, login_data, self.logger)
# Добавляем клиента на сервер
client_id = client['id']
expiry_date_iso = user_sub.expiry_date.isoformat()
response = await panel.add_client(client_id, expiry_date_iso, user.username)
# Логируем результат
if response == "OK":
self.logger.info(f"Клиент {telegram_id} успешно добавлен на сервер.")
return "OK"
else:
self.logger.error(f"Ошибка при добавлении клиента {telegram_id} на сервер: {response}")
return "ERROR"
except Exception as e:
self.logger.error(f"Ошибка при установке на сервер для пользователя {telegram_id}: {e}")
return "ERROR"
@staticmethod
def generate_string(length):
"""
Генерирует случайную строку заданной длины.
"""
characters = string.ascii_lowercase + string.digits
return ''.join(random.choices(characters, k=length))