from datetime import datetime from typing import Optional from uuid import UUID from sqlalchemy.future import select from sqlalchemy.exc import SQLAlchemyError from decimal import Decimal from sqlalchemy import asc, desc, update from sqlalchemy.orm import joinedload from instance.model import Referral, User, Subscription, Transaction, Plan class PostgresRepository: def __init__(self, session_generator, logger): self.session_generator = session_generator self.logger = logger async def create_user(self, telegram_id: int, username: str, invited_by: Optional[int]= None): """ Создаёт нового пользователя в PostgreSQL. """ async for session in self.session_generator(): try: new_user = User(telegram_id=telegram_id, username=username, invited_by=invited_by) session.add(new_user) await session.commit() return new_user except SQLAlchemyError as e: self.logger.error(f"Ошибка при создании пользователя {telegram_id}: {e}") await session.rollback() return None async def get_active_subscription(self, telegram_id: int): """ Проверяет наличие активной подписки у пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Subscription) .join(User, Subscription.user_id == User.telegram_id) .where(User.telegram_id == telegram_id, Subscription.end_date > datetime.utcnow()) ) subscription = result.scalars().first() if subscription: # Отделяем объект от сессии session.expunge(subscription) self.logger.info(f"Пользователь с id {telegram_id}, проверен и имеет подписку ID: {subscription.id}") else: self.logger.info(f"Пользователь с id {telegram_id}, проверен и имеет None") return subscription except SQLAlchemyError as e: self.logger.error(f"Ошибка проверки активной подписки для пользователя {telegram_id}: {e}") return None async def get_user_by_telegram_id(self, telegram_id: int): """ Возвращает пользователя по Telegram ID. """ async for session in self.session_generator(): try: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) if result: return result.scalars().first() return False except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}") return False # async def update_balance(self, user: User, amount: float): # """ # Обновляет баланс пользователя. # :param user: Объект пользователя. # :param amount: Сумма для добавления/вычитания. # :return: True, если успешно, иначе False. # """ # self.logger.info(f"Обновление баланса пользователя: id={user.telegram_id}, current_balance={user.balance}, amount={amount}") # async for session in self.session_generator(): # try: # user = await session.get(User, user.telegram_id) # Загружаем пользователя в той же сессии # if not user: # self.logger.warning(f"Пользователь с ID {user.telegram_id} не найден.") # return False # # Приведение amount к Decimal # user.balance += Decimal(amount) # await session.commit() # self.logger.info(f"Баланс пользователя id={user.telegram_id} успешно обновлен: new_balance={user.balance}") # return True # except SQLAlchemyError as e: # self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.telegram_id}: {e}") # await session.rollback() # return False async def get_last_transactions(self, user_telegram_id: int, limit: int = 10): """ Возвращает последние транзакции пользователя. """ async for session in self.session_generator(): try: result = await session.execute( select(Transaction) .where(Transaction.user_id == user_telegram_id) .order_by(desc(Transaction.created_at)) .limit(limit) ) return result.scalars().all() except SQLAlchemyError as e: self.logger.error(f"Ошибка получения транзакций пользователя {user_telegram_id}: {e}") return None async def get_last_subscription_by_user_id(self, user_telegram_id: int): """ Извлекает последнюю подписку пользователя на основании user_id. :param user_id: UUID пользователя. :return: Объект Subscription или None. """ async for session in self.session_generator(): try: result = await session.execute( select(Subscription) .where(Subscription.user_id == user_telegram_id) .order_by(desc(Subscription.created_at)) .options(joinedload(Subscription.plan)) .limit(1) ) subscription = result.scalars().first() self.logger.info(f"Найдены такие подписки: {subscription}") if subscription: session.expunge(subscription) self.logger.info(f"Найдена подписка ID: {subscription.id} для пользователя {user_telegram_id}") return subscription else: return None except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении подписки для пользователя {user_telegram_id}: {e}") return None async def delete_subscription(self, subscription_id: UUID) -> bool: """ Удаляет подписку по её ID. :param subscription_id: UUID подписки для удаления :return: True если удалено успешно, False в случае ошибки """ async for session in self.session_generator(): try: result = await session.execute( select(Subscription).where(Subscription.id == subscription_id) ) subscription = result.scalars().first() if not subscription: self.logger.warning(f"Подписка с ID {subscription_id} не найдена") return False await session.delete(subscription) await session.commit() self.logger.info(f"Подписка с ID {subscription_id} успешно удалена") return True except SQLAlchemyError as e: self.logger.error(f"Ошибка при удалении подписки {subscription_id}: {e}") await session.rollback() return False async def add_record(self, record): """ Добавляет запись в базу данных. :param record: Объект записи. :return: Запись или None в случае ошибки. """ async for session in self.session_generator(): try: session.add(record) await session.commit() return record except SQLAlchemyError as e: self.logger.error(f"Ошибка при добавлении записи: {record}: {e}") await session.rollback() raise Exception async def add_referral(self, referrer_id: int, referral_id: int): """ Добавление реферальной связи между пользователями. """ async for session in self.session_generator(): try: # Проверить, существует ли уже такая реферальная связь existing_referral = await session.execute( select(Referral) .where( (Referral.inviter_id == referrer_id) & (Referral.invited_id == referral_id) ) ) existing_referral = existing_referral.scalars().first() if existing_referral: raise ValueError("Referral relationship already exists") # Проверить, что пользователи существуют referrer = await session.execute( select(User).where(User.telegram_id == referrer_id) ) referrer = referrer.scalars().first() if not referrer: raise ValueError("Referrer not found") referral_user = await session.execute( select(User).where(User.telegram_id == referral_id) ) referral_user = referral_user.scalars().first() if not referral_user: raise ValueError("Referral user not found") # Проверить, что пользователь не приглашает сам себя if referrer_id == referral_id: raise ValueError("User cannot refer themselves") # Создать новую реферальную связь new_referral = Referral( inviter_id=referrer_id, invited_id=referral_id ) session.add(new_referral) await session.commit() self.logger.info(f"Реферальная связь создана: {referrer_id} -> {referral_id}") except Exception as e: await session.rollback() self.logger.error(f"Ошибка при добавлении реферальной связи: {str(e)}") raise async def get_subscription_plan(self, plan_name:str) -> Plan | None: """ Поиск плана для подписки :param plan_name: Объект записи. :return: Запись или None в случае ошибки. """ async for session in self.session_generator(): try: result = await session.execute( select(Plan) .where(Plan.name == plan_name) ) return result.scalar_one_or_none() except SQLAlchemyError as e: self.logger.error(f"Ошибка при поиске плана: {plan_name}: {e}") await session.rollback() return None async def get_plan_by_id(self, plan_id: int) -> Plan | None: """ Поиск плана для подписки :param plan_name: Объект записи. :return: Запись или None в случае ошибки. """ async for session in self.session_generator(): try: result = await session.execute( select(Plan) .where(Plan.id == plan_id) ) return result.scalar_one_or_none() except SQLAlchemyError as e: self.logger.error(f"Ошибка при поиске плана: {plan_id}: {e}") await session.rollback() return None async def get_referrals_count(self, user_telegram_id: int) -> int: """ Получить количество рефералов пользователя. :param user_telegram_id: Telegram ID пользователя-пригласителя :return: Количество рефералов """ async for session in self.session_generator(): try: result = await session.execute( select(Referral) .where(Referral.inviter_id == user_telegram_id) ) referrals = result.scalars().all() return len(referrals) except SQLAlchemyError as e: self.logger.error(f"Ошибка при получении количества рефералов для пользователя {user_telegram_id}: {e}") return 0