Created
November 8, 2011 20:25
-
-
Save anonymous/1349063 to your computer and use it in GitHub Desktop.
pspos
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2
# License: WTFPL <http://sam.zoy.org/wtfpl>
# >You just DO WHAT THE FUCK YOU WANT TO.<
import math
import struct

import pcap
class Packet(object):
    """Position update decoded from one captured link-layer frame.

    The constructor takes the raw frame bytes, strips the transport and
    protocol headers and, if the frame carries a message of type
    ``MSG_DEAD_RECKONING``, decodes it.

    Attributes:
        pos: ``(x, y, z)`` tuple of floats; ``(0, 0, 0)`` until decoded.
        rotation: heading in radians; ``0`` until decoded.
        sector: sector name, or ``''`` when the packet carries none.
        is_valid: ``True`` only when the whole message was decoded.
        data: bytes not yet consumed by the parser.
    """

    # Ethertype value expected at frame offset 12 (IPv4).
    ETHERTYPE_IPV4 = b'\x08\x00'
    # The only message type this parser understands.
    MSG_DEAD_RECKONING = 16
    # Value of the 32-bit field that announces a trailing sector name.
    SECTOR_MARKER = 0xFFFFFFFF

    def __init__(self, data):
        """Parse ``data`` (raw frame bytes); never raises on bad input."""
        self.pos = (0, 0, 0)
        self.rotation = 0
        self.sector = ''
        self.is_valid = False
        self.data = data
        self.strip_header()
        if self.data:
            try:
                self.unpack()
            except (struct.error, UnicodeDecodeError):
                # Truncated or malformed payload.  This runs inside the
                # capture callback, so report the packet as invalid
                # instead of letting the exception abort the capture.
                self.is_valid = False

    def __str__(self):
        return str((self.pos, self.rotation, self.sector))

    def __repr__(self):
        return str((self.pos, self.rotation, self.sector))

    def strip_header(self):
        """Reduce ``self.data`` to the message body, or to ``b''``.

        ``self.data`` becomes empty when the frame is not IPv4, is too
        short, or carries a message type other than
        ``MSG_DEAD_RECKONING``.
        """
        frame = self.data
        # Slices (not single-item indexing) are used throughout so the
        # code behaves the same on Python 2 ``str`` and Python 3 ``bytes``.
        if frame and len(frame) > 14 and frame[12:14] == self.ETHERTYPE_IPV4:
            # Low nibble of the first IP byte = header length in 32-bit
            # words.  22 = 14 bytes before the IP header + 8 bytes after it
            # (presumably Ethernet and UDP headers respectively).
            (first_ip_byte,) = struct.unpack('<B', frame[14:15])
            header_len = first_ip_byte & 0x0f
            payload = frame[22 + (4 * header_len):]
        else:
            payload = b''

        # Strip PS header:
        # 4 bytes id / 4 bytes offset / 4 bytes size
        # 2 bytes size / 1 byte priority / 2 bytes size
        # 1 byte msg_type
        # NOTE(review): the field list above is inherited from the original
        # comment; the code reads msg_type at offset 15 and starts the body
        # at offset 18.
        if len(payload) > 15:  # Acks are smaller
            (msg_type,) = struct.unpack('<B', payload[15:16])
            if msg_type == self.MSG_DEAD_RECKONING:
                payload = payload[18:]
            else:
                payload = b''
        else:
            payload = b''
        self.data = payload

    def struct_unpack(self, pattern):
        """Decode one value with ``pattern`` from the front of ``self.data``.

        The consumed bytes are removed from ``self.data``.

        Raises:
            struct.error: if fewer bytes remain than ``pattern`` needs.
        """
        size = struct.calcsize(pattern)
        (value,) = struct.unpack(pattern, self.data[:size])
        self.data = self.data[size:]
        return value

    def unpack(self):
        """Decode the message body held in ``self.data``.

        ``pos``, ``rotation``, ``sector`` and ``is_valid`` are assigned only
        after every field was read, so a failure leaves the defaults intact.

        Raises:
            struct.error: if the body is shorter than its flags imply.
            UnicodeDecodeError: if the sector name is not valid UTF-8.
        """
        self.struct_unpack('<L')  # entity id (unused)
        self.struct_unpack('<B')  # counter (unused)
        flags = self.struct_unpack('<B')

        # Optional fields selected by the flag bits.  Nothing downstream
        # uses them, so they are only consumed to keep the read position
        # correct: bit 1 is a one-byte mode, bit 2 an angular velocity,
        # bits 4/8/16 velocity components and bits 32/64/128 world-velocity
        # components (one little-endian float each).
        if flags & 1:
            self.struct_unpack('<B')
        for bit in (2, 4, 8, 16, 32, 64, 128):
            if flags & bit:
                self.struct_unpack('<f')

        pos = (self.struct_unpack('<f'),
               self.struct_unpack('<f'),
               self.struct_unpack('<f'))

        # Rotation is quantized into one byte: radian = (2pi * quan) / 256
        quantized = self.struct_unpack('<B')
        rotation = (2 * math.pi * quantized) / 256

        if self.struct_unpack('<L') == self.SECTOR_MARKER:
            # The remainder is the sector name; its last byte is dropped
            # (presumably a terminator).
            sector = self.data[:-1].decode('utf-8')
        else:
            sector = ''

        self.pos = pos
        self.rotation = rotation
        self.sector = sector
        self.is_valid = True
class Sniffer(pcap.pcapObject):
    """Live capture that hands decoded ``Packet`` objects to the caller.

    Only packets whose ``Packet.is_valid`` flag is set are passed on; all
    other captured traffic is dropped silently.
    """

    def __init__(self, dst, port=7777, device=None):
        """Open a live capture and install the capture filter.

        Args:
            dst: destination address inserted into the capture filter.
            port: port inserted into the capture filter.
            device: capture device name; when ``None`` the device returned
                by ``pcap.lookupdev()`` is used.
        """
        pcap.pcapObject.__init__(self)
        if device is None:
            device = pcap.lookupdev()
        # NOTE(review): arguments are presumably snaplen=65535, promisc=0,
        # read timeout=100 ms - confirm against the pylibpcap docs.
        self.open_live(device, 65535, 0, 100)
        self.setfilter('dst {} and port {} and udp'.format(dst, port), 0, 0)

    def get_callback(self, callback):
        """Wrap ``callback`` so it only sees valid, decoded packets.

        The returned function has the ``(pktlen, data, timestamp)``
        signature and calls ``callback(timestamp, packet)`` when ``data``
        decodes to a valid ``Packet``.
        """
        def process_packet(pktlen, data, timestamp):
            packet = Packet(data)
            if packet.is_valid:
                callback(timestamp, packet)
        return process_packet

    def loop(self, count, callback):
        """Run ``pcapObject.loop`` with ``callback`` wrapped for packets."""
        pcap.pcapObject.loop(self, count, self.get_callback(callback))

    def dispatch(self, count, callback):
        """Run ``pcapObject.dispatch`` with ``callback`` wrapped for packets."""
        pcap.pcapObject.dispatch(self, count, self.get_callback(callback))

    def next(self, *args):
        """Read one captured packet.

        Returns:
            ``(timestamp, packet)`` when a packet was captured and decoded
            to a valid ``Packet``; otherwise ``None``.
        """
        ret = pcap.pcapObject.next(self)
        # Identity test: the previous ``not ret in None`` was a membership
        # test against None and raised TypeError on every call.
        if ret is not None:
            (pktlen, data, timestamp) = ret
            packet = Packet(data)
            if packet.is_valid:
                return (timestamp, packet)
        return None
def main():
    """Sniff position packets sent to a fixed host and print each one.

    Runs until interrupted with Ctrl-C, which is swallowed so the script
    exits quietly.
    """
    def show(*fields):
        # Prints the argument tuple as-is, i.e. (timestamp, packet).
        print(fields)

    capture = Sniffer('62.173.168.9')
    try:
        capture.loop(0, show)
    except KeyboardInterrupt:
        pass
# Run the sniffer only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment