#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Iterate a bucket in S3 containing JSON log files:
get each file, massage the JSON a little and upload it to Elasticsearch.
"""
import json
import os
import click
from datetime import datetime, date, timedelta
from elasticsearch import Elasticsearch
from smart_open import smart_open, s3_iter_bucket
import boto
import logging

logging.getLogger("smart_open").setLevel(logging.WARNING)
logging.getLogger("elasticsearch").setLevel(logging.WARNING)
logging.basicConfig(format='%(asctime)s %(message)s',
                    datefmt='%d/%m/%Y %I:%M:%S %p',
                    level=logging.DEBUG,
                    filename='logrun_dsl_data_loader.log')

DEFAULT_HOST = "http://127.0.0.1:9200/"
DEFAULT_INDEX = "dsl_logs_test"


def pretty_json(data):
    "Return a pretty-printed JSON string."
    return json.dumps(data, indent=4, sort_keys=True)


class Document(object):
    """Abstraction of an ES document"""

    def __init__(self, pk=None, doc_type=None, index_name=DEFAULT_INDEX, host_name=DEFAULT_HOST):
        super(Document, self).__init__()
        self.raw_data = None
        self.doc_type = doc_type or "doc"
        self.id = pk
        self.index_name = index_name
        self.host_name = host_name
        self.ES = Elasticsearch(hosts=self.host_name)
        if self.id:
            self.get()  # an ID was passed in, so load that document straight away

    def get(self, pk=None):
        "Retrieve a doc from the index based on its ID"
        if pk:
            self.id = pk
        elif not self.id:
            print("Specify an ID")
            return
        self.raw_data = self.ES.get(index=self.index_name, id=self.id)

    def get_val(self, prop_name):
        "Get a value from the loaded document, based on a specific property. Lazy and friendly."
        if self.raw_data:
            try:
                return self.raw_data[prop_name]
            except KeyError:
                pass
            print("Property not found")
        else:
            print("No document is loaded")
        return None

    def add(self, json_doc, doc_type=None, _id=""):
        "Add a JSON document to the ES Index"
        if not doc_type:
            doc_type = self.doc_type
        res = self.ES.index(index=self.index_name, doc_type=doc_type, id=_id, body=json_doc)
        if False:  # doing this on many inserts will blow up the terminal
            print(pretty_json(res))

    def delete(self, _id):
        "Delete a single document from the index, by ID."
        res = self.ES.delete(index=self.index_name, id=_id)
        print(pretty_json(res))

    def delete_index(self, index_name=None):
        "Delete an entire index (defaults to this document's index)."
        if not index_name:
            index_name = self.index_name
        res = self.ES.indices.delete(index=index_name)
        print(pretty_json(res))

    def print_raw_data(self):
        "Print the full JSON document as stored in ES"
        print(pretty_json(self.raw_data))
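
# Minimal usage sketch of the Document wrapper (illustrative only: it assumes an
# Elasticsearch node reachable at DEFAULT_HOST, and the document values are made up):
#   doc = Document(index_name=DEFAULT_INDEX)
#   doc.add({"message": "hello world"}, _id="example-1")
#   doc.get("example-1")
#   doc.print_raw_data()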


def parse_and_index_data(file_contents, index_name, host_name=None, source_path=None, test_mode=False):
    """
    Take the byte contents of an S3 file, split it into lines, then decode each line so that
    the JSON can be parsed and the data sent to Elasticsearch.
    Each file is a sequence of log data, organized into pairs of lines: the access token/timestamp
    line, and the actual log data line.
    Example source_path for s3:
    # root_bucket/2018/11/15/17/20181115T173328.706Z_aea4ce56f807ce3fbb669898439ee4b9.json
    """
    if not host_name:
        host_name = DEFAULT_HOST
    doc = Document(index_name=index_name, host_name=host_name)
    counter = 0
    t = None  # timestamp carried over from the most recent token line
    for line in file_contents.splitlines():
        if isinstance(line, bytes):
            j = json.loads(line.decode())
        else:
            j = json.loads(line)  # this only happens when reading from a local file for testing
        if "token" in j:
            # timestamp line
            ml_timestamp = j['received']
            t = datetime.fromtimestamp(ml_timestamp // 1000.0)
        else:
            # data line => get all of it
            counter += 1
            j['timestamp'] = t
            # print(j)
            if source_path:
                # build a stable ID from the file name (without the .json extension) plus the row counter
                _id = os.path.splitext(source_path.split("/")[-1])[0] + "-%d" % counter
            else:
                _id = ""
            if not test_mode:
                # click.secho("... adding data from file " + str(source_path), fg="green")
                logging.info("\n... scanning row[%d] of file %s", counter, source_path)
                doc.add(j, _id=_id)
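
# Illustrative sketch of the two-line log format that parse_and_index_data expects.
# The "token" / "received" keys come from the real data; the fields on the data line
# below are made up for this example:
#   {"token": "xxxx-xxxx-xxxx", "received": 1542303208706}
#   {"severity": "info", "message": "request served"}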


def iterate_s3_logs(index_name, host_name="", start_date="", end_date=""):
    """
    Go through all log files in S3, starting from a selected location.
    The location is given by the year/month/day folder structure, in ascending date order.
    Example source_path for s3:
    # root_bucket/2018/11/15/17/20181115T173328.706Z_aea4ce56f807ce3fbb669898439ee4b9.json
    <start_date> : used to determine which subfolders in S3 to start from, eg 2018-01-01
    <end_date> : used to determine which subfolders in S3 to end at, eg 2018-01-02
    NOTE the end_date folder is not processed; that's where the iteration stops.
    """
logging.info("\n========\nScript <iterate_s3_logs> Started Running \n=========\n") | |
# default = yesterday, today | |
d1, delta = _date_range_from_strings(start_date, end_date) | |
# this has to be the top level bucket | |
bucket = boto.connect_s3().get_bucket('com-uberresearch-sematext-logs') | |
logging.info("\n connected to s3 \n") | |
for i in range(delta.days): # this goes up to d2-1, so excludes the end_date ! | |
step = d1 + timedelta(i) | |
bucket_prefix = step.strftime("sematext_24b2ef8d/%Y/%m/%d/") # must end with slash | |
click.secho("..trying to connect to s3 folder: %s" % bucket_prefix, fg="blue") | |
logging.info("\n..trying to connect to s3 folder: %s" % bucket_prefix) | |
# iterate only through one dir at a time | |
for key, content in s3_iter_bucket(bucket, prefix=bucket_prefix, workers=16): | |
click.secho(">>>>> File: " + key + str(len(content)), fg="green") | |
parse_and_index_data(content, index_name, host_name, key) | |
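
# For example (illustrative dates): start_date="2018-11-15" with end_date="2018-11-17" walks
# the .../2018/11/15/ and .../2018/11/16/ prefixes only; the end_date folder itself is skipped.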


def _date_range_from_strings(start_date, end_date):
    """
    From two string dates formatted as YYYY-MM-DD,
    return the start date object and a timedelta that can be iterated over.
    <start_date> : used to determine which subfolders in S3 to start from, eg 2018-01-01
        => default: YESTERDAY
    <end_date> : used to determine which subfolders in S3 to end at, eg 2018-01-02
        => default: TODAY
    """
    # the following is redundant, added just as an example to simulate receiving a date string
    if not start_date:
        yesterday = date.today() - timedelta(1)
        start_date = yesterday.strftime("%Y-%m-%d")
    if not end_date:
        end_date = date.today().strftime("%Y-%m-%d")
    click.secho("START_DATE = %s //// END_DATE = %s " % (start_date, end_date), fg="red")
    logging.info("\n======= START_DATE = %s //// END_DATE = %s ", start_date, end_date)
    d1 = datetime.strptime(start_date, '%Y-%m-%d').date()
    d2 = datetime.strptime(end_date, '%Y-%m-%d').date()
    return (d1, d2 - d1)
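
# For example (illustrative dates): _date_range_from_strings("2018-11-15", "2018-11-17")
# returns (date(2018, 11, 15), timedelta(days=2)), i.e. the start date plus a 2-day span.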


@click.command()
@click.option('--start_date', help='Start date eg 2018-11-15 (default=yesterday)')
@click.option('--delete', is_flag=True, help='Delete selected index (view source)')
def search_test(start_date, delete):
    if delete:
        #
        # DELETE INDEX
        #
        HOST, INDEX = DEFAULT_HOST, "logs_from_s3"  # manually update for deleting!
        d = Document(index_name=INDEX)
        if click.confirm('Do you want to delete <%s/%s>?' % (HOST, INDEX)):
            d.delete_index()
            click.echo('Goodbye')
    else:
        #
        # LOAD FROM S3
        #
        HOST, INDEX = DEFAULT_HOST, "logs_from_s3"
        iterate_s3_logs(INDEX, HOST, start_date)


if __name__ == '__main__':
    search_test()
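
# Example invocations (illustrative; the script file name is an assumption):
#   python dsl_data_loader.py --start_date 2018-11-15
#   python dsl_data_loader.py --delete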