Skip to content

Instantly share code, notes, and snippets.

@ewized
Last active February 14, 2022 12:10
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ewized/8599d0d846830af079d7 to your computer and use it in GitHub Desktop.
Save ewized/8599d0d846830af079d7 to your computer and use it in GitHub Desktop.
A watchdog for Minecraft servers to make sure they are running all the time, even in the event of a crash.
#!/usr/local/bin/python3
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# For more information, please refer to <http://unlicense.org/>
import argparse
import json
import logging
import os
import re
import signal
import socket
import struct
import subprocess
import sys
import threading
import time
class StatusPing:
    """ Get the ping status for the Minecraft server

    Speaks the Server List Ping exchange: a handshake packet, a status
    request, and a ping/pong round trip used to measure latency.
    """

    def __init__(self, host='localhost', port=25565, timeout=10.0):
        """ Init the hostname and the port

        host    -- name or address of the server to query
        port    -- TCP port of the server
        timeout -- seconds to wait on connect/send/receive before giving up;
                   ``None`` blocks forever. A timeout surfaces as an
                   ``OSError`` subclass, so a hung server is reported as a
                   failure instead of blocking the caller indefinitely.
        """
        self._host = host
        self._port = port
        self._timeout = timeout

    def _recv_exact(self, sock, length):
        """ Read exactly ``length`` bytes from the socket

        ``recv`` may legally return fewer bytes than requested, so keep
        reading until the whole span arrived. Raises ``ConnectionError``
        (an ``OSError``) when the peer closes the connection early.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError('connection closed before the full packet was received')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _unpack_varint(self, sock):
        """ Unpack the varint

        Reads up to five bytes, seven payload bits each, least significant
        group first; the high bit of a byte marks that another byte follows.
        Raises ``ConnectionError`` when the stream ends inside the varint.
        """
        data = 0
        for i in range(5):
            byte = self._recv_exact(sock, 1)[0]
            data |= (byte & 0x7F) << 7 * i
            if not byte & 0x80:
                break
        return data

    def _pack_varint(self, data):
        """ Pack the var int

        Negative values are encoded as their 32-bit two's complement so the
        loop always terminates.
        """
        if data < 0:
            data &= 0xFFFFFFFF
        ordinal = b''
        while True:
            byte = data & 0x7F
            data >>= 7
            ordinal += struct.pack('B', byte | (0x80 if data > 0 else 0))
            if data == 0:
                break
        return ordinal

    def _pack_data(self, data):
        """ Pack the data

        str   -> varint length prefix followed by the UTF-8 bytes
        int   -> unsigned 16-bit big-endian (used for the port)
        float -> signed 64-bit big-endian of the truncated value (ping payload)
        other -> returned unchanged (already raw bytes)
        """
        if isinstance(data, str):
            data = data.encode('utf8')
            return self._pack_varint(len(data)) + data
        elif isinstance(data, int):
            # Explicit big-endian: native order would flip the bytes on x86
            return struct.pack('>H', data)
        elif isinstance(data, float):
            # Fixed 8 bytes: native 'L' is only 4 bytes on some platforms
            return struct.pack('>q', int(data))
        else:
            return data

    def _send_data(self, connection, *args):
        """ Send the data on the connection

        Every argument is packed, the results are concatenated, and the
        packet is written with a varint length prefix.
        """
        data = b''.join(self._pack_data(arg) for arg in args)
        # sendall: plain send() may transmit only part of the buffer
        connection.sendall(self._pack_varint(len(data)) + data)

    def _read_fully(self, connection, extra_varint=False):
        """ Read the connection and return the bytes

        extra_varint -- when true the payload is a string: read its own
                        varint length and return exactly that many bytes.
                        Otherwise return the rest of the packet after the
                        packet id.
        Raises ``ConnectionError`` if the connection closes mid-packet.
        """
        packet_length = self._unpack_varint(connection)
        packet_id = self._unpack_varint(connection)
        if extra_varint:
            extra_length = self._unpack_varint(connection)
            return self._recv_exact(connection, extra_length)
        # The packet length also counts the bytes of the packet id itself
        payload_length = packet_length - len(self._pack_varint(packet_id))
        return self._recv_exact(connection, max(payload_length, 0))

    def get_status(self):
        """ Get the status response

        Returns the decoded status JSON with an extra ``ping`` key holding
        the round trip in milliseconds, or ``None`` when the server answered
        with an empty (or non-object) document. Connection problems,
        timeouts and truncated replies raise ``OSError`` subclasses.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
            connection.settimeout(self._timeout)
            connection.connect((self._host, self._port))
            # Send handshake + status request
            self._send_data(connection, b'\x00\x00', self._host, self._port, b'\x01')
            self._send_data(connection, b'\x00')
            # Read response, offset for string length
            data = self._read_fully(connection, extra_varint=True)
            # Send and read unix time
            self._send_data(connection, b'\x01', time.time() * 1000)
            unix = self._read_fully(connection)
        # Load json and return
        response = json.loads(data.decode('utf8'))
        # Check if there is a response
        if not isinstance(response, dict) or not response:
            return None
        if len(unix) != 8:
            raise ConnectionError('malformed pong payload from the server')
        response['ping'] = int(time.time() * 1000) - struct.unpack('>q', unix)[0]
        return response
class Server:
    """ Shared, class-level state describing the wrapped server process """

    # Heartbeat threshold constants (not referenced elsewhere in the visible source)
    MIN_HEARTBEATS = 3
    MAX_HEARTBEATS = 6

    # Handle of the wrapped server process; None until one has been assigned
    server = None
    # Most recent status response kept for comparison with the next one
    last_ping = None
    # Running count of failed heartbeats
    failed_heartbeats = 0
class WatchdogError(Exception):
    """ The base error for this wrapper

    Derives from ``Exception`` (not ``BaseException``) so generic handlers
    treat it like any other application error.
    """

    def __init__(self, message, *args):
        """ Build the error text with ``str(message).format(*args)`` """
        self.message = str(message).format(*args)
        # Hand the formatted text to the base class so str(error) and
        # error.args show the final message rather than the raw arguments
        super().__init__(self.message)

    def get_message(self):
        """ Get the message set by this error """
        return self.message
class SameStateError(WatchdogError):
    """ Raised when two consecutive status pings are considered equal """
def address_port(address):
    """ Split a ``host[:port]`` string into a ``(host, port)`` tuple

    The port falls back to 25565 when there is no colon or nothing follows
    it. Only the first two colon-separated fields are looked at.
    """
    default_port = 25565
    if ':' not in address:
        return (address, default_port)
    host, port_text, *_ = address.split(':')
    if port_text:
        return (host, int(port_text))
    return (host, default_port)
def positive_int(value):
    """ Convert the value to an int and reject anything that is not strictly positive

    Raises ``argparse.ArgumentTypeError`` for zero and negative numbers.
    """
    number = int(value)
    if number > 0:
        return number
    raise argparse.ArgumentTypeError(f'argument must be positive, was {number}')
def watchdog():
    """ Ping the provided minecraft server and when it fails its heartbeat kill it

    Runs forever: sleeps ``namespace.heartbeat`` seconds, pings the server
    and keeps a count of failed heartbeats in ``Server.failed_heartbeats``.
    Once the count exceeds ``namespace.heartbeats_soft`` the process is
    terminated, and once it exceeds ``namespace.heartbeats_hard`` it is
    killed. Any failure while pinging is counted as a failed heartbeat so
    the loop itself never dies.
    """
    address = get_address()
    logger.info(f'Watchdog using this server address {address[0]}:{address[1]}')
    ping = StatusPing(host=address[0], port=address[1])
    while True:
        # Sleep for the heartbeat watchdog events
        time.sleep(namespace.heartbeat)
        try:
            status = ping.get_status()
            status_online = status is not None and 'players' in status
            online = status['players']['online'] if status_online else 0
            capacity = status['players']['max'] if status_online else 0
            motd = re.sub(r'[&§][a-f0-9k-or]', '', str(status['description']).replace('\n', ' ')) if status_online else 'Offline'
            logger.debug(status)
            logger.info(f'Players: ({online}/{capacity}), MOTD: \'{motd}\'')
            # When not disabled it will also check for the same pings
            if namespace.equal_pings and equal_ping(status, Server.last_ping):
                Server.last_ping = status
                raise SameStateError('Grrr')
            Server.last_ping = status
            Server.failed_heartbeats = 0
        except SameStateError:
            Server.failed_heartbeats += 1
            logger.error(f'Failed server state heartbeat (count #{Server.failed_heartbeats})')
        except OSError:
            Server.last_ping = None
            Server.failed_heartbeats += 1
            logger.error(f'Failed heartbeat (count #{Server.failed_heartbeats})')
        except Exception as error:
            # Undecodable JSON, missing keys, bad pong payload, ...: a reply
            # that cannot be used is a failed heartbeat too. Letting it
            # escape would end this thread and leave the server unwatched.
            Server.last_ping = None
            Server.failed_heartbeats += 1
            logger.error(f'Failed heartbeat, unusable response {error!r} (count #{Server.failed_heartbeats})')
        # Nothing to stop until a server process has been started
        process = Server.server
        if process is None:
            continue
        # Kill the server soft then hard for failed heartbeats
        if Server.failed_heartbeats > namespace.heartbeats_hard:
            logger.info(f'Hard killing the server (pid:{process.pid})')
            process.kill()
        elif Server.failed_heartbeats > namespace.heartbeats_soft:
            logger.info(f'Soft killing the server (pid:{process.pid})')
            process.terminate()
def equal_ping(left, right):
    """ Make sure the two pings are equals

    Two pings are equal when both are ``None``, or when description,
    online player count, capacity and the set of sampled player ids all
    match. A ping without a ``players`` or ``sample`` entry is treated as
    having no player information rather than raising.
    """
    # If both are none then they are equal
    if left is None and right is None:
        return True
    # Exactly one side missing means they differ
    if left is None or right is None:
        return False
    logger.debug(f'Left Ping: {left}')
    logger.debug(f'Right Ping: {right}')

    def fingerprint(ping):
        """ Reduce a ping to the values that take part in the comparison """
        players = ping.get('players') or {}
        uuids = {player.get('id') for player in players.get('sample') or []}
        return (ping.get('description'), players.get('online'), players.get('max'), uuids)

    # The online count takes part as well: a changed player count proves
    # the server state moved even when the sampled players look the same
    return fingerprint(left) == fingerprint(right)
def start_command():
    """ Build the argv list for the server: the wrapped command followed by its arguments """
    return [namespace.cmd, *namespace.args]
def get_address():
    """ Return the ``address`` value stored on the parsed launch arguments """
    configured = namespace.address
    return configured
def setup_parser():
    """ Build the argument parser, parse the command line and publish the result as the global ``namespace`` """
    global namespace
    parser = argparse.ArgumentParser(
        description='A watchdog for Minecraft servers to make sure they are running all the time, even in the event of a crash.',
        usage='%(prog)s [-a host:port] cmd ...',
    )
    # (names, settings) pairs, registered in this exact order
    arguments = (
        (('--debug',), {'help': 'enable debug output for the logger', 'action': 'store_true'}),
        (('--no-restart',), {'help': 'disable the automatic restart of the server', 'action': 'store_false', 'default': True}),
        (('--restart-delay',), {'help': 'the time in seconds between automatic restarts', 'default': 10, 'type': positive_int}),
        (('--equal-pings',), {'help': 'should pings be checked for equality', 'action': 'store_true', 'default': False}),
        (('--heartbeat',), {'help': 'the time in seconds that the watchdog with ping the server', 'default': 240, 'type': positive_int}),
        (('--heartbeats-soft',), {'help': 'the number of fail heartbeats to gracefully stop the server', 'default': 3, 'type': positive_int}),
        (('--heartbeats-hard',), {'help': 'the number of fail heartbeats to forcefully stop the server', 'default': 6, 'type': positive_int}),
        (('-a', '--address'), {'metavar': 'host:port', 'help': 'the server address used for the watchdog', 'default': ('localhost', 25565), 'type': address_port}),
        (('cmd',), {'help': 'the start command that will be wrapped'}),
        (('args',), {'nargs': argparse.REMAINDER, 'help': 'the args that are passed into'}),
    )
    for names, settings in arguments:
        parser.add_argument(*names, **settings)
    namespace = parser.parse_args()
def check_debug():
    """ Switch the logger to DEBUG level and log the parsed arguments when ``--debug`` was given """
    if not namespace.debug:
        return
    logger.setLevel(logging.DEBUG)
    for line in ('Debug mode is enabled', f'Namespace: {namespace}'):
        logger.debug(line)
def setup_logger():
    """ Create the global ``logger``: INFO level, writing formatted lines to stdout """
    global logger
    console = logging.StreamHandler(stream=sys.stdout)
    console.setFormatter(logging.Formatter(
        '[%(asctime)s] [Server Watchdog/%(levelname)s]: %(message)s',
        '%H:%M:%S',
    ))
    logger = logging.getLogger('net.year4000.wrapper')
    logger.addHandler(console)
    logger.setLevel(logging.INFO)
def make_watchdog():
    """ Create and start the watchdog daemon thread

    The thread runs ``watchdog`` and is marked as a daemon so it never
    keeps the interpreter alive once the main thread is done.
    """
    logger.info('Creating the watchdog thread')
    # daemon=True replaces the deprecated Thread.setDaemon() call
    thread = threading.Thread(target=watchdog, daemon=True)
    thread.start()
def main():
    """ Run the minecraft server in the main thread, restarting it when it exits

    Each round starts the server process, publishes it on ``Server.server``,
    resets the failed heartbeat counter and waits for the process to end.
    Errors while starting or waiting are logged. Unless automatic restarts
    were disabled the loop sleeps ``namespace.restart_delay`` seconds and
    starts over.
    """
    while True:
        try:
            server = subprocess.Popen(start_command())
            Server.server = server
            logger.info(f'Starting server... (pid: {server.pid})')
            Server.failed_heartbeats = 0
            server.wait()
        except Exception as error:
            logger.error(error)
        # Deliberately not in a ``finally`` block: on Ctrl+C the interrupt
        # must leave right away instead of first announcing a restart and
        # sleeping through the whole restart delay.
        # ``no_restart`` is True by default and False when --no-restart is given
        if not namespace.no_restart:
            break
        # Let the thread sleep
        logger.info(f'Restarting in {namespace.restart_delay} seconds')
        time.sleep(namespace.restart_delay)
if __name__ == '__main__':
    try:
        # Start-up sequence: logging, arguments, debug switch, watchdog
        # thread, then the blocking server loop
        for step in (setup_logger, setup_parser, check_debug, make_watchdog, main):
            step()
    except KeyboardInterrupt:
        # Ctrl+C is the regular way to stop the wrapper
        pass
    except Exception as error:
        logger.error(error)
    finally:
        # Never leave the wrapped process behind
        process = Server.server
        if process is not None:
            logger.info('Stoping the minecraft server')
            process.kill()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment