-
-
Save Fingercomp/0889f51f0349ecf338b120a9952246d0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import socket | |
import ssl as ssllib | |
import functools | |
import irc.bot | |
from irc.connection import Factory | |
import dns.resolver | |
from mcstatus import MinecraftServer | |
from mcstatus.pinger import ServerPinger | |
from mcstatus.protocol.connection import TCPSocketConnection, \ | |
UDPSocketConnection | |
from mcstatus.querier import ServerQuerier | |
class MinecraftServerWithTimeout(MinecraftServer):
    """A ``MinecraftServer`` whose socket timeout is configurable.

    ``ping``, ``status`` and ``query`` are re-implemented here so that every
    connection they open is created with ``self.timeout``.
    """

    def __init__(self, host, port=25565, timeout=3):
        """Remember the target address and the per-connection timeout.

        Args:
            host: Host name or IP address of the server.
            port: Server port.
            timeout: Value handed to the connection classes as their timeout.
        """
        super().__init__(host, port)
        self.host = host
        self.port = port
        self._timeout = timeout

    @classmethod
    def lookup(cls, address):
        """Build a server object from a ``host`` or ``host:port`` string.

        When no port is given, port 25565 is assumed and a
        ``_minecraft._tcp.<host>`` SRV record, if one resolves, overrides
        both host and port.

        Raises:
            ValueError: If *address* contains more than one ``:`` or the
                port part is not an integer.
        """
        host = address
        port = None
        if ":" in address:
            parts = address.split(":")
            if len(parts) > 2:
                raise ValueError("Invalid address '%s'" % address)
            host = parts[0]
            port = int(parts[1])
        if port is None:
            port = 25565
            try:
                answers = dns.resolver.query("_minecraft._tcp." + host, "SRV")
                if len(answers):
                    answer = answers[0]
                    host = str(answer.target).rstrip(".")
                    port = int(answer.port)
            except Exception:
                # Deliberate best effort: without a usable SRV record the
                # plain host and default port are used.
                pass
        return cls(host, port)

    @property
    def timeout(self):
        """Timeout passed to every connection this object opens."""
        return self._timeout

    @timeout.setter
    def timeout(self, timeout):
        self._timeout = timeout

    @staticmethod
    def _close_connection(connection):
        """Release *connection*'s socket without raising."""
        # NOTE(review): not every mcstatus release is assumed to expose
        # close() on its connection classes, so fall back to the wrapped
        # socket object when it is missing -- confirm against the pinned
        # mcstatus version.
        closer = getattr(connection, "close", None)
        if closer is None:
            closer = getattr(getattr(connection, "socket", None), "close", None)
        if closer is None:
            return
        try:
            closer()
        except OSError:
            pass

    @staticmethod
    def _retry(retries, attempt):
        """Call *attempt* up to *retries* times.

        Returns the first successful result; if every attempt raises, the
        last exception is re-raised.

        Raises:
            ValueError: If *retries* is smaller than 1 (there would be no
                result and no exception to report).
        """
        if retries < 1:
            raise ValueError("retries must be at least 1")
        last_error = None
        for _ in range(retries):
            try:
                return attempt()
            except Exception as error:
                last_error = error
        raise last_error

    def _with_pinger(self, action, kwargs):
        """Open a fresh TCP connection, handshake, and run *action(pinger)*."""
        connection = TCPSocketConnection((self.host, self.port), self.timeout)
        try:
            pinger = ServerPinger(connection, host=self.host,
                                  port=self.port, **kwargs)
            pinger.handshake()
            return action(pinger)
        finally:
            self._close_connection(connection)

    def ping(self, retries=3, **kwargs):
        """Return the result of ``ServerPinger.test_ping()``.

        A new connection is opened for every attempt so that a retry never
        reuses a socket that already failed. Extra keyword arguments are
        forwarded to ``ServerPinger``.
        """
        return self._retry(
            retries,
            lambda: self._with_pinger(lambda pinger: pinger.test_ping(),
                                      kwargs))

    def status(self, retries=3, **kwargs):
        """Return the server's status response with ``latency`` filled in.

        A new connection is opened for every attempt. Extra keyword
        arguments are forwarded to ``ServerPinger``.
        """
        def read(pinger):
            result = pinger.read_status()
            result.latency = pinger.test_ping()
            return result

        return self._retry(retries, lambda: self._with_pinger(read, kwargs))

    def query(self, retries=3):
        """Return the server's response to a UDP query.

        The host name is first resolved to an A record when possible; if
        that fails the unresolved host is used as-is.
        """
        host = self.host
        try:
            answers = dns.resolver.query(host, "A")
            if len(answers):
                host = str(answers[0]).rstrip(".")
        except Exception:
            # Best effort only: fall back to the unresolved host name.
            pass

        def attempt():
            connection = UDPSocketConnection((host, self.port), self.timeout)
            try:
                querier = ServerQuerier(connection)
                querier.handshake()
                return querier.read_query()
            finally:
                self._close_connection(connection)

        return self._retry(retries, attempt)
class ServerData:
    """Lazily fetched, cached view of one Minecraft server's state.

    The status ping and the query are each attempted at most once per
    instance, on first access. Every other property is derived from those
    two cached responses, preferring the status response and falling back
    to the query response.
    """

    def __init__(self, server: "MinecraftServerWithTimeout"):
        self.server = server
        # None: not fetched yet; False: the fetch failed; otherwise the
        # response object returned by the server wrapper.
        self._status = None
        self._query = None

    @property
    def status(self):
        """Status response, or ``False`` if it could not be fetched."""
        if self._status is None:
            try:
                self._status = self.server.status(retries=1)
            except OSError:
                # socket.gaierror, socket.timeout and IOError are all
                # OSError (sub)classes in Python 3.
                self._status = False
        return self._status

    @property
    def query(self):
        """Query response, or ``False`` if it could not be fetched."""
        if self._query is None:
            try:
                self._query = self.server.query(retries=1)
            except OSError:
                self._query = False
        return self._query

    @property
    def on(self):
        """``True`` if either the status ping or the query succeeded."""
        return bool(self.status or self.query)

    @property
    def latency(self):
        """Latency from the status response, or ``None`` without one."""
        status = self.status
        return status.latency if status else None

    @property
    def online(self):
        """Number of players online, or ``None`` if the server is unreachable."""
        if self.status:
            return self.status.players.online
        if self.query:
            return self.query.players.online
        return None

    @property
    def max(self):
        """Player slot limit, or ``None`` if the server is unreachable."""
        if self.status:
            return self.status.players.max
        if self.query:
            return self.query.players.max
        return None

    @property
    def players(self):
        """Set of online player names, or ``None`` when it is unavailable.

        An empty set is returned when the server reports zero players. The
        status sample is used when present; otherwise the query's name list.
        """
        online = self.online
        if online is None:
            return None
        if online == 0:
            return set()
        status = self.status
        if status and status.players.sample:
            return {player.name for player in status.players.sample}
        query = self.query
        if query:
            return set(query.players.names)
        return None

    @property
    def software(self):
        """Version name (status) or software version (query), else ``None``."""
        if self.status:
            return self.status.version.name
        if self.query:
            return self.query.software.version
        return None

    @property
    def mods(self):
        """Mod list (status) or plugin list (query), else ``None``."""
        status = self.status
        if status and 'modinfo' in status.raw:
            # A modinfo payload without a 'modList' entry yields None
            # instead of raising KeyError.
            return status.raw['modinfo'].get('modList')
        query = self.query
        if query and query.software.plugins:
            return query.software.plugins
        return None
def get_server_data(address, timeout=0.5):
    """Return a fresh ``ServerData`` for the server at *address*.

    Args:
        address: ``host`` or ``host:port`` string, resolved with
            ``MinecraftServerWithTimeout.lookup``.
        timeout: Timeout assigned to the server object before it is wrapped.
            Defaults to 0.5, the value this function previously hard-coded.
    """
    server = MinecraftServerWithTimeout.lookup(address)
    server.timeout = timeout
    return ServerData(server)
class IRCBot(irc.bot.SingleServerIRCBot):
    """IRC bot that mirrors Minecraft server state into channel topics.

    Every 10 seconds each configured channel's server is polled. The channel
    topic is updated when it changes, and players who joined or left since
    the previous poll are announced with channel notices.
    """

    strategy = irc.bot.ExponentialBackoff(max_interval=180)

    def __init__(self, server, nickname, realname, chans, passwd,
                 nickserv_nick=None, ssl=False):
        """Set up the IRC connection and the per-channel tracking state.

        Args:
            server: The IRC server to connect to, e.g. a ``(host, port)``
                tuple.
            nickname: Nickname of the bot.
            realname: Real name of the bot.
            chans: Iterable of dicts with a ``"name"`` (IRC channel) and an
                ``"address"`` (Minecraft server address) key.
            passwd: NickServ password; identification is skipped when empty.
            nickserv_nick: Account name sent to NickServ; defaults to
                *nickname*.
            ssl: Whether to wrap the IRC connection in TLS.
        """
        extra = {}
        if ssl:
            # ssl.wrap_socket() no longer exists in Python 3.12+ and never
            # verified certificates; use a default context bound to the
            # server's host name instead.
            host = getattr(server, "host", None) or server[0]
            context = ssllib.create_default_context()
            extra["connect_factory"] = Factory(
                wrapper=functools.partial(context.wrap_socket,
                                          server_hostname=host))
        super().__init__([server], nickname, realname,
                         recon=self.strategy, **extra)
        self.chans = [{"address": x["address"],
                       "name": x["name"],
                       "last_topic": "",
                       "last_online": set()} for x in chans]
        self.nickserv_passwd = passwd
        self.nickserv_nick = nickserv_nick or nickname
        # on_welcome runs on every (re)connect; the periodic poll must only
        # be registered once.
        self._poll_scheduled = False
        self.connection.set_rate_limit(100)

    def on_welcome(self, connection, event):
        """Identify with NickServ, join the channels and start polling."""
        print("Connected to server")
        if self.nickserv_passwd:
            connection.privmsg(
                "NickServ",
                "IDENTIFY " + self.nickserv_nick + " " + self.nickserv_passwd)
            print("Identified")
        for chan in self.chans:
            connection.join(chan["name"])
        print("Joined channels")
        if not self._poll_scheduled:
            self._poll_scheduled = True
            self.reactor.scheduler.execute_every(
                10, functools.partial(self.check_online, connection))

    @staticmethod
    def _format_topic(address, data):
        """Build the IRC-formatted topic line for one server."""
        topic = "\x02[" + address + "] "
        if not data.on:
            return topic + "\x0304OFFLINE\x03"
        topic += ("\x0303ONLINE\x0f " + str(data.online) +
                  "\x0315/\x0f" + str(data.max) + ": ")
        players = data.players
        if players is None:
            return topic + "\x0308could not get player list"
        return topic + "\x0315,\x03 ".join(sorted(players))

    @staticmethod
    def _announce_changes(connection, channel, previous, current):
        """Send one notice per player who joined or left."""
        for nick in sorted(current - previous):
            connection.notice(
                channel, nick + "\x0309 joined\x0315 the server.")
        for nick in sorted(previous - current):
            connection.notice(
                channel, nick + "\x0305 left\x0315 the server.")

    def check_online(self, connection):
        """Poll every configured server and update its channel."""
        if not connection.is_connected():
            # Nothing can be sent while disconnected; wait for the next
            # tick after the bot has reconnected.
            return
        for chan in self.chans:
            data = get_server_data(chan["address"])
            topic = self._format_topic(chan["address"], data)
            if chan["last_topic"] != topic:
                connection.topic(chan["name"], topic)
                chan["last_topic"] = topic
            players = data.players
            previous = chan["last_online"]
            # Only announce when both the previous and the current player
            # lists are known.
            if (previous is not None and players is not None and
                    previous != players):
                self._announce_changes(connection, chan["name"],
                                       previous, players)
            chan["last_online"] = players
if __name__ == "__main__":
    import os

    # Channels to keep updated, each paired with the Minecraft server whose
    # state it mirrors.
    channels = [
        {'name': '#cc.ru-server1',
         'address': 'server1.computercraft.ru:25565'},
        {'name': '#cc.ru-server4',
         'address': 'server4.computercraft.ru:25565'},
        {'name': '#unrealtourres',
         'address': 'server.opencomputers.info:25565'},
    ]
    # NOTE(review): a NickServ password is committed in source. It can be
    # overridden with the NICKSERV_PASSWORD environment variable; the
    # literal is kept only as the backward-compatible default and should be
    # rotated and removed.
    password = os.environ.get('NICKSERV_PASSWORD', 'servcheckservcheck')
    bot = IRCBot(('irc.esper.net', 6697,), 'mocivally', 'mocivally',
                 channels, password, 'mocivally', ssl=True)
    bot.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment