Last active
March 27, 2019 17:31
-
-
Save arvindkalra/fcfed36c7328cd5b21621d45acc417cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Import Statements | |
# Core python imports | |
import datetime | |
import json | |
import os | |
import shutil | |
import base64 | |
import msgpack | |
import maya | |
from twisted.logger import globalLogPublisher | |
# Nucypher API imports | |
from nucypher.characters.lawful import Bob, Ursula, Enrico, Alice | |
from nucypher.config.characters import AliceConfiguration | |
from nucypher.crypto.kits import UmbralMessageKit | |
from nucypher.crypto.powers import DecryptingPower, SigningPower | |
from nucypher.network.middleware import RestMiddleware | |
from nucypher.utilities.logging import SimpleObserver | |
from nucypher.keystore.keypairs import DecryptingKeypair, SigningKeypair | |
from umbral.keys import UmbralPrivateKey, UmbralPublicKey | |
# Umbral imports | |
from umbral.keys import UmbralPrivateKey, UmbralPublicKey | |
# Flask imports | |
from flask import Flask,jsonify,request, Response | |
from flask_cors import CORS | |
# Address of the seed node. It is fixed here; nothing is read from the
# command line.
SEEDNODE_URL = '127.0.0.1:10151'

# Flask application with CORS enabled on it.
app = Flask(__name__)
CORS(app)

# Register a SimpleObserver on twisted's global log publisher.
globalLogPublisher.addObserver(SimpleObserver())

# Ursula handle built once from the seed node; the route handlers below pass
# it to the characters they create as their only known node.
ursula = Ursula.from_seed_and_stake_info(
    seed_uri=SEEDNODE_URL,
    federated_only=True,
    minimum_stake=0,
)
@app.route("/generateKeys", methods=['POST'])
def generateKeys():
    """Create key material for a new user and return it as JSON.

    Expects a JSON request body with ``username`` and ``password``. An Alice
    configuration rooted at ``<cwd>/alice/<username>`` is initialised and its
    keyring unlocked with the password; Bob keys are produced by
    ``generate_doctor_keys``.

    Returns:
        A JSON object with ``alice`` (the value returned by
        ``to_configuration_file()``) and ``bob`` (the JSON string returned by
        ``generate_doctor_keys``). When ``username`` is empty or is not a
        plain file name, a 400 response with an ``error`` field is returned
        instead.
    """
    # The request body carries the password and the response carries private
    # keys, so neither is echoed to stdout.
    json_data = json.loads(request.data.decode('utf-8'))
    username = json_data['username']
    password = json_data['password']

    # The username becomes part of file-system paths (here and in
    # generate_doctor_keys); refuse anything that could leave the intended
    # directories.
    if (not username or username in ('.', '..')
            or os.path.basename(username) != username):
        return jsonify({'error': 'invalid username'}), 400

    # Directory that stores this user's Alice keys.
    alice_dir = os.path.join(os.getcwd(), 'alice', username)

    alice_config = AliceConfiguration(
        config_root=alice_dir,
        is_me=True,
        known_nodes={ursula},
        start_learning_now=False,
        federated_only=True,
        learn_on_same_thread=True,
    )
    alice_config.initialize(password=password)
    alice_config.keyring.unlock(password=password)

    data = {
        'alice': alice_config.to_configuration_file(),
        'bob': generate_doctor_keys(username=username),
    }
    return jsonify(data)
@app.route('/encryptData', methods=['POST'])
def encryptData():
    """Encrypt a value under the policy key that Alice derives from a label.

    Expects a JSON request body with:
        ``hash``: the value to encrypt (serialised with msgpack first).
        ``alice``: path of the Alice configuration file to load.
        ``password``: password used to unlock Alice's keyring.
        ``label``: policy label (text).

    Returns:
        A JSON string (built with ``json.dumps``) containing ``message`` (hex
        of the message kit bytes), ``label``, ``policy_public_key`` (hex) and
        ``alice_sig_pubkey`` (hex of ``bytes(alicia.stamp)``).
    """
    # The request body contains the password, so it is not printed.
    json_data = json.loads(request.data.decode('utf-8'))
    payload = json_data['hash']
    alice_file = json_data['alice']
    password = json_data['password']
    label = json_data['label'].encode()

    # Rebuild Alice from her stored configuration and unlock her keys.
    alice_config = AliceConfiguration.from_configuration_file(
        filepath=alice_file,
        known_nodes={ursula},
        start_learning_now=False,
    )
    alice_config.keyring.unlock(password=password)
    alicia = alice_config()
    alicia.start_learning_loop(now=True)

    policy_pubkey = alicia.get_policy_pubkey_from_label(label)

    # Enrico encrypts for the policy public key.
    enrico = Enrico(policy_encrypting_key=policy_pubkey)
    packed = msgpack.dumps(payload, use_bin_type=True)
    message_kit, _signature = enrico.encrypt_message(packed)

    data = {
        'message': message_kit.to_bytes().hex(),
        'label': label.decode('utf-8'),
        'policy_public_key': policy_pubkey.to_bytes().hex(),
        # Must be spelled exactly as /decryptDelegated reads it.
        'alice_sig_pubkey': bytes(alicia.stamp).hex(),
    }
    return json.dumps(data)
@app.route('/createPolicy', methods=['POST'])
def createPolicy():
    """Have Alice grant Bob access to a label.

    Expects a JSON request body with:
        ``bob``: name of the Bob whose public keys are stored in
            ``<cwd>/bob/<bob>.json``.
        ``label``: policy label (text).
        ``alice``: path of the Alice configuration file to load.
        ``password``: password used to unlock Alice's keyring.

    The policy is granted with ``m=1``, ``n=1`` and expires ten days after
    the call.

    Returns:
        The JSON object ``{"done": true}``. When ``bob`` is empty or is not a
        plain file name, a 400 response with an ``error`` field is returned
        instead.
    """
    # The request body contains the password, so it is not printed.
    json_data = json.loads(request.data.decode('utf-8'))
    bob_name = json_data['bob']
    label = json_data['label'].encode()
    alice_file = json_data['alice']
    password = json_data['password']

    # The Bob name becomes part of a file-system path; refuse anything that
    # could leave the ``bob`` directory.
    if (not bob_name or bob_name in ('.', '..')
            or os.path.basename(bob_name) != bob_name):
        return jsonify({'error': 'invalid bob name'}), 400

    # Rebuild Alice from her stored configuration and unlock her keys.
    alice_config = AliceConfiguration.from_configuration_file(
        filepath=alice_file,
        known_nodes={ursula},
        start_learning_now=False,
    )
    alice_config.keyring.unlock(password=password)
    alicia = alice_config()
    alicia.start_learning_loop(now=True)

    # Build Bob from his stored public keys only.
    bob_file = os.path.join(os.getcwd(), 'bob', bob_name + '.json')
    doctor_pubkeys = _get_keys(bob_file, UmbralPublicKey)
    powers_and_material = {
        DecryptingPower: doctor_pubkeys['enc'],
        SigningPower: doctor_pubkeys['sig'],
    }
    doctor_strange = Bob.from_public_keys(
        powers_and_material=powers_and_material,
        federated_only=True,
    )

    policy_end_datetime = maya.now() + datetime.timedelta(days=10)

    # The returned policy object is not needed afterwards.
    alicia.grant(
        bob=doctor_strange,
        label=label,
        m=1,
        n=1,
        expiration=policy_end_datetime,
    )

    return jsonify({'done': True})
@app.route('/decryptDelegated', methods=['POST'])
def decryptDelegated():
    """Decrypt a message kit as Bob using a policy granted by Alice.

    Expects a JSON request body with:
        ``bobKeys``: JSON string holding Bob's hex-encoded private keys under
            ``enc`` and ``sig``.
        ``policy_public_key``: hex-encoded policy public key.
        ``alice_sig_pubkey``: hex-encoded public signing key of Alice.
        ``label``: policy label (text).
        ``message``: hex-encoded message kit.

    Returns:
        A JSON object with ``plainText``: the first retrieved plaintext,
        decoded with msgpack.
    """
    json_data = json.loads(request.data.decode('utf-8'))
    bob_private_keys = json.loads(json_data['bobKeys'])
    policy_public_key = json_data['policy_public_key']
    alice_signing_key = json_data['alice_sig_pubkey']
    label = json_data['label']
    message = json_data['message']

    # Private key material is deliberately never written to stdout.
    enc = UmbralPrivateKey.from_bytes(bytes.fromhex(bob_private_keys["enc"]))
    sig = UmbralPrivateKey.from_bytes(bytes.fromhex(bob_private_keys["sig"]))

    power_ups = [
        DecryptingPower(keypair=DecryptingKeypair(private_key=enc)),
        SigningPower(keypair=SigningKeypair(private_key=sig)),
    ]

    doctor = Bob(
        is_me=True,
        federated_only=True,
        crypto_power_ups=power_ups,
        start_learning_now=True,
        abort_on_learning_error=True,
        known_nodes={ursula},
        save_metadata=False,
        network_middleware=RestMiddleware(),
    )

    policy_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(policy_public_key))
    alices_sig_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(alice_signing_key))
    label = label.encode()

    doctor.join_policy(label, alices_sig_pubkey)

    message_kit = UmbralMessageKit.from_bytes(bytes.fromhex(message))
    # An Enrico built from the policy public key is passed as the data source.
    enrico = Enrico(policy_encrypting_key=policy_pubkey)

    retrieved_plaintexts = doctor.retrieve(
        label=label,
        message_kit=message_kit,
        data_source=enrico,
        alice_verifying_key=alices_sig_pubkey,
    )

    plaintext = msgpack.loads(retrieved_plaintexts[0], raw=False)
    return jsonify({"plainText": plaintext})
def _get_keys(file, key_class):
    """Load hex-encoded keys from a JSON file and decode them.

    Args:
        file: Path of a JSON file mapping a key type to a hex string.
        key_class: Class exposing ``from_bytes(bytes)``; it is applied to
            each decoded value.

    Returns:
        A dict mapping every key type found in the file to
        ``key_class.from_bytes(...)`` of its decoded bytes.
    """
    if not os.path.isfile(file):
        # generate_doctor_keys() needs the user name and writes
        # ``<cwd>/bob/<username>.json``; the name is recovered from the
        # requested file name.
        # NOTE(review): the private keys returned by generate_doctor_keys()
        # are dropped here, so nobody holds them for a Bob created this way.
        # Confirm that auto-generation is really wanted.
        username = os.path.splitext(os.path.basename(file))[0]
        generate_doctor_keys(username)

    with open(file) as f:
        stored_keys = json.load(f)

    return {
        key_type: key_class.from_bytes(bytes.fromhex(key_str))
        for key_type, key_str in stored_keys.items()
    }
def generate_doctor_keys(username):
    """Generate Bob's encryption and signing key pairs.

    The hex-encoded public keys are written to ``<cwd>/bob/<username>.json``
    under ``enc`` and ``sig``; the ``bob`` directory is created when missing.

    Args:
        username: Name used for the public-key file.

    Returns:
        A JSON string with the hex-encoded private keys under ``enc`` and
        ``sig``.
    """
    enc_privkey = UmbralPrivateKey.gen_key()
    sig_privkey = UmbralPrivateKey.gen_key()

    # Each entry must come from its own key pair so that the private keys
    # returned here match the public keys written to disk.
    private_keys = {
        'enc': enc_privkey.to_bytes().hex(),
        'sig': sig_privkey.to_bytes().hex(),
    }
    doctor_pubkeys = {
        'enc': enc_privkey.get_pubkey().to_bytes().hex(),
        'sig': sig_privkey.get_pubkey().to_bytes().hex(),
    }

    bob_dir = os.path.join(os.getcwd(), 'bob')
    os.makedirs(bob_dir, exist_ok=True)
    with open(os.path.join(bob_dir, username + '.json'), 'w') as f:
        json.dump(doctor_pubkeys, f)

    return json.dumps(private_keys)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment