Skip to content

Instantly share code, notes, and snippets.

@soyo42
Last active November 3, 2015 08:29
Show Gist options
  • Save soyo42/1d1b7fb6cb3301e55f3d to your computer and use it in GitHub Desktop.
Save soyo42/1d1b7fb6cb3301e55f3d to your computer and use it in GitHub Desktop.
primitive and lightweight openflow switch emulator
#!/usr/bin/python
import struct
from io import BytesIO
import logging
class DummyExperimenterSupport:
    """
    Menu extension providing one dummy experimenter message
    (experimenterId = 123, experimenter type = 123).
    """
    LOG = logging.getLogger('DummyExperimenterSupport')
    # version 0x04, type 0x04, followed by a 2 byte length placeholder
    _EXPERIMENTER_OF_HEADER = b'\x04\x04\x00\x00'
    # experimenter id (123)
    _EXPERIMENTER_ID = b'\x00\x00\x00\x7b'
    # experimenter type (123)
    _DUMMY_TYPE_01 = b'\x00\x00\x00\x7b'

    def __init__(self):
        # menu label -> builder taking the 4 byte xid and returning the message
        self._actions = {'expDummy01-Symmetric': DummyExperimenterSupport.build_exp_dummy_01_symmetric}

    @staticmethod
    def build_exp_dummy_01_symmetric(xid):
        """
        Build the symmetric dummy experimenter message.

        :param xid: transaction id as 4 raw bytes
        :return: complete message with the length field set to the real size
        """
        # experimenter data; an alternative payload used to be:
        # b'\x6c\x61\x6c\x61\x20\x68\x6f\x20\x70\x61\x70\x6c\x75\x68\x61\x20' +
        # b'\x6f\x67\x72\x63\x61\x75\x20\x6d\x69\x20\x6b\x72\x70\x63\x65\x00'
        payload = b'what do you get when you multiply 6 by 9?\x00'
        header = DummyExperimenterSupport._EXPERIMENTER_OF_HEADER
        tail = (xid
                + DummyExperimenterSupport._EXPERIMENTER_ID
                + DummyExperimenterSupport._DUMMY_TYPE_01
                + payload)
        # the length field covers the whole message and sits at offset 2..3
        total_size = len(header) + len(tail)
        return header[:2] + struct.pack('>H', total_size) + tail

    def get_actions(self):
        """
        :return: dict mapping menu label to message builder
        """
        return self._actions
# Menu extension registry. The switch emulator imports the module named on its
# command line, reads SWITCH_MENU_ADDITION from it and merges get_actions() of
# every value into its own menu.
SWITCH_MENU_ADDITION = {'dummy': DummyExperimenterSupport()}
= pySwitch dummy emulator =
===========================
This is a very primitive OpenFlow 1.3 device emulator. It is capable of
completing the handshake and then keeps sending echo requests in 4 second
intervals.
== Additional testing feature ==
There is a menu controlled by simply entering the number of the chosen action.
The menu will be scrolled away by message logs; hitting enter prints the
menu again. Entering an action number sends the corresponding custom message
to the controller. (0 = EXIT)
== Quick start ==
# in order to connect to controller listening at localhost:6633 as dpid=1
./switchTcp.py localhost 6633 1
# in order to connect to controller listening at localhost:6633 as dpid=1
# and provide dummy experimenter symmetric message
./switchTcp.py localhost 6633 1 dummyExperimenter
== Next step ==
Feel free to add custom messages and to hack this script in order to
simulate your testing scenario.
#!/usr/bin/python
import sys
import socket
import struct
import threading
import time
import Queue
from io import BytesIO
import logging
import traceback
import importlib
class OpenFlowSimu:
    """
    Primitive OpenFlow 1.3 switch emulator talking to a controller over TCP.

    After connect() and start_simulation() three threads are running:
    one reads the socket and queues complete messages, one answers the
    queued messages and one sends an echo request every 4 seconds.
    menu() offers interactive sending of additional messages.
    """
    # amount of bytes requested by one socket read
    BUFFER_SIZE = 1024
    # OpenFlow header: version(1) + type(1) + length(2) + xid(4)
    _OF_HEADER_SIZE = 8
    # OF-1.3 hello message (version 4, type 0, length 8, xid 0x01020304)
    HELLO_MESSAGE_1 = b'\x04\x00\x00\x08\x01\x02\x03\x04'
    LOG = logging.getLogger('OpenFlowSimu')
    # not referenced anywhere inside this class
    TCP_PROPERTY_FORMAT = "B" * 7 + "I" * 21

    def __init__(self, dp_id=1, menu_addition=None):
        """
        :param dp_id: datapath id reported in the features reply
        :param menu_addition: optional name of an importable module exposing
            SWITCH_MENU_ADDITION - a dict whose values provide get_actions()
            returning {menu label: builder(xid)}
        """
        self._active = False
        self._xid = 1
        self._lastXid = b'\x00\x00\x00\x2a'
        # received bytes which do not form a complete message yet
        self._rx_pending = b''
        self._actions = {'EXIT': None,
                         'echoReq': OpenFlowSimu.build_echo_req,
                         'echoRes': OpenFlowSimu.build_echo_res}
        # explicit list keeps the menu order stable (EXIT must stay at index 0)
        self._menu = ['EXIT', 'echoReq', 'echoRes']
        self.receptionQueue = Queue.Queue(12)
        self._dpId = dp_id
        self._s = None
        if menu_addition:
            mod = importlib.import_module(menu_addition)
            switch_menu_addition = getattr(mod, 'SWITCH_MENU_ADDITION')
            for ext_support in switch_menu_addition.values():
                ext_actions = ext_support.get_actions()
                self._menu.extend(ext_actions.keys())
                self._actions.update(ext_actions)

    def connect(self, host, port):
        """
        Open the TCP connection to the controller and mark the emulator active.
        """
        self.LOG.info('opening socket')
        self._s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._s.connect((host, port))
        self._s.settimeout(5)
        self._active = True
        self.LOG.info('\033[33;1mCONNECTED\033[0m')

    def disconnect(self):
        """
        Stop all loops and close the socket.

        Safe to call when the socket was never created or never connected,
        so it can be used from a finally block without hiding the real error.
        """
        self.LOG.info('closing..')
        self._active = False
        if self._s is not None:
            try:
                self._s.shutdown(socket.SHUT_RDWR)
            except socket.error as e:
                # not connected (connect failed) or already closed by the peer
                self.LOG.debug('socket shutdown skipped: %s', e)
            finally:
                self._s.close()
        self.LOG.info('\033[33;1mDISCONNECTED\033[0m')

    def _build_features(self, xid=None):
        """
        Build the features reply.

        :param xid: xid of the features request; when omitted the xid of the
            most recently received message is used
        """
        bio = OpenFlowSimu._new_message(b'\x04\x06\x00\x20', self._lastXid if xid is None else xid)
        # bio.write(b'\x00\x01\x02\x03\x04\x05\x06\x07')
        bio.write(struct.pack('>Q', self._dpId))
        bio.write(b'\x00\x01\x02\x03\x10\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03')
        return bio.getvalue()

    @staticmethod
    def _new_message(of_header, xid):
        """
        Start a message: write the 4 byte header prefix and the 4 byte xid.
        """
        bio = BytesIO()
        bio.write(of_header)
        bio.write(xid)
        return bio

    @staticmethod
    def _new_multipart_reply(xid, mp_type):
        """
        Start a multipart reply: header, xid, multipart type, flags, padding.
        """
        bio = OpenFlowSimu._new_message(b'\x04\x13\x00\x08', xid)
        # type
        bio.write(mp_type)
        # mp flags
        bio.write(b'\x00\x00')
        OpenFlowSimu._pad(bio, 4)
        return bio

    @staticmethod
    def build_echo_req(xid):
        """Build an echo request carrying given xid (4 raw bytes)."""
        return OpenFlowSimu._new_message(b'\x04\x02\x00\x08', xid).getvalue()

    @staticmethod
    def build_echo_res(xid):
        """Build an echo reply carrying given xid (4 raw bytes)."""
        return OpenFlowSimu._new_message(b'\x04\x03\x00\x08', xid).getvalue()

    @staticmethod
    def build_barrier_res(xid):
        """Build a barrier reply carrying given xid (4 raw bytes)."""
        return OpenFlowSimu._new_message(b'\x04\x15\x00\x08', xid).getvalue()

    @staticmethod
    def build_mp_desc_res(xid):
        """Build the multipart reply of type desc (0x0000)."""
        bio = OpenFlowSimu._new_multipart_reply(xid, b'\x00\x00')
        # manufacturer
        OpenFlowSimu._append_fixed(bio, b'Dummy, Inc.', 256)
        # hw desc
        OpenFlowSimu._append_fixed(bio, b'simple-dummy-01', 256)
        # sw desc
        OpenFlowSimu._append_fixed(bio, b'simple-dummy-os_0.1', 256)
        # S/N
        OpenFlowSimu._append_fixed(bio, b'none', 32)
        # datapath desc
        OpenFlowSimu._append_fixed(bio, b'none', 256)
        OpenFlowSimu._fix_length_info(bio)
        return bio.getvalue()

    @staticmethod
    def build_mp_meter_features_res(xid):
        """Build the multipart reply of type meter features (0x000b), all zero."""
        bio = OpenFlowSimu._new_multipart_reply(xid, b'\x00\x0b')
        # empty
        # NOTE(review): OF-1.3 meter features body appears to be 16 bytes, not 20 - confirm
        OpenFlowSimu._pad(bio, 20)
        OpenFlowSimu._fix_length_info(bio)
        return bio.getvalue()

    @staticmethod
    def build_mp_group_features_res(xid):
        """Build the multipart reply of type group features (0x0008), all zero."""
        bio = OpenFlowSimu._new_multipart_reply(xid, b'\x00\x08')
        # empty
        # NOTE(review): OF-1.3 group features body appears to be 40 bytes, not 44 - confirm
        OpenFlowSimu._pad(bio, 44)
        OpenFlowSimu._fix_length_info(bio)
        return bio.getvalue()

    @staticmethod
    def build_mp_port_desc_res(xid):
        """Build the multipart reply of type port desc (0x000d) with one port."""
        bio = OpenFlowSimu._new_multipart_reply(xid, b'\x00\x0d')
        # port no (+ padding)
        OpenFlowSimu._append_fixed(bio, b'\xff\xff\xff\xfe', 8)
        # MAC addr (+ padding)
        OpenFlowSimu._append_fixed(bio, b'\x01\x02\x03\x04\x05\x06', 8)
        # name
        OpenFlowSimu._append_fixed(bio, b's1', 16)
        # config
        bio.write(b'\x00\x00\x00\x01')
        # state
        bio.write(b'\x00\x00\x00\x01')
        # current + advertised + supported + peer
        OpenFlowSimu._pad(bio, 16)
        # current speed + max speed
        OpenFlowSimu._pad(bio, 8)
        OpenFlowSimu._fix_length_info(bio)
        return bio.getvalue()

    @staticmethod
    def build_mp_error_res(xid, orig_mp):
        """
        Build an error message (type 1, code 2) as answer to a multipart request.

        :param orig_mp: the refused request; its first 16 bytes become error data
        """
        bio = OpenFlowSimu._new_message(b'\x04\x01\x00\x08', xid)
        # error type
        bio.write(b'\x00\x01')
        # error code
        bio.write(b'\x00\x02')
        # data
        bio.write(orig_mp[:16])
        OpenFlowSimu._fix_length_info(bio)
        return bio.getvalue()

    @staticmethod
    def build_role_res(xid, role, generationId):
        """
        Build a role reply.

        :param role: role as 4 raw bytes
        :param generationId: generation id as 8 raw bytes
        """
        bio = OpenFlowSimu._new_message(b'\x04\x19\x00\x08', xid)
        # role + padding(4)
        bio.write(role)
        OpenFlowSimu._pad(bio, 4)
        # generationId
        bio.write(generationId)
        OpenFlowSimu._fix_length_info(bio)
        return bio.getvalue()

    @staticmethod
    def _pad(buffer, pad_byte_amount):
        """Append given amount of zero bytes."""
        buffer.write(b'\x00' * pad_byte_amount)

    @staticmethod
    def _append_fixed(buffer, content, fixed_size):
        """Append content and zero-fill it up to fixed_size bytes."""
        buffer.write(content)
        OpenFlowSimu._pad(buffer, fixed_size - len(content))

    def get_next_xid(self):
        """
        :return: next transaction id as 4 raw bytes (wraps around at 2^32)
        """
        self._xid = (self._xid + 1) & 0xffffffff
        return struct.pack('>I', self._xid)

    @staticmethod
    def dump_data(data):
        """
        :return: space separated lowercase hex dump of given bytes
        """
        # bytearray yields integers for byte strings regardless of interpreter
        return ' '.join('{0:02x}'.format(octet) for octet in bytearray(data))

    @staticmethod
    def _fix_length_info(buffer):
        """Overwrite the header length field (offset 2) with the current size."""
        real_size = buffer.tell()
        buffer.seek(2)
        buffer.write(struct.pack('>H', real_size))

    def _send(self, msg):
        """Send the whole message to the controller."""
        self.LOG.debug('\033[32msending msg..\033[0m [%.25s] l:%d', OpenFlowSimu.dump_data(msg), len(msg))
        # sendall: plain send() may transmit only a part of the message
        self._s.sendall(msg)
        self.LOG.debug('\033[32;1mSENT\033[0m %sxid:[%s]', 17*' ', OpenFlowSimu.dump_data(msg[4:8]))

    def _receive(self):
        """
        Read once from the socket and return all messages completed by it.

        Bytes of an unfinished message are kept for the next call, so the
        returned list may be empty.

        :raises socket.error: when the peer closed the connection
        :raises ValueError: when a header announces an impossible length
        """
        data = self._s.recv(OpenFlowSimu.BUFFER_SIZE)
        if not data:
            # empty read on a stream socket means orderly shutdown by the peer
            raise socket.error('connection closed by peer')
        self.LOG.debug('\033[36mRECEIVED\033[0m %s[%.25s]', 5*' ', OpenFlowSimu.dump_data(data))
        messages, self._rx_pending = OpenFlowSimu._split_stream(self._rx_pending + data)
        if messages:
            self._lastXid = messages[-1][4:8]
            OpenFlowSimu.LOG.debug(' last XID : {0}'.format(OpenFlowSimu.dump_data(self._lastXid)))
        return messages

    def _receive_messages(self):
        """Read from the socket and queue every complete message (blocking put)."""
        for msg in self._receive():
            self.receptionQueue.put(msg, True)

    def _pop_received_message(self, timeout=5):
        """
        :return: oldest queued message or None if nothing arrived within timeout
        """
        try:
            return self.receptionQueue.get(True, timeout)
        except Queue.Empty:
            return None

    @staticmethod
    def _extract_first_msg_size(data):
        """
        :return: value of the length field of the message at the start of data
        """
        return struct.unpack_from('>H', data, 2)[0]

    @staticmethod
    def _split_stream(data):
        """
        Cut a byte stream into complete OpenFlow messages.

        :return: tuple (list of complete messages, unconsumed trailing bytes)
        :raises ValueError: when a length field is smaller than the header
            (it would never advance through the stream)
        """
        messages = []
        offset = 0
        total = len(data)
        while total - offset >= OpenFlowSimu._OF_HEADER_SIZE:
            msg_size = struct.unpack_from('>H', data, offset + 2)[0]
            if msg_size < OpenFlowSimu._OF_HEADER_SIZE:
                raise ValueError('invalid OpenFlow message length: {0}'.format(msg_size))
            if offset + msg_size > total:
                # rest of this message has not arrived yet
                break
            messages.append(data[offset:offset + msg_size])
            OpenFlowSimu.LOG.debug(' msg size : {0}'.format(msg_size))
            offset += msg_size
        return messages, data[offset:]

    @staticmethod
    def _tokenize_data_to_msg(data):
        """
        :return: list of the complete messages found in data
        """
        return OpenFlowSimu._split_stream(data)[0]

    def keep_receiving(self):
        """Thread body: read the socket until inactive or the socket fails."""
        while self._active:
            try:
                self._receive_messages()
            except Exception as e:
                # includes the 5 second read timeout set in connect()
                OpenFlowSimu.LOG.info('socket unreadable: {0}'.format(e))
                self._active = False
                break
        self.LOG.info('#T: receiving is over')

    def keep_echoing(self):
        """Thread body: send an echo request every 4 seconds while active."""
        while self._active:
            try:
                self._send(OpenFlowSimu.build_echo_req(self.get_next_xid()))
                time.sleep(4)
            except Exception as e:
                OpenFlowSimu.LOG.debug('socket sending failed: {0}'.format(e))
                self._active = False
        self.LOG.info('#T: echoing is over')

    def keep_processing(self):
        """Thread body: answer queued messages while active."""
        while self._active:
            message = self._pop_received_message(5)
            if message is None:  # no messages in queue
                continue
            try:
                self._process_msg(message)
            except Exception:
                # one failed answer must not silently end the whole thread
                self.LOG.exception('processing of message failed')
        self.LOG.info('#T: processing is over')

    def _process_msg(self, message):
        """
        React to one received message: answer echo, multipart, barrier and
        role requests, log flow-mods, warn about anything else.
        """
        # prepare important header values
        head2byte = message[0:2]
        msg_xid = message[4:8]
        if b'\x04\x02' == head2byte:  # echo request received - need to respond
            self._send(OpenFlowSimu.build_echo_res(msg_xid))
        elif b'\x04\x03' == head2byte:  # echo response received - NOOP
            pass
        elif b'\x04\x12' == head2byte:  # mp request received - need to respond
            mp_type = message[8:10]
            mp_builders = {
                b'\x00\x00': OpenFlowSimu.build_mp_desc_res,  # desc
                b'\x00\x08': OpenFlowSimu.build_mp_group_features_res,  # group features
                b'\x00\x0b': OpenFlowSimu.build_mp_meter_features_res,  # meter features
                b'\x00\x0d': OpenFlowSimu.build_mp_port_desc_res,  # port desc
            }
            if mp_type in mp_builders:
                self._send(mp_builders[mp_type](msg_xid))
            elif b'\x00\x0c' == mp_type:  # table features - refused by error
                self._send(OpenFlowSimu.build_mp_error_res(msg_xid, message))
            else:
                self.LOG.warning('unsupported multipart type [%s]', OpenFlowSimu.dump_data(mp_type))
        elif b'\x04\x14' == head2byte:  # barrier request received - need to respond
            self._send(OpenFlowSimu.build_barrier_res(msg_xid))
        elif b'\x04\x18' == head2byte:  # role request - need to respond
            role = message[8:12]
            if b'\x00\x00\x00\x00' == role:  # current generationId query
                self._send(OpenFlowSimu.build_role_res(msg_xid, b'\x00\x00\x00\x01',
                                                       b'\x00\x00\x00\x00\x00\x00\x00\x18'))
            else:  # role change request - echo role and generationId back
                self._send(OpenFlowSimu.build_role_res(msg_xid, role, message[16:24]))
        elif b'\x04\x0e' == head2byte:  # flow mod - just count
            # TODO count
            # byte 25 follows cookie(8) + cookie mask(8) + table id(1)
            self.LOG.info('flow-mod {0}'.format(OpenFlowSimu.dump_data(message[25:26])))
        else:
            if len(message) > 0:
                self.LOG.warning('\033[43mNOT SUPPORTED\033[0m [%s]', OpenFlowSimu.dump_data(message[:8]))

    def menu(self):
        """
        Interactive loop: print the menu, read a number from stdin and send
        the chosen message. Invalid input prints the menu again, choosing
        EXIT leaves the loop. When stdin is closed the menu is no longer
        shown and the method just waits until the simulation stops.
        """
        local_xid = b'\x00\x00\x01\x00'
        while self._active:
            print('enter choice:')
            for idx, label in enumerate(self._menu):
                print('{0:2d}: {1}'.format(idx, label))
            user_input = sys.stdin.readline()
            if not user_input:
                # EOF: readline would return '' forever, do not spin on it
                self.LOG.info('stdin closed -> menu disabled')
                while self._active:
                    time.sleep(1)
                break
            try:
                index = int(user_input)
            except ValueError:
                continue
            if not 0 <= index < len(self._menu):
                continue
            choice = self._menu[index]
            print('choice={}'.format(choice))
            action = self._actions[choice]
            if action is None:
                break
            try:
                to_send = action(local_xid)
                xid = struct.unpack_from('>L', local_xid)[0]
                OpenFlowSimu.LOG.debug('local xid = {0}'.format(OpenFlowSimu.dump_data(struct.pack('>L', xid))))
                local_xid = struct.pack('>L', (xid + 1) & 0xffffffff)
                OpenFlowSimu.LOG.debug('local xid +1 = {0}'.format(OpenFlowSimu.dump_data(local_xid)))
                if to_send:
                    self._send(to_send)
            except Exception as e:
                OpenFlowSimu.LOG.error('menu problem: {0}'.format(e))
                traceback.print_exc()
                # bare raise keeps the original traceback
                raise

    def start_simulation(self):
        """
        Perform the handshake (hello, features request/reply) and start the
        processing and echoing threads.

        :raises Exception: when the controller does not answer in time or
            answers with an unexpected message
        """
        # receiving must be alive since beginning
        t_recv = threading.Thread(target=self.keep_receiving, args=[])
        t_recv.start()
        # HANDSHAKE
        self._send(OpenFlowSimu.HELLO_MESSAGE_1)
        msg = self._pop_received_message()
        if msg is None:
            raise Exception('no response to hello received')
        if msg[0:2] == b'\x04\x00':
            OpenFlowSimu.LOG.info('hello received -> waiting for feature request')
        else:
            raise Exception('response not understood')
        msg = self._pop_received_message()
        if msg is None:
            raise Exception('no features request received')
        if msg[0:2] == b'\x04\x05':
            print('features request received -> answering')
            # answer with the xid of the request itself, not of whatever came last
            features = self._build_features(msg[4:8])
            self._send(features)
        else:
            raise Exception('response not understood: {}'.format(OpenFlowSimu.dump_data(msg)))
        t_proc = threading.Thread(target=self.keep_processing, args=[])
        t_proc.start()
        t_echo = threading.Thread(target=self.keep_echoing, args=[])
        t_echo.start()
def main():
    """
    Command line entry point.

    Expects <host> <port> <dpid> [menu addition module] in sys.argv, connects
    the emulator to the controller, runs the handshake and then the
    interactive menu. The emulator is always disconnected on the way out.
    Exits with status 1 when fewer than three arguments are given.
    """
    if len(sys.argv) < 4:
        sys.stderr.write('usage:: {0} <host> <port> <dpid> [menu addition module]\n'.format(sys.argv[0]))
        sys.exit(1)
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)7s:%(name)s: %(message)s')
    logging.info('starting')
    simu = None
    try:
        host = sys.argv[1]
        port = int(sys.argv[2])
        dp_id = int(sys.argv[3])
        menu_add = sys.argv[4] if len(sys.argv) > 4 else None
        simu = OpenFlowSimu(dp_id, menu_add)
        simu.connect(host, port)
        simu.start_simulation()
        simu.menu()
    except Exception as e:
        print('issue occured: {}'.format(e))
        # bare raise keeps the original traceback ('raise e' drops it on Python 2)
        raise
    finally:
        if simu is not None:
            simu.disconnect()
        logging.info('finishing')


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment