Skip to content

Instantly share code, notes, and snippets.

@ewized ewized/
Last active Dec 12, 2019

What would you like to do?
A watchdog for Minecraft servers to make sure they are running all the time, even in the event of a crash.
# Copyright 2019 Year4000.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import json
import logging
import os
import re
import signal
import socket
import struct
import subprocess
import sys
import threading
import time
class StatusPing:
    """ Get the ping status for the Minecraft server

    Performs the handshake, the status request and a ping/pong round trip
    over a plain TCP socket. Every network failure surfaces as an OSError
    subclass so callers can handle them with a single except clause.
    """

    def __init__(self, host='localhost', port=25565, timeout=5):
        """ Init the hostname, the port and the socket timeout

        timeout is in seconds and applies to connect and to each read/write;
        pass None to block forever.
        """
        self._host = host
        self._port = port
        self._timeout = timeout

    def _recv_exact(self, sock, size):
        """ Read exactly size bytes from the socket

        recv may return fewer bytes than asked for, so keep reading until the
        request is satisfied. Raises ConnectionError when the peer closes the
        connection first (recv returns an empty bytes object).
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError('Connection closed while reading the packet')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _unpack_varint(self, sock):
        """ Unpack the varint

        Reads at most five bytes, seven payload bits each, least significant
        group first. Raises ConnectionError when the stream ends early.
        """
        data = 0
        for i in range(5):
            byte = self._recv_exact(sock, 1)[0]
            data |= (byte & 0x7F) << 7 * i
            # A clear high bit marks the last byte of the varint
            if not byte & 0x80:
                break
        return data

    def _pack_varint(self, data):
        """ Pack the var int """
        ordinal = b''
        while True:
            byte = data & 0x7F
            data >>= 7
            # Set the continuation bit while more groups follow
            ordinal += struct.pack('B', byte | (0x80 if data > 0 else 0))
            if data == 0:
                return ordinal

    def _pack_data(self, data):
        """ Pack the data

        str is sent as a varint length prefixed utf8 string, int as a big
        endian unsigned short (the port), float as a big endian 8 byte long
        (the ping timestamp); anything else is assumed to be raw bytes.
        """
        if isinstance(data, str):
            data = data.encode('utf8')
            return self._pack_varint(len(data)) + data
        if isinstance(data, int):
            # Explicit byte order: the native 'H' is little endian on x86
            return struct.pack('>H', data)
        if isinstance(data, float):
            # '>q' is always 8 bytes, native 'L' is only 4 bytes on Windows
            return struct.pack('>q', int(data))
        return data

    def _send_data(self, connection, *args):
        """ Send the data on the connection as one length prefixed packet """
        data = b''.join(self._pack_data(arg) for arg in args)
        # sendall keeps writing until the whole packet is out, send may stop short
        connection.sendall(self._pack_varint(len(data)) + data)

    def _read_fully(self, connection, extra_varint=False):
        """ Read the connection and return the bytes

        With extra_varint the payload is a length prefixed string and that
        inner length decides how much is read. Otherwise the payload is the
        packet length minus the bytes used by the packet id.
        """
        packet_length = self._unpack_varint(connection)
        packet_id = self._unpack_varint(connection)
        if extra_varint:
            extra_length = self._unpack_varint(connection)
            return self._recv_exact(connection, extra_length)
        payload_length = packet_length - len(self._pack_varint(packet_id))
        return self._recv_exact(connection, max(payload_length, 0))

    def get_status(self):
        """ Get the status response

        Returns the decoded status dict with an added 'ping' key holding the
        round trip time in milliseconds, or None when the server answered with
        an empty document. Raises OSError (including timeouts and closed
        connections) on network failure and ValueError on undecodable JSON.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
            # Without a timeout a hung server would block the caller forever
            connection.settimeout(self._timeout)
            connection.connect((self._host, self._port))
            # Send handshake + status request
            self._send_data(connection, b'\x00\x00', self._host, self._port, b'\x01')
            self._send_data(connection, b'\x00')
            # Read response, offset for string length
            data = self._read_fully(connection, extra_varint=True)
            # Send and read unix time
            self._send_data(connection, b'\x01', time.time() * 1000)
            unix = self._read_fully(connection)
            # Load json and return
            response = json.loads(data.decode('utf8'))
            # Check if there is a response
            if not response:
                return None
            response['ping'] = int(time.time() * 1000) - struct.unpack('>q', unix)[0]
            return response
class Server:
    """ Shared state for the server that this wrapper is running """
    # last status response the watchdog stored, None when there is none
    last_ping = None
    # heartbeats failed in a row, set back to 0 after a good ping or a restart
    failed_heartbeats = 0
    # process handle of the wrapped server, None until it has been launched
    server = None
class WatchdogError(Exception):
    """ The base error for this wrapper

    Derives from Exception (not BaseException) so that it is a regular
    application error and is not mistaken for an interpreter exit signal.
    """

    def __init__(self, message, *args):
        """ Build the message by applying str.format with args to message """
        self.message = str(message).format(*args)
        # Hand the message to Exception so str(error) and error.args carry it
        super().__init__(self.message)

    def get_message(self):
        """ Get the message set by this error """
        return self.message
class SameStateError(WatchdogError):
    """ Raised by the watchdog when a ping equals the previously stored ping """
def address_port(address):
    """ Utility function that converts an address into an address port tuple

    Accepts 'host', 'host:' or 'host:port'; the port falls back to 25565 when
    it is omitted. Raises ValueError when the port is not an integer (this
    includes input with more than one colon) and argparse.ArgumentTypeError
    when it is outside 1-65535, so argparse reports both as usage errors.
    """
    host, _, port_text = address.partition(':')
    if not port_text:
        return (host, 25565)
    port = int(port_text)
    # The port is later packed as an unsigned short, reject what cannot fit
    if not 0 < port <= 65535:
        raise argparse.ArgumentTypeError(f'port must be between 1 and 65535, was {port}')
    return (host, port)
def positive_int(value):
    """ Convert value to an int and reject anything that is not greater than zero """
    number = int(value)
    if number > 0:
        return number
    raise argparse.ArgumentTypeError(f'argument must be positive, was {number}')
def watchdog():
    """ Ping the provided minecraft server and when it fails its heartbeat kill it

    Loops forever. Each round pings the server and keeps a count of failed
    heartbeats in Server.failed_heartbeats; a good ping resets the count. Once
    the count passes namespace.heartbeats_soft the server is asked to stop and
    once it passes namespace.heartbeats_hard it is killed.
    """
    address = get_address()
    logger.info(f'Watchdog using this server address {address[0]}:{address[1]}')
    ping = StatusPing(host=address[0], port=address[1])
    while True:
        # Sleep for the heartbeat watchdog events
        # NOTE(review): the sleep statement was lost from the source; the interval
        # is taken from the --heartbeat option - confirm
        time.sleep(namespace.heartbeat)
        try:
            status = ping.get_status()
            status_online = status is not None and 'players' in status
            online = status['players']['online'] if status_online else 0
            capacity = status['players']['max'] if status_online else 0
            motd = re.sub(r'[&§][a-f0-9k-or]', '', str(status['description']).replace('\n', ' ')) if status_online else 'Offline'
            logger.debug(status)
            logger.info(f'Players: ({online}/{capacity}), MOTD: \'{motd}\'')
            # When not disabled it will also check for the same pings
            if namespace.equal_pings and equal_ping(status, Server.last_ping):
                Server.last_ping = status
                raise SameStateError('Grrr')
            Server.last_ping = status
            Server.failed_heartbeats = 0
        except SameStateError:
            Server.failed_heartbeats += 1
            logger.error(f'Failed server state heartbeat (count #{Server.failed_heartbeats})')
        except (OSError, ValueError, KeyError, struct.error):
            # A malformed or truncated answer counts as a failed heartbeat too,
            # otherwise the exception would end this thread and the watchdog with it
            Server.last_ping = None
            Server.failed_heartbeats += 1
            logger.error(f'Failed heartbeat (count #{Server.failed_heartbeats})')
        # Nothing to kill while the server process has not been launched
        if Server.server is None:
            continue
        # Kill the server soft then hard for failed heartbeats
        # NOTE(review): the kill statements were lost from the source; terminate
        # and kill are inferred from the soft and hard log messages - confirm
        if Server.failed_heartbeats > namespace.heartbeats_hard:
            logger.info(f'Hard killing the server (pid:{Server.server.pid})')
            Server.server.kill()
        elif Server.failed_heartbeats > namespace.heartbeats_soft:
            logger.info(f'Soft killing the server (pid:{Server.server.pid})')
            Server.server.terminate()
def equal_ping(left, right):
    """ Make sure the two pings are equals

    Two pings are equal when both are None, or when their description, online
    player count, player capacity and the set of sampled player ids all match.
    Keys that are missing from a ping compare as None / an empty sample.
    """
    # If either is none they are only equal when both are none
    if left is None or right is None:
        return left is None and right is None
    logger.debug(f'Left Ping: {left}')
    logger.debug(f'Right Ping: {right}')

    # function that will reduce a ping to the values that get compared
    def fingerprint(ping):
        players = ping.get('players') or {}
        # NOTE(review): the loop body was lost from the source; the 'id' key is
        # inferred from the uuids naming - confirm
        uuids = frozenset(player.get('id') for player in players.get('sample') or ())
        return (ping.get('description'), players.get('online'), players.get('max'), uuids)

    # final return to check everything, the online count included
    return fingerprint(left) == fingerprint(right)
def start_command():
    """ Build the server start command as a list: the wrapped cmd followed by its args """
    return [namespace.cmd, *namespace.args]
def get_address():
    """ Return the server address that was parsed from the launch args """
    server_address = namespace.address
    return server_address
def setup_parser():
    """ Build the argument parser, parse the launch args and export them as the global namespace """
    global namespace
    cli = argparse.ArgumentParser(
        description='A watchdog for Minecraft servers to make sure they are running all the time, even in the event of a crash.',
        usage='%(prog)s [-a host:port] cmd ...',
    )
    cli.add_argument('--debug', action='store_true', help='enable debug output for the logger')
    # store_false: the value stays True (restart enabled) unless the flag is given
    cli.add_argument('--no-restart', action='store_false', default=True, help='disable the automatic restart of the server')
    cli.add_argument('--restart-delay', type=positive_int, default=10, help='the time in seconds between automatic restarts')
    cli.add_argument('--equal-pings', action='store_true', default=False, help='should pings be checked for equality')
    # The heartbeat options all take a strictly positive integer
    heartbeat_options = (
        ('--heartbeat', 240, 'the time in seconds that the watchdog with ping the server'),
        ('--heartbeats-soft', 3, 'the number of fail heartbeats to gracefully stop the server'),
        ('--heartbeats-hard', 6, 'the number of fail heartbeats to forcefully stop the server'),
    )
    for flag, fallback, text in heartbeat_options:
        cli.add_argument(flag, type=positive_int, default=fallback, help=text)
    cli.add_argument(
        '-a', '--address',
        metavar='host:port',
        type=address_port,
        default=('localhost', 25565),
        help='the server address used for the watchdog',
    )
    # Everything after cmd is collected untouched for the wrapped command
    cli.add_argument('cmd', help='the start command that will be wrapped')
    cli.add_argument('args', nargs=argparse.REMAINDER, help='the args that are passed into')
    namespace = cli.parse_args()
def check_debug():
    """ Log that debug mode is on, along with the parsed namespace, when --debug was given """
    if not namespace.debug:
        return
    logger.debug('Debug mode is enabled')
    logger.debug(f'Namespace: {namespace}')
def setup_logger():
    """ Creates the global logger instance

    The logger writes to stdout using the '[time] [Server Watchdog/LEVEL]'
    format. Calling this more than once does not add a second handler.
    """
    global logger
    logger = logging.getLogger('net.year4000.wrapper')
    # NOTE(review): the level setup was lost from the source; DEBUG is tied to the
    # --debug option when the arguments were already parsed, INFO otherwise - confirm
    debug = getattr(globals().get('namespace'), 'debug', False)
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
    # Only attach the handler once so messages are not written twice
    if not logger.handlers:
        formatter = logging.Formatter('[%(asctime)s] [Server Watchdog/%(levelname)s]: %(message)s', '%H:%M:%S')
        stream_handler = logging.StreamHandler(stream=sys.stdout)
        # Without these two calls the formatter and the handler are never used
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)
def make_watchdog():
    """ Create the watchdog daemon thread

    Starts the thread and returns it. It is a daemon thread so it does not
    keep the wrapper alive once the main thread is done.
    """
    logger.info('Creating the watchdog thread')
    # NOTE(review): the daemon flag and the start call were lost from the source;
    # both are inferred from the docstring - confirm
    thread = threading.Thread(target=watchdog, daemon=True)
    thread.start()
    return thread
def main():
    """ Start the watchdog in daemon thread and run the minecraft server in main thread

    Launches the server with start_command() and blocks until it exits. Unless
    --no-restart was given the server is launched again after
    namespace.restart_delay seconds.
    """
    # NOTE(review): the call that starts the watchdog was lost from the source;
    # placing it here follows the docstring - confirm it is not started elsewhere
    make_watchdog()
    loop_server = True
    while loop_server:
        try:
            server = subprocess.Popen(start_command())
            Server.server = server
            logger.info(f'Starting server... (pid: {server.pid})')
            Server.failed_heartbeats = 0
            # NOTE(review): the wait was lost from the source; without it the loop
            # would launch a new server right away - confirm
            server.wait()
        except Exception as error:
            # NOTE(review): the original handler body was lost; logging the error
            # keeps the restart loop alive - confirm
            logger.error(f'Failed to run the server: {error}')
        # Will the server automaticly restart default true
        loop_server = namespace.no_restart
        # Let the thread sleep
        if loop_server:
            logger.info(f'Restarting in {namespace.restart_delay} seconds')
            time.sleep(namespace.restart_delay)
if __name__ == '__main__':
    # NOTE(review): the try body and the handler bodies were lost from the source;
    # the order below only follows what each function needs (the parser exports
    # namespace, the logger is used by everything after it) - confirm
    try:
        setup_parser()
        setup_logger()
        check_debug()
        main()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the wrapper, nothing to report
        pass
    except Exception as error:
        logger.error(error)
    finally:
        # Do not leave the server running once the wrapper itself is going away
        if Server.server is not None:
            logger.info('Stoping the minecraft server')
            Server.server.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.