Skip to content

Instantly share code, notes, and snippets.

@RobFreiburger
Created October 25, 2011 17:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RobFreiburger/1313630 to your computer and use it in GitHub Desktop.
Save RobFreiburger/1313630 to your computer and use it in GitHub Desktop.
Battlefield 3 RCON "Kick Some Pubbies" Script
#!/usr/bin/python
# "Kick Some Pubbies" script for BF3 server admins
# Script is a modified version of EA DICE's CommandConsole.py from BFBC2
# Confirmed working on Battlefield 3 servers on launch day, 10/25/11
# Go to the configurable variables section to modify to your needs.
# Retrieves the whitelist from a URL; this can easily be changed to a local file. See the getWhitelist() function.
from struct import *
import binascii
import socket
import sys
import shlex
import string
import threading
import md5
import os
import urllib2
import random
import time
# Begin code written by EA DICE
def EncodeHeader(isFromServer, isResponse, sequence):
    """Pack the origin flag, response flag and sequence number into 4 bytes.

    The low 30 bits carry the sequence number, bit 31 is set when
    isFromServer is truthy and bit 30 when isResponse is truthy.  The value
    is returned packed as a little-endian unsigned 32-bit integer.
    """
    originBit = 0x80000000 if isFromServer else 0
    responseBit = 0x40000000 if isResponse else 0
    return pack('<I', (sequence & 0x3fffffff) + originBit + responseBit)
def DecodeHeader(data):
    """Split the 4-byte header at the start of data into its three parts.

    Returns [originFlag, responseFlag, sequence].  The two flags are the raw
    masked bits (0 when clear, the bit's value when set), not booleans.
    """
    (rawHeader,) = unpack('<I', data[:4])
    originFlag = rawHeader & 0x80000000
    responseFlag = rawHeader & 0x40000000
    sequenceNr = rawHeader & 0x3fffffff
    return [originFlag, responseFlag, sequenceNr]
def EncodeInt32(size):
    """Return size packed as a little-endian unsigned 32-bit integer."""
    littleEndianUInt32 = '<I'
    return pack(littleEndianUInt32, size)
def DecodeInt32(data):
    """Read a little-endian unsigned 32-bit integer from the first 4 bytes of data."""
    (value,) = unpack('<I', data[:4])
    return value
def EncodeWords(words):
    """Serialise a sequence of words for the body of a packet.

    Every word is converted with str() and written as a 32-bit length,
    the word's characters and a terminating NUL character.

    Returns a (size, encodedWords) pair; size is the sum over all words of
    the word length plus 5 (length prefix and terminator).
    """
    totalSize = 0
    chunks = []
    for item in words:
        text = str(item)
        chunks.append(EncodeInt32(len(text)) + text + '\x00')
        # 4 bytes of length prefix + 1 terminator byte accompany each word.
        totalSize += len(text) + 5
    return totalSize, ''.join(chunks)
def DecodeWords(size, data):
    """Decode the length-prefixed words stored in a packet body.

    size -- number of bytes of encoded word data to consume
    data -- the encoded words; each one is a little-endian 32-bit length,
            the word itself and one terminator byte

    Returns the list of decoded words, which is empty when size is 0.
    Raises struct.error if data ends before a length prefix can be read.
    """
    # The original began by decoding an unused "numWords" value from data,
    # which raised struct.error for a packet carrying no words at all.
    words = []
    offset = 0
    while offset < size:
        (wordLen,) = unpack_from('<I', data, offset)
        wordStart = offset + 4
        words.append(data[wordStart : wordStart + wordLen])
        # Step over the length prefix, the word and its 1-byte terminator.
        offset = wordStart + wordLen + 1
    return words
def EncodePacket(isFromServer, isResponse, sequence, words):
    """Build a complete packet: header, total size, word count, encoded words.

    The size field counts the whole packet, i.e. the encoded words plus the
    12 bytes taken up by the header, size and word-count fields.
    """
    bodySize, body = EncodeWords(words)
    headerField = EncodeHeader(isFromServer, isResponse, sequence)
    sizeField = EncodeInt32(bodySize + 12)
    countField = EncodeInt32(len(words))
    return headerField + sizeField + countField + body
def DecodePacket(data):
    """Decode a request or response packet.

    Returns the list [isFromServer, isResponse, sequence, words] where
      isFromServer -- non-zero when the command in this command/response
                      packet pair originated on the server
      isResponse   -- non-zero when this is a response, zero otherwise
      sequence     -- the sequence number
      words        -- the list of words
    """
    originFlag, responseFlag, sequenceNr = DecodeHeader(data)
    # The size field covers the whole packet; the words begin after the
    # 12 bytes of header, size and word-count fields.
    bodySize = DecodeInt32(data[4:8]) - 12
    decodedWords = DecodeWords(bodySize, data[12:])
    return [originFlag, responseFlag, sequenceNr, decodedWords]
###############################################################################
def EncodeClientRequest(words):
    """Encode words as a request packet.

    The module-level clientSequenceNr supplies the packet's sequence number
    and is advanced afterwards, wrapping within 30 bits.
    """
    global clientSequenceNr
    currentSequence = clientSequenceNr
    encoded = EncodePacket(False, False, currentSequence, words)
    clientSequenceNr = (currentSequence + 1) & 0x3fffffff
    return encoded
def EncodeClientResponse(sequence, words):
    """Encode words as a response packet carrying the given sequence number.

    Both the from-server flag and the response flag are set in the header.
    """
    return EncodePacket(isFromServer=True, isResponse=True,
                        sequence=sequence, words=words)
###################################################################################
def containsCompletePacket(data):
    """Tell whether data holds at least one whole packet at its start.

    That requires the first 8 bytes (header and size field) to be present
    and at least as many bytes as the size field announces.
    """
    available = len(data)
    return available >= 8 and available >= DecodeInt32(data[4:8])
def receivePacket(socket, receiveBuffer):
    """Return the next complete packet, reading from the socket as needed.

    Data is appended to the local receive buffer until it contains a full
    packet; the buffer is then split into that packet and the remainder.

    socket        -- connected socket object to read from (this parameter
                     shadows the socket module inside the function)
    receiveBuffer -- data already received but not yet consumed

    Returns [packet, remainingBuffer].
    Raises IOError if the peer closes the connection before a complete
    packet has arrived.
    """
    while not containsCompletePacket(receiveBuffer):
        chunk = socket.recv(4096)
        if not chunk:
            # recv() returns an empty string once the peer has closed the
            # connection; without this check the loop would never end.
            raise IOError("connection closed before a complete packet was received")
        receiveBuffer += chunk
    packetSize = DecodeInt32(receiveBuffer[4:8])
    return [receiveBuffer[:packetSize], receiveBuffer[packetSize:]]
###################################################################################
# Display contents of packet in user-friendly format, useful for debugging purposes
def printPacket(packet):
if (packet[0]):
print "IsFromServer, ",
else:
print "IsFromClient, ",
if (packet[1]):
print "Response, ",
else:
print "Request, ",
print "Sequence: " + str(packet[2]),
if packet[3]:
print " Words:",
for word in packet[3]:
print "\"" + word + "\"",
print ""
###################################################################################
def generatePasswordHash(salt, password):
    """Return the raw MD5 digest of salt followed by password.

    salt     -- the salt as a byte string
    password -- the password as a byte string

    Returns the 16-byte binary digest (not hex encoded).
    """
    # hashlib supersedes the deprecated md5 module and yields the same digest.
    import hashlib
    digest = hashlib.md5()
    digest.update(salt)
    digest.update(password)
    return digest.digest()
# End code written by EA DICE
def getWhitelist(whitelistURL):
    """Download the whitelist and return its entries in lower case.

    whitelistURL -- URL of a text resource read line by line; each
                    non-blank line becomes one whitelist entry

    Returns a list of entries, lower-cased and with surrounding whitespace
    removed.  Blank lines are skipped.
    """
    pagehandle = urllib2.urlopen(whitelistURL)
    try:
        rawLines = pagehandle.read().splitlines()
    finally:
        # Release the connection even when reading fails.
        pagehandle.close()
    # Stray whitespace or empty lines could never match a player name.
    return [line.strip().lower() for line in rawLines if line.strip()]
# Begin configurable variables
servers = [
    {'name': 'server1', 'ip': 'server1.example.com', 'port': 12345, 'password': 'notAPassword', 'maxPlayers': 64},
    {'name': 'server2', 'ip': 'server2.example.com', 'port': 12345, 'password': 'badPassword', 'maxPlayers': 32},
]
pubbiesToKick = 3
whitelist = getWhitelist('http://whitelist.example.com')
# End configurable variables


def _sendCommand(serverSocket, receiveBuffer, words):
    """Send one request built from words and wait for the next packet.

    Returns [responseWords, receiveBuffer], where receiveBuffer holds any
    data received beyond the packet that was decoded.
    """
    # sendall() keeps writing until the whole packet has been handed to the
    # network; plain send() may transmit only part of it.
    serverSocket.sendall(EncodeClientRequest(words))
    [packet, receiveBuffer] = receivePacket(serverSocket, receiveBuffer)
    [isFromServer, isResponse, sequence, responseWords] = DecodePacket(packet)
    return [responseWords, receiveBuffer]


for server in servers:
    # Sequence numbers restart with every new connection.
    clientSequenceNr = 0
    print('Connecting to ' + server['name'])
    serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        serverSocket.connect((server['ip'], server['port']))
        serverSocket.setblocking(1)
        receiveBuffer = ""

        # Retrieve this connection's 'salt' (magic value used when encoding
        # the password) from the server.
        [words, receiveBuffer] = _sendCommand(serverSocket, receiveBuffer, ["login.hashed"])
        # If the server doesn't understand the "login.hashed" command, abort.
        if words[0] != "OK":
            print("Could not obtain login salt from " + server['name'])
            sys.exit(3)  # error

        # Given the salt and the password, combine them and compute the hash.
        salt = binascii.unhexlify(words[1])
        passwordHash = generatePasswordHash(salt, server['password'])
        passwordHashHexString = binascii.hexlify(passwordHash).upper()
        [words, receiveBuffer] = _sendCommand(
            serverSocket, receiveBuffer, ["login.hashed", passwordHashHexString])
        # If the server didn't like our password, abort.
        if words[0] != "OK":
            print("Login rejected by " + server['name'])
            sys.exit(3)  # error

        # Get the players currently on the server.
        [rconResponse, receiveBuffer] = _sendCommand(
            serverSocket, receiveBuffer, ['admin.listPlayers', 'all'])
        # The indexing assumes the first player name is word 10 of the
        # response and that each player record spans 7 words.
        playerNames = [rconResponse[i] for i in range(10, len(rconResponse), 7)]
        # Whitelist matching is done on lower-cased names; keep the names as
        # reported so that kicks use exactly what the server sent, in case
        # the server matches names case-sensitively.
        reportedNames = dict((name.lower(), name) for name in playerNames)

        # The original author found that vars.maxPlayers does not work over
        # RCON, so the limit comes from the configuration above.
        if len(playerNames) < (server['maxPlayers'] - pubbiesToKick):
            print("Server is not full. Server has " + str(len(playerNames)) + " players on it.")
            continue

        # Everyone on the server who is not on the whitelist is a pubby.
        # The whitelist fetched in the configuration section is reused; the
        # original re-fetch called getWhitelist() without its URL argument.
        pubbies = sorted(set(reportedNames).difference(whitelist))
        if not pubbies:
            print("Server has no pubbies.")
            continue

        # Randomly select the pubbies to kick.
        if len(pubbies) > pubbiesToKick:
            pubbies = random.sample(pubbies, pubbiesToKick)

        announcement = str(len(pubbies)) + ' random pubbies will be kicked in 5 seconds. bye!'
        # The response must be read here; otherwise every later status check
        # would look at the response to the previous command.
        [words, receiveBuffer] = _sendCommand(
            serverSocket, receiveBuffer, ['admin.say', announcement, 'all'])
        if words[0] != "OK":
            print("Error announcing kick: ")
            print(words)
        time.sleep(5)

        kickedCount = 0
        for pubby in pubbies:
            playerName = reportedNames[pubby]
            [words, receiveBuffer] = _sendCommand(
                serverSocket, receiveBuffer, ['admin.kickPlayer', playerName, 'you lose'])
            if words[0] != "OK":
                print("Error kicking pubby: " + playerName)
                print(words)
                # Do not announce or count a kick that did not happen.
                continue
            kickedCount += 1
            [words, receiveBuffer] = _sendCommand(
                serverSocket, receiveBuffer, ['admin.say', playerName + ' won the contest!', 'all'])
            if words[0] != "OK":
                print("Error announcing kick: ")
                print(words)
        print("Kicked " + str(kickedCount) + " pubbies.")
    finally:
        # Always release the connection, including on 'continue' and sys.exit().
        serverSocket.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment