Last active
January 18, 2023 18:11
-
-
Save nathanmargaglio/f40071455b6f40248d1ebab514681529 to your computer and use it in GitHub Desktop.
LogQS Directory Upload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Upload Directory | |
This script relies on the LogQS Client library: | |
pip install --upgrade lqs-client | |
The lqs.utils.upload method uses threading to speed up the upload. This means | |
that we need to consider both the number of threads (max_workers) and the part_size | |
when initiating an upload, since each of the parts are loaded into memory as they | |
are uploaded, which can cause a memory error if we load too much. Currently, it's set | |
to use 8 max_workers and a part_size of 1 GB (using 2 GB resulted in a memory error). | |
BUT keep in mind that S3 only allows up to 1,000 parts for a multipart upload, | |
so this may need to be adjusted if the file size is greater than 1 TB. | |
""" | |
import argparse | |
import os | |
import time | |
import base64 | |
import shutil | |
from pprint import pprint | |
from concurrent.futures import ThreadPoolExecutor | |
from lqs_client import LogQS | |
# Note: creds are loaded from .env | |
def process_log(lqs, log_directory, log_path, group_id, log_format='ros'):
    """Ensure a log and an ingestion exist for one file, upload it, and queue the ingestion.

    Parameters
    ----------
    lqs : LogQS
        Initialized LogQS client (credentials are loaded from .env).
    log_directory : str
        Root directory containing the log files.
    log_path : str
        File name relative to ``log_directory``; also used as the log and
        ingestion name ("key") on the server.
    group_id : str
        ID of the group to create the log under when it does not exist yet.
    log_format : str, optional
        Ingestion format reported to the server (default ``'ros'``).

    Returns early (without uploading) when the ingestion is already
    completed, processing, queued, or errored.
    """
    key = log_path

    # --- Find or create the log record for this file name ---------------
    print(f"Listing logs with name like '{key}'...")
    logs_response = lqs.list.log(name_like=key)
    logs_count = logs_response["count"]
    print(f"Found {logs_count} logs with name like '{key}'.")
    if logs_count == 0:
        print("Creating log...")
        log = lqs.create.log(
            group_id=group_id,
            name=key,
            note=None
        )["data"]
        print("Log:")
        pprint(log)
    else:
        print(logs_response)
        log = logs_response['data'][0]
    log_id = log['id']
    print(log_id)
    log_path = os.path.join(log_directory, log_path)

    # --- Find or create the ingestion for this log -----------------------
    # NOTE: this is an exact-name lookup, unlike the name_like log lookup above.
    print(f"Listing ingestions with name '{key}'...")
    ingestions_response = lqs.list.ingestion(name=key)
    ingestions_count = ingestions_response["count"]
    if ingestions_count == 0:
        print("Creating ingestion...")
        # queued=False so the server does not start processing before the
        # file has actually been uploaded.
        ingestion = lqs.create.ingestion(
            name=key,
            log_id=log_id,
            s3_bucket=None,
            s3_key=None,
            format=log_format,
            queued=False,
            note='Upload with malvarado processing script.'
        )["data"]
        print("Ingestion:")
        pprint(ingestion)
    else:
        print(ingestions_response)
        ingestion = ingestions_response['data'][0]
    ingestion_id = ingestion['id']
    print(ingestion_id)

    # Skip anything the server is already handling (or has failed on).
    if ingestion["completed"]:
        print("Ingestion already completed; skipping...")
        return
    if ingestion["processing"]:
        print("Ingestion already processing; skipping...")
        return
    if ingestion["queued"]:
        print("Ingestion already queued; skipping...")
        return
    if ingestion["errored"]:
        print("Ingestion errored; skipping...")
        return

    # --- Upload the file --------------------------------------------------
    # part_size/max_workers trade memory for speed: each in-flight part is
    # held in memory, and S3 caps a multipart upload at 1,000 parts.
    print("Uploading...")
    lqs.utils.upload(
        resource="ingestion",
        resource_id=ingestion_id,
        file_path=log_path,
        key=key,
        part_size=1024 * 1024 * 1024,  # 1 GB
        max_workers=8
    )

    # --- Locate the uploaded object and queue the ingestion ---------------
    print("Listing objects...")
    r_headers, r_params, r_body = lqs.s3.list_objects_v2(
        resource="ingestion",
        resource_id=ingestion_id
    )
    s3_bucket = r_body["ListBucketResult"]["Name"]
    contents = r_body["ListBucketResult"]["Contents"]
    # The XML response repeats <Contents> per object: the parser yields a
    # dict for a single object but a list for several. Normalize to the
    # first entry so indexing with "Key" is always valid.
    if isinstance(contents, list):
        contents = contents[0]
    s3_key = contents["Key"]
    print(f"Found s3://{s3_bucket}/{s3_key}")
    print("Queueing ingestion...")
    lqs.update.ingestion(
        ingestion_id=ingestion_id,
        data=dict(
            s3_bucket=s3_bucket,
            s3_key=s3_key,
            queued=True
        )
    )
    print("Ingestion queued!")
def main(log_directory, group_name):
    """Upload every ``.bag``/``.log`` file in *log_directory* to LogQS under *group_name*.

    Failures on individual files are appended to ``results.txt`` and do not
    stop the remaining uploads.

    Raises
    ------
    Exception
        If the environment (.env) was not loaded.
    ValueError
        If the API URL is malformed or the group cannot be found.
    """
    lqs = LogQS(verbose=True)
    # Sanity check that credentials/config were loaded from .env.
    if lqs._api_url is None:
        raise Exception("Environment variables not loaded! Check that a .env file has been added to the runtime.")
    # Explicit raises instead of assert so the checks survive `python -O`.
    if not lqs._api_url.endswith('/api'):
        raise ValueError("LQS_API_URL should end with '/api'")
    if not lqs._api_url.startswith('http'):
        raise ValueError("LQS_API_URL should start with 'http'")
    print(f"Using URL {lqs._api_url} with API Key ID {lqs._api_key_id}.")
    groups = lqs.list.group(name=group_name)["data"]
    if not groups:
        raise ValueError(f"No group found with name '{group_name}'")
    group_id = groups[0]["id"]
    for log_path in os.listdir(log_directory):
        # Only .bag and .log files are uploadable; skip everything else.
        if not log_path.endswith(('.bag', '.log')):
            continue
        try:
            process_log(lqs, log_directory, log_path, group_id)
        except Exception as e:
            # Best-effort: record the failure and keep going with the rest.
            with open("results.txt", "a+") as f:
                f.write(f"{time.time()}: {log_directory}/{log_path} - {e}\n")
if __name__ == "__main__":
    # CLI entry point: upload all logs found in a directory under one group.
    arg_parser = argparse.ArgumentParser("upload a directory of logs")
    arg_parser.add_argument("-d", "--log-directory", help="The root directory of logs")
    arg_parser.add_argument("-g", "--group", help="The group name to upload logs under")
    cli_args = arg_parser.parse_args()
    main(log_directory=cli_args.log_directory, group_name=cli_args.group)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment