Интегрировал биллинг при оплате подписки
This commit is contained in:
@@ -43,7 +43,7 @@ async def buy_subscription(
|
||||
|
||||
logger.info(f"Результат buy_sub: {result}")
|
||||
|
||||
if result == "ERROR" or result is None:
|
||||
if result == "ERROR":
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
elif result == "INSUFFICIENT_FUNDS":
|
||||
raise HTTPException(status_code=400, detail="INSUFFICIENT_FUNDS")
|
||||
@@ -51,12 +51,24 @@ async def buy_subscription(
|
||||
raise HTTPException(status_code=400, detail="TARIFF_NOT_FOUND")
|
||||
elif result == "ACTIVE_SUBSCRIPTION_EXISTS":
|
||||
raise HTTPException(status_code=400, detail="ACTIVE_SUBSCRIPTION_EXISTS")
|
||||
elif result == "USER_NOT_FOUND":
|
||||
raise HTTPException(status_code=404, detail="USER_NOT_FOUND")
|
||||
elif result == "SUBSCRIPTION_CREATION_FAILED":
|
||||
raise HTTPException(status_code=500, detail="Failed to create subscription")
|
||||
elif result == "PAYMENT_FAILED_AFTER_SUBSCRIPTION":
|
||||
raise HTTPException(status_code=402, detail="SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED")
|
||||
elif result == "SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED":
|
||||
raise HTTPException(status_code=402, detail="SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED")
|
||||
|
||||
# Если успешно, генерируем URI
|
||||
if isinstance(result, dict) and result.get('status') == 'OK':
|
||||
uri_result = await database_manager.generate_uri(request_data.telegram_id)
|
||||
logger.info(f"Результат генерации URI: {uri_result}")
|
||||
return {"status": "success", "subscription_id": result.get('subscription_id'), "uri": uri_result[0]}
|
||||
return {
|
||||
"status": "success",
|
||||
"subscription_id": result.get('subscription_id'),
|
||||
"uri": uri_result[0] if uri_result and isinstance(uri_result, list) else uri_result
|
||||
}
|
||||
else:
|
||||
return {"status": "success", "message": "Subscription created"}
|
||||
|
||||
@@ -68,6 +80,7 @@ async def buy_subscription(
|
||||
raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
|
||||
|
||||
|
||||
|
||||
# Эндпоинт для получения последней подписки
|
||||
@router.get("/subscription/{telegram_id}/last", response_model=SubscriptionResponse)
|
||||
async def last_subscription(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)):
|
||||
|
||||
@@ -1,55 +1,79 @@
|
||||
import aiohttp
|
||||
import logging
|
||||
from typing import Optional, Dict, Any
|
||||
from typing import Dict, Any, Optional
|
||||
from enum import Enum
|
||||
|
||||
class BillingService:
|
||||
def __init__(self, base_url: str, api_key: str):
|
||||
class BillingErrorCode(Enum):
|
||||
INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS"
|
||||
USER_NOT_FOUND = "USER_NOT_FOUND"
|
||||
PAYMENT_FAILED = "PAYMENT_FAILED"
|
||||
SERVICE_UNAVAILABLE = "SERVICE_UNAVAILABLE"
|
||||
|
||||
class BillingAdapter:
|
||||
def __init__(self, base_url: str):
|
||||
self.base_url = base_url
|
||||
self.api_key = api_key
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.session = None
|
||||
|
||||
async def get_session(self):
|
||||
if self.session is None:
|
||||
self.session = aiohttp.ClientSession(
|
||||
headers={'Authorization': f'Bearer {self.api_key}'}
|
||||
)
|
||||
self.session = aiohttp.ClientSession()
|
||||
return self.session
|
||||
|
||||
async def process_payment(self, telegram_id: int, amount: float, plan_name: str) -> Dict[str, Any]:
|
||||
"""Обработка платежа через биллинг-сервис"""
|
||||
async def withdraw_funds(self, user_id: int, amount: float, description: str = "") -> Dict[str, Any]:
|
||||
"""
|
||||
Списание средств через биллинг-сервис
|
||||
"""
|
||||
try:
|
||||
session = await self.get_session()
|
||||
|
||||
payload = {
|
||||
"user_id": telegram_id,
|
||||
"user_id": user_id,
|
||||
"amount": amount,
|
||||
"plan_name": plan_name,
|
||||
"currency": "USD"
|
||||
"description": description or f"Payment for subscription"
|
||||
}
|
||||
|
||||
async with session.post(f"{self.base_url}/payments/process", json=payload) as response:
|
||||
self.logger.info(f"Withdrawing {amount} from user {user_id}")
|
||||
|
||||
async with session.post(f"{self.base_url}/billing/payments/withdraw", json=payload) as response:
|
||||
if response.status == 200:
|
||||
return await response.json()
|
||||
result = await response.json()
|
||||
if result.get("success"):
|
||||
return {"status": "success"}
|
||||
else:
|
||||
error = result.get("error", "WITHDRAWAL_FAILED")
|
||||
self.logger.error(f"Withdrawal failed: {error}")
|
||||
return {"status": "error", "code": error}
|
||||
else:
|
||||
self.logger.error(f"Billing service error: {response.status}")
|
||||
return {"status": "error", "code": "BILLING_SERVICE_ERROR"}
|
||||
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
|
||||
|
||||
except Exception as e:
|
||||
except aiohttp.ClientError as e:
|
||||
self.logger.error(f"Billing service connection error: {str(e)}")
|
||||
return {"status": "error", "code": "CONNECTION_ERROR"}
|
||||
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
|
||||
except Exception as e:
|
||||
self.logger.error(f"Unexpected error in withdraw_funds: {str(e)}")
|
||||
return {"status": "error", "code": "PAYMENT_FAILED"}
|
||||
|
||||
async def check_payment_status(self, payment_id: str) -> Dict[str, Any]:
|
||||
"""Проверка статуса платежа"""
|
||||
async def get_balance(self, user_id: int) -> Dict[str, Any]:
|
||||
"""
|
||||
Получение баланса пользователя
|
||||
"""
|
||||
try:
|
||||
session = await self.get_session()
|
||||
async with session.get(f"{self.base_url}/payments/{payment_id}") as response:
|
||||
|
||||
async with session.get(f"{self.base_url}/billing/balance/{user_id}") as response:
|
||||
if response.status == 200:
|
||||
return await response.json()
|
||||
result = await response.json()
|
||||
return {"status": "success", "balance": result.get("balance", 0)}
|
||||
elif response.status == 404:
|
||||
return {"status": "error", "code": "USER_NOT_FOUND"}
|
||||
else:
|
||||
return {"status": "error"}
|
||||
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Billing service check error: {str(e)}")
|
||||
return {"status": "error"}
|
||||
self.logger.error(f"Error getting balance: {str(e)}")
|
||||
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
|
||||
|
||||
async def close(self):
|
||||
if self.session:
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from decimal import Decimal
|
||||
import json
|
||||
from instance.model import User, Subscription, Transaction
|
||||
from app.services.billing_service import BillingAdapter
|
||||
from app.services.marzban import MarzbanService, MarzbanUser
|
||||
from .postgres_rep import PostgresRepository
|
||||
from instance.model import Transaction,TransactionType, Plan
|
||||
@@ -14,13 +15,14 @@ from uuid import UUID
|
||||
|
||||
|
||||
class DatabaseManager:
|
||||
def __init__(self, session_generator,marzban_username,marzban_password,marzban_url):
|
||||
def __init__(self, session_generator,marzban_username,marzban_password,marzban_url,billing_base_url):
|
||||
"""
|
||||
Инициализация с асинхронным генератором сессий (например, get_postgres_session).
|
||||
"""
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.postgres_repo = PostgresRepository(session_generator, self.logger)
|
||||
self.marzban_service = MarzbanService(marzban_url,marzban_username,marzban_password)
|
||||
self.billing_adapter = BillingAdapter(billing_base_url)
|
||||
|
||||
async def create_user(self, telegram_id: int, invented_by: Optional[int]= None):
|
||||
"""
|
||||
@@ -59,24 +61,24 @@ class DatabaseManager:
|
||||
"""
|
||||
return await self.postgres_repo.get_last_transactions(telegram_id, limit)
|
||||
|
||||
async def update_balance(self, telegram_id: int, amount: float):
|
||||
"""
|
||||
Обновляет баланс пользователя и добавляет транзакцию.
|
||||
"""
|
||||
self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
|
||||
user = await self.get_user_by_telegram_id(telegram_id)
|
||||
if not user:
|
||||
self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
|
||||
return "ERROR"
|
||||
# async def update_balance(self, telegram_id: int, amount: float):
|
||||
# """
|
||||
# Обновляет баланс пользователя и добавляет транзакцию.
|
||||
# """
|
||||
# self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
|
||||
# user = await self.get_user_by_telegram_id(telegram_id)
|
||||
# if not user:
|
||||
# self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
|
||||
# return "ERROR"
|
||||
|
||||
updated = await self.postgres_repo.update_balance(user, amount)
|
||||
if not updated:
|
||||
self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
|
||||
return "ERROR"
|
||||
# updated = await self.postgres_repo.update_balance(user, amount)
|
||||
# if not updated:
|
||||
# self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
|
||||
# return "ERROR"
|
||||
|
||||
self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
|
||||
await self.add_transaction(user.telegram_id, amount)
|
||||
return "OK"
|
||||
# self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
|
||||
# await self.add_transaction(user.telegram_id, amount)
|
||||
# return "OK"
|
||||
|
||||
|
||||
async def get_active_subscription(self, telegram_id: int):
|
||||
@@ -108,64 +110,56 @@ class DatabaseManager:
|
||||
|
||||
async def buy_sub(self, telegram_id: int, plan_name: str):
|
||||
"""
|
||||
Покупает подписку.
|
||||
Покупка подписки: сначала создаем подписку, потом списываем деньги
|
||||
"""
|
||||
try:
|
||||
self.logger.info(f"Начало покупки подписки для пользователя {telegram_id}, план: {plan_name}")
|
||||
active_subscription = await self.get_active_subscription(telegram_id)
|
||||
self.logger.info(f"Активная подписка: {active_subscription}")
|
||||
self.logger.info(f"Покупка подписки: user={telegram_id}, plan={plan_name}")
|
||||
|
||||
if active_subscription:
|
||||
self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.")
|
||||
# 1. Проверка активной подписки
|
||||
if await self.get_active_subscription(telegram_id):
|
||||
return "ACTIVE_SUBSCRIPTION_EXISTS"
|
||||
|
||||
result = await self._initialize_user_and_plan(telegram_id, plan_name)
|
||||
if isinstance(result, str):
|
||||
return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS"
|
||||
|
||||
user, plan = result
|
||||
self.logger.info(f"Пользователь и план найдены: user_id={user.telegram_id}, plan_price={plan.price}")
|
||||
|
||||
new_subscription = await self._create_subscription_and_add_client(user, plan)
|
||||
if not new_subscription:
|
||||
self.logger.error(f"Не удалось создать подписку для пользователя {telegram_id}")
|
||||
return "ERROR"
|
||||
|
||||
updated = await self.postgres_repo.update_balance(user,-plan.price)
|
||||
if updated == False:
|
||||
self.logger.error(f"Не удалось обновить баланс для пользователя {telegram_id}")
|
||||
return "ERROR"
|
||||
|
||||
self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.")
|
||||
return {"status": "OK", "subscription_id": str(new_subscription.id)}
|
||||
except Exception as e:
|
||||
self.logger.error(f"Неожиданная ошибка в buy_sub: {str(e)}")
|
||||
return "ERROR"
|
||||
|
||||
async def _initialize_user_and_plan(self, telegram_id, plan_name):
|
||||
"""
|
||||
Инициализирует пользователя и план подписки.
|
||||
"""
|
||||
try:
|
||||
|
||||
user = await self.get_user_by_telegram_id(telegram_id)
|
||||
if not user:
|
||||
self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.")
|
||||
return "ERROR"
|
||||
|
||||
# 2. Получаем план
|
||||
plan = await self.postgres_repo.get_subscription_plan(plan_name)
|
||||
if not plan:
|
||||
self.logger.error(f"Тарифный план {plan_name} не найден.")
|
||||
return "TARIFF_NOT_FOUND"
|
||||
|
||||
cost = plan.price
|
||||
if user.balance < cost:
|
||||
self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_name}.")
|
||||
# 3. Проверяем пользователя
|
||||
user = await self.get_user_by_telegram_id(telegram_id)
|
||||
if not user:
|
||||
return "USER_NOT_FOUND"
|
||||
|
||||
# 4. Проверяем баланс (только для информации)
|
||||
balance_result = await self.billing_adapter.get_balance(telegram_id)
|
||||
if balance_result["status"] == "error":
|
||||
return "BILLING_SERVICE_ERROR"
|
||||
|
||||
if balance_result["balance"] < plan.price:
|
||||
return "INSUFFICIENT_FUNDS"
|
||||
|
||||
return user, plan
|
||||
# 5. СОЗДАЕМ ПОДПИСКУ (самое важное - сначала!)
|
||||
new_subscription = await self._create_subscription_and_add_client(user, plan)
|
||||
if not new_subscription:
|
||||
return "SUBSCRIPTION_CREATION_FAILED"
|
||||
|
||||
# 6. ТОЛЬКО ПОСЛЕ УСПЕШНОГО СОЗДАНИЯ ПОДПИСКИ - списываем деньги
|
||||
withdraw_result = await self.billing_adapter.withdraw_funds(
|
||||
telegram_id,
|
||||
float(plan.price),
|
||||
f"Оплата подписки {plan_name}"
|
||||
)
|
||||
|
||||
if withdraw_result["status"] == "error":
|
||||
await self.postgres_repo.delete_subscription(new_subscription.id)
|
||||
self.logger.error(f"Payment failed but subscription created: {new_subscription.id}")
|
||||
return "PAYMENT_FAILED_AFTER_SUBSCRIPTION"
|
||||
|
||||
# 7. ВСЕ УСПЕШНО
|
||||
self.logger.info(f"Подписка успешно создана и оплачена: {new_subscription.id}")
|
||||
return {"status": "OK", "subscription_id": str(new_subscription.id)}
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Неожиданная ошибка в _initialize_user_and_plan: {str(e)}")
|
||||
self.logger.error(f"Ошибка в buy_sub: {str(e)}")
|
||||
return "ERROR"
|
||||
|
||||
async def _create_subscription_and_add_client(self, user: User, plan: Plan):
|
||||
|
||||
@@ -68,30 +68,30 @@ class PostgresRepository:
|
||||
return False
|
||||
|
||||
|
||||
async def update_balance(self, user: User, amount: float):
|
||||
"""
|
||||
Обновляет баланс пользователя.
|
||||
# async def update_balance(self, user: User, amount: float):
|
||||
# """
|
||||
# Обновляет баланс пользователя.
|
||||
|
||||
:param user: Объект пользователя.
|
||||
:param amount: Сумма для добавления/вычитания.
|
||||
:return: True, если успешно, иначе False.
|
||||
"""
|
||||
self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}")
|
||||
async for session in self.session_generator():
|
||||
try:
|
||||
user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии
|
||||
if not user:
|
||||
self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.")
|
||||
return False
|
||||
# Приведение amount к Decimal
|
||||
user.balance += Decimal(amount)
|
||||
await session.commit()
|
||||
self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}")
|
||||
return True
|
||||
except SQLAlchemyError as e:
|
||||
self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}")
|
||||
await session.rollback()
|
||||
return False
|
||||
# :param user: Объект пользователя.
|
||||
# :param amount: Сумма для добавления/вычитания.
|
||||
# :return: True, если успешно, иначе False.
|
||||
# """
|
||||
# self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}")
|
||||
# async for session in self.session_generator():
|
||||
# try:
|
||||
# user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии
|
||||
# if not user:
|
||||
# self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.")
|
||||
# return False
|
||||
# # Приведение amount к Decimal
|
||||
# user.balance += Decimal(amount)
|
||||
# await session.commit()
|
||||
# self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}")
|
||||
# return True
|
||||
# except SQLAlchemyError as e:
|
||||
# self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}")
|
||||
# await session.rollback()
|
||||
# return False
|
||||
|
||||
async def get_last_transactions(self, user_telegram_id: int, limit: int = 10):
|
||||
"""
|
||||
@@ -139,6 +139,34 @@ class PostgresRepository:
|
||||
except SQLAlchemyError as e:
|
||||
self.logger.error(f"Ошибка при получении подписки для пользователя {user_telegram_id}: {e}")
|
||||
return None
|
||||
async def delete_subscription(self, subscription_id: UUID) -> bool:
|
||||
"""
|
||||
Удаляет подписку по её ID.
|
||||
|
||||
:param subscription_id: UUID подписки для удаления
|
||||
:return: True если удалено успешно, False в случае ошибки
|
||||
"""
|
||||
async for session in self.session_generator():
|
||||
try:
|
||||
result = await session.execute(
|
||||
select(Subscription).where(Subscription.id == subscription_id)
|
||||
)
|
||||
subscription = result.scalars().first()
|
||||
|
||||
if not subscription:
|
||||
self.logger.warning(f"Подписка с ID {subscription_id} не найдена")
|
||||
return False
|
||||
|
||||
await session.delete(subscription)
|
||||
await session.commit()
|
||||
|
||||
self.logger.info(f"Подписка с ID {subscription_id} успешно удалена")
|
||||
return True
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
self.logger.error(f"Ошибка при удалении подписки {subscription_id}: {e}")
|
||||
await session.rollback()
|
||||
return False
|
||||
|
||||
async def add_record(self, record):
|
||||
"""
|
||||
|
||||
@@ -9,8 +9,10 @@ try:
|
||||
BASE_URL_MARZBAN = os.getenv("BASE_URL_MARZBAN")
|
||||
USERNAME_MARZBA = os.getenv('USERNAME_MARZBAN')
|
||||
PASSWORD_MARZBAN = os.getenv('PASSWORD_MARZBAN')
|
||||
BILLING_URL = os.getenv('BILLING_URL')
|
||||
|
||||
# Создание движка для PostgreSQL
|
||||
if POSTGRES_DSN is None or BASE_URL_MARZBAN is None or USERNAME_MARZBA is None or PASSWORD_MARZBAN is None:
|
||||
if POSTGRES_DSN is None or BASE_URL_MARZBAN is None or USERNAME_MARZBA is None or PASSWORD_MARZBAN is None or BILLING_URL is None:
|
||||
raise Exception
|
||||
postgres_engine = create_async_engine(POSTGRES_DSN, echo=False)
|
||||
except Exception as e:
|
||||
@@ -51,4 +53,4 @@ def get_database_manager() -> DatabaseManager:
|
||||
"""
|
||||
Функция-зависимость для получения экземпляра DatabaseManager.
|
||||
"""
|
||||
return DatabaseManager(get_postgres_session, USERNAME_MARZBA,PASSWORD_MARZBAN,BASE_URL_MARZBAN)
|
||||
return DatabaseManager(get_postgres_session, USERNAME_MARZBA,PASSWORD_MARZBAN,BASE_URL_MARZBAN,BILLING_URL)
|
||||
|
||||
Reference in New Issue
Block a user