import shlex import subprocess import time from typing import Union, Callable import mcipc.query import mcipc.query.client import file_manager import firebase_manager class MinecraftServerManager: allowed_properties: list[str] = ["difficulty", "gamemode", "force-gamemode", "hardcore", "generate-structures", "motd", "pvp", "online-mode", "max-players", "enable-command-block"] def __init__(self): self.servers: dict = {} self.servers_count: int = 0 self.cooldowns = {} self.offline_ports: list[int] = [] def start_server(self, server_directory: str, port: int, user_id: str, server_name: str, version: str, jar_file: str = 'server.jar', memory_size: str ='4G', modded: bool = False) -> Union[int, None]: if port in self.servers: return None reg_flags: str = ("-XX:+UseG1GC -XX:+ParallelRefProcEnabled -XX:MaxGCPauseMillis=200" " -XX:+UnlockExperimentalVMOptions -XX:+DisableExplicitGC -XX:+AlwaysPreTouch" " -XX:G1NewSizePercent=30 -XX:G1MaxNewSizePercent=40 -XX:G1HeapRegionSize=8M" " -XX:G1ReservePercent=20 -XX:G1HeapWastePercent=5 -XX:G1MixedGCCountTarget=4" " -XX:InitiatingHeapOccupancyPercent=15 -XX:G1MixedGCLiveThresholdPercent=90" " -XX:G1RSetUpdatingPauseTimePercent=5 -XX:SurvivorRatio=32 -XX:+PerfDisableSharedMem" " -XX:MaxTenuringThreshold=1 -Daikars.new.flags=true" " -Dusing.aikars.flags=https://mcutils.com") java: str = f"/usr/lib/jvm/java-{get_sdk_version(version)}-openjdk-amd64/bin/java" command = f"{java} -Xmx{memory_size} {reg_flags} -jar {jar_file} --nogui" if modded: command = "./start.sh" process = subprocess.Popen(shlex.split(command), cwd=server_directory, stdin=subprocess.PIPE) #TODO: Track process behavior and stderr, while excepting Advanced Terminal features not to be avail. self.servers_count = len(self.servers) + 1 self.servers[port] = { 'process': process, 'directory': server_directory, 'port': port, 'user_id': user_id, 'name': server_name, 'version': version, 'time': time.time(), } return port def execute_server_command(self, port, command) -> bool: if port not in self.servers: return False process = self.servers[port]['process'] process.stdin.write(command.encode('utf-8') + b'\n') process.stdin.flush() return True def stop_server(self, port: int) -> bool: if port not in self.servers: return False process = self.servers[port]['process'] process.communicate(input=b"stop\n") del self.servers[port] if port in self.offline_ports: self.offline_ports.remove(port) return True def stop_server_forcefully(self, port) -> bool: if port not in self.servers: return False process = self.servers[port]['process'] process.terminate() del self.servers[port] return True def get_servers(self): return self.servers.values() def get_server_id_by_port(self, port): for server_id, server_info in self.servers.items(): if server_info['port'] == port: return server_id return None def get_online_players(self, port) -> int: if not self.servers[port]: return 0 try: with mcipc.query.Client('127.0.0.1', port) as client: stats: int = client.stats(full=False).num_players return stats except Exception as e: file_manager.log_error(type(e).__name__, str(e)) return 0 def set_cooldown(self, user_id): expiry_timestamp = time.time() + 30 self.cooldowns[user_id] = expiry_timestamp def has_cooldown(self, user_id): expiry_timestamp = self.cooldowns.get(user_id) if expiry_timestamp is None: return False current_time = time.time() if current_time < expiry_timestamp: return True else: del self.cooldowns[user_id] return False def check_servers_idle(self) -> None: servers: dict = self.servers.copy() for port, server_info in servers.items(): online_players = self.get_online_players(port) if online_players == 0: if port not in self.offline_ports: self.offline_ports.append(port) else: user_id = server_info.get("user_id", None) name = server_info.get("name", None) server_stamp = server_info.get("time", None) firebase_manager.close_idle_server( user_id=user_id, name=name, server_stamp=server_stamp, port=port, ) else: if port in self.offline_ports: self.offline_ports.remove(port) return def version_range_checker(*ranges: tuple[str, str, str], default_sdk: str = "java") -> Callable[[str], str]: def compare_mc_versions(version1: str): def inner(version2: str) -> int: v1_parts: list[int] = list(map(int, version1.split('.'))) v2_parts: list[int] = list(map(int, version2.split('.'))) for v1, v2 in zip(v1_parts, v2_parts): if v1 > v2: return 1 elif v1 < v2: return -1 if len(v1_parts) > len(v2_parts): return 1 elif len(v1_parts) < len(v2_parts): return -1 return 0 return inner def check_range(lower_bound: str, upper_bound: str, version: str) -> bool: return compare_mc_versions(lower_bound)(version) <= 0 <= compare_mc_versions(upper_bound)(version) def check_version(version: str) -> str: for lower_bound, upper_bound, sdk in ranges: if check_range(lower_bound, upper_bound, version): return sdk return default_sdk return check_version get_sdk_version: Callable[[str], str] = version_range_checker(("1.0", "1.13.2", "8"), ("1.14.1", "1.16.5", "11"), ("1.17.1", "1.18.1", "17"), default_sdk="21") if __name__ == "__main__": pass