-
-
Save blankdots/bb004abdd6faaed2e59d5687f619eb1e to your computer and use it in GitHub Desktop.
M4 end to end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
localega: | |
token: | |
# Username to identify the user in CentralEGA | |
user: dummy | |
# User's private RSA key file, used to log in to the inbox over SSH/SFTP. | |
user_key: dummy.pk | |
# Public key file to encrypt file. | |
encrypt_key_public: key.1.pub | |
# Private key file used to decrypt the encrypted file's header (session key and IV). | |
encrypt_key_private: key.1.sec | |
# Private key password. | |
encrypt_key_pass: | |
# Inbox address, or service name | |
inbox_address: localega-inbox | |
# Inbox port | |
inbox_port: 2222 | |
# S3 address, or service name, should include port number | |
s3_address: minio:9000 | |
s3_access: | |
s3_secret: | |
s3_region: lega | |
s3_bucket: lega | |
# CentralEGA Message Broker address | |
cm_address: | |
cm_port: 5271 | |
# CentralEGA Message Broker Country specific details | |
cm_user: | |
cm_vhost: | |
cm_pass: | |
# RES address or service name with port | |
res_address: localega-res:9090 | |
# DataEdge address or service name with port | |
dataedge_address: localega-dataedge:9059 | |
# DB address, port will be assumed 5432 | |
db_address: localega-db | |
db_user: lega | |
db_name: lega | |
db_pass: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import paramiko | |
import os | |
import pika | |
import secrets | |
from hashlib import md5 | |
import json | |
import string | |
import sys | |
import logging | |
from legacryptor.crypt4gh import encrypt, Header, get_header | |
import pgpy | |
import argparse | |
from minio import Minio | |
from time import sleep | |
import requests | |
import filecmp | |
import asyncio | |
import asyncpg | |
# Layout of every log line emitted by this script.
FORMAT = ('[%(asctime)s][%(name)s][%(process)d %(processName)s]'
          '[%(levelname)-8s] (L:%(lineno)s) %(funcName)s: %(message)s')
logging.basicConfig(datefmt='%Y-%m-%d %H:%M:%S', format=FORMAT)
# Module logger; INFO so the progress of each end-to-end step is visible.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
async def get_last_id(db_user, db_name, db_pass, db_host):
    """Retrieve the id of the last inserted file in the database, indifferent of status.

    :param db_user: database user
    :param db_name: database name
    :param db_pass: database password
    :param db_host: database host (no port is passed, so asyncpg's default applies)
    :returns: the ``id`` of the most recently created row in ``local_ega.files``
    :raises RuntimeError: if ``local_ega.files`` contains no rows
    """
    conn = await asyncpg.connect(user=db_user, password=db_pass,
                                 database=db_name, host=db_host)
    try:
        row = await conn.fetchrow('''SELECT created_at, id FROM local_ega.files ORDER BY created_at DESC LIMIT 1''')
    finally:
        # Close before returning; placing close() after ``return`` means it never runs.
        await conn.close()
    if row is None:
        raise RuntimeError('No file found in local_ega.files')
    LOG.info(f"Database ID: {row['id']}")
    return row['id']
async def file2dataset_map(db_user, db_name, db_pass, db_host, file_id, dataset_id):
    """Assign a file to a dataset for dataset driven permissions.

    The row id is allocated by hand as ``max(id) + 1`` (or 1 for an empty table).

    :param db_user: database user
    :param db_name: database name
    :param db_pass: database password
    :param db_host: database host
    :param file_id: id of the file in ``local_ega.files``
    :param dataset_id: dataset identifier to map the file to
    """
    conn = await asyncpg.connect(user=db_user, password=db_pass,
                                 database=db_name, host=db_host)
    try:
        last_index = await conn.fetchrow('''select id from local_ega.file2dataset ORDER BY id DESC LIMIT 1''')
        # fetchrow returns None when the table is still empty.
        next_id = last_index['id'] + 1 if last_index is not None else 1
        await conn.execute('''
        INSERT INTO local_ega.file2dataset(id, file_id, dataset_id) VALUES($1, $2, $3)
        ''', next_id, file_id, dataset_id)
    finally:
        await conn.close()
    LOG.info(f"Mapped ID: {file_id} to Dataset: {dataset_id}")
def open_ssh_connection(hostname, user, key_path, key_pass='password', port=2222):
    """Open an SSH connection to the inbox, used to test that it is reachable.

    Unknown host keys are accepted automatically (``AutoAddPolicy``).

    :param hostname: inbox address or service name
    :param user: user name to log in as
    :param key_path: path to the user's private RSA key
    :param key_pass: password of the private RSA key
    :param port: inbox SSH port
    :returns: the connected ``paramiko.SSHClient``; the caller should close it
    :raises Exception: on a bad host key, failed authentication or any other SSH error
    """
    client = paramiko.SSHClient()

    def _fail(label, error):
        # Release the socket and log before reporting the failure.
        client.close()
        LOG.error(f'Something went wrong {error}')
        return Exception(f'{label} on {hostname}')

    try:
        k = paramiko.RSAKey.from_private_key_file(key_path, password=key_pass)
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname, allow_agent=False, look_for_keys=False, port=port, timeout=0.3, username=user, pkey=k)
        LOG.info(f'ssh connected to {hostname}:{port} with {user}')
    # Order matters: both specific exceptions are subclasses of SSHException.
    except paramiko.BadHostKeyException as e:
        raise _fail('BadHostKeyException', e) from e
    except paramiko.AuthenticationException as e:
        raise _fail('AuthenticationException', e) from e
    except paramiko.SSHException as e:
        raise _fail('SSHException', e) from e
    return client
def sftp_upload(hostname, user, file_path, key_path, key_pass='password', port=2222):
    """SFTP client file upload: put ``file_path`` on the inbox as ``<name without extension>.c4ga``.

    :param hostname: inbox address or service name
    :param user: user name to log in as
    :param file_path: local file to upload
    :param key_path: path to the user's private RSA key
    :param key_pass: password of the private RSA key
    :param port: inbox SFTP port
    :raises Exception: any error from key loading, connecting or uploading (logged first)
    """
    # Bound before ``try`` so ``finally`` does not hit an UnboundLocalError (masking
    # the real error) when key loading or Transport creation fails.
    transport = None
    try:
        k = paramiko.RSAKey.from_private_key_file(key_path, password=key_pass)
        transport = paramiko.Transport((hostname, port))
        transport.connect(username=user, pkey=k)
        LOG.info(f'sftp connected to {hostname}:{port} with {user}')
        sftp = paramiko.SFTPClient.from_transport(transport)
        filename, _ = os.path.splitext(file_path)
        sftp.put(file_path, f'{filename}.c4ga')
        LOG.info(f'file uploaded {filename}.c4ga')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise
    finally:
        LOG.debug('sftp done')
        if transport is not None:
            transport.close()
def submit_cega(address, user, vhost, message, routing_key, mq_password, correlation_id, port=5271, file_md5=None):
    """Submit a message to CentralEGA: publish it as persistent JSON on the ``localega.v1`` exchange.

    :param address: broker host or service name
    :param user: broker user
    :param vhost: broker virtual host
    :param message: JSON-serialisable message body
    :param routing_key: routing key on the ``localega.v1`` exchange
    :param mq_password: broker password
    :param correlation_id: AMQP correlation id attached to the message
    :param port: broker AMQPS port
    :param file_md5: unused; kept for backward compatibility
    :raises Exception: whatever pika raises while connecting or publishing (logged first)
    """
    from urllib.parse import quote

    # Percent-encode credentials and vhost. pika decodes them again, and raw
    # characters such as '@', ':' or '/' would otherwise corrupt the URL.
    mq_address = (f"amqps://{quote(str(user), safe='')}:{quote(str(mq_password), safe='')}"
                  f"@{address}:{port}/{quote(str(vhost), safe='')}")
    connection = None
    try:
        parameters = pika.URLParameters(mq_address)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.basic_publish(exchange='localega.v1', routing_key=routing_key,
                              body=json.dumps(message),
                              properties=pika.BasicProperties(correlation_id=correlation_id,
                                                              content_type='application/json',
                                                              delivery_mode=2))
        LOG.info(f'Message published to CentralEGA: {message}')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise
    finally:
        # Close on failure as well, not only after a successful publish.
        if connection is not None and connection.is_open:
            connection.close()
def pick_correlation_id(candidates, latest_message=True):
    """Choose one correlation id from ``(correlation_id, delivery_tag)`` pairs.

    :param candidates: pairs in the order the messages were read
    :param latest_message: pick the pair with the highest delivery tag if true, else the first pair
    :returns: the chosen correlation id, or None when ``candidates`` is empty
    """
    if not candidates:
        return None
    if latest_message:
        # Select the highest delivery tag explicitly; the previous loop never
        # updated its running maximum and always ended on the last candidate.
        return max(candidates, key=lambda pair: pair[1])[0]
    return candidates[0][0]


def get_corr(address, user, vhost, queue, filepath, mq_password, latest_message=True, port=5271):
    """Read all messages from a queue and fetch the correlation_id of the one for ``user`` and ``filepath``.

    Every fetched message is nacked afterwards (pika requeues nacked messages by
    default), so the queue content is given back to the broker.

    :param address: broker host or service name
    :param user: user name expected in the message's ``user`` field
    :param vhost: broker virtual host
    :param queue: queue to read from
    :param filepath: path expected in the message's ``filepath`` field
    :param mq_password: broker password
    :param latest_message: return the match with the highest delivery tag instead of the first one
    :param port: broker AMQPS port
    :returns: the selected correlation id
    Exits the process with status 2 when no matching message is found.
    """
    from urllib.parse import quote

    mq_address = (f"amqps://{quote(str(user), safe='')}:{quote(str(mq_password), safe='')}"
                  f"@{address}:{port}/{quote(str(vhost), safe='')}")
    parameters = pika.URLParameters(mq_address)
    connection = pika.BlockingConnection(parameters)
    candidates = []
    seen_tags = set()
    try:
        channel = connection.channel()
        while True:
            method_frame, props, body = channel.basic_get(queue=queue)
            if method_frame is None or props is None:
                LOG.debug('No message returned')
                break
            delivery_tag = method_frame.delivery_tag
            if delivery_tag in seen_tags:  # we looped
                break
            seen_tags.add(delivery_tag)
            try:
                data = json.loads(body)
                # Compare against the arguments. Previously the message fields shadowed
                # ``user``/``filepath``, so every message matched.
                # NOTE(review): a leading '/' is ignored in case the inbox reports
                # paths rooted at the user's inbox - confirm against the inbox format.
                matches = (data.get('user') == user and
                           str(data.get('filepath', '')).lstrip('/') == str(filepath).lstrip('/'))
            except (ValueError, AttributeError) as e:
                LOG.error(f'Something went wrong {e}')
                continue
            if matches:
                candidates.append((props.correlation_id, delivery_tag))
        # Second loop, nack the messages so they go back to the queue
        for delivery_tag in seen_tags:
            channel.basic_nack(delivery_tag=delivery_tag)
    finally:
        connection.close()
    if not candidates:
        LOG.error(f'No message for user {user} and file {filepath} in {queue}')
        sys.exit(2)
    correlation_id = pick_correlation_id(candidates, latest_message)
    LOG.info(f'correlation_id: {correlation_id}')
    return correlation_id
def encrypt_file(file_path, pubkey):
    """Encrypt a file with Crypt4GH and compute the md5 of the encrypted output.

    The output is written to ``<basename without extension>.c4ga`` in the current directory.

    :param file_path: file to encrypt
    :param pubkey: public key passed to ``legacryptor.crypt4gh.encrypt``
    :returns: tuple ``(output_file, c4ga_md5)``
    :raises Exception: any error from reading, encrypting or hashing (logged first)
    """
    file_size = os.path.getsize(file_path)
    filename, _ = os.path.splitext(file_path)
    output_base = os.path.basename(filename)
    output_file = os.path.expanduser(f'{output_base}.c4ga')
    try:
        # Close (and so flush) the output before hashing it. Otherwise the md5 may
        # be computed over a partially written file.
        with open(file_path, 'rb') as infile, open(output_file, 'wb') as outfile:
            encrypt(pubkey, infile, file_size, outfile)
        digest = md5()
        with open(output_file, 'rb') as read_file:
            # Hash in chunks so large files are not loaded into memory at once.
            for chunk in iter(lambda: read_file.read(65536), b''):
                digest.update(chunk)
        c4ga_md5 = digest.hexdigest()
        LOG.info(f'File {output_base}.c4ga is the encrypted file with md5: {c4ga_md5}.')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise
    return (output_file, c4ga_md5)
def list_s3_objects(minio_address, bucket_name, region_name, file_id, access, secret):
    """Check that an object named ``file_id`` exists in the S3 bucket and log every object in it.

    :param minio_address: S3 endpoint including port
    :param bucket_name: bucket to inspect
    :param region_name: S3 region
    :param file_id: expected object name (compared as ``str``)
    :param access: S3 access key
    :param secret: S3 secret key
    :raises AssertionError: if the object is not in the bucket
    """
    minioClient = Minio(minio_address, access_key=access, secret_key=secret,
                        region=region_name, secure=False)
    LOG.info(f'Connected to S3: {minio_address}.')
    # List the bucket once and reuse the result. ``list_objects`` exists in both old
    # and new minio releases, whereas ``list_objects_v2`` was removed.
    objects = list(minioClient.list_objects(bucket_name, recursive=True))
    if str(file_id) not in {obj.object_name for obj in objects}:
        # Explicit raise so the check is not stripped under ``python -O``.
        raise AssertionError("Could not find the file just uploaded!")
    LOG.info(f"Found the file uploaded to inbox as {file_id} in S3Storage.")
    LOG.info("All the files in Lega bucket: ")
    for obj in objects:
        LOG.info(f'Found ingested file: {obj.object_name} of size: {obj.size}.')
def main():
    """Do the sparkles and fireworks: run the M4 end-to-end test configured from the command line.

    Steps: check the inbox over SSH, encrypt ``input``, upload it, trigger ingestion and a
    stable id through the CentralEGA broker, check S3, then download the file from RES and
    from DataEdge and compare both with the original. A failing step raises (mostly
    ``AssertionError``) or exits the process.
    """
    parser = argparse.ArgumentParser(description="M4 end to end test with command line option.")
    parser.add_argument('input', help='Input file to be encrypted.')
    parser.add_argument('--u', help='Username to identify the user in CentralEGA.', default='dummy')
    parser.add_argument('--uk', help='User secret private RSA key.', default='auto/config/user.key')
    parser.add_argument('--pk', help='Public key file to encrypt file.', default='key.1.pub')
    parser.add_argument('--sk', help='Private key file to decrypt the file header.', default='key.1.sec')
    parser.add_argument('--sk-pass', help='Private key password.', default='')
    parser.add_argument('--inbox', help='Inbox address, or service name', default='localega-inbox')
    parser.add_argument('--inbox-port', help='Inbox port', default='2222')
    parser.add_argument('--s3', help='S3 address including port, or service name', default='minio:9000')
    parser.add_argument('--s3-access', help='S3 access', default="")
    parser.add_argument('--s3-secret', help='S3 secret', default="")
    parser.add_argument('--s3-region', help='S3 region', default="lega")
    parser.add_argument('--s3-bucket', help='S3 bucket', default="lega")
    parser.add_argument('--cm', help='CEGA MQ broker IP/name address')
    parser.add_argument('--cm-port', help='CEGA Broker port', default='5271')
    parser.add_argument('--cm-user', help='CEGA Broker user', default="")
    parser.add_argument('--cm-vhost', help='CEGA Broker vhost', default="")
    parser.add_argument('--cm-pass', help='CEGA MQ password', default="")
    parser.add_argument('--res', help='RES - reencryption service with port', default='localega-res:9090')
    parser.add_argument('--dataedge', help='DataEdge service with port', default='localega-dataedge:9059')
    parser.add_argument('--token', help='Bearer token sent to DataEdge', default='')
    parser.add_argument('--db', help='DB service', default='localega-db')
    parser.add_argument('--db-user', help='DB user', default='lega')
    parser.add_argument('--db-name', help='Database name', default='lega')
    parser.add_argument('--db-pass', help='DB password', default='')
    args = parser.parse_args()
    # Initialise what is needed
    used_file = os.path.expanduser(args.input)
    res_file = 'res.file'
    dataedge_file = 'dataedge.file'
    key_pk = os.path.expanduser(args.uk)
    inbox_port = int(args.inbox_port)
    cm_port = int(args.cm_port)
    pub_key, _ = pgpy.PGPKey.from_file(os.path.expanduser(args.pk))
    sec_key, _ = pgpy.PGPKey.from_file(os.path.expanduser(args.sk))
    # A dedicated loop; asyncio.get_event_loop() is deprecated without a running loop.
    loop = asyncio.new_event_loop()
    token = args.token  # without a valid token DataEdge grants no permissions
    test_user = args.u
    # TEST Connection before anything (on the configured port), then release it
    open_ssh_connection(args.inbox, test_user, key_pk, port=inbox_port).close()
    # Encrypt File
    test_file, c4ga_md5 = encrypt_file(used_file, pub_key)
    if not c4ga_md5:
        LOG.error(f'No md5 computed for {test_file}, aborting.')
        sys.exit(1)
    # Retrieve session_key and IV to test RES
    with sec_key.unlock(args.sk_pass) as privkey, open(test_file, 'rb') as encrypted:
        header = Header.decrypt(get_header(encrypted)[1], privkey)
        session_key = header.records[0].session_key.hex()
        iv = header.records[0].iv.hex()
    # Stable ID is mocked; it should be generated by CentralEGA
    stableID = ''.join(secrets.choice(string.digits) for _ in range(16))
    sftp_upload(args.inbox, test_user, test_file, key_pk, port=inbox_port)
    correlation_id = get_corr(args.cm, args.cm_user, args.cm_vhost, 'v1.files.inbox', test_file,
                              args.cm_pass, port=cm_port)
    # Publish the file to simulate a CentralEGA trigger
    submit_cega(args.cm, args.cm_user, args.cm_vhost,
                {'user': test_user, 'filepath': test_file}, 'files',
                args.cm_pass, correlation_id, port=cm_port)
    # wait for submission to go through - might need to be longer depending on the file
    # proper retry should be implemented
    sleep(10)
    # Once the file has been ingested it should be the last ID in the database.
    # We use this ID everywhere, including the download from DataEdge.
    # In future versions, once the DB schema is fixed, the StableID will be used for download.
    fileID = loop.run_until_complete(get_last_id(args.db_user, args.db_name, args.db_pass, args.db))
    # Stable ID should be sent by CentralEGA
    submit_cega(args.cm, args.cm_user, args.cm_vhost,
                {'file_id': fileID, 'stable_id': f'EGAF{stableID}'}, 'stableIDs',
                args.cm_pass, correlation_id, port=cm_port)
    # wait for the file in s3
    # proper retry should be implemented
    sleep(10)
    # Signature order is (address, bucket, region, ...): bucket and region were swapped before.
    list_s3_objects(args.s3, args.s3_bucket, args.s3_region, fileID, args.s3_access, args.s3_secret)
    LOG.info('Ingestion DONE')
    LOG.info('-------------------------------------')
    # Verify that the file can be downloaded from RES using the session_key and IV
    payload = {'sourceKey': session_key, 'sourceIV': iv, 'filePath': fileID}
    res_url = f'http://{args.res}/file'
    download = requests.get(res_url, params=payload)
    # We are using filecmp, thus we write the content to a file
    assert download.status_code == 200, f'We got a different status from RES {download.status_code}'
    LOG.info(f'write content to {res_file}')
    with open(res_file, 'wb') as out_file:
        out_file.write(download.content)
    LOG.info('Comparing RES downloaded file with original file ...')
    assert filecmp.cmp(res_file, used_file, shallow=False), 'files are not equal'
    LOG.info('RES Downloaded file is equal to the original file.')
    # errors='replace' so binary test files do not crash the log step
    with open(res_file, "r", errors='replace') as out_file:
        LOG.info(f'Contents: \n {out_file.read()} \n')
    LOG.info('Mapping file to dataset for retrieving file via dataedge.')
    # No component assigns permissions for files in datasets yet, thus we need this step.
    # For now the dataset ID is fixed to 'EGAD01' as that is what the TOKEN grants.
    # Will need updating once we decide on the permissions handling.
    loop.run_until_complete(file2dataset_map(args.db_user, args.db_name, args.db_pass, args.db, fileID, 'EGAD01'))
    loop.close()
    # Verify that the file can be downloaded from DataEdge
    # using a token that can be validated by DataEdge
    edge_payload = {'destinationFormat': 'plain'}
    edge_headers = {'Authorization': f'Bearer {token}'}  # No token no permissions
    dataedge_url = f'http://{args.dataedge}/files/{fileID}'
    down_dataedge = requests.get(dataedge_url, params=edge_payload, headers=edge_headers)
    LOG.info(down_dataedge.url)
    # 206 would be partial content, meaning we mishandled some bytes
    assert down_dataedge.status_code == 200, f'We got a different status from DataEdge {down_dataedge.status_code}'
    LOG.info(f'write content to {dataedge_file}')
    with open(dataedge_file, 'wb') as out_file:
        out_file.write(down_dataedge.content)
    LOG.info('Comparing Dataedge downloaded file with original file ...')
    assert filecmp.cmp(dataedge_file, used_file, shallow=False), 'files are not equal'
    LOG.info('Dataedge Downloaded file is equal to the original file.')
    with open(dataedge_file, "r", errors='replace') as out_file:
        LOG.info(f'Contents: \n {out_file.read()} \n')
    LOG.info('Outgestion DONE')
    LOG.info('-------------------------------------')
    LOG.info('Should be all!')
if __name__ == '__main__':
    # Explicit check instead of ``assert``, which is stripped under ``python -O``.
    # (The f-strings in this file already fail to parse before 3.6, so this mainly
    # documents the requirement.)
    if sys.version_info < (3, 6):
        sys.exit("M4 end to end test requires python3.6")
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import paramiko | |
import os | |
import pika | |
import secrets | |
from hashlib import md5 | |
import json | |
import string | |
import sys | |
import logging | |
from legacryptor.crypt4gh import encrypt, Header, get_header | |
import pgpy | |
import argparse | |
from minio import Minio | |
from time import sleep | |
import requests | |
import filecmp | |
import asyncio | |
import asyncpg | |
import yaml | |
# Layout of every log line emitted by this script.
FORMAT = ('[%(asctime)s][%(name)s][%(process)d %(processName)s]'
          '[%(levelname)-8s] (L:%(lineno)s) %(funcName)s: %(message)s')
logging.basicConfig(datefmt='%Y-%m-%d %H:%M:%S', format=FORMAT)
# Module logger; INFO so the progress of each end-to-end step is visible.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
async def get_last_id(db_user, db_name, db_pass, db_host):
    """Retrieve the id of the last inserted file in the database, indifferent of status.

    :param db_user: database user
    :param db_name: database name
    :param db_pass: database password
    :param db_host: database host (no port is passed, so asyncpg's default applies)
    :returns: the ``id`` of the most recently created row in ``local_ega.files``
    :raises RuntimeError: if ``local_ega.files`` contains no rows
    """
    conn = await asyncpg.connect(user=db_user, password=db_pass,
                                 database=db_name, host=db_host)
    try:
        row = await conn.fetchrow('''SELECT created_at, id FROM local_ega.files ORDER BY created_at DESC LIMIT 1''')
    finally:
        # Close before returning; placing close() after ``return`` means it never runs.
        await conn.close()
    if row is None:
        raise RuntimeError('No file found in local_ega.files')
    LOG.info(f"Database ID: {row['id']}")
    return row['id']
async def file2dataset_map(db_user, db_name, db_pass, db_host, file_id, dataset_id):
    """Assign a file to a dataset for dataset driven permissions.

    The row id is allocated by hand as ``max(id) + 1`` (or 1 for an empty table).

    :param db_user: database user
    :param db_name: database name
    :param db_pass: database password
    :param db_host: database host
    :param file_id: id of the file in ``local_ega.files``
    :param dataset_id: dataset identifier to map the file to
    """
    conn = await asyncpg.connect(user=db_user, password=db_pass,
                                 database=db_name, host=db_host)
    try:
        last_index = await conn.fetchrow('''select id from local_ega.file2dataset ORDER BY id DESC LIMIT 1''')
        # fetchrow returns None when the table is still empty.
        next_id = last_index['id'] + 1 if last_index is not None else 1
        await conn.execute('''
        INSERT INTO local_ega.file2dataset(id, file_id, dataset_id) VALUES($1, $2, $3)
        ''', next_id, file_id, dataset_id)
    finally:
        await conn.close()
    LOG.info(f"Mapped ID: {file_id} to Dataset: {dataset_id}")
def open_ssh_connection(hostname, user, key_path, key_pass='password', port=2222):
    """Open an SSH connection to the inbox, used to test that it is reachable.

    Unknown host keys are accepted automatically (``AutoAddPolicy``).

    :param hostname: inbox address or service name
    :param user: user name to log in as
    :param key_path: path to the user's private RSA key
    :param key_pass: password of the private RSA key
    :param port: inbox SSH port
    :returns: the connected ``paramiko.SSHClient``; the caller should close it
    :raises Exception: on a bad host key, failed authentication or any other SSH error
    """
    client = paramiko.SSHClient()

    def _fail(label, error):
        # Release the socket and log before reporting the failure.
        client.close()
        LOG.error(f'Something went wrong {error}')
        return Exception(f'{label} on {hostname}')

    try:
        k = paramiko.RSAKey.from_private_key_file(key_path, password=key_pass)
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname, allow_agent=False, look_for_keys=False, port=port, timeout=0.3, username=user, pkey=k)
        LOG.info(f'ssh connected to {hostname}:{port} with {user}')
    # Order matters: both specific exceptions are subclasses of SSHException.
    except paramiko.BadHostKeyException as e:
        raise _fail('BadHostKeyException', e) from e
    except paramiko.AuthenticationException as e:
        raise _fail('AuthenticationException', e) from e
    except paramiko.SSHException as e:
        raise _fail('SSHException', e) from e
    return client
def sftp_upload(hostname, user, file_path, key_path, key_pass='password', port=2222):
    """SFTP client file upload: put ``file_path`` on the inbox as ``<name without extension>.c4ga``.

    :param hostname: inbox address or service name
    :param user: user name to log in as
    :param file_path: local file to upload
    :param key_path: path to the user's private RSA key
    :param key_pass: password of the private RSA key
    :param port: inbox SFTP port
    :raises Exception: any error from key loading, connecting or uploading (logged first)
    """
    # Bound before ``try`` so ``finally`` does not hit an UnboundLocalError (masking
    # the real error) when key loading or Transport creation fails.
    transport = None
    try:
        k = paramiko.RSAKey.from_private_key_file(key_path, password=key_pass)
        transport = paramiko.Transport((hostname, port))
        transport.connect(username=user, pkey=k)
        LOG.info(f'sftp connected to {hostname}:{port} with {user}')
        sftp = paramiko.SFTPClient.from_transport(transport)
        filename, _ = os.path.splitext(file_path)
        sftp.put(file_path, f'{filename}.c4ga')
        LOG.info(f'file uploaded {filename}.c4ga')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise
    finally:
        LOG.debug('sftp done')
        if transport is not None:
            transport.close()
def submit_cega(address, user, vhost, message, routing_key, mq_password, correlation_id, port=5271, file_md5=None):
    """Submit a message to CentralEGA: publish it as persistent JSON on the ``localega.v1`` exchange.

    :param address: broker host or service name
    :param user: broker user
    :param vhost: broker virtual host
    :param message: JSON-serialisable message body
    :param routing_key: routing key on the ``localega.v1`` exchange
    :param mq_password: broker password
    :param correlation_id: AMQP correlation id attached to the message
    :param port: broker AMQPS port
    :param file_md5: unused; kept for backward compatibility
    :raises Exception: whatever pika raises while connecting or publishing (logged first)
    """
    from urllib.parse import quote

    # Percent-encode credentials and vhost. pika decodes them again, and raw
    # characters such as '@', ':' or '/' would otherwise corrupt the URL.
    mq_address = (f"amqps://{quote(str(user), safe='')}:{quote(str(mq_password), safe='')}"
                  f"@{address}:{port}/{quote(str(vhost), safe='')}")
    connection = None
    try:
        parameters = pika.URLParameters(mq_address)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.basic_publish(exchange='localega.v1', routing_key=routing_key,
                              body=json.dumps(message),
                              properties=pika.BasicProperties(correlation_id=correlation_id,
                                                              content_type='application/json',
                                                              delivery_mode=2))
        LOG.info(f'Message published to CentralEGA: {message}')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise
    finally:
        # Close on failure as well, not only after a successful publish.
        if connection is not None and connection.is_open:
            connection.close()
def pick_correlation_id(candidates, latest_message=True):
    """Choose one correlation id from ``(correlation_id, delivery_tag)`` pairs.

    :param candidates: pairs in the order the messages were read
    :param latest_message: pick the pair with the highest delivery tag if true, else the first pair
    :returns: the chosen correlation id, or None when ``candidates`` is empty
    """
    if not candidates:
        return None
    if latest_message:
        # Select the highest delivery tag explicitly; the previous loop never
        # updated its running maximum and always ended on the last candidate.
        return max(candidates, key=lambda pair: pair[1])[0]
    return candidates[0][0]


def get_corr(address, user, vhost, queue, filepath, mq_password, latest_message=True, port=5271):
    """Read all messages from a queue and fetch the correlation_id of the one for ``user`` and ``filepath``.

    Every fetched message is nacked afterwards (pika requeues nacked messages by
    default), so the queue content is given back to the broker.

    :param address: broker host or service name
    :param user: user name expected in the message's ``user`` field
    :param vhost: broker virtual host
    :param queue: queue to read from
    :param filepath: path expected in the message's ``filepath`` field
    :param mq_password: broker password
    :param latest_message: return the match with the highest delivery tag instead of the first one
    :param port: broker AMQPS port
    :returns: the selected correlation id
    Exits the process with status 2 when no matching message is found.
    """
    from urllib.parse import quote

    mq_address = (f"amqps://{quote(str(user), safe='')}:{quote(str(mq_password), safe='')}"
                  f"@{address}:{port}/{quote(str(vhost), safe='')}")
    parameters = pika.URLParameters(mq_address)
    connection = pika.BlockingConnection(parameters)
    candidates = []
    seen_tags = set()
    try:
        channel = connection.channel()
        while True:
            method_frame, props, body = channel.basic_get(queue=queue)
            if method_frame is None or props is None:
                LOG.debug('No message returned')
                break
            delivery_tag = method_frame.delivery_tag
            if delivery_tag in seen_tags:  # we looped
                break
            seen_tags.add(delivery_tag)
            try:
                data = json.loads(body)
                # Compare against the arguments. Previously the message fields shadowed
                # ``user``/``filepath``, so every message matched.
                # NOTE(review): a leading '/' is ignored in case the inbox reports
                # paths rooted at the user's inbox - confirm against the inbox format.
                matches = (data.get('user') == user and
                           str(data.get('filepath', '')).lstrip('/') == str(filepath).lstrip('/'))
            except (ValueError, AttributeError) as e:
                LOG.error(f'Something went wrong {e}')
                continue
            if matches:
                candidates.append((props.correlation_id, delivery_tag))
        # Second loop, nack the messages so they go back to the queue
        for delivery_tag in seen_tags:
            channel.basic_nack(delivery_tag=delivery_tag)
    finally:
        connection.close()
    if not candidates:
        LOG.error(f'No message for user {user} and file {filepath} in {queue}')
        sys.exit(2)
    correlation_id = pick_correlation_id(candidates, latest_message)
    LOG.info(f'correlation_id: {correlation_id}')
    return correlation_id
def encrypt_file(file_path, pubkey):
    """Encrypt a file with Crypt4GH and compute the md5 of the encrypted output.

    The output is written to ``<basename without extension>.c4ga`` in the current directory.

    :param file_path: file to encrypt
    :param pubkey: public key passed to ``legacryptor.crypt4gh.encrypt``
    :returns: tuple ``(output_file, c4ga_md5)``
    :raises Exception: any error from reading, encrypting or hashing (logged first)
    """
    file_size = os.path.getsize(file_path)
    filename, _ = os.path.splitext(file_path)
    output_base = os.path.basename(filename)
    output_file = os.path.expanduser(f'{output_base}.c4ga')
    try:
        # Close (and so flush) the output before hashing it. Otherwise the md5 may
        # be computed over a partially written file.
        with open(file_path, 'rb') as infile, open(output_file, 'wb') as outfile:
            encrypt(pubkey, infile, file_size, outfile)
        digest = md5()
        with open(output_file, 'rb') as read_file:
            # Hash in chunks so large files are not loaded into memory at once.
            for chunk in iter(lambda: read_file.read(65536), b''):
                digest.update(chunk)
        c4ga_md5 = digest.hexdigest()
        LOG.info(f'File {output_base}.c4ga is the encrypted file with md5: {c4ga_md5}.')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise
    return (output_file, c4ga_md5)
def list_s3_objects(minio_address, bucket_name, region_name, file_id, access, secret):
    """Check that an object named ``file_id`` exists in the S3 bucket and log every object in it.

    :param minio_address: S3 endpoint including port
    :param bucket_name: bucket to inspect
    :param region_name: S3 region
    :param file_id: expected object name (compared as ``str``)
    :param access: S3 access key
    :param secret: S3 secret key
    :raises AssertionError: if the object is not in the bucket
    """
    minioClient = Minio(minio_address, access_key=access, secret_key=secret,
                        region=region_name, secure=False)
    LOG.info(f'Connected to S3: {minio_address}.')
    # List the bucket once and reuse the result. ``list_objects`` exists in both old
    # and new minio releases, whereas ``list_objects_v2`` was removed.
    objects = list(minioClient.list_objects(bucket_name, recursive=True))
    if str(file_id) not in {obj.object_name for obj in objects}:
        # Explicit raise so the check is not stripped under ``python -O``.
        raise AssertionError("Could not find the file just uploaded!")
    LOG.info(f"Found the file uploaded to inbox as {file_id} in S3Storage.")
    LOG.info("All the files in Lega bucket: ")
    for obj in objects:
        LOG.info(f'Found ingested file: {obj.object_name} of size: {obj.size}.')
def main():
    """Do the sparkles and fireworks: run the M4 end-to-end test configured by a YAML file.

    The YAML file must contain a ``localega`` mapping with the keys used below. Steps:
    check the inbox over SSH, encrypt ``input``, upload it, trigger ingestion and a stable
    id through the CentralEGA broker, check S3, then download the file from RES and from
    DataEdge and compare both with the original. A failing step raises (mostly
    ``AssertionError``) or exits the process.
    """
    parser = argparse.ArgumentParser(description="M4 end to end test with YAML configuration.")
    parser.add_argument('input', help='File to be uploaded.')
    parser.add_argument('config', help='Configuration file.')
    args = parser.parse_args()
    used_file = os.path.expanduser(args.input)
    config_file = os.path.expanduser(args.config)
    with open(config_file, 'r') as stream:
        try:
            # safe_load: yaml.load without a Loader can construct arbitrary Python
            # objects and is rejected by PyYAML >= 6.
            config = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            LOG.error(exc)
            # Nothing below can run without a parsed configuration.
            sys.exit(1)
    # Empty YAML values (e.g. ``s3_access:``) load as None; use '' like the CLI variant's defaults.
    lega = {key: ('' if value is None else value) for key, value in config['localega'].items()}
    # Initialise what is needed
    res_file = 'res.file'
    dataedge_file = 'dataedge.file'
    key_pk = os.path.expanduser(lega['user_key'])
    pub_key, _ = pgpy.PGPKey.from_file(os.path.expanduser(lega['encrypt_key_public']))
    sec_key, _ = pgpy.PGPKey.from_file(os.path.expanduser(lega['encrypt_key_private']))
    inbox_port = int(lega.get('inbox_port') or 2222)
    cm_port = int(lega.get('cm_port') or 5271)
    # A dedicated loop; asyncio.get_event_loop() is deprecated without a running loop.
    loop = asyncio.new_event_loop()
    # A block scalar (``token: |``) keeps a trailing newline, which requests rejects
    # inside a header value.
    token = str(lega['token']).strip()
    test_user = lega['user']
    # TEST Connection before anything (on the configured port), then release it
    open_ssh_connection(lega['inbox_address'], test_user, key_pk, port=inbox_port).close()
    # Encrypt File
    test_file, c4ga_md5 = encrypt_file(used_file, pub_key)
    if not c4ga_md5:
        LOG.error(f'No md5 computed for {test_file}, aborting.')
        sys.exit(1)
    # Retrieve session_key and IV to test RES
    with sec_key.unlock(lega['encrypt_key_pass']) as privkey, open(test_file, 'rb') as encrypted:
        header = Header.decrypt(get_header(encrypted)[1], privkey)
        session_key = header.records[0].session_key.hex()
        iv = header.records[0].iv.hex()
    # Stable ID is mocked; it should be generated by CentralEGA
    stableID = ''.join(secrets.choice(string.digits) for _ in range(16))
    sftp_upload(lega['inbox_address'], test_user, test_file, key_pk, port=inbox_port)
    correlation_id = get_corr(lega['cm_address'], lega['cm_user'], lega['cm_vhost'],
                              'v1.files.inbox', test_file, lega['cm_pass'], port=cm_port)
    # Publish the file to simulate a CentralEGA trigger
    submit_cega(lega['cm_address'], lega['cm_user'], lega['cm_vhost'],
                {'user': test_user, 'filepath': test_file}, 'files',
                lega['cm_pass'], correlation_id, port=cm_port)
    # wait for submission to go through - might need to be longer depending on the file
    # proper retry should be implemented
    sleep(10)
    # Once the file has been ingested it should be the last ID in the database.
    # We use this ID everywhere, including the download from DataEdge.
    # In future versions, once the DB schema is fixed, the StableID will be used for download.
    fileID = loop.run_until_complete(get_last_id(lega['db_user'], lega['db_name'],
                                                 lega['db_pass'], lega['db_address']))
    # Stable ID should be sent by CentralEGA
    submit_cega(lega['cm_address'], lega['cm_user'], lega['cm_vhost'],
                {'file_id': fileID, 'stable_id': f'EGAF{stableID}'}, 'stableIDs',
                lega['cm_pass'], correlation_id, port=cm_port)
    # wait for the file in s3
    # proper retry should be implemented
    sleep(10)
    # Signature order is (address, bucket, region, ...): bucket and region were swapped before.
    list_s3_objects(lega['s3_address'], lega['s3_bucket'], lega['s3_region'], fileID,
                    lega['s3_access'], lega['s3_secret'])
    LOG.info('Ingestion DONE')
    LOG.info('-------------------------------------')
    # Verify that the file can be downloaded from RES using the session_key and IV
    payload = {'sourceKey': session_key, 'sourceIV': iv, 'filePath': fileID}
    res_url = f"http://{lega['res_address']}/file"
    download = requests.get(res_url, params=payload)
    # We are using filecmp, thus we write the content to a file
    assert download.status_code == 200, f'We got a different status from RES {download.status_code}'
    LOG.info(f'write content to {res_file}')
    with open(res_file, 'wb') as out_file:
        out_file.write(download.content)
    LOG.info('Comparing RES downloaded file with original file ...')
    assert filecmp.cmp(res_file, used_file, shallow=False), 'files are not equal'
    LOG.info('RES Downloaded file is equal to the original file.')
    # errors='replace' so binary test files do not crash the log step
    with open(res_file, "r", errors='replace') as out_file:
        LOG.info(f'Contents: \n {out_file.read()} \n')
    LOG.info('Mapping file to dataset for retrieving file via dataedge.')
    # No component assigns permissions for files in datasets yet, thus we need this step.
    # For now the dataset ID is fixed to 'EGAD01' as that is what the TOKEN grants.
    # Will need updating once we decide on the permissions handling.
    loop.run_until_complete(file2dataset_map(lega['db_user'], lega['db_name'],
                                             lega['db_pass'], lega['db_address'],
                                             fileID, 'EGAD01'))
    loop.close()
    # Verify that the file can be downloaded from DataEdge
    # using a token that can be validated by DataEdge
    edge_payload = {'destinationFormat': 'plain'}
    edge_headers = {'Authorization': f'Bearer {token}'}  # No token no permissions
    dataedge_url = f"http://{lega['dataedge_address']}/files/{fileID}"
    down_dataedge = requests.get(dataedge_url, params=edge_payload, headers=edge_headers)
    LOG.info(down_dataedge.url)
    # 206 would be partial content, meaning we mishandled some bytes
    assert down_dataedge.status_code == 200, f'We got a different status from DataEdge {down_dataedge.status_code}'
    LOG.info(f'write content to {dataedge_file}')
    with open(dataedge_file, 'wb') as out_file:
        out_file.write(down_dataedge.content)
    LOG.info('Comparing Dataedge downloaded file with original file ...')
    assert filecmp.cmp(dataedge_file, used_file, shallow=False), 'files are not equal'
    LOG.info('Dataedge Downloaded file is equal to the original file.')
    with open(dataedge_file, "r", errors='replace') as out_file:
        LOG.info(f'Contents: \n {out_file.read()} \n')
    LOG.info('Outgestion DONE')
    LOG.info('-------------------------------------')
    LOG.info('Should be all!')
if __name__ == '__main__':
    # Explicit check instead of ``assert``, which is stripped under ``python -O``.
    # (The f-strings in this file already fail to parse before 3.6, so this mainly
    # documents the requirement.)
    if sys.version_info < (3, 6):
        sys.exit("M4 end to end test requires python3.6")
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cryptography | |
PGPy==0.4.3 | |
pika | |
paramiko | |
minio | |
PyYAML | |
git+https://github.com/NBISweden/LocalEGA-cryptor.git | |
asyncpg | |
requests |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI0NTgxMSIsImF6cCI6ImJhM2UyNDA4LTllZGUtNDcwZC04MDM4LTRlNjU1ZDYwZjQ2MyIsImlzcyI6Imh0dHA6Ly9kYXRhLmVwb3V0YS5jc2MuZmkiLCJleHAiOjE2Mjc4MTEyMDAsImlhdCI6MTUxMjEzMDE2NywianRpIjoiOWM5ODM1MzAtZGQ3Ni00ZjY1LTgzODYtZTdlNWM5NmU5YTYzIiwiYXV0aG9yaXRpZXMiOlsiRUdBRDAxIl19.e_Yce8PtMD0bWeNxgAK_F4mA1xUmENlECuvOIffBXD2DNWoW03SQafGdm6ag64XosXxrb7JRxX6oOObTxVCRmRmK_63LmDCXNEm1CiYzLaZhPwpmfBKNYLkEvhUAU7OzskHJDLleqmmN5tbORlvTEOy-d35A_zVmcHfydC4h-VY |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-----BEGIN PUBLIC KEY----- | |
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDdlatRjRjogo3WojgGHFHYLugd | |
UWAY9iR3fy4arWNA1KoS8kVw33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQs | |
HUfQrSDv+MuSUMAe8jzKE4qW+jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5D | |
o2kQ+X5xK9cipRgEKwIDAQAB | |
-----END PUBLIC KEY----- |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment