@semyont
Last active July 20, 2023 16:13
large timeseries csv streaming upsert bulk elasticsearch #index #pandas #bigdata #csv #upsert #elasticsearch #progressbar #example #bulk #stream #dataops #dataengineer #timeseries
import logging
import hashlib
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from tqdm import tqdm


class Storage:
    es = None

    def __init__(self, host, port, **kwargs):
        # Pull out the connection options handled here so they are not
        # forwarded to the Elasticsearch constructor as unrecognized kwargs.
        user = kwargs.pop('user', None)
        password = kwargs.pop('password', None)
        use_ssl = kwargs.pop('ssl', False)
        if user is not None:
            self.es = Elasticsearch(
                [f'http://{user}:{password}@{host}:{port}'], **kwargs)
        elif use_ssl:
            self.es = Elasticsearch(
                [f'https://{host}'], use_ssl=True, **kwargs)
        else:
            self.es = Elasticsearch([f'http://{host}:{port}'], **kwargs)

    def generate_md5_key(self, item, keys):
        # Deterministic document id built from the key fields, so repeated
        # runs overwrite the same documents instead of duplicating them.
        try:
            key = '_'.join([str(item[k]) for k in item.keys() if k in keys])
            return hashlib.md5(key.encode('utf-8')).hexdigest()
        except Exception:
            # fall back to the raw concatenated key if hashing fails
            key = '_'.join([item[k] for k in item.keys() if k in keys])
            return key

    def build_action(self, index, dt_field, item_type, keys, item):
        assert item_type.lower() in [
            "record"], "record type required, default: 'record'"
        # copy the timeseries field into @timestamp, the conventional Elasticsearch time field
        item['@timestamp'] = item[dt_field]
        ndx = index
        # ndx = f'{index}_{item[dt_field]}'  # alternative: one index per timestamp bucket
        action = {
            "_op_type": 'index',              # plain index op; the md5 _id makes re-runs an upsert
            "_index": ndx,
            "_type": item_type,               # note: _type is deprecated in ES 7.x and removed in 8.x
            "_id": self.generate_md5_key(item, keys),
            "_source": item
        }
        return action

    def upsert_index(self, index, dt_field, item_type, keys, items,
                     progress=True, batch_size=1000, timeout=15):
        logging.info("streaming bulk data to elastic...")
        actions = [self.build_action(index, dt_field, item_type, keys, item)
                   for item in items]
        total = len(actions)
        if progress:
            # streaming_bulk is a generator, so wrapping it in tqdm shows a
            # live progress bar while the actions are sent
            for _ in tqdm(helpers.streaming_bulk(self.es, actions,
                                                 chunk_size=batch_size,
                                                 request_timeout=timeout),
                          total=total):
                continue
        else:
            helpers.bulk(self.es, actions, chunk_size=batch_size,
                         request_timeout=timeout)
        logging.info(f"finished : [{total}] records")


if __name__ == "__main__":
    # Elasticsearch index name, timestamp column and unique-key spec
    INDEX_PREFIX = 'data'
    TIMESTAMP_FIELD = 'obs_ts'
    UNIQUE_ITEM_KEY = ['@timestamp']
    es_con = Storage('localhost', '9200', ssl=False)
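    # Note (added, not in the original gist): build_action derives each document's
    # _id from an md5 of the UNIQUE_ITEM_KEY fields, so re-running the script over
    # the same CSV overwrites documents instead of duplicating them. A composite
    # key could be used instead, e.g.:
    # UNIQUE_ITEM_KEY = ['@timestamp', 'sensor_id']  # 'sensor_id' is a hypothetical column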
    # timeseries data
    # if the dataset is above 1 GB, consider the following
    # * drop unneeded columns
    # * define smaller pandas column dtypes
    # * define a smaller CSV_CHUNK_SIZE to split the large file into smaller chunks
    # (see the sketch below the read_csv call)
    import pandas as pd
    CSV_CHUNK_SIZE = 10000
    data_itr = pd.read_csv(r'../one_miilion.csv', chunksize=CSV_CHUNK_SIZE)
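    # Hedged sketch (not part of the original gist): the same read can be narrowed
    # with `usecols` and `dtype`, both standard pandas.read_csv parameters, to cut
    # memory use. The column names below are placeholders for illustration only.
    # data_itr = pd.read_csv(
    #     r'../one_miilion.csv',
    #     usecols=['obs_ts', 'value'],   # keep only the columns you index ('value' is hypothetical)
    #     dtype={'value': 'float32'},    # smaller numeric dtype than the default float64
    #     chunksize=CSV_CHUNK_SIZE,
    # )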
    # Each chunk is a DataFrame transformed into a list of filtered dicts
    for data_chunk in data_itr:
        # drop null values so they are not sent to Elasticsearch
        data_filter = [{k: v for k, v in m.items() if pd.notnull(v)}
                       for m in data_chunk.to_dict(orient='records')]
        # once the filtering is done, send the chunk to Elasticsearch
        es_con.upsert_index(INDEX_PREFIX, TIMESTAMP_FIELD,
                            'record', UNIQUE_ITEM_KEY, data_filter)
        # clear
        data_filter = []
    logging.info('done')
@hajimurtaza

Also what does line 4 do?

@semyont
Author

semyont commented Apr 3, 2020

@hajimurtaza let me get back to you with some updated code using pandas.
