Skip to content

Instantly share code, notes, and snippets.

@joshuarobinson
Last active September 14, 2021 12:44
Show Gist options
  • Save joshuarobinson/0e69802200ea2b4072ffdf2ef2103508 to your computer and use it in GitHub Desktop.
Save joshuarobinson/0e69802200ea2b4072ffdf2ef2103508 to your computer and use it in GitHub Desktop.
import boto3
from datetime import datetime
import os
import sys
import time
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import urllib3
# The Elasticsearch client below is created with verify_certs=False, which would
# otherwise make urllib3 emit an InsecureRequestWarning on every request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Source information: FlashBlade data VIP, bucket name, and key prefix.
FB_DATAVIP = '10.62.64.200'
SOURCE_BUCKET = "beachboys"
SOURCE_PREFIX = ""  # empty prefix: list every object in the bucket
# Elasticsearch index that receives one document per listed object.
INDEX = 's3index'

# Connection details are read from the environment; a missing variable raises
# KeyError at startup.
es = Elasticsearch([os.environ['ELASTICSEARCH_HOST']],
                   http_auth=(os.environ['ELASTICSEARCH_USER'],
                              os.environ['ELASTICSEARCH_PASSWORD']),
                   scheme="https",
                   port=443,
                   sniff_on_start=True,
                   sniff_on_connection_fail=True,
                   sniffer_timeout=60,
                   # NOTE(review): TLS certificates are NOT verified; only
                   # acceptable on a trusted network.
                   verify_certs=False)

request_body = {
    "settings": {
        "number_of_shards": 4,
        "number_of_replicas": 1,
    }
}
# indices.create fails if the index already exists, so only create it when it
# is missing; otherwise every restart of this script would abort here.
if not es.indices.exists(index=INDEX):
    es.indices.create(index=INDEX, body=request_body)
# Poll the bucket forever. Each pass lists every object under SOURCE_PREFIX,
# fetches each object's user metadata with HEAD, and bulk-indexes one document
# per object into INDEX. It then sleeps 600 seconds before the next pass.
s3 = boto3.resource('s3', use_ssl=False, endpoint_url='http://' + FB_DATAVIP)
s3_client = s3.meta.client

# head_object error codes meaning the key disappeared after it was listed.
_MISSING_OBJECT_CODES = ('404', 'NoSuchKey', 'NotFound')

while True:
    print("Listing {}/{}".format(SOURCE_BUCKET, SOURCE_PREFIX))
    # Every document indexed in this pass carries the same timestamp, so a
    # single pass can be selected as a unit when querying.
    epoch_ts = datetime.now().timestamp()
    kwargs = {'Bucket': SOURCE_BUCKET, 'Prefix': SOURCE_PREFIX}
    try:
        while True:
            objlist = s3_client.list_objects_v2(**kwargs)
            doclist = []
            # Each listing entry dict is reused as the document body.
            for doc in objlist.get('Contents', []):
                try:
                    response = s3_client.head_object(
                        Bucket=SOURCE_BUCKET,
                        Key=doc['Key'],
                    )
                except s3_client.exceptions.ClientError as err:
                    # The object was deleted between the listing and this HEAD:
                    # skip it instead of aborting the whole pass.
                    code = err.response.get('Error', {}).get('Code')
                    if code in _MISSING_OBJECT_CODES:
                        continue
                    raise
                doc['Metadata'] = response['Metadata']
                doc['epoch_ts'] = epoch_ts
                doclist.append(doc)
            bulk(es, [{"_index": INDEX, "_source": d} for d in doclist])
            # No continuation token means this was the last page of the listing.
            token = objlist.get('NextContinuationToken')
            if token is None:
                break
            kwargs['ContinuationToken'] = token
    except Exception as err:
        # Top-level boundary of a long-running poller: a transient S3 or
        # Elasticsearch failure is reported and retried on the next pass
        # rather than terminating the indexer.
        print("Indexing pass for {}/{} failed: {!r}".format(
            SOURCE_BUCKET, SOURCE_PREFIX, err), file=sys.stderr)
    time.sleep(600)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment