diff --git a/app/routes/payment_routes.py b/app/routes/payment_routes.py index e69de29..2eebab2 100644 --- a/app/routes/payment_routes.py +++ b/app/routes/payment_routes.py @@ -0,0 +1,65 @@ +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from yookassa import Payment +from yookassa.domain.notification import WebhookNotification +from app.services.db_manager import DatabaseManager + + +router = APIRouter() + +class CreatePaymentRequest(BaseModel): + telegram_id: int + amount: float + +class PaymentResponse(BaseModel): + payment_url: str + payment_id: str + +@router.post("/payment/create", response_model=PaymentResponse) +async def create_payment(request: CreatePaymentRequest): + """ + Создаёт платёж через ЮKassa и возвращает ссылку для оплаты. + """ + try: + # Создание платежа через API ЮKassa + payment = Payment.create({ + "amount": { + "value": f"{request.amount:.2f}", + "currency": "RUB" + }, + "confirmation": { + "type": "redirect", # Тип подтверждения (redirect или embedded) + "return_url": "https://your-app.com/success" # URL возврата + }, + "description": f"Пополнение баланса для пользователя {request.telegram_id}" + }) + + # Возвращаем ссылку для оплаты и ID платежа + return PaymentResponse( + payment_url=payment.confirmation.confirmation_url, + payment_id=payment.id + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Ошибка создания платежа: {str(e)}") + + +@router.post("/payment/notification") +async def payment_notification(notification: dict, database_manager: DatabaseManager): + """ + Обрабатывает уведомления от ЮKassa. + """ + try: + # Парсим уведомление + webhook = WebhookNotification(notification) + payment_object = webhook.object + + # Проверяем статус платежа + if payment_object["status"] == "succeeded": + # Обновляем баланс пользователя + telegram_id = int(payment_object["description"].split()[-1]) + amount = float(payment_object["amount"]["value"]) + await database_manager.update_balance(telegram_id, amount) + + return {"status": "ok"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Ошибка обработки уведомления: {str(e)}") diff --git a/app/routes/subscription_routes.py b/app/routes/subscription_routes.py index 13eea2f..127b033 100644 --- a/app/routes/subscription_routes.py +++ b/app/routes/subscription_routes.py @@ -1,3 +1,4 @@ +from typing import List from fastapi import APIRouter, HTTPException, Depends from pydantic import BaseModel from app.services.db_manager import DatabaseManager @@ -53,12 +54,18 @@ async def buy_subscription( # Эндпоинт для получения последней подписки @router.get("/subscription/{user_id}/last", response_model=SubscriptionResponse) async def last_subscription(user_id: UUID, database_manager: DatabaseManager = Depends(get_database_manager)): + """ + Возвращает последнюю подписку пользователя. + """ logger.info(f"Получение последней подписки для пользователя: {user_id}") try: - sub = await database_manager.last_subscription(user_id) - if sub is None: + subscriptions = await database_manager.last_subscriptions(user_id=str(user_id), limit=1) + + if not subscriptions: logger.warning(f"Подписки для пользователя {user_id} не найдены") raise HTTPException(status_code=404, detail="No subscriptions found") + sub = subscriptions[0] + return { "id": sub.id, "plan": sub.plan, @@ -74,4 +81,36 @@ async def last_subscription(user_id: UUID, database_manager: DatabaseManager = D logger.error(f"Неожиданная ошибка: {e}") raise HTTPException(status_code=500, detail=str(e)) +@router.get("/subscriptions/{user_id}", response_model=List[SubscriptionResponse]) +async def get_subscriptions(user_id: UUID, database_manager: DatabaseManager = Depends(get_database_manager)): + """ + Возвращает список подписок пользователя. + """ + logger.info(f"Получение подписок для пользователя: {user_id}") + try: + # Получаем подписки без ограничений или с указанным лимитом + subscriptions = await database_manager.last_subscriptions(user_id=str(user_id)) + + if not subscriptions: + logger.warning(f"Подписки для пользователя {user_id} не найдены") + raise HTTPException(status_code=404, detail="No subscriptions found") + + # Формируем список подписок для ответа + return [ + { + "id": sub.id, + "plan": sub.plan, + "vpn_server_id": sub.vpn_server_id, + "expiry_date": sub.expiry_date.isoformat(), + "created_at": sub.created_at.isoformat(), + "updated_at": sub.updated_at.isoformat(), + } + for sub in subscriptions + ] + except SQLAlchemyError as e: + logger.error(f"Ошибка базы данных при получении подписок для пользователя {user_id}: {e}") + raise HTTPException(status_code=500, detail="Database error") + except Exception as e: + logger.error(f"Неожиданная ошибка: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/app/routes/support_routes.py b/app/routes/support_routes.py new file mode 100644 index 0000000..5404467 --- /dev/null +++ b/app/routes/support_routes.py @@ -0,0 +1,187 @@ +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.exc import SQLAlchemyError +from app.services.db_manager import DatabaseManager +from instance.configdb import get_database_manager +from uuid import UUID +from pydantic import BaseModel +from enum import Enum +from typing import List, Literal, Optional +from datetime import datetime +import logging +logger = logging.getLogger(__name__) +class TicketStatus(str, Enum): + OPEN = "open" + PENDING = "pending" + CLOSED = "closed" + +class CreateTicketRequest(BaseModel): + subject: str + message: str + +class TicketResponse(BaseModel): + id: int + user_id: UUID + subject: str + message: str + status: TicketStatus + created_at: datetime + updated_at: datetime + +class TicketMessageRequest(BaseModel): + message: str + +class TicketMessageResponse(BaseModel): + id: int + ticket_id: int + sender: str + message: str + created_at: datetime + +router = APIRouter() + +def handle_exception(e: Exception, message: str): + logger.error(f"{message}: {e}") + raise HTTPException(status_code=500, detail=f"{message}: {str(e)}") + + + +@router.post("/support/tickets/{ticket_id}/messages", response_model=TicketMessageResponse, summary="Добавить сообщение") +async def add_message( + ticket_id: int, + request: TicketMessageRequest, + sender: Literal["user", "support"], # "user" или "support" + database_manager: DatabaseManager = Depends(get_database_manager) +): + """ + Добавляет сообщение в тикет. + + Args: + ticket_id (int): ID тикета. + request (TicketMessageRequest): Данные сообщения. + sender (str): Отправитель ("user" или "support"). + database_manager (DatabaseManager): Управление базой данных. + + Returns: + TicketMessageResponse: Данные созданного сообщения. + """ + try: + message = await database_manager.add_ticket_message(ticket_id=ticket_id, sender=sender, message=request.message) + if not message: + raise HTTPException(status_code=404, detail="Тикет не найден или ошибка добавления сообщения") + return message + except Exception as e: + handle_exception(e,"Ошибка добавления сообщения") + +@router.get("/support/tickets/{ticket_id}/messages", response_model=List[TicketMessageResponse], summary="Получить сообщения") +async def get_messages( + ticket_id: int, + database_manager: DatabaseManager = Depends(get_database_manager) +): + """ + Возвращает список сообщений в тикете. + + Args: + ticket_id (int): ID тикета, для которого нужно получить сообщения. + database_manager (DatabaseManager): Менеджер базы данных. + + Returns: + List[TicketMessageResponse]: Список сообщений, связанных с тикетом. + + Raises: + HTTPException: 404, если сообщения для тикета не найдены. + HTTPException: 500, если произошла ошибка на сервере. + """ + try: + messages = await database_manager.get_ticket_messages(ticket_id=ticket_id) + if not messages: + raise HTTPException(status_code=404, detail="Сообщения для тикета не найдены") + return messages + except Exception as e: + handle_exception(e,"Ошибка получения сообщения") + + +@router.post("/support/tickets", response_model=TicketResponse, summary="Создать тикет") +async def create_ticket( + request: CreateTicketRequest, + user_id: UUID, + database_manager: DatabaseManager = Depends(get_database_manager) +): + """ + Создаёт новый тикет для пользователя. + + Args: + request (CreateTicketRequest): Данные для создания тикета (тема и сообщение). + user_id (UUID): Идентификатор пользователя, создающего тикет. + database_manager (DatabaseManager): Менеджер базы данных. + + Returns: + TicketResponse: Данные созданного тикета. + + Raises: + HTTPException: 500, если произошла ошибка при создании тикета. + """ + try: + ticket = await database_manager.create_ticket( + user_id=user_id, + subject=request.subject, + message=request.message + ) + return ticket + except Exception as e: + handle_exception(e,"Ошибка содания тикета") + +@router.get("/support/tickets", response_model=List[TicketResponse], summary="Получить список тикетов") +async def list_tickets( + user_id: UUID, + database_manager: DatabaseManager = Depends(get_database_manager) +): + """ + Возвращает список тикетов пользователя. + + Args: + user_id (UUID): Идентификатор пользователя, чьи тикеты нужно получить. + database_manager (DatabaseManager): Менеджер базы данных. + + Returns: + List[TicketResponse]: Список тикетов пользователя. + + Raises: + HTTPException: 404, если тикеты не найдены. + HTTPException: 500, если произошла ошибка на сервере. + """ + try: + tickets = await database_manager.list_tickets(user_id=user_id) + if not tickets: + raise HTTPException(status_code=404, detail="Тикеты не найдены") + return tickets + except Exception as e: + handle_exception(e,"Ошибка получения тикетов") + +@router.patch("/support/tickets/{ticket_id}", response_model=TicketResponse, summary="Обновить статус тикета") +async def update_ticket_status( + ticket_id: int, + status: TicketStatus, + database_manager: DatabaseManager = Depends(get_database_manager) +): + """ + Обновляет статус тикета. + + Args: + ticket_id (int): ID тикета, статус которого нужно обновить. + status (TicketStatus): Новый статус тикета (open, pending, closed). + database_manager (DatabaseManager): Менеджер базы данных. + + Returns: + TicketResponse: Обновлённые данные тикета. + + Raises: + HTTPException: 404, если тикет не найден. + HTTPException: 500, если произошла ошибка при обновлении тикета. + """ + try: + ticket = await database_manager.update_ticket_status(ticket_id=ticket_id, status=status) + if not ticket: + raise HTTPException(status_code=404, detail="Тикет не найден") + return ticket + except Exception as e: + handle_exception(e,"Ошибка обновления тикета") diff --git a/app/services/db_manager.py b/app/services/db_manager.py index 5281b06..0dd0dfe 100644 --- a/app/services/db_manager.py +++ b/app/services/db_manager.py @@ -1,4 +1,4 @@ -from instance.model import User, Subscription, Transaction, Administrators +from instance.model import User, Subscription, Transaction, Administrators, SupportTicket,TicketMessage,TicketStatus from sqlalchemy.future import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import desc @@ -85,9 +85,11 @@ class DatabaseManager: await session.rollback() return "ERROR" - async def last_subscription(self, user_id: str): + async def last_subscriptions(self, user_id: str, limit: int = 10): """ - Возвращает последнюю подписку пользователя. + Возвращает список последних подписок пользователя, ограниченный заданным количеством. + :param user_id: ID пользователя + :param limit: Максимальное количество подписок для возврата """ async for session in self.session_generator(): try: @@ -95,18 +97,20 @@ class DatabaseManager: select(Subscription) .where(Subscription.user_id == str(user_id)) .order_by(desc(Subscription.created_at)) - .limit(1) # Применяем limit правильно + .limit(limit) # Ограничиваем количество результатов ) - subscription = result.scalar_one_or_none() - if subscription: - return subscription + subscriptions = result.scalars().all() # Получаем все результаты до лимита + if subscriptions: + return subscriptions else: - return None + self.logger.info(f"Для пользователя {user_id} подписки не найдены.") + return [] # Возвращаем пустой список, если подписок нет except SQLAlchemyError as e: - self.logger.error(f"Ошибка при получении подписки для пользователя {user_id}: {e}") + self.logger.error(f"Ошибка при получении подписок для пользователя {user_id}: {e}") return "ERROR" + async def last_transaction(self, user_id: UUID): """ Возвращает список транзакций пользователя. @@ -228,6 +232,97 @@ class DatabaseManager: except Exception as e: self.logger.error(f"Ошибка проверки активной подписки для пользователя {telegram_id}: {e}") return None + + async def add_ticket_message(self, ticket_id: int, sender: str, message: str): + """ + Добавляет сообщение к тикету. + """ + async for session in self.session_generator(): + try: + self.logger.info(f"Попытка добавления сообщения в тикет {ticket_id} от {sender}") + ticket_message = TicketMessage(ticket_id=ticket_id, sender=sender, message=message) + session.add(ticket_message) + await session.commit() + self.logger.info(f"Сообщение добавлено к тикету {ticket_id}: {message}") + return ticket_message + except SQLAlchemyError as e: + self.logger.error(f"Ошибка при добавлении сообщения в тикет {ticket_id}: {e}") + await session.rollback() + return None + + async def get_ticket_messages(self, ticket_id: int): + """ + Возвращает список сообщений для указанного тикета. + """ + async for session in self.session_generator(): + try: + self.logger.info(f"Получение сообщений для тикета {ticket_id}") + result = await session.execute( + select(TicketMessage).where(TicketMessage.ticket_id == ticket_id).order_by(TicketMessage.created_at) + ) + messages = result.scalars().all() + self.logger.info(f"Найдено {len(messages)} сообщений для тикета {ticket_id}") + return messages + except SQLAlchemyError as e: + self.logger.error(f"Ошибка при получении сообщений для тикета {ticket_id}: {e}") + return None + + async def create_ticket(self, user_id: int, subject: str, message: str): + """ + Создаёт новый тикет. + """ + async for session in self.session_generator(): + try: + self.logger.info(f"Создание тикета для пользователя {user_id}: {subject}") + ticket = SupportTicket(user_id=user_id, subject=subject, message=message) + session.add(ticket) + await session.commit() + self.logger.info(f"Тикет создан с ID {ticket.id} для пользователя {user_id}") + return ticket + except SQLAlchemyError as e: + self.logger.error(f"Ошибка при создании тикета: {e}") + await session.rollback() + return None + + async def list_tickets(self, user_id: int): + """ + Возвращает список тикетов пользователя. + """ + async for session in self.session_generator(): + try: + self.logger.info(f"Получение списка тикетов для пользователя {user_id}") + result = await session.execute( + select(SupportTicket).where(SupportTicket.user_id == user_id) + ) + tickets = result.scalars().all() + self.logger.info(f"Найдено {len(tickets)} тикетов для пользователя {user_id}") + return tickets + except SQLAlchemyError as e: + self.logger.error(f"Ошибка при получении тикетов для пользователя {user_id}: {e}") + return None + + async def update_ticket_status(self, ticket_id: int, status: TicketStatus): + """ + Обновляет статус тикета. + """ + async for session in self.session_generator(): + try: + self.logger.info(f"Попытка обновления статуса тикета {ticket_id} на {status}") + result = await session.execute( + select(SupportTicket).where(SupportTicket.id == ticket_id) + ) + ticket = result.scalars().first() + if ticket: + ticket.status = status + await session.commit() + self.logger.info(f"Статус тикета {ticket_id} обновлён на {status}") + return ticket + self.logger.warning(f"Тикет с ID {ticket_id} не найден для обновления статуса") + return None + except SQLAlchemyError as e: + self.logger.error(f"Ошибка при обновлении статуса тикета {ticket_id}: {e}") + await session.rollback() + return None diff --git a/app/services/xui_rep.py b/app/services/xui_rep.py index fb3a263..48a1f89 100644 --- a/app/services/xui_rep.py +++ b/app/services/xui_rep.py @@ -204,4 +204,27 @@ class PanelInteraction: self.logger.error(f"Add client request failed: {e}") return None + async def delete_depleted_clients(self, inbound_id=None): + """ + Удалить исчерпанных клиентов. + + :param inbound_id: ID входящего соединения (inbound), если None, удаляет для всех. + :return: Ответ сервера или None в случае ошибки. + """ + await self._ensure_logged_in() + url = f"{self.base_url}/panel/api/inbounds/delDepletedClients/{inbound_id or ''}".rstrip('/') + async with aiohttp.ClientSession() as session: + try: + async with session.post(url, headers=self.headers, ssl=self.ssl_context, timeout=10) as response: + if response.status == 200: + self.logger.info(f"Depleted clients deleted successfully for inbound_id: {inbound_id}") + return await response.json() + else: + error_details = await response.text() + self.logger.error(f"Failed to delete depleted clients: {response.status} - {error_details}") + return None + except aiohttp.ClientError as e: + self.logger.error(f"Delete depleted clients request failed: {e}") + return None + diff --git a/instance/model.py b/instance/model.py index cea7a32..6c77500 100644 --- a/instance/model.py +++ b/instance/model.py @@ -1,7 +1,9 @@ -from sqlalchemy import Column, String, Numeric, DateTime, Boolean, ForeignKey, Integer +from sqlalchemy import Column, String, Numeric, DateTime, Boolean, ForeignKey, Integer, Enum, Text +from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import declarative_base, relationship, sessionmaker from datetime import datetime +from enum import Enum as PyEnum import uuid Base = declarative_base() @@ -9,11 +11,18 @@ Base = declarative_base() def generate_uuid(): return str(uuid.uuid4()) + +class TicketStatus(PyEnum): + OPEN = "open" + PENDING = "pending" + CLOSED = "closed" + + """Пользователи""" class User(Base): __tablename__ = 'users' - id = Column(String, primary_key=True, default=generate_uuid) + id = Column(UUID(as_uuid=True), primary_key=True, default=generate_uuid) telegram_id = Column(Integer, unique=True, nullable=False) username = Column(String) balance = Column(Numeric(10, 2), default=0.0) @@ -29,7 +38,7 @@ class Subscription(Base): __tablename__ = 'subscriptions' id = Column(String, primary_key=True, default=generate_uuid) - user_id = Column(String, ForeignKey('users.id')) + user_id = Column(UUID(as_uuid=True), ForeignKey('users.id')) vpn_server_id = Column(String) plan = Column(String) expiry_date = Column(DateTime) @@ -43,18 +52,40 @@ class Transaction(Base): __tablename__ = 'transactions' id = Column(String, primary_key=True, default=generate_uuid) - user_id = Column(String, ForeignKey('users.id')) + user_id = Column(UUID(as_uuid=True), ForeignKey('users.id')) amount = Column(Numeric(10, 2)) transaction_type = Column(String) created_at = Column(DateTime, default=datetime.utcnow) user = relationship("User", back_populates="transactions") +"""Тикет""" +class SupportTicket(Base): + __tablename__ = "support_tickets" + + id = Column(Integer, primary_key=True, index=True) + user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) + subject = Column(String, nullable=False) + message = Column(String, nullable=False) + status = Column(Enum(TicketStatus), default=TicketStatus.OPEN, nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + +"""Сообщения из тикетов""" +class TicketMessage(Base): + __tablename__ = "ticket_messages" + + id = Column(Integer, primary_key=True, index=True) + ticket_id = Column(Integer, ForeignKey("support_tickets.id"), nullable=False) # ID тикета + sender = Column(String, nullable=False) # "user" или "support" + message = Column(Text, nullable=False) # Текст сообщения + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) # + """Администраторы""" class Administrators(Base): __tablename__ = 'admins' id = Column(String, primary_key=True, default=generate_uuid) - user_id = Column(String, ForeignKey('users.id')) + user_id = Column(UUID(as_uuid=True), ForeignKey('users.id')) user = relationship("User", back_populates="admins") diff --git a/main.py b/main.py index ad804a5..e3e4926 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,12 @@ from fastapi import FastAPI from instance.configdb import init_postgresql, init_mongodb, close_connections -from app.routes import user_router, subscription_router +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from app.routes import user_router, subscription_router, sup_router from app.services.db_manager import DatabaseManager from instance.configdb import get_postgres_session +from app.services.mongo_rep import MongoDBRepository +from app.services.xui_rep import PanelInteraction import logging logging.basicConfig(level=logging.INFO) @@ -11,6 +15,47 @@ logger = logging.getLogger(__name__) app = FastAPI() database_manager = DatabaseManager(session_generator=get_postgres_session) +mongo_repo = MongoDBRepository() +async def delete_depleted_clients_task(): + """ + Удаляет исчерпанных клиентов на всех серверах из MongoDB. + """ + try: + # Получаем список серверов из MongoDB + servers = await mongo_repo.list_servers() + if not servers: + logger.warning("Список серверов пуст. Задача пропущена.") + return + + # Проходим по каждому серверу и вызываем delete_depleted_clients + for server in servers: + base_url = server.get("base_url") + login_data = server.get("login_data") + certificate = server.get("certificate") + + if not base_url or not login_data: + logger.error(f"Пропуск сервера из-за отсутствия данных: {server}") + continue + + # Создаём экземпляр PanelInteraction + panel = PanelInteraction( + base_url=base_url, + login_data=login_data, + logger=logger, + certificate=certificate + ) + + # Выполняем удаление исчерпанных клиентов + response = await panel.delete_depleted_clients() + if response: + logger.info(f"Удаление клиентов завершено успешно для сервера: {base_url}") + else: + logger.warning(f"Не удалось удалить клиентов для сервера: {base_url}") + + except Exception as e: + logger.error(f"Ошибка при выполнении задачи delete_depleted_clients: {e}") + +scheduler = AsyncIOScheduler() @app.on_event("startup") async def startup(): @@ -18,6 +63,8 @@ async def startup(): Инициализация подключения к базам данных. """ try: + scheduler.add_job(delete_depleted_clients_task, CronTrigger(hour=23, minute=59)) + scheduler.start() logger.info("Инициализация PostgreSQL...") await init_postgresql() logger.info("PostgreSQL успешно инициализирован.") @@ -44,7 +91,7 @@ async def shutdown(): app.include_router(user_router, prefix="/api") #app.include_router(payment_router, prefix="/api") app.include_router(subscription_router, prefix="/api") - +app.include_router(sup_router, prefix="/api") @app.get("/") def read_root(): return {"message": "FastAPI приложение работает!"} diff --git a/requirements.txt b/requirements.txt index 6bc80d9..27f1406 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ aiohttp==3.11.11 aiosignal==1.3.2 annotated-types==0.7.0 anyio==4.7.0 +APScheduler==3.11.0 asyncpg==0.30.0 attrs==24.3.0 blinker==1.9.0 @@ -28,6 +29,7 @@ sniffio==1.3.1 SQLAlchemy==2.0.36 starlette==0.41.3 typing_extensions==4.12.2 +tzlocal==5.2 uvicorn==0.34.0 Werkzeug==3.1.3 yarl==1.18.3