Skip to content

Instantly share code, notes, and snippets.

@esromneb
Last active December 1, 2015 03:17
Show Gist options
  • Save esromneb/1137f0632127636a9c8c to your computer and use it in GitHub Desktop.
Save esromneb/1137f0632127636a9c8c to your computer and use it in GitHub Desktop.
lossy socket for python allows you to tune how many packets are dropped
# This class wraps socket, and will maybe send your packet
# tune tx_drop_rate and rx_drop_rate, 0 means transmit everything, 1 means drop everything
# This class works best for UDP, as it doesn't allow you to drop TCP control messages
import socket
import random
class LossySocket(object):
def __init__(self, *p):
self._sock = socket.socket(*p)
self.tx_drop_rate = 0.0 # percentage of outgoing packets to drop in [0,1]
self.rx_drop_rate = 0.0 # percentage of incoming packets to drop in [0,1]
self.verbose = True
if self.verbose: print "LossySocket init"
def __getattr__(self, name):
if self.verbose: print "Getattr", name
return getattr(self._sock, name)
def sendall(self, *p):
if self.verbose: print "LossySocket sendall"
return self._sock.sendall(*p)
def send(self, *p):
if self.verbose: print "LossySocket send"
return self._sock.send(*p)
def recv(self, *p):
if self.verbose: print "LossySocket recv"
ret = self._sock.recv(*p)
if random.random() < self.rx_drop_rate:
if self.verbose: print "LossySocket dropping RX"
raise socket.error(11, 'Intentionally dropped packet')
return ret
def sendto(self, *p):
if self.verbose: print "LossySocket sendto"
if random.random() < self.tx_drop_rate:
if self.verbose: print "LossySocket dropping TX"
ret = len(p[0]) # drop packet
else:
ret = self._sock.sendto(*p) # send packet
return ret
def connect(self, *p):
if self.verbose: print "LossySocket connecting..."
self._sock.connect(*p)
if self.verbose: print "LossySocket connected"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment