
@nathanmargaglio
Last active January 18, 2023 18:11
LogQS Directory Upload
#!/usr/bin/env python3
"""Upload Directory
This script relies on the LogQS Client library:
pip install --upgrade lqs-client
The lqs.utils.upload method uses threading to speed up the upload. This means
that we need to consider both the number of threads (max_workers) and the part_size
when initiating an upload, since each of the parts are loaded into memory as they
are uploaded, which can cause a memory error if we load too much. Currently, it's set
to use 8 max_workers and a part_size of 1 GB (using 2 GB resulted in a memory error).
BUT keep in mind that S3 only allows up to 1,000 parts for a multipart upload,
so this may need to be adjusted if the file size is greater than 1 TB.
"""
import argparse
import os
import time
from pprint import pprint

from lqs_client import LogQS

# Note: creds are loaded from .env
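# A minimal sketch of the expected .env file. LQS_API_URL and LQS_API_KEY_ID
# match the attributes checked in main() below; the secret variable name is an
# assumption and may differ in your lqs-client version:
#
#   LQS_API_URL=https://example.logqs.com/api
#   LQS_API_KEY_ID=<your API key ID>
#   LQS_API_KEY_SECRET=<your API key secret>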


def process_log(lqs, log_directory, log_path, group_id):
    key = log_path
    print(f"Listing logs with name like '{key}'...")
    logs_response = lqs.list.log(name_like=key)
    logs_count = logs_response["count"]
    print(f"Found {logs_count} logs with name like '{key}'.")
    if logs_count == 0:
        print("Creating log...")
        log = lqs.create.log(
            group_id=group_id,
            name=key,
            note=None
        )["data"]
        print("Log:")
        pprint(log)
        log_id = log["id"]
    else:
        print(logs_response)
        log = logs_response["data"][0]
        log_id = log["id"]
    print(log_id)
    log_path = os.path.join(log_directory, log_path)
    log_format = "ros"

    print(f"Listing ingestions with name '{key}'...")
    ingestions_response = lqs.list.ingestion(name=key)
    ingestions_count = ingestions_response["count"]
    if ingestions_count == 0:
        print("Creating ingestion...")
        ingestion = lqs.create.ingestion(
            name=key,
            log_id=log["id"],
            s3_bucket=None,
            s3_key=None,
            format=log_format,
            queued=False,
            note="Upload with malvarado processing script."
        )["data"]
        print("Ingestion:")
        pprint(ingestion)
        ingestion_id = ingestion["id"]
    else:
        print(ingestions_response)
        ingestion = ingestions_response["data"][0]
        ingestion_id = ingestion["id"]
    print(ingestion_id)

    # Skip ingestions that are already in flight or finished
    if ingestion["completed"]:
        print("Ingestion already completed; skipping...")
        return
    if ingestion["processing"]:
        print("Ingestion already processing; skipping...")
        return
    if ingestion["queued"]:
        print("Ingestion already queued; skipping...")
        return
    if ingestion["errored"]:
        print("Ingestion errored; skipping...")
        return

    print("Uploading...")
    lqs.utils.upload(
        resource="ingestion",
        resource_id=ingestion_id,
        file_path=log_path,
        key=key,
        part_size=1024 * 1024 * 1024,  # 1 GiB
        max_workers=8
    )
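    # Note: with these settings, up to max_workers * part_size (~8 GiB) of part
    # data may be buffered in memory at once. For files approaching the S3 part
    # limit, the part size could be scaled first, e.g. (a hypothetical sketch,
    # not part of lqs-client):
    #   part_size = max(1024**3, -(-os.path.getsize(log_path) // 10_000))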
print("Listing objects...")
r_headers, r_params, r_body = lqs.s3.list_objects_v2(
resource="ingestion",
resource_id=ingestion_id
)
s3_bucket = r_body["ListBucketResult"]["Name"]
s3_key = r_body['ListBucketResult']['Contents']['Key']
print(f"Found s3://{s3_bucket}/{s3_key}")
print("Queueing ingestion...")
lqs.update.ingestion(
ingestion_id=ingestion_id,
data=dict(
s3_bucket=s3_bucket,
s3_key=s3_key,
queued=True
)
)
print("Ingestion queued!")


def main(log_directory, group_name):
    lqs = LogQS(verbose=True)

    # The following is just a sanity check
    if lqs._api_url is None:
        raise Exception(
            "Environment variables not loaded! "
            "Check that a .env file has been added to the runtime."
        )
    assert lqs._api_url.endswith("/api"), "LQS_API_URL should end with '/api'"
    assert lqs._api_url.startswith("http"), "LQS_API_URL should start with 'http'"
    print(f"Using URL {lqs._api_url} with API Key ID {lqs._api_key_id}.")

    group = lqs.list.group(name=group_name)["data"][0]
    group_id = group["id"]

    for log_path in os.listdir(log_directory):
        if not log_path.endswith((".bag", ".log")):
            continue
        try:
            process_log(lqs, log_directory, log_path, group_id)
        except Exception as e:
            # Record failures so the run can continue with the remaining logs
            with open("results.txt", "a+") as f:
                f.write(f"{time.time()}: {log_directory}/{log_path} - {e}\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Upload a directory of logs")
    parser.add_argument("-d", "--log-directory", required=True, help="The root directory of logs")
    parser.add_argument("-g", "--group", required=True, help="The group name to upload logs under")
    args = parser.parse_args()
    main(
        log_directory=args.log_directory,
        group_name=args.group,
    )
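Example invocation, assuming the script is saved as upload_directory.py and a .env file with LogQS credentials is in the working directory:

    python upload_directory.py --log-directory /data/logs --group my-group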