diff --git a/app/routes/__init__.py b/app/routes/__init__.py index 0ae7429..22b6ccc 100644 --- a/app/routes/__init__.py +++ b/app/routes/__init__.py @@ -1,6 +1,5 @@ #from .payment_routes import router as payment_router from .user_routes import router from .subscription_routes import router as subscription_router -from .support_routes import router as sup_router # Экспорт всех маршрутов -__all__ = [ "router", "subscription_router","sup_router"] +__all__ = [ "router", "subscription_router"] diff --git a/app/routes/payment_routes.py b/app/routes/payment_routes.py deleted file mode 100644 index a6506e1..0000000 --- a/app/routes/payment_routes.py +++ /dev/null @@ -1,65 +0,0 @@ -from fastapi import APIRouter, HTTPException -from pydantic import BaseModel -from yookassa import Payment -from yookassa.domain.notification import WebhookNotification -from app.services.db_manager import DatabaseManager - - -router = APIRouter() - -class CreatePaymentRequest(BaseModel): - telegram_id: str - amount: float - -class PaymentResponse(BaseModel): - payment_url: str - payment_id: str - -@router.post("/payment/create", response_model=PaymentResponse) -async def create_payment(request: CreatePaymentRequest): - """ - Создаёт платёж через ЮKassa и возвращает ссылку для оплаты. - """ - try: - # Создание платежа через API ЮKassa - payment = Payment.create({ - "amount": { - "value": f"{request.amount:.2f}", - "currency": "RUB" - }, - "confirmation": { - "type": "redirect", # Тип подтверждения (redirect или embedded) - "return_url": "https://your-app.com/success" # URL возврата - }, - "description": f"Пополнение баланса для пользователя {request.telegram_id}" - }) - - # Возвращаем ссылку для оплаты и ID платежа - return PaymentResponse( - payment_url=payment.confirmation.confirmation_url, - payment_id=payment.id - ) - except Exception as e: - raise HTTPException(status_code=500, detail=f"Ошибка создания платежа: {str(e)}") - - -@router.post("/payment/notification") -async def payment_notification(notification: dict, database_manager: DatabaseManager): - """ - Обрабатывает уведомления от ЮKassa. - """ - try: - # Парсим уведомление - webhook = WebhookNotification(notification) - payment_object = webhook.object - - # Проверяем статус платежа - if payment_object["status"] == "succeeded": - # Обновляем баланс пользователя - telegram_id = int(payment_object["description"].split()[-1]) - amount = float(payment_object["amount"]["value"]) - await database_manager.update_balance(telegram_id, amount) - - return {"status": "ok"} - except Exception as e: - raise HTTPException(status_code=500, detail=f"Ошибка обработки уведомления: {str(e)}") diff --git a/app/routes/subscription_routes.py b/app/routes/subscription_routes.py index 5c682ec..949e6a2 100644 --- a/app/routes/subscription_routes.py +++ b/app/routes/subscription_routes.py @@ -1,7 +1,7 @@ from typing import List from fastapi import APIRouter, HTTPException, Depends from pydantic import BaseModel -from app.services.db_manager import DatabaseManager +from app.services import DatabaseManager from instance.configdb import get_database_manager from uuid import UUID import logging @@ -14,7 +14,7 @@ router = APIRouter() class BuySubscriptionRequest(BaseModel): - telegram_id: str + telegram_id: int plan_id: str class SubscriptionResponse(BaseModel): @@ -56,17 +56,17 @@ async def buy_subscription( # Эндпоинт для получения последней подписки -@router.get("/subscription/{user_id}/last", response_model=SubscriptionResponse) -async def last_subscription(user_id: UUID, database_manager: DatabaseManager = Depends(get_database_manager)): +@router.get("/subscription/{telegram_id}/last", response_model=SubscriptionResponse) +async def last_subscription(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)): """ Возвращает последнюю подписку пользователя. """ - logger.info(f"Получение последней подписки для пользователя: {user_id}") + logger.info(f"Получение последней подписки для пользователя: {telegram_id}") try: - subscriptions = await database_manager.get_last_subscriptions(user_id=user_id, limit=1) + subscriptions = await database_manager.get_last_subscriptions(telegram_id=telegram_id, limit=1) if not subscriptions: - logger.warning(f"Подписки для пользователя {user_id} не найдены") + logger.warning(f"Подписки для пользователя {telegram_id} не найдены") raise HTTPException(status_code=404, detail="No subscriptions found") sub = subscriptions[0] @@ -80,7 +80,7 @@ async def last_subscription(user_id: UUID, database_manager: DatabaseManager = D "updated_at": sub.updated_at.isoformat(), } except SQLAlchemyError as e: - logger.error(f"Ошибка базы данных при получении подписки для пользователя {user_id}: {e}") + logger.error(f"Ошибка базы данных при получении подписки для пользователя {telegram_id}: {e}") raise HTTPException(status_code=500, detail="Database error") except HTTPException as e: # Пропускаем HTTPException, чтобы FastAPI обработал её автоматически @@ -89,18 +89,18 @@ async def last_subscription(user_id: UUID, database_manager: DatabaseManager = D logger.error(f"Неожиданная ошибка: {e}") raise HTTPException(status_code=500, detail="Internal Server Error") -@router.get("/subscriptions/{user_id}", response_model=List[SubscriptionResponse]) -async def get_subscriptions(user_id: UUID, database_manager: DatabaseManager = Depends(get_database_manager)): +@router.get("/subscriptions/{telegram_id}", response_model=List[SubscriptionResponse]) +async def get_subscriptions(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)): """ Возвращает список подписок пользователя. """ - logger.info(f"Получение подписок для пользователя: {user_id}") + logger.info(f"Получение подписок для пользователя: {telegram_id}") try: # Получаем подписки без ограничений или с указанным лимитом - subscriptions = await database_manager.last_subscriptions(user_id=str(user_id)) + subscriptions = await database_manager.get_last_subscriptions(telegram_id=telegram_id) if not subscriptions: - logger.warning(f"Подписки для пользователя {user_id} не найдены") + logger.warning(f"Подписки для пользователя {telegram_id} не найдены") raise HTTPException(status_code=404, detail="No subscriptions found") # Формируем список подписок для ответа @@ -116,17 +116,15 @@ async def get_subscriptions(user_id: UUID, database_manager: DatabaseManager = D for sub in subscriptions ] except SQLAlchemyError as e: - logger.error(f"Ошибка базы данных при получении подписок для пользователя {user_id}: {e}") + logger.error(f"Ошибка базы данных при получении подписок для пользователя {telegram_id}: {e}") raise HTTPException(status_code=500, detail="Database error") except Exception as e: logger.error(f"Неожиданная ошибка: {e}") raise HTTPException(status_code=500, detail=str(e)) - except HTTPException as e: - # Пропускаем HTTPException, чтобы FastAPI обработал её автоматически - raise e + @router.get("/uri", response_model=dict) -async def get_uri(telegram_id: str, database_manager: DatabaseManager = Depends(get_database_manager)): +async def get_uri(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)): """ Возвращает список подписок пользователя. """ diff --git a/app/routes/support_routes.py b/app/routes/support_routes.py deleted file mode 100644 index 107d3be..0000000 --- a/app/routes/support_routes.py +++ /dev/null @@ -1,249 +0,0 @@ -from fastapi import APIRouter, Depends, HTTPException -from sqlalchemy.exc import SQLAlchemyError -from app.services.db_manager import DatabaseManager -from instance.configdb import get_database_manager -from uuid import UUID -from pydantic import BaseModel -from fastapi import Response, status -from enum import Enum -from typing import List, Literal, Optional -from datetime import datetime -import logging -logger = logging.getLogger(__name__) - -class UpdateTicketStatusRequest(BaseModel): - new_status: str - -class TicketStatus(str, Enum): - OPEN = "open" - PENDING = "pending" - CLOSED = "closed" - -class CreateTicketRequest(BaseModel): - subject: str - message: str - -class TicketResponse(BaseModel): - id: int - user_id: UUID - subject: str - message: str - status: TicketStatus - created_at: datetime - updated_at: datetime - -class TicketMessageRequest(BaseModel): - message: str - -class TicketMessageResponse(BaseModel): - ticket_id: int - sender: str - message: str - created_at: datetime - -router = APIRouter() - -def handle_exception(e: Exception, message: str): - logger.error(f"{message}: {e}") - raise HTTPException(status_code=500, detail=f"{message}: {str(e)}") - -@router.post("/support/tickets/{ticket_id}/messages", summary="Добавить сообщение") -async def add_message( - ticket_id: int, - request: TicketMessageRequest, - sender: Literal["user", "support"], # "user" или "support" - database_manager: DatabaseManager = Depends(get_database_manager) -): - """ - Добавляет сообщение в тикет. - - Args: - ticket_id (int): ID тикета. - request (TicketMessageRequest): Данные сообщения. - sender (str): Отправитель ("user" или "support"). - database_manager (DatabaseManager): Управление базой данных. - - Returns: - TicketMessageResponse: Данные созданного сообщения. - """ - try: - message = await database_manager.add_message_to_ticket(ticket_id=ticket_id, sender=sender, message=request.message) - if not message: - raise HTTPException(status_code=404, detail="Тикет не найден или ошибка добавления сообщения") - if message != "OK": - raise HTTPException(status_code=500, detail="Ошибка добавления сообщения") - return Response(status_code=status.HTTP_200_OK) - except Exception as e: - handle_exception(e,"Ошибка добавления сообщения") - -@router.get("/support/tickets/{ticket_id}/messages", response_model=List[TicketMessageResponse], summary="Получить сообщения") -async def get_messages( - ticket_id: int, - database_manager: DatabaseManager = Depends(get_database_manager) -): - """ - Возвращает список сообщений в тикете. - - Args: - ticket_id (int): ID тикета, для которого нужно получить сообщения. - database_manager (DatabaseManager): Менеджер базы данных. - - Returns: - List[TicketMessageResponse]: Список сообщений, связанных с тикетом. - - Raises: - HTTPException: 404, если сообщения для тикета не найдены. - HTTPException: 500, если произошла ошибка на сервере. - """ - try: - messages = await database_manager.get_ticket_messages(ticket_id=ticket_id) - ticket_info = await database_manager.get_ticket(ticket_id) - logger.info(messages) - - result_messages = [] - if messages: - for message in messages: - correct_response = TicketMessageResponse( - ticket_id=ticket_id, - sender=message.sender, - message=message.message, - created_at=message.created_at - ) - result_messages.append(correct_response) - - result_messages.insert(0, - TicketMessageResponse( - ticket_id=ticket_id, - sender="user", - message=ticket_info['message'], - created_at=ticket_info["created_at"] - ) - ) - return result_messages - except Exception as e: - handle_exception(e,"Ошибка получения сообщения") - - -@router.get("/support/ticket/{ticket_id}", response_model=TicketResponse, summary="Получить информацию о тикете") -async def get_ticket( - ticket_id: int, - database_manager: DatabaseManager = Depends(get_database_manager) -): - """ - Возвращает информацию о тикете. - - Args: - ticket_id (int): ID тикета, информацию котрого хочет получить пользователь. - database_manager (DatabaseManager): Менеджер базы данных. - - Returns: - TicketResponse: Информация о тикете. - - Raises: - HTTPException: 404, если тикет не найден. - HTTPException: 500, если произошла ошибка на сервере. - """ - try: - # Получаем данные о тикете - ticket = await database_manager.get_ticket(ticket_id=ticket_id) - if not ticket: - raise HTTPException(status_code=404, detail="Тикет не найден") - - # Возвращаем данные через Pydantic-модель - return TicketResponse(**ticket) - except SQLAlchemyError as e: - handle_exception(e, "Ошибка получения тикета") - except Exception as e: - handle_exception(e, "Неизвестная ошибка при получении тикета") - - - -@router.post("/support/tickets", response_model=TicketResponse, summary="Создать тикет") -async def create_ticket( - request: CreateTicketRequest, - user_id: UUID, - database_manager: DatabaseManager = Depends(get_database_manager) -): - """ - Создаёт новый тикет для пользователя. - - Args: - request (CreateTicketRequest): Данные для создания тикета (тема и сообщение). - user_id (UUID): Идентификатор пользователя, создающего тикет. - database_manager (DatabaseManager): Менеджер базы данных. - - Returns: - TicketResponse: Данные созданного тикета. - - Raises: - HTTPException: 500, если произошла ошибка при создании тикета. - """ - try: - ticket = await database_manager.create_ticket( - user_id=user_id, - subject=request.subject, - message=request.message - ) - return ticket - except Exception as e: - handle_exception(e,"Ошибка содания тикета") - -@router.get("/support/tickets", response_model=List[TicketResponse], summary="Получить список тикетов") -async def list_tickets( - user_id: UUID, - database_manager: DatabaseManager = Depends(get_database_manager) -): - """ - Возвращает список тикетов пользователя. - - Args: - user_id (UUID): Идентификатор пользователя, чьи тикеты нужно получить. - database_manager (DatabaseManager): Менеджер базы данных. - - Returns: - List[TicketResponse]: Список тикетов пользователя. - - Raises: - HTTPException: 404, если тикеты не найдены. - HTTPException: 500, если произошла ошибка на сервере. - """ - try: - tickets = await database_manager.get_active_tickets(user_id=user_id) - if not tickets: - raise HTTPException(status_code=404, detail="Тикеты не найдены") - return tickets - except Exception as e: - handle_exception(e,"Ошибка получения тикетов") - -@router.patch("/support/ticket/{ticket_id}/status", summary="Обновить статус тикета") -async def update_ticket_status( - ticket_id: int, - request: UpdateTicketStatusRequest, - database_manager: DatabaseManager = Depends(get_database_manager) -): - """ - Обновляет статус тикета. - - Args: - ticket_id (int): ID тикета. - request (UpdateTicketStatusRequest): Запрос с новым статусом. - database_manager (DatabaseManager): Менеджер базы данных. - - Returns: - dict: Подтверждение обновления статуса. - - Raises: - HTTPException: Если тикет не найден или произошла ошибка. - """ - try: - result = await database_manager.update_ticket_status(ticket_id, request.new_status) - if result != "OK": - return "ERROR" - return "OK" - except ValueError as e: - logger.error(f"Тикет с ID {ticket_id} не найден: {e}") - raise HTTPException(status_code=404, detail="Тикет не найден.") - except Exception as e: - logger.error(f"Ошибка обновления статуса тикета {ticket_id}: {e}") - raise HTTPException(status_code=500, detail="Не удалось обновить статус тикета.") - diff --git a/app/routes/user_routes.py b/app/routes/user_routes.py index 0712ee7..f80e413 100644 --- a/app/routes/user_routes.py +++ b/app/routes/user_routes.py @@ -1,7 +1,7 @@ import sys from fastapi import APIRouter, Depends, HTTPException from fastapi.exceptions import HTTPException -from app.services.db_manager import DatabaseManager +from app.services import DatabaseManager from sqlalchemy.exc import SQLAlchemyError from instance.configdb import get_database_manager from pydantic import BaseModel @@ -20,20 +20,19 @@ router = APIRouter() # Модели запросов и ответов class CreateUserRequest(BaseModel): - telegram_id: str - referrer_id: Optional[str] = None + telegram_id: int + invited_by: Optional[int] = None class UserResponse(BaseModel): - id: UUID - telegram_id: str + telegram_id: int username: Optional[str] balance: float - referrer_id: Optional[str] + invited_by: Optional[int] = None created_at: str updated_at: str class AddReferal(BaseModel): - new_user_id: str + invited_id: int @router.post("/user/create", response_model=UserResponse, summary="Создать пользователя") async def create_user( @@ -44,16 +43,15 @@ async def create_user( Создание пользователя через Telegram ID. """ try: - user = await db_manager.create_user(request.telegram_id,request.referrer_id) - if user == "ERROR": + user = await db_manager.create_user(request.telegram_id,request.invited_by) + if user == None: raise HTTPException(status_code=500, detail="Failed to create user") return UserResponse( - id=user.id, telegram_id=user.telegram_id, username=user.username, balance=user.balance, - referrer_id=user.referrer_id if user.referrer_id is not None else None, + invited_by=user.invited_by if user.invited_by is not None else None, created_at=user.created_at.isoformat(), updated_at=user.updated_at.isoformat() ) @@ -64,7 +62,7 @@ async def create_user( @router.get("/user/{telegram_id}", response_model=UserResponse, summary="Получить информацию о пользователе") async def get_user( - telegram_id: str, + telegram_id: int, db_manager: DatabaseManager = Depends(get_database_manager) ): """ @@ -77,13 +75,12 @@ async def get_user( logger.warning(f"Пользователь с telegram_id {telegram_id} не найден.") raise HTTPException(status_code=404, detail="User not found") - print(f"Пользователь найден: ID={user.id}, Username={user.username}") + print(f"Пользователь найден: ID={user.telegram_id}, Username={user.username}") user_response = UserResponse( - id=user.id, telegram_id=user.telegram_id, username=user.username, balance=user.balance, - referrer_id=user.referrer_id if user.referrer_id is not None else None, + invited_by=user.invited_by if user.invited_by is not None else None, created_at=user.created_at.isoformat(), updated_at=user.updated_at.isoformat() ) @@ -105,7 +102,7 @@ async def get_user( @router.post("/user/{telegram_id}/balance/{amount}", summary="Обновить баланс") async def update_balance( - telegram_id: str, + telegram_id: int, amount: float, db_manager: DatabaseManager = Depends(get_database_manager) ): @@ -132,52 +129,55 @@ async def update_balance( raise HTTPException(status_code=500, detail=str(e)) -@router.get("/user/{user_id}/transactions", summary="Последние транзакции пользователя") +@router.get("/user/{telegram_id}/transactions", summary="Последние транзакции пользователя") async def last_transactions( - user_id: UUID, + telegram_id: int, db_manager: DatabaseManager = Depends(get_database_manager) ): """ Возвращает список последних транзакций пользователя. """ - logger.info(f"Получен запрос на транзакции для пользователя: {user_id}") + logger.info(f"Получен запрос на транзакции для пользователя: {telegram_id}") try: - logger.debug(f"Вызов метода get_transaction с user_id={user_id}") - transactions = await db_manager.get_transaction(user_id) + logger.debug(f"Вызов метода get_transaction с user_id={telegram_id}") + transactions = await db_manager.get_transaction(telegram_id) if transactions == "ERROR": - logger.error(f"Ошибка при получении транзакций для пользователя: {user_id}") + logger.error(f"Ошибка при получении транзакций для пользователя: {telegram_id}") raise HTTPException(status_code=500, detail="Failed to fetch transactions") - - logger.debug(f"Транзакции для {user_id}: {transactions}") + if transactions == None: + response = [] + logger.info(f"Формирование ответа для пользователя {telegram_id}: {response}") + return response + logger.debug(f"Транзакции для {telegram_id}: {transactions}") response = [ { "id": tx.id, "amount": tx.amount, "created_at": tx.created_at.isoformat(), - "transaction_type": tx.transaction_type, + "type": tx.type, } for tx in transactions ] - logger.info(f"Формирование ответа для пользователя {user_id}: {response}") + logger.info(f"Формирование ответа для пользователя {telegram_id}: {response}") return response except HTTPException as http_ex: - logger.warning(f"HTTP ошибка для {user_id}: {http_ex.detail}") + logger.warning(f"HTTP ошибка для {telegram_id}: {http_ex.detail}") raise http_ex except SQLAlchemyError as db_ex: - logger.error(f"Ошибка базы данных для {user_id}: {db_ex}") + logger.error(f"Ошибка базы данных для {telegram_id}: {db_ex}") raise HTTPException(status_code=500, detail="Database error") except Exception as e: - logger.exception(f"Неожиданная ошибка для {user_id}: {e}") + logger.exception(f"Неожиданная ошибка для {telegram_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/user/{referrer_id}/add_referral", summary="Обновить баланс") async def add_referal( - referrer_id: str, + referrer_id: int, request: AddReferal, db_manager: DatabaseManager = Depends(get_database_manager) ): @@ -186,12 +186,12 @@ async def add_referal( """ logger.info(f"Получен запрос на добавление реферала: telegram_id={referrer_id}") try: - result = await db_manager.add_referal(referrer_id) + result = await db_manager.add_referal(referrer_id,request.invited_id) if result == "ERROR": - logger.error(f"Ошибка добавления реферала для {referrer_id} c айди {request.new_user_id}") + logger.error(f"Ошибка добавления реферала для {referrer_id} c айди {request.invited_id}") raise HTTPException(status_code=500, detail="Failed to update balance") - logger.info(f"Добавлен реферал для {referrer_id} c айди {request.new_user_id}") + logger.info(f"Добавлен реферал для {referrer_id} c айди {request.invited_id}") return {"message": "Balance updated successfully"} except HTTPException as http_ex: logger.warning(f"HTTP ошибка: {http_ex.detail}") diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..d6a4af0 --- /dev/null +++ b/app/services/__init__.py @@ -0,0 +1,4 @@ +from .db_manager import DatabaseManager +#from .marzban import MarzbanService + +__all__ = ['DatabaseManager'] \ No newline at end of file diff --git a/app/services/db_manager.py b/app/services/db_manager.py index 6b8dc80..dd2c767 100644 --- a/app/services/db_manager.py +++ b/app/services/db_manager.py @@ -1,10 +1,9 @@ from decimal import Decimal import json -from instance.model import User, Subscription, Transaction, SupportTicket, TicketMessage, TicketStatus -from .xui_rep import PanelInteraction +from instance.model import User, Subscription, Transaction +from app.services.marzban import MarzbanService from .postgres_rep import PostgresRepository -from .mongo_rep import MongoDBRepository -from instance.model import Transaction +from instance.model import Transaction,TransactionType from dateutil.relativedelta import relativedelta from datetime import datetime import random @@ -20,52 +19,46 @@ class DatabaseManager: Инициализация с асинхронным генератором сессий (например, get_postgres_session). """ self.logger = logging.getLogger(__name__) - self.mongo_repo = MongoDBRepository() self.postgres_repo = PostgresRepository(session_generator, self.logger) - async def get_active_tickets(self, user_id: UUID): - """ - Получает активные подписки пользователя - """ - return await self.postgres_repo.list_active_tickets(user_id) - async def create_user(self, telegram_id: str, referrer_id: Optional[str]= None): + async def create_user(self, telegram_id: int, invented_by: Optional[int]= None): """ Создаёт пользователя. """ try: username = self.generate_string(6) - return await self.postgres_repo.create_user(telegram_id, username, referrer_id) + return await self.postgres_repo.create_user(telegram_id, username, invented_by) except Exception as e: self.logger.error(f"Ошибка при создании пользователя:{e}") - async def get_user_by_telegram_id(self, telegram_id: str): + async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ return await self.postgres_repo.get_user_by_telegram_id(telegram_id) - async def add_transaction(self, user_id: UUID, amount: float): + async def add_transaction(self, telegram_id: int, amount: float): """ Добавляет транзакцию. """ tran = Transaction( - user_id=user_id, + user_id=telegram_id, amount=Decimal(amount), - transaction_type="default" + type=TransactionType.DEPOSIT ) return await self.postgres_repo.add_record(tran) - async def add_referal(self,referrer_id: str, new_user_id: str): + async def add_referal(self,referrer_id: int, new_user_telegram_id: int): """ Добавление рефералу пользователей """ - return await self.postgres_repo.add_referal(referrer_id,new_user_id) - async def get_transaction(self, user_id: UUID, limit: int = 10): + return await self.postgres_repo.add_referral(referrer_id,new_user_telegram_id) + async def get_transaction(self, telegram_id: int, limit: int = 10): """ Возвращает транзакции. """ - return await self.postgres_repo.get_last_transactions(user_id, limit) + return await self.postgres_repo.get_last_transactions(telegram_id, limit) - async def update_balance(self, telegram_id: str, amount: float): + async def update_balance(self, telegram_id: int, amount: float): """ Обновляет баланс пользователя и добавляет транзакцию. """ @@ -81,227 +74,186 @@ class DatabaseManager: return "ERROR" self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции") - await self.add_transaction(user.id, amount) + await self.add_transaction(user.telegram_id, amount) return "OK" - async def get_active_subscription(self, telegram_id: str): + async def get_active_subscription(self, telegram_id: int): """ Проверяет наличие активной подписки. """ return await self.postgres_repo.get_active_subscription(telegram_id) - async def get_last_subscriptions(self, user_id: UUID, limit: int ): + async def get_last_subscriptions(self, telegram_id: int, limit: int = 1): """ Возвращает список последних подписок. """ - return await self.postgres_repo.get_last_subscription_by_user_id(user_id, limit) + return await self.postgres_repo.get_last_subscription_by_user_id(telegram_id, limit) - async def buy_sub(self, telegram_id: str, plan_id: str): + async def buy_sub(self, telegram_id: int, plan_id: str): """ Покупает подписку. """ - active_subscription = await self.get_active_subscription(telegram_id) - self.logger.info(f"{active_subscription}") - if active_subscription: - self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.") - return "ACTIVE_SUBSCRIPTION_EXISTS" + # active_subscription = await self.get_active_subscription(telegram_id) + # self.logger.info(f"{active_subscription}") + # if active_subscription: + # self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.") + # return "ACTIVE_SUBSCRIPTION_EXISTS" - result = await self._initialize_user_and_plan(telegram_id, plan_id) - if isinstance(result, str): - return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS" + # result = await self._initialize_user_and_plan(telegram_id, plan_id) + # if isinstance(result, str): + # return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS" - user, plan = result - await self.postgres_repo.update_balance(user,-plan['price']) - new_subscription, server = await self._create_subscription_and_add_client(user, plan) + # user, plan = result + # await self.postgres_repo.update_balance(user,-plan['price']) + # new_subscription, server = await self._create_subscription_and_add_client(user, plan) - if not new_subscription: - return "ERROR" + # if not new_subscription: + # return "ERROR" - self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.") - return "OK" + # self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.") + # return "OK" + pass async def _initialize_user_and_plan(self, telegram_id, plan_id): """ Инициализирует пользователя и план подписки. """ - user = await self.get_user_by_telegram_id(telegram_id) - if not user: - self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.") - return "ERROR" + # user = await self.get_user_by_telegram_id(telegram_id) + # if not user: + # self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.") + # return "ERROR" - plan = await self.mongo_repo.get_subscription_plan(plan_id) - if not plan: - self.logger.error(f"Тарифный план {plan_id} не найден.") - return "TARIFF_NOT_FOUND" + # plan = await self.mongo_repo.get_subscription_plan(plan_id) + # if not plan: + # self.logger.error(f"Тарифный план {plan_id} не найден.") + # return "TARIFF_NOT_FOUND" - cost = int(plan["price"]) - if user.balance < cost: - self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.") - return "INSUFFICIENT_FUNDS" + # cost = int(plan["price"]) + # if user.balance < cost: + # self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.") + # return "INSUFFICIENT_FUNDS" - return user, plan + # return user, plan + pass async def _create_subscription_and_add_client(self, user, plan): """ Создаёт подписку и добавляет клиента на сервер. """ - expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"]) - server = await self.mongo_repo.get_server_with_least_clients() - if not server: - self.logger.error("Нет доступных серверов для подписки.") - return None, None + # expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"]) + # server = await self.mongo_repo.get_server_with_least_clients() + # if not server: + # self.logger.error("Нет доступных серверов для подписки.") + # return None, None - new_subscription = Subscription( - user_id=user.id, - vpn_server_id=str(server["server"]["name"]), - plan=plan["name"], - expiry_date=expiry_date, - ) + # new_subscription = Subscription( + # user_id=user.id, + # vpn_server_id=str(server["server"]["name"]), + # plan=plan["name"], + # expiry_date=expiry_date, + # ) - panel = PanelInteraction( - base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}", - login_data={"username": server["server"]["login"], "password": server["server"]["password"]}, - logger=self.logger, - certificate=server["server"]["certificate"]["data"], - ) + # panel = PanelInteraction( + # base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}", + # login_data={"username": server["server"]["login"], "password": server["server"]["password"]}, + # logger=self.logger, + # certificate=server["server"]["certificate"]["data"], + # ) - response = await panel.add_client( - inbound_id=1, - expiry_date=expiry_date.isoformat(), - email=user.username, - ) - if response != "OK": - self.logger.error(f"Ошибка при добавлении клиента: {response}") - return None, None - await self.postgres_repo.add_record(new_subscription) + # response = await panel.add_client( + # inbound_id=1, + # expiry_date=expiry_date.isoformat(), + # email=user.username, + # ) + # if response != "OK": + # self.logger.error(f"Ошибка при добавлении клиента: {response}") + # return None, None + # await self.postgres_repo.add_record(new_subscription) - return new_subscription, server + # return new_subscription, server + pass - async def generate_uri(self, telegram_id: str): + async def generate_uri(self, telegram_id: int): """ Генерация URI для пользователя. :param telegram_id: Telegram ID пользователя. :return: Строка URI или None в случае ошибки. """ - try: - # Извлечение данных - subscription = await self.postgres_repo.get_active_subscription(telegram_id) - if not subscription: - self.logger.error(f"Подписки для пользователя {telegram_id} не найдены.") - return "SUB_ERROR" + # try: + # # Извлечение данных + # subscription = await self.postgres_repo.get_active_subscription(telegram_id) + # if not subscription: + # self.logger.error(f"Подписки для пользователя {telegram_id} не найдены.") + # return "SUB_ERROR" - server = await self.mongo_repo.get_server(subscription.vpn_server_id) - if not server: - self.logger.error(f"Сервер с ID {subscription.vpn_server_id} не найден в MongoDB.") - return None + # server = await self.mongo_repo.get_server(subscription.vpn_server_id) + # if not server: + # self.logger.error(f"Сервер с ID {subscription.vpn_server_id} не найден в MongoDB.") + # return None - user = await self.postgres_repo.get_user_by_telegram_id(telegram_id) - if not user: - self.logger.error(f"Пользователь с telegram_id {telegram_id} не найден.") - return None + # user = await self.postgres_repo.get_user_by_telegram_id(telegram_id) + # if not user: + # self.logger.error(f"Пользователь с telegram_id {telegram_id} не найден.") + # return None - email = user.username # Используем email из данных пользователя + # email = user.username # Используем email из данных пользователя - panel = PanelInteraction( - base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}", - login_data={"username": server["server"]["login"], "password": server["server"]["password"]}, - logger=self.logger, - certificate=server["server"]["certificate"]["data"], - ) + # panel = PanelInteraction( + # base_url=f"https://{server['server']['ip']}:{server['server']['port']}/{server['server']['secretKey']}", + # login_data={"username": server["server"]["login"], "password": server["server"]["password"]}, + # logger=self.logger, + # certificate=server["server"]["certificate"]["data"], + # ) - inbound_info = await panel.get_inbound_info(inbound_id=1) # Используем фиксированный ID - if not inbound_info: - self.logger.error(f"Не удалось получить информацию об инбаунде для ID {subscription.vpn_server_id}.") - return None + # inbound_info = await panel.get_inbound_info(inbound_id=1) # Используем фиксированный ID + # if not inbound_info: + # self.logger.error(f"Не удалось получить информацию об инбаунде для ID {subscription.vpn_server_id}.") + # return None - # Логируем полученные данные - self.logger.info(f"Inbound Info: {inbound_info}") + # # Логируем полученные данные + # self.logger.info(f"Inbound Info: {inbound_info}") - # Разбор JSON-строк - try: - stream_settings = json.loads(inbound_info["obj"]["streamSettings"]) - except KeyError as e: - self.logger.error(f"Ключ 'streamSettings' отсутствует: {e}") - return None - except json.JSONDecodeError as e: - self.logger.error(f"Ошибка разбора JSON для 'streamSettings': {e}") - return None + # # Разбор JSON-строк + # try: + # stream_settings = json.loads(inbound_info["obj"]["streamSettings"]) + # except KeyError as e: + # self.logger.error(f"Ключ 'streamSettings' отсутствует: {e}") + # return None + # except json.JSONDecodeError as e: + # self.logger.error(f"Ошибка разбора JSON для 'streamSettings': {e}") + # return None - settings = json.loads(inbound_info["obj"]["settings"]) # Разбираем JSON + # settings = json.loads(inbound_info["obj"]["settings"]) # Разбираем JSON - # Находим клиента по email - client = next((c for c in settings["clients"] if c["email"] == email), None) - if not client: - self.logger.error(f"Клиент с email {email} не найден среди клиентов.") - return None + # # Находим клиента по email + # client = next((c for c in settings["clients"] if c["email"] == email), None) + # if not client: + # self.logger.error(f"Клиент с email {email} не найден среди клиентов.") + # return None - server_info = server["server"] + # server_info = server["server"] - # Преобразование данных в формат URI - uri = ( - f"vless://{client['id']}@{server_info['ip']}:443?" - f"type={stream_settings['network']}&security={stream_settings['security']}" - f"&pbk={stream_settings['realitySettings']['settings']['publicKey']}" - f"&fp={stream_settings['realitySettings']['settings']['fingerprint']}" - f"&sni={stream_settings['realitySettings']['serverNames'][0]}" - f"&sid={stream_settings['realitySettings']['shortIds'][0]}" - f"&spx=%2F&flow={client['flow']}" - f"#{inbound_info['obj']['remark']}-{client['email']}" - ) + # # Преобразование данных в формат URI + # uri = ( + # f"vless://{client['id']}@{server_info['ip']}:443?" + # f"type={stream_settings['network']}&security={stream_settings['security']}" + # f"&pbk={stream_settings['realitySettings']['settings']['publicKey']}" + # f"&fp={stream_settings['realitySettings']['settings']['fingerprint']}" + # f"&sni={stream_settings['realitySettings']['serverNames'][0]}" + # f"&sid={stream_settings['realitySettings']['shortIds'][0]}" + # f"&spx=%2F&flow={client['flow']}" + # f"#{inbound_info['obj']['remark']}-{client['email']}" + # ) - self.logger.info(f"Сформирован URI для пользователя {telegram_id}: {uri}") - return uri - except Exception as e: - self.logger.error(f"Ошибка при генерации URI для пользователя {telegram_id}: {e}") - return None - - async def get_ticket(self,ticket_id: int): - """ - Ищет тикет по айди - """ - return await self.postgres_repo.get_ticket(ticket_id) - - async def create_ticket(self, user_id: UUID, subject: str, message: str): - """ - Создаёт тикет - """ - ticket = SupportTicket(user_id=user_id,subject=subject,message=message) - return await self.postgres_repo.add_record(ticket) - - async def add_message_to_ticket(self,ticket_id : int,sender: str,message: str): - """ - Добавляет сообщения к тикету - """ - message = TicketMessage(ticket_id=ticket_id, sender=sender, message=message) - result = await self.postgres_repo.add_record(message) - if result == None: - return "ERROR" - return "OK" - - async def get_ticket_messages(self,ticket_id: int): - """ - Получает сообщения тикета - """ - return await self.postgres_repo.get_ticket_messages(ticket_id) - - async def update_ticket_status(self, ticket_id: int, new_status: str): - """ - Обновляет статус тикета. - - Args: - ticket_id (int): ID тикета, статус которого нужно обновить. - new_status (str): Новый статус тикета. - - Returns: - dict: Словарь с ID тикета и обновлённым статусом. - - Raises: - ValueError: Если тикет не найден. - """ - return await self.postgres_repo.set_new_status(ticket_id,new_status) + # self.logger.info(f"Сформирован URI для пользователя {telegram_id}: {uri}") + # return uri + # except Exception as e: + # self.logger.error(f"Ошибка при генерации URI для пользователя {telegram_id}: {e}") + # return None + pass @staticmethod def generate_string(length): diff --git a/app/services/marzban.py b/app/services/marzban.py new file mode 100644 index 0000000..c4fec17 --- /dev/null +++ b/app/services/marzban.py @@ -0,0 +1,282 @@ +from typing import Any, Dict, Optional, Literal +import aiohttp +import requests +from datetime import date, datetime, time, timezone +import logging +from instance import User, Subscription + +class MarzbanUser: + """Модель пользователя Marzban""" + def __init__(self, data: Dict[str, Any]): + self.username = data.get('username') + self.status = data.get('status') + self.expire = data.get('expire') + self.data_limit = data.get('data_limit') + self.data_limit_reset_strategy = data.get('data_limit_reset_strategy') + self.used_traffic = data.get('used_traffic') + self.lifetime_used_traffic = data.get('lifetime_used_traffic') + self.subscription_url = data.get('subscription_url') + self.online_at = data.get('online_at') + self.created_at = data.get('created_at') + self.proxies = data.get('proxies', {}) + self.inbounds = data.get('inbounds', {}) + self.note = data.get('note') + +class UserStatus: + """Статус пользователя""" + def __init__(self, data: Dict[str, Any]): + self.used_traffic = data.get('used_traffic', 0) + self.lifetime_used_traffic = data.get('lifetime_used_traffic', 0) + self.online_at = data.get('online_at') + self.status = data.get('status') + self.expire = data.get('expire') + self.data_limit = data.get('data_limit') + +class MarzbanService: + def __init__(self, baseURL: str, username: str, password: str) -> None: + self.base_url = baseURL.rstrip('/') + self.token = self._get_token(username, password) + self.headers = { + "Authorization": f"Bearer {self.token}", + "Content-Type": "application/json" + } + self._session: Optional[aiohttp.ClientSession] = None + + def _get_token(self, username: str, password: str) -> str: + """Получение токена авторизации""" + try: + response = requests.post( + f"{self.base_url}/api/admin/token", + data={'username': username, 'password': password} + ) + response.raise_for_status() + return response.json()['access_token'] + except requests.RequestException as e: + logging.error(f"Failed to get token: {e}") + raise Exception(f"Authentication failed: {e}") + + async def _get_session(self) -> aiohttp.ClientSession: + """Ленивое создание сессии""" + if self._session is None or self._session.closed: + timeout = aiohttp.ClientTimeout(total=30) + self._session = aiohttp.ClientSession(timeout=timeout) + return self._session + + async def _make_request(self, endpoint: str, method: Literal["get", "post", "put", "delete"], + data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """Улучшенный метод для запросов""" + url = f"{self.base_url}{endpoint}" + + try: + session = await self._get_session() + + async with session.request( + method=method.upper(), + url=url, + headers=self.headers, + json=data + ) as response: + + response_data = await response.json() if response.content_length else {} + + if response.status not in (200, 201): + raise Exception(f"HTTP {response.status}: {response_data}") + + return response_data + + except aiohttp.ClientError as e: + logging.error(f"Network error during {method.upper()} to {url}: {e}") + raise + except Exception as e: + logging.error(f"Unexpected error during request to {url}: {e}") + raise + + async def create_user(self, user: User, subscription: Subscription) -> MarzbanUser: + """Создает нового пользователя в Marzban""" + username = f"user_{user.telegram_id}" + if subscription.end_date: + if isinstance(subscription.end_date, datetime): + if subscription.end_date.tzinfo is None: + end_date = subscription.end_date.replace(tzinfo=timezone.utc) + else: + end_date = subscription.end_date + expire_timestamp = int(end_date.timestamp()) + elif isinstance(subscription.end_date, date): + end_datetime = datetime.combine( + subscription.end_date, + time(23, 59, 59), + tzinfo=timezone.utc + ) + expire_timestamp = int(end_datetime.timestamp()) + else: + expire_timestamp = 0 + else: + expire_timestamp = 0 + + data = { + "username": username, + "status": "active", + "expire": expire_timestamp, + "data_limit": 100 * 1073741824, # Конвертируем GB в bytes + "data_limit_reset_strategy": "no_reset", + "proxies": { + "trojan": {} + }, + "inbounds": { + "trojan": ["TROJAN WS NOTLS"] + }, + "note": f"Telegram: {user.telegram_id}", + "on_hold_timeout": None, + "on_hold_expire_duration": 0, + "next_plan": { + "add_remaining_traffic": False, + "data_limit": 0, + "expire": 0, + "fire_on_either": True + } + } + + try: + response_data = await self._make_request("/api/user", "post", data) + marzban_user = MarzbanUser(response_data) + logging.info(f"User {username} created successfully") + return marzban_user + except Exception as e: + logging.error(f"Failed to create user {username}: {e}") + raise Exception(f"Failed to create user: {e}") + + async def update_user(self, user: User, subscription: Subscription) -> MarzbanUser: + """Обновляет существующего пользователя""" + username = f"user_{user.telegram_id}" + + if subscription.end_date: + if isinstance(subscription.end_date, datetime): + # Если это datetime, преобразуем в timestamp + if subscription.end_date.tzinfo is None: + end_date = subscription.end_date.replace(tzinfo=timezone.utc) + else: + end_date = subscription.end_date + expire_timestamp = int(end_date.timestamp()) + elif isinstance(subscription.end_date, date): + # Если это date, создаем datetime на конец дня и преобразуем в timestamp + end_datetime = datetime.combine( + subscription.end_date, + time(23, 59, 59), + tzinfo=timezone.utc + ) + expire_timestamp = int(end_datetime.timestamp()) + else: + expire_timestamp = 0 + else: + expire_timestamp = 0 + + data = { + "status": "active", + "expire": expire_timestamp + } + + try: + response_data = await self._make_request(f"/api/user/{username}", "put", data) + marzban_user = MarzbanUser(response_data) + logging.info(f"User {username} updated successfully") + return marzban_user + except Exception as e: + logging.error(f"Failed to update user {username}: {e}") + raise Exception(f"Failed to update user: {e}") + + async def disable_user(self, user: User) -> bool: + """Отключает пользователя""" + username = f"user_{user.telegram_id}" + + data = { + "status": "disabled" + } + + try: + await self._make_request(f"/api/user/{username}", "put", data) + logging.info(f"User {username} disabled successfully") + return True + except Exception as e: + logging.error(f"Failed to disable user {username}: {e}") + return False + + async def enable_user(self, user: User) -> bool: + """Включает пользователя""" + username = f"user_{user.telegram_id}" + + data = { + "status": "active" + } + + try: + await self._make_request(f"/api/user/{username}", "put", data) + logging.info(f"User {username} enabled successfully") + return True + except Exception as e: + logging.error(f"Failed to enable user {username}: {e}") + return False + + async def delete_user(self, user: User) -> bool: + """Полностью удаляет пользователя из Marzban""" + username = f"user_{user.telegram_id}" + + try: + await self._make_request(f"/api/user/{username}", "delete") + logging.info(f"User {username} deleted successfully") + return True + except Exception as e: + logging.error(f"Failed to delete user {username}: {e}") + return False + + async def get_user_status(self, user: User) -> UserStatus: + """Получает текущий статус пользователя""" + username = f"user_{user.telegram_id}" + + try: + response_data = await self._make_request(f"/api/user/{username}", "get") + return UserStatus(response_data) + except Exception as e: + logging.error(f"Failed to get status for user {username}: {e}") + raise Exception(f"Failed to get user status: {e}") + + async def get_subscription_url(self, user: User) -> str: + """Возвращает готовую subscription_url для подключения""" + username = f"user_{user.telegram_id}" + + try: + response_data = await self._make_request(f"/api/user/{username}", "get") + return response_data.get('subscription_url', '') + except Exception as e: + logging.error(f"Failed to get subscription URL for user {username}: {e}") + return "" + + async def get_config_links(self, user: User) -> str: + """Возвращает конфигурации для подключения""" + username = f"user_{user.telegram_id}" + + try: + response_data = await self._make_request(f"/api/user/{username}", "get") + return response_data.get('links', '') + except Exception as e: + logging.error(f"Failed to get configurations URL's for user {username}: {e}") + return "" + + async def check_marzban_health(self) -> bool: + """Проверяет доступность Marzban API""" + try: + await self._make_request("/api/admin", "get") + return True + except Exception as e: + logging.error(f"Marzban health check failed: {e}") + return False + + async def close(self): + """Закрытие сессии""" + if self._session and not self._session.closed: + await self._session.close() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() \ No newline at end of file diff --git a/app/services/mongo_rep.py b/app/services/mongo_rep.py deleted file mode 100644 index 522501c..0000000 --- a/app/services/mongo_rep.py +++ /dev/null @@ -1,158 +0,0 @@ -import os -from motor.motor_asyncio import AsyncIOMotorClient -from pymongo.errors import DuplicateKeyError, NetworkTimeout -import logging - - -class MongoDBRepository: - def __init__(self): - # Настройки MongoDB из переменных окружения - mongo_uri = os.getenv("MONGO_URL") - database_name = os.getenv("DB_NAME") - server_collection = os.getenv("SERVER_COLLECTION", "servers") - plan_collection = os.getenv("PLAN_COLLECTION", "plans") - - # Подключение к базе данных и коллекциям - self.client = AsyncIOMotorClient(mongo_uri) - self.db = self.client[database_name] - self.collection = self.db[server_collection] # Коллекция серверов - self.plans_collection = self.db[plan_collection] # Коллекция планов - self.logger = logging.getLogger(__name__) - - async def add_subscription_plan(self, plan_data): - """Добавляет новый тарифный план в коллекцию.""" - try: - result = await self.plans_collection.insert_one(plan_data) - self.logger.debug(f"Тарифный план добавлен с ID: {result.inserted_id}") - return result.inserted_id - except DuplicateKeyError: - self.logger.error("Дублирующий ключ.") - except NetworkTimeout: - self.logger.error("Сетевой таймаут.") - - async def get_subscription_plan(self, plan_name): - """Получает тарифный план по его имени.""" - try: - plan = await self.plans_collection.find_one({"name": plan_name}) - if plan: - self.logger.debug(f"Найден тарифный план: {plan}") - else: - self.logger.error(f"Тарифный план {plan_name} не найден.") - return plan - except DuplicateKeyError: - self.logger.error("Дублирующий ключ.") - except NetworkTimeout: - self.logger.error("Сетевой таймаут.") - - async def add_server(self, server_data): - """Добавляет новый VPN сервер в коллекцию.""" - try: - result = await self.collection.insert_one(server_data) - self.logger.debug(f"VPN сервер добавлен с ID: {result.inserted_id}") - return result.inserted_id - except DuplicateKeyError: - self.logger.error("Дублирующий ключ.") - except NetworkTimeout: - self.logger.error("Сетевой таймаут.") - - async def get_server(self, server_name: str): - """Получает сервер VPN по его ID.""" - try: - server = await self.collection.find_one({"server.name": server_name}) - if server: - self.logger.debug(f"Найден VPN сервер: {server}") - else: - self.logger.debug(f"VPN сервер с ID {server_name} не найден.") - return server - except DuplicateKeyError: - self.logger.error("Дублирующий ключ.") - except NetworkTimeout: - self.logger.error("Сетевой таймаут.") - - async def get_server_with_least_clients(self): - """Возвращает сервер с наименьшим количеством подключенных клиентов.""" - try: - pipeline = [ - { - "$addFields": { - "current_clients": {"$size": {"$ifNull": ["$clients", []]}} - } - }, - { - "$sort": {"current_clients": 1} - }, - { - "$limit": 1 - } - ] - - result = await self.collection.aggregate(pipeline).to_list(length=1) - if result: - server = result[0] - self.logger.debug(f"Найден сервер с наименьшим количеством клиентов: {server}") - return server - else: - self.logger.debug("Не найдено серверов.") - return None - except DuplicateKeyError: - self.logger.error("Дублирующий ключ.") - except NetworkTimeout: - self.logger.error("Сетевой таймаут.") - - async def update_server(self, server_name, update_data): - """Обновляет данные VPN сервера.""" - try: - result = await self.collection.update_one({"server_name": server_name}, {"$set": update_data}) - if result.matched_count > 0: - self.logger.debug(f"VPN сервер с ID {server_name} обновлен.") - else: - self.logger.debug(f"VPN сервер с ID {server_name} не найден.") - return result.matched_count > 0 - except DuplicateKeyError: - self.logger.error("Дублирующий ключ.") - except NetworkTimeout: - self.logger.error("Сетевой таймаут.") - - async def delete_server(self, server_name): - """Удаляет VPN сервер по его ID.""" - try: - result = await self.collection.delete_one({"name": server_name}) - if result.deleted_count > 0: - self.logger.debug(f"VPN сервер с ID {server_name} удален.") - else: - self.logger.debug(f"VPN сервер с ID {server_name} не найден.") - return result.deleted_count > 0 - except DuplicateKeyError: - self.logger.error("Дублирующий ключ.") - except NetworkTimeout: - self.logger.error("Сетевой таймаут.") - - async def list_servers(self): - """Возвращает список всех VPN серверов.""" - try: - servers = await self.collection.find().to_list(length=1000) # Получить до 1000 серверов (можно настроить) - self.logger.debug(f"Найдено {len(servers)} VPN серверов.") - return servers - except DuplicateKeyError: - self.logger.error("Дублирующий ключ.") - except NetworkTimeout: - self.logger.error("Сетевой таймаут.") - - - async def __aenter__(self): - """ - Метод вызывается при входе в блок with. - """ - self.logger.debug("Контекстный менеджер: подключение открыто.") - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - """ - Метод вызывается при выходе из блока with. - """ - await self.close_connection() - if exc_type: - self.logger.error(f"Контекстный менеджер завершён с ошибкой: {exc_value}") - else: - self.logger.debug("Контекстный менеджер: подключение закрыто.") - diff --git a/app/services/payment_service.py b/app/services/payment_service.py deleted file mode 100644 index e69de29..0000000 diff --git a/app/services/postgres_rep.py b/app/services/postgres_rep.py index c561655..090211c 100644 --- a/app/services/postgres_rep.py +++ b/app/services/postgres_rep.py @@ -1,11 +1,12 @@ from datetime import datetime +from typing import Optional from uuid import UUID from sqlalchemy.future import select from sqlalchemy.exc import SQLAlchemyError from decimal import Decimal from sqlalchemy import asc, desc, update from sqlalchemy.orm import joinedload -from instance.model import TicketMessage, User, Subscription, Transaction,SupportTicket +from instance.model import Referral, User, Subscription, Transaction class PostgresRepository: @@ -13,13 +14,13 @@ class PostgresRepository: self.session_generator = session_generator self.logger = logger - async def create_user(self, telegram_id: str, username: str, referrer_id: str): + async def create_user(self, telegram_id: int, username: str, invited_by: Optional[int]= None): """ Создаёт нового пользователя в PostgreSQL. """ async for session in self.session_generator(): try: - new_user = User(telegram_id=telegram_id, username=username, referrer_id=referrer_id) + new_user = User(telegram_id=telegram_id, username=username, invited_by=invited_by) session.add(new_user) await session.commit() return new_user @@ -28,7 +29,7 @@ class PostgresRepository: await session.rollback() return None - async def get_active_subscription(self, telegram_id: str): + async def get_active_subscription(self, telegram_id: int): """ Проверяет наличие активной подписки у пользователя. """ @@ -46,7 +47,7 @@ class PostgresRepository: self.logger.error(f"Ошибка проверки активной подписки для пользователя {telegram_id}: {e}") return None - async def get_user_by_telegram_id(self, telegram_id: str): + async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ @@ -69,24 +70,24 @@ class PostgresRepository: :param amount: Сумма для добавления/вычитания. :return: True, если успешно, иначе False. """ - self.logger.info(f"Обновление баланса пользователя: id={user.id}, current_balance={user.balance}, amount={amount}") + self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}") async for session in self.session_generator(): try: - user = await session.get(User, user.id) # Загружаем пользователя в той же сессии + user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии if not user: - self.logger.warning(f"Пользователь с ID {user.id} не найден.") + self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.") return False # Приведение amount к Decimal user.balance += Decimal(amount) await session.commit() - self.logger.info(f"Баланс пользователя id={user.id} успешно обновлен: new_balance={user.balance}") + self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}") return True except SQLAlchemyError as e: - self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.id}: {e}") + self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}") await session.rollback() return False - async def get_last_transactions(self, user_id: UUID, limit: int = 10): + async def get_last_transactions(self, user_telegram_id: int, limit: int = 10): """ Возвращает последние транзакции пользователя. """ @@ -94,16 +95,16 @@ class PostgresRepository: try: result = await session.execute( select(Transaction) - .where(Transaction.user_id == user_id) + .where(Transaction.user_id == user_telegram_id) .order_by(desc(Transaction.created_at)) .limit(limit) ) return result.scalars().all() except SQLAlchemyError as e: - self.logger.error(f"Ошибка получения транзакций пользователя {user_id}: {e}") + self.logger.error(f"Ошибка получения транзакций пользователя {user_telegram_id}: {e}") return None - async def get_last_subscription_by_user_id(self, user_id: UUID, limit: int = 1): + async def get_last_subscription_by_user_id(self, user_telegram_id: int, limit: int = 1): """ Извлекает последнюю подписку пользователя на основании user_id. @@ -114,15 +115,16 @@ class PostgresRepository: try: result = await session.execute( select(Subscription) - .where(Subscription.user_id == user_id) + .where(Subscription.user_id == user_telegram_id) .order_by(desc(Subscription.created_at)) .limit(limit) ) subscriptions = list(result.scalars()) + result.scalars() self.logger.info(f"Найдены такие подписки: {subscriptions}") return subscriptions except SQLAlchemyError as e: - self.logger.error(f"Ошибка при получении подписки для пользователя {user_id}: {e}") + self.logger.error(f"Ошибка при получении подписки для пользователя {user_telegram_id}: {e}") return None async def add_record(self, record): @@ -142,117 +144,58 @@ class PostgresRepository: await session.rollback() return None - async def list_active_tickets(self, user_id: UUID): + async def add_referral(self, referrer_id: int, referral_id: int): + """ + Добавление реферальной связи между пользователями. + """ async for session in self.session_generator(): try: - tickets = await session.execute( - select(SupportTicket) + # Проверить, существует ли уже такая реферальная связь + existing_referral = await session.execute( + select(Referral) .where( - SupportTicket.user_id == user_id, - SupportTicket.status.in_([status.upper() for status in ["pending", "open"]]) - ) + (Referral.inviter_id == referrer_id) & + (Referral.invited_id == referral_id) + ) ) - result = list(tickets.scalars().all()) - self.logger.info(f"Получены активные тикеты: {result}") - return result - except SQLAlchemyError as e: - self.logger.error(f"Произошла ошибка при поиске активных тикетов: {e}") - return None + existing_referral = existing_referral.scalars().first() + + if existing_referral: + raise ValueError("Referral relationship already exists") - async def get_ticket(self, ticket_id): - async for session in self.session_generator(): - try: - ticket = await session.execute( - select(SupportTicket) - .where(SupportTicket.id == ticket_id) - ) - result = ticket.scalars().first() - self.logger.info(f"Получен тикет {ticket_id}.") - if result: - serialized_result = { - "id": result.id, - "user_id": result.user_id, - "subject": result.subject, - "message": result.message, - "status": result.status, - "created_at": result.created_at.isoformat(), - "updated_at": result.updated_at.isoformat(), - } - return serialized_result - except SQLAlchemyError as e: - self.logger.error(f"Произошла ошибка при поиске тикета {ticket_id}.") - return None - - async def get_ticket_messages(self, ticket_id: int): - async for session in self.session_generator(): - try: - # Выполняем запрос для получения сообщений, сортированных по дате - result = await session.execute( - select(TicketMessage) - .where(TicketMessage.ticket_id == ticket_id) - .order_by(asc(TicketMessage.created_at)) - ) - messages = result.scalars().all() - self.logger.info(f"Получены сообщения для тикета {ticket_id}, {messages}") - self.logger.info(messages) - return messages - except SQLAlchemyError as e: - self.logger.error(f"Ошибка при получении сообщений для тикета {ticket_id}: {e}") - return [] - - async def set_new_status(self,ticket_id: int, new_status: str): - async for session in self.session_generator(): - try: - # Выполняем обновление тикета - result = await session.execute( - update(SupportTicket) - .where(SupportTicket.id == ticket_id) - .values(status=new_status) - .execution_options(synchronize_session="fetch") - ) - if result.rowcount == 0: - raise ValueError(f"Тикет с ID {ticket_id} не найден.") - - await session.commit() - self.logger.info(f"Статус тикета {ticket_id} обновлён на '{new_status}'.") - return "OK" - except SQLAlchemyError as e: - self.logger.error(f"Ошибка обновления статуса тикета {ticket_id}: {e}") - await session.rollback() - return "ERROR" - - async def add_referal(self,referrer_id: str, referral_id:str): - """ - Добавление рефералу пользователей - """ - async for session in self.session_generator(): - try: + # Проверить, что пользователи существуют referrer = await session.execute( - select(User) - .where(User.id == referrer_id) - .options(joinedload(User.referrals)) # Загрузка связанных объектов + select(User).where(User.telegram_id == referrer_id) ) referrer = referrer.scalars().first() + if not referrer: raise ValueError("Referrer not found") - # Проверить, существует ли уже такой реферал - existing_referrals = [ref.id for ref in referrer.referrals] - if referrer_id in existing_referrals: - raise ValueError("Referral already exists") - - # Найти реферала - referral = await session.execute( - select(User).where(User.id == referral_id) + referral_user = await session.execute( + select(User).where(User.telegram_id == referral_id) ) - referral = referral.scalars().first() - - if not referral: + referral_user = referral_user.scalars().first() + + if not referral_user: raise ValueError("Referral user not found") - # Добавить реферала в список - referrer.referrals.append(referral) - await session.commit() - except Exception as e: - self.logger(f"Ошибка при добавлении рефералу пользователей") + # Проверить, что пользователь не приглашает сам себя + if referrer_id == referral_id: + raise ValueError("User cannot refer themselves") + # Создать новую реферальную связь + new_referral = Referral( + inviter_id=referrer_id, + invited_id=referral_id + ) + + session.add(new_referral) + await session.commit() + + self.logger.info(f"Реферальная связь создана: {referrer_id} -> {referral_id}") + + except Exception as e: + await session.rollback() + self.logger.error(f"Ошибка при добавлении реферальной связи: {str(e)}") + raise \ No newline at end of file diff --git a/app/services/xui_rep.py b/app/services/xui_rep.py deleted file mode 100644 index 26d5831..0000000 --- a/app/services/xui_rep.py +++ /dev/null @@ -1,232 +0,0 @@ -import aiohttp -import uuid -import json -import base64 -import ssl - -def generate_uuid(): - return str(uuid.uuid4()) - - -class PanelInteraction: - def __init__(self, base_url, login_data, logger, certificate=None, is_encoded=True): - self.base_url = base_url - self.login_data = login_data - self.logger = logger - self.ssl_context = self._create_ssl_context(certificate, is_encoded) - self.session_id = None - self.headers = None - - def _create_ssl_context(self, certificate, is_encoded): - if not certificate: - raise ValueError("Certificate is required.") - try: - ssl_context = ssl.create_default_context() - if is_encoded: - certificate = base64.b64decode(certificate).decode() - ssl_context.load_verify_locations(cadata=certificate) - return ssl_context - except Exception as e: - self.logger.error(f"Error creating SSL context: {e}") - raise ValueError("Invalid certificate format.") from e - - async def _ensure_logged_in(self): - if not self.session_id: - try: - self.session_id = await self.login() - if self.session_id: - self.headers = { - 'Accept': 'application/json', - 'Cookie': f'3x-ui={self.session_id}', - 'Content-Type': 'application/json' - } - else: - self.logger.error("Login failed: Unable to retrieve session ID.") - raise ValueError("Login failed: No session ID.") - except Exception as e: - self.logger.exception("Unexpected error during login.") - raise - - async def login(self): - login_url = f"{self.base_url}/login" - self.logger.info(f"Attempting to login at: {login_url}") - - async with aiohttp.ClientSession() as session: - try: - async with session.post( - login_url, data=self.login_data, ssl=self.ssl_context, timeout=10 - ) as response: - if response.status == 200: - session_id = response.cookies.get("3x-ui") - if session_id: - return session_id.value - else: - self.logger.error("Login failed: No session ID received.") - else: - error_details = await response.text() - self.logger.error(f"Login failed with status {response.status}: {error_details}") - except aiohttp.ClientError as e: - self.logger.exception(f"Login request failed: {e}") - raise - - async def get_inbound_info(self, inbound_id: int = 1): - """ - Fetch inbound information by ID. - - :param inbound_id: ID of the inbound. - :return: JSON response or None. - """ - await self._ensure_logged_in() - url = f"{self.base_url}/panel/api/inbounds/get/{inbound_id}" - async with aiohttp.ClientSession() as session: - try: - async with session.get( - url, headers=self.headers, ssl=self.ssl_context, timeout=10 - ) as response: - response_text = await response.text() # Получаем текст ответа - self.logger.info(f"Inbound Info (raw): {response_text}") # Логируем сырой текст - if response.status == 200: - return await response.json() - else: - self.logger.error(f"Failed to get inbound info: {response.status}") - return None - except aiohttp.ClientError as e: - self.logger.error(f"Get inbound info request failed: {e}") - return None - - async def get_client_traffic(self, email): - """ - Fetch traffic information for a specific client. - - :param email: Client's email. - :return: JSON response or None. - """ - await self._ensure_logged_in() - url = f"{self.base_url}/panel/api/inbounds/getClientTraffics/{email}" - async with aiohttp.ClientSession() as session: - try: - async with session.get( - url, headers=self.headers, ssl=self.ssl_context, timeout=10 - ) as response: - if response.status == 200: - return await response.json() - else: - self.logger.error(f"Failed to get client traffic: {response.status}") - return None - except aiohttp.ClientError as e: - self.logger.error(f"Get client traffic request failed: {e}") - return None - - async def update_client_expiry(self, client_uuid, new_expiry_time, client_email): - """ - Update the expiry date of a specific client. - - :param client_uuid: UUID of the client. - :param new_expiry_time: New expiry date in ISO format. - :param client_email: Client's email. - :return: None. - """ - await self._ensure_logged_in() - url = f"{self.base_url}/panel/api/inbounds/updateClient" - update_data = { - "id": 1, - "settings": { - "clients": [ - { - "id": client_uuid, - "alterId": 0, - "email": client_email, - "limitIp": 2, - "totalGB": 0, - "expiryTime": new_expiry_time, - "enable": True, - "tgId": "", - "subId": "" - } - ] - } - } - - async with aiohttp.ClientSession() as session: - try: - async with session.post( - url, headers=self.headers, json=update_data, ssl=self.ssl_context - ) as response: - if response.status == 200: - self.logger.info("Client expiry updated successfully.") - else: - self.logger.error(f"Failed to update client expiry: {response.status}") - except aiohttp.ClientError as e: - self.logger.error(f"Update client expiry request failed: {e}") - - async def add_client(self, inbound_id, expiry_date, email): - """ - Add a new client to an inbound. - - :param inbound_id: ID of the inbound. - :param expiry_date: Expiry date in ISO format. - :param email: Client's email. - :return: JSON response or None. - """ - await self._ensure_logged_in() - url = f"{self.base_url}/panel/api/inbounds/addClient" - client_info = { - "id": generate_uuid(), - "flow": "xtls-rprx-vision", - "email": email, - "limitIp": 2, - "totalGB": 0, - "expiryTime": expiry_date, - "enable": True, - "tgId": "", - "subId": "", - "reset": 0 - } - settings = json.dumps({"clients": [client_info]}) # Преобразуем объект в JSON-строку - - payload = { - "id": int(inbound_id), # Преобразуем inbound_id в число - "settings": settings # Передаем settings как JSON-строку - } - - async with aiohttp.ClientSession() as session: - try: - async with session.post( - url, headers=self.headers, json=payload, ssl=self.ssl_context - ) as response: - response_json = await response.json() - if response.status == 200 and response_json.get('success'): - self.logger.info(f"Клиент успешно добавлен: {response_json}") - return "OK" - else: - error_msg = response_json.get('msg', 'Причина не указана') - self.logger.error(f"Не удалось добавить клиента: {error_msg}") - return None - except aiohttp.ClientError as e: - self.logger.error(f"Add client request failed: {e}") - return None - - async def delete_depleted_clients(self, inbound_id=None): - """ - Удалить исчерпанных клиентов. - - :param inbound_id: ID входящего соединения (inbound), если None, удаляет для всех. - :return: Ответ сервера или None в случае ошибки. - """ - await self._ensure_logged_in() - url = f"{self.base_url}/panel/api/inbounds/delDepletedClients/{inbound_id or ''}".rstrip('/') - async with aiohttp.ClientSession() as session: - try: - async with session.post(url, headers=self.headers, ssl=self.ssl_context, timeout=10) as response: - if response.status == 200: - self.logger.info(f"Depleted clients deleted successfully for inbound_id: {inbound_id}") - return await response.json() - else: - error_details = await response.text() - self.logger.error(f"Failed to delete depleted clients: {response.status} - {error_details}") - return None - except aiohttp.ClientError as e: - self.logger.error(f"Delete depleted clients request failed: {e}") - return None - - diff --git a/instance/__init__.py b/instance/__init__.py new file mode 100644 index 0000000..63c481f --- /dev/null +++ b/instance/__init__.py @@ -0,0 +1,9 @@ +from .model import User,Transaction,Subscription,SubscriptionStatus,Referral,Plan,TransactionType +from .config import setup_logging +from .configdb import get_postgres_session,init_postgresql,close_connections + +__all__ = ['User','Transaction', + 'SubscriptionStatus','Subscription', + 'Referral','Plan','get_postgres_session', + 'setup_logging','init_postgresql', + 'close_connections','TransactionType'] \ No newline at end of file diff --git a/instance/config.py b/instance/config.py index 18c3dca..5309c1a 100644 --- a/instance/config.py +++ b/instance/config.py @@ -19,7 +19,7 @@ def setup_logging(): logging.basicConfig( level=logging.INFO, handlers=[console_handler], - force=True # Перезаписать существующие настройки + force=True ) # Установка уровня для конкретных логгеров diff --git a/instance/configdb.py b/instance/configdb.py index 0f171e9..1d0ae6a 100644 --- a/instance/configdb.py +++ b/instance/configdb.py @@ -1,25 +1,20 @@ import os from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker -from motor.motor_asyncio import AsyncIOMotorClient from app.services.db_manager import DatabaseManager from .model import Base - +try: # Настройки PostgreSQL из переменных окружения -POSTGRES_DSN = os.getenv("POSTGRES_URL") + POSTGRES_DSN = os.getenv("POSTGRES_URL") # Создание движка для PostgreSQL -postgres_engine = create_async_engine(POSTGRES_DSN, echo=False) + if POSTGRES_DSN is None: + raise Exception + postgres_engine = create_async_engine(POSTGRES_DSN, echo=False) +except Exception as e: + print("Ошибки при инициализации сессии постгреса") AsyncSessionLocal = sessionmaker(bind=postgres_engine, class_=AsyncSession, expire_on_commit=False) -# Настройки MongoDB из переменных окружения -MONGO_URI = os.getenv("MONGO_URL") -DATABASE_NAME = os.getenv("DB_NAME") - -# Создание клиента MongoDB -mongo_client = AsyncIOMotorClient(MONGO_URI) -mongo_db = mongo_client[DATABASE_NAME] - # Инициализация PostgreSQL async def init_postgresql(): """ @@ -32,18 +27,6 @@ async def init_postgresql(): except Exception as e: print(f"Failed to connect to PostgreSQL: {e}") -# Инициализация MongoDB -async def init_mongodb(): - """ - Проверка подключения к MongoDB. - """ - try: - # Проверяем подключение к MongoDB - await mongo_client.admin.command("ping") - print("MongoDB connected.") - except Exception as e: - print(f"Failed to connect to MongoDB: {e}") - # Получение сессии PostgreSQL async def get_postgres_session(): """ @@ -61,9 +44,6 @@ async def close_connections(): await postgres_engine.dispose() print("PostgreSQL connection closed.") - # Закрытие MongoDB - mongo_client.close() - print("MongoDB connection closed.") def get_database_manager() -> DatabaseManager: """ diff --git a/instance/model.py b/instance/model.py index a4b62d4..ecd940e 100644 --- a/instance/model.py +++ b/instance/model.py @@ -1,94 +1,100 @@ -from sqlalchemy import Column, String, Numeric, DateTime, Boolean, ForeignKey, Integer, Enum, Text +from sqlalchemy import Column, String, Numeric, DateTime, ForeignKey, Integer, Enum, Text, BigInteger from sqlalchemy.dialects.postgresql import UUID -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine -from sqlalchemy.orm import declarative_base, relationship, sessionmaker +from sqlalchemy.orm import declarative_base, relationship from datetime import datetime from enum import Enum as PyEnum import uuid Base = declarative_base() -def generate_uuid(): - return str(uuid.uuid4()) +class SubscriptionStatus(PyEnum): + ACTIVE = "active" + EXPIRED = "expired" + CANCELLED = "cancelled" - -class TicketStatus(PyEnum): - OPEN = "open" +class TransactionStatus(PyEnum): PENDING = "pending" - CLOSED = "closed" + SUCCESS = "success" + FAILED = "failed" +class TransactionType(PyEnum): + DEPOSIT = "deposit" + WITHDRAWAL = "withdrawal" + PAYMENT = "payment" -"""Пользователи""" +# Пользователи class User(Base): __tablename__ = 'users' - - id = Column(UUID(as_uuid=True), primary_key=True, default=generate_uuid) - telegram_id = Column(String, unique=True, nullable=False) # telegram_id как уникальный идентификатор - username = Column(String) + + telegram_id = Column(BigInteger, primary_key=True) + username = Column(String(255)) balance = Column(Numeric(10, 2), default=0.0) - referrer_id = Column(String, ForeignKey('users.telegram_id'), nullable=True) # Ссылка на telegram_id + ref_code = Column(String(32), unique=True) # Реферальный код пользователя + invited_by = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=True) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - referrals = relationship("User", backref="referrer", remote_side=[telegram_id]) # Ссылка на telegram_id + + # Relationships subscriptions = relationship("Subscription", back_populates="user") transactions = relationship("Transaction", back_populates="user") - admins = relationship("Administrators", back_populates="user") + sent_referrals = relationship("Referral", + foreign_keys="Referral.inviter_id", + back_populates="inviter") + received_referrals = relationship("Referral", + foreign_keys="Referral.invited_id", + back_populates="invited") +# Реферальные связи +class Referral(Base): + __tablename__ = 'referrals' + + id = Column(Integer, primary_key=True) + inviter_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=False) + invited_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=False) + created_at = Column(DateTime, default=datetime.utcnow) + + inviter = relationship("User", foreign_keys=[inviter_id], back_populates="sent_referrals") + invited = relationship("User", foreign_keys=[invited_id], back_populates="received_referrals") -"""Подписки""" +# Тарифные планы +class Plan(Base): + __tablename__ = 'plans' + + id = Column(Integer, primary_key=True) + name = Column(String(100), nullable=False) + price = Column(Numeric(10, 2), nullable=False) + duration_days = Column(Integer, nullable=False) + description = Column(Text) + + subscriptions = relationship("Subscription", back_populates="plan") + +# Подписки class Subscription(Base): __tablename__ = 'subscriptions' - - id = Column(String, primary_key=True, default=generate_uuid) - user_id = Column(UUID(as_uuid=True), ForeignKey('users.id')) - vpn_server_id = Column(String) - plan = Column(String) - expiry_date = Column(DateTime) + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + user_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=False) + plan_id = Column(Integer, ForeignKey('plans.id'), nullable=False) + vpn_server_id = Column(String) # ID сервера в Marzban + status = Column(Enum(SubscriptionStatus), default=SubscriptionStatus.ACTIVE) + start_date = Column(DateTime, nullable=False) + end_date = Column(DateTime, nullable=False) created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - + user = relationship("User", back_populates="subscriptions") + plan = relationship("Plan", back_populates="subscriptions") -"""Транзакции""" +# Транзакции class Transaction(Base): __tablename__ = 'transactions' - - id = Column(String, primary_key=True, default=generate_uuid) - user_id = Column(UUID(as_uuid=True), ForeignKey('users.id')) - amount = Column(Numeric(10, 2)) - transaction_type = Column(String) - created_at = Column(DateTime, default=datetime.utcnow) - - user = relationship("User", back_populates="transactions") - -"""Тикет""" -class SupportTicket(Base): - __tablename__ = "support_tickets" - - id = Column(Integer, primary_key=True, index=True) - user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) - subject = Column(String, nullable=False) - message = Column(String, nullable=False) - status = Column(Enum(TicketStatus), default=TicketStatus.OPEN, nullable=False) - created_at = Column(DateTime, default=datetime.utcnow, nullable=False) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) - -"""Сообщения из тикетов""" -class TicketMessage(Base): - __tablename__ = "ticket_messages" - - id = Column(Integer, primary_key=True, index=True) - ticket_id = Column(Integer, ForeignKey("support_tickets.id"), nullable=False) # ID тикета - sender = Column(String, nullable=False) # "user" или "support" - message = Column(Text, nullable=False) # Текст сообщения - created_at = Column(DateTime, default=datetime.utcnow, nullable=False) # - -"""Администраторы""" -class Administrators(Base): - __tablename__ = 'admins' - - id = Column(String, primary_key=True, default=generate_uuid) - user_id = Column(UUID(as_uuid=True), ForeignKey('users.id')) - user = relationship("User", back_populates="admins") + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + user_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=False) + amount = Column(Numeric(10, 2), nullable=False) + status = Column(Enum(TransactionStatus), default=TransactionStatus.PENDING) + type = Column(Enum(TransactionType), nullable=False) + payment_provider = Column(String(100)) + payment_id = Column(String, unique=True) # ID платежа в внешней системе + created_at = Column(DateTime, default=datetime.utcnow) + + user = relationship("User", back_populates="transactions") \ No newline at end of file diff --git a/main.py b/main.py index a279468..31305a2 100644 --- a/main.py +++ b/main.py @@ -1,71 +1,24 @@ import sys from fastapi import FastAPI -from instance.config import setup_logging +from instance import setup_logging import logging +setup_logging() +# logging.basicConfig( +# level=logging.INFO, +# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', +# handlers=[logging.StreamHandler(sys.stdout)], +# force=True +# ) -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[logging.StreamHandler(sys.stdout)], - force=True -) - - -from instance.configdb import init_postgresql, init_mongodb, close_connections -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.triggers.cron import CronTrigger -from app.routes import router, subscription_router, sup_router -from app.services.db_manager import DatabaseManager -from instance.configdb import get_postgres_session -from app.services.mongo_rep import MongoDBRepository -from app.services.xui_rep import PanelInteraction +from instance import init_postgresql, close_connections, get_postgres_session +from app.routes import router, subscription_router +from app.services import DatabaseManager logger = logging.getLogger(__name__) app = FastAPI() database_manager = DatabaseManager(session_generator=get_postgres_session) -mongo_repo = MongoDBRepository() -async def delete_depleted_clients_task(): - """ - Удаляет исчерпанных клиентов на всех серверах из MongoDB. - """ - try: - # Получаем список серверов из MongoDB - servers = await mongo_repo.list_servers() - if not servers: - logger.warning("Список серверов пуст. Задача пропущена.") - return - - # Проходим по каждому серверу и вызываем delete_depleted_clients - for server in servers: - base_url = server.get("base_url") - login_data = server.get("login_data") - certificate = server.get("certificate") - - if not base_url or not login_data: - logger.error(f"Пропуск сервера из-за отсутствия данных: {server}") - continue - - # Создаём экземпляр PanelInteraction - panel = PanelInteraction( - base_url=base_url, - login_data=login_data, - logger=logger, - certificate=certificate - ) - - # Выполняем удаление исчерпанных клиентов - response = await panel.delete_depleted_clients() - if response: - logger.info(f"Удаление клиентов завершено успешно для сервера: {base_url}") - else: - logger.warning(f"Не удалось удалить клиентов для сервера: {base_url}") - - except Exception as e: - logger.error(f"Ошибка при выполнении задачи delete_depleted_clients: {e}") - -scheduler = AsyncIOScheduler() @app.on_event("startup") async def startup(): @@ -73,15 +26,9 @@ async def startup(): Инициализация подключения к базам данных. """ try: - scheduler.add_job(delete_depleted_clients_task, CronTrigger(hour=23, minute=59)) - scheduler.start() logger.info("Инициализация PostgreSQL...") await init_postgresql() logger.info("PostgreSQL успешно инициализирован.") - - logger.info("Инициализация MongoDB...") - await init_mongodb() - logger.info("MongoDB успешно инициализирован.") except Exception as e: logger.error(f"Ошибка при инициализации баз данных: {e}") raise RuntimeError("Не удалось инициализировать базы данных") @@ -101,7 +48,7 @@ async def shutdown(): app.include_router(router, prefix="/api") #app.include_router(payment_router, prefix="/api") app.include_router(subscription_router, prefix="/api") -app.include_router(sup_router, prefix="/api") + @app.get("/") def read_root(): return {"message": "FastAPI приложение работает!"} diff --git a/requirements.txt b/requirements.txt index 27f1406..d86b84e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,8 @@ APScheduler==3.11.0 asyncpg==0.30.0 attrs==24.3.0 blinker==1.9.0 +certifi==2025.11.12 +charset-normalizer==3.4.4 click==8.1.7 dnspython==2.7.0 fastapi==0.115.6 @@ -24,12 +26,14 @@ pydantic==2.10.4 pydantic_core==2.27.2 pymongo==4.9.2 python-dateutil==2.9.0.post0 +requests==2.32.5 six==1.17.0 sniffio==1.3.1 SQLAlchemy==2.0.36 starlette==0.41.3 typing_extensions==4.12.2 tzlocal==5.2 +urllib3==2.5.0 uvicorn==0.34.0 Werkzeug==3.1.3 yarl==1.18.3