Skip to content

Instantly share code, notes, and snippets.

@qstokkink
Created October 3, 2020 07:31
Show Gist options
  • Save qstokkink/6f5bec769dae981a5eef67e57e6741a3 to your computer and use it in GitHub Desktop.
Save qstokkink/6f5bec769dae981a5eef67e57e6741a3 to your computer and use it in GitHub Desktop.
Attestation test scripts
import os
import shutil
from attestation_tutorial_common import finish, http_get, http_post, start, urlstr, wait_for_list
# Remove the output of previous experiments.
for state_directory in ('./state_1', './state_2'):
    if os.path.exists(state_directory):
        shutil.rmtree(state_directory)

start()
try:
    print("Enrollment/Attestation flow")

    print("0. SANITY CHECK")
    # Fail fast if either REST API is unreachable before we start polling.
    http_get("http://localhost:14411/identity/pseudonym1/peers")
    http_get("http://localhost:14412/identity/pseudonym2/peers")
    peer1_neighborhood = wait_for_list("http://localhost:14411/identity/pseudonym1/peers", "peers")
    peer2_neighborhood = wait_for_list("http://localhost:14412/identity/pseudonym2/peers", "peers")

    # Each peer learns its own identifier from what the *other* peer sees as its neighbor.
    peer1_id = urlstr(peer2_neighborhood[0])
    peer2_id = urlstr(peer1_neighborhood[0])

    print("Peer 1:", peer1_id)
    print("Peer 2:", peer2_id)

    print("Peer 1 attributes:", http_get("http://localhost:14411/identity/pseudonym1/credentials"))
    print("Peer 2 attributes:", http_get("http://localhost:14412/identity/pseudonym2/credentials"))

    print("1. ATTESTATION REQUEST")
    print("Request attestation from peer 2:",
          http_post(f"http://localhost:14411/identity/pseudonym1/request/{peer2_id}",
                    {"Content-Type": "application/json"},
                    b'{"name":"my_attribute","schema":"id_metadata","metadata":{}}'))

    print("2. ATTESTATION")
    peer2_outstanding_requests = wait_for_list("http://localhost:14412/identity/pseudonym2/outstanding/attestations",
                                               "requests")
    print("Peer 2 outstanding requests:", peer2_outstanding_requests)

    print("Peer 2 attesting to outstanding request:",
          http_post(f"http://localhost:14412/identity/pseudonym2/attest/{peer1_id}",
                    {"Content-Type": "application/json"},
                    b'{"name":"my_attribute","value":"dmFsdWU="}'))

    print("3. CHECK")
    print("Peer 1 attributes:", http_get("http://localhost:14411/identity/pseudonym1/credentials"))
    print("Peer 2 attributes:", http_get("http://localhost:14412/identity/pseudonym2/credentials"))

    print("X. DONE!")
finally:
    # Always stop the background process, even when a request above fails. Otherwise it keeps
    # running and holds on to ports 14411/14412, which breaks the next run of this script.
    finish()
import json
import os
import signal
import subprocess
import time
import urllib.parse
import urllib.request
PROCESS = None
def http_get(url):
    """
    Perform an HTTP GET request to the given URL.

    :param url: the URL to request.
    :returns: the response body, decoded and parsed as JSON.
    """
    request = urllib.request.Request(url)
    # Use the response as a context manager so the connection is closed deterministically,
    # instead of whenever the garbage collector gets around to it.
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode())
def http_post(url, headers=None, data=None):
    """
    Perform an HTTP PUT request to the given URL.

    Despite the function name, the request method is PUT. The name is kept so existing callers
    keep working. NOTE(review): presumably the REST endpoints expect PUT - confirm before
    changing the method.

    :param url: the URL to request.
    :param headers: optional dictionary of HTTP headers to send.
    :param data: optional request body, as bytes.
    :returns: the response body, decoded and parsed as JSON.
    """
    # urllib's Request iterates over headers.items(), so the None default must become a dict.
    request = urllib.request.Request(url, method="PUT", headers=headers or {}, data=data)
    # Use the response as a context manager so the connection is always closed.
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode())
def urlstr(s):
    """
    Percent-encode the given string so it can be embedded in a URL.

    No character is treated as safe, so even "/" is escaped.
    """
    no_safe_characters = ''
    return urllib.parse.quote(s, safe=no_safe_characters)
def wait_for_list(url, element=None, timeout=None, interval=0.5):
    """
    Poll an endpoint until output (a list) is available.

    :param url: the URL to poll using GET requests.
    :param element: optional key to pick from the decoded JSON response.
    :param timeout: maximum number of seconds to keep polling, or None to poll indefinitely.
    :param interval: number of seconds to sleep between two polls.
    :returns: the first non-empty output that was retrieved.
    :raises TimeoutError: if no output became available within ``timeout`` seconds.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        out = http_get(url)
        if element:
            out = out[element]
        if out:
            # Return right away: there is no reason to sleep once the output is there.
            return out
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"No output available from {url} within {timeout} seconds")
        time.sleep(interval)
def start():
    """
    Run the main.py script and wait for it to finish initializing.

    Initialization is considered complete once the script has printed "Starting peer" twice
    on its standard output.

    :raises RuntimeError: if main.py closes its output before both peers have started.
    """
    global PROCESS
    # start_new_session=True runs setsid() in the child, giving it its own process group.
    # finish() relies on this to signal the whole group through os.killpg().
    PROCESS = subprocess.Popen(['python3', 'main.py'], stdout=subprocess.PIPE, start_new_session=True)
    started_peers = 0
    while started_peers < 2:
        # readline() blocks until a full line or EOF arrives. Unlike peek(), it keeps reading
        # new data, and it returns b"" at EOF so a crashed child cannot make us wait forever.
        line = PROCESS.stdout.readline()
        if not line:
            raise RuntimeError(f"main.py stopped after starting {started_peers} of 2 peers "
                               f"(exit code: {PROCESS.poll()})")
        started_peers += line.decode(errors="replace").count("Starting peer")
def finish():
    """
    Kill our two IPv8 instances (running in the same process).

    Sends SIGTERM to the process group of the process stored in PROCESS and waits for it to
    exit. Does nothing if no process is registered, so calling it more than once is safe.
    """
    global PROCESS
    if PROCESS is None:
        return
    try:
        os.killpg(os.getpgid(PROCESS.pid), signal.SIGTERM)
    except ProcessLookupError:
        # The process is already gone: there is nothing left to signal.
        pass
    # Drain the output pipe and wait for the child, so it is reaped and the pipe is closed.
    PROCESS.communicate()
    PROCESS = None
import time
from attestation_tutorial_common import finish, http_get, http_post, start, urlstr, wait_for_list
start()
try:
    print("Attribute verification flow")

    print("0. SANITY CHECK")
    # Fail fast if either REST API is unreachable before we start polling.
    http_get("http://localhost:14411/identity/pseudonym1/peers")
    http_get("http://localhost:14412/identity/pseudonym2/peers")
    peer1_neighborhood = wait_for_list("http://localhost:14411/identity/pseudonym1/peers", "peers")
    peer2_neighborhood = wait_for_list("http://localhost:14412/identity/pseudonym2/peers", "peers")

    # Each peer learns its own identifier from what the *other* peer sees as its neighbor.
    peer1_id = urlstr(peer2_neighborhood[0])
    peer2_id = urlstr(peer1_neighborhood[0])

    print("Peer 1:", peer1_id)
    print("Peer 2:", peer2_id)

    peer1_attributes = http_get("http://localhost:14411/identity/pseudonym1/credentials")['names']
    peer2_attributes = http_get("http://localhost:14412/identity/pseudonym2/credentials")['names']

    print("Peer 1 attributes:", peer1_attributes)
    print("Peer 2 attributes:", peer2_attributes)

    # Verify the last attribute in peer 1's credential list; the hash is spliced into the
    # raw JSON request body below, hence the conversion to bytes.
    attribute_hash = peer1_attributes[-1]["hash"].encode()

    print("1. VERIFICATION REQUEST")
    print("Request verification from peer 1:",
          http_post(f"http://localhost:14412/identity/pseudonym2/verify/{peer1_id}",
                    {"Content-Type": "application/json"},
                    b'{"hash":"' + attribute_hash + b'","value":"dmFsdWU=","schema":"id_metadata"}'))

    print("2. VERIFICATION ")
    peer1_outstanding_requests = wait_for_list("http://localhost:14411/identity/pseudonym1/outstanding/verifications",
                                               "requests")
    print("Peer 1 outstanding verification requests:", peer1_outstanding_requests)

    print("Peer 1 allow verification of outstanding request:",
          http_post(f"http://localhost:14411/identity/pseudonym1/allow/{peer2_id}",
                    {"Content-Type": "application/json"},
                    b'{"name":"my_attribute"}'))

    print("3. CHECK")
    print("Peer 2 verification output:", wait_for_list("http://localhost:14412/identity/pseudonym2/verifications",
                                                       'outputs'))

    print("X. DONE!")
finally:
    # Always stop the background process, even when a step above fails (for example when
    # peer 1 has no attributes yet). Otherwise it keeps holding ports 14411/14412.
    finish()
from base64 import b64encode
from asyncio import ensure_future, get_event_loop
from pyipv8.ipv8.configuration import get_default_configuration
from pyipv8.ipv8.REST.rest_manager import RESTManager
from pyipv8.ipv8_service import IPv8
async def start_community():
    """
    Start two IPv8 instances, each with its own REST API, in this process.

    Peer N uses the key file "keyfile_N.pem", the working directory "state_N" and serves its
    REST API on port 14410 + N. No overlays are configured.
    """
    for peer_id in [1, 2]:
        configuration = get_default_configuration()
        configuration['logger']['level'] = "ERROR"
        configuration['keys'] = [{'alias': "anonymous id",
                                  'generation': "curve25519",
                                  'file': f"keyfile_{peer_id}.pem"}]
        configuration['working_directory'] = f"state_{peer_id}"
        configuration['overlays'] = []

        # Start the IPv8 service
        ipv8 = IPv8(configuration)
        await ipv8.start()
        rest_manager = RESTManager(ipv8)
        await rest_manager.start(14410 + peer_id)

        # Print the peer for reference.
        # Flush explicitly: stdout is block-buffered when it is a pipe, and a parent process
        # that waits for this line would otherwise never see it while the loop keeps running.
        print("Starting peer", b64encode(ipv8.keys["anonymous id"].mid), flush=True)
# Create and register the event loop explicitly. Relying on get_event_loop() to create one
# implicitly is deprecated and raises a RuntimeError on recent Python versions.
from asyncio import new_event_loop, set_event_loop

event_loop = new_event_loop()
set_event_loop(event_loop)
# Schedule the peers to start as soon as the loop runs, then keep serving until killed.
ensure_future(start_community(), loop=event_loop)
event_loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment