Skip to content

Instantly share code, notes, and snippets.

@flub
Created June 21, 2012 09:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save flub/2964760 to your computer and use it in GitHub Desktop.
Save flub/2964760 to your computer and use it in GitHub Desktop.
Demo script which makes greenlet raise SystemError
import random
import sys
import threading
import time
import dpkt
import eventlet
import eventlet.xthread
from eventlet.green import socket
# Size of data to read from socket, typical MTU of ethernet is 1500.
BUFSIZE = 4096
# Linux leaves raw packets in network byte order and leaves the packet
# length field alone. Everyone else uses host order for IP header
# fields and subtracts the IP header from the total length, turning it
# into the payload length.
LINUX = sys.platform.startswith('linux')
class Producer(threading.Thread):
    """Daemon thread which repeatedly puts ping destinations on a queue."""

    def __init__(self, queue, delay=2, dests=None):
        """Constructor

        :queue: Queue on which to place destinations
        :delay: How long to wait before putting items on the queue again
        :dests: List of destinations which will be placed on the queue;
           ``['127.0.0.1']`` is used when this is None
        """
        super(Producer, self).__init__(name='producer')
        self.daemon = True
        self.queue = queue
        self.delay = delay
        self.dests = ['127.0.0.1'] if dests is None else dests

    def run(self):
        """Queue every destination, sleep for ``delay`` seconds, repeat.

        This loops forever; the thread only ends with the process since
        it is a daemon thread.
        """
        # A single pre-formatted argument prints the same text whether
        # ``print`` is the Python 2 statement or the Python 3 function.
        print('%s %s' % ('starting', self.name))
        while True:
            for dest in self.dests:
                self.queue.put(dest)
            time.sleep(self.delay)
class Worker(threading.Thread):
    """Daemon thread which pings destinations taken from a queue.

    Destinations are read from ``inputq``.  Each one is pinged in its
    own green thread and the outcome is put on ``outputq`` as a
    ``(dest, send_times, recv_times)`` tuple.  The pings are executed
    asynchronously using eventlet.
    """

    def __init__(self, inputq, outputq, count=3, interval=1, timeout=3):
        """Constructor

        :inputq: The queue from which destinations are received
        :outputq: The queue on which results are placed
        :count: Number of pings to send to one destination
        :interval: Time to wait between each ping to a destination
        :timeout: Max time to wait before giving up on a response
        """
        super(Worker, self).__init__(name='worker')
        self.daemon = True
        self.inputq = inputq
        self.outputq = outputq
        self.count = count
        self.interval = interval
        self.timeout = timeout
        self.pool = None  # GreenPool, created in run() inside this thread

    def run(self):
        """Spawn one ping green thread per destination read from inputq.

        This loops forever.
        """
        # A single pre-formatted argument prints the same text whether
        # ``print`` is the Python 2 statement or the Python 3 function.
        print('%s %s' % ('starting', self.name))
        eventlet.sleep(0)       # start hub, xthread needs this
        self.pool = eventlet.GreenPool(64)
        while True:
            dest = self.inputq.get()
            self.pool.spawn(self.ping, dest)

    def ping(self, dest):
        """Ping a remote host, putting the result on self.outputq

        ``self.count`` echo requests are sent, paced ``self.interval``
        seconds apart.  After the last one, up to ``self.timeout``
        seconds are spent waiting for outstanding replies.  The result
        is the tuple ``(dest, send_times, recv_times)`` where both
        dicts map a sequence number to a ``time.time()`` value.

        Any exception (including a failure to create the socket)
        propagates to the caller and no result is queued; the socket is
        closed in either case.
        """
        sock = self.create_connection(dest)
        try:
            send_times = {}
            recv_times = {}
            excess_data = b''
            if sock.family == socket.AF_INET:
                icmp = dpkt.icmp.ICMP(type=dpkt.icmp.ICMP_ECHO)
                recv_data = self.recv_data
            else:
                icmp = dpkt.icmp6.ICMP6(type=dpkt.icmp6.ICMP6_ECHO_REQUEST)
                recv_data = self.recv_data_v6
            # NOTE(review): the ICMPv4 Echo class is used for ICMPv6 too;
            # presumably the id/seq layout is the same -- confirm
            # against dpkt.
            icmp.data = dpkt.icmp.ICMP.Echo(data=b'A' * 56)
            icmp.data.id = random.getrandbits(16)
            for seq in range(1, self.count + 1):
                if seq > 1:
                    # Pace requests self.interval apart, measured from
                    # the previous send.  The receive wait below may
                    # already have used up the whole interval, so never
                    # ask for a negative sleep.
                    sleeptime = (send_times[seq - 1] + self.interval
                                 - time.time())
                    eventlet.sleep(max(sleeptime, 0))
                icmp.data.seq = seq
                icmp.sum = 0        # So dpkt will re-calculate it
                data = bytes(icmp)  # bytes is str on Python 2
                sent = sock.send(data)
                while sent < len(data):
                    data = data[sent:]
                    sent = sock.send(data)
                send_times[seq] = time.time()
                # Timeout(..., False) exits the block silently on expiry.
                with eventlet.Timeout(self.interval, False):
                    excess_data = recv_data(sock, icmp.data.id,
                                            send_times, recv_times,
                                            excess_data)
            with eventlet.Timeout(self.timeout, False):
                recv_data(sock, icmp.data.id,
                          send_times, recv_times, excess_data)
        finally:
            sock.close()
        self.outputq.put((dest, send_times, recv_times))

    def create_connection(self, dest):
        """Create a connected socket for a given destination

        Every address returned by getaddrinfo() is tried in order and
        the first raw ICMP/ICMPv6 socket which connects is returned.

        Raises socket.error: the error of the last failed attempt, or a
        generic one when getaddrinfo() returned no addresses.
        """
        ai_list = socket.getaddrinfo(dest, None,
                                     socket.AF_UNSPEC, socket.SOCK_RAW)
        err = None
        for af, socktype, _, _, sockaddr in ai_list:
            # The protocol reported by getaddrinfo() is ignored; it is
            # chosen from the address family instead.
            if af == socket.AF_INET:
                proto = socket.IPPROTO_ICMP
            else:
                proto = socket.IPPROTO_ICMPV6
            sock = None
            try:
                sock = socket.socket(af, socktype, proto)
                sock.connect(sockaddr)
                return sock
            except socket.error as e:
                err = e
                if sock is not None:
                    sock.close()
        if err is not None:
            raise err
        raise socket.error("getaddrinfo returns an empty list")

    def recv_data(self, sock, icmp_id, send_times, recv_times, recv_data=b''):
        """Read echo replies until every sent request has a reply

        Use recv_data to pass in any already received data.  Any
        received and unused data is returned.  Any received echo
        replies carrying icmp_id will be entered into the recv_times
        dict, keyed by sequence number.  The socket must already be
        connected so no address filtering is required.

        This keeps reading for as long as recv_times has fewer entries
        than send_times; ping() bounds the wait by wrapping the call in
        an eventlet.Timeout.
        """
        # Note that we can't use .recvfrom() here as this is broken on
        # RAW ICMP sockets for IPv4 and doesn't return the whole message.
        while len(recv_times) < len(send_times):
            recv_data += sock.recv(BUFSIZE)
            recv_time = time.time()
            if len(recv_data) < 20:
                continue    # Need to have read at least an IP header
            ip = dpkt.ip.IP(recv_data)
            if not LINUX:
                # These platforms subtracted the header from the total
                # length, so add it back and parse the ICMP part again.
                hl = ip.hl * 4
                ip.len += hl
                ip.data = ip.icmp = dpkt.icmp.ICMP(recv_data[hl:])
            if len(recv_data) < ip.len:
                continue    # Packet not complete yet
            recv_data = recv_data[ip.len:]
            if ip.data.type != dpkt.icmp.ICMP_ECHOREPLY:
                continue
            if ip.data.data.id != icmp_id:
                continue    # Reply to somebody else's ping
            recv_times[ip.data.data.seq] = recv_time
        return recv_data

    def recv_data_v6(self, sock, icmp_id,
                     send_times, recv_times, recv_data=b''):
        """As .recv_data() but for IPv6/ICMPv6"""
        # On IPv6/ICMPv6 we can and must use recvfrom() as that is the
        # only way to know the payload length (since we don't have
        # recvmsg()).  This means the recv_data parameter and
        # excess_data return values are just dummies.  In theory we
        # lose data if our buffer is too small for the requested data,
        # but we don't use this data anyway so can safely throw it
        # away.
        while len(recv_times) < len(send_times):
            recv_data, _ = sock.recvfrom(BUFSIZE)
            recv_time = time.time()
            if len(recv_data) < 4:
                continue        # broken packet
            icmp = dpkt.icmp6.ICMP6(recv_data)
            if icmp.type != dpkt.icmp6.ICMP6_ECHO_REPLY:
                continue
            if icmp.echo.id != icmp_id:
                continue
            recv_times[icmp.echo.seq] = recv_time
        return b''
class Consumer(threading.Thread):
    """Daemon thread which consumes the ping results and prints them."""

    def __init__(self, queue):
        """Constructor

        :queue: Queue from which ``(dest, send_times, recv_times)``
           result tuples are read
        """
        super(Consumer, self).__init__(name='consumer')
        self.daemon = True
        self.queue = queue

    def run(self):
        """Print ``dest: <sent>/<received>`` for every result, forever."""
        # A single pre-formatted argument prints the same text whether
        # ``print`` is the Python 2 statement or the Python 3 function.
        print('%s %s' % ('starting', self.name))
        while True:
            dest, send_times, recv_times = self.queue.get()
            print('%s: %d/%d' % (dest, len(send_times), len(recv_times)))
def main():
    """Wire the producer, worker and consumer threads together.

    The producer feeds destinations to the worker through one queue and
    the worker hands results to the consumer through another.  All
    three threads are daemons, so the main thread idles in a sleep loop
    to keep the process alive.
    """
    destinations = [
        'localhost', '127.0.0.1',
        'larissa', '192.168.2.106',
        'earth', '192.168.2.103',
        'oracle.com', '137.254.16.101',
    ]
    requests = eventlet.xthread.Queue()
    results = eventlet.xthread.Queue()
    feeder = Producer(requests, delay=2, dests=destinations)
    pinger = Worker(requests, results, count=3, interval=1, timeout=3)
    reporter = Consumer(results)
    # Start downstream first: consumer, then worker, then producer.
    for thread in (reporter, pinger, feeder):
        thread.start()
    while True:
        time.sleep(0.2)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment