Last active
May 17, 2018 14:39
-
-
Save qstokkink/b7996c3c249931c9dc5d0370296d1239 to your computer and use it in GitHub Desktop.
IDEMIA server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
git clone https://github.com/Tribler/py-ipv8.git pyipv8
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from base64 import b64decode, b64encode | |
from json import loads | |
from urllib import quote | |
from twisted.internet import reactor | |
from twisted.internet.defer import inlineCallbacks, returnValue | |
from twisted.internet.task import deferLater | |
from roles import initialize_peer, make_request, sleep, stop_peers | |
# Bring up the attester peer. initialize_peer() hands back the peer object
# together with the URL under which its REST API can be reached.
print("Initializing peers")
attester, attester_restapi = initialize_peer("attester")
@inlineCallbacks
def wait_for_peers(idowner_restapi):
    """
    Poll the REST API until the 'peers' query returns a non-empty answer.

    The query is repeated every 4 seconds for as long as the response body
    equals the empty JSON list "[]".

    :param idowner_restapi: URL of the REST endpoint to query
    :return: (through the Deferred) the raw response body of the first
             query that is not "[]"
    """
    while True:
        peers = yield make_request(idowner_restapi, 'GET', {
            'type': 'peers'
        })
        if peers != "[]":
            returnValue(peers)
        print("No peers have connected yet, waiting for 4 seconds!")
        # Pause in place instead of re-scheduling this function recursively,
        # so a long wait does not pile up a chain of nested Deferreds.
        yield deferLater(reactor, 4.0, lambda: None)
@inlineCallbacks
def wait_for_attestation_request(attester_restapi):
    """
    Poll the REST API until the 'outstanding' query returns a non-empty answer.

    The query is repeated every 4 seconds for as long as the response body
    equals the empty JSON list "[]".

    :param attester_restapi: URL of the REST endpoint to query
    :return: (through the Deferred) the raw response body of the first
             query that is not "[]"
    """
    while True:
        requests = yield make_request(attester_restapi, 'GET', {
            'type': 'outstanding'
        })
        if requests != "[]":
            returnValue(requests)
        print("No attestation requests have been received yet, waiting for 4 seconds!")
        # Pause in place instead of re-scheduling this function recursively,
        # so a long wait does not pile up a chain of nested Deferreds.
        yield deferLater(reactor, 4.0, lambda: None)
@inlineCallbacks
def make_attribute():
    """
    Run the attester side of the flow.

    Waits until peers are known, waits for outstanding attestation requests,
    asks the operator for confirmation and then POSTs an 'attest' call for
    every outstanding request. Any error is printed as a traceback instead
    of being propagated to the caller.
    """
    # attester_restapi is only read here, so no `global` declaration is needed.
    try:
        value = yield wait_for_peers(attester_restapi)
        print("Known peers: %s" % (value,))
        done = False
        while not done:
            value = yield wait_for_attestation_request(attester_restapi)
            value = loads(value)
            print("Pending attestation request for attester: %s" % (value,))
            # Blocks until the operator confirms on stdin.
            raw_input('PRESS ANY KEY TO CONTINUE')
            for (identifier, attribute, metadata) in value:
                print("Request metadata: %s" % (loads(b64decode(metadata)),))
                yield make_request(attester_restapi, 'POST', {
                    'type': 'attest',
                    # '+' would be decoded as a space in a query string.
                    'mid': str(identifier).replace("+", "%2B"),
                    'attribute_name': str(attribute),
                    'attribute_value': quote(b64encode('binarydata')).replace("+", "%2B")
                })
                # NOTE(review): assumed to belong to the for-loop body, i.e.
                # polling stops once at least one request was attested - confirm.
                done = True
    except Exception:
        # Catch Exception rather than everything, so interpreter-level
        # signals such as KeyboardInterrupt are not swallowed here.
        import traceback
        traceback.print_exc()
# Queue make_attribute to be called 0.5 seconds after the reactor starts,
# then hand control to the reactor.
STARTUP_DELAY = 0.5
deferLater(reactor, STARTUP_DELAY, make_attribute)
reactor.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
from twisted.internet.defer import inlineCallbacks | |
from twisted.internet.task import deferLater | |
from twisted.internet import reactor | |
from twisted.web import server | |
from twisted.web.client import Agent, readBody | |
from twisted.web.http_headers import Headers | |
from pyipv8.ipv8.REST.rest_manager import RESTManager, RESTRequest | |
from pyipv8.ipv8.REST.root_endpoint import RootEndpoint | |
from pyipv8.ipv8_service import IPv8 | |
from pyipv8.ipv8.configuration import get_default_configuration | |
# Shared HTTP client used by make_request() for every REST call.
agent = Agent(reactor)
class TestRESTAPI(RESTManager):
    """
    REST manager that gives every started instance its own port on 127.0.0.1.
    """

    # Class-wide counter holding the last port handed out. It is incremented
    # before each bind, so the first started instance listens on 8086.
    port = 8085

    def start(self):
        """
        Serve the root endpoint on the next port number, bound to 127.0.0.1.

        After this call `self.port` holds the port this instance listens on.
        """
        self.root_endpoint = RootEndpoint(self.session)
        site = server.Site(resource=self.root_endpoint)
        site.requestFactory = RESTRequest
        TestRESTAPI.port += 1
        # Pin the port on the instance. Without this, `self.port` resolves to
        # the class counter and silently changes when another instance starts.
        self.port = TestRESTAPI.port
        self.site = reactor.listenTCP(self.port, site, interface="127.0.0.1")
@inlineCallbacks
def sleep(time):
    """
    Wait for *time* seconds without blocking the reactor.

    :param time: delay in seconds
    :return: a Deferred that fires once the delay has passed; errors raised
             while waiting are printed as a traceback, not propagated
    """
    try:
        yield deferLater(reactor, time, lambda: None)
    except Exception:
        import traceback
        # print_exc() writes the traceback itself and returns None, so its
        # result must not be printed as well.
        traceback.print_exc()
def create_working_dir(path):
    """
    Make sure *path* is a freshly created, empty directory.

    A directory that already exists at *path* is removed together with its
    contents before the new one is created.

    :param path: location of the directory to (re)create
    """
    stale = os.path.isdir(path)
    if stale:
        import shutil
        shutil.rmtree(path)
    os.mkdir(path)
def initialize_peer(path):
    """
    Create an IPv8 peer with its own working directory and REST API.

    The default configuration is reduced to the AttestationCommunity and
    IdentityCommunity overlays, each using a single RandomWalk strategy.

    :param path: working directory for the peer; it is wiped and recreated
    :return: tuple of (IPv8 instance, URL of the peer's attestation REST API)
    """
    configuration = get_default_configuration()
    configuration['logger'] = {'level': "ERROR"}
    overlays = ['AttestationCommunity', 'IdentityCommunity']
    configuration['overlays'] = [o for o in configuration['overlays'] if o['class'] in overlays]
    for o in configuration['overlays']:
        o['walkers'] = [{
            'strategy': "RandomWalk",
            'peers': 20,
            'init': {
                'timeout': 60.0
            }
        }]
    # Resolve the module directory before changing directory: a relative
    # __file__ would otherwise yield '' (which os.chdir rejects) or be
    # interpreted relative to the peer's working directory.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    create_working_dir(path)
    # IPv8 is constructed with the peer's directory as current directory.
    os.chdir(path)
    try:
        ipv8 = IPv8(configuration)
    finally:
        # Leave the working directory again even if construction fails.
        os.chdir(module_dir)
    rest_manager = TestRESTAPI(ipv8)
    rest_manager.start()
    return ipv8, "http://localhost:%d/attestation" % rest_manager.port
@inlineCallbacks
def stop_peers(*peers):
    """
    Shut down the given peers one after another.

    Each peer's endpoint is closed before the peer is stopped. The reactor
    is asked to stop together with the peer that equals the last argument.

    :param peers: the peers to stop, in order
    """
    for peer in peers:
        peer.endpoint.close()
        is_final = (peer == peers[-1])
        yield peer.stop(stop_reactor=is_final)
def make_request(url, type, arguments=None):
    """
    Send an HTTP request with the shared agent and return its body.

    The arguments are appended to *url* as a query string exactly as given;
    no escaping is applied here, so callers must pass URL-safe values.

    :param url: URL to send the request to
    :param type: HTTP method, e.g. 'GET' or 'POST'
    :param arguments: optional dict of query-string parameters
    :return: a Deferred that fires with the response body
    """
    # Use None as default to avoid sharing one mutable dict between calls.
    if arguments is None:
        arguments = {}
    query = '&'.join("%s=%s" % (k, v) for k, v in arguments.items())
    request_url = url + '?' + query
    print("\t[HTTP-%s] %s" % (type, request_url))
    d = agent.request(
        type,
        request_url,
        Headers({'User-Agent': ['Twisted Web Client Example'],
                 'Content-Type': ['text/x-greeting']}),
        None)
    return d.addCallback(readBody)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment