Удалил мусор и сертификаты вроде добавил

This commit is contained in:
2024-12-19 23:01:48 +03:00
parent 197e291bdc
commit 33b502fd86
15 changed files with 447 additions and 407 deletions

View File

@@ -1,107 +1,187 @@
import requests
import aiohttp
import uuid
import string
import secrets
import json
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import json
import base64
from datetime import datetime
from dateutil.relativedelta import relativedelta
def generate_uuid():
return str(uuid.uuid4())
class PanelInteraction:
def __init__(self, base_url, login_data, logger_):
def __init__(self, base_url, login_data, logger, certificate=None, is_encoded=True):
"""
Initialize the PanelInteraction class.
:param base_url: Base URL for the panel.
:param login_data: Login data (username/password or token).
:param logger: Logger for debugging.
:param certificate: Certificate content (Base64-encoded or raw string).
:param is_encoded: Indicates whether the certificate is Base64-encoded.
"""
self.base_url = base_url
self.login_data = login_data
self.logger = logger_
self.session_id = self.login()
if self.session_id:
self.headers = {
'Accept': 'application/json',
'Cookie': f'3x-ui={self.session_id}',
'Content-Type': 'application/json'
}
else:
raise ValueError("Login failed, session_id is None")
self.logger = logger
self.cert_content = self._decode_certificate(certificate, is_encoded)
self.session_id = None # Session ID will be initialized lazily
self.headers = None
def login(self):
login_url = self.base_url + "/login"
def _decode_certificate(self, certificate, is_encoded):
"""
Decode the provided certificate content.
:param certificate: Certificate content (Base64-encoded or raw string).
:param is_encoded: Indicates whether the certificate is Base64-encoded.
:return: Decoded certificate content as bytes.
"""
if not certificate:
self.logger.error("No certificate provided.")
raise ValueError("Certificate is required.")
return base64.b64decode(certificate) if is_encoded else certificate.encode()
async def _ensure_logged_in(self):
"""
Ensure the session ID is available for authenticated requests.
"""
if not self.session_id:
self.session_id = await self.login()
if self.session_id:
self.headers = {
'Accept': 'application/json',
'Cookie': f'3x-ui={self.session_id}',
'Content-Type': 'application/json'
}
else:
raise ValueError("Unable to log in and retrieve session ID.")
async def login(self):
"""
Perform login to the panel.
:return: Session ID or None.
"""
login_url = f"{self.base_url}/login"
self.logger.info(f"Attempting to login at: {login_url}")
try:
response = requests.post(login_url, data=self.login_data, verify=False, timeout=10)
response.raise_for_status()
session_id = response.cookies.get("3x-ui")
if session_id:
return session_id
else:
self.logger.error(f"Login failed: {response.status_code}")
self.logger.debug(f"Response content: {response.text}")
return None
except requests.RequestException as e:
self.logger.error(f"Login request failed: {e}")
return None
def getInboundInfo(self, inboundId):
url = f"{self.base_url}/panel/api/inbounds/get/{inboundId}"
try:
response = requests.get(url, headers=self.headers, verify=False, timeout=10)
response.raise_for_status()
if response:
return response.json()
else:
self.logger.error(f"Failed to get inbound info: {response.status_code}")
self.logger.debug("Response:", response.text)
async with aiohttp.ClientSession() as session:
try:
async with session.post(
login_url, data=self.login_data, ssl=self.cert_content, timeout=10
) as response:
if response.status == 200:
session_id = response.cookies.get("3x-ui")
if session_id:
return session_id.value
else:
self.logger.error("Login failed: No session ID received.")
return None
else:
self.logger.error(f"Login failed: {response.status}")
return None
except aiohttp.ClientError as e:
self.logger.error(f"Login request failed: {e}")
return None
except requests.RequestException as e:
self.logger.error(f"Get inbound request failed: {e}")
def get_client_traffic(self, email):
async def get_inbound_info(self, inbound_id):
"""
Fetch inbound information by ID.
:param inbound_id: ID of the inbound.
:return: JSON response or None.
"""
await self._ensure_logged_in()
url = f"{self.base_url}/panel/api/inbounds/get/{inbound_id}"
async with aiohttp.ClientSession() as session:
try:
async with session.get(
url, headers=self.headers, ssl=self.cert_content, timeout=10
) as response:
if response.status == 200:
return await response.json()
else:
self.logger.error(f"Failed to get inbound info: {response.status}")
return None
except aiohttp.ClientError as e:
self.logger.error(f"Get inbound info request failed: {e}")
return None
async def get_client_traffic(self, email):
"""
Fetch traffic information for a specific client.
:param email: Client's email.
:return: JSON response or None.
"""
await self._ensure_logged_in()
url = f"{self.base_url}/panel/api/inbounds/getClientTraffics/{email}"
try:
response = requests.get(url, headers=self.headers, verify=False, timeout=10)
response.raise_for_status()
if response:
return response.json()
else:
self.logger.error(f"Failed to get client traffic: {response.status_code}")
self.logger.debug("Response:", response.text)
async with aiohttp.ClientSession() as session:
try:
async with session.get(
url, headers=self.headers, ssl=self.cert_content, timeout=10
) as response:
if response.status == 200:
return await response.json()
else:
self.logger.error(f"Failed to get client traffic: {response.status}")
return None
except aiohttp.ClientError as e:
self.logger.error(f"Get client traffic request failed: {e}")
return None
except requests.RequestException as e:
self.loggin.error(f"Get client request failed: {e}")
def update_client_expiry(self, client_uuid, new_expiry_time, client_email):
async def update_client_expiry(self, client_uuid, new_expiry_time, client_email):
"""
Update the expiry date of a specific client.
:param client_uuid: UUID of the client.
:param new_expiry_time: New expiry date in ISO format.
:param client_email: Client's email.
:return: None.
"""
await self._ensure_logged_in()
url = f"{self.base_url}/panel/api/inbounds/updateClient"
update_data = {
"id": 1,
"settings": json.dumps({
"id": 1,
"settings": {
"clients": [
{
"id": client_uuid,
"alterId": 0,
"email": client_email,
"email": client_email,
"limitIp": 2,
"totalGB": 0,
"totalGB": 0,
"expiryTime": new_expiry_time,
"enable": True,
"tgId": "",
"subId": ""
}
]
})
}
}
try:
response = requests.post(url, headers=self.headers, json=update_data, verify=False)
response.raise_for_status()
if response:
self.logger.debug("Client expiry time updated successfully.")
else:
self.logger.error(f"Failed to update client: {response.status_code}, {response.text}")
except requests.RequestException as e:
self.logger.error(f"Update client request failed: {e}")
def add_client(self, inbound_id, expiry_date,email):
async with aiohttp.ClientSession() as session:
try:
async with session.post(
url, headers=self.headers, json=update_data, ssl=self.cert_content
) as response:
if response.status == 200:
self.logger.info("Client expiry updated successfully.")
else:
self.logger.error(f"Failed to update client expiry: {response.status}")
except aiohttp.ClientError as e:
self.logger.error(f"Update client expiry request failed: {e}")
async def add_client(self, inbound_id, expiry_date, email):
"""
Add a new client to an inbound.
:param inbound_id: ID of the inbound.
:param expiry_date: Expiry date in ISO format.
:param email: Client's email.
:return: JSON response or None.
"""
await self._ensure_logged_in()
url = f"{self.base_url}/panel/api/inbounds/addClient"
client_info = {
"clients": [
@@ -111,7 +191,7 @@ class PanelInteraction:
"email": email,
"limitIp": 2,
"totalGB": 0,
"flow":"xtls-rprx-vision",
"flow": "xtls-rprx-vision",
"expiryTime": expiry_date,
"enable": True,
"tgId": "",
@@ -121,15 +201,19 @@ class PanelInteraction:
}
payload = {
"id": inbound_id,
"settings": json.dumps(client_info)
"settings": client_info
}
try:
response = requests.post(url, headers=self.headers, json=payload, verify=False)
if response.status_code == 200:
return response.json()
else:
self.logger.error(f"Failed to add client: {response.status_code}")
self.logger.debug("Response:", response.text)
async with aiohttp.ClientSession() as session:
try:
async with session.post(
url, headers=self.headers, json=payload, ssl=self.cert_content
) as response:
if response.status == 200:
return await response.status
else:
self.logger.error(f"Failed to add client: {response.status}")
return None
except aiohttp.ClientError as e:
self.logger.error(f"Add client request failed: {e}")
return None
finally:
self.logger.info("Finished attempting to add client.")