Skip to content

Instantly share code, notes, and snippets.

@hoffmabc
Created June 29, 2014 15:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hoffmabc/7a35bfd2c6e59b3c32a1 to your computer and use it in GitHub Desktop.
Save hoffmabc/7a35bfd2c6e59b3c32a1 to your computer and use it in GitHub Desktop.
import json
import logging
from collections import defaultdict
import traceback
from multiprocessing import Process, Queue
from threading import Thread
from random import randint
from zmq.eventloop import ioloop, zmqstream
import zmq
ioloop.install()
from protocol import goodbye
import network_util
from urlparse import urlparse
import sys
class PeerConnection(object):
def __init__(self, transport, address):
# timeout in seconds
self._timeout = 10
self._transport = transport
self._address = address
self._log = logging.getLogger('[%s] %s' % (self._transport._market_id, self.__class__.__name__))
self.create_socket()
def create_socket(self):
self._ctx = zmq.Context()
self._socket = self._ctx.socket(zmq.REQ)
self._socket.setsockopt(zmq.LINGER, 0)
#self._socket.setsockopt(zmq.SOCKS_PROXY, "127.0.0.1:9051");
self._socket.connect(self._address)
self._stream = zmqstream.ZMQStream(self._socket, io_loop=ioloop.IOLoop.current())
def cleanup_socket(self):
self._socket.close()
def send(self, data, callback):
msg = self.send_raw(json.dumps(data), callback)
return msg
def send_raw(self, serialized, callback=lambda msg: None):
msg = self._stream.send(serialized, callback=self.on_message)
print msg
def cb(msg):
print 'callback'
self.on_message(msg)
callback(msg)
self._stream.on_recv(cb)
def on_message(self, msg, callback=lambda msg: None):
self._log.info("Message received: %s" % msg)
callback()
# Transport layer manages a list of peers
class TransportLayer(object):
def __init__(self, market_id, my_ip, my_port, my_guid):
self._peers = {}
self._callbacks = defaultdict(list)
self._port = my_port
self._ip = my_ip
self._guid = my_guid
self._market_id = market_id
self._uri = 'tcp://%s:%s' % (self._ip, self._port)
self._log = logging.getLogger('[%s] %s' % (market_id, self.__class__.__name__))
# signal.signal(signal.SIGTERM, lambda x, y: self.broadcast_goodbye())
def add_callback(self, section, callback):
self._callbacks[section].append(callback)
def trigger_callbacks(self, section, *data):
for cb in self._callbacks[section]:
cb(*data)
if not section == 'all':
for cb in self._callbacks['all']:
cb(*data)
def get_profile(self):
return {'type': 'hello_request', 'uri': self._uri}
def listen(self, pubkey):
#t = Thread(target=self._listen, args=(pubkey,))
#t.setDaemon(True)
#t.start()
self._listen(pubkey)
def _listen(self, pubkey):
self._log.info("Listening at: %s:%s" % (self._ip, self._port))
ctx = zmq.Context()
socket = ctx.socket(zmq.REP)
if network_util.is_loopback_addr(self._ip):
# we are in local test mode so bind that socket on the
# specified IP
socket.bind(self._uri)
else:
socket.bind('tcp://*:%s' % self._port)
#while True:
stream = zmqstream.ZMQStream(socket, io_loop=ioloop.IOLoop.current())
print 'stream: %s' % stream
def handle_recv(message):
print 'got a msg'
for msg in message:
self.on_raw_message(message)
self._log.info('Sending back OK')
#self._socket.send(json.dumps({'type': 'ok', 'senderGUID': self._guid, 'pubkey': pubkey}), flags=zmq.NOBLOCK)
stream.send(json.dumps({'type': 'ok', 'senderGUID': self._guid, 'pubkey': pubkey}))
stream.on_recv(handle_recv)
# t = Thread(target=self.handle_raw_message, args=(message,))
# t.setDaemon(True)
# t.start()
def handle_raw_message(self, message):
self.on_raw_message(message)
def closed(self, *args):
self._log.info("client left")
def _init_peer(self, msg):
uri = msg['uri']
if uri not in self._peers:
self._peers[uri] = CryptoPeerConnection(self, uri)
def remove_peer(self, uri, guid):
self._log.info("Removing peer %s", uri)
ip = urlparse(uri).hostname
port = urlparse(uri).port
if (ip, port, guid) in self._shortlist:
self._shortlist.remove((ip, port, guid))
self._log.info('Removed')
# try:
# del self._peers[uri]
# msg = {
# 'type': 'peer_remove',
# 'uri': uri
# }
# self.trigger_callbacks(msg['type'], msg)
#
# except KeyError:
# self._log.info("Peer %s was already removed", uri)
def send(self, data, send_to=None):
self._log.info("Outgoing Data: %s" % data)
# Directed message
if send_to is not None:
peer = self._dht._routingTable.getContact(send_to)
new_peer = self._dht._transport.get_crypto_peer(peer._guid, peer._address, peer._pub)
new_peer.send(data)
# for peer in self._dht._activePeers:
#
# if peer._guid == send_to:
# self._log.info('Found a matching peer: %s' % peer._guid)
#
#
# peer.send(data)
#
# self._log.debug('Sent message: %s ' % data)
return
else:
# FindKey and then send
for peer in self._dht._activePeers:
try:
data['senderGUID'] = self._guid
if peer._pub:
peer.send(data)
else:
serialized = json.dumps(data)
peer.send_raw(serialized)
except:
self._log.info("Error sending over peer!")
traceback.print_exc()
def broadcast_goodbye(self):
self._log.info("Broadcast goodbye")
msg = goodbye({'uri': self._uri})
self.send(msg)
def on_message(self, msg):
# here goes the application callbacks
# we get a "clean" msg which is a dict holding whatever
self._log.info("[On Message] Data received: %s" % msg)
# if not self._routingTable.getContact(msg['senderGUID']):
# Add to contacts if doesn't exist yet
#self._addCryptoPeer(msg['uri'], msg['senderGUID'], msg['pubkey'])
if msg['type'] != 'ok':
self.trigger_callbacks(msg['type'], msg)
def on_raw_message(self, serialized):
self._log.info("connected " + str(len(serialized)))
try:
msg = json.loads(serialized[0])
except:
self._log.info("incorrect msg! " + serialized)
return
msg_type = msg.get('type')
if msg_type == 'hello_request' and msg.get('uri'):
self.init_peer(msg)
else:
self.on_message(msg)
def valid_peer_uri(self, uri):
try:
[self_protocol, self_addr, self_port] = \
network_util.uri_parts(self._uri)
[other_protocol, other_addr, other_port] = \
network_util.uri_parts(uri)
except RuntimeError:
return False
if not network_util.is_valid_protocol(other_protocol) \
or not network_util.is_valid_port(other_port):
return False
if network_util.is_private_ip_address(self_addr):
if not network_util.is_private_ip_address(other_addr):
self._log.warning(('Trying to connect to external '
'network with a private ip address.'))
else:
if network_util.is_private_ip_address(other_addr):
return False
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment