Created
May 16, 2018 09:03
-
-
Save qstokkink/09675f45d89e461b761a42fa08b9cbcb to your computer and use it in GitHub Desktop.
IDEMIA local test with verification
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from base64 import b64encode | |
import json | |
from urllib import quote | |
from twisted.internet import reactor | |
from twisted.internet.defer import inlineCallbacks, returnValue | |
from twisted.internet.task import deferLater | |
from roles import initialize_peer, make_request, sleep, stop_peers | |
print "Initializing peers" | |
idowner, idowner_restapi = initialize_peer("idowner") | |
attester, attester_restapi = initialize_peer("attester") | |
verifier, verifier_restapi = initialize_peer("verifier") | |
@inlineCallbacks | |
def wait_for_peers(idowner_restapi): | |
# Wait a second, if we fail to find peers: repeat, otherwise: return the result | |
peers = yield make_request(idowner_restapi, 'GET', { | |
'type': 'peers' | |
}) | |
if peers != "[]": | |
returnValue(peers) | |
else: | |
print "No peers have connected yet, waiting for 4 seconds!" | |
peers = yield deferLater(reactor, 4.0, wait_for_peers, idowner_restapi) | |
returnValue(peers) | |
@inlineCallbacks | |
def wait_for_attestation_request(attester_restapi): | |
# Wait a second, if we fail to find pending requests: repeat, otherwise: return the result | |
requests = yield make_request(attester_restapi, 'GET', { | |
'type': 'outstanding' | |
}) | |
if requests != "[]": | |
returnValue(requests) | |
else: | |
print "No attestation requests have been received yet, waiting for 4 seconds!" | |
requests = yield deferLater(reactor, 4.0, wait_for_attestation_request, attester_restapi) | |
returnValue(requests) | |
@inlineCallbacks | |
def make_attribute(): | |
global attester, attester_restapi, idowner, idowner_restapi, verifier, verifier_restapi | |
try: | |
idowner_id = quote(b64encode(idowner.keys["anonymous id"].mid)) | |
attest_id = quote(b64encode(attester.keys["anonymous id"].mid)) | |
verifier_id = quote(b64encode(verifier.keys["anonymous id"].mid)) | |
value = yield wait_for_peers(idowner_restapi) | |
print "Known peers for id owner:", value | |
print "Requesting attestation for QR from", b64encode(attester.keys["anonymous id"].mid),\ | |
"(generates one-time EC key)" | |
yield make_request(idowner_restapi, 'POST', { | |
'type': 'request', | |
'mid': attest_id, | |
'attribute_name': 'QR' | |
}) | |
value = yield wait_for_attestation_request(attester_restapi) | |
print "Pending attestation request for attester:", value | |
print "Attesting QR for", b64encode(idowner.keys["anonymous id"].mid) | |
yield make_request(attester_restapi, 'POST', { | |
'type': 'attest', | |
'mid': idowner_id, | |
'attribute_name': 'QR', | |
'attribute_value': quote(b64encode('binarydata')) | |
}) | |
value = yield make_request(idowner_restapi, 'GET', { | |
'type': 'attributes', | |
'mid': attest_id | |
}) | |
print "ID Owner attributes:", value | |
yield make_request(verifier_restapi, 'POST', { | |
'type': 'verify', | |
'mid': idowner_id, | |
'attribute_hash': quote(json.loads(value)[0][1]), | |
'attribute_values': quote(b64encode('binarydata')) | |
}) | |
yield sleep(2) | |
i = 0 | |
while i < 20: | |
value = yield make_request(verifier_restapi, 'GET', { | |
'type': 'verification_output' | |
}) | |
print "Verification output:", value | |
yield sleep(0.2) | |
i += 1 | |
yield stop_peers(attester, idowner) | |
except: | |
import traceback | |
traceback.print_exc() | |
deferLater(reactor, 0.5, make_attribute) | |
reactor.run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
from twisted.internet.defer import inlineCallbacks | |
from twisted.internet.task import deferLater | |
from twisted.internet import reactor | |
from twisted.web import server | |
from twisted.web.client import Agent, readBody | |
from twisted.web.http_headers import Headers | |
from pyipv8.ipv8.REST.rest_manager import RESTManager, RESTRequest | |
from pyipv8.ipv8.REST.root_endpoint import RootEndpoint | |
from pyipv8.ipv8_service import IPv8 | |
from pyipv8.ipv8.configuration import get_default_configuration | |
agent = Agent(reactor) | |
class TestRESTAPI(RESTManager): | |
port = 8085 | |
def start(self): | |
self.root_endpoint = RootEndpoint(self.session) | |
site = server.Site(resource=self.root_endpoint) | |
site.requestFactory = RESTRequest | |
TestRESTAPI.port += 1 | |
self.site = reactor.listenTCP(TestRESTAPI.port, site, interface="127.0.0.1") | |
@inlineCallbacks | |
def sleep(time): | |
try: | |
yield deferLater(reactor, time, lambda: None) | |
except: | |
import traceback | |
print traceback.print_exc() | |
def create_working_dir(path): | |
if os.path.isdir(path): | |
import shutil | |
shutil.rmtree(path) | |
os.mkdir(path) | |
def initialize_peer(path): | |
configuration = get_default_configuration() | |
configuration['logger'] = {'level': "ERROR"} | |
configuration['walker_interval'] = 4 | |
overlays = ['AttestationCommunity', 'IdentityCommunity'] | |
configuration['overlays'] = [o for o in configuration['overlays'] if o['class'] in overlays] | |
for o in configuration['overlays']: | |
o['walkers'] = [{ | |
'strategy': "RandomWalk", | |
'peers': 4, | |
'init': { | |
'timeout': 60.0 | |
} | |
}] | |
create_working_dir(path) | |
os.chdir(path) | |
ipv8 = IPv8(configuration) | |
os.chdir(os.path.dirname(__file__)) | |
rest_manager = TestRESTAPI(ipv8) | |
rest_manager.start() | |
return ipv8, "http://localhost:%d/attestation" % rest_manager.port | |
@inlineCallbacks | |
def stop_peers(*peers): | |
for p in peers: | |
p.endpoint.close() | |
yield p.stop(stop_reactor=(p==peers[-1])) | |
def make_request(url, type, arguments={}): | |
global agent | |
request_url = url + '?' + '&'.join("%s=%s" % (k,v) for k,v in arguments.iteritems()) | |
print "\t[HTTP-%s] %s" % (type, request_url) | |
d = agent.request( | |
type, | |
request_url, | |
Headers({'User-Agent': ['Twisted Web Client Example'], | |
'Content-Type': ['text/x-greeting']}), | |
None) | |
return d.addCallback(lambda response: readBody(response)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment