Skip to content

Instantly share code, notes, and snippets.

@blankdots
Last active February 21, 2019 14:19
Show Gist options
  • Save blankdots/bb004abdd6faaed2e59d5687f619eb1e to your computer and use it in GitHub Desktop.
M4 end to end
localega:
token:
# Username to identify the user in CentralEGA
user: dummy
# User secret private RSA key.
user_key: dummy.pk
# Public key file to encrypt file.
encrypt_key_public: key.1.pub
# Private key file to encrypt file.
encrypt_key_private: key.1.sec
# Private key password.
encrypt_key_pass:
# Inbox address, or service name
inbox_address: localega-inbox
# Inbox port
inbox_port: 2222
# S3 address, or service name, should include port number
s3_address: minio:9000
s3_access:
s3_secret:
s3_region: lega
s3_bucket: lega
# CentralEGA Message Broker address
cm_address:
cm_port: 5271
# CentralEGA Message Broker Country specific details
cm_user:
cm_vhost:
cm_pass:
# RES address or service name with port
res_address: localega-res:9090
# DataEdge address or service name with port
dataedge_address: localega-dataedge:9059
# DB address, port will be assumed 5432
db_address: localega-db
db_user: lega
db_name: lega
db_pass:
import paramiko
import os
import pika
import secrets
from hashlib import md5
import json
import string
import sys
import logging
from legacryptor.crypt4gh import encrypt, Header, get_header
import pgpy
import argparse
from minio import Minio
from time import sleep
import requests
import filecmp
import asyncio
import asyncpg
# Log format: timestamp, logger name, process info, level, then source
# location (line/function) and the message itself.
FORMAT = '[%(asctime)s][%(name)s][%(process)d %(processName)s][%(levelname)-8s] (L:%(lineno)s) %(funcName)s: %(message)s'
logging.basicConfig(format=FORMAT, datefmt='%Y-%m-%d %H:%M:%S')
# Module-level logger used by every function in this script.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
async def get_last_id(db_user, db_name, db_pass, db_host):
    """Retrieve the last inserted file in the database, indifferent of status.

    Connects to the LocalEGA database and returns the ``id`` of the most
    recently created row in ``local_ega.files``.
    """
    conn = await asyncpg.connect(user=db_user, password=db_pass,
                                 database=db_name, host=db_host)
    try:
        values = await conn.fetchrow('''SELECT created_at, id FROM local_ega.files ORDER BY created_at DESC LIMIT 1''')
        LOG.info(f"Database ID: {values['id']}")
        return values['id']
    finally:
        # The original `await conn.close()` sat after the `return` and never
        # ran, leaking the connection; close on every exit path instead.
        await conn.close()
async def file2dataset_map(db_user, db_name, db_pass, db_host, file_id, dataset_id):
    """Assign file to dataset for dataset driven permissions."""
    conn = await asyncpg.connect(user=db_user, password=db_pass,
                                 database=db_name, host=db_host)
    try:
        last_index = await conn.fetchrow('''select id from local_ega.file2dataset ORDER BY id DESC LIMIT 1''')
        # The original did `last_index['id'] + 1` unconditionally and crashed
        # with a TypeError on an empty table; start numbering at 1 then.
        next_id = (last_index['id'] + 1) if last_index is not None else 1
        await conn.execute('''
                           INSERT INTO local_ega.file2dataset(id, file_id, dataset_id) VALUES($1, $2, $3)
                           ''', next_id, file_id, dataset_id)
        LOG.info(f"Mapped ID: {file_id} to Dataset: {dataset_id}")
    finally:
        # Always release the connection, even when the INSERT fails.
        await conn.close()
def open_ssh_connection(hostname, user, key_path, key_pass='password', port=2222):
    """Open an ssh connection, test function.

    Returns the connected paramiko ``SSHClient``. Raises ``Exception`` naming
    the failure category when the connection cannot be established.
    """
    try:
        client = paramiko.SSHClient()
        k = paramiko.RSAKey.from_private_key_file(key_path, password=key_pass)
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Very short timeout: this is only a reachability/credentials probe.
        client.connect(hostname, allow_agent=False, look_for_keys=False, port=port, timeout=0.3, username=user, pkey=k)
        LOG.info(f'ssh connected to {hostname}:{port} with {user}')
    # Chain with `from e` so the original paramiko error is not lost.
    except paramiko.BadHostKeyException as e:
        LOG.error(f'Something went wrong {e}')
        raise Exception('BadHostKeyException on ' + hostname) from e
    except paramiko.AuthenticationException as e:
        LOG.error(f'Something went wrong {e}')
        raise Exception('AuthenticationException on ' + hostname) from e
    except paramiko.SSHException as e:
        LOG.error(f'Something went wrong {e}')
        raise Exception('SSHException on ' + hostname) from e
    return client
def sftp_upload(hostname, user, file_path, key_path, key_pass='password', port=2222):
    """SFTP Client file upload.

    Uploads ``file_path`` into the inbox as ``<basename>.c4ga``.
    """
    transport = None  # so the finally block is safe if Transport() itself raises
    try:
        k = paramiko.RSAKey.from_private_key_file(key_path, password=key_pass)
        transport = paramiko.Transport((hostname, port))
        transport.connect(username=user, pkey=k)
        LOG.info(f'sftp connected to {hostname}:{port} with {user}')
        sftp = paramiko.SFTPClient.from_transport(transport)
        filename, _ = os.path.splitext(file_path)
        # The original put() used a literal '(unknown).c4ga' placeholder and
        # never used `filename`; upload under the actual base name instead.
        remote_name = f'{os.path.basename(filename)}.c4ga'
        sftp.put(file_path, remote_name)
        LOG.info(f'file uploaded {remote_name}')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise e
    finally:
        LOG.debug('sftp done')
        if transport is not None:
            transport.close()
def submit_cega(address, user, vhost, message, routing_key, mq_password, correlation_id, port=5271, file_md5=None):
    """Submit message to CEGA broker.

    Publishes ``message`` as persistent JSON on the ``localega.v1`` exchange
    with the given routing key and correlation id. ``file_md5`` is accepted
    for backward compatibility but currently unused.
    """
    mq_address = f'amqps://{user}:{mq_password}@{address}:{port}/{vhost}'
    connection = None  # so the finally block is safe if connecting fails
    try:
        parameters = pika.URLParameters(mq_address)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.basic_publish(exchange='localega.v1', routing_key=routing_key,
                              body=json.dumps(message),
                              properties=pika.BasicProperties(correlation_id=correlation_id,
                                                              content_type='application/json',
                                                              delivery_mode=2))
        LOG.info(f'Message published to CentralEGA: {message}')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise e
    finally:
        # The original only closed on success, leaking the connection on error.
        if connection is not None and connection.is_open:
            connection.close()
def get_corr(address, user, vhost, queue, filepath, mq_password, latest_message=True, port=5271):
    """Read all messages from a queue and fetch the correlation_id for the one with given path, if found.

    Drains the queue once (nacking everything back afterwards) and returns the
    correlation id of the message whose ``filepath`` matches; exits with
    status 2 when no match is found.
    """
    mq_address = f'amqps://{user}:{mq_password}@{address}:{port}/{vhost}'
    parameters = pika.URLParameters(mq_address)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    correlation_ids = []
    messages = set()
    # Match on the base name so an inbox path prefix in the message still hits.
    target = os.path.basename(filepath)
    while True:
        method_frame, props, body = channel.basic_get(queue=queue)
        if method_frame is None or props is None:
            LOG.debug('No message returned')  # was unreachable after break in the original
            break
        message_id = method_frame.delivery_tag
        if message_id in messages:  # we looped
            break
        messages.add(message_id)
        try:
            data = json.loads(body)
            # Distinct names: the original rebound `user`/`filepath` and then
            # compared them to themselves, matching every message.
            msg_user = data.get('user')
            msg_filepath = data.get('filepath')
            assert(msg_user and msg_filepath)
            if os.path.basename(msg_filepath) == target:
                correlation_ids.append((props.correlation_id, message_id))
        except Exception as e:
            LOG.error(f'Something went wrong {e}')
    # Second loop, nack the messages so they are requeued for the real consumer
    for message_id in messages:
        channel.basic_nack(delivery_tag=message_id)
    connection.close()
    if not correlation_ids:
        sys.exit(2)
    correlation_id = correlation_ids[0][0]
    if latest_message:
        message_id = -1  # message ids are positive
        for cid, mid in correlation_ids:
            if mid > message_id:
                correlation_id = cid
                # The original never updated message_id here, so the *last*
                # pair always won instead of the highest delivery tag.
                message_id = mid
    LOG.info(f'correlation_id: {correlation_id}')
    return correlation_id
def encrypt_file(file_path, pubkey):
    """Encrypt file and extract its md5.

    Returns ``(output_path, md5_hexdigest)`` where the output is the
    crypt4gh-encrypted ``<basename>.c4ga`` file.
    """
    file_size = os.path.getsize(file_path)
    filename, _ = os.path.splitext(file_path)
    output_base = os.path.basename(filename)
    c4ga_md5 = None
    output_file = os.path.expanduser(f'{output_base}.c4ga')
    try:
        # Context managers: the original leaked both file handles.
        with open(file_path, 'rb') as infile, open(output_file, 'wb') as outfile:
            encrypt(pubkey, infile, file_size, outfile)
        with open(output_file, 'rb') as read_file:
            c4ga_md5 = md5(read_file.read()).hexdigest()
        LOG.info(f'File {output_base}.c4ga is the encrypted file with md5: {c4ga_md5}.')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise e
    return (output_file, c4ga_md5)
def list_s3_objects(minio_address, bucket_name, region_name, file_id, access, secret):
    """Verify that the ingested file exists in the S3 backend and list the bucket.

    Asserts that an object named ``file_id`` is present in ``bucket_name``,
    then logs every object in the bucket.
    """
    client = Minio(minio_address, access_key=access, secret_key=secret,
                   region=region_name, secure=False)
    LOG.info(f'Connected to S3: {minio_address}.')
    # Gather every object name and check that ours made it into the archive.
    names = [entry.object_name for entry in client.list_objects_v2(bucket_name, recursive=True)]
    assert str(file_id) in names, f"Could not find the file just uploaded!"
    LOG.info(f"Found the file uploaded to inbox as {file_id} in S3Storage.")
    LOG.info("All the files in Lega bucket: ")
    for entry in client.list_objects(bucket_name, recursive=True):
        LOG.info(f'Found ingested file: {entry.object_name} of size: {entry.size}.')
def main():
    """Run the M4 end-to-end test driven by command-line options.

    Flow: encrypt the input, SFTP it to the inbox, drive the CentralEGA
    message exchange, verify the archive (S3), then download the file back
    via RES and DataEdge and compare each copy with the original.
    """
    parser = argparse.ArgumentParser(description="M4 end to end test with command line option.")
    # Should we do this in a configuration file ?
    parser.add_argument('input', help='Input file to be encrypted.')
    parser.add_argument('--u', help='Username to identify the user in CentralEGA.', default='dummy')
    parser.add_argument('--uk', help='User secret private RSA key.', default='auto/config/user.key')
    parser.add_argument('--pk', help='Public key file to encrypt file.', default='key.1.pub')
    parser.add_argument('--sk', help='Private key file to encrypt file.', default='key.1.sec')
    parser.add_argument('--sk-pass', help='Private key password.', default='')
    parser.add_argument('--inbox', help='Inbox address, or service name', default='localega-inbox')
    # help text fixed: it was a copy-paste of the --inbox help
    parser.add_argument('--inbox-port', help='Inbox port', default='2222')
    # help text fixed: it described the inbox, not S3
    parser.add_argument('--s3', help='S3 address including port, or service name', default='minio:9000')
    parser.add_argument('--s3-access', help='S3 access', default="")
    parser.add_argument('--s3-secret', help='S3 secret', default="")
    parser.add_argument('--s3-region', help='S3 region', default="lega")
    parser.add_argument('--s3-bucket', help='S3 bucket', default="lega")
    parser.add_argument('--cm', help='CEGA MQ broker IP/name address')
    parser.add_argument('--cm-port', help='CEGA Broker port')
    parser.add_argument('--cm-user', help='CEGA Broker user', default="")
    parser.add_argument('--cm-vhost', help='CEGA Broker vhost', default="")
    parser.add_argument('--cm-pass', help='CEGA MQ password', default="")
    parser.add_argument('--res', help='RES - reencryption service with port', default='localega-res:9090')
    parser.add_argument('--dataedge', help='DataEdge service with port', default='localega-dataedge:9059')
    parser.add_argument('--db', help='DB service', default='localega-db')
    parser.add_argument('--db-user', help='DB user', default='lega')
    parser.add_argument('--db-name', help='Database name', default='lega')
    parser.add_argument('--db-pass', help='DB password', default='')
    args = parser.parse_args()
    # Initialise what is needed
    used_file = os.path.expanduser(args.input)
    res_file = 'res.file'
    dataedge_file = 'dataedge.file'
    key_pk = os.path.expanduser(args.uk)
    pub_key, _ = pgpy.PGPKey.from_file(os.path.expanduser(args.pk))
    sec_key, _ = pgpy.PGPKey.from_file(args.sk)
    loop = asyncio.get_event_loop()
    session_key = ''
    iv = ''
    fileID = ''
    token = ''  # no token in the CLI variant; DataEdge rejects requests without one
    test_user = args.u
    # TEST Connection before anything
    open_ssh_connection(args.inbox, test_user, key_pk)
    # Encrypt File
    test_file, c4ga_md5 = encrypt_file(used_file, pub_key)
    # Retrieve session_key and IV to test RES
    with sec_key.unlock(args.sk_pass) as privkey:
        header = Header.decrypt(get_header(open(test_file, 'rb'))[1], privkey)
        session_key = header.records[0].session_key.hex()
        iv = header.records[0].iv.hex()
    # Stable ID is mocked; this should be generated by CentralEGA
    stableID = ''.join(secrets.choice(string.digits) for i in range(16))
    if c4ga_md5:
        sftp_upload(args.inbox, test_user, test_file, key_pk, port=int(args.inbox_port))
        correlation_id = get_corr(args.cm, args.cm_user, args.cm_vhost, 'v1.files.inbox', test_file, args.cm_pass)
        # Publish the file to simulate a CentralEGA trigger
        submit_cega(args.cm, args.cm_user, args.cm_vhost,
                    {'user': test_user, 'filepath': test_file}, 'files',
                    args.cm_pass, correlation_id)
        # wait for submission to go through - might need to be longer depending on the file
        # proper retry should be implemented
        sleep(10)
        # Once the file has been ingested it should be the last ID in the database.
        # We use this ID everywhere including download from DataEdge.
        # In future versions once we fix DB schema we will use StableID for download.
        fileID = loop.run_until_complete(get_last_id(args.db_user, args.db_name, args.db_pass, args.db))
        # Stable ID should be sent by CentralEGA
        submit_cega(args.cm, args.cm_user, args.cm_vhost,
                    {'file_id': fileID, 'stable_id': f'EGAF{stableID}'}, 'stableIDs',
                    args.cm_pass, correlation_id)
        # wait for the file in s3; proper retry should be implemented
        sleep(10)
        # Bucket before region, matching list_s3_objects' signature — the
        # original swapped them and only worked because both default to 'lega'.
        list_s3_objects(args.s3, args.s3_bucket, args.s3_region, fileID, args.s3_access, args.s3_secret)
        LOG.info('Ingestion DONE')
        LOG.info('-------------------------------------')
        # Verify that the file can be downloaded from RES using the session_key and IV
        payload = {'sourceKey': session_key, 'sourceIV': iv, 'filePath': fileID}
        res_url = f'http://{args.res}/file'
        download = requests.get(res_url, params=payload)
        assert download.status_code == 200, f'We got a different status from RES {download.status_code}'
        # We are using filecmp, thus we write the content to a file first
        LOG.info(f'write content to {res_file}')
        with open(res_file, 'wb') as res_out:
            res_out.write(download.content)
        LOG.info('Comparing RES downloaded file with original file ...')
        # comparing content of the files
        assert filecmp.cmp(res_file, used_file, shallow=False), 'files are not equal'
        LOG.info('RES Downloaded file is equal to the original file.')
        with open(res_file, "r") as out_file:
            LOG.info(f'Contents: \n {out_file.read()} \n')
        LOG.info('Mapping file to dataset for retrieving file via dataedge.')
        # No component assigns dataset permissions for files yet, hence this
        # manual step; the dataset ID is fixed to 'EGAD01' because that is
        # what the token authorises. Will need updating once permissions
        # handling is decided.
        loop.run_until_complete(file2dataset_map(args.db_user, args.db_name, args.db_pass, args.db, fileID, 'EGAD01'))
        # Verify that the file can be downloaded from DataEdge
        # We are using a token that can be validated by DataEdge
        edge_payload = {'destinationFormat': 'plain'}
        edge_headers = {'Authorization': f'Bearer {token}'}  # No token no permissions
        dataedge_url = f'http://{args.dataedge}/files/{fileID}'
        down_dataedge = requests.get(dataedge_url, params=edge_payload, headers=edge_headers)
        LOG.info(down_dataedge.url)
        # 206 would be partial content, meaning we mishandled some bytes
        assert down_dataedge.status_code == 200, f'We got a different status from DataEdge {down_dataedge.status_code}'
        LOG.info(f'write content to {dataedge_file}')
        with open(dataedge_file, 'wb') as edge_out:
            edge_out.write(down_dataedge.content)
        LOG.info('Comparing Dataedge downloaded file with original file ...')
        # comparing content of the files
        assert filecmp.cmp(dataedge_file, used_file, shallow=False), 'files are not equal'
        LOG.info('Dataedge Downloaded file is equal to the original file.')
        with open(dataedge_file, "r") as out_file:
            LOG.info(f'Contents: \n {out_file.read()} \n')
        LOG.info('Outgestion DONE')
        LOG.info('-------------------------------------')
    LOG.info('Should be all!')
if __name__ == '__main__':
    # `assert` is stripped under `python -O`; check the version explicitly.
    if sys.version_info < (3, 6):
        sys.exit("M4 end to end test requires python3.6")
    main()
import paramiko
import os
import pika
import secrets
from hashlib import md5
import json
import string
import sys
import logging
from legacryptor.crypt4gh import encrypt, Header, get_header
import pgpy
import argparse
from minio import Minio
from time import sleep
import requests
import filecmp
import asyncio
import asyncpg
import yaml
# Log format: timestamp, logger name, process info, level, then source
# location (line/function) and the message itself.
FORMAT = '[%(asctime)s][%(name)s][%(process)d %(processName)s][%(levelname)-8s] (L:%(lineno)s) %(funcName)s: %(message)s'
logging.basicConfig(format=FORMAT, datefmt='%Y-%m-%d %H:%M:%S')
# Module-level logger used by every function in this script.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
async def get_last_id(db_user, db_name, db_pass, db_host):
    """Retrieve the last inserted file in the database, indifferent of status.

    Connects to the LocalEGA database and returns the ``id`` of the most
    recently created row in ``local_ega.files``.
    """
    conn = await asyncpg.connect(user=db_user, password=db_pass,
                                 database=db_name, host=db_host)
    try:
        values = await conn.fetchrow('''SELECT created_at, id FROM local_ega.files ORDER BY created_at DESC LIMIT 1''')
        LOG.info(f"Database ID: {values['id']}")
        return values['id']
    finally:
        # The original `await conn.close()` sat after the `return` and never
        # ran, leaking the connection; close on every exit path instead.
        await conn.close()
async def file2dataset_map(db_user, db_name, db_pass, db_host, file_id, dataset_id):
    """Assign file to dataset for dataset driven permissions."""
    conn = await asyncpg.connect(user=db_user, password=db_pass,
                                 database=db_name, host=db_host)
    try:
        last_index = await conn.fetchrow('''select id from local_ega.file2dataset ORDER BY id DESC LIMIT 1''')
        # The original did `last_index['id'] + 1` unconditionally and crashed
        # with a TypeError on an empty table; start numbering at 1 then.
        next_id = (last_index['id'] + 1) if last_index is not None else 1
        await conn.execute('''
                           INSERT INTO local_ega.file2dataset(id, file_id, dataset_id) VALUES($1, $2, $3)
                           ''', next_id, file_id, dataset_id)
        LOG.info(f"Mapped ID: {file_id} to Dataset: {dataset_id}")
    finally:
        # Always release the connection, even when the INSERT fails.
        await conn.close()
def open_ssh_connection(hostname, user, key_path, key_pass='password', port=2222):
    """Open an ssh connection, test function.

    Returns the connected paramiko ``SSHClient``. Raises ``Exception`` naming
    the failure category when the connection cannot be established.
    """
    try:
        client = paramiko.SSHClient()
        k = paramiko.RSAKey.from_private_key_file(key_path, password=key_pass)
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Very short timeout: this is only a reachability/credentials probe.
        client.connect(hostname, allow_agent=False, look_for_keys=False, port=port, timeout=0.3, username=user, pkey=k)
        LOG.info(f'ssh connected to {hostname}:{port} with {user}')
    # Chain with `from e` so the original paramiko error is not lost.
    except paramiko.BadHostKeyException as e:
        LOG.error(f'Something went wrong {e}')
        raise Exception('BadHostKeyException on ' + hostname) from e
    except paramiko.AuthenticationException as e:
        LOG.error(f'Something went wrong {e}')
        raise Exception('AuthenticationException on ' + hostname) from e
    except paramiko.SSHException as e:
        LOG.error(f'Something went wrong {e}')
        raise Exception('SSHException on ' + hostname) from e
    return client
def sftp_upload(hostname, user, file_path, key_path, key_pass='password', port=2222):
    """SFTP Client file upload.

    Uploads ``file_path`` into the inbox as ``<basename>.c4ga``.
    """
    transport = None  # so the finally block is safe if Transport() itself raises
    try:
        k = paramiko.RSAKey.from_private_key_file(key_path, password=key_pass)
        transport = paramiko.Transport((hostname, port))
        transport.connect(username=user, pkey=k)
        LOG.info(f'sftp connected to {hostname}:{port} with {user}')
        sftp = paramiko.SFTPClient.from_transport(transport)
        filename, _ = os.path.splitext(file_path)
        # The original put() used a literal '(unknown).c4ga' placeholder and
        # never used `filename`; upload under the actual base name instead.
        remote_name = f'{os.path.basename(filename)}.c4ga'
        sftp.put(file_path, remote_name)
        LOG.info(f'file uploaded {remote_name}')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise e
    finally:
        LOG.debug('sftp done')
        if transport is not None:
            transport.close()
def submit_cega(address, user, vhost, message, routing_key, mq_password, correlation_id, port=5271, file_md5=None):
    """Submit message to CEGA broker.

    Publishes ``message`` as persistent JSON on the ``localega.v1`` exchange
    with the given routing key and correlation id. ``file_md5`` is accepted
    for backward compatibility but currently unused.
    """
    mq_address = f'amqps://{user}:{mq_password}@{address}:{port}/{vhost}'
    connection = None  # so the finally block is safe if connecting fails
    try:
        parameters = pika.URLParameters(mq_address)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.basic_publish(exchange='localega.v1', routing_key=routing_key,
                              body=json.dumps(message),
                              properties=pika.BasicProperties(correlation_id=correlation_id,
                                                              content_type='application/json',
                                                              delivery_mode=2))
        LOG.info(f'Message published to CentralEGA: {message}')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise e
    finally:
        # The original only closed on success, leaking the connection on error.
        if connection is not None and connection.is_open:
            connection.close()
def get_corr(address, user, vhost, queue, filepath, mq_password, latest_message=True, port=5271):
    """Read all messages from a queue and fetch the correlation_id for the one with given path, if found.

    Drains the queue once (nacking everything back afterwards) and returns the
    correlation id of the message whose ``filepath`` matches; exits with
    status 2 when no match is found.
    """
    mq_address = f'amqps://{user}:{mq_password}@{address}:{port}/{vhost}'
    parameters = pika.URLParameters(mq_address)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    correlation_ids = []
    messages = set()
    # Match on the base name so an inbox path prefix in the message still hits.
    target = os.path.basename(filepath)
    while True:
        method_frame, props, body = channel.basic_get(queue=queue)
        if method_frame is None or props is None:
            LOG.debug('No message returned')  # was unreachable after break in the original
            break
        message_id = method_frame.delivery_tag
        if message_id in messages:  # we looped
            break
        messages.add(message_id)
        try:
            data = json.loads(body)
            # Distinct names: the original rebound `user`/`filepath` and then
            # compared them to themselves, matching every message.
            msg_user = data.get('user')
            msg_filepath = data.get('filepath')
            assert(msg_user and msg_filepath)
            if os.path.basename(msg_filepath) == target:
                correlation_ids.append((props.correlation_id, message_id))
        except Exception as e:
            LOG.error(f'Something went wrong {e}')
    # Second loop, nack the messages so they are requeued for the real consumer
    for message_id in messages:
        channel.basic_nack(delivery_tag=message_id)
    connection.close()
    if not correlation_ids:
        sys.exit(2)
    correlation_id = correlation_ids[0][0]
    if latest_message:
        message_id = -1  # message ids are positive
        for cid, mid in correlation_ids:
            if mid > message_id:
                correlation_id = cid
                # The original never updated message_id here, so the *last*
                # pair always won instead of the highest delivery tag.
                message_id = mid
    LOG.info(f'correlation_id: {correlation_id}')
    return correlation_id
def encrypt_file(file_path, pubkey):
    """Encrypt file and extract its md5.

    Returns ``(output_path, md5_hexdigest)`` where the output is the
    crypt4gh-encrypted ``<basename>.c4ga`` file.
    """
    file_size = os.path.getsize(file_path)
    filename, _ = os.path.splitext(file_path)
    output_base = os.path.basename(filename)
    c4ga_md5 = None
    output_file = os.path.expanduser(f'{output_base}.c4ga')
    try:
        # Context managers: the original leaked both file handles.
        with open(file_path, 'rb') as infile, open(output_file, 'wb') as outfile:
            encrypt(pubkey, infile, file_size, outfile)
        with open(output_file, 'rb') as read_file:
            c4ga_md5 = md5(read_file.read()).hexdigest()
        LOG.info(f'File {output_base}.c4ga is the encrypted file with md5: {c4ga_md5}.')
    except Exception as e:
        LOG.error(f'Something went wrong {e}')
        raise e
    return (output_file, c4ga_md5)
def list_s3_objects(minio_address, bucket_name, region_name, file_id, access, secret):
    """Verify that the ingested file exists in the S3 backend and list the bucket.

    Asserts that an object named ``file_id`` is present in ``bucket_name``,
    then logs every object in the bucket.
    """
    client = Minio(minio_address, access_key=access, secret_key=secret,
                   region=region_name, secure=False)
    LOG.info(f'Connected to S3: {minio_address}.')
    # Gather every object name and check that ours made it into the archive.
    names = [entry.object_name for entry in client.list_objects_v2(bucket_name, recursive=True)]
    assert str(file_id) in names, f"Could not find the file just uploaded!"
    LOG.info(f"Found the file uploaded to inbox as {file_id} in S3Storage.")
    LOG.info("All the files in Lega bucket: ")
    for entry in client.list_objects(bucket_name, recursive=True):
        LOG.info(f'Found ingested file: {entry.object_name} of size: {entry.size}.')
def main():
    """Run the M4 end-to-end test driven by a YAML configuration file.

    Flow: encrypt the input, SFTP it to the inbox, drive the CentralEGA
    message exchange, verify the archive (S3), then download the file back
    via RES and DataEdge and compare each copy with the original.
    """
    parser = argparse.ArgumentParser(description="M4 end to end test with YAML configuration.")
    parser.add_argument('input', help='File to be uploaded.')
    parser.add_argument('config', help='Configuration file.')
    args = parser.parse_args()
    used_file = os.path.expanduser(args.input)
    config_file = os.path.expanduser(args.config)
    with open(config_file, 'r') as stream:
        try:
            # safe_load: never execute arbitrary YAML tags from the config file.
            config = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # The original only logged and fell through to a NameError on
            # `config`; abort explicitly instead.
            LOG.error(exc)
            sys.exit(1)
    lega = config['localega']
    # Initialise what is needed
    res_file = 'res.file'
    dataedge_file = 'dataedge.file'
    key_pk = os.path.expanduser(lega['user_key'])
    pub_key, _ = pgpy.PGPKey.from_file(os.path.expanduser(lega['encrypt_key_public']))
    sec_key, _ = pgpy.PGPKey.from_file(lega['encrypt_key_private'])
    loop = asyncio.get_event_loop()
    session_key = ''
    iv = ''
    fileID = ''
    token = lega['token']
    test_user = lega['user']
    # TEST Connection before anything
    open_ssh_connection(lega['inbox_address'], test_user, key_pk)
    # Encrypt File
    test_file, c4ga_md5 = encrypt_file(used_file, pub_key)
    # Retrieve session_key and IV to test RES
    with sec_key.unlock(lega['encrypt_key_pass']) as privkey:
        header = Header.decrypt(get_header(open(test_file, 'rb'))[1], privkey)
        session_key = header.records[0].session_key.hex()
        iv = header.records[0].iv.hex()
    # Stable ID is mocked; this should be generated by CentralEGA
    stableID = ''.join(secrets.choice(string.digits) for i in range(16))
    if c4ga_md5:
        sftp_upload(lega['inbox_address'], test_user, test_file, key_pk, port=int(lega['inbox_port']))
        correlation_id = get_corr(lega['cm_address'], lega['cm_user'],
                                  lega['cm_vhost'], 'v1.files.inbox', test_file, lega['cm_pass'])
        # Publish the file to simulate a CentralEGA trigger
        submit_cega(lega['cm_address'], lega['cm_user'], lega['cm_vhost'],
                    {'user': test_user, 'filepath': test_file}, 'files',
                    lega['cm_pass'], correlation_id)
        # wait for submission to go through - might need to be longer depending on the file
        # proper retry should be implemented
        sleep(10)
        # Once the file has been ingested it should be the last ID in the database.
        # We use this ID everywhere including download from DataEdge.
        # In future versions once we fix DB schema we will use StableID for download.
        fileID = loop.run_until_complete(get_last_id(lega['db_user'], lega['db_name'],
                                                     lega['db_pass'], lega['db_address']))
        # Stable ID should be sent by CentralEGA
        submit_cega(lega['cm_address'], lega['cm_user'], lega['cm_vhost'],
                    {'file_id': fileID, 'stable_id': f'EGAF{stableID}'}, 'stableIDs',
                    lega['cm_pass'], correlation_id)
        # wait for the file in s3; proper retry should be implemented
        sleep(10)
        # Bucket before region, matching list_s3_objects' signature — the
        # original swapped them and only worked because both were 'lega'.
        list_s3_objects(lega['s3_address'], lega['s3_bucket'], lega['s3_region'],
                        fileID, lega['s3_access'], lega['s3_secret'])
        LOG.info('Ingestion DONE')
        LOG.info('-------------------------------------')
        # Verify that the file can be downloaded from RES using the session_key and IV
        payload = {'sourceKey': session_key, 'sourceIV': iv, 'filePath': fileID}
        res_url = f"http://{lega['res_address']}/file"
        download = requests.get(res_url, params=payload)
        assert download.status_code == 200, f'We got a different status from RES {download.status_code}'
        # We are using filecmp, thus we write the content to a file first
        LOG.info(f'write content to {res_file}')
        with open(res_file, 'wb') as res_out:
            res_out.write(download.content)
        LOG.info('Comparing RES downloaded file with original file ...')
        # comparing content of the files
        assert filecmp.cmp(res_file, used_file, shallow=False), 'files are not equal'
        LOG.info('RES Downloaded file is equal to the original file.')
        with open(res_file, "r") as out_file:
            LOG.info(f'Contents: \n {out_file.read()} \n')
        LOG.info('Mapping file to dataset for retrieving file via dataedge.')
        # No component assigns dataset permissions for files yet, hence this
        # manual step; the dataset ID is fixed to 'EGAD01' because that is
        # what the token authorises. Will need updating once permissions
        # handling is decided.
        loop.run_until_complete(file2dataset_map(lega['db_user'], lega['db_name'],
                                                 lega['db_pass'], lega['db_address'],
                                                 fileID, 'EGAD01'))
        # Verify that the file can be downloaded from DataEdge
        # We are using a token that can be validated by DataEdge
        edge_payload = {'destinationFormat': 'plain'}
        edge_headers = {'Authorization': f'Bearer {token}'}  # No token no permissions
        dataedge_url = f"http://{lega['dataedge_address']}/files/{fileID}"
        down_dataedge = requests.get(dataedge_url, params=edge_payload, headers=edge_headers)
        LOG.info(down_dataedge.url)
        # 206 would be partial content, meaning we mishandled some bytes
        assert down_dataedge.status_code == 200, f'We got a different status from DataEdge {down_dataedge.status_code}'
        LOG.info(f'write content to {dataedge_file}')
        with open(dataedge_file, 'wb') as edge_out:
            edge_out.write(down_dataedge.content)
        LOG.info('Comparing Dataedge downloaded file with original file ...')
        # comparing content of the files
        assert filecmp.cmp(dataedge_file, used_file, shallow=False), 'files are not equal'
        LOG.info('Dataedge Downloaded file is equal to the original file.')
        with open(dataedge_file, "r") as out_file:
            LOG.info(f'Contents: \n {out_file.read()} \n')
        LOG.info('Outgestion DONE')
        LOG.info('-------------------------------------')
    LOG.info('Should be all!')
if __name__ == '__main__':
    # `assert` is stripped under `python -O`; check the version explicitly.
    if sys.version_info < (3, 6):
        sys.exit("M4 end to end test requires python3.6")
    main()
cryptography
PGPy==0.4.3
pika
paramiko
minio
PyYAML
git+https://github.com/NBISweden/LocalEGA-cryptor.git
asyncpg
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI0NTgxMSIsImF6cCI6ImJhM2UyNDA4LTllZGUtNDcwZC04MDM4LTRlNjU1ZDYwZjQ2MyIsImlzcyI6Imh0dHA6Ly9kYXRhLmVwb3V0YS5jc2MuZmkiLCJleHAiOjE2Mjc4MTEyMDAsImlhdCI6MTUxMjEzMDE2NywianRpIjoiOWM5ODM1MzAtZGQ3Ni00ZjY1LTgzODYtZTdlNWM5NmU5YTYzIiwiYXV0aG9yaXRpZXMiOlsiRUdBRDAxIl19.e_Yce8PtMD0bWeNxgAK_F4mA1xUmENlECuvOIffBXD2DNWoW03SQafGdm6ag64XosXxrb7JRxX6oOObTxVCRmRmK_63LmDCXNEm1CiYzLaZhPwpmfBKNYLkEvhUAU7OzskHJDLleqmmN5tbORlvTEOy-d35A_zVmcHfydC4h-VY
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDdlatRjRjogo3WojgGHFHYLugd
UWAY9iR3fy4arWNA1KoS8kVw33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQs
HUfQrSDv+MuSUMAe8jzKE4qW+jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5D
o2kQ+X5xK9cipRgEKwIDAQAB
-----END PUBLIC KEY-----
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment