Created
June 21, 2012 09:10
-
-
Save flub/2964760 to your computer and use it in GitHub Desktop.
Demo script which makes greenlet raise SystemError
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import sys | |
import threading | |
import time | |
import dpkt | |
import eventlet | |
import eventlet.xthread | |
from eventlet.green import socket | |
# Size of data to read from socket, typical MTU of ethernet is 1500. | |
BUFSIZE = 4096 | |
# Linux leaves raw packets in network byte order and leaves the packet
# length field alone.  Everyone else uses host order for IP header
# fields and subtracts the IP header from the total length, turning it
# into the payload length.
LINUX = sys.platform.startswith('linux') | |
class Producer(threading.Thread):
    """Daemon thread which keeps feeding ping destinations to a queue"""

    def __init__(self, queue, delay=2, dests=None):
        """Constructor

        :queue: Queue on which to place destinations
        :delay: How long to wait before putting items on the queue again
        :dests: List of destinations which will be placed on the queue,
           ['127.0.0.1'] is used when this is None
        """
        super(Producer, self).__init__(name='producer')
        self.daemon = True
        self.queue = queue
        self.delay = delay
        self.dests = ['127.0.0.1'] if dests is None else dests

    def run(self):
        """Announce the thread, then enqueue all destinations forever

        Every pass puts each destination on the queue in order and
        then sleeps for self.delay seconds before starting over.
        """
        # Single-argument print() gives the same output as the
        # two-item print statement.
        print(' '.join(('starting', self.name)))
        while True:
            for target in self.dests:
                self.queue.put(target)
            time.sleep(self.delay)
class Worker(threading.Thread):
    """This consumes ping destinations and puts the result on a queue

    The pings are executed asynchronously using eventlet: every
    destination taken from the input queue is pinged in its own green
    thread, spawned on a GreenPool.
    """

    def __init__(self, inputq, outputq, count=3, interval=1, timeout=3):
        """Constructor

        :inputq: The queue from which destinations are received
        :outputq: The queue on which results are placed
        :count: Number of pings to send to one destination
        :interval: Time to wait between each ping to a destination
        :timeout: Max time to wait before giving up on a response
        """
        super(Worker, self).__init__(name='worker')
        self.daemon = True
        self.inputq = inputq
        self.outputq = outputq
        self.count = count
        self.interval = interval
        self.timeout = timeout
        self.pool = None        # GreenPool, created in run()

    def run(self):
        """Spawn one ping green thread per destination, forever"""
        # Single-argument print() gives the same output as the
        # two-item print statement.
        print('starting %s' % self.name)
        eventlet.sleep(0)       # start hub, xthread needs this
        self.pool = eventlet.GreenPool(64)
        while True:
            dest = self.inputq.get()
            self.pool.spawn(self.ping, dest)

    def ping(self, dest):
        """Ping a remote host, putting the result on self.outputq

        self.count echo requests are sent, numbered from 1 and spaced
        self.interval seconds apart.  The result placed on the queue
        is a (dest, send_times, recv_times) tuple where both dicts map
        a sequence number to a time.time() timestamp.

        Any exception (including a failure to create the socket)
        propagates to the caller and no result is queued.
        """
        sock = self.create_connection(dest)
        try:
            send_times = {}
            recv_times = {}
            excess_data = ''
            if sock.family == socket.AF_INET:
                icmp = dpkt.icmp.ICMP(type=dpkt.icmp.ICMP_ECHO)
                recv_data = self.recv_data
            else:
                icmp = dpkt.icmp6.ICMP6(type=dpkt.icmp6.ICMP6_ECHO_REQUEST)
                recv_data = self.recv_data_v6
            icmp.data = dpkt.icmp.ICMP.Echo(data='A'*56)
            icmp.data.id = random.getrandbits(16)
            for seq in range(1, self.count + 1):
                if seq != 1:
                    # Pace the requests: wait out whatever is left of
                    # the interval since the previous send.  Receiving
                    # may already have used all of it, so never pass a
                    # negative delay.
                    sleeptime = (send_times[seq - 1] + self.interval
                                 - time.time())
                    eventlet.sleep(max(sleeptime, 0))
                icmp.data.seq = seq
                icmp.sum = 0    # So dpkt will re-calculate it
                data = str(icmp)
                sent = sock.send(data)
                while sent < len(data):
                    # Partial send, push out the remainder
                    data = data[sent:]
                    sent = sock.send(data)
                send_times[seq] = time.time()
                # Collect replies until the next request is due; the
                # Timeout is silent (False) so expiry just moves on.
                with eventlet.Timeout(self.interval, False):
                    excess_data = recv_data(sock, icmp.data.id,
                                            send_times, recv_times,
                                            excess_data)
            # Everything is sent, give outstanding replies a last chance
            with eventlet.Timeout(self.timeout, False):
                recv_data(sock, icmp.data.id,
                          send_times, recv_times, excess_data)
        finally:
            # Close the socket on errors too, so a failing ping does
            # not leak a raw socket.
            sock.close()
        self.outputq.put((dest, send_times, recv_times))

    def create_connection(self, dest):
        """Create a connected socket for a given destination

        Every address returned by getaddrinfo() is tried in turn and
        the first raw ICMP/ICMPv6 socket which connects is returned.
        If none connects the last socket.error is raised; an empty
        getaddrinfo() result raises socket.error as well.
        """
        ai_list = socket.getaddrinfo(dest, None,
                                     socket.AF_UNSPEC, socket.SOCK_RAW)
        err = None
        for af, socktype, proto, _canonname, sockaddr in ai_list:
            # The protocol reported by getaddrinfo() is ignored, we
            # always want ICMP for the address family.
            if af == socket.AF_INET:
                proto = socket.IPPROTO_ICMP
            else:
                proto = socket.IPPROTO_ICMPV6
            sock = None
            try:
                sock = socket.socket(af, socktype, proto)
                sock.connect(sockaddr)
                return sock
            except socket.error as e:
                err = e
                if sock is not None:
                    sock.close()
        if err is not None:
            raise err
        else:
            raise socket.error("getaddrinfo returns an empty list")

    def recv_data(self, sock, icmp_id, send_times, recv_times, recv_data=''):
        """Read echo replies until every sent request has a reply

        Use recv_data to pass in any already received data.  Any
        received and unused data is returned.  Any received echo
        responses will be entered into the recv_times dict, keyed by
        their sequence number.  The socket must already be connected
        so no address filtering is required.

        The loop only ends once recv_times has as many entries as
        send_times, so the caller has to bound the wait itself;
        ping() does so by wrapping each call in an eventlet.Timeout.
        """
        # Note that we can't use .recvfrom() here as this is broken on
        # RAW ICMP sockets for IPv4 and doesn't return the whole message.
        while len(recv_times) < len(send_times):
            recv_data += sock.recv(BUFSIZE)
            recv_time = time.time()
            if len(recv_data) < 20:
                continue        # Need to have read at least an IP header
            ip = dpkt.ip.IP(recv_data)
            if not LINUX:
                # These platforms subtracted the header of the total
                # length, so add it back and parse the ICMP message
                # from the bytes following the IP header ourselves.
                hl = ip.hl * 4
                ip.len += hl
                ip.data = ip.icmp = dpkt.icmp.ICMP(recv_data[hl:])
            if len(recv_data) < ip.len:
                continue        # Packet incomplete, read some more
            # Drop this packet from the buffer, keep what follows it
            recv_data = recv_data[ip.len:]
            if ip.data.type != dpkt.icmp.ICMP_ECHOREPLY:
                continue
            if ip.data.data.id != icmp_id:
                continue        # Reply to someone else's ping
            recv_times[ip.data.data.seq] = recv_time
        return recv_data

    def recv_data_v6(self, sock, icmp_id,
                     send_times, recv_times, recv_data=''):
        """As .recv_data() but for IPv6/ICMPv6

        Always returns an empty string as no excess data is kept.
        """
        # On IPv6/ICMPv6 we can and must use recvfrom() as that is the
        # only way to know the payload length (since we don't have
        # recvmsg()).  This means the recv_data parameter and
        # excess_data return values are just dummies.  In theory we
        # lose data if our buffer is too small for the requested data,
        # but we don't use this data anyway so can safely throw it
        # away.
        while len(recv_times) < len(send_times):
            recv_data, _ = sock.recvfrom(BUFSIZE)
            recv_time = time.time()
            if len(recv_data) < 4:
                continue        # broken packet
            icmp = dpkt.icmp6.ICMP6(recv_data)
            if icmp.type != dpkt.icmp6.ICMP6_ECHO_REPLY:
                continue
            if icmp.echo.id != icmp_id:
                continue
            recv_times[icmp.echo.seq] = recv_time
        return ''
class Consumer(threading.Thread):
    """Daemon thread which prints the ping results as they arrive"""

    def __init__(self, queue):
        """Constructor

        :queue: Queue from which (dest, send_times, recv_times)
           result tuples are read
        """
        super(Consumer, self).__init__(name='consumer')
        self.daemon = True
        self.queue = queue

    def run(self):
        """Announce the thread, then print one line per result forever

        Each line shows the destination followed by the number of
        entries in send_times and in recv_times.
        """
        # Single-argument print() gives the same output as the
        # two-item print statement.
        print(' '.join(('starting', self.name)))
        while True:
            result = self.queue.get()
            dest, send_times, recv_times = result
            n_sent = len(send_times)
            n_received = len(recv_times)
            print('%s: %d/%d' % (dest, n_sent, n_received))
def main():
    """Wire up producer, worker and consumer threads and run forever

    The producer feeds a fixed list of destinations to the worker
    through one xthread queue, the worker hands its results to the
    consumer through a second one.
    """
    inputq = eventlet.xthread.Queue()
    outputq = eventlet.xthread.Queue()
    destinations = [
        'localhost', '127.0.0.1',
        'larissa', '192.168.2.106',
        'earth', '192.168.2.103',
        'oracle.com', '137.254.16.101',
    ]
    producer = Producer(inputq, delay=2, dests=destinations)
    worker = Worker(inputq, outputq, count=3, interval=1, timeout=3)
    consumer = Consumer(outputq)
    # Start the end of the pipeline first, the producer last
    for thread in (consumer, worker, producer):
        thread.start()
    # All threads are daemons, so the main thread has to stay alive
    while True:
        time.sleep(0.2)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment