Skip to content

Instantly share code, notes, and snippets.

@danielrichman
Created April 2, 2013 00:13
Show Gist options
  • Save danielrichman/5288896 to your computer and use it in GitHub Desktop.
Save danielrichman/5288896 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# Copyright 2011 (C) Daniel Richman
#
# This programme is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This programme is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this programme. If not, see <http://www.gnu.org/licenses/>.
import struct
import socket
import select
import sys
class RCON(object):
    """Small TCP client for an RCON-style remote console.

    Wire format (as packed/unpacked by this class, all integers are
    little-endian signed 32-bit):

        length | request id | packet type | string1 NUL | string2 NUL

    where ``length`` counts every byte after the length field itself.
    This layout and the packet type constants below match the Source
    RCON protocol.

    Strings handed to :meth:`send_packet` are encoded to bytes (UTF-8) at
    the socket boundary and strings returned by :meth:`recv_packet` are
    native ``str`` objects, so the class behaves the same on Python 2 and
    Python 3.
    """

    # Packet types sent by the client.
    AUTH = 3
    EXEC = 2
    # Packet types received from the server.
    RESP_VALUE = 0
    RESP_AUTHR = 2

    def __init__(self, host, port):
        """Remember the server address and create an unconnected socket.

        host -- server hostname or IP address.
        port -- server TCP port (integer).
        """
        self.host = host
        self.port = port
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Id assigned to the next outgoing packet; incremented per packet.
        self.reqid = 0

    def connect(self):
        """Open the TCP connection to ``(host, port)``."""
        self.conn.connect((self.host, self.port))

    def close(self):
        """Close the underlying socket."""
        self.conn.close()

    def authenticate(self, password, stdout):
        """Send the password and wait for the server's auth response.

        At most one non-auth packet is tolerated before the auth response
        (the original loop allowed exactly that), and reading stops as
        soon as the auth response arrives so that a server which sends
        only the auth response does not leave us blocked on a second read.

        password -- password to send in an AUTH packet.
        stdout   -- writable file-like object; a success line is written
                    to it.

        Raises ValueError("Bad Password") when the server answers with
        request id -1 in an auth response.  Other protocol surprises
        trip an ``assert``.
        """
        reqid = self.send_packet(self.AUTH, password)

        for attempt in range(2):
            (resp_reqid, response, string1, string2) = self.recv_packet()

            if resp_reqid == -1 and response == self.RESP_AUTHR:
                raise ValueError("Bad Password")

            if response == self.RESP_AUTHR:
                assert resp_reqid == reqid
                break

            # Only the first packet may be something other than the
            # auth response.
            assert attempt == 0

        stdout.write("Authentication succesfull; connected to %s:%s\n" %
                     (self.host, self.port))

    def shell_mode(self, stdin, stdout):
        """Run an interactive loop between ``stdin`` and the server.

        Each non-empty line read from ``stdin`` is sent as an EXEC
        packet; the first string of every packet received from the
        server is written to ``stdout``.  The loop returns when an empty
        line (or end of file) is read from ``stdin``.

        Uses ``select.poll``, so ``stdin`` must be pollable (an object
        with ``fileno()`` or a file descriptor).
        """
        poller = select.poll()
        poller.register(self.conn, select.POLLIN)
        poller.register(stdin, select.POLLIN)

        conn_fd = self.conn.fileno()
        stdin_fd = stdin.fileno()

        while True:
            ready = poller.poll()
            ready_fds = [fd for (fd, event) in ready]
            events = set(event for (fd, event) in ready)

            # Anything other than plain "readable" (hang-up, error, ...)
            # is unexpected.
            assert events <= set([select.POLLIN])

            if conn_fd in ready_fds:
                (resp_reqid, response, string1, string2) = \
                    self.recv_packet()
                # Id 0 is never accepted here; ids at or above 1 must not
                # exceed the counter of the next packet to be sent.
                assert resp_reqid >= 1
                assert resp_reqid <= self.reqid
                assert string2 == ''
                stdout.write(string1)

            if stdin_fd in ready_fds:
                line = stdin.readline().strip()
                if len(line) == 0:
                    return
                self.send_packet(self.EXEC, line)

    def single_command(self, command, responses=1):
        """Execute one command and return its complete output.

        The command is followed immediately by a sentinel
        ``echo Finished`` command.  Every packet answering the first
        request id is collected until the first packet answering the
        sentinel's id arrives; that marks the end of the output and is
        discarded.

        command   -- command text to execute.
        responses -- unused; kept so existing callers keep working.

        Returns the concatenated ``string1`` payloads as a ``str``.
        """
        reqid = self.send_packet(self.EXEC, command)
        nreqid = self.send_packet(self.EXEC, "echo Finished")

        pieces = []
        while True:
            (resp_reqid, response, string1, string2) = self.recv_packet()
            if resp_reqid == nreqid:
                break

            assert resp_reqid == reqid
            assert response == self.RESP_VALUE
            assert string2 == ''
            pieces.append(string1)

        return ''.join(pieces)

    def send_packet(self, command, string1, string2=''):
        """Frame and transmit one packet; return the request id used.

        command -- packet type (``AUTH`` or ``EXEC``).
        string1 -- first payload string (any object; non-strings are
                   converted with ``str``).
        string2 -- second payload string, empty by default.
        """
        body1 = self._encode(string1)
        body2 = self._encode(string2)

        # 8 bytes for request id + packet type, 2 bytes for the two NULs.
        length = 8 + len(body1) + len(body2) + 2
        header = struct.pack('<iii', length, self.reqid, command)
        data = header + body1 + b'\x00' + body2 + b'\x00'

        sent_reqid = self.reqid
        self.reqid += 1
        # sendall: plain send() may transmit only part of the packet.
        self.conn.sendall(data)
        return sent_reqid

    def recv_packet(self):
        """Read one complete packet from the server.

        Returns ``(request id, packet type, string1, string2)``.

        Raises EOFError if the connection is closed before a whole
        packet has been received.
        """
        (length, ) = struct.unpack('<i', self._recv_exact(4))
        data = self._recv_exact(length)

        (reqid, response) = struct.unpack('<ii', data[:8])
        # Two NUL-terminated strings split into exactly three pieces,
        # the last being the empty remainder after the final NUL.
        strings = data[8:].split(b'\x00')
        assert len(strings) == 3

        return (reqid, response,
                self._decode(strings[0]), self._decode(strings[1]))

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes, looping over short reads."""
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.conn.recv(remaining)
            if not chunk:
                raise EOFError("connection closed by remote host")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @staticmethod
    def _encode(value):
        """Return ``value`` as bytes (text is encoded as UTF-8)."""
        if isinstance(value, bytes):
            return value
        if not isinstance(value, type(u'')):
            value = str(value)
            if isinstance(value, bytes):
                # Python 2: str() already produced a byte string.
                return value
        return value.encode('utf-8')

    @staticmethod
    def _decode(raw):
        """Return received bytes as a native ``str``."""
        if isinstance(raw, str):
            # Python 2: the native string type is the byte string.
            return raw
        return raw.decode('utf-8', 'replace')
def main():
    """Command-line entry point.

    Usage: ``<script> host port [password [command]]``

    * With a password and a command, the command is executed once and
      its output is written to standard output.
    * With only a password, an interactive shell is started.
    * With no password, one is read from standard input first and then
      an interactive shell is started.

    With a wrong number of arguments the usage line is printed and the
    function returns without connecting.
    """
    argc = len(sys.argv)
    if argc < 3 or argc > 5:
        # The original never filled in %s, so a literal "%s" was printed.
        sys.stdout.write("Usage: %s host port [password [command]]\n" %
                         sys.argv[0])
        return

    rcon = RCON(sys.argv[1], int(sys.argv[2]))
    try:
        rcon.connect()

        if argc > 3:
            rcon.authenticate(sys.argv[3], sys.stdout)

            if argc == 5:
                data = rcon.single_command(sys.argv[4])
                sys.stdout.write(data)
            else:
                rcon.shell_mode(sys.stdin, sys.stdout)
        else:
            sys.stdout.write("Password: ")
            # Flush so the prompt is visible before blocking on stdin.
            sys.stdout.flush()
            password = sys.stdin.readline().strip()

            rcon.authenticate(password, sys.stdout)
            rcon.shell_mode(sys.stdin, sys.stdout)
    finally:
        # Release the socket on every exit path, including errors.
        rcon.conn.close()


# Run the client only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment