222 lines
10 KiB
Python
222 lines
10 KiB
Python
from datetime import datetime
|
||
from uuid import UUID
|
||
from sqlalchemy.future import select
|
||
from sqlalchemy.exc import SQLAlchemyError
|
||
from decimal import Decimal
|
||
from sqlalchemy import asc, desc, update
|
||
from instance.model import TicketMessage, User, Subscription, Transaction,SupportTicket
|
||
|
||
|
||
class PostgresRepository:
|
||
def __init__(self, session_generator, logger):
|
||
self.session_generator = session_generator
|
||
self.logger = logger
|
||
|
||
async def create_user(self, telegram_id: str, username: str, referrer_id: str):
|
||
"""
|
||
Создаёт нового пользователя в PostgreSQL.
|
||
"""
|
||
async for session in self.session_generator():
|
||
try:
|
||
new_user = User(telegram_id=telegram_id, username=username, referrer_id=referrer_id)
|
||
session.add(new_user)
|
||
await session.commit()
|
||
return new_user
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка при создании пользователя {telegram_id}: {e}")
|
||
await session.rollback()
|
||
return None
|
||
|
||
async def get_active_subscription(self, telegram_id: str):
|
||
"""
|
||
Проверяет наличие активной подписки у пользователя.
|
||
"""
|
||
async for session in self.session_generator():
|
||
try:
|
||
result = await session.execute(
|
||
select(Subscription)
|
||
.join(User, Subscription.user_id == User.id)
|
||
.where(User.telegram_id == telegram_id, Subscription.expiry_date > datetime.utcnow())
|
||
)
|
||
result= result.scalars().first()
|
||
self.logger.info(f"Пользователь с id {telegram_id}, проверен и имеет {result}")
|
||
return result
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка проверки активной подписки для пользователя {telegram_id}: {e}")
|
||
return None
|
||
|
||
async def get_user_by_telegram_id(self, telegram_id: str):
|
||
"""
|
||
Возвращает пользователя по Telegram ID.
|
||
"""
|
||
async for session in self.session_generator():
|
||
try:
|
||
result = await session.execute(select(User).where(User.telegram_id == telegram_id))
|
||
if result:
|
||
return result.scalars().first()
|
||
return False
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка при получении пользователя {telegram_id}: {e}")
|
||
return False
|
||
|
||
|
||
async def update_balance(self, user: User, amount: float):
|
||
"""
|
||
Обновляет баланс пользователя.
|
||
|
||
:param user: Объект пользователя.
|
||
:param amount: Сумма для добавления/вычитания.
|
||
:return: True, если успешно, иначе False.
|
||
"""
|
||
self.logger.info(f"Обновление баланса пользователя: id={user.id}, current_balance={user.balance}, amount={amount}")
|
||
async for session in self.session_generator():
|
||
try:
|
||
user = await session.get(User, user.id) # Загружаем пользователя в той же сессии
|
||
if not user:
|
||
self.logger.warning(f"Пользователь с ID {user.id} не найден.")
|
||
return False
|
||
# Приведение amount к Decimal
|
||
user.balance += Decimal(amount)
|
||
await session.commit()
|
||
self.logger.info(f"Баланс пользователя id={user.id} успешно обновлен: new_balance={user.balance}")
|
||
return True
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка при обновлении баланса пользователя id={user.id}: {e}")
|
||
await session.rollback()
|
||
return False
|
||
|
||
async def get_last_transactions(self, user_id: UUID, limit: int = 10):
|
||
"""
|
||
Возвращает последние транзакции пользователя.
|
||
"""
|
||
async for session in self.session_generator():
|
||
try:
|
||
result = await session.execute(
|
||
select(Transaction)
|
||
.where(Transaction.user_id == user_id)
|
||
.order_by(desc(Transaction.created_at))
|
||
.limit(limit)
|
||
)
|
||
return result.scalars().all()
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка получения транзакций пользователя {user_id}: {e}")
|
||
return None
|
||
|
||
async def get_last_subscription_by_user_id(self, user_id: UUID, limit: int = 1):
|
||
"""
|
||
Извлекает последнюю подписку пользователя на основании user_id.
|
||
|
||
:param user_id: UUID пользователя.
|
||
:return: Объект Subscription или None.
|
||
"""
|
||
async for session in self.session_generator():
|
||
try:
|
||
result = await session.execute(
|
||
select(Subscription)
|
||
.where(Subscription.user_id == user_id)
|
||
.order_by(desc(Subscription.created_at))
|
||
.limit(limit)
|
||
)
|
||
subscriptions = list(result.scalars())
|
||
self.logger.info(f"Найдены такие подписки: {subscriptions}")
|
||
return subscriptions
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка при получении подписки для пользователя {user_id}: {e}")
|
||
return None
|
||
|
||
async def add_record(self, record):
|
||
"""
|
||
Добавляет запись в базу данных.
|
||
|
||
:param record: Объект записи.
|
||
:return: Запись или None в случае ошибки.
|
||
"""
|
||
async for session in self.session_generator():
|
||
try:
|
||
session.add(record)
|
||
await session.commit()
|
||
return record
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка при добавлении записи: {record}: {e}")
|
||
await session.rollback()
|
||
return None
|
||
|
||
async def list_active_tickets(self, user_id: UUID):
|
||
async for session in self.session_generator():
|
||
try:
|
||
tickets = await session.execute(
|
||
select(SupportTicket)
|
||
.where(
|
||
SupportTicket.user_id == user_id,
|
||
SupportTicket.status.in_([status.upper() for status in ["pending", "open"]])
|
||
)
|
||
)
|
||
result = list(tickets.scalars().all())
|
||
self.logger.info(f"Получены активные тикеты: {result}")
|
||
return result
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Произошла ошибка при поиске активных тикетов: {e}")
|
||
return None
|
||
|
||
async def get_ticket(self, ticket_id):
|
||
async for session in self.session_generator():
|
||
try:
|
||
ticket = await session.execute(
|
||
select(SupportTicket)
|
||
.where(SupportTicket.id == ticket_id)
|
||
)
|
||
result = ticket.scalars().first()
|
||
self.logger.info(f"Получен тикет {ticket_id}.")
|
||
if result:
|
||
serialized_result = {
|
||
"id": result.id,
|
||
"user_id": result.user_id,
|
||
"subject": result.subject,
|
||
"message": result.message,
|
||
"status": result.status,
|
||
"created_at": result.created_at.isoformat(),
|
||
"updated_at": result.updated_at.isoformat(),
|
||
}
|
||
return serialized_result
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Произошла ошибка при поиске тикета {ticket_id}.")
|
||
return None
|
||
|
||
async def get_ticket_messages(self, ticket_id: int):
|
||
async for session in self.session_generator():
|
||
try:
|
||
# Выполняем запрос для получения сообщений, сортированных по дате
|
||
result = await session.execute(
|
||
select(TicketMessage)
|
||
.where(TicketMessage.ticket_id == ticket_id)
|
||
.order_by(asc(TicketMessage.created_at))
|
||
)
|
||
messages = result.scalars().all()
|
||
self.logger.info(f"Получены сообщения для тикета {ticket_id}, {messages}")
|
||
self.logger.info(messages)
|
||
return messages
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка при получении сообщений для тикета {ticket_id}: {e}")
|
||
return []
|
||
|
||
async def set_new_status(self,ticket_id: int, new_status: str):
|
||
async for session in self.session_generator():
|
||
try:
|
||
# Выполняем обновление тикета
|
||
result = await session.execute(
|
||
update(SupportTicket)
|
||
.where(SupportTicket.id == ticket_id)
|
||
.values(status=new_status)
|
||
.execution_options(synchronize_session="fetch")
|
||
)
|
||
if result.rowcount == 0:
|
||
raise ValueError(f"Тикет с ID {ticket_id} не найден.")
|
||
|
||
await session.commit()
|
||
self.logger.info(f"Статус тикета {ticket_id} обновлён на '{new_status}'.")
|
||
return "OK"
|
||
except SQLAlchemyError as e:
|
||
self.logger.error(f"Ошибка обновления статуса тикета {ticket_id}: {e}")
|
||
await session.rollback()
|
||
return "ERROR"
|