Skip to content

Instantly share code, notes, and snippets.

@sephii
Created December 13, 2018 11:00
Show Gist options
  • Save sephii/ed45f1049c1cf51458cd5aefdc3de50a to your computer and use it in GitHub Desktop.
Save sephii/ed45f1049c1cf51458cd5aefdc3de50a to your computer and use it in GitHub Desktop.
# cstrike plugin for python-rtmbot (https://github.com/slackapi/python-rtmbot)
import glob
import os
import re
import random
import socket
import struct
from valve.source.a2s import ServerQuerier
# (host, port) pair of the game server; handed to ServerQuerier and to rcon().
SERVER_INFO = ('<example.com>', 27015)
# Password included in every command that rcon() sends.
RCON_PASSWORD = '<Your rcon password>'
# Period of the 'say_players' entry in crontable, and the threshold that
# say_players() compares each player's 'duration' against.
# Presumably expressed in seconds -- TODO confirm against rtmbot / python-valve.
SAY_PLAYERS_INTERVAL = 60
# Channel that say_players() posts its announcement to.
TARGET_CHANNEL_ID = '<C048TFP2F>'
# Base directory; maps() looks for cstrike/maps/*.bsp underneath it.
CSTRIKE_DIR = '/home/steam/cstrike'
# Query client shared by say_players(), players() and slap().
sq = ServerQuerier(SERVER_INFO)
# [interval, function name] pairs.
# NOTE(review): presumably read by rtmbot to schedule periodic jobs -- verify.
crontable = [
[SAY_PLAYERS_INTERVAL, 'say_players'],
]
# [channel, message] pairs appended by the handlers in this file.
# NOTE(review): presumably drained and sent by rtmbot -- verify.
outputs = []
def say_players():
    """Announce the player count when every current player has only just joined.

    Nothing is posted when the server reports zero players, or when at least
    one player's 'duration' has reached SAY_PLAYERS_INTERVAL.
    """
    info = sq.players()
    count = info['player_count']
    if count == 0:
        return

    # Longest connection time among the players currently listed.
    longest_stay = max(entry['duration'] for entry in info['players'])
    if longest_stay < SAY_PLAYERS_INTERVAL:
        announcement = "There are %d players on the server. Go go go!" % count
        outputs.append([TARGET_CHANNEL_ID, announcement])
def changelevel(data, cmdargs):
    """Handle ``!map <name>``: ask the server to switch to the given map.

    ``data`` is the incoming message (its 'channel' receives the reply) and
    ``cmdargs`` is the text typed after the command. The name is validated
    before it is placed in the console command, because it comes straight
    from chat: whitespace, ';', quotes or control characters would let a
    user append arbitrary console commands to ``changelevel``.
    """
    map_name = cmdargs.strip()

    if not map_name:
        outputs.append([data['channel'], "Usage: !map <map name>"])
        return

    # Reject anything that could end the argument or start another command.
    if re.search(r'[\s;"\'\x00-\x1f]', map_name):
        outputs.append([data['channel'], "Invalid map name: %s" % map_name])
        return

    resp = rcon(SERVER_INFO, RCON_PASSWORD, 'changelevel %s' % map_name)

    if 'changelevel failed' in resp:
        msg = "Map %s not found on the server" % map_name
    else:
        msg = "Map changed to %s" % map_name

    outputs.append([data['channel'], msg])
def maps(data, cmdargs):
    """Handle ``!maps``: reply with the sorted names of the installed maps.

    Map names are the ``*.bsp`` file names found under
    ``CSTRIKE_DIR/cstrike/maps`` with directory and extension removed, one per
    line. ``cmdargs`` is accepted for the common handler signature but unused.
    """
    pattern = os.path.join(CSTRIKE_DIR, 'cstrike/maps/*.bsp')

    names = []
    for path in glob.glob(pattern):
        filename = os.path.basename(path)
        names.append(os.path.splitext(filename)[0])
    names.sort()

    outputs.append([data['channel'], '\n'.join(names)])
def process_message(data):
    """Dispatch a chat message that starts with '!' to its command handler.

    The first space-separated word (with leading '!' characters removed) picks
    the handler; everything after the first space is passed to it as
    ``cmdargs`` (an empty string when there is none). Messages without a
    'text' key, without the '!' prefix, or naming an unknown command are
    ignored.
    """
    if 'text' not in data:
        return

    text = data['text']
    if not text.startswith('!'):
        return

    # partition() yields ('!cmd', ' ', 'args') or ('!cmd', '', '').
    word, _, cmdargs = text.partition(' ')

    handler = {
        'players': show_players,
        'map': changelevel,
        'maps': maps,
        'ranking': show_ranking,
        'slap': slap,
    }.get(word.lstrip('!'))

    if handler is not None:
        handler(data, cmdargs)
def show_players(data, cmdargs):
    """Handle ``!players``: reply in the message's channel with players()' text.

    ``cmdargs`` is accepted for the common handler signature but unused.
    """
    channel = data['channel']
    outputs.append([channel, players()])
def players():
    """Build the scoreboard text for the players currently on the server.

    Returns the literal "The server is empty :(" when the server reports no
    players. Otherwise returns a UTF-8 encoded listing: a header line with the
    player count followed by one "<position> <name> (<score>)" line per
    player, highest score first.
    """
    info = sq.players()
    if info['player_count'] == 0:
        return "The server is empty :("

    by_score = sorted(
        info['players'], key=lambda entry: entry['score'], reverse=True
    )

    pieces = [u"There are %s players on the server:\n\n" % info['player_count']]
    for rank, entry in enumerate(by_score, 1):
        pieces.append(u"{pos} {player} ({score})\n".format(
            pos=rank, player=entry['name'], score=entry['score']
        ))

    return u"".join(pieces).encode('utf-8')
def rcon(server, password, cmd):
    """Send one rcon console command over UDP and return the server's output.

    The exchange is: send ``challenge rcon``, take the first run of digits in
    the reply as the challenge number, then send
    ``rcon <challenge> <password> <cmd>`` and collect reply datagrams until
    the 0.5 s socket timeout expires.

    ``server`` is a (host, port) pair. ``password`` and ``cmd`` may be byte
    strings or text; text is encoded as UTF-8 so non-ASCII input (chat text,
    player names) no longer fails while the packet is being built.

    Returns the concatenated reply payloads as a native string. Raises
    ``socket.timeout`` when the challenge request gets no answer and
    ``socket.error`` when the answer contains no challenge number.
    """
    header = b"\xFF\xFF\xFF\xFF"

    def to_bytes(value):
        # The wire format is bytes; leave byte strings alone, encode text.
        return value if isinstance(value, bytes) else value.encode('utf-8')

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.settimeout(0.5)
        sock.connect(server)

        sock.send(header + b"challenge rcon\n\0")
        reply = sock.recv(1024)

        found = re.search(b"([0-9]+)", reply)
        if found is None:
            raise socket.error(
                "no rcon challenge number in server reply: %r" % (reply,)
            )

        sock.send(
            header + b"rcon " + found.group() + b" "
            + to_bytes(password) + b" " + to_bytes(cmd)
        )

        chunks = []
        while True:
            try:
                reply = sock.recv(1024)
            except socket.timeout:
                # No more datagrams within the timeout: the reply is complete.
                break
            if not reply:
                break
            # Drop the first five bytes of each datagram (presumably the
            # 4-byte 0xFF header plus a one-byte type marker -- verify).
            chunks.append(reply[5:])
    finally:
        # Always release the socket, including on timeout or a bad reply.
        sock.close()

    output = b"".join(chunks)
    if not isinstance(output, str):
        # Where bytes and str differ, hand callers text so that substring
        # checks such as ``'changelevel failed' in resp`` keep working.
        output = output.decode('utf-8', 'replace')
    return output
def show_ranking(data, cmdargs):
    """Handle ``!ranking``: post get_ranking() as a fixed-width text table.

    Each player row shows position, name, kills, deaths, headshots and three
    integer percentages: kills / (kills + deaths), hits / shots and
    headshots / kills, each shown as 0% when its denominator is zero. Columns
    are at least 5 characters wide; the name column is left-aligned and all
    others right-aligned. The table is wrapped in a triple-backtick block, and
    any triple backtick inside the table is replaced with ``[LOSER]`` first.
    ``cmdargs`` is accepted for the common handler signature but unused.
    """
    def percent(part, whole):
        # Integer percentage; 0 when the denominator is zero.
        try:
            return int(part / float(whole) * 100)
        except ZeroDivisionError:
            return 0

    table = [['#', 'Name', 'Kills', 'Deaths', 'HS', 'Eff.', 'Acc.', 'HS ratio']]
    for position, stats in enumerate(get_ranking(), 1):
        kills = stats['kills']
        deaths = stats['deaths']
        headshots = stats['hs']
        efficiency = percent(kills, kills + deaths)
        hs_ratio = percent(headshots, kills)
        accuracy = percent(stats['hits'], stats['shots'])
        table.append([
            position, stats['name'], kills, deaths, headshots,
            '%d%%' % efficiency,
            '%d%%' % accuracy,
            '%d%%' % hs_ratio,
        ])

    # Width of each column: widest cell plus one, but never below 5.
    widths = [
        max(max(len(str(line[idx])) for line in table) + 1, 5)
        for idx in range(len(table[0]))
    ]

    rendered = []
    for line in table:
        cells = []
        for idx, value in enumerate(line):
            text = str(value)
            if idx == 1:
                cells.append(text.ljust(widths[idx]))
            else:
                cells.append(text.rjust(widths[idx]))
        # Every cell, including the last one, is followed by a single space.
        rendered.append(''.join(cell + ' ' for cell in cells) + '\n')

    body = ''.join(rendered).replace('```', '[LOSER]')
    outputs.append([data['channel'], '```\n' + body + '```'])
def slap(data, cmdargs):
    """Handle ``!slap``: run ``amx_slap`` on one randomly chosen player.

    Replies "No player to slap :(" in the message's channel when there is no
    eligible player. ``cmdargs`` is accepted for the common handler signature
    but unused.

    Player names are chosen by the players themselves and end up inside a
    double-quoted console argument, so names that are empty or contain a
    double quote or a line break are never picked: they could close the
    quotes and smuggle extra console commands into the rcon call.
    """
    roster = sq.players()
    candidates = [
        entry['name'] for entry in roster['players']
        if entry['name'] and not any(ch in entry['name'] for ch in '"\r\n')
    ]

    if not candidates:
        outputs.append([data['channel'], u"No player to slap :("])
        return

    victim = random.choice(candidates)
    rcon(SERVER_INFO, RCON_PASSWORD, 'amx_slap "%s"' % victim)
def get_ranking():
    """Parse csstats.dat and return its player records, best kill ratio first.

    The file lives at ``cstrike/addons/amxmodx/data/csstats.dat`` under
    CSTRIKE_DIR (the same base directory maps() uses). Layout as read here:
    two leading bytes that are skipped, then one record per player made of a
    length-prefixed name, a length-prefixed id, eleven 4-byte integers and
    nine 4-byte body-hit values. Reading stops at a zero name length or at
    the end of the file.

    Returns a list of dicts with the keys 'name', 'steamid', 'tks', 'damage',
    'deaths', 'kills', 'shots', 'hits', 'hs', 'defusions', 'defused',
    'plants', 'explosions' and 'body_hits', sorted by
    kills / (kills + deaths) in descending order (0 for players with neither).
    A record that is cut off part-way still raises ``struct.error``.
    """
    def as_sort_value(p):
        total = p['kills'] + p['deaths']
        if total == 0:
            return 0
        return p['kills'] / float(total)

    stats_path = os.path.join(
        CSTRIKE_DIR, 'cstrike/addons/amxmodx/data/csstats.dat'
    )
    int_fields = ['tks', 'damage', 'deaths', 'kills', 'shots', 'hits', 'hs',
                  'defusions', 'defused', 'plants', 'explosions']

    ranking = []
    with open(stats_path, 'rb') as f:
        # Skip the two leading bytes (presumably a format version -- verify).
        if len(f.read(2)) < 2:
            return ranking

        while True:
            try:
                name_len = read_shortint(f)
            except struct.error:
                # File ended on a record boundary without a zero terminator.
                break
            if name_len == 0:
                break

            player = {}
            player['name'] = read_str(f, name_len)
            unique_len = read_shortint(f)
            player['steamid'] = read_str(f, unique_len)

            for info in int_fields:
                player[info] = read_int(f)

            # NOTE(review): each entry is the 1-tuple returned by
            # struct.unpack, not a bare int; kept as-is so the shape of the
            # returned data does not change.
            player['body_hits'] = [
                struct.unpack('i', f.read(4)) for _ in range(9)
            ]

            ranking.append(player)

    ranking.sort(key=as_sort_value, reverse=True)
    return ranking
def b_read(f, size, var_type):
    """Read ``size`` bytes from ``f`` and decode them with struct format ``var_type``.

    Returns the tuple produced by ``struct.unpack``; raises ``struct.error``
    when the bytes read do not match the format's size.
    """
    raw = f.read(size)
    return struct.unpack(var_type, raw)
def read_int(f):
    """Read the next 4 bytes of ``f`` as one struct 'i' value and return it."""
    (value,) = b_read(f, 4, 'i')
    return value
def read_shortint(f):
    """Read the next 2 bytes of ``f`` as one struct 'h' value and return it."""
    (value,) = b_read(f, 2, 'h')
    return value
def read_str(f, length):
    """Read a ``length``-byte string from ``f`` and strip trailing NUL bytes.

    The bytes are unpacked as a single counted ``s`` field instead of
    ``length`` one-byte fields joined back together. The result is a native
    string: where bytes and str are the same type it is returned unchanged,
    otherwise it is decoded as UTF-8 with undecodable bytes replaced.
    """
    raw = b_read(f, length, '%ds' % length)[0].rstrip(b'\x00')
    if not isinstance(raw, str):
        raw = raw.decode('utf-8', 'replace')
    return raw
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment