servii-backend/firebase_manager.py
Charles Le Maux 9ad1bdaff9 [+] Added super-secure-token authentication
Now uses firebase complete tokens; it verifies the signature and integrity of the token, the origin of the project the token was issued for, the secret key, and finally the sub before verifying the account.
2024-09-20 15:54:41 +02:00

230 lines
8.1 KiB
Python

import time
from datetime import datetime
from typing import Union
import firebase_admin
import jwt
from firebase_admin import auth, credentials, firestore
from google.api_core.exceptions import Aborted, DataLoss, NotFound, OutOfRange, PermissionDenied, ResourceExhausted
from google.cloud.firestore_v1 import FieldFilter, DocumentReference
import file_manager
from generic_executor import mc_manager
cred = credentials.Certificate('secrets/servii.json')
app = firebase_admin.initialize_app(cred)
firestore_database = firestore.client()
def get_user_from_id(user_id):
return auth.get_user(user_id)
def verify_jwt_token(token):
try:
decoded_token = auth.verify_id_token(token, app=app, check_revoked=True)
user_id = decoded_token.get('sub')
return True, user_id
except jwt.ExpiredSignatureError:
return False, None
except jwt.InvalidTokenError:
return False, None
except Exception as e:
log_exception_to_firestore(e, None, {"user": None, "error-step": "auth"})
return False, None
def fetch_port() -> Union[int, None]:
servers_ref = firestore_database.collection("users")
query = servers_ref.order_by("port", direction="DESCENDING").limit(1)
highest_port_doc = next(query.stream(), None)
if highest_port_doc:
return highest_port_doc.get('port')
return None
def user_field_exists(user_id: str) -> bool:
try:
doc = firestore_database.collection('users').document(user_id).get()
if doc.exists:
return True
return False
except NotFound:
return False
def server_name_taken(user_id: str, server_name: str) -> bool:
servers = firestore_database.collection('users').document(user_id).collection('servers')
query = servers.where(filter=FieldFilter(field_path='name', op_string='==', value=server_name))
for _ in query.stream():
return True
return False
def get_user_field(user_id, field_name):
user_doc_ref = firestore_database.collection('users').document(user_id)
user_doc = user_doc_ref.get()
if user_doc.exists:
field_value = user_doc.to_dict().get(field_name)
return field_value
else:
return None
def get_server_port(user_id: str) -> Union[int, None]:
try:
servers_ref = firestore_database.collection('users').document(user_id)
server_doc = servers_ref.get()
port = server_doc.get('port')
return port if port else None
except Exception as e:
file_manager.log_error(type(e).__name__, str(e))
return None
def create_firestore(user_id: str, data: dict) -> bool:
doc_ref = firestore_database.collection('users').document(user_id)
try:
doc_ref.create(data)
return True
except (NotFound, PermissionDenied, Aborted, ResourceExhausted,
OutOfRange, DataLoss, TypeError, Exception, ValueError) as e:
log_exception_to_firestore(e, user_id, data)
return False
def update_firestore(user_id: str, data: dict) -> bool:
doc_ref = firestore_database.collection('users').document(user_id)
try:
doc_ref.update(data)
return True
except (NotFound, PermissionDenied, Aborted, ResourceExhausted,
OutOfRange, DataLoss, TypeError, Exception, ValueError) as e:
log_exception_to_firestore(e, user_id, data)
return False
def set_firestore(user_id: str, data: dict) -> bool:
doc_ref = firestore_database.collection('users').document(user_id)
try:
doc_ref.set(data)
return True
except (NotFound, PermissionDenied, Aborted, ResourceExhausted,
OutOfRange, DataLoss, TypeError, Exception, ValueError) as e:
log_exception_to_firestore(e, user_id, data)
return False
def create_server(user_id: str, server_name: str, version: str, port: int, framework: str = "paper"):
servers_ref = firestore_database.collection('users').document(user_id).collection('servers')
server_doc_ref = servers_ref.document(server_name)
server_doc_ref.set(
{'name': server_name,
'port': port,
'running': False,
'version': version,
'framework': framework,
"difficulty": "easy",
"gamemode": "survival",
"forceGamemode": "false",
"hardcore": "false",
"generateStructures": "true",
"motd": "A Minecraft Server",
"pvp": "true",
"onlineMode": "true",
"maxPlayers": "20",
"enableCommandBlock": "true"})
def get_server_field(user_id: str, name: str, field_name: str) -> Union[str, bool, None]:
try:
server_doc: DocumentReference = firestore_database.document(f'users/{user_id}/servers/{name}')
doc = server_doc.get()
if doc.exists:
field = doc.to_dict().get(field_name)
return field
else:
return None
except (NotFound, PermissionDenied, Aborted, ResourceExhausted,
OutOfRange, DataLoss, TypeError, Exception, ValueError) as e:
log_exception_to_firestore(e, user_id, {"function": "get_server_field", "name": name, "field": field_name})
return None
def delete_server(user_id: str, server_name: str):
user_ref = firestore_database.collection('users').document(user_id)
servers_ref = user_ref.collection('servers')
server_doc_ref = servers_ref.document(server_name)
server_doc_ref.delete()
def delete_user(user_id: str):
user_ref = firestore_database.collection('users').document(user_id)
user_ref.delete()
def update_server_running_state(user_id: str, server_name: str, state: bool):
server_ref = firestore_database.collection('users').document(user_id).collection('servers').document(server_name)
if server_ref.get().get('running') != state:
server_ref.update({'running': state})
def update_server_property(user_id: str, server_name: str, prop: str, value: str):
server_ref = firestore_database.collection('users').document(user_id).collection('servers').document(server_name)
prop = file_manager.kebab_to_camel_case(prop)
server_ref.update({prop: value})
def set_servers_not_running():
users_ref = firestore_database.collection(u'users')
docs = users_ref.stream()
for doc in docs:
user_id = doc.id
servers_ref = firestore_database.collection(u'users').document(user_id).collection(u'servers')
server_docs = servers_ref.stream()
for server_doc in server_docs:
server_id = server_doc.id
(firestore_database.collection(u'users')
.document(user_id).collection(u'servers').document(server_id).update({u'running': False}))
print("All servers have been set to not running.")
def log_exception_to_firestore(exception: Exception = None, user_id: str = None, data: dict = None):
new_id: str = datetime.now().strftime('%Y-%m-%d %H:%M:%S %Z%z')
log_entry = {
'exception_name': str(type(exception).__name__),
'exception': str(exception) if exception else 'No exception',
'user_id': str(user_id) if user_id else 'No user_id',
'data': str(data) if data else 'No data provided',
}
try:
firestore_database.collection('firebase.logs').document(new_id).create(log_entry)
print("Log entry added successfully.")
except Exception as e:
print(f"Failed to add log entry: {e}")
def close_idle_server(user_id: Union[str, None], name: Union[str, None], port: int,
server_stamp: Union[float, None]) -> None:
if any(var is None for var in (user_id, name, server_stamp)):
return
try:
mc_manager.stop_server(port)
update_server_running_state(user_id, name, False)
now: float = time.time()
elapsed_seconds = now - server_stamp
hours = int(elapsed_seconds // 3600)
minutes = int((elapsed_seconds % 3600) // 60)
seconds = round(elapsed_seconds % 60, 2)
file_manager.log_action(user_id, name, "ServerStop (idle)",
f"Suspended inactive server activities. Uptime : {hours}h {minutes}m {seconds}s")
except Exception as e:
print(e, user_id, name, server_stamp)
if __name__ == "__main__":
pass