Skip to content

Instantly share code, notes, and snippets.

@norinorin
Last active February 10, 2024 07:53
Show Gist options
  • Save norinorin/603d6fe866b480ca076a58ebd0f01b84 to your computer and use it in GitHub Desktop.
Save norinorin/603d6fe866b480ca076a58ebd0f01b84 to your computer and use it in GitHub Desktop.
Minecraft Bedrock Server Backup Utils
  1. i hate cgnat
  2. i hate cgnat
  3. frp cool
# Client configuration for frp (the notes above mention frp) — presumably used
# to expose the services below through a remote frp server; confirm.
# The "xxx" values are placeholders and must be replaced before use.
serverAddr = "xxx.xxx.xxx.xxx"
serverPort = 7000
# SSH: local TCP port 22.
[[proxies]]
name = "sshXX"
type = "tcp"
localIP = "127.0.0.1"
localPort = 22
remotePort = xxxx
# Bedrock game traffic over UDP, port 19132 on both sides.
[[proxies]]
name = "bedrockIPV4"
type = "udp"
localIP = "127.0.0.1"
localPort = 19132
remotePort = 19132
# Second Bedrock UDP port, 19133 on both sides.
[[proxies]]
name = "bedrockIPV6"
type = "udp"
localIP = "127.0.0.1"
localPort = 19133
remotePort = 19133
# TCP port 50000, the same port the HTTP status script binds to (BIND_TO).
# NOTE(review): no localIP is set for this proxy, unlike the others.
[[proxies]]
name = "bedrockMonitor"
type = "tcp"
localPort = 50000
remotePort = 50000
import asyncio
import datetime
import logging
import os
import shutil
import signal
import sys
import typing
import uuid
from pathlib import Path
# Root logger configuration: DEBUG level, written to stderr.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
_LOGGER = logging.getLogger()

# Address the local command socket listens on.
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 34567

# Signature of a command handler: called with the Bedrock wrapper instance.
HANDLER_T = typing.Callable[["Bedrock"], typing.Any]

# Placeholders only; main() replaces both with real objects once an event
# loop is running.
queue: asyncio.Queue[str] = None  # type: ignore
event: asyncio.Event = None  # type: ignore
class LoggerAware:
    """Mixin that gives every subclass its own class-level logger.

    Each subclass receives a ``log`` attribute: the ``logging.Logger`` named
    after the subclass (``cls.__name__``).
    """

    log: logging.Logger

    def __init_subclass__(cls, **kwargs: object) -> None:
        # Forward keyword arguments up the MRO so this mixin can be combined
        # with other bases that define their own __init_subclass__.
        super().__init_subclass__(**kwargs)
        cls.log = logging.getLogger(cls.__name__)
class Backup(LoggerAware):
    """Copies world data from ``./worlds`` into a directory under ``./backups``."""

    @staticmethod
    def get_dst_path():
        """Return ``(temp_root, dst_root)``, both inside ``backups``.

        ``temp_root`` is a uuid-named staging directory; ``dst_root`` is named
        after the current local time (``%d-%m-%Y %H-%M-%S``).
        """
        root = Path("backups")
        stamp = datetime.datetime.now().strftime("%d-%m-%Y %H-%M-%S")
        return root / str(uuid.uuid4()), root / stamp

    @staticmethod
    def copy_and_truncate(path: str, length: int | str, dst_root: Path):
        """Copy ``worlds/<path>`` to ``<dst_root>/<path>`` and cut it to *length* bytes.

        *length* may be an int or a numeric string; it is converted with
        ``int()``. Missing parent directories of the destination are created.
        """
        dst: Path = dst_root / path
        src = Path("worlds") / path
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(src, dst)
        os.truncate(dst, int(length))

    @staticmethod
    def backup(entries: str | None):
        """Create a backup of the worlds directory.

        With no *entries* the whole ``worlds`` tree is copied. Otherwise
        *entries* is a ``", "``-separated list of ``path:length`` items; each
        file is copied into a staging directory and truncated to its length,
        and the staging directory is then renamed to the final destination.

        Raises:
            ValueError: if an entry has no ``:`` separator or a non-numeric
                length.
            OSError: if copying, truncating or renaming fails.
        """
        temp_root, dst_root = Backup.get_dst_path()
        if not entries:
            Backup.log.info("Cloning ./worlds dir")
            shutil.copytree("worlds", dst_root, dirs_exist_ok=True)
            return

        try:
            for entry in entries.split(", "):
                # Split on the LAST colon so a colon inside the path itself
                # does not break the path/length pair.
                path, sep, length = entry.rpartition(":")
                if not sep:
                    raise ValueError(f"Malformed backup entry: {entry!r}")
                Backup.copy_and_truncate(path, length, dst_root=temp_root)
            os.replace(temp_root, dst_root)
        except Exception:
            # Do not leave a half-copied staging directory behind.
            shutil.rmtree(temp_root, ignore_errors=True)
            raise
class Stdin:
    """Forwards lines read from the console into the shared command queue."""

    def __init__(self):
        # Background task running _internal_loop(); None while stopped.
        self.task = None

    @staticmethod
    async def _internal_loop():
        """Read console lines in the default executor and enqueue each one."""
        loop = asyncio.get_running_loop()
        while True:
            try:
                # input() blocks, so it runs in the executor rather than on
                # the event loop.
                line = await loop.run_in_executor(None, input)
            except EOFError:
                # stdin is closed; there is nothing more to read, so stop
                # quietly instead of failing the task.
                logging.getLogger("Stdin").debug("stdin reached EOF, console input disabled")
                return
            await queue.put(line)

    async def start(self):
        """Start the background reader task."""
        self.task = asyncio.create_task(Stdin._internal_loop())

    async def close(self):
        """Cancel the reader task if it is running."""
        if self.task:
            self.task.cancel()
            self.task = None
class SocketServer(LoggerAware):
    """TCP server that accepts newline-terminated commands and enqueues them."""

    def __init__(self):
        self.server = None
        # Writers of currently connected clients, so close() can drop them.
        self._writers = set()

    async def start(self):
        """Start listening on ``SERVER_HOST:SERVER_PORT``."""
        self.server = await asyncio.start_server(self._handle_client, SERVER_HOST, SERVER_PORT)

    async def close(self):
        """Stop listening, disconnect remaining clients and wait for shutdown."""
        if self.server:
            self.server.close()
            # Server.close() leaves accepted connections open; close them so
            # wait_closed() is not held up by a client that stays connected.
            for writer in list(self._writers):
                writer.close()
            await self.server.wait_closed()

    async def _handle_client(self, reader: asyncio.StreamReader, writer):
        """Read commands line by line from one client until it disconnects."""
        SocketServer.log.debug("A client connected")
        self._writers.add(writer)
        try:
            # readline() returns b"" only at EOF; a blank line is b"\n" and
            # must not be mistaken for a disconnect.
            while raw := await reader.readline():
                command = raw.decode(errors="replace").strip()
                if not command:
                    continue
                SocketServer.log.info("Received %s", command)
                await queue.put(command)
            SocketServer.log.debug("Client disconnected")
        finally:
            self._writers.discard(writer)
            writer.close()
class CommandHandler(LoggerAware):
    """Dispatches queued commands to registered handlers or to the server.

    Commands with a registered handler run as background tasks, one at a time
    (serialized by a lock). Any other command is forwarded to the Bedrock
    process's stdin queue unchanged.
    """

    def __init__(self, bedrock: "Bedrock"):
        self.handlers: dict[str, HANDLER_T] = {}
        self.log = logging.getLogger(self.__class__.__name__)
        self.bedrock = bedrock
        self.lock: asyncio.Lock = None  # type: ignore
        self.task = None
        # Strong references to running handler tasks; the event loop itself
        # only keeps weak references to tasks.
        self._running: set = set()

    def command(self, command: str):
        """Return a decorator registering a coroutine function for *command*."""

        def wrapper(func):
            self.handlers[command] = func
            return func

        return wrapper

    async def _internal_loop(self):
        """Consume the shared queue forever, dispatching each command."""
        self.lock = asyncio.Lock()
        while True:
            input_cmd = (await queue.get()).strip()
            handler = self.handlers.get(input_cmd)
            if not handler:
                CommandHandler.log.debug("No handler for %s is found, forwarding...", input_cmd)
                await self.bedrock.stdin.put(input_cmd)
                continue
            task = asyncio.create_task(self.execute_cmd(input_cmd, handler))
            self._running.add(task)
            task.add_done_callback(self._running.discard)

    async def start(self):
        """Start the dispatch loop as a background task."""
        self.task = asyncio.create_task(self._internal_loop())

    async def close(self):
        """Stop the dispatch loop and running handlers, then close the server wrapper."""
        if self.task:
            self.task.cancel()
            self.task = None
        for task in list(self._running):
            task.cancel()
        if self.bedrock:
            await self.bedrock.close()
            self.bedrock = None

    async def execute_cmd(self, input_cmd, handler: HANDLER_T):
        """Run *handler* under the lock; log (do not propagate) its exceptions."""
        async with self.lock:
            try:
                await handler(self.bedrock)
            except Exception as e:
                CommandHandler.log.error("Error handling %s", input_cmd, exc_info=e)
class Bedrock(LoggerAware):
    """Runs the Bedrock server as a child process and bridges its stdio to queues.

    Lines put on ``stdin`` are written to the process; once the server has
    reported that it started, every non-blank output line is put on ``stdout``.
    """

    # Shell command line used to launch the server.
    ARGS = "bedrock_server.exe" if os.name == "nt" else "LD_LIBRARY_PATH=. ./bedrock_server"

    def __init__(self):
        self.process: asyncio.subprocess.Process = None  # type: ignore
        self.stdin: asyncio.Queue[str] = None  # type: ignore
        self.stdout: asyncio.Queue[str] = None  # type: ignore
        self.ready: asyncio.Event = None  # type: ignore
        self._stdin_task = None
        self._stdout_task = None

    async def start(self):
        """Launch the server and wait until it reports "Server started.".

        Raises:
            RuntimeError: if the server announces that it is exiting, or its
                output ends, before it became ready.
        """
        self.stdin = asyncio.Queue()
        self.stdout = asyncio.Queue()
        self.ready = asyncio.Event()
        self.process = await asyncio.create_subprocess_shell(
            Bedrock.ARGS,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
        )
        self._stdin_task = asyncio.create_task(self._handle_stdin())
        self._stdout_task = asyncio.create_task(self._handle_stdout())
        ready_task = asyncio.create_task(self.ready.wait())
        try:
            done, _ = await asyncio.wait(
                {self._stdout_task, ready_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
        finally:
            # No-op if the task already finished; otherwise avoids leaving a
            # pending waiter behind.
            ready_task.cancel()
        if self._stdout_task in done:
            # Re-raises the RuntimeError from _handle_stdout, if any.
            self._stdout_task.result()
        if not self.ready.is_set():
            # The output stream ended without the start-up message.
            raise RuntimeError("bedrock_server exited before it finished starting")

    async def close(self):
        """Cancel the I/O tasks and terminate the child process."""
        if self._stdin_task:
            self._stdin_task.cancel()
            self._stdin_task = None
        if self._stdout_task:
            self._stdout_task.cancel()
            self._stdout_task = None
        if self.process:
            if self.process.returncode is None:
                try:
                    self.process.terminate()
                except ProcessLookupError:
                    # The process exited between the check and the call.
                    pass
            await self.process.wait()
            self.process = None

    async def _handle_stdout(self):
        """Log every output line and, once ready, queue it for consumers.

        Raises:
            RuntimeError: if "Exiting program" is seen before the server is ready.
        """
        stdout = self.process.stdout
        # readline() returns b"" only at EOF; a blank line is b"\n" and must
        # not end the loop.
        while raw := await stdout.readline():
            line = raw.decode(errors="replace").strip()
            if not line:
                continue
            line = line.removeprefix("NO LOG FILE! - ")
            self.log.info("%s", line)
            if self.ready.is_set():
                await self.stdout.put(line)
                continue
            # Keep only the text after the first three whitespace-separated
            # fields (presumably the timestamp and level prefix — confirm).
            msg = line.split(maxsplit=3)[-1]
            if msg.startswith("Server started."):
                self.ready.set()
            elif msg.startswith("Exiting program"):
                raise RuntimeError("Exiting...")

    async def _handle_stdin(self):
        """Write each queued line, newline-terminated, to the process's stdin."""
        stdin = self.process.stdin
        while not stdin.is_closing():
            line = await self.stdin.get()
            stdin.write((line + os.linesep).encode())
            try:
                await stdin.drain()
            except (BrokenPipeError, ConnectionResetError):
                self.log.warning("Server stdin is closed; dropping further input")
                return
# Module-wide command dispatcher, bound to the Bedrock process wrapper it
# controls; command handlers register on it with @handler.command(...).
handler = CommandHandler(bedrock=Bedrock())
@handler.command("backup")
async def handle_backup(bedrock: Bedrock):
    """Back up the worlds while the server holds its saves.

    Sends ``save hold``, polls with ``save query`` every two seconds until a
    line starting with "Data saved" appears, then passes the following file
    list line to ``Backup.backup``. ``save resume`` is always sent at the end,
    whether the backup succeeded, failed or was cancelled.
    """
    # Discard output queued before this backup so only responses produced
    # after "save hold" are examined (those lines were already logged).
    while not bedrock.stdout.empty():
        bedrock.stdout.get_nowait()

    async def _query():
        while True:
            await bedrock.stdin.put("save query")
            await asyncio.sleep(2)

    await bedrock.stdin.put("save hold")
    query_task = asyncio.create_task(_query())
    try:
        # First wait for the server to report that the save is complete...
        while True:
            line = (await bedrock.stdout.get()).split(maxsplit=3)[-1]
            if line.startswith("Data saved"):
                break
        query_task.cancel()
        # ...then pick up the file list. It should be the very next line;
        # unlike regular log lines it does not start with a bracket.
        while True:
            line = await bedrock.stdout.get()
            if not line.startswith("["):
                break
        # Copying is blocking file I/O; run it in a worker thread so the
        # event loop keeps servicing the process pipes and socket clients.
        await asyncio.to_thread(Backup.backup, line)
    except Exception as exc:
        CommandHandler.log.error("Backup failed, resuming the saving", exc_info=exc)
    else:
        CommandHandler.log.info("Backup success")
    finally:
        query_task.cancel()
        await bedrock.stdin.put("save resume")
async def main():
    """Start the server and its helpers, wait for shutdown, then clean up.

    Creates the shared command queue and shutdown event, starts the Bedrock
    process, the socket server, the console reader and the command handler,
    and blocks until the shutdown event is set. Every component is closed on
    the way out, including when start-up fails part-way.
    """
    global queue, event
    queue = asyncio.Queue()
    event = asyncio.Event()
    server = SocketServer()
    stdin = Stdin()
    try:
        _LOGGER.info("Starting server...")
        await handler.bedrock.start()
        _LOGGER.info("Starting command handler and socket server")
        await server.start()
        await stdin.start()
        await handler.start()
        await event.wait()
    finally:
        # close() on each component is a no-op for parts that never started.
        # One failing close must not prevent the others from running.
        for component in (handler, server, stdin):
            try:
                await component.close()
            except Exception:
                _LOGGER.exception("Error while closing %s", type(component).__name__)
        # Brief pause before the loop closes; seems to avoid the
        # "Exception ignored in: <function BaseSubprocessTransport.__del__"
        # message at exit.
        await asyncio.sleep(0.25)
def _request_shutdown(*_):
    """Signal handler: schedule setting the shutdown event on the running loop."""
    asyncio.get_running_loop().call_soon_threadsafe(event.set)


# Both interrupt and terminate requests trigger the same graceful shutdown.
for sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(sig, _request_shutdown)

asyncio.run(main())
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from mcstatus import BedrockServer
# Address of the Bedrock server whose status is queried; the host value is a
# placeholder to be replaced.
MINECRAFT_HOST: str = "xxx.xxx.xxx.xx" # ip or hostname
MINECRAFT_PORT: int = 19132
# (host, port) pair the HTTP status server listens on.
BIND_TO: tuple[str, int] = ("127.0.0.1", 50000)
class RequestHandler(BaseHTTPRequestHandler):
    """Serves a plain-text status summary of the Bedrock server.

    HEAD answers with the status code only; GET adds a text body.
    """

    def _send_headers(self, status_code):
        """Send the response line and a ``text/plain`` content type."""
        self.send_response(status_code)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()

    def _send_text(self, text):
        """Write *text* to the response body."""
        self.wfile.write(text.encode())

    def do_HEAD(self):
        """Query the server and send the matching status code.

        Returns the status object on success (200), ``None`` when the query
        timed out (503), or the caught exception for any other failure (500).
        The return value is used by ``do_GET`` to build the body.
        """
        try:
            status = BedrockServer(MINECRAFT_HOST, MINECRAFT_PORT).status()
        except TimeoutError:
            self._send_headers(503)
            return None
        except Exception as exc:
            self._send_headers(500)
            return exc
        self._send_headers(200)
        return status

    def do_GET(self):
        """Send the headers via ``do_HEAD`` and then a text body."""
        status = self.do_HEAD()
        if not status:
            self._send_text("server offline")
            return
        if isinstance(status, Exception):
            # _send_text takes a single string, so format the error into it.
            self._send_text(f"Error: {status}")
            return
        lines = [
            f"players: {status.players.online}/{status.players.max}",
            f"version: {status.version.name} {status.version.brand}",
            f"latency: {status.latency}",
            f"map name: {status.map_name}",
            f"gamemode: {status.gamemode}",
        ]
        self._send_text("\n".join(lines))
# Run the status server until interrupted. The context manager closes the
# server on every exit path, and nothing touches `server` if its construction
# itself failed.
try:
    with ThreadingHTTPServer(BIND_TO, RequestHandler) as server:
        server.serve_forever()
except KeyboardInterrupt:
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment