Skip to content

Instantly share code, notes, and snippets.

@qstokkink
Last active October 23, 2018 08:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save qstokkink/90fdcba7144a60ca697a41e0e7327dac to your computer and use it in GitHub Desktop.
Save qstokkink/90fdcba7144a60ca697a41e0e7327dac to your computer and use it in GitHub Desktop.
Final version of the IDEMIA proof-of-concept (POC) server files

Setup

Run the following command:

git clone https://github.com/Tribler/py-ipv8.git pyipv8

Follow the setup instructions in the IPv8 repository you just cloned and install the dependencies listed there.

Running the server

Run the following command:

python main.py
from base64 import b64decode, b64encode
import json
from urllib import quote
import urllib2
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import deferLater
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from pyipv8.ipv8_service import IPv8
from pyipv8.ipv8.configuration import get_default_configuration
from pyipv8.ipv8.REST.rest_manager import RESTManager
# Public names of this module (the set of collected psns and the main loop).
__all__ = ['KNOWN_PSNS', 'main_loop']
# Set of all psn values found in the attribute metadata of peers.
# try_verify() adds to it; nothing in this module ever removes an entry.
# NOTE(review): originally described as "verified psns encountered on the
# TrustChain" - the only check visible here is that the metadata has a psn.
KNOWN_PSNS = set()
# The Twisted HTTP agent used by make_request() for REST requests
AGENT = Agent(reactor)
# The url to get the attestation response (POST target of contact_IDEMIA_cloud())
URL_IDEMIA_GET_ATTESTATION = "https://demo-apac-morphocivis.i-sphere.fr:8082/getattestation"
# IDEMIA attribute name: the key try_attest() looks up in the IDEMIA response
IDEMIA_ATTRIBUTE_NAME = "qrcode"
def create_ipv8():
    """
    Configure and create the IPv8 instance and start a REST manager for it.

    The default configuration gets DEBUG logging and exactly four overlays:
    DiscoveryCommunity, AttestationCommunity, IdentityCommunity and
    DHTDiscoveryCommunity.

    :returns: the ipv8 instance and the REST API url for communication
    :rtype: (IPv8, str)
    """
    def random_walk(peers, timeout):
        # Walker entry for the "RandomWalk" strategy.
        return {
            'strategy': "RandomWalk",
            'peers': peers,
            'init': {
                'timeout': timeout
            }
        }

    def overlay(class_name, key, walkers, on_start=()):
        # One overlay entry; every overlay here uses an empty 'initialize'.
        return {
            'class': class_name,
            'key': key,
            'walkers': walkers,
            'initialize': {},
            'on_start': list(on_start)
        }

    random_churn = {
        'strategy': "RandomChurn",
        'peers': -1,
        'init': {
            'sample_size': 8,
            'ping_interval': 10.0,
            'inactive_time': 27.5,
            'drop_time': 57.5
        }
    }

    configuration = get_default_configuration()
    configuration['logger'] = {'level': "DEBUG"}
    configuration['overlays'] = [
        overlay('DiscoveryCommunity', "my peer",
                [random_walk(4, 60.0), random_churn],
                on_start=[('resolve_dns_bootstrap_addresses', )]),
        overlay('AttestationCommunity', "anonymous id", [random_walk(20, 60.0)]),
        overlay('IdentityCommunity', "anonymous id", [random_walk(20, 60.0)]),
        overlay('DHTDiscoveryCommunity', "anonymous id", [random_walk(4, 3.0)]),
    ]

    ipv8 = IPv8(configuration)
    RESTManager(ipv8).start()
    # NOTE(review): the URL is hard-coded; presumably it matches the address
    # the RESTManager listens on - confirm against RESTManager.
    return ipv8, "http://localhost:8085/attestation"
def make_request(url, type, arguments=None):
    """
    Perform an HTTP request to a local REST API.

    The arguments are appended to the url as a query string. Keys and values
    are inserted verbatim (no URL-encoding is applied here), so callers have
    to pass values that are already safe to put in a url.

    :param url: the REST API url
    :type url: str
    :param type: the type of request (GET or POST)
    :type type: str
    :param arguments: the key value store of arguments to pass (optional)
    :type arguments: {str: str}
    :returns: a Deferred firing with the body of the response
    :rtype: Deferred
    """
    # `type` shadows the builtin, but it is part of the public signature and
    # is therefore kept as-is.
    # A None default avoids sharing one mutable dict between calls.
    query = '&'.join("%s=%s" % (key, value)
                     for key, value in (arguments or {}).items())
    # Only add the '?' separator when there is something to separate.
    request_url = (url + '?' + query) if query else url
    print("\t[HTTP-%s] %s" % (type, request_url))
    d = AGENT.request(
        type,
        request_url,
        Headers({'User-Agent': ['Twisted Web Client Example'],
                 'Content-Type': ['text/x-greeting']}),
        None)
    return d.addCallback(readBody)
def contact_IDEMIA_cloud(metadata):
    """
    Ask the IDEMIA cloud for attestation, given some metadata.

    The metadata is POSTed as JSON to URL_IDEMIA_GET_ATTESTATION and the JSON
    body of the reply is returned. This is a synchronous call (urllib2): it
    blocks the calling thread until the server has answered.

    Only HTTP error statuses are handled here (printed, then {} is returned).
    Connection failures and malformed JSON replies propagate to the caller.

    :param metadata: the metadata to pass
    :type metadata: {str: str}
    :returns: the response key value store, or {} on an HTTP error status
    :rtype: {str: str}
    """
    data = json.dumps(metadata)
    # Passing data makes urllib2 issue a POST and add the Content-Length
    # header itself.
    req = urllib2.Request(URL_IDEMIA_GET_ATTESTATION, data,
                          {'Content-Type': 'application/json'})
    try:
        rep = urllib2.urlopen(req)
    except urllib2.HTTPError as e:
        print(str(e) + ': ' + e.fp.read())
        return {}
    try:
        # Read up to EOF instead of trusting a Content-Length response
        # header, which is absent for e.g. chunked replies.
        response = rep.read()
    finally:
        # Always release the connection, also when reading fails.
        rep.close()
    return json.loads(response)
@inlineCallbacks
def try_attest(restapi):
    """
    Check if we have any outstanding requests for attestation and answer them.

    Only requests for the attribute "QR" are handled. For each of them the
    base64 encoded JSON metadata is sent to the IDEMIA cloud. If the reply
    holds a non-empty IDEMIA_ATTRIBUTE_NAME entry, that entry is attested to
    (base64 encoded and url-quoted); otherwise the value "no" is attested and
    an error is printed.

    :param restapi: the REST API to query
    :type restapi: str
    :returns: None
    """
    # Imported here because the module header does not import it.
    from twisted.internet.threads import deferToThread

    requests = yield make_request(restapi, 'GET', {
        'type': 'outstanding'
    })
    value = json.loads(requests)
    print("Pending attestation request for attester: %s" % (value,))
    for (identifier, attribute, metadata_b64) in value:
        if str(attribute) != "QR":
            continue
        metadata = json.loads(b64decode(metadata_b64))
        # contact_IDEMIA_cloud() does blocking network I/O: run it in the
        # reactor thread pool so the reactor keeps serving other traffic.
        response = yield deferToThread(contact_IDEMIA_cloud, metadata)
        attested = response.get(IDEMIA_ATTRIBUTE_NAME, None)
        if attested:
            attribute_value = quote(b64encode(attested)).replace("+", "%2B")
        else:
            attribute_value = "no"
        # make_request() does not url-encode, hence the manual '+' escaping.
        yield make_request(restapi, 'POST', {
            'type': 'attest',
            'mid': str(identifier).replace("+", "%2B"),
            'attribute_name': str(attribute),
            'attribute_value': attribute_value
        })
        if not attested:
            print("ERROR! IDEMIA response did not contain %s entry: %s"
                  % (IDEMIA_ATTRIBUTE_NAME, response))
@inlineCallbacks
def try_verify(restapi):
"""
Check the network for any attributes with a psn.
We update the KNOWN_PSNS if we find such an attribute.
:param restapi: the REST API to query
:type restapi: str
:returns: None
"""
peers = yield make_request(restapi, 'GET', {
'type': 'peers'
})
identifiers = json.loads(peers)
for identifier in identifiers:
value = yield make_request(restapi, 'GET', {
'type': 'attributes',
'mid': str(identifier).replace("+", "%2B")
})
attributes = json.loads(value)
print "Peer", identifier, "has attributes", attributes
if attributes:
for (_, __, metadata, attester) in attributes:
psn = metadata.get("psn", None)
if psn:
KNOWN_PSNS.add(psn)
@inlineCallbacks
def main_loop():
    """
    The main loop to attest to attributes and verify their existence.

    Creates the IPv8 instance once and then, forever: bootstraps all
    overlays, handles outstanding attestation requests, collects psns and
    waits for one second. An error in one iteration is printed and does not
    end the loop.

    :returns: None (the loop never finishes by itself)
    """
    import traceback

    ipv8, restapi = create_ipv8()
    while True:
        # Aggressively advertise ourselves in the network
        try:
            for overlay in ipv8.overlays:
                overlay.bootstrap()
        except Exception:
            # Deliberately best-effort: a failing bootstrap must not keep us
            # from attesting and verifying in this iteration.
            pass
        try:
            yield try_attest(restapi)
            yield try_verify(restapi)
        except Exception:
            # Not a bare except: GeneratorExit, KeyboardInterrupt and
            # SystemExit must be able to end this generator.
            traceback.print_exc()
        # Wait outside of the try block, so a failing iteration is also
        # followed by a pause instead of an immediate retry.
        yield deferLater(reactor, 1.0, lambda: None)
#!/usr/bin/python
import logging
import BaseHTTPServer
import urlparse
import json
from twisted.internet import reactor
from twisted.internet.task import deferLater
from bindings import KNOWN_PSNS, main_loop
# The ssl module is optional: with_ssl records whether it could be imported.
try:
    import ssl
    with_ssl = True
except ImportError:
    # Only a missing module means "no SSL"; any other error should surface.
    with_ssl = False
#______________________________________________________________________________
class HTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """
    HTTP handler answering whether a psn is present in KNOWN_PSNS.

    A POST to a path starting with '/verifychainattribute' carries a JSON
    object with a "psn" entry. The reply is the JSON object {"status": "1"}
    if that psn is known and {"status": "0"} otherwise.
    """
    server_version = "RequestSimulatorHTTP/0.1"
    PATH = ''

    def _psn_is_known(self, psn):
        """
        Check whether psn equals, compared as integers, a known psn.

        :param psn: the psn to look for (anything int() accepts)
        :returns: True if the psn is in KNOWN_PSNS, False otherwise
        :raises ValueError, TypeError: if psn itself is not an integer value
        """
        wanted = int(psn)
        # Iterate over a snapshot: KNOWN_PSNS may grow while we are looking.
        for known in list(KNOWN_PSNS):
            try:
                if int(known) == wanted:
                    return True
            except (TypeError, ValueError):
                # A stored psn that is not a number can never match; it must
                # not make every lookup fail.
                continue
        return False

    def do_POST(self):
        """
        Handle a psn verification request.

        Replies 500 for any other path, 400 without a content-length header
        and 400 if the body cannot be processed.
        """
        if not self.path.startswith('/verifychainattribute'):
            self.send_error(500, "Unexpected request")
            return
        parsed_path = urlparse.urlparse(self.path)
        print(parsed_path.query)
        length = self.headers.getheader('content-length')
        if length is None:
            self.send_error(400, "content-length incorrect")
            return
        try:
            size = int(length)
            if size < 0:
                # rfile.read() with a negative size would block until EOF.
                raise ValueError("negative content-length: %d" % size)
            request = self.rfile.read(size)
            psn = json.loads(request)["psn"]
            # "1" for success check of attribute, "0" for other case
            status = "1" if self._psn_is_known(psn) else "0"
        except Exception:
            import traceback
            traceback.print_exc()
            self.send_error(400, "Bad input parameter")
            return
        data = json.dumps({"status": status})
        self.send_response(200)
        self.send_header("Accept", "application/json")
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, format, *args):
        """Send the access log line to the logging module at DEBUG level."""
        # override to include in the standard log file and to apply filtering
        logging.debug("%s - - [%s] %s\n",
                      self.client_address[0],
                      self.log_date_time_string(),
                      format % args)
def serve(options):
    """
    Run the HTTP(S) server until it is shut down.

    This call blocks in serve_forever(), so it has to run outside of the
    reactor thread. The server is shut down when the reactor shuts down.

    :param options: the parsed command line options (ip, port and ssl are used)
    :returns: None
    """
    server_address = (options.ip, options.port)
    httpd = BaseHTTPServer.HTTPServer(server_address, HTTPRequestHandler)
    if with_ssl and options.ssl:
        # See http://www.piware.de/2011/01/creating-an-https-server-in-python/
        httpd.socket = ssl.wrap_socket(httpd.socket, server_side=True, certfile=options.ssl)
    elif options.ssl:
        logging.warning('SSL requested but the ssl module is not available: serving plain HTTP')
    logging.info('Serving %s:%d', options.ip, options.port)
    # Reactor methods may only be called from the reactor thread, so let the
    # reactor register the shutdown hook itself.
    reactor.callFromThread(reactor.addSystemEventTrigger, 'before', 'shutdown', httpd.shutdown)
    try:
        httpd.serve_forever()
    finally:
        # Release the listening socket once the serve loop has ended.
        httpd.server_close()
#______________________________________________________________________________
#
# main and command line options
#
#______________________________________________________________________________
def main():
    """
    Parse the command line, set up logging and run the servers.

    The HTTP server runs in a reactor pool thread, main_loop is started half
    a second after the reactor. This call returns when the reactor stops.
    """
    import optparse
    parser = optparse.OptionParser(description='MorphoPerso simulator')
    # "%default" is expanded by optparse, so the help cannot drift away from
    # the actual default values.
    parser.add_option("-i", "--ip", default='0.0.0.0', dest='ip',
                      help="Listen IP (default: %default)")
    parser.add_option("-P", "--port", default=8081, dest='port', type='int',
                      help="Port number (default: %default)")
    parser.add_option("-s", "--ssl", default=None, dest='ssl',
                      help="Activate SSL by pointing to a pem file (default: no SSL)")
    parser.add_option("-l", "--loglevel", default='DEBUG', dest='loglevel',
                      help="Log level (default: %default)")
    parser.add_option("-f", "--logfile", default=None, dest='logfile',
                      help="Log file (default: none)")
    options, _ = parser.parse_args()

    log_format = '%(asctime)-15s %(levelname)s - %(message)s'
    log_level = logging.getLevelName(options.loglevel)
    logging.basicConfig(format=log_format, level=log_level)
    if options.logfile:
        fh = logging.FileHandler(options.logfile)
        fh.setLevel(log_level)
        fh.setFormatter(logging.Formatter(log_format))
        logging.getLogger().addHandler(fh)
    logging.info('Starting')
    try:
        logging.info('Starting...')
        reactor.callInThread(serve, options)
        started = deferLater(reactor, 0.5, main_loop)
        # Without an errback a failure to start main_loop would go unnoticed.
        started.addErrback(
            lambda failure: logging.error('main_loop failed:\n%s', failure.getTraceback()))
        reactor.run()
    except Exception:
        # Log the traceback too, instead of only the word 'Exception'.
        logging.exception('Exception')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment