Compare commits

8 Commits

21 changed files with 588 additions and 423 deletions

View File

@@ -43,7 +43,7 @@ async def buy_subscription(
logger.info(f"Результат buy_sub: {result}")
if result == "ERROR" or result is None:
if result == "ERROR":
raise HTTPException(status_code=500, detail="Internal server error")
elif result == "INSUFFICIENT_FUNDS":
raise HTTPException(status_code=400, detail="INSUFFICIENT_FUNDS")
@@ -51,12 +51,24 @@ async def buy_subscription(
raise HTTPException(status_code=400, detail="TARIFF_NOT_FOUND")
elif result == "ACTIVE_SUBSCRIPTION_EXISTS":
raise HTTPException(status_code=400, detail="ACTIVE_SUBSCRIPTION_EXISTS")
elif result == "USER_NOT_FOUND":
raise HTTPException(status_code=404, detail="USER_NOT_FOUND")
elif result == "SUBSCRIPTION_CREATION_FAILED":
raise HTTPException(status_code=500, detail="Failed to create subscription")
elif result == "PAYMENT_FAILED_AFTER_SUBSCRIPTION":
raise HTTPException(status_code=402, detail="SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED")
elif result == "SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED":
raise HTTPException(status_code=402, detail="SUBSCRIPTION_CREATED_BUT_PAYMENT_FAILED")
# Если успешно, генерируем URI
if isinstance(result, dict) and result.get('status') == 'OK':
uri_result = await database_manager.generate_uri(request_data.telegram_id)
logger.info(f"Результат генерации URI: {uri_result}")
return {"status": "success", "subscription_id": result.get('subscription_id'), "uri": uri_result[0]}
return {
"status": "success",
"subscription_id": result.get('subscription_id'),
"uri": uri_result[0] if uri_result and isinstance(uri_result, list) else uri_result
}
else:
return {"status": "success", "message": "Subscription created"}
@@ -68,6 +80,7 @@ async def buy_subscription(
raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
# Эндпоинт для получения последней подписки
@router.get("/subscription/{telegram_id}/last", response_model=SubscriptionResponse)
async def last_subscription(telegram_id: int, database_manager: DatabaseManager = Depends(get_database_manager)):
@@ -116,24 +129,39 @@ async def get_subscriptions(telegram_id: int, database_manager: DatabaseManager
logger.info(f"Получение подписок для пользователя: {telegram_id}")
try:
# Получаем подписки без ограничений или с указанным лимитом
subscriptions = await database_manager.get_last_subscriptions(telegram_id=telegram_id)
subscription = await database_manager.get_last_subscriptions(telegram_id=telegram_id)
if not subscriptions:
if not subscription:
logger.warning(f"Подписки для пользователя {telegram_id} не найдены")
raise HTTPException(status_code=404, detail="No subscriptions found")
plan = await database_manager.get_plan_by_id(subscription.plan_id)
if not plan:
logger.warning(f"Тариф для подписки {subscription.id} не найден")
plan_name = "Unknown"
else:
plan_name = plan.name
# Формируем список подписок для ответа
return [
{
"id": sub.id,
"plan": sub.plan,
"vpn_server_id": sub.vpn_server_id,
"expiry_date": sub.expiry_date.isoformat(),
"created_at": sub.created_at.isoformat(),
"updated_at": sub.updated_at.isoformat(),
}
for sub in subscriptions
]
# return [
# {
# "id": sub.id,
# "plan": sub.plan,
# "vpn_server_id": sub.vpn_server_id,
# "expiry_date": sub.expiry_date.isoformat(),
# "created_at": sub.created_at.isoformat(),
# "updated_at": sub.updated_at.isoformat(),
# }
# for sub in subscription
# ]
return [{
"id": str(subscription.id), # Конвертируем UUID в строку
"user_id": subscription.user_id,
"plan_name": plan_name,
"vpn_server_id": subscription.vpn_server_id,
"end_date": subscription.end_date.isoformat(),
"status": subscription.status.value, # Извлекаем значение enum
"start_date": subscription.start_date.isoformat(),
"created_at": subscription.created_at.isoformat()
}]
except SQLAlchemyError as e:
logger.error(f"Ошибка базы данных при получении подписок для пользователя {telegram_id}: {e}")
raise HTTPException(status_code=500, detail="Database error")

View File

@@ -1,3 +1,4 @@
from datetime import datetime
import sys
from fastapi import APIRouter, Depends, HTTPException
from fastapi.exceptions import HTTPException
@@ -98,37 +99,6 @@ async def get_user(
logger.exception(f"Неожиданная ошибка при получении пользователя с telegram_id {telegram_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/user/{telegram_id}/balance/{amount}", summary="Обновить баланс")
async def update_balance(
telegram_id: int,
amount: float,
db_manager: DatabaseManager = Depends(get_database_manager)
):
"""
Обновляет баланс пользователя.
"""
logger.info(f"Получен запрос на обновление баланса: telegram_id={telegram_id}, amount={amount}")
try:
result = await db_manager.update_balance(telegram_id, amount)
if result == "ERROR":
logger.error(f"Ошибка обновления баланса для пользователя {telegram_id}")
raise HTTPException(status_code=500, detail="Failed to update balance")
logger.info(f"Баланс пользователя {telegram_id} успешно обновлен на {amount}")
return {"message": "Balance updated successfully"}
except HTTPException as http_ex:
logger.warning(f"HTTP ошибка: {http_ex.detail}")
raise http_ex
except SQLAlchemyError as db_ex:
logger.error(f"Ошибка базы данных при обновлении баланса пользователя {telegram_id}: {db_ex}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.exception(f"Неожиданная ошибка при обновлении баланса пользователя {telegram_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/user/{telegram_id}/transactions", summary="Последние транзакции пользователя")
async def last_transactions(
telegram_id: int,
@@ -149,15 +119,30 @@ async def last_transactions(
response = []
logger.info(f"Формирование ответа для пользователя {telegram_id}: {response}")
return response
logger.debug(f"Транзакции для {telegram_id}: {transactions}")
response = [
{
response = []
for tx in transactions:
# Проверяем, что транзакция существует и имеет created_at
if tx is None:
continue
if tx.status.value == "pending":
continue
# Обрабатываем created_at (может быть None)
created_at_str = None
if tx.created_at:
created_at_str = tx.created_at.isoformat()
else:
created_at_str = datetime.utcnow().isoformat() # или любое значение по умолчанию
response.append({
"id": tx.id,
"amount": tx.amount,
"created_at": tx.created_at.isoformat(),
"type": tx.type,
} for tx in transactions
]
"amount": float(tx.amount) if tx.amount else 0.0,
"created_at": created_at_str,
"type": tx.type.value if hasattr(tx.type, 'value') else str(tx.type),
})
logger.info(f"Формирование ответа для пользователя {telegram_id}: {response}")
return response
@@ -174,7 +159,6 @@ async def last_transactions(
raise HTTPException(status_code=500, detail=str(e))
@router.post("/user/{referrer_id}/add_referral", summary="Обновить баланс")
async def add_referal(
referrer_id: int,
@@ -201,4 +185,27 @@ async def add_referal(
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.exception(f"Неожиданная ошибка при добавлении рефералу {referrer_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/user/{telegram_id}/referrals", summary="Получить колво рефералов")
async def get_referral_count(
telegram_id: int,
db_manager: DatabaseManager = Depends(get_database_manager)
):
try:
result = await db_manager.get_referrals_count(telegram_id)
if result == "ERROR":
logger.error(f"Ошибка получения рефералов для пользователя: {telegram_id}")
raise HTTPException(status_code=500, detail="Failed to get referrals")
logger.info(f"Количество приглашённых {result}")
return {"invited_count": result}
except HTTPException as http_ex:
logger.warning(f"HTTP ошибка: {http_ex.detail}")
raise http_ex
except SQLAlchemyError as db_ex:
logger.error(f"Ошибка базы данных при получении рефералов пользователя {telegram_id}: {db_ex}")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.exception(f"Неожиданная ошибка при получении рефералов {telegram_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,80 @@
import aiohttp
import logging
from typing import Dict, Any, Optional
from enum import Enum
class BillingErrorCode(Enum):
INSUFFICIENT_FUNDS = "INSUFFICIENT_FUNDS"
USER_NOT_FOUND = "USER_NOT_FOUND"
PAYMENT_FAILED = "PAYMENT_FAILED"
SERVICE_UNAVAILABLE = "SERVICE_UNAVAILABLE"
class BillingAdapter:
def __init__(self, base_url: str):
self.base_url = base_url
self.logger = logging.getLogger(__name__)
self.session = None
async def get_session(self):
if self.session is None:
self.session = aiohttp.ClientSession()
return self.session
async def withdraw_funds(self, user_id: int, amount: float, description: str = "") -> Dict[str, Any]:
"""
Списание средств через биллинг-сервис
"""
try:
session = await self.get_session()
payload = {
"user_id": user_id,
"amount": amount,
"description": description or f"Payment for subscription"
}
self.logger.info(f"Withdrawing {amount} from user {user_id}")
async with session.post(f"{self.base_url}/billing/payments/withdraw", json=payload) as response:
if response.status == 200:
result = await response.json()
if result.get("success"):
return {"status": "success"}
else:
error = result.get("error", "WITHDRAWAL_FAILED")
self.logger.error(f"Withdrawal failed: {error}")
return {"status": "error", "code": error}
else:
self.logger.error(f"Billing service error: {response.status}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except aiohttp.ClientError as e:
self.logger.error(f"Billing service connection error: {str(e)}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except Exception as e:
self.logger.error(f"Unexpected error in withdraw_funds: {str(e)}")
return {"status": "error", "code": "PAYMENT_FAILED"}
async def get_balance(self, user_id: int) -> Dict[str, Any]:
"""
Получение баланса пользователя
"""
try:
session = await self.get_session()
async with session.get(f"{self.base_url}/billing/balance/{user_id}") as response:
if response.status == 200:
result = await response.json()
return {"status": "success", "balance": result.get("balance", 0)}
elif response.status == 404:
return {"status": "error", "code": "USER_NOT_FOUND"}
else:
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
except Exception as e:
self.logger.error(f"Error getting balance: {str(e)}")
return {"status": "error", "code": "SERVICE_UNAVAILABLE"}
async def close(self):
if self.session:
await self.session.close()

View File

@@ -1,6 +1,7 @@
from decimal import Decimal
import json
from instance.model import User, Subscription, Transaction
from app.services.billing_service import BillingAdapter
from app.services.marzban import MarzbanService, MarzbanUser
from .postgres_rep import PostgresRepository
from instance.model import Transaction,TransactionType, Plan
@@ -14,13 +15,14 @@ from uuid import UUID
class DatabaseManager:
def __init__(self, session_generator,marzban_username,marzban_password,marzban_url):
def __init__(self, session_generator,marzban_username,marzban_password,marzban_url,billing_base_url):
"""
Инициализация с асинхронным генератором сессий (например, get_postgres_session).
"""
self.logger = logging.getLogger(__name__)
self.postgres_repo = PostgresRepository(session_generator, self.logger)
self.marzban_service = MarzbanService(marzban_url,marzban_username,marzban_password)
self.billing_adapter = BillingAdapter(billing_base_url)
async def create_user(self, telegram_id: int, invented_by: Optional[int]= None):
"""
@@ -59,24 +61,36 @@ class DatabaseManager:
"""
return await self.postgres_repo.get_last_transactions(telegram_id, limit)
async def update_balance(self, telegram_id: int, amount: float):
async def get_referrals_count(self,telegram_id: int) -> int:
"""
Обновляет баланс пользователя и добавляет транзакцию.
Docstring for get_referrals_count
:param self: Description
:param telegram_id: Description
:type telegram_id: int
:return: Description
:rtype: int
"""
self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
return await self.postgres_repo.get_referrals_count(telegram_id)
# async def update_balance(self, telegram_id: int, amount: float):
# """
# Обновляет баланс пользователя и добавляет транзакцию.
# """
# self.logger.info(f"Попытка обновления баланса: telegram_id={telegram_id}, amount={amount}")
# user = await self.get_user_by_telegram_id(telegram_id)
# if not user:
# self.logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден.")
# return "ERROR"
updated = await self.postgres_repo.update_balance(user, amount)
if not updated:
self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
return "ERROR"
# updated = await self.postgres_repo.update_balance(user, amount)
# if not updated:
# self.logger.error(f"Не удалось обновить баланс пользователя {telegram_id}")
# return "ERROR"
self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
await self.add_transaction(user.telegram_id, amount)
return "OK"
# self.logger.info(f"Баланс пользователя {telegram_id} обновлен на {amount}, добавление транзакции")
# await self.add_transaction(user.telegram_id, amount)
# return "OK"
async def get_active_subscription(self, telegram_id: int):
@@ -105,67 +119,59 @@ class DatabaseManager:
Возвращает список последних подписок.
"""
return await self.postgres_repo.get_last_subscription_by_user_id(telegram_id)
async def buy_sub(self, telegram_id: int, plan_name: str):
"""
Покупает подписку.
Покупка подписки: сначала создаем подписку, потом списываем деньги
"""
try:
self.logger.info(f"Начало покупки подписки для пользователя {telegram_id}, план: {plan_name}")
active_subscription = await self.get_active_subscription(telegram_id)
self.logger.info(f"Активная подписка: {active_subscription}")
if active_subscription:
self.logger.error(f"Пользователь {telegram_id} уже имеет активную подписку.")
self.logger.info(f"Покупка подписки: user={telegram_id}, plan={plan_name}")
# 1. Проверка активной подписки
if await self.get_active_subscription(telegram_id):
return "ACTIVE_SUBSCRIPTION_EXISTS"
result = await self._initialize_user_and_plan(telegram_id, plan_name)
if isinstance(result, str):
return result # Возвращает "ERROR", "TARIFF_NOT_FOUND" или "INSUFFICIENT_FUNDS"
user, plan = result
self.logger.info(f"Пользователь и план найдены: user_id={user.telegram_id}, plan_price={plan.price}")
new_subscription = await self._create_subscription_and_add_client(user, plan)
if not new_subscription:
self.logger.error(f"Не удалось создать подписку для пользователя {telegram_id}")
return "ERROR"
updated = await self.postgres_repo.update_balance(user,-plan.price)
if updated == False:
self.logger.error(f"Не удалось обновить баланс для пользователя {telegram_id}")
return "ERROR"
self.logger.info(f"Подписка успешно оформлена для пользователя {telegram_id}.")
return {"status": "OK", "subscription_id": str(new_subscription.id)}
except Exception as e:
self.logger.error(f"Неожиданная ошибка в buy_sub: {str(e)}")
return "ERROR"
async def _initialize_user_and_plan(self, telegram_id, plan_name):
"""
Инициализирует пользователя и план подписки.
"""
try:
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
self.logger.error(f"Пользователь с Telegram ID {telegram_id} не найден.")
return "ERROR"
# 2. Получаем план
plan = await self.postgres_repo.get_subscription_plan(plan_name)
if not plan:
self.logger.error(f"Тарифный план {plan_name} не найден.")
return "TARIFF_NOT_FOUND"
cost = plan.price
if user.balance < cost:
self.logger.error(f"Недостаточно средств у пользователя {telegram_id} для покупки плана {plan_name}.")
# 3. Проверяем пользователя
user = await self.get_user_by_telegram_id(telegram_id)
if not user:
return "USER_NOT_FOUND"
# 4. Проверяем баланс (только для информации)
balance_result = await self.billing_adapter.get_balance(telegram_id)
if balance_result["status"] == "error":
return "BILLING_SERVICE_ERROR"
if balance_result["balance"] < plan.price:
return "INSUFFICIENT_FUNDS"
return user, plan
# 5. СОЗДАЕМ ПОДПИСКУ (самое важное - сначала!)
new_subscription = await self._create_subscription_and_add_client(user, plan)
if not new_subscription:
return "SUBSCRIPTION_CREATION_FAILED"
# 6. ТОЛЬКО ПОСЛЕ УСПЕШНОГО СОЗДАНИЯ ПОДПИСКИ - списываем деньги
withdraw_result = await self.billing_adapter.withdraw_funds(
telegram_id,
float(plan.price),
f"Оплата подписки {plan_name}"
)
if withdraw_result["status"] == "error":
await self.postgres_repo.delete_subscription(new_subscription.id)
self.logger.error(f"Payment failed but subscription created: {new_subscription.id}")
return "PAYMENT_FAILED_AFTER_SUBSCRIPTION"
# 7. ВСЕ УСПЕШНО
self.logger.info(f"Подписка успешно создана и оплачена: {new_subscription.id}")
return {"status": "OK", "subscription_id": str(new_subscription.id)}
except Exception as e:
self.logger.error(f"Неожиданная ошибка в _initialize_user_and_plan: {str(e)}")
self.logger.error(f"Ошибка в buy_sub: {str(e)}")
return "ERROR"
async def _create_subscription_and_add_client(self, user: User, plan: Plan):

View File

@@ -68,30 +68,30 @@ class PostgresRepository:
return False
async def update_balance(self, user: User, amount: float):
"""
Обновляет баланс пользователя.
# async def update_balance(self, user: User, amount: float):
# """
# Обновляет баланс пользователя.
:param user: Объект пользователя.
:param amount: Сумма для добавления/вычитания.
:return: True, если успешно, иначе False.
"""
self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}")
async for session in self.session_generator():
try:
user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии
if not user:
self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.")
return False
# Приведение amount к Decimal
user.balance += Decimal(amount)
await session.commit()
self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}")
return True
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}")
await session.rollback()
return False
# :param user: Объект пользователя.
# :param amount: Сумма для добавления/вычитания.
# :return: True, если успешно, иначе False.
# """
# self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}")
# async for session in self.session_generator():
# try:
# user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии
# if not user:
# self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.")
# return False
# # Приведение amount к Decimal
# user.balance += Decimal(amount)
# await session.commit()
# self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}")
# return True
# except SQLAlchemyError as e:
# self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}")
# await session.rollback()
# return False
async def get_last_transactions(self, user_telegram_id: int, limit: int = 10):
"""
@@ -123,6 +123,7 @@ class PostgresRepository:
select(Subscription)
.where(Subscription.user_id == user_telegram_id)
.order_by(desc(Subscription.created_at))
.options(joinedload(Subscription.plan))
.limit(1)
)
subscription = result.scalars().first()
@@ -139,7 +140,35 @@ class PostgresRepository:
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении подписки для пользователя {user_telegram_id}: {e}")
return None
async def delete_subscription(self, subscription_id: UUID) -> bool:
"""
Удаляет подписку по её ID.
:param subscription_id: UUID подписки для удаления
:return: True если удалено успешно, False в случае ошибки
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Subscription).where(Subscription.id == subscription_id)
)
subscription = result.scalars().first()
if not subscription:
self.logger.warning(f"Подписка с ID {subscription_id} не найдена")
return False
await session.delete(subscription)
await session.commit()
self.logger.info(f"Подписка с ID {subscription_id} успешно удалена")
return True
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при удалении подписки {subscription_id}: {e}")
await session.rollback()
return False
async def add_record(self, record):
"""
Добавляет запись в базу данных.
@@ -249,4 +278,24 @@ class PostgresRepository:
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при поиске плана: {plan_id}: {e}")
await session.rollback()
return None
return None
async def get_referrals_count(self, user_telegram_id: int) -> int:
"""
Получить количество рефералов пользователя.
:param user_telegram_id: Telegram ID пользователя-пригласителя
:return: Количество рефералов
"""
async for session in self.session_generator():
try:
result = await session.execute(
select(Referral)
.where(Referral.inviter_id == user_telegram_id)
)
referrals = result.scalars().all()
return len(referrals)
except SQLAlchemyError as e:
self.logger.error(f"Ошибка при получении количества рефералов для пользователя {user_telegram_id}: {e}")
return 0

View File

@@ -9,8 +9,10 @@ try:
BASE_URL_MARZBAN = os.getenv("BASE_URL_MARZBAN")
USERNAME_MARZBA = os.getenv('USERNAME_MARZBAN')
PASSWORD_MARZBAN = os.getenv('PASSWORD_MARZBAN')
BILLING_URL = os.getenv('BILLING_URL')
# Создание движка для PostgreSQL
if POSTGRES_DSN is None or BASE_URL_MARZBAN is None or USERNAME_MARZBA is None or PASSWORD_MARZBAN is None:
if POSTGRES_DSN is None or BASE_URL_MARZBAN is None or USERNAME_MARZBA is None or PASSWORD_MARZBAN is None or BILLING_URL is None:
raise Exception
postgres_engine = create_async_engine(POSTGRES_DSN, echo=False)
except Exception as e:
@@ -51,4 +53,4 @@ def get_database_manager() -> DatabaseManager:
"""
Функция-зависимость для получения экземпляра DatabaseManager.
"""
return DatabaseManager(get_postgres_session, USERNAME_MARZBA,PASSWORD_MARZBAN,BASE_URL_MARZBAN)
return DatabaseManager(get_postgres_session, USERNAME_MARZBA,PASSWORD_MARZBAN,BASE_URL_MARZBAN,BILLING_URL)

View File

@@ -29,7 +29,7 @@ class User(Base):
telegram_id = Column(BigInteger, primary_key=True)
username = Column(String(255))
balance = Column(Numeric(10, 2), default=0.0)
ref_code = Column(String(32), unique=True) # Реферальный код пользователя
ref_code = Column(String(7), unique=True) # Реферальный код пользователя
invited_by = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

View File

View File

@@ -1,128 +0,0 @@
import argparse
from datetime import datetime
import json
import base64
from pymongo import MongoClient
def connect_to_mongo(uri, db_name):
"""Подключение к MongoDB."""
client = MongoClient(uri)
db = client[db_name]
return db
def load_raw_json(json_path):
"""Загружает сырые JSON-данные из файла."""
with open(json_path, "r", encoding="utf-8") as f:
return json.loads(f.read())
def encode_file(file_path):
"""Читает файл и кодирует его в Base64."""
with open(file_path, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
def transform_data(raw_data):
"""Преобразует исходные сырые данные в целевую структуру."""
try:
settings = json.loads(raw_data["obj"]["settings"])
stream_settings = json.loads(raw_data["obj"]["streamSettings"])
sniffing_settings = json.loads(raw_data["obj"]["sniffing"])
transformed = {
"server": {
"name": raw_data["obj"].get("remark", "Unknown"),
"ip": "45.82.255.110", # Замените на актуальные данные
"port": "2053",
"secretKey": "Hd8OsqN5Jh", # Замените на актуальные данные
"login": "nc1450nP", # Замените на актуальные данные
"password": "KmajQOuf" # Замените на актуальные данные
},
"clients": [
{
"email": client["email"],
"inboundId": raw_data["obj"].get("id"),
"id": client["id"],
"flow": client.get("flow", ""),
"limits": {
"ipLimit": client.get("limitIp", 0),
"reset": client.get("reset", 0),
"totalGB": client.get("totalGB", 0)
},
"subscriptions": {
"subId": client.get("subId", ""),
"tgId": client.get("tgId", "")
}
} for client in settings["clients"]
],
"connection": {
"destination": stream_settings["realitySettings"].get("dest", ""),
"serverNames": stream_settings["realitySettings"].get("serverNames", []),
"security": stream_settings.get("security", ""),
"publicKey": stream_settings["realitySettings"]["settings"].get("publicKey", ""),
"fingerprint": stream_settings["realitySettings"]["settings"].get("fingerprint", ""),
"shortIds": stream_settings["realitySettings"].get("shortIds", []),
"tcpSettings": {
"acceptProxyProtocol": stream_settings["tcpSettings"].get("acceptProxyProtocol", False),
"headerType": stream_settings["tcpSettings"]["header"].get("type", "none")
},
"sniffing": {
"enabled": sniffing_settings.get("enabled", False),
"destOverride": sniffing_settings.get("destOverride", [])
}
}
}
return transformed
except KeyError as e:
raise ValueError(f"Ошибка преобразования данных: отсутствует ключ {e}")
def insert_certificate(data, cert_path, cert_location):
"""Добавляет сертификат в указанное место внутри структуры JSON."""
# Читаем и кодируем сертификат
certificate_data = encode_file(cert_path)
# Разбиваем путь на вложенные ключи
keys = cert_location.split(".")
target = data
for key in keys[:-1]:
if key not in target:
target[key] = {} # Создаем вложенные ключи, если их нет
target = target[key]
target[keys[-1]] = {
"data": certificate_data,
"uploaded_at": datetime.utcnow()
}
def insert_data(db, collection_name, data):
"""Вставляет данные в указанную коллекцию MongoDB."""
collection = db[collection_name]
collection.insert_one(data)
print(f"Данные успешно вставлены в коллекцию '{collection_name}'.")
def main():
parser = argparse.ArgumentParser(description="Insert raw JSON data into MongoDB with certificate")
parser.add_argument("--mongo-uri", default="mongodb://root:itOj4CE2miKR@mongodb:27017", help="MongoDB URI")
parser.add_argument("--db-name", default="MongoDBSub&Ser", help="MongoDB database name")
parser.add_argument("--collection", default="servers", help="Collection name")
parser.add_argument("--json-path", required=True, help="Path to the JSON file with raw data")
parser.add_argument("--cert-path", help="Path to the certificate file (.crt)")
parser.add_argument("--cert-location", default='server.certificate', help="Path inside JSON structure to store certificate (e.g., 'server.certificate')")
args = parser.parse_args()
# Подключение к MongoDB
db = connect_to_mongo(args.mongo_uri, args.db_name)
# Загрузка сырых данных из JSON-файла
raw_data = load_raw_json(args.json_path)
# Преобразование данных в нужную структуру
transformed_data = transform_data(raw_data)
# Вставка сертификата в структуру данных (если путь к сертификату указан)
if args.cert_path:
insert_certificate(transformed_data, args.cert_path, args.cert_location)
# Вставка данных в MongoDB
insert_data(db, args.collection, transformed_data)
if __name__ == "__main__":
main()

View File

@@ -1,74 +0,0 @@
import argparse
from datetime import datetime
import json
import glob
from pymongo import MongoClient
def connect_to_mongo(uri, db_name):
"""Подключение к MongoDB."""
client = MongoClient(uri)
db = client[db_name]
return db
def load_all_json_from_folder(folder_path):
"""Загружает все JSON-файлы из указанной папки."""
all_data = []
for file_path in glob.glob(f"{folder_path}/*.json"):
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
all_data.append(data)
except Exception as e:
print(f"Ошибка при чтении файла {file_path}: {e}")
return all_data
def fetch_all_documents(mongo_uri, db_name, collection_name):
"""Выводит все элементы из указанной коллекции MongoDB."""
try:
client = MongoClient(mongo_uri)
db = client[db_name]
collection = db[collection_name]
documents = collection.find()
print(f"Содержимое коллекции '{collection_name}':")
for doc in documents:
print(doc)
except Exception as e:
print(f"Ошибка при получении данных: {e}")
finally:
client.close()
def insert_data(db, collection_name, data):
"""Вставляет данные в указанную коллекцию MongoDB."""
collection = db[collection_name]
for i in data:
collection.insert_one(i)
print(f"Данные '{i}'")
print(f"Данные успешно вставлены в коллекцию '{collection_name}'.")
def main():
parser = argparse.ArgumentParser(description="Insert JSON data into MongoDB with certificate")
parser.add_argument("--mongo-uri",default="mongodb://root:itOj4CE2miKR@mongodb:27017" ,required=True, help="MongoDB URI")
parser.add_argument("--db-name",default="MongoDBSub&Ser" ,required=True, help="MongoDB database name")
parser.add_argument("--collection",default="plans", required=True, help="Collection name")
parser.add_argument("--json-path", required=True, help="Path to the JSON file with data")
args = parser.parse_args()
db = connect_to_mongo(args.mongo_uri, args.db_name)
data = load_all_json_from_folder(args.json_path)
insert_data(db, args.collection, data)
fetch_all_documents(args.mongo_uri, args.db_name,args.collection)
if __name__ == "__main__":
main()

270
tests/add_plans.py Normal file
View File

@@ -0,0 +1,270 @@
#!/usr/bin/env python3
"""
Автономный скрипт для инициализации тарифных планов в PostgreSQL.
Использует данные подключения из docker-compose.yml
"""
import asyncio
import argparse
import sys
from typing import List, Dict
from sqlalchemy import Column, Integer, String, Numeric, Text, delete, insert
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from decimal import Decimal
Base = declarative_base()
class Plan(Base):
"""Модель тарифного плана"""
__tablename__ = 'plans'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
price = Column(Numeric(10, 2), nullable=False)
duration_days = Column(Integer, nullable=False)
description = Column(Text, nullable=True)
# Данные из вашего docker-compose.yml
DEFAULT_CONFIG = {
'host': 'localhost',
'port': 5432,
'database': 'postgresql',
'user': 'AH3J9GSPBYOP',
'password': 'uPS9?y~mcu2',
'url': 'postgresql+asyncpg://AH3J9GSPBYOP:uPS9?y~mcu2@localhost:5432/postgresql'
}
PLANS_DATA = [
{'name': 'Lark_Standart_1', 'price': Decimal('200.00'), 'duration_days': 30},
{'name': 'Lark_Pro_1', 'price': Decimal('400.00'), 'duration_days': 30},
{'name': 'Lark_Family_1', 'price': Decimal('700.00'), 'duration_days': 30},
{'name': 'Lark_Standart_6', 'price': Decimal('1200.00'), 'duration_days': 180},
{'name': 'Lark_Standart_12', 'price': Decimal('2400.00'), 'duration_days': 360},
{'name': 'Lark_Pro_6', 'price': Decimal('2000.00'), 'duration_days': 180},
{'name': 'Lark_Pro_12', 'price': Decimal('4800.00'), 'duration_days': 360},
{'name': 'Lark_Family_6', 'price': Decimal('4200.00'), 'duration_days': 180},
{'name': 'Lark_Family_12', 'price': Decimal('8400.00'), 'duration_days': 360},
]
def print_banner():
"""Печатает баннер скрипта"""
print("=" * 60)
print("🚀 ИНИЦИАЛИЗАЦИЯ ТАРИФНЫХ ПЛАНОВ В БАЗЕ ДАННЫХ")
print("=" * 60)
print()
def create_db_url(config: dict) -> str:
"""Создает URL для подключения к базе данных"""
if config.get('url'):
return config['url']
return f"postgresql+asyncpg://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
async def check_connection(engine) -> bool:
"""Проверяет подключение к базе данных"""
try:
async with engine.connect() as conn:
result = await conn.execute("SELECT version()")
version = result.scalar()
print(f"✅ Подключено к PostgreSQL: {version.split(',')[0]}")
return True
except Exception as e:
print(f"❌ Ошибка подключения к базе данных: {e}")
return False
async def get_existing_plans(session) -> List:
"""Получает существующие тарифные планы"""
result = await session.execute(
"SELECT id, name, price, duration_days FROM plans ORDER BY price"
)
return result.fetchall()
async def clear_table(session, table_name: str = 'plans') -> bool:
"""Очищает указанную таблицу"""
try:
await session.execute(delete(Plan))
await session.commit()
print(f"✅ Таблица '{table_name}' очищена")
return True
except Exception as e:
print(f"❌ Ошибка при очистке таблицы: {e}")
await session.rollback()
return False
async def add_plans_to_db(session, plans_data: List[Dict]) -> int:
"""Добавляет тарифные планы в базу данных"""
try:
added_count = 0
for plan in plans_data:
await session.execute(
insert(Plan).values(**plan)
)
added_count += 1
await session.commit()
return added_count
except Exception as e:
await session.rollback()
raise e
async def print_plans_table(plans: List) -> None:
"""Выводит таблицу с тарифными планами"""
if not plans:
print("📭 Таблица 'plans' пуста")
return
print(f"\n📊 Текущие тарифные планы ({len(plans)} шт.):")
print("-" * 70)
print(f"{'ID':<5} {'Название':<25} {'Цена (руб.)':<15} {'Дней':<10}")
print("-" * 70)
for plan in plans:
print(f"{plan[0]:<5} {plan[1]:<25} {plan[2]:<15} {plan[3]:<10}")
print("-" * 70)
# Подсчет статистики
total_price = sum(float(p[2]) for p in plans)
avg_price = total_price / len(plans) if plans else 0
print(f"💰 Общая сумма всех тарифов: {total_price:.2f} руб.")
print(f"📈 Средняя цена тарифа: {avg_price:.2f} руб.")
print(f"📅 Всего предложений: {len(plans)}")
async def main(config: dict, clear_existing: bool = True, dry_run: bool = False):
"""Основная функция скрипта"""
print_banner()
# Создаем URL для подключения
db_url = create_db_url(config)
print(f"📡 Параметры подключения:")
print(f" Хост: {config['host']}:{config['port']}")
print(f" База данных: {config['database']}")
print(f" Пользователь: {config['user']}")
print(f" {'🚨 РЕЖИМ ТЕСТА (dry-run)' if dry_run else ''}")
print()
try:
# Подключаемся к базе данных
print("🔄 Подключение к базе данных...")
engine = create_async_engine(db_url, echo=False)
# Проверяем подключение
if not await check_connection(engine):
print("\nНе удалось подключиться к базе данных")
return False
# Создаем фабрику сессий
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async with AsyncSessionLocal() as session:
# Получаем текущие тарифы
print("\n🔍 Проверяем существующие тарифы...")
existing_plans = await get_existing_plans(session)
if existing_plans:
await print_plans_table(existing_plans)
if clear_existing and not dry_run:
print("\n⚠️ ВНИМАНИЕ: Будут удалены все существующие тарифы!")
confirm = input("Продолжить? (y/N): ")
if confirm.lower() != 'y':
print("❌ Операция отменена пользователем")
return False
# Очищаем таблицу
await clear_table(session)
elif dry_run:
print("\n⚠️ DRY-RUN: Существующие тарифы НЕ будут удалены")
else:
print("📭 Таблица 'plans' пуста, создаем новые тарифы...")
# Добавляем новые тарифы
if not dry_run:
print(f"\n Добавляем {len(PLANS_DATA)} тарифных планов...")
added_count = await add_plans_to_db(session, PLANS_DATA)
print(f"✅ Успешно добавлено {added_count} тарифов")
else:
print(f"\n⚠️ DRY-RUN: Планируется добавить {len(PLANS_DATA)} тарифов:")
for i, plan in enumerate(PLANS_DATA, 1):
print(f" {i}. {plan['name']} - {plan['price']} руб. ({plan['duration_days']} дней)")
# Показываем финальный результат
print("\n🎯 ФИНАЛЬНЫЙ РЕЗУЛЬТАТ:")
final_plans = await get_existing_plans(session)
await print_plans_table(final_plans)
await engine.dispose()
print("\n✅ Скрипт успешно выполнен!")
return True
except Exception as e:
print(f"\n❌ Критическая ошибка: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Инициализация тарифных планов в базе данных',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Примеры использования:
%(prog)s # Использует настройки по умолчанию
%(prog)s --no-clear # Не очищать существующие тарифы
%(prog)s --dry-run # Только показать что будет сделано
%(prog)s --host 192.168.1.100 # Указать другой хост
%(prog)s --url "postgresql://..." # Указать полный URL
"""
)
parser.add_argument('--host', help='Хост базы данных', default=DEFAULT_CONFIG['host'])
parser.add_argument('--port', type=int, help='Порт базы данных', default=DEFAULT_CONFIG['port'])
parser.add_argument('--database', help='Имя базы данных', default=DEFAULT_CONFIG['database'])
parser.add_argument('--user', help='Имя пользователя', default=DEFAULT_CONFIG['user'])
parser.add_argument('--password', help='Пароль', default=DEFAULT_CONFIG['password'])
parser.add_argument('--url', help='Полный URL подключения (игнорирует остальные параметры)')
parser.add_argument('--no-clear', action='store_true', help='Не очищать существующие тарифы')
parser.add_argument('--dry-run', action='store_true', help='Только показать что будет сделано')
args = parser.parse_args()
# Формируем конфигурацию
config = DEFAULT_CONFIG.copy()
if args.url:
config['url'] = args.url
else:
config.update({
'host': args.host,
'port': args.port,
'database': args.database,
'user': args.user,
'password': args.password,
'url': None # Будет сгенерирован автоматически
})
# Запускаем скрипт
success = asyncio.run(main(
config=config,
clear_existing=not args.no_clear,
dry_run=args.dry_run
))
sys.exit(0 if success else 1)

View File

@@ -1,26 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIEYzCCA0ugAwIBAgIUEni/Go2t3/FXT7CGBMSnrlDzTKwwDQYJKoZIhvcNAQEL
BQAwgcAxCzAJBgNVBAYTAlVBMRswGQYDVQQIDBJSZXB1YmxpYyBvZiBDcmltZWEx
EzARBgNVBAcMClNpbWZlcm9wb2wxFzAVBgNVBAoMDkxhcmsgQ28gU3lzdGVtMSUw
IwYDVQQLDBxMYXJrIENlcnRpZmljYXRpb24gQXV0aG9yaXR5MRgwFgYDVQQDDA9M
YXJrIFRydXN0ZWQgQ0ExJTAjBgkqhkiG9w0BCQEWFmxhcmtjb3N5c3RlbUBwcm90
b24ubWUwHhcNMjQxMjI3MTQ1NzQ2WhcNMzQxMjI1MTQ1NzQ2WjCBwDELMAkGA1UE
BhMCVUExGzAZBgNVBAgMElJlcHVibGljIG9mIENyaW1lYTETMBEGA1UEBwwKU2lt
ZmVyb3BvbDEXMBUGA1UECgwOTGFyayBDbyBTeXN0ZW0xJTAjBgNVBAsMHExhcmsg
Q2VydGlmaWNhdGlvbiBBdXRob3JpdHkxGDAWBgNVBAMMD0xhcmsgVHJ1c3RlZCBD
QTElMCMGCSqGSIb3DQEJARYWbGFya2Nvc3lzdGVtQHByb3Rvbi5tZTCCASIwDQYJ
KoZIhvcNAQEBBQADggEPADCCAQoCggEBAOzb2ibfe4Arrf5O3d15kObBJQkxcSGi
fzrtYj68/y0ZyNV3BTvp+gCdlmo+WqOrdgD4LCOod0585S2MLCxjvVIcuA+DIq6z
gxZvf6V1FRKjHO3s18HhUX6nl8LYe6bOveqHAiDf9TZ+8grJXYpGD2tybAofXkL5
8dmn5Jh10DTV2EBHwutET2hoBqSorop/Ro/zawYPOlMZuGXP4Txs/erUmNCzGm+b
AYw6qjBm+o9RG2AWzKVBI06/kFKA5vq7ATcEs2U5bdINy/U1u2vc1R08YuvTpPCh
2Q0uBn49T+WhiF9CpAYBoMj51Am22NqKWsc617ZFkl1OO3mWd4+mgocCAwEAAaNT
MFEwHQYDVR0OBBYEFAXCcmOWdaInuJLeY/5CRfdzb49+MB8GA1UdIwQYMBaAFAXC
cmOWdaInuJLeY/5CRfdzb49+MA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL
BQADggEBADoyGv2Gdem/zrHCEy43WlXo9uFqKCX/Z/Rlr+V9OmRodZx83j2B0xIB
QdjuEP/EaxEtuGL98TIln7u8PX/FKApbZAk9zxrn0JNQ6bAVpsLBK1i+w3aw2XlN
p6qmFoc66Z8B1OUiGHrWczw0cV4rr8XoGwD5KS/jXYyuT+JTFBdsYXmUXqwqcwHY
N4qXRKh8FtqTgvjb/TpETMr7bnEHkn0vUwMwKwRe4TB1VwFIAaJeh7DPnrchy5xQ
EpS2DIQoO+ZoOaQYIkFT/8c7zpN79fy5uuVfW4XL8OS7sbZkzsl2YJDtO5zCEDNx
CJeEKQYXpCXRi+n3RvsIedshrnmqZcg=
-----END CERTIFICATE-----

View File

@@ -1 +0,0 @@
{"success":true,"msg":"","obj":{"id":1,"up":301130896430,"down":4057274949955,"total":0,"remark":"vlv","enable":true,"expiryTime":0,"clientStats":null,"listen":"","port":443,"protocol":"vless","settings":"{\n \"clients\": [\n {\n \"email\": \"j8oajwd3\",\n \"enable\": true,\n \"expiryTime\": 0,\n \"flow\": \"xtls-rprx-vision\",\n \"id\": \"a31d71a4-6afd-4f36-96f6-860691b52873\",\n \"limitIp\": 2,\n \"reset\": 0,\n \"subId\": \"ox2awiqwduryuqnz\",\n \"tgId\": 1342351277,\n \"totalGB\": 0\n },\n {\n \"email\": \"cvvbqpm2\",\n \"enable\": true,\n \"expiryTime\": 0,\n \"flow\": \"xtls-rprx-vision\",\n \"id\": \"b6882942-d69d-4d5e-be9a-168ac89b20b6\",\n \"limitIp\": 1,\n \"reset\": 0,\n \"subId\": \"jk289x00uf7vbr9x\",\n \"tgId\": 123144325,\n \"totalGB\": 0\n },\n {\n \"email\": \"k15vx82w\",\n \"enable\": true,\n \"expiryTime\": 0,\n \"flow\": \"\",\n \"id\": \"3c88e5bb-53ba-443d-9d68-a09f5037030c\",\n \"limitIp\": 0,\n \"reset\": 0,\n \"subId\": \"5ffz56ofveepep1t\",\n \"tgId\": 5364765066,\n \"totalGB\": 0\n },\n {\n \"email\": \"gm5x10tr\",\n \"enable\": true,\n \"expiryTime\": 0,\n \"flow\": \"xtls-rprx-vision\",\n \"id\": \"c0b9ff6c-4c48-4d75-8ca0-c13a2686fa5d\",\n \"limitIp\": 0,\n \"reset\": 0,\n \"subId\": \"132pioffqi6gwhw6\",\n \"tgId\": \"\",\n \"totalGB\": 0\n }\n ],\n \"decryption\": \"none\",\n \"fallbacks\": []\n}","streamSettings":"{\n \"network\": \"tcp\",\n \"security\": \"reality\",\n \"externalProxy\": [],\n \"realitySettings\": {\n \"show\": false,\n \"xver\": 0,\n \"dest\": \"google.com:443\",\n \"serverNames\": [\n \"google.com\",\n \"www.google.com\"\n ],\n \"privateKey\": \"gKsDFmRn0vyLMUdYdk0ZU_LVyrQh7zMl4r-9s0nNFCk\",\n \"minClient\": \"\",\n \"maxClient\": \"\",\n \"maxTimediff\": 0,\n \"shortIds\": [\n \"edfaf8ab\"\n ],\n \"settings\": {\n \"publicKey\": \"Bha0eW7nfRc69CdZyF9HlmGVvtAeOJKammhwf4WShTU\",\n \"fingerprint\": \"random\",\n \"serverName\": \"\",\n \"spiderX\": \"/\"\n }\n },\n \"tcpSettings\": {\n \"acceptProxyProtocol\": false,\n \"header\": {\n \"type\": \"none\"\n }\n }\n}","tag":"inbound-443","sniffing":"{\n \"enabled\": true,\n \"destOverride\": [\n \"http\",\n \"tls\",\n \"quic\",\n \"fakedns\"\n ],\n \"metadataOnly\": false,\n \"routeOnly\": false\n}"}}

View File

@@ -1,8 +0,0 @@
{
"name": "Lark_Standart_1",
"normalName": "Lark Standart",
"type": "standart",
"duration_months": 1,
"ip_limit": 1,
"price": 200
}

View File

@@ -1,8 +0,0 @@
{
"name": "Lark_Standart_6",
"normalName": "Lark Standart",
"type": "standart",
"duration_months": 6,
"ip_limit": 1,
"price": 1000
}

View File

@@ -1,8 +0,0 @@
{
"name": "Lark_Standart_12",
"normalName": "Lark Standart",
"type": "standart",
"duration_months": 12,
"ip_limit": 1,
"price": 2000
}

View File

@@ -1,8 +0,0 @@
{
"name": "Lark_Pro_1",
"normalName": "Lark Pro",
"type": "pro",
"duration_months": 1,
"ip_limit": 5,
"price": 600
}

View File

@@ -1,8 +0,0 @@
{
"name": "Lark_Pro_6",
"normalName": "Lark Pro",
"type": "pro",
"duration_months": 6,
"ip_limit": 5,
"price": 3000
}

View File

@@ -1,8 +0,0 @@
{
"name": "Lark_Pro_12",
"normalName": "Lark Pro",
"type": "pro",
"duration_months": 12,
"ip_limit": 5,
"price": 5000
}

View File

View File