@juangesino
Created October 17, 2021 12:07
Data lake in S3 from MongoDB
# Export configuration for one MongoDB collection
collection_name: catawiki_auctions
incremental: true
date_field: createdAt
start_date: 2020-03-01
end_date: 2020-08-10
hours_interval: 24
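A minimal sketch of how such a configuration could be loaded, assuming it is stored as a YAML file named config.yaml (the file name and the yaml dependency are assumptions, not part of the original gist):

import yaml

# Load the export configuration (file name is an assumption)
with open("config.yaml") as f:
    config = yaml.safe_load(f)

collection_name = config["collection_name"]  # e.g. "catawiki_auctions"
date_field = config["date_field"]            # field used for incremental loads
hours_interval = config["hours_interval"]    # size of each extraction window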
import gzip

# Store file as compressed JSON
with gzip.GzipFile(file_name, "w") as file_out:
    file_out.write(json_bytes)
for i in range(1, 10):
    print(i, end=", ")
#=> 1, 2, 3, 4, 5, 6, 7, 8, 9,
for day in date_range("2021-01-01", "2021-01-10"):
    print(day, end=", ")
#=> 2021-01-01 00:00:00, 2021-01-02 00:00:00, 2021-01-03 00:00:00, 2021-01-04 00:00:00, 2021-01-05 00:00:00, 2021-01-06 00:00:00, 2021-01-07 00:00:00, 2021-01-08 00:00:00, 2021-01-09 00:00:00,
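The body of date_range is not shown in the gist; a minimal sketch that matches the output above (start inclusive, end exclusive, just like range) could look like this:

from datetime import datetime, timedelta

def date_range(start, end, step_days=1):
    # Yield datetimes from start (inclusive) to end (exclusive), like range()
    current = datetime.fromisoformat(start)
    stop = datetime.fromisoformat(end)
    while current < stop:
        yield current
        current += timedelta(days=step_days)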
file_name = f"{COLLECTION_NAME}_{date_str_file}_{z_lower_limit}-{z_upper_limit}_{today_str}_batch{batch_number}.json.gz"
# => catawiki_auctions_20200310_0000-2400_20211010082324540093_batch0.json.gz
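The variables used in the f-string are not defined in the gist; one plausible way to build them for a given day and hour window (all of the values below are assumptions chosen to reproduce the example file name) is:

from datetime import datetime

COLLECTION_NAME = "catawiki_auctions"
day = datetime(2020, 3, 10)
batch_number = 0

date_str_file = day.strftime("%Y%m%d")                    # 20200310
z_lower_limit = "0000"                                     # window start, HHMM
z_upper_limit = "2400"                                     # window end, HHMM
today_str = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")   # extraction timestamp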
import json

# MongoDB documents contain BSON types (ObjectId, datetime, ...) that the
# standard library encoder cannot serialize
json_str = json.dumps(records)
# => TypeError: Object of type ObjectId is not JSON serializable
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": "arn:aws:s3:::BUCKET-NAME-HERE/*"
        }
    ]
}
from bson.json_util import dumps as mongo_dumps
# Dump and encode records
json_str = mongo_dumps(records)
json_bytes = json_str.encode("utf-8")
import boto3

# Start an S3 client
s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
# Upload file to S3 bucket
s3_client.upload_file(
    file_path,    # local path of the compressed JSON file
    bucket_name,  # target S3 bucket
    bucket_path,  # object key inside the bucket
)
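Put together, one batch of the export could look roughly like this. The pymongo query, the uppercase placeholder names, and the object key layout are assumptions based on the snippets above, not the author's exact code:

import gzip
from datetime import timedelta

import boto3
from bson.json_util import dumps as mongo_dumps
from pymongo import MongoClient

# Uppercase names are placeholders for values from the configuration / environment
collection = MongoClient(MONGO_URI)[DB_NAME][COLLECTION_NAME]
s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

for day in date_range(START_DATE, END_DATE):  # date_range as sketched above
    # Fetch one day's worth of documents using the configured date field
    records = list(
        collection.find({DATE_FIELD: {"$gte": day, "$lt": day + timedelta(days=1)}})
    )

    # Serialize with the BSON-aware encoder, compress, and upload
    file_name = f"{COLLECTION_NAME}_{day.strftime('%Y%m%d')}_batch0.json.gz"
    with gzip.GzipFile(file_name, "w") as file_out:
        file_out.write(mongo_dumps(records).encode("utf-8"))

    s3_client.upload_file(file_name, BUCKET_NAME, f"{COLLECTION_NAME}/{file_name}")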