Интеграция биллинг при покупке подписки

This commit is contained in:
root
2025-11-29 22:19:13 +03:00
7 changed files with 213 additions and 127 deletions

View File

@@ -43,7 +43,7 @@ async def buy_subscription(
logger.info(f"Результат buy_sub: {result}")
if result == "ERROR" or result is None:
if result == "ERROR":
raise HTTPException(status_code=500, detail="Internal server error")
elif result == "INSUFFICIENT_FUNDS":
raise HTTPException(status_code=400, detail="INSUFFICIENT_FUNDS")
@@ -51,12 +51,24 @@ async def buy_subscription(
raise HTTPException(status_code=400, detail="TARIFF_NOT_FOUND")
elif result == "ACTIVE_SUBSCRIPTION_EXISTS":
raise HTTPException(status_code=400, detail="ACTIVE_SUBSCRIPTION_EXISTS")
elif result == "USER_NOT_FOUND":
raise HTTPException(status_code=404, detail="USER_NOT_FOUND")
elif result == "SUBSCRIPTION_CREATION_FAILED":
raise HTTPException(status_code=500, detail="Failed to create subscription")
elif result == "PAYMENT_FAILED_AFTER_SUBSCRIPTION":
raise HTTPException(status_code=402, detail="SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED")
elif result == "SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED":
raise HTTPException(status_code=402, detail="SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED")
# Если успешно, генерируем URI
if isinstance(result, dict) and result.get('status') == 'OK':
uri_result = await database_manager.generate_uri(request_data.telegram_id)
logger.info(f"Результат генерации URI: {uri_result}")
return {"status": "success", "subscription_id": result.get('subscription_id'), "uri": uri_result[0]}
return {
"status": "success",
"subscription_id": result.get('subscription_id'),
"uri": uri_result[0] if uri_result and isinstance(uri_result, list) else uri_result
}
else:
return {"status": "success", "message": "Subscription created"}
@@ -68,6 +80,7 @@ async def buy_subscription(
raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
# Эндпоинт для получения последней подписки
@router.get("/subscription/{telegram_id}/last", response_model=SubscriptionResponse)
async def last_subscription(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)):

View File

@@ -98,37 +98,6 @@ async def get_user(
logger.exception(f"Неожиданная ошибка при получении пользователя с telegram_id {telegram_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/user/{telegram_id}/balance/{amount}", summary="Обновить баланс")
async def update_balance(
telegram_id: int,
amount: float,
db_manager: DatabaseManager = Depends(get_database_manager)
):
"""
Обновляет баланс пользователя.
"""
logger.info(f"Получен запрос на обновление баланса: telegram_id={telegram_id}, amount={amount}")
try:
result = await db_manager.update_balance(telegram_id, amount)
if result == "ERROR":
logger.error(f"Ошибка обновления баланса для пользователя {telegram_id}")
raise HTTPException(status_code=500, detail="Failed to update balance")
logger.info(f"Баланс пользователя {telegram_id} успешно обновлен на {amount}")
return {"message": "Balance updated successfully"}
except HTTPException as http_ex:
logger.warning(f"HTTP ошибка: {http_ex.detail}")
raise http_ex
except SQLAlchemyError as db_ex:
logger.error(f"Ошибка базы данных при обновлении баланса пользователя {telegram_id}: {db_ex}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.exception(f"Неожиданная ошибка при обновлении баланса пользователя {telegram_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/user/{telegram_id}/transactions", summary="Последние транзакции пользователя")
async def last_transactions(
telegram_id: int,
@@ -139,7 +108,7 @@ async def last_transactions(
"""
logger.info(f"Получен запрос на транзакции для пользователя: {telegram_id}")
try:
logger.debug(f"Вызов метода get_transaction с user_id={telegram_id}")
logger.info(f"Вызов метода get_transaction с user_id={telegram_id}")
transactions = await db_manager.get_transaction(telegram_id)
if transactions == "ERROR":
@@ -149,7 +118,7 @@ async def last_transactions(
response = []
logger.info(f"Формирование ответа для пользователя {telegram_id}: {response}")
return response
logger.debug(f"Транзакции для {telegram_id}: {transactions}")
logger.info(f"Транзакции для {telegram_id}: {transactions}")
response = [
{
"id": tx.id,

View File

@@ -0,0 +1,80 @@
import aiohttp
import logging
from typing import Dict, Any, Optional
from enum import Enum
class BillingErrorCode(Enum):
INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS"
USER_NOT_FOUND = "USER_NOT_FOUND"
PAYMENT_FAILED = "PAYMENT_FAILED"
SERVICE_UNAVAILABLE = "SERVICE_UNAVAILABLE"
class BillingAdapter:
def __init__(self, base_url: str):
self.base_url = base_url
self.logger = logging.getLogger(__name__)
self.session = None
async def get_session(self):
if self.session is None:
self.session = aiohttp.ClientSession()
return self.session
async def withdraw_funds(self, user_id: int, amount: float, description: str = "") -> Dict[str, Any]:
"""
Списание средств через биллинг-сервис
"""
try:
session = await self.get_session()
payload = {
"user_id": user_id,
"amount": amount,
"description": description or f"Payment for subscription"
}
self.logger.info(f"Withdrawing {amount} from user {user_id}")
async with session.post(f"{self.base_url}/billing/payments/withdraw", json=payload) as response:
if response.status == 200:
result = await response.json()
if result.get("success"):
return {"status": "success"}
else:
error = result.get("error", "WITHDRAWAL_FAILED")
self.logger.error(f"Withdrawal failed: {error}")
return {"status": "error", "code": error}
else:
self.logger.error(f"Billing service error: {response.status}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except aiohttp.ClientError as e:
self.logger.error(f"Billing service connection error: {str(e)}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except Exception as e:
self.logger.error(f"Unexpected error in withdraw_funds: {str(e)}")
return {"status": "error", "code": "PAYMENT_FAILED"}
async def get_balance(self, user_id: int) -> Dict[str, Any]:
"""
Получение баланса пользователя
"""
try:
session = await self.get_session()
async with session.get(f"{self.base_url}/billing/balance/{user_id}") as response:
if response.status == 200:
result = await response.json()
return {"status": "success", "balance": result.get("balance", 0)}
elif response.status == 404:
return {"status": "error", "code": "USER_NOT_FOUND"}
else:
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except Exception as e:
self.logger.error(f"Error getting balance: {str(e)}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
async def close(self):
if self.session:
await self.session.close()

View File

@@ -1,6 +1,7 @@
from decimal import Decimal
import json
from instance.model import User, Subscription, Transaction
from app.services.billing_service import BillingAdapter
from app.services.marzban import MarzbanService, MarzbanUser
from .postgres_rep import PostgresRepository
from instance.model import Transaction,TransactionType, Plan
@@ -14,13 +15,14 @@ from uuid import UUID
class DatabaseManager:
def __init__(self, session_generator,marzban_username,marzban_password,marzban_url):
def __init__(self, session_generator,marzban_username,marzban_password,marzban_url,billing_base_url):
"""
Инициализация с асинхронным генератором сессий (например, get_postgres_session).
"""
self.logger = logging.getLogger(__name__)
self.postgres_repo = PostgresRepository(session_generator, self.logger)
self.marzban_service = MarzbanService(marzban_url,marzban_username,marzban_password)
self.billing_adapter = BillingAdapter(billing_base_url)
async def create_user(self, telegram_id: int, invented_by: Optional[int]= None):
"""
@@ -59,24 +61,24 @@ class DatabaseManager:
"""
return await self.postgres_repo.get_last_transactions(telegram_id, limit)
async def update_balance(self, telegram_id: int, amount: float):
"""
Обновляет баланс пользователя и добавляет транзакцию.
"""
self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
# async def update_balance(self, telegram_id: int, amount: float):
# """
# Обновляет баланс пользователя и добавляет транзакцию.
# """
# self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
# user = await self.get_user_by_telegram_id(telegram_id)
# if not user:
# self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
# return "ERROR"
updated = await self.postgres_repo.update_balance(user, amount)
if not updated:
self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
return "ERROR"
# updated = await self.postgres_repo.update_balance(user, amount)
# if not updated:
# self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
# return "ERROR"
self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
await self.add_transaction(user.telegram_id, amount)
return "OK"
# self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
# await self.add_transaction(user.telegram_id, amount)
# return "OK"
async def get_active_subscription(self, telegram_id: int):
@@ -105,67 +107,59 @@ class DatabaseManager:
Возвращает список последних подписок.
"""
return await self.postgres_repo.get_last_subscription_by_user_id(telegram_id)
async def buy_sub(self, telegram_id: int, plan_name: str):
"""
Покупает подписку.
Покупка подписки: сначала создаем подписку, потом списываем деньги
"""
try:
self.logger.info(f"Начало покупки подписки для пользователя {telegram_id}, план: {plan_name}")
active_subscription = await self.get_active_subscription(telegram_id)
self.logger.info(f"Активная подписка: {active_subscription}")
if active_subscription:
self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.")
self.logger.info(f"Покупка подписки: user={telegram_id}, plan={plan_name}")
# 1. Проверка активной подписки
if await self.get_active_subscription(telegram_id):
return "ACTIVE_SUBSCRIPTION_EXISTS"
result = await self._initialize_user_and_plan(telegram_id, plan_name)
if isinstance(result, str):
return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS"
user, plan = result
self.logger.info(f"Пользователь и план найдены: user_id={user.telegram_id}, plan_price={plan.price}")
new_subscription = await self._create_subscription_and_add_client(user, plan)
if not new_subscription:
self.logger.error(f"Не удалось создать подписку для пользователя {telegram_id}")
return "ERROR"
updated = await self.postgres_repo.update_balance(user,-plan.price)
if updated == False:
self.logger.error(f"Не удалось обновить баланс для пользователя {telegram_id}")
return "ERROR"
self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.")
return {"status": "OK", "subscription_id": str(new_subscription.id)}
except Exception as e:
self.logger.error(f"Неожиданная ошибка в buy_sub: {str(e)}")
return "ERROR"
async def _initialize_user_and_plan(self, telegram_id, plan_name):
"""
Инициализирует пользователя и план подписки.
"""
try:
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
# 2. Получаем план
plan = await self.postgres_repo.get_subscription_plan(plan_name)
if not plan:
self.logger.error(f"Тарифный план {plan_name} не найден.")
return "TARIFF_NOT_FOUND"
cost = plan.price
if user.balance < cost:
self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_name}.")
# 3. Проверяем пользователя
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
return "USER_NOT_FOUND"
# 4. Проверяем баланс (только для информации)
balance_result = await self.billing_adapter.get_balance(telegram_id)
if balance_result["status"] == "error":
return "BILLING_SERVICE_ERROR"
if balance_result["balance"] < plan.price:
return "INSUFFICIENT_FUNDS"
return user, plan
# 5. СОЗДАЕМ ПОДПИСКУ (самое важное - сначала!)
new_subscription = await self._create_subscription_and_add_client(user, plan)
if not new_subscription:
return "SUBSCRIPTION_CREATION_FAILED"
# 6. ТОЛЬКО ПОСЛЕ УСПЕШНОГО СОЗДАНИЯ ПОДПИСКИ - списываем деньги
withdraw_result = await self.billing_adapter.withdraw_funds(
telegram_id,
float(plan.price),
f"Оплата подписки {plan_name}"
)
if withdraw_result["status"] == "error":
await self.postgres_repo.delete_subscription(new_subscription.id)
self.logger.error(f"Payment failed but subscription created: {new_subscription.id}")
return "PAYMENT_FAILED_AFTER_SUBSCRIPTION"
# 7. ВСЕ УСПЕШНО
self.logger.info(f"Подписка успешно создана и оплачена: {new_subscription.id}")
return {"status": "OK", "subscription_id": str(new_subscription.id)}
except Exception as e:
self.logger.error(f"Неожиданная ошибка в _initialize_user_and_plan: {str(e)}")
self.logger.error(f"Ошибка в buy_sub: {str(e)}")
return "ERROR"
async def _create_subscription_and_add_client(self, user: User, plan: Plan):

View File

@@ -68,30 +68,30 @@ class PostgresRepository:
return False
async def update_balance(self, user: User, amount: float):
"""
Обновляет баланс пользователя.
# async def update_balance(self, user: User, amount: float):
# """
# Обновляет баланс пользователя.
:param user: Объект пользователя.
:param amount: Сумма для добавления/вычитания.
:return: True, если успешно, иначе False.
"""
self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}")
async for session in self.session_generator():
try:
user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии
if not user:
self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.")
return False
# Приведение amount к Decimal
user.balance += Decimal(amount)
await session.commit()
self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}")
return True
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}")
await session.rollback()
return False
# :param user: Объект пользователя.
# :param amount: Сумма для добавления/вычитания.
# :return: True, если успешно, иначе False.
# """
# self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}")
# async for session in self.session_generator():
# try:
# user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии
# if not user:
# self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.")
# return False
# # Приведение amount к Decimal
# user.balance += Decimal(amount)
# await session.commit()
# self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}")
# return True
# except SQLAlchemyError as e:
# self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}")
# await session.rollback()
# return False
async def get_last_transactions(self, user_telegram_id: int, limit: int = 10):
"""
@@ -139,7 +139,35 @@ class PostgresRepository:
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении подписки для пользователя {user_telegram_id}: {e}")
return None
async def delete_subscription(self, subscription_id: UUID) -> bool:
"""
Удаляет подписку по её ID.
:param subscription_id: UUID подписки для удаления
:return: True если удалено успешно, False в случае ошибки
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Subscription).where(Subscription.id == subscription_id)
)
subscription = result.scalars().first()
if not subscription:
self.logger.warning(f"Подписка с ID {subscription_id} не найдена")
return False
await session.delete(subscription)
await session.commit()
self.logger.info(f"Подписка с ID {subscription_id} успешно удалена")
return True
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при удалении подписки {subscription_id}: {e}")
await session.rollback()
return False
async def add_record(self, record):
"""
Добавляет запись в базу данных.