Compare commits

22 Commits

Author SHA1 Message Date
574d2094e5 Некоторые поправки что бы всё работало 2025-12-05 15:47:17 +03:00
f16cb3bc2e добавил подсчёт количества рефералов 2025-12-05 13:58:45 +03:00
root
eb9e00b27c Убрал мусор, добавил скрипт для тарифов 2025-12-01 16:03:50 +03:00
root
cc95ae1a6b Интеграция биллинг при покупке подписки 2025-11-29 22:19:13 +03:00
root
4d3f8b2ad3 Удалил пополнение баланса, так как его тут не должно быть, это ответственность billing 2025-11-29 22:18:24 +03:00
root
8348a9b44b Мелкие изменения в модели и дебаг для ошибки в получении транзакций 2025-11-29 20:00:12 +03:00
root
a8a31940d5 Интегрировал биллинг при оплате подписки 2025-11-29 15:12:53 +03:00
root
9ffa5ba0f7 Ошибка в ендпоинте получения последней подписки, исправил 2025-11-28 18:40:42 +03:00
root
61fadc5c0d Добавление сервиса для биллинг 2025-11-28 18:35:55 +03:00
root
6a701da4f7 Я случайно букву поставил в репозитории и убрал self из db_manager 2025-11-27 19:52:48 +03:00
root
a001694608 Встроил марзбан в бекенд, исправил бывшие проблемы с получением активной подписки 2025-11-26 18:04:22 +03:00
root
e975bf4774 Переделал модель БД под новую, переделал Репозиторий, переделал сервисы, убрал монгодб, изменил необходимые пакеты, марзбан я добавил, но не настроил. Весь старый бот вроде работает(только в рефералке не уверен) 2025-11-24 23:43:40 +03:00
root
f0f3b96005 Рабочая версия 2025-11-23 17:45:29 +03:00
Disledg
54f04cc355 Вроде пофиксил всё 2025-01-18 17:33:44 +03:00
Disledg
0392eee6e1 Модель проблемы вызывала 2025-01-18 16:43:27 +03:00
Disledg
6d6b8832cf Изменение эндпоинтов и БД для работы с длинными telegram_id(сменил тип данных с инт на стринг) 2025-01-18 15:48:02 +03:00
Disledg
1aabe8f88e Переделаны часть обработчиков и роутов для работы с тикетами. 2025-01-12 06:30:53 +03:00
Disledg
7c36a0f157 Последующие фиксы под изменения в боте 2025-01-07 11:56:02 +03:00
Disledg
54bfedd6f2 Рефакторинга чуть чуть вроде 2025-01-07 08:30:17 +03:00
Disledg
63f7251860 нач сап системы 2025-01-04 21:17:39 +03:00
Disledg
9407806cc2 начало сап системы и помойму всё 2025-01-04 21:17:24 +03:00
Disledg
3544562b96 Сделаны подписки и переделаны роуты 2024-12-28 21:31:07 +03:00
25 changed files with 1642 additions and 772 deletions

0
.env
View File

View File

@@ -1,14 +1,22 @@
# Используем базовый Python-образ FROM python:3.10-slim
FROM python:3.12-slim
# Устанавливаем рабочую директорию # Установка зависимостей системы
RUN apt-get update && apt-get install -y \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Рабочая директория
WORKDIR /app WORKDIR /app
# Копируем файлы проекта # Копируем requirements.txt и устанавливаем зависимости
COPY . . COPY requirements.txt ./
# Устанавливаем зависимости
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Указываем команду запуска бота # Копируем весь код приложения в контейнер
CMD ["python", "main.py"] COPY . .
# Открываем порт для приложения
EXPOSE 8000
# Команда для запуска приложения
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -1,6 +1,5 @@
from .payment_routes import router as payment_router #from .payment_routes import router as payment_router
from .user_routes import router as user_router from .user_routes import router
from .subscription_routes import router as subscription_router from .subscription_routes import router as subscription_router
# Экспорт всех маршрутов # Экспорт всех маршрутов
__all__ = ["payment_router", "user_router", "subscription_router"] __all__ = [ "router", "subscription_router"]

View File

@@ -1,26 +1,31 @@
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Depends from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel from pydantic import BaseModel
from app.services.db_manager import DatabaseManager from app.services import DatabaseManager
from instance.configdb import get_database_manager
from uuid import UUID
import logging
from sqlalchemy.exc import SQLAlchemyError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
router = APIRouter() router = APIRouter()
# DatabaseManager должен передаваться через Depends
def get_database_manager():
# Здесь должна быть логика инициализации DatabaseManager
return DatabaseManager()
# Схемы запросов и ответов
class BuySubscriptionRequest(BaseModel): class BuySubscriptionRequest(BaseModel):
telegram_id: int telegram_id: int
plan_id: str plan_name: str
class SubscriptionResponse(BaseModel): class SubscriptionResponse(BaseModel):
id: str id: str
plan: str user_id: int
vpn_server_id: str plan_name: str
expiry_date: str vpn_server_id: Optional[str]
status: str
start_date: str
end_date: str
created_at: str created_at: str
updated_at: str
# Эндпоинт для покупки подписки # Эндпоинт для покупки подписки
@router.post("/subscription/buy", response_model=dict) @router.post("/subscription/buy", response_model=dict)
@@ -32,41 +37,164 @@ async def buy_subscription(
Покупка подписки. Покупка подписки.
""" """
try: try:
result = await database_manager.buy_sub(request_data.telegram_id, request_data.plan_id) logger.info(f"Получен запрос на покупку подписки: {request_data.dict()}")
result = await database_manager.buy_sub(request_data.telegram_id, request_data.plan_name)
logger.info(f"Результат buy_sub: {result}")
if result == "ERROR": if result == "ERROR":
raise HTTPException(status_code=500, detail="Failed to buy subscription") raise HTTPException(status_code=500, detail="Internal server error")
elif result == "INSUFFICIENT_FUNDS": elif result == "INSUFFICIENT_FUNDS":
raise HTTPException(status_code=400, detail="Insufficient funds") raise HTTPException(status_code=400, detail="INSUFFICIENT_FUNDS")
elif result == "TARIFF_NOT_FOUND":
raise HTTPException(status_code=400, detail="TARIFF_NOT_FOUND")
elif result == "ACTIVE_SUBSCRIPTION_EXISTS":
raise HTTPException(status_code=400, detail="ACTIVE_SUBSCRIPTION_EXISTS")
elif result == "USER_NOT_FOUND":
raise HTTPException(status_code=404, detail="USER_NOT_FOUND")
elif result == "SUBSCRIPTION_CREATION_FAILED":
raise HTTPException(status_code=500, detail="Failed to create subscription")
elif result == "PAYMENT_FAILED_AFTER_SUBSCRIPTION":
raise HTTPException(status_code=402, detail="SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED")
elif result == "SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED":
raise HTTPException(status_code=402, detail="SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED")
return {"message": "Subscription purchased successfully"} # Если успешно, генерируем URI
if isinstance(result, dict) and result.get('status') == 'OK':
uri_result = await database_manager.generate_uri(request_data.telegram_id)
logger.info(f"Результат генерации URI: {uri_result}")
return {
"status": "success",
"subscription_id": result.get('subscription_id'),
"uri": uri_result[0] if uri_result and isinstance(uri_result, list) else uri_result
}
else:
return {"status": "success", "message": "Subscription created"}
except HTTPException as http_exc:
logger.error(f"HTTPException в buy_subscription: {http_exc.detail}")
raise http_exc
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) logger.error(f"Неожиданная ошибка в buy_subscription: {str(e)}")
raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
# Эндпоинт для получения последней подписки # Эндпоинт для получения последней подписки
@router.get("/subscription/{user_id}/last", response_model=list[SubscriptionResponse]) @router.get("/subscription/{telegram_id}/last", response_model=SubscriptionResponse)
async def last_subscription( async def last_subscription(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)):
user_id: int,
database_manager: DatabaseManager = Depends(get_database_manager)
):
""" """
Получение последней подписки пользователя. Возвращает последнюю подписку пользователя.
""" """
logger.info(f"Получение последней подписки для пользователя: {telegram_id}")
try: try:
subscriptions = await database_manager.last_subscription(user_id) subscription = await database_manager.get_last_subscriptions(telegram_id=telegram_id)
if subscriptions == "ERROR":
raise HTTPException(status_code=500, detail="Failed to fetch subscriptions")
return [ if not subscription :
{ logger.warning(f"Подписки для пользователя {telegram_id} не найдены")
"id": sub.id, raise HTTPException(status_code=404, detail="No subscriptions found")
"plan": sub.plan,
"vpn_server_id": sub.vpn_server_id, plan = await database_manager.get_plan_by_id(subscription.plan_id)
"expiry_date": sub.expiry_date.isoformat(),
"created_at": sub.created_at.isoformat(), if not plan:
"updated_at": sub.updated_at.isoformat(), logger.warning(f"Тариф для пользователя {telegram_id} не найдены")
} for sub in subscriptions raise HTTPException(status_code=404, detail="No plan found")
]
return {
"id": str(subscription.id),
"user_id": subscription.user_id,
"plan_name": plan.name,
"vpn_server_id": subscription.vpn_server_id,
"status": subscription.status.value,
"start_date": subscription.start_date.isoformat(),
"end_date": subscription.end_date.isoformat(),
"created_at": subscription.created_at.isoformat(),
}
except SQLAlchemyError as e:
logger.error(f"Ошибка базы данных при получении подписки для пользователя {telegram_id}: {e}")
raise HTTPException(status_code=500, detail="Database error")
except HTTPException as e:
# Пропускаем HTTPException, чтобы FastAPI обработал её автоматически
raise e
except Exception as e: except Exception as e:
logger.error(f"Неожиданная ошибка: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@router.get("/subscriptions/{telegram_id}", response_model=List[SubscriptionResponse])
async def get_subscriptions(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)):
"""
Возвращает список подписок пользователя.
"""
logger.info(f"Получение подписок для пользователя: {telegram_id}")
try:
# Получаем подписки без ограничений или с указанным лимитом
subscription = await database_manager.get_last_subscriptions(telegram_id=telegram_id)
if not subscription:
logger.warning(f"Подписки для пользователя {telegram_id} не найдены")
raise HTTPException(status_code=404, detail="No subscriptions found")
plan = await database_manager.get_plan_by_id(subscription.plan_id)
if not plan:
logger.warning(f"Тариф для подписки {subscription.id} не найден")
plan_name = "Unknown"
else:
plan_name = plan.name
# Формируем список подписок для ответа
# return [
# {
# "id": sub.id,
# "plan": sub.plan,
# "vpn_server_id": sub.vpn_server_id,
# "expiry_date": sub.expiry_date.isoformat(),
# "created_at": sub.created_at.isoformat(),
# "updated_at": sub.updated_at.isoformat(),
# }
# for sub in subscription
# ]
return [{
"id": str(subscription.id), # Конвертируем UUID в строку
"user_id": subscription.user_id,
"plan_name": plan_name,
"vpn_server_id": subscription.vpn_server_id,
"end_date": subscription.end_date.isoformat(),
"status": subscription.status.value, # Извлекаем значение enum
"start_date": subscription.start_date.isoformat(),
"created_at": subscription.created_at.isoformat()
}]
except SQLAlchemyError as e:
logger.error(f"Ошибка базы данных при получении подписок для пользователя {telegram_id}: {e}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.error(f"Неожиданная ошибка: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/uri", response_model=dict)
async def get_uri(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)):
"""
Возвращает список подписок пользователя.
"""
logger.info(f"Получение подписок для пользователя: {telegram_id}")
try:
# Получаем подписки без ограничений или с указанным лимитом
uri = await database_manager.generate_uri(telegram_id)
if uri == "SUB_ERROR":
raise HTTPException(status_code=404, detail="SUB_ERROR")
if not uri:
logger.warning(f"Не удалось сгенерировать URI для пользователя с telegram_id {telegram_id}, данные -> {uri}")
raise HTTPException(status_code=404, detail="URI not found")
return {"detail": uri[0]}
except HTTPException as e:
# Пропускаем HTTPException, чтобы FastAPI обработал её автоматически
raise e
except SQLAlchemyError as e:
logger.error(f"Ошибка базы данных при получении подписок для пользователя {telegram_id}: {e}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.error(f"Неожиданная ошибка: {e}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))

View File

@@ -1,22 +1,39 @@
from datetime import datetime
import sys
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from app.services.db_manager import DatabaseManager from fastapi.exceptions import HTTPException
from app.services import DatabaseManager
from sqlalchemy.exc import SQLAlchemyError
from instance.configdb import get_database_manager from instance.configdb import get_database_manager
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional
from uuid import UUID
import logging
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False
router = APIRouter() router = APIRouter()
# Модели запросов и ответов # Модели запросов и ответов
class CreateUserRequest(BaseModel): class CreateUserRequest(BaseModel):
telegram_id: int telegram_id: int
invited_by: Optional[int] = None
class UserResponse(BaseModel): class UserResponse(BaseModel):
id: str
telegram_id: int telegram_id: int
username: str username: Optional[str]
balance: float balance: float
invited_by: Optional[int] = None
created_at: str created_at: str
updated_at: str updated_at: str
class AddReferal(BaseModel):
invited_id: int
@router.post("/user/create", response_model=UserResponse, summary="Создать пользователя") @router.post("/user/create", response_model=UserResponse, summary="Создать пользователя")
async def create_user( async def create_user(
@@ -27,15 +44,15 @@ async def create_user(
Создание пользователя через Telegram ID. Создание пользователя через Telegram ID.
""" """
try: try:
user = await db_manager.create_user(request.telegram_id) user = await db_manager.create_user(request.telegram_id,request.invited_by)
if user == "ERROR": if user == None:
raise HTTPException(status_code=500, detail="Failed to create user") raise HTTPException(status_code=500, detail="Failed to create user")
return UserResponse( return UserResponse(
id=user.id,
telegram_id=user.telegram_id, telegram_id=user.telegram_id,
username=user.username, username=user.username,
balance=user.balance, balance=user.balance,
invited_by=user.invited_by if user.invited_by is not None else None,
created_at=user.created_at.isoformat(), created_at=user.created_at.isoformat(),
updated_at=user.updated_at.isoformat() updated_at=user.updated_at.isoformat()
) )
@@ -43,6 +60,7 @@ async def create_user(
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@router.get("/user/{telegram_id}", response_model=UserResponse, summary="Получить информацию о пользователе") @router.get("/user/{telegram_id}", response_model=UserResponse, summary="Получить информацию о пользователе")
async def get_user( async def get_user(
telegram_id: int, telegram_id: int,
@@ -52,17 +70,142 @@ async def get_user(
Получение информации о пользователе. Получение информации о пользователе.
""" """
try: try:
print(f"Получение пользователя с telegram_id: {telegram_id}")
user = await db_manager.get_user_by_telegram_id(telegram_id) user = await db_manager.get_user_by_telegram_id(telegram_id)
if not user: if not user:
logger.warning(f"Пользователь с telegram_id {telegram_id} не найден.")
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
return UserResponse( print(f"Пользователь найден: ID={user.telegram_id}, Username={user.username}")
id=user.id, user_response = UserResponse(
telegram_id=user.telegram_id, telegram_id=user.telegram_id,
username=user.username, username=user.username,
balance=user.balance, balance=user.balance,
invited_by=user.invited_by if user.invited_by is not None else None,
created_at=user.created_at.isoformat(), created_at=user.created_at.isoformat(),
updated_at=user.updated_at.isoformat() updated_at=user.updated_at.isoformat()
) )
return user_response
except HTTPException as http_ex: # Позволяет обработать HTTPException отдельно
raise http_ex
except SQLAlchemyError as e:
logger.error(f"Ошибка базы данных при получении пользователя с telegram_id {telegram_id}: {e}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e: except Exception as e:
logger.exception(f"Неожиданная ошибка при получении пользователя с telegram_id {telegram_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/user/{telegram_id}/transactions", summary="Последние транзакции пользователя")
async def last_transactions(
telegram_id: int,
db_manager: DatabaseManager = Depends(get_database_manager)
):
"""
Возвращает список последних транзакций пользователя.
"""
logger.info(f"Получен запрос на транзакции для пользователя: {telegram_id}")
try:
logger.debug(f"Вызов метода get_transaction с user_id={telegram_id}")
transactions = await db_manager.get_transaction(telegram_id)
if transactions == "ERROR":
logger.error(f"Ошибка при получении транзакций для пользователя: {telegram_id}")
raise HTTPException(status_code=500, detail="Failed to fetch transactions")
if transactions == None:
response = []
logger.info(f"Формирование ответа для пользователя {telegram_id}: {response}")
return response
logger.debug(f"Транзакции для {telegram_id}: {transactions}")
response = []
for tx in transactions:
# Проверяем, что транзакция существует и имеет created_at
if tx is None:
continue
if tx.status.value == "pending":
continue
# Обрабатываем created_at (может быть None)
created_at_str = None
if tx.created_at:
created_at_str = tx.created_at.isoformat()
else:
created_at_str = datetime.utcnow().isoformat() # или любое значение по умолчанию
response.append({
"id": tx.id,
"amount": float(tx.amount) if tx.amount else 0.0,
"created_at": created_at_str,
"type": tx.type.value if hasattr(tx.type, 'value') else str(tx.type),
})
logger.info(f"Формирование ответа для пользователя {telegram_id}: {response}")
return response
except HTTPException as http_ex:
logger.warning(f"HTTP ошибка для {telegram_id}: {http_ex.detail}")
raise http_ex
except SQLAlchemyError as db_ex:
logger.error(f"Ошибка базы данных для {telegram_id}: {db_ex}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.exception(f"Неожиданная ошибка для {telegram_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/user/{referrer_id}/add_referral", summary="Обновить баланс")
async def add_referal(
referrer_id: int,
request: AddReferal,
db_manager: DatabaseManager = Depends(get_database_manager)
):
"""
Обновляет баланс пользователя.
"""
logger.info(f"Получен запрос на добавление реферала: telegram_id={referrer_id}")
try:
result = await db_manager.add_referal(referrer_id,request.invited_id)
if result == "ERROR":
logger.error(f"Ошибка добавления реферала для {referrer_id} c айди {request.invited_id}")
raise HTTPException(status_code=500, detail="Failed to update balance")
logger.info(f"Добавлен реферал для {referrer_id} c айди {request.invited_id}")
return {"message": "Balance updated successfully"}
except HTTPException as http_ex:
logger.warning(f"HTTP ошибка: {http_ex.detail}")
raise http_ex
except SQLAlchemyError as db_ex:
logger.error(f"Ошибка базы данных при добавлении рефералу {referrer_id}: {db_ex}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.exception(f"Неожиданная ошибка при добавлении рефералу {referrer_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/user/{telegram_id}/referrals", summary="Получить колво рефералов")
async def get_referral_count(
telegram_id: int,
db_manager: DatabaseManager = Depends(get_database_manager)
):
try:
result = await db_manager.get_referrals_count(telegram_id)
if result == "ERROR":
logger.error(f"Ошибка получения рефералов для пользователя: {telegram_id}")
raise HTTPException(status_code=500, detail="Failed to get referrals")
logger.info(f"Количество приглашённых {result}")
return {"invited_count": result}
except HTTPException as http_ex:
logger.warning(f"HTTP ошибка: {http_ex.detail}")
raise http_ex
except SQLAlchemyError as db_ex:
logger.error(f"Ошибка базы данных при получении рефералов пользователя {telegram_id}: {db_ex}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.exception(f"Неожиданная ошибка при получении рефералов {telegram_id}: {e}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))

4
app/services/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .db_manager import DatabaseManager
#from .marzban import MarzbanService
__all__ = ['DatabaseManager']

View File

@@ -0,0 +1,80 @@
import aiohttp
import logging
from typing import Dict, Any, Optional
from enum import Enum
class BillingErrorCode(Enum):
INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS"
USER_NOT_FOUND = "USER_NOT_FOUND"
PAYMENT_FAILED = "PAYMENT_FAILED"
SERVICE_UNAVAILABLE = "SERVICE_UNAVAILABLE"
class BillingAdapter:
def __init__(self, base_url: str):
self.base_url = base_url
self.logger = logging.getLogger(__name__)
self.session = None
async def get_session(self):
if self.session is None:
self.session = aiohttp.ClientSession()
return self.session
async def withdraw_funds(self, user_id: int, amount: float, description: str = "") -> Dict[str, Any]:
"""
Списание средств через биллинг-сервис
"""
try:
session = await self.get_session()
payload = {
"user_id": user_id,
"amount": amount,
"description": description or f"Payment for subscription"
}
self.logger.info(f"Withdrawing {amount} from user {user_id}")
async with session.post(f"{self.base_url}/billing/payments/withdraw", json=payload) as response:
if response.status == 200:
result = await response.json()
if result.get("success"):
return {"status": "success"}
else:
error = result.get("error", "WITHDRAWAL_FAILED")
self.logger.error(f"Withdrawal failed: {error}")
return {"status": "error", "code": error}
else:
self.logger.error(f"Billing service error: {response.status}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except aiohttp.ClientError as e:
self.logger.error(f"Billing service connection error: {str(e)}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except Exception as e:
self.logger.error(f"Unexpected error in withdraw_funds: {str(e)}")
return {"status": "error", "code": "PAYMENT_FAILED"}
async def get_balance(self, user_id: int) -> Dict[str, Any]:
"""
Получение баланса пользователя
"""
try:
session = await self.get_session()
async with session.get(f"{self.base_url}/billing/balance/{user_id}") as response:
if response.status == 200:
result = await response.json()
return {"status": "success", "balance": result.get("balance", 0)}
elif response.status == 404:
return {"status": "error", "code": "USER_NOT_FOUND"}
else:
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except Exception as e:
self.logger.error(f"Error getting balance: {str(e)}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
async def close(self):
if self.session:
await self.session.close()

View File

@@ -1,204 +1,243 @@
from instance.model import User, Subscription, Transaction, Administrators from decimal import Decimal
from sqlalchemy.future import select import json
from sqlalchemy.exc import SQLAlchemyError from instance.model import User, Subscription, Transaction
from sqlalchemy import desc from app.services.billing_service import BillingAdapter
from app.services.marzban import MarzbanService, MarzbanUser
from .postgres_rep import PostgresRepository
from instance.model import Transaction,TransactionType, Plan
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from datetime import datetime from datetime import datetime, timezone
from .xui_rep import PanelInteraction
from .mongo_rep import MongoDBRepository
import random import random
import string import string
from typing import Optional
import logging import logging
import asyncio from uuid import UUID
class DatabaseManager: class DatabaseManager:
def __init__(self, session_generator): def __init__(self, session_generator,marzban_username,marzban_password,marzban_url,billing_base_url):
""" """
Инициализация с асинхронным генератором сессий (например, get_postgres_session). Инициализация с асинхронным генератором сессий (например, get_postgres_session).
""" """
self.session_generator = session_generator
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.mongo_repo = MongoDBRepository() self.postgres_repo = PostgresRepository(session_generator, self.logger)
self.marzban_service = MarzbanService(marzban_url,marzban_username,marzban_password)
self.billing_adapter = BillingAdapter(billing_base_url)
async def create_user(self, telegram_id: int): async def create_user(self, telegram_id: int, invented_by: Optional[int]= None):
""" """
Создаёт нового пользователя, если его нет. Создаёт пользователя.
""" """
async for session in self.session_generator():
try: try:
username = self.generate_string(6) username = self.generate_string(6)
result = await session.execute(select(User).where(User.telegram_id == int(telegram_id))) return await self.postgres_repo.create_user(telegram_id, username, invented_by)
user = result.scalars().first() except Exception as e:
if not user: self.logger.error(f"Ошибка при создании пользователя:{e}")
new_user = User(telegram_id=int(telegram_id), username=username)
session.add(new_user)
await session.commit()
return new_user
return user
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при создании пользователя {telegram_id}: {e}")
await session.rollback()
return "ERROR"
async def get_user_by_telegram_id(self, telegram_id: int): async def get_user_by_telegram_id(self, telegram_id: int):
""" """
Возвращает пользователя по Telegram ID. Возвращает пользователя по Telegram ID.
""" """
async for session in self.session_generator(): return await self.postgres_repo.get_user_by_telegram_id(telegram_id)
async def add_transaction(self, telegram_id: int, amount: float):
"""
Добавляет транзакцию.
"""
tran = Transaction(
user_id=telegram_id,
amount=Decimal(amount),
type=TransactionType.DEPOSIT
)
return await self.postgres_repo.add_record(tran)
async def add_referal(self,referrer_id: int, new_user_telegram_id: int):
"""
Добавление рефералу пользователей
"""
return await self.postgres_repo.add_referral(referrer_id,new_user_telegram_id)
async def get_transaction(self, telegram_id: int, limit: int = 10):
"""
Возвращает транзакции.
"""
return await self.postgres_repo.get_last_transactions(telegram_id, limit)
async def get_referrals_count(self,telegram_id: int) -> int:
"""
Docstring for get_referrals_count
:param self: Description
:param telegram_id: Description
:type telegram_id: int
:return: Description
:rtype: int
"""
return await self.postgres_repo.get_referrals_count(telegram_id)
# async def update_balance(self, telegram_id: int, amount: float):
# """
# Обновляет баланс пользователя и добавляет транзакцию.
# """
# self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
# user = await self.get_user_by_telegram_id(telegram_id)
# if not user:
# self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
# return "ERROR"
# updated = await self.postgres_repo.update_balance(user, amount)
# if not updated:
# self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
# return "ERROR"
# self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
# await self.add_transaction(user.telegram_id, amount)
# return "OK"
async def get_active_subscription(self, telegram_id: int):
"""
Проверяет наличие активной подписки.
"""
try: try:
result = await session.execute(select(User).where(User.telegram_id == telegram_id))
return result.scalars().first() return await self.postgres_repo.get_active_subscription(telegram_id)
except SQLAlchemyError as e: except Exception as e:
self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}") self.logger.error(f"Неожиданная ошибка в get_active_subscription: {str(e)}")
return "ERROR"
async def get_plan_by_id(self, plan_id):
"""
Ищет по названию плана.
"""
try:
return await self.postgres_repo.get_plan_by_id(plan_id)
except Exception as e:
self.logger.error(f"Неожиданная ошибка в get_plan_by_name: {str(e)}")
return None return None
async def add_transaction(self, user_id: int, amount: float): async def get_last_subscriptions(self, telegram_id: int, limit: int = 1):
""" """
Добавляет транзакцию для пользователя. Возвращает список последних подписок.
""" """
async for session in self.session_generator(): return await self.postgres_repo.get_last_subscription_by_user_id(telegram_id)
try:
transaction = Transaction(user_id=user_id, amount=amount)
session.add(transaction)
await session.commit()
except SQLAlchemyError as e:
self.logger.error(f"Ошибка добавления транзакции для пользователя {user_id}: {e}")
await session.rollback()
async def update_balance(self, telegram_id: int, amount: float): async def buy_sub(self, telegram_id: int, plan_name: str):
""" """
Обновляет баланс пользователя и добавляет транзакцию. Покупка подписки: сначала создаем подписку, потом списываем деньги
""" """
async for session in self.session_generator():
try: try:
result = await session.execute(select(User).where(User.telegram_id == telegram_id)) self.logger.info(f"Покупка подписки: user={telegram_id}, plan={plan_name}")
user = result.scalars().first()
if user:
user.balance += int(amount)
await self.add_transaction(user.id, amount)
await session.commit()
else:
self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при обновлении баланса: {e}")
await session.rollback()
return "ERROR"
async def last_subscription(self, user_id: int): # 1. Проверка активной подписки
""" if await self.get_active_subscription(telegram_id):
Возвращает список подписок пользователя. return "ACTIVE_SUBSCRIPTION_EXISTS"
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Subscription)
.where(Subscription.user_id == user_id)
.order_by(desc(Subscription.created_at))
)
return result.scalars().all()
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении последней подписки пользователя {user_id}: {e}")
return "ERROR"
async def last_transaction(self, user_id: int): # 2. Получаем план
""" plan = await self.postgres_repo.get_subscription_plan(plan_name)
Возвращает список транзакций пользователя.
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Transaction)
.where(Transaction.user_id == user_id)
.order_by(desc(Transaction.created_at))
)
transactions = result.scalars().all()
return transactions
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении транзакций пользователя {user_id}: {e}")
return "ERROR"
async def buy_sub(self, telegram_id: str, plan_id: str):
async for session in self.session_generator():
try:
result = await self.create_user(telegram_id)
if not result:
self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
# Получение тарифного плана из MongoDB
plan = await self.mongo_repo.get_subscription_plan(plan_id)
if not plan: if not plan:
self.logger.error(f"Тарифный план {plan_id} не найден.") return "TARIFF_NOT_FOUND"
return "ERROR"
# Проверка достаточности средств # 3. Проверяем пользователя
cost = int(plan["price"]) user = await self.get_user_by_telegram_id(telegram_id)
if result.balance < cost: if not user:
self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_id}.") return "USER_NOT_FOUND"
# 4. Проверяем баланс (только для информации)
balance_result = await self.billing_adapter.get_balance(telegram_id)
if balance_result["status"] == "error":
return "BILLING_SERVICE_ERROR"
if balance_result["balance"] < plan.price:
return "INSUFFICIENT_FUNDS" return "INSUFFICIENT_FUNDS"
# Списываем средства # 5. СОЗДАЕМ ПОДПИСКУ (самое важное - сначала!)
result.balance -= cost new_subscription = await self._create_subscription_and_add_client(user, plan)
if not new_subscription:
return "SUBSCRIPTION_CREATION_FAILED"
# Создаем подписку # 6. ТОЛЬКО ПОСЛЕ УСПЕШНОГО СОЗДАНИЯ ПОДПИСКИ - списываем деньги
expiry_date = datetime.utcnow() + relativedelta(months=plan["duration_months"]) withdraw_result = await self.billing_adapter.withdraw_funds(
server = await self.mongo_repo.get_server_with_least_clients() telegram_id,
self.logger.info(f"Выбран сервер для подписки: {server}") float(plan.price),
new_subscription = Subscription( f"Оплата подписки {plan_name}"
user_id=result.id,
vpn_server_id=str(server['server']["name"]),
plan=plan_id,
expiry_date=expiry_date
) )
session.add(new_subscription)
# Попытка добавить пользователя на сервер if withdraw_result["status"] == "error":
# Получаем информацию о пользователе await self.postgres_repo.delete_subscription(new_subscription.id)
user = result # так как result уже содержит пользователя self.logger.error(f"Payment failed but subscription created: {new_subscription.id}")
if not user: return "PAYMENT_FAILED_AFTER_SUBSCRIPTION"
self.logger.error(f"Не удалось найти пользователя для добавления на сервер.")
await session.rollback()
return "ERROR"
# Получаем сервер из MongoDB # 7. ВСЕ УСПЕШНО
server_data = await self.mongo_repo.get_server(new_subscription.vpn_server_id) self.logger.info(f"Подписка успешно создана и оплачена: {new_subscription.id}")
if not server_data: return {"status": "OK", "subscription_id": str(new_subscription.id)}
self.logger.error(f"Не удалось найти сервер с ID {new_subscription.vpn_server_id}.")
await session.rollback()
return "ERROR"
server_info = server_data['server']
url_base = f"https://{server_info['ip']}:{server_info['port']}/{server_info['secretKey']}"
login_data = {
'username': server_info['login'],
'password': server_info['password'],
}
panel = PanelInteraction(url_base, login_data, self.logger,server_info['certificate']['data'])
expiry_date_iso = new_subscription.expiry_date.isoformat()
# Добавляем на сервер
response = await panel.add_client(user.id, expiry_date_iso, user.username)
if response != "OK":
self.logger.error(f"Ошибка при добавлении клиента {telegram_id} на сервер: {response}")
# Если не получилось добавить на сервер, откатываем транзакцию
await session.rollback()
return "ERROR"
# Если мы здесь - значит и подписка, и добавление на сервер успешны
await session.commit()
self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id} на план {plan_id} и клиент добавлен на сервер.")
return "OK"
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при покупке подписки {plan_id} для пользователя {telegram_id}: {e}")
await session.rollback()
return "ERROR"
except Exception as e: except Exception as e:
self.logger.error(f"Непредвиденная ошибка: {e}") self.logger.error(f"Ошибка в buy_sub: {str(e)}")
await session.rollback() return "ERROR"
async def _create_subscription_and_add_client(self, user: User, plan: Plan):
"""Создаёт подписку и добавляет клиента на сервер."""
try:
self.logger.info(f"Создание подписки для user_id={user.telegram_id}, plan={plan.name}")
# Проверяем типы объектов
self.logger.info(f"Тип user: {type(user)}, тип plan: {type(plan)}")
expiry_date = datetime.utcnow() + relativedelta(days=plan.duration_days)
new_subscription = Subscription(
user_id=user.telegram_id,
vpn_server_id="BASE SERVER NEED TO UPDATE",
plan_id=plan.id,
end_date=expiry_date,
start_date=datetime.utcnow()
)
self.logger.info(f"Создан объект подписки: {new_subscription}")
response = await self.marzban_service.create_user(user, new_subscription)
self.logger.info(f"Ответ от Marzban: {response}")
if response == "USER_ALREADY_EXISTS":
response = await self.marzban_service.get_user_status(user)
result = await self.marzban_service.update_user(user, new_subscription)
# if not isinstance(response,MarzbanUser) or not isinstance(result,MarzbanUser):
# self.logger.error(f"Ошибка при добавлении клиента: {response}, {result}")
# return None
await self.postgres_repo.add_record(new_subscription)
self.logger.info(f"Подписка сохранена в БД с ID: {new_subscription.id}")
return new_subscription
except Exception as e:
self.logger.error(f"Неожиданная ошибка в _create_subscription_and_add_client: {str(e)}")
import traceback
self.logger.error(f"Трассировка: {traceback.format_exc()}")
return None
async def generate_uri(self, telegram_id: int):
"""
Генерация URI для пользователя.
:param telegram_id: Telegram ID пользователя.
:return: Строка URI или None в случае ошибки.
"""
try:
user = await self.get_user_by_telegram_id(telegram_id)
if user == False or user == None:
self.logger.error(f"Ошибка при получении клиента: user = {user}")
return "ERROR"
result = await self.marzban_service.get_config_links(user)
if result == None:
self.logger.error(f"Ошибка при получении ссылки клиента: result = {user}")
return "ERROR"
self.logger.info(f"Итог generate_uri: result = {result}")
return result
except Exception as e:
self.logger.error(f"Неожиданная ошибка в generate_uri: {str(e)}")
return "ERROR" return "ERROR"
@@ -207,5 +246,13 @@ class DatabaseManager:
""" """
Генерирует случайную строку заданной длины. Генерирует случайную строку заданной длины.
""" """
characters = string.ascii_lowercase + string.digits return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
return ''.join(random.choices(characters, k=length))
@staticmethod
def _is_subscription_expired(expire_timestamp: int) -> bool:
"""Проверяет, истекла ли подписка"""
current_time = datetime.now(timezone.utc)
expire_time = datetime.fromtimestamp(expire_timestamp, tz=timezone.utc)
return expire_time < current_time

288
app/services/marzban.py Normal file
View File

@@ -0,0 +1,288 @@
from typing import Any, Dict, Optional, Literal
import aiohttp
import requests
from datetime import date, datetime, time, timezone
import logging
from instance import User, Subscription
class UserAlreadyExistsError(Exception):
"""Пользователь уже существует в системе"""
pass
class MarzbanUser:
"""Модель пользователя Marzban"""
def __init__(self, data: Dict[str, Any]):
self.username = data.get('username')
self.status = data.get('status')
self.expire = data.get('expire')
self.data_limit = data.get('data_limit')
self.data_limit_reset_strategy = data.get('data_limit_reset_strategy')
self.used_traffic = data.get('used_traffic')
self.lifetime_used_traffic = data.get('lifetime_used_traffic')
self.subscription_url = data.get('subscription_url')
self.online_at = data.get('online_at')
self.created_at = data.get('created_at')
self.proxies = data.get('proxies', {})
self.inbounds = data.get('inbounds', {})
self.note = data.get('note')
class UserStatus:
"""Статус пользователя"""
def __init__(self, data: Dict[str, Any]):
self.used_traffic = data.get('used_traffic', 0)
self.lifetime_used_traffic = data.get('lifetime_used_traffic', 0)
self.online_at = data.get('online_at')
self.status = data.get('status')
self.expire = data.get('expire')
self.data_limit = data.get('data_limit')
class MarzbanService:
def __init__(self, baseURL: str, username: str, password: str) -> None:
self.base_url = baseURL.rstrip('/')
self.token = self._get_token(username, password)
self.headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
self._session: Optional[aiohttp.ClientSession] = None
def _get_token(self, username: str, password: str) -> str:
"""Получение токена авторизации"""
try:
response = requests.post(
f"{self.base_url}/api/admin/token",
data={'username': username, 'password': password}
)
response.raise_for_status()
return response.json()['access_token']
except requests.RequestException as e:
logging.error(f"Failed to get token: {e}")
raise Exception(f"Authentication failed: {e}")
async def _get_session(self) -> aiohttp.ClientSession:
"""Ленивое создание сессии"""
if self._session is None or self._session.closed:
timeout = aiohttp.ClientTimeout(total=30)
self._session = aiohttp.ClientSession(timeout=timeout)
return self._session
async def _make_request(self, endpoint: str, method: Literal["get", "post", "put", "delete"],
data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Улучшенный метод для запросов"""
url = f"{self.base_url}{endpoint}"
try:
session = await self._get_session()
async with session.request(
method=method.upper(),
url=url,
headers=self.headers,
json=data
) as response:
response_data = await response.json() if response.content_length else {}
if response.status == 409:
raise UserAlreadyExistsError(f"User already exists: {response_data}")
if response.status not in (200, 201):
raise Exception(f"HTTP {response.status}: {response_data}")
return response_data
except UserAlreadyExistsError:
raise # Пробрасываем наверх
except aiohttp.ClientError as e:
logging.error(f"Network error during {method.upper()} to {url}: {e}")
raise
except Exception as e:
logging.error(f"Unexpected error during request to {url}: {e}")
raise
async def create_user(self, user: User, subscription: Subscription) -> str | MarzbanUser:
"""Создает нового пользователя в Marzban"""
logging.info(f"Конец подписки пользователя {user.telegram_id} {subscription.end_date}")
if subscription.end_date:
if isinstance(subscription.end_date, datetime):
if subscription.end_date.tzinfo is None:
end_date = subscription.end_date.replace(tzinfo=timezone.utc)
else:
end_date = subscription.end_date
expire_timestamp = int(end_date.timestamp())
elif isinstance(subscription.end_date, date):
end_datetime = datetime.combine(
subscription.end_date,
time(23, 59, 59),
tzinfo=timezone.utc
)
expire_timestamp = int(end_datetime.timestamp())
else:
expire_timestamp = 0
else:
expire_timestamp = 0
data = {
"username": user.username,
"status": "active",
"expire": expire_timestamp,
"data_limit": 100 * 1073741824, # Конвертируем GB в bytes
"data_limit_reset_strategy": "no_reset",
"proxies": {
"trojan": {}
},
"inbounds": {
"trojan": ["TROJAN WS NOTLS"]
},
"note": f"Telegram: {user.telegram_id}",
"on_hold_timeout": None,
"on_hold_expire_duration": 0,
"next_plan": {
"add_remaining_traffic": False,
"data_limit": 0,
"expire": 0,
"fire_on_either": True
}
}
try:
response_data = await self._make_request("/api/user", "post", data)
marzban_user = MarzbanUser(response_data)
logging.info(f"Пользователь {user.username} успешно создан в Marzban")
return marzban_user
except UserAlreadyExistsError:
logging.warning(f"Пользователь {user.telegram_id} уже существует в Marzban")
return "USER_ALREADY_EXISTS"
except Exception as e:
logging.error(f"Failed to create user {user.username}: {e}")
raise Exception(f"Failed to create user: {e}")
async def update_user(self, user: User, subscription: Subscription) -> MarzbanUser:
"""Обновляет существующего пользователя"""
username = user.username
if subscription.end_date:
if isinstance(subscription.end_date, datetime):
# Если это datetime, преобразуем в timestamp
if subscription.end_date.tzinfo is None:
end_date = subscription.end_date.replace(tzinfo=timezone.utc)
else:
end_date = subscription.end_date
expire_timestamp = int(end_date.timestamp())
elif isinstance(subscription.end_date, date):
# Если это date, создаем datetime на конец дня и преобразуем в timestamp
end_datetime = datetime.combine(
subscription.end_date,
time(23, 59, 59),
tzinfo=timezone.utc
)
expire_timestamp = int(end_datetime.timestamp())
else:
expire_timestamp = 0
else:
expire_timestamp = 0
data = {
"status": "active",
"expire": expire_timestamp
}
try:
response_data = await self._make_request(f"/api/user/{username}", "put", data)
marzban_user = MarzbanUser(response_data)
logging.info(f"User {username} updated successfully")
return marzban_user
except Exception as e:
logging.error(f"Failed to update user {username}: {e}")
raise Exception(f"Failed to update user: {e}")
async def disable_user(self, user: User) -> bool:
"""Отключает пользователя"""
username = user.username
data = {
"status": "disabled"
}
try:
await self._make_request(f"/api/user/{username}", "put", data)
logging.info(f"User {username} disabled successfully")
return True
except Exception as e:
logging.error(f"Failed to disable user {username}: {e}")
return False
async def enable_user(self, user: User) -> bool:
"""Включает пользователя"""
username = user.username
data = {
"status": "active"
}
try:
await self._make_request(f"/api/user/{username}", "put", data)
logging.info(f"User {username} enabled successfully")
return True
except Exception as e:
logging.error(f"Failed to enable user {username}: {e}")
return False
async def delete_user(self, user: User) -> bool:
"""Полностью удаляет пользователя из Marzban"""
try:
await self._make_request(f"/api/user/{user.username}", "delete")
logging.info(f"User {user.username} deleted successfully")
return True
except Exception as e:
logging.error(f"Failed to delete user {user.username}: {e}")
return False
async def get_user_status(self, user: User) -> UserStatus:
"""Получает текущий статус пользователя"""
try:
response_data = await self._make_request(f"/api/user/{user.username}", "get")
return UserStatus(response_data)
except Exception as e:
logging.error(f"Failed to get status for user {user.username}: {e}")
raise Exception(f"Failed to get user status: {e}")
async def get_subscription_url(self, user: User) -> str | None:
"""Возвращает готовую subscription_url для подключения"""
try:
response_data = await self._make_request(f"/api/user/{user.username}", "get")
return response_data.get('subscription_url', '')
except Exception as e:
logging.error(f"Failed to get subscription URL for user {user.username}: {e}")
return None
async def get_config_links(self, user: User) -> str:
"""Возвращает конфигурации для подключения"""
username = user.username
try:
response_data = await self._make_request(f"/api/user/{username}", "get")
return response_data.get('links', '')
except Exception as e:
logging.error(f"Failed to get configurations URL's for user {username}: {e}")
return ""
async def check_marzban_health(self) -> bool:
"""Проверяет доступность Marzban API"""
try:
await self._make_request("/api/admin", "get")
return True
except Exception as e:
logging.error(f"Marzban health check failed: {e}")
return False
async def close(self):
"""Закрытие сессии"""
if self._session and not self._session.closed:
await self._session.close()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

View File

@@ -1,158 +0,0 @@
import os
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import DuplicateKeyError, NetworkTimeout
import logging
class MongoDBRepository:
def __init__(self):
# Настройки MongoDB из переменных окружения
mongo_uri = os.getenv("MONGO_URL")
database_name = os.getenv("DB_NAME")
server_collection = os.getenv("SERVER_COLLECTION", "servers")
plan_collection = os.getenv("PLAN_COLLECTION", "plans")
# Подключение к базе данных и коллекциям
self.client = AsyncIOMotorClient(mongo_uri)
self.db = self.client[database_name]
self.collection = self.db[server_collection] # Коллекция серверов
self.plans_collection = self.db[plan_collection] # Коллекция планов
self.logger = logging.getLogger(__name__)
async def add_subscription_plan(self, plan_data):
"""Добавляет новый тарифный план в коллекцию."""
try:
result = await self.plans_collection.insert_one(plan_data)
self.logger.debug(f"Тарифный план добавлен с ID: {result.inserted_id}")
return result.inserted_id
except DuplicateKeyError:
self.logger.error("Дублирующий ключ.")
except NetworkTimeout:
self.logger.error("Сетевой таймаут.")
async def get_subscription_plan(self, plan_name):
"""Получает тарифный план по его имени."""
try:
plan = await self.plans_collection.find_one({"name": plan_name})
if plan:
self.logger.debug(f"Найден тарифный план: {plan}")
else:
self.logger.error(f"Тарифный план {plan_name} не найден.")
return plan
except DuplicateKeyError:
self.logger.error("Дублирующий ключ.")
except NetworkTimeout:
self.logger.error("Сетевой таймаут.")
async def add_server(self, server_data):
"""Добавляет новый VPN сервер в коллекцию."""
try:
result = await self.collection.insert_one(server_data)
self.logger.debug(f"VPN сервер добавлен с ID: {result.inserted_id}")
return result.inserted_id
except DuplicateKeyError:
self.logger.error("Дублирующий ключ.")
except NetworkTimeout:
self.logger.error("Сетевой таймаут.")
async def get_server(self, server_name: str):
"""Получает сервер VPN по его ID."""
try:
server = await self.collection.find_one({"server.name": server_name})
if server:
self.logger.debug(f"Найден VPN сервер: {server}")
else:
self.logger.debug(f"VPN сервер с ID {server_name} не найден.")
return server
except DuplicateKeyError:
self.logger.error("Дублирующий ключ.")
except NetworkTimeout:
self.logger.error("Сетевой таймаут.")
async def get_server_with_least_clients(self):
"""Возвращает сервер с наименьшим количеством подключенных клиентов."""
try:
pipeline = [
{
"$addFields": {
"current_clients": {"$size": {"$ifNull": ["$clients", []]}}
}
},
{
"$sort": {"current_clients": 1}
},
{
"$limit": 1
}
]
result = await self.collection.aggregate(pipeline).to_list(length=1)
if result:
server = result[0]
self.logger.debug(f"Найден сервер с наименьшим количеством клиентов: {server}")
return server
else:
self.logger.debug("Не найдено серверов.")
return None
except DuplicateKeyError:
self.logger.error("Дублирующий ключ.")
except NetworkTimeout:
self.logger.error("Сетевой таймаут.")
async def update_server(self, server_name, update_data):
"""Обновляет данные VPN сервера."""
try:
result = await self.collection.update_one({"server_name": server_name}, {"$set": update_data})
if result.matched_count > 0:
self.logger.debug(f"VPN сервер с ID {server_name} обновлен.")
else:
self.logger.debug(f"VPN сервер с ID {server_name} не найден.")
return result.matched_count > 0
except DuplicateKeyError:
self.logger.error("Дублирующий ключ.")
except NetworkTimeout:
self.logger.error("Сетевой таймаут.")
async def delete_server(self, server_name):
"""Удаляет VPN сервер по его ID."""
try:
result = await self.collection.delete_one({"name": server_name})
if result.deleted_count > 0:
self.logger.debug(f"VPN сервер с ID {server_name} удален.")
else:
self.logger.debug(f"VPN сервер с ID {server_name} не найден.")
return result.deleted_count > 0
except DuplicateKeyError:
self.logger.error("Дублирующий ключ.")
except NetworkTimeout:
self.logger.error("Сетевой таймаут.")
async def list_servers(self):
"""Возвращает список всех VPN серверов."""
try:
servers = await self.collection.find().to_list(length=1000) # Получить до 1000 серверов (можно настроить)
self.logger.debug(f"Найдено {len(servers)} VPN серверов.")
return servers
except DuplicateKeyError:
self.logger.error("Дублирующий ключ.")
except NetworkTimeout:
self.logger.error("Сетевой таймаут.")
async def __aenter__(self):
"""
Метод вызывается при входе в блок with.
"""
self.logger.debug("Контекстный менеджер: подключение открыто.")
return self
async def __aexit__(self, exc_type, exc_value, traceback):
"""
Метод вызывается при выходе из блока with.
"""
await self.close_connection()
if exc_type:
self.logger.error(f"Контекстный менеджер завершён с ошибкой: {exc_value}")
else:
self.logger.debug("Контекстный менеджер: подключение закрыто.")

View File

@@ -1,7 +1,12 @@
from datetime import datetime
from typing import Optional
from uuid import UUID
from sqlalchemy.future import select from sqlalchemy.future import select
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import desc from decimal import Decimal
from instance.model import User, Subscription, Transaction from sqlalchemy import asc, desc, update
from sqlalchemy.orm import joinedload
from instance.model import Referral, User, Subscription, Transaction, Plan
class PostgresRepository: class PostgresRepository:
@@ -9,13 +14,13 @@ class PostgresRepository:
self.session_generator = session_generator self.session_generator = session_generator
self.logger = logger self.logger = logger
async def create_user(self, telegram_id: int, username: str): async def create_user(self, telegram_id: int, username: str, invited_by: Optional[int]= None):
""" """
Создаёт нового пользователя в PostgreSQL. Создаёт нового пользователя в PostgreSQL.
""" """
async for session in self.session_generator(): async for session in self.session_generator():
try: try:
new_user = User(telegram_id=telegram_id, username=username) new_user = User(telegram_id=telegram_id, username=username, invited_by=invited_by)
session.add(new_user) session.add(new_user)
await session.commit() await session.commit()
return new_user return new_user
@@ -24,6 +29,30 @@ class PostgresRepository:
await session.rollback() await session.rollback()
return None return None
async def get_active_subscription(self, telegram_id: int):
"""
Проверяет наличие активной подписки у пользователя.
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Subscription)
.join(User, Subscription.user_id == User.telegram_id)
.where(User.telegram_id == telegram_id, Subscription.end_date > datetime.utcnow())
)
subscription = result.scalars().first()
if subscription:
# Отделяем объект от сессии
session.expunge(subscription)
self.logger.info(f"Пользователь с id {telegram_id}, проверен и имеет подписку ID: {subscription.id}")
else:
self.logger.info(f"Пользователь с id {telegram_id}, проверен и имеет None")
return subscription
except SQLAlchemyError as e:
self.logger.error(f"Ошибка проверки активной подписки для пользователя {telegram_id}: {e}")
return None
async def get_user_by_telegram_id(self, telegram_id: int): async def get_user_by_telegram_id(self, telegram_id: int):
""" """
Возвращает пользователя по Telegram ID. Возвращает пользователя по Telegram ID.
@@ -31,61 +60,40 @@ class PostgresRepository:
async for session in self.session_generator(): async for session in self.session_generator():
try: try:
result = await session.execute(select(User).where(User.telegram_id == telegram_id)) result = await session.execute(select(User).where(User.telegram_id == telegram_id))
if result:
return result.scalars().first() return result.scalars().first()
return False
except SQLAlchemyError as e: except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}") self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}")
return None return False
async def add_transaction(self, user_id: int, amount: float):
"""
Добавляет транзакцию для пользователя.
"""
async for session in self.session_generator():
try:
transaction = Transaction(user_id=user_id, amount=amount)
session.add(transaction)
await session.commit()
except SQLAlchemyError as e:
self.logger.error(f"Ошибка добавления транзакции для пользователя {user_id}: {e}")
await session.rollback()
async def update_balance(self, telegram_id: int, amount: float): # async def update_balance(self, user: User, amount: float):
""" # """
Обновляет баланс пользователя. # Обновляет баланс пользователя.
"""
async for session in self.session_generator():
try:
result = await session.execute(select(User).where(User.telegram_id == telegram_id))
user = result.scalars().first()
if user:
user.balance += amount
await session.commit()
return user
else:
self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
return None
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при обновлении баланса: {e}")
await session.rollback()
return None
async def last_subscription(self, user_id: int): # :param user: Объект пользователя.
""" # :param amount: Сумма для добавления/вычитания.
Возвращает последние подписки пользователя. # :return: True, если успешно, иначе False.
""" # """
async for session in self.session_generator(): # self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}")
try: # async for session in self.session_generator():
result = await session.execute( # try:
select(Subscription) # user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии
.where(Subscription.user_id == user_id) # if not user:
.order_by(desc(Subscription.created_at)) # self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.")
) # return False
return result.scalars().all() # # Приведение amount к Decimal
except SQLAlchemyError as e: # user.balance += Decimal(amount)
self.logger.error(f"Ошибка при получении последней подписки пользователя {user_id}: {e}") # await session.commit()
return None # self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}")
# return True
# except SQLAlchemyError as e:
# self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}")
# await session.rollback()
# return False
async def last_transaction(self, user_id: int): async def get_last_transactions(self, user_telegram_id: int, limit: int = 10):
""" """
Возвращает последние транзакции пользователя. Возвращает последние транзакции пользователя.
""" """
@@ -93,10 +101,201 @@ class PostgresRepository:
try: try:
result = await session.execute( result = await session.execute(
select(Transaction) select(Transaction)
.where(Transaction.user_id == user_id) .where(Transaction.user_id == user_telegram_id)
.order_by(desc(Transaction.created_at)) .order_by(desc(Transaction.created_at))
.limit(limit)
) )
return result.scalars().all() return result.scalars().all()
except SQLAlchemyError as e: except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении транзакций пользователя {user_id}: {e}") self.logger.error(f"Ошибка получения транзакций пользователя {user_telegram_id}: {e}")
return None return None
async def get_last_subscription_by_user_id(self, user_telegram_id: int):
"""
Извлекает последнюю подписку пользователя на основании user_id.
:param user_id: UUID пользователя.
:return: Объект Subscription или None.
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Subscription)
.where(Subscription.user_id == user_telegram_id)
.order_by(desc(Subscription.created_at))
.options(joinedload(Subscription.plan))
.limit(1)
)
subscription = result.scalars().first()
self.logger.info(f"Найдены такие подписки: {subscription}")
if subscription:
session.expunge(subscription)
self.logger.info(f"Найдена подписка ID: {subscription.id} для пользователя {user_telegram_id}")
return subscription
else:
return None
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении подписки для пользователя {user_telegram_id}: {e}")
return None
async def delete_subscription(self, subscription_id: UUID) -> bool:
"""
Удаляет подписку по её ID.
:param subscription_id: UUID подписки для удаления
:return: True если удалено успешно, False в случае ошибки
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Subscription).where(Subscription.id == subscription_id)
)
subscription = result.scalars().first()
if not subscription:
self.logger.warning(f"Подписка с ID {subscription_id} не найдена")
return False
await session.delete(subscription)
await session.commit()
self.logger.info(f"Подписка с ID {subscription_id} успешно удалена")
return True
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при удалении подписки {subscription_id}: {e}")
await session.rollback()
return False
async def add_record(self, record):
"""
Добавляет запись в базу данных.
:param record: Объект записи.
:return: Запись или None в случае ошибки.
"""
async for session in self.session_generator():
try:
session.add(record)
await session.commit()
return record
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при добавлении записи: {record}: {e}")
await session.rollback()
raise Exception
async def add_referral(self, referrer_id: int, referral_id: int):
"""
Добавление реферальной связи между пользователями.
"""
async for session in self.session_generator():
try:
# Проверить, существует ли уже такая реферальная связь
existing_referral = await session.execute(
select(Referral)
.where(
(Referral.inviter_id == referrer_id) &
(Referral.invited_id == referral_id)
)
)
existing_referral = existing_referral.scalars().first()
if existing_referral:
raise ValueError("Referral relationship already exists")
# Проверить, что пользователи существуют
referrer = await session.execute(
select(User).where(User.telegram_id == referrer_id)
)
referrer = referrer.scalars().first()
if not referrer:
raise ValueError("Referrer not found")
referral_user = await session.execute(
select(User).where(User.telegram_id == referral_id)
)
referral_user = referral_user.scalars().first()
if not referral_user:
raise ValueError("Referral user not found")
# Проверить, что пользователь не приглашает сам себя
if referrer_id == referral_id:
raise ValueError("User cannot refer themselves")
# Создать новую реферальную связь
new_referral = Referral(
inviter_id=referrer_id,
invited_id=referral_id
)
session.add(new_referral)
await session.commit()
self.logger.info(f"Реферальная связь создана: {referrer_id} -> {referral_id}")
except Exception as e:
await session.rollback()
self.logger.error(f"Ошибка при добавлении реферальной связи: {str(e)}")
raise
async def get_subscription_plan(self, plan_name:str) -> Plan | None:
"""
Поиск плана для подписки
:param plan_name: Объект записи.
:return: Запись или None в случае ошибки.
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Plan)
.where(Plan.name == plan_name)
)
return result.scalar_one_or_none()
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при поиске плана: {plan_name}: {e}")
await session.rollback()
return None
async def get_plan_by_id(self, plan_id: int) -> Plan | None:
"""
Поиск плана для подписки
:param plan_name: Объект записи.
:return: Запись или None в случае ошибки.
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Plan)
.where(Plan.id == plan_id)
)
return result.scalar_one_or_none()
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при поиске плана: {plan_id}: {e}")
await session.rollback()
return None
async def get_referrals_count(self, user_telegram_id: int) -> int:
"""
Получить количество рефералов пользователя.
:param user_telegram_id: Telegram ID пользователя-пригласителя
:return: Количество рефералов
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Referral)
.where(Referral.inviter_id == user_telegram_id)
)
referrals = result.scalars().all()
return len(referrals)
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении количества рефералов для пользователя {user_telegram_id}: {e}")
return 0

View File

@@ -1,232 +0,0 @@
import aiohttp
import uuid
import base64
import ssl
def generate_uuid():
return str(uuid.uuid4())
class PanelInteraction:
def __init__(self, base_url, login_data, logger, certificate=None, is_encoded=True):
"""
Initialize the PanelInteraction class.
:param base_url: Base URL for the panel.
:param login_data: Login data (username/password or token).
:param logger: Logger for debugging.
:param certificate: Certificate content (Base64-encoded or raw string).
:param is_encoded: Indicates whether the certificate is Base64-encoded.
"""
self.base_url = base_url
self.login_data = login_data
self.logger = logger
self.cert_content = self._decode_certificate(certificate, is_encoded)
self.session_id = None # Session ID will be initialized lazily
self.headers = None
def _decode_certificate(self, certificate, is_encoded):
"""
Decode the provided certificate content.
:param certificate: Certificate content (Base64-encoded or raw string).
:param is_encoded: Indicates whether the certificate is Base64-encoded.
:return: Decoded certificate content as bytes.
"""
if not certificate:
self.logger.error("No certificate provided.")
raise ValueError("Certificate is required.")
try:
# Создаем SSLContext
ssl_context = ssl.create_default_context()
# Декодируем, если нужно
if is_encoded:
certificate = base64.b64decode(certificate).decode()
# Загружаем сертификат в SSLContext
ssl_context.load_verify_locations(cadata=certificate)
return ssl_context
except Exception as e:
self.logger.error(f"Error while decoding certificate: {e}")
raise ValueError("Invalid certificate format or content.") from e
async def _ensure_logged_in(self):
"""
Ensure the session ID is available for authenticated requests.
"""
if not self.session_id:
self.session_id = await self.login()
if self.session_id:
self.headers = {
'Accept': 'application/json',
'Cookie': f'3x-ui={self.session_id}',
'Content-Type': 'application/json'
}
else:
raise ValueError("Unable to log in and retrieve session ID.")
async def login(self):
"""
Perform login to the panel.
:return: Session ID or None.
"""
login_url = f"{self.base_url}/login"
self.logger.info(f"Attempting to login at: {login_url}")
async with aiohttp.ClientSession() as session:
try:
async with session.post(
login_url, data=self.login_data, ssl=self.cert_content, timeout=10
) as response:
if response.status == 200:
session_id = response.cookies.get("3x-ui")
if session_id:
return session_id.value
else:
self.logger.error("Login failed: No session ID received.")
return None
else:
self.logger.error(f"Login failed: {response.status}")
return None
except aiohttp.ClientError as e:
self.logger.error(f"Login request failed: {e}")
return None
async def get_inbound_info(self, inbound_id):
"""
Fetch inbound information by ID.
:param inbound_id: ID of the inbound.
:return: JSON response or None.
"""
await self._ensure_logged_in()
url = f"{self.base_url}/panel/api/inbounds/get/{inbound_id}"
async with aiohttp.ClientSession() as session:
try:
async with session.get(
url, headers=self.headers, ssl=self.cert_content, timeout=10
) as response:
if response.status == 200:
return await response.json()
else:
self.logger.error(f"Failed to get inbound info: {response.status}")
return None
except aiohttp.ClientError as e:
self.logger.error(f"Get inbound info request failed: {e}")
return None
async def get_client_traffic(self, email):
"""
Fetch traffic information for a specific client.
:param email: Client's email.
:return: JSON response or None.
"""
await self._ensure_logged_in()
url = f"{self.base_url}/panel/api/inbounds/getClientTraffics/{email}"
async with aiohttp.ClientSession() as session:
try:
async with session.get(
url, headers=self.headers, ssl=self.cert_content, timeout=10
) as response:
if response.status == 200:
return await response.json()
else:
self.logger.error(f"Failed to get client traffic: {response.status}")
return None
except aiohttp.ClientError as e:
self.logger.error(f"Get client traffic request failed: {e}")
return None
async def update_client_expiry(self, client_uuid, new_expiry_time, client_email):
"""
Update the expiry date of a specific client.
:param client_uuid: UUID of the client.
:param new_expiry_time: New expiry date in ISO format.
:param client_email: Client's email.
:return: None.
"""
await self._ensure_logged_in()
url = f"{self.base_url}/panel/api/inbounds/updateClient"
update_data = {
"id": 1,
"settings": {
"clients": [
{
"id": client_uuid,
"alterId": 0,
"email": client_email,
"limitIp": 2,
"totalGB": 0,
"expiryTime": new_expiry_time,
"enable": True,
"tgId": "",
"subId": ""
}
]
}
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(
url, headers=self.headers, json=update_data, ssl=self.cert_content
) as response:
if response.status == 200:
self.logger.info("Client expiry updated successfully.")
else:
self.logger.error(f"Failed to update client expiry: {response.status}")
except aiohttp.ClientError as e:
self.logger.error(f"Update client expiry request failed: {e}")
async def add_client(self, inbound_id, expiry_date, email):
"""
Add a new client to an inbound.
:param inbound_id: ID of the inbound.
:param expiry_date: Expiry date in ISO format.
:param email: Client's email.
:return: JSON response or None.
"""
await self._ensure_logged_in()
url = f"{self.base_url}/panel/api/inbounds/addClient"
client_info = {
"clients": [
{
"id": generate_uuid(),
"alterId": 0,
"email": email,
"limitIp": 2,
"totalGB": 0,
"flow": "xtls-rprx-vision",
"expiryTime": expiry_date,
"enable": True,
"tgId": "",
"subId": ""
}
]
}
payload = {
"id": inbound_id,
"settings": client_info
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(
url, headers=self.headers, json=payload, ssl=self.cert_content
) as response:
if response.status == 200:
return await response.status
else:
self.logger.error(f"Failed to add client: {response.status}")
return None
except aiohttp.ClientError as e:
self.logger.error(f"Add client request failed: {e}")
return None

View File

9
instance/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
from .model import User,Transaction,Subscription,SubscriptionStatus,Referral,Plan,TransactionType
from .config import setup_logging
from .configdb import get_postgres_session,init_postgresql,close_connections
__all__ = ['User','Transaction',
'SubscriptionStatus','Subscription',
'Referral','Plan','get_postgres_session',
'setup_logging','init_postgresql',
'close_connections','TransactionType']

View File

@@ -0,0 +1,42 @@
import logging
import sys
def setup_logging():
# Очистка существующих обработчиков
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
# Настройка формата
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Обработчик для вывода в консоль
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# Установка уровня для корневого логгера
logging.basicConfig(
level=logging.INFO,
handlers=[console_handler],
force=True
)
# Установка уровня для конкретных логгеров
loggers = [
'app.routes',
'app.services',
'main',
'__main__'
]
for logger_name in loggers:
logger = logging.getLogger(logger_name)
logger.setLevel(logging.INFO)
# Удаляем существующие обработчики
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# Добавляем наш обработчик
logger.addHandler(console_handler)
# Запрещаем передачу родительским логгерам
logger.propagate = False

View File

@@ -1,25 +1,24 @@
import os import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from motor.motor_asyncio import AsyncIOMotorClient
from app.services.db_manager import DatabaseManager from app.services.db_manager import DatabaseManager
from .model import Base from .model import Base
try:
# Настройки PostgreSQL из переменных окружения # Настройки PostgreSQL из переменных окружения
POSTGRES_DSN = os.getenv("POSTGRES_URL") POSTGRES_DSN = os.getenv("POSTGRES_URL")
BASE_URL_MARZBAN = os.getenv("BASE_URL_MARZBAN")
USERNAME_MARZBA = os.getenv('USERNAME_MARZBAN')
PASSWORD_MARZBAN = os.getenv('PASSWORD_MARZBAN')
BILLING_URL = os.getenv('BILLING_URL')
# Создание движка для PostgreSQL # Создание движка для PostgreSQL
postgres_engine = create_async_engine(POSTGRES_DSN, echo=False) if POSTGRES_DSN is None or BASE_URL_MARZBAN is None or USERNAME_MARZBA is None or PASSWORD_MARZBAN is None or BILLING_URL is None:
raise Exception
postgres_engine = create_async_engine(POSTGRES_DSN, echo=False)
except Exception as e:
print("Ошибки при инициализации сессии постгреса")
AsyncSessionLocal = sessionmaker(bind=postgres_engine, class_=AsyncSession, expire_on_commit=False) AsyncSessionLocal = sessionmaker(bind=postgres_engine, class_=AsyncSession, expire_on_commit=False)
# Настройки MongoDB из переменных окружения
MONGO_URI = os.getenv("MONGO_URL")
DATABASE_NAME = os.getenv("DB_NAME")
# Создание клиента MongoDB
mongo_client = AsyncIOMotorClient(MONGO_URI)
mongo_db = mongo_client[DATABASE_NAME]
# Инициализация PostgreSQL # Инициализация PostgreSQL
async def init_postgresql(): async def init_postgresql():
""" """
@@ -32,18 +31,6 @@ async def init_postgresql():
except Exception as e: except Exception as e:
print(f"Failed to connect to PostgreSQL: {e}") print(f"Failed to connect to PostgreSQL: {e}")
# Инициализация MongoDB
async def init_mongodb():
"""
Проверка подключения к MongoDB.
"""
try:
# Проверяем подключение к MongoDB
await mongo_client.admin.command("ping")
print("MongoDB connected.")
except Exception as e:
print(f"Failed to connect to MongoDB: {e}")
# Получение сессии PostgreSQL # Получение сессии PostgreSQL
async def get_postgres_session(): async def get_postgres_session():
""" """
@@ -61,12 +48,9 @@ async def close_connections():
await postgres_engine.dispose() await postgres_engine.dispose()
print("PostgreSQL connection closed.") print("PostgreSQL connection closed.")
# Закрытие MongoDB
mongo_client.close()
print("MongoDB connection closed.")
def get_database_manager() -> DatabaseManager: def get_database_manager() -> DatabaseManager:
""" """
Функция-зависимость для получения экземпляра DatabaseManager. Функция-зависимость для получения экземпляра DatabaseManager.
""" """
return DatabaseManager(get_postgres_session) return DatabaseManager(get_postgres_session, USERNAME_MARZBA,PASSWORD_MARZBAN,BASE_URL_MARZBAN,BILLING_URL)

View File

@@ -1,60 +1,100 @@
from sqlalchemy import Column, String, Numeric, DateTime, Boolean, ForeignKey, Integer from sqlalchemy import Column, String, Numeric, DateTime, ForeignKey, Integer, Enum, Text, BigInteger
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, relationship, sessionmaker from sqlalchemy.orm import declarative_base, relationship
from datetime import datetime from datetime import datetime
from enum import Enum as PyEnum
import uuid import uuid
Base = declarative_base() Base = declarative_base()
def generate_uuid(): class SubscriptionStatus(PyEnum):
return str(uuid.uuid4()) ACTIVE = "active"
EXPIRED = "expired"
CANCELLED = "cancelled"
"""Пользователи""" class TransactionStatus(PyEnum):
PENDING = "pending"
SUCCESS = "success"
FAILED = "failed"
class TransactionType(PyEnum):
DEPOSIT = "deposit"
WITHDRAWAL = "withdrawal"
PAYMENT = "payment"
# Пользователи
class User(Base): class User(Base):
__tablename__ = 'users' __tablename__ = 'users'
id = Column(String, primary_key=True, default=generate_uuid) telegram_id = Column(BigInteger, primary_key=True)
telegram_id = Column(Integer, unique=True, nullable=False) username = Column(String(255))
username = Column(String)
balance = Column(Numeric(10, 2), default=0.0) balance = Column(Numeric(10, 2), default=0.0)
ref_code = Column(String(7), unique=True) # Реферальный код пользователя
invited_by = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=True)
created_at = Column(DateTime, default=datetime.utcnow) created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
subscriptions = relationship("Subscription", back_populates="user") subscriptions = relationship("Subscription", back_populates="user")
transactions = relationship("Transaction", back_populates="user") transactions = relationship("Transaction", back_populates="user")
admins = relationship("Administrators", back_populates="user") sent_referrals = relationship("Referral",
foreign_keys="Referral.inviter_id",
back_populates="inviter")
received_referrals = relationship("Referral",
foreign_keys="Referral.invited_id",
back_populates="invited")
"""Подписки""" # Реферальные связи
class Referral(Base):
__tablename__ = 'referrals'
id = Column(Integer, primary_key=True)
inviter_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=False)
invited_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
inviter = relationship("User", foreign_keys=[inviter_id], back_populates="sent_referrals")
invited = relationship("User", foreign_keys=[invited_id], back_populates="received_referrals")
# Тарифные планы
class Plan(Base):
__tablename__ = 'plans'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
price = Column(Numeric(10, 2), nullable=False)
duration_days = Column(Integer, nullable=False)
description = Column(Text)
subscriptions = relationship("Subscription", back_populates="plan")
# Подписки
class Subscription(Base): class Subscription(Base):
__tablename__ = 'subscriptions' __tablename__ = 'subscriptions'
id = Column(String, primary_key=True, default=generate_uuid) id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(String, ForeignKey('users.id')) user_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=False)
vpn_server_id = Column(String) plan_id = Column(Integer, ForeignKey('plans.id'), nullable=False)
plan = Column(String) vpn_server_id = Column(String) # ID сервера в Marzban
expiry_date = Column(DateTime) status = Column(Enum(SubscriptionStatus), default=SubscriptionStatus.ACTIVE)
start_date = Column(DateTime, nullable=False)
end_date = Column(DateTime, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow) created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
user = relationship("User", back_populates="subscriptions") user = relationship("User", back_populates="subscriptions")
plan = relationship("Plan", back_populates="subscriptions")
"""Транзакции""" # Транзакции
class Transaction(Base): class Transaction(Base):
__tablename__ = 'transactions' __tablename__ = 'transactions'
id = Column(String, primary_key=True, default=generate_uuid) id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(String, ForeignKey('users.id')) user_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=False)
amount = Column(Numeric(10, 2)) amount = Column(Numeric(10, 2), nullable=False)
transaction_type = Column(String) status = Column(Enum(TransactionStatus), default=TransactionStatus.PENDING)
type = Column(Enum(TransactionType), nullable=False)
payment_provider = Column(String(100))
payment_id = Column(String, unique=True) # ID платежа в внешней системе
created_at = Column(DateTime, default=datetime.utcnow) created_at = Column(DateTime, default=datetime.utcnow)
user = relationship("User", back_populates="transactions") user = relationship("User", back_populates="transactions")
"""Администраторы"""
class Administrators(Base):
__tablename__ = 'admins'
id = Column(String, primary_key=True, default=generate_uuid)
user_id = Column(String, ForeignKey('users.id'))
user = relationship("User", back_populates="admins")

51
main.py
View File

@@ -1,42 +1,53 @@
import os
import sys
from fastapi import FastAPI from fastapi import FastAPI
from instance.configdb import init_postgresql, init_mongodb, close_connections from instance import setup_logging
from app.routes import user_router, payment_router, subscription_router import logging
from app.services.db_manager import DatabaseManager setup_logging()
from instance.configdb import get_postgres_session # logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
# handlers=[logging.StreamHandler(sys.stdout)],
# force=True
# )
from instance import init_postgresql, close_connections
from app.routes import router, subscription_router
logger = logging.getLogger(__name__)
# Создаём приложение FastAPI
app = FastAPI() app = FastAPI()
# Инициализация менеджера базы данных
database_manager = DatabaseManager(session_generator=get_postgres_session)
# Событие при старте приложения
@app.on_event("startup") @app.on_event("startup")
async def startup(): async def startup():
""" """
Инициализация подключения к базам данных. Инициализация подключения к базам данных.
""" """
try:
logger.info("Инициализация PostgreSQL...")
await init_postgresql() await init_postgresql()
await init_mongodb() logger.info("PostgreSQL успешно инициализирован.")
except Exception as e:
logger.error(f"Ошибка при инициализации баз данных: {e}")
raise RuntimeError("Не удалось инициализировать базы данных")
# Событие при завершении работы приложения
@app.on_event("shutdown") @app.on_event("shutdown")
async def shutdown(): async def shutdown():
""" """
Закрытие соединений с базами данных. Закрытие соединений с базами данных.
""" """
try:
logger.info("Закрытие соединений с базами данных...")
await close_connections() await close_connections()
logger.info("Соединения с базами данных успешно закрыты.")
except Exception as e:
logger.error(f"Ошибка при закрытии соединений: {e}")
# Подключение маршрутов app.include_router(router, prefix="/api")
app.include_router(user_router, prefix="/api") #app.include_router(payment_router, prefix="/api")
app.include_router(payment_router, prefix="/api")
app.include_router(subscription_router, prefix="/api") app.include_router(subscription_router, prefix="/api")
# Пример корневого маршрута
@app.get("/") @app.get("/")
async def root(): def read_root():
""" return {"message": "FastAPI приложение работает!"}
Пример маршрута, использующего DatabaseManager.
"""
user = await database_manager.create_user(telegram_id=12345)
return {"message": "User created", "user": {"id": user.id, "telegram_id": user.telegram_id}}

View File

@@ -3,14 +3,18 @@ aiohttp==3.11.11
aiosignal==1.3.2 aiosignal==1.3.2
annotated-types==0.7.0 annotated-types==0.7.0
anyio==4.7.0 anyio==4.7.0
APScheduler==3.11.0
asyncpg==0.30.0
attrs==24.3.0 attrs==24.3.0
blinker==1.9.0 blinker==1.9.0
bson==0.5.10 certifi==2025.11.12
charset-normalizer==3.4.4
click==8.1.7 click==8.1.7
dnspython==2.7.0 dnspython==2.7.0
fastapi==0.115.6 fastapi==0.115.6
frozenlist==1.5.0 frozenlist==1.5.0
greenlet==3.1.1 greenlet==3.1.1
h11==0.14.0
idna==3.10 idna==3.10
itsdangerous==2.2.0 itsdangerous==2.2.0
Jinja2==3.1.4 Jinja2==3.1.4
@@ -22,10 +26,14 @@ pydantic==2.10.4
pydantic_core==2.27.2 pydantic_core==2.27.2
pymongo==4.9.2 pymongo==4.9.2
python-dateutil==2.9.0.post0 python-dateutil==2.9.0.post0
requests==2.32.5
six==1.17.0 six==1.17.0
sniffio==1.3.1 sniffio==1.3.1
SQLAlchemy==2.0.36 SQLAlchemy==2.0.36
starlette==0.41.3 starlette==0.41.3
typing_extensions==4.12.2 typing_extensions==4.12.2
tzlocal==5.2
urllib3==2.5.0
uvicorn==0.34.0
Werkzeug==3.1.3 Werkzeug==3.1.3
yarl==1.18.3 yarl==1.18.3

0
run.py
View File

View File

270
tests/add_plans.py Normal file
View File

@@ -0,0 +1,270 @@
#!/usr/bin/env python3
"""
Автономный скрипт для инициализации тарифных планов в PostgreSQL.
Использует данные подключения из docker-compose.yml
"""
import asyncio
import argparse
import sys
from typing import List, Dict
from sqlalchemy import Column, Integer, String, Numeric, Text, delete, insert
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from decimal import Decimal
Base = declarative_base()
class Plan(Base):
"""Модель тарифного плана"""
__tablename__ = 'plans'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
price = Column(Numeric(10, 2), nullable=False)
duration_days = Column(Integer, nullable=False)
description = Column(Text, nullable=True)
# Данные из вашего docker-compose.yml
DEFAULT_CONFIG = {
'host': 'localhost',
'port': 5432,
'database': 'postgresql',
'user': 'AH3J9GSPBYOP',
'password': 'uPS9?y~mcu2',
'url': 'postgresql+asyncpg://AH3J9GSPBYOP:uPS9?y~mcu2@localhost:5432/postgresql'
}
PLANS_DATA = [
{'name': 'Lark_Standart_1', 'price': Decimal('200.00'), 'duration_days': 30},
{'name': 'Lark_Pro_1', 'price': Decimal('400.00'), 'duration_days': 30},
{'name': 'Lark_Family_1', 'price': Decimal('700.00'), 'duration_days': 30},
{'name': 'Lark_Standart_6', 'price': Decimal('1200.00'), 'duration_days': 180},
{'name': 'Lark_Standart_12', 'price': Decimal('2400.00'), 'duration_days': 360},
{'name': 'Lark_Pro_6', 'price': Decimal('2000.00'), 'duration_days': 180},
{'name': 'Lark_Pro_12', 'price': Decimal('4800.00'), 'duration_days': 360},
{'name': 'Lark_Family_6', 'price': Decimal('4200.00'), 'duration_days': 180},
{'name': 'Lark_Family_12', 'price': Decimal('8400.00'), 'duration_days': 360},
]
def print_banner():
"""Печатает баннер скрипта"""
print("=" * 60)
print("🚀 ИНИЦИАЛИЗАЦИЯ ТАРИФНЫХ ПЛАНОВ В БАЗЕ ДАННЫХ")
print("=" * 60)
print()
def create_db_url(config: dict) -> str:
"""Создает URL для подключения к базе данных"""
if config.get('url'):
return config['url']
return f"postgresql+asyncpg://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
async def check_connection(engine) -> bool:
"""Проверяет подключение к базе данных"""
try:
async with engine.connect() as conn:
result = await conn.execute("SELECT version()")
version = result.scalar()
print(f"✅ Подключено к PostgreSQL: {version.split(',')[0]}")
return True
except Exception as e:
print(f"❌ Ошибка подключения к базе данных: {e}")
return False
async def get_existing_plans(session) -> List:
"""Получает существующие тарифные планы"""
result = await session.execute(
"SELECT id, name, price, duration_days FROM plans ORDER BY price"
)
return result.fetchall()
async def clear_table(session, table_name: str = 'plans') -> bool:
"""Очищает указанную таблицу"""
try:
await session.execute(delete(Plan))
await session.commit()
print(f"✅ Таблица '{table_name}' очищена")
return True
except Exception as e:
print(f"❌ Ошибка при очистке таблицы: {e}")
await session.rollback()
return False
async def add_plans_to_db(session, plans_data: List[Dict]) -> int:
"""Добавляет тарифные планы в базу данных"""
try:
added_count = 0
for plan in plans_data:
await session.execute(
insert(Plan).values(**plan)
)
added_count += 1
await session.commit()
return added_count
except Exception as e:
await session.rollback()
raise e
async def print_plans_table(plans: List) -> None:
"""Выводит таблицу с тарифными планами"""
if not plans:
print("📭 Таблица 'plans' пуста")
return
print(f"\n📊 Текущие тарифные планы ({len(plans)} шт.):")
print("-" * 70)
print(f"{'ID':<5} {'Название':<25} {'Цена (руб.)':<15} {'Дней':<10}")
print("-" * 70)
for plan in plans:
print(f"{plan[0]:<5} {plan[1]:<25} {plan[2]:<15} {plan[3]:<10}")
print("-" * 70)
# Подсчет статистики
total_price = sum(float(p[2]) for p in plans)
avg_price = total_price / len(plans) if plans else 0
print(f"💰 Общая сумма всех тарифов: {total_price:.2f} руб.")
print(f"📈 Средняя цена тарифа: {avg_price:.2f} руб.")
print(f"📅 Всего предложений: {len(plans)}")
async def main(config: dict, clear_existing: bool = True, dry_run: bool = False):
"""Основная функция скрипта"""
print_banner()
# Создаем URL для подключения
db_url = create_db_url(config)
print(f"📡 Параметры подключения:")
print(f" Хост: {config['host']}:{config['port']}")
print(f" База данных: {config['database']}")
print(f" Пользователь: {config['user']}")
print(f" {'🚨 РЕЖИМ ТЕСТА (dry-run)' if dry_run else ''}")
print()
try:
# Подключаемся к базе данных
print("🔄 Подключение к базе данных...")
engine = create_async_engine(db_url, echo=False)
# Проверяем подключение
if not await check_connection(engine):
print("\nНе удалось подключиться к базе данных")
return False
# Создаем фабрику сессий
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async with AsyncSessionLocal() as session:
# Получаем текущие тарифы
print("\n🔍 Проверяем существующие тарифы...")
existing_plans = await get_existing_plans(session)
if existing_plans:
await print_plans_table(existing_plans)
if clear_existing and not dry_run:
print("\n⚠️ ВНИМАНИЕ: Будут удалены все существующие тарифы!")
confirm = input("Продолжить? (y/N): ")
if confirm.lower() != 'y':
print("❌ Операция отменена пользователем")
return False
# Очищаем таблицу
await clear_table(session)
elif dry_run:
print("\n⚠️ DRY-RUN: Существующие тарифы НЕ будут удалены")
else:
print("📭 Таблица 'plans' пуста, создаем новые тарифы...")
# Добавляем новые тарифы
if not dry_run:
print(f"\n Добавляем {len(PLANS_DATA)} тарифных планов...")
added_count = await add_plans_to_db(session, PLANS_DATA)
print(f"✅ Успешно добавлено {added_count} тарифов")
else:
print(f"\n⚠️ DRY-RUN: Планируется добавить {len(PLANS_DATA)} тарифов:")
for i, plan in enumerate(PLANS_DATA, 1):
print(f" {i}. {plan['name']} - {plan['price']} руб. ({plan['duration_days']} дней)")
# Показываем финальный результат
print("\n🎯 ФИНАЛЬНЫЙ РЕЗУЛЬТАТ:")
final_plans = await get_existing_plans(session)
await print_plans_table(final_plans)
await engine.dispose()
print("\n✅ Скрипт успешно выполнен!")
return True
except Exception as e:
print(f"\n❌ Критическая ошибка: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Инициализация тарифных планов в базе данных',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Примеры использования:
%(prog)s # Использует настройки по умолчанию
%(prog)s --no-clear # Не очищать существующие тарифы
%(prog)s --dry-run # Только показать что будет сделано
%(prog)s --host 192.168.1.100 # Указать другой хост
%(prog)s --url "postgresql://..." # Указать полный URL
"""
)
parser.add_argument('--host', help='Хост базы данных', default=DEFAULT_CONFIG['host'])
parser.add_argument('--port', type=int, help='Порт базы данных', default=DEFAULT_CONFIG['port'])
parser.add_argument('--database', help='Имя базы данных', default=DEFAULT_CONFIG['database'])
parser.add_argument('--user', help='Имя пользователя', default=DEFAULT_CONFIG['user'])
parser.add_argument('--password', help='Пароль', default=DEFAULT_CONFIG['password'])
parser.add_argument('--url', help='Полный URL подключения (игнорирует остальные параметры)')
parser.add_argument('--no-clear', action='store_true', help='Не очищать существующие тарифы')
parser.add_argument('--dry-run', action='store_true', help='Только показать что будет сделано')
args = parser.parse_args()
# Формируем конфигурацию
config = DEFAULT_CONFIG.copy()
if args.url:
config['url'] = args.url
else:
config.update({
'host': args.host,
'port': args.port,
'database': args.database,
'user': args.user,
'password': args.password,
'url': None # Будет сгенерирован автоматически
})
# Запускаем скрипт
success = asyncio.run(main(
config=config,
clear_existing=not args.no_clear,
dry_run=args.dry_run
))
sys.exit(0 if success else 1)

View File

View File