Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Find Minecraft servers with the lowest latency!
from urllib2 import Request, urlopen
class ServerList(object):
    """Base class for scrapers that harvest servers from one listing site.

    Subclasses must set ``listName`` (it is the key into ``data['lists']``
    and the prefix used in log output) and implement ``work()``.
    """

    # Identifier of the concrete list; subclasses must override it.
    listName = None

    def __init__(self, data):
        """Bind this list to the shared data store.

        ``data`` must provide ``data['servers']`` (hostname -> server record)
        and ``data['lists'][self.listName]`` (this list's persistent state).

        Raises:
            NotImplementedError: the subclass did not set ``listName``.
            KeyError: ``data`` lacks ``'servers'`` or this list's state entry.
        """
        self.headers = {'User-Agent': 'Mozilla/5.0'}
        self.servers = data['servers']
        if self.listName is None:
            raise NotImplementedError("ServerList must override listName")
        self.state = data['lists'][self.listName]

    def log(self, msg):
        """Print ``msg`` prefixed with the right-aligned list name."""
        print('% 20s: %s' % (self.listName, msg))

    def addServer(self, hostname, meta=None):
        """Create a fresh server record and store it under ``hostname``.

        The record starts with ``ping=None`` and ``black=False``.  When
        ``meta`` is given, its ``'name'`` and ``'mc-version'`` entries are
        promoted to the record itself and the remaining entries are stored
        under ``'meta-<listName>'``.  Any existing record for ``hostname``
        is replaced.

        Returns the new record.
        """
        server = {
            'ping': None,
            'black': False,
        }
        if meta is not None:
            # Work on a shallow copy so the caller's dict is not stripped of
            # the keys that get promoted below.
            remaining = dict(meta)
            for key in ('name', 'mc-version'):
                if key in remaining:
                    server[key] = remaining.pop(key)
            server['meta-' + self.listName] = remaining
        self.servers[hostname] = server
        return server

    def work(self):
        """Perform one unit of scraping; subclasses must override."""
        raise NotImplementedError("ServerLists must override work() method")

    def urlGet(self, url):
        """Fetch ``url`` with this list's request headers; return the body."""
        request = Request(url, None, self.headers)
        return urlopen(request).read()
class RetoolingAgent(object):
    """Base class for agents that post-process already collected servers.

    Subclasses must set ``agentName`` (it is the key into ``data['agents']``
    and the prefix used in log output) and implement ``work()``.
    """

    # Identifier of the concrete agent; subclasses must override it.
    agentName = None

    def __init__(self, data):
        """Bind this agent to the shared data store.

        ``data['agents']`` and this agent's own state dict are created on
        demand, so a store that has never seen the agent is accepted.

        Raises:
            NotImplementedError: the subclass did not set ``agentName``.
            KeyError: ``data`` lacks ``'servers'``.
        """
        # urlGet() reads self.headers, so it has to exist on every agent.
        self.headers = {'User-Agent': 'Mozilla/5.0'}
        self.servers = data['servers']
        if self.agentName is None:
            raise NotImplementedError("RetoolingAgents must override agentName")
        agents = data.setdefault('agents', {})
        self.state = agents.setdefault(self.agentName, {})

    def log(self, msg):
        """Print ``msg`` prefixed with the right-aligned agent name."""
        print('% 20s: %s' % (self.agentName, msg))

    def work(self):
        """Perform one unit of work; subclasses must override."""
        raise NotImplementedError("RetoolingAgents must override work() method")

    def urlGet(self, url):
        """Fetch ``url`` with this agent's request headers; return the body."""
        request = Request(url, None, self.headers)
        return urlopen(request).read()
import re
from sgmllib import SGMLParser
from common import ServerList
# URL template for one listing page; %d is filled with the page number
# (the scraper's saved state starts counting at 1).
urlfmt = 'http://minecraft-server-list.com/page/%d/'
# Matches hrefs of the exact form /server/<digits>/; the parser uses it to
# pick out the anchor whose title attribute carries the server name.
serverHrefPattern = re.compile('^/server/[0-9]+/$')
class MinecraftServerListDotComSGMLParser(SGMLParser):
    """Extract server rows from a minecraft-server-list.com listing page.

    Every ``<tr>`` opens a candidate server record; tags seen inside the row
    fill it in, and ``</tr>`` hands the finished record to
    ``serverList.addServer``.
    """

    def __init__(self, serverList):
        """Remember the ``serverList`` that receives the parsed servers."""
        self.serverList = serverList
        SGMLParser.__init__(self)
        self.server = None    # record of the row being parsed, or None
        self.dataMode = None  # which field handle_data() currently feeds

    def start_tr(self, attributes):
        """Open a new record; rows with class "featured" are flagged."""
        featured = False
        for k, v in attributes:
            if k == 'class' and v == 'featured':
                featured = True
                break
        self.newServer(featured)

    def end_tr(self):
        """Close the current row and register it if it describes a server."""
        row = self.server
        self.server = None
        # An unclosed <span> must not leak its mode into the next row.
        self.dataMode = None
        if row is None:
            # Stray </tr> with no open row: nothing to register.
            return
        if not row['hostname']:
            # Rows without a serverip input (headers, layout rows) are not
            # servers; registering them would create a bogus None entry.
            return
        row['mc-categories'] = tuple(row['mc-categories'])
        self.serverList.addServer(row['hostname'], row)

    def do_input(self, attributes):
        """Take the hostname from the ``<input name="serverip">`` value."""
        if self.server is None:
            return
        isHostname = False
        hostname = None
        for k, v in attributes:
            if k == 'value':
                hostname = v
            elif k == 'name' and v == 'serverip':
                isHostname = True
        if isHostname:
            self.server['hostname'] = hostname

    def do_a(self, attributes):
        """Take the server name from the title of the server-page link."""
        if self.server is None:
            return
        serverName = None
        foundServerName = False
        for k, v in attributes:
            if k == 'title':
                serverName = v
            elif k == 'href' and serverHrefPattern.match(v):
                foundServerName = True
        if foundServerName:
            self.server['name'] = serverName

    def start_span(self, attributes):
        """Switch text capture on for version and category badge spans."""
        if self.server is None:
            return
        for k, v in attributes:
            if k == 'class':
                if v == 'buttonsmall green size10':
                    self.dataMode = 'version parse'
                    return
                if v == 'buttonsmall black size10':
                    self.dataMode = 'category parse'
                    return

    def handle_data(self, data):
        """Route text to the field selected by the enclosing span."""
        if self.server is None:
            return
        if self.dataMode == 'version parse':
            # Text may arrive in several chunks, so accumulate it.
            self.server['mc-version'] += data
        elif self.dataMode == 'category parse':
            self.server['mc-categories'].append(data)

    def end_span(self):
        """Stop capturing text when a span closes."""
        if self.server is None:
            return
        # XXX nested span data handling!
        self.dataMode = None

    def newServer(self, featured):
        """Start an empty record for the row that was just opened."""
        self.server = {
            'featured': featured,
            'name': None,
            'hostname': None,
            'mc-version': '',
            'mc-categories': []
        }
class MinecraftServerListDotCom(ServerList):
    """ServerList that walks minecraft-server-list.com one page per work()."""

    listName = 'http://minecraft-server-list.com/'

    def __init__(self, data):
        """Bind to ``data``, creating this list's state on first use.

        A fresh state starts at page 1.  ``data['lists']`` itself must
        already exist.
        """
        if self.listName not in data['lists']:
            self.log('starting fresh')
            data['lists'][self.listName] = {
                'page': 1,
                'inpage': 0
            }
        ServerList.__init__(self, data)
        self.parser = MinecraftServerListDotComSGMLParser(self)

    def work(self):
        """Download and parse the current page, then advance to the next.

        The page counter only advances when the download and the parse both
        completed without raising.
        """
        self.log('scanning page %d' % self.state['page'])
        pageData = self.urlGet(urlfmt % self.state['page'])
        self.parser.reset()
        self.parser.feed(pageData)
        # feed() may hold back trailing input; close() forces it through so
        # the last rows of the page are not lost.
        self.parser.close()
        self.state['page'] += 1
from common import RetoolingAgent
from popen2 import popen2
from time import gmtime, strftime
# Command line run once per host; %s is replaced with the host name after
# any ":port" suffix has been stripped.
# NOTE(review): the flags look like Linux iputils ping options (numeric
# output, 2.5 s deadline, 10 probes, 0.2 s interval) - confirm against the
# ping implementation on the target system.
cmdfmt = 'ping -n -w 2.5 -c 10 -i .2 %s'
def gethostname(host):
    """Return ``host`` without its ":port" suffix.

    Everything from the first ':' onward is dropped; a string without ':'
    is returned unchanged.
    """
    name, _separator, _rest = host.partition(':')
    return name
class PingAgent(RetoolingAgent):
    """Agent that measures latency to every server that has no ping result."""

    agentName = 'ping'

    def __init__(self, data):
        """Collect the hostnames whose ``'ping'`` entry is still ``None``."""
        RetoolingAgent.__init__(self, data)
        servers = self.servers
        # A real list is required: work() pops from it and callers take len().
        self.hitlist = [hn for hn in servers if servers[hn]['ping'] is None]
        self.log("pinging %d hosts" % len(self.hitlist))

    def work(self):
        """Ping one pending host and store the outcome on its record.

        On success ``servers[host]['ping']`` becomes a dict with ``'avg'``,
        ``'times'``, ``'drop'`` and ``'when'``; on any ordinary failure it
        becomes the string ``'error'``.

        Raises:
            IndexError: no hosts are left to ping.
        """
        if not self.hitlist:
            raise IndexError("hitlist exhausted")
        hit = self.hitlist.pop()
        timeString = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
        try:
            ping = self._parsePing(self._runPing(hit))
            ping['when'] = timeString
            self.servers[hit]['ping'] = ping
        except Exception as e:
            # Only ordinary failures mark the host as bad.  KeyboardInterrupt
            # and SystemExit propagate so the caller can stop the run; the
            # interrupted host keeps ping=None and is retried next time.
            self.log('')
            self.log('caught exception: %s "%s"' % (repr(e), str(e)))
            self.servers[hit]['ping'] = 'error'
            self.log('')

    def _runPing(self, hit):
        """Run the ping command for ``hit`` and return its output lines."""
        import subprocess

        target = gethostname(hit)
        if target.startswith('-'):
            # Host names come from scraped pages; never let one be read as
            # a command-line option.
            raise ValueError("refusing option-like hostname: %r" % target)
        self.log("executing command: %s" % (cmdfmt % target))
        # Build an argument vector instead of a shell string so that shell
        # metacharacters in a scraped host name cannot run other commands.
        argv = [tok.replace('%s', target) for tok in cmdfmt.split()]
        proc = subprocess.Popen(argv, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                universal_newlines=True)
        output = proc.communicate()[0]
        return output.splitlines()

    def _parsePing(self, lines):
        """Turn ping output lines into the avg/times/drop result dict."""
        summary = [s for s in lines if 'packets transmitted' in s][0].split(' ')
        tx = float(summary[0])
        rx = float(summary[3])
        if rx > 0.0:
            # The last line carries the slash-separated timing figures in
            # its fourth space-separated field.
            times = tuple(lines[-1].split(' ')[3].split('/'))
            timeAvg = times[1]
        else:
            times = None
            timeAvg = None
        return {
            'avg': timeAvg,
            'times': times,
            'drop': (tx - rx) / tx,
        }
import urllib2, pickle
from ping import PingAgent
# Load the shared state, ping every pending host, then save the state again.
# NOTE(review): pickle must only be used on files this tool wrote itself;
# loading a pickle from an untrusted source can execute arbitrary code.
# Binary mode is what pickle expects and is mandatory on Python 3.
with open('data.pickle', 'rb') as dataFile:
    data = pickle.load(dataFile)
l0 = PingAgent(data)
try:
    while True:
        l0.work()
except BaseException as e:
    # Deliberately broad: both Ctrl-C and the IndexError raised when the
    # hitlist runs out end the loop, and the results are saved either way.
    print('')
    print('caught exception: %s "%s"' % (repr(e), str(e)))
    print('remaining hosts: %d' % len(l0.hitlist))
    print('saving and quitting')
    print('')
# The with-block guarantees the file is flushed and closed before "saved".
with open('data.pickle', 'wb') as dataFile:
    pickle.dump(data, dataFile)
print('saved. bye.')
import pickle, pprint, sys
# Command-line report over data.pickle.
#   -v          dump the whole data structure instead of the ping summary
#   -H host...  print the stored record of each named host
simpleOutput = True
infoOnHosts = False
infoHosts = []
for arg in sys.argv[1:]:
    if arg.startswith('-'):
        # Short flags may be bundled ("-vH").  "--long" options and a bare
        # "-" carry no flags and are ignored.
        if not arg.startswith('--'):
            for c in arg[1:]:
                if c == 'v':
                    simpleOutput = False
                elif c == 'H':
                    infoOnHosts = True
    elif infoOnHosts:
        # Positional arguments are host names, honoured only once -H is seen.
        infoHosts.append(arg)
# NOTE(review): pickle must only be used on files this tool wrote itself.
with open('data.pickle', 'rb') as dataFile:
    data = pickle.load(dataFile)
if infoOnHosts:
    for host in infoHosts:
        print(data['servers'][host])
elif simpleOutput:
    for k in data['servers']:
        ping = data['servers'][k]['ping']
        # Skip hosts that failed, were never pinged, or never answered.
        if ping == 'error' or ping is None or ping['times'] is None:
            continue
        print('%07s %s' % (ping['avg'], k))
else:
    pprint.pprint(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment