large timeseries csv streaming upsert bulk elasticsearch #index #pandas #bigdata #csv #upsert #elasticsearch #progressbar #example #bulk #stream #dataops #dataengineer #timeseries
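The snippet below streams a large time-series CSV through pandas in fixed-size chunks and bulk-upserts each chunk into Elasticsearch. Each document's _id is an MD5 hash of the configured key fields, so replaying the same file overwrites records instead of duplicating them, and tqdm draws a progress bar over the streaming bulk calls.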
import logging
import hashlib

import pandas as pd
from elasticsearch import Elasticsearch, helpers
from tqdm import tqdm
class Storage:
    """Thin wrapper around the Elasticsearch client for bulk upserts."""

    es = None

    def __init__(self, host, port, **kwargs):
        # Pop the connection options so they are not forwarded to the
        # Elasticsearch constructor as unexpected keyword arguments.
        user = kwargs.pop('user', None)
        password = kwargs.pop('password', None)
        ssl = kwargs.pop('ssl', False)
        if user is not None:
            self.es = Elasticsearch(
                [f'http://{user}:{password}@{host}:{port}'], **kwargs)
        elif ssl:
            self.es = Elasticsearch(
                [f'https://{host}:{port}'], use_ssl=True, **kwargs)
        else:
            self.es = Elasticsearch([f'http://{host}:{port}'], **kwargs)
    def generate_md5_key(self, item, keys):
        # Build a deterministic document id from the configured key fields,
        # so re-indexing the same row overwrites it instead of duplicating.
        key = '_'.join(str(item[k]) for k in item if k in keys)
        return hashlib.md5(key.encode('utf-8')).hexdigest()
    def build_action(self, index, dt_field, item_type, keys, item):
        assert item_type.lower() in ["record"], "item_type must be 'record'"
        # Mirror the observation timestamp into the field Kibana expects.
        item['@timestamp'] = item[dt_field]
        ndx = index
        # ndx = f'{index}_{item[dt_field]}'  # alternative: one index per period
        action = {
            "_op_type": 'index',  # 'index' overwrites an existing _id, i.e. an upsert
            "_index": ndx,
            "_type": item_type,   # note: mapping types are deprecated in Elasticsearch 7.x
            "_id": self.generate_md5_key(item, keys),
            "_source": item
        }
        return action
    def upsert_index(self, index, dt_field, item_type, keys, items,
                     progress=True, batch_size=1000, timeout=15):
        logging.info("streaming bulk data to elastic...")
        actions = [self.build_action(index, dt_field, item_type, keys, item)
                   for item in items]
        if progress:
            # streaming_bulk yields one (ok, result) tuple per document,
            # which lets tqdm render a live progress bar.
            for _ in tqdm(helpers.streaming_bulk(self.es, actions,
                                                 chunk_size=batch_size,
                                                 request_timeout=timeout),
                          total=len(actions)):
                continue
        else:
            helpers.bulk(self.es, actions, chunk_size=batch_size,
                         request_timeout=timeout)
        # Log before discarding the list, otherwise the count is always 0.
        logging.info(f"finished : [{len(actions)}] records")
if __name__ == "__main__":
    # elastic requires an index name and a unique-key spec
    INDEX_PREFIX = 'data'
    TIMESTAMP_FIELD = 'obs_ts'
    UNIQUE_ITEM_KEY = ['@timestamp']
    es_con = Storage('localhost', '9200', ssl=False)
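    # Secured variants supported by Storage.__init__ (a sketch; the host and
    # credentials below are placeholders):
    #   es_con = Storage('localhost', '9200', user='elastic', password='changeme')
    #   es_con = Storage('es.example.com', '9200', ssl=True)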
    # timeseries data
    # if the dataset is above 1 GB, consider the following
    # (see the sketch below):
    #   * drop unneeded columns
    #   * define smaller pandas column dtypes
    #   * define a smaller CSV_CHUNK_SIZE to split the large file into smaller chunks
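    # A minimal sketch of that advice, assuming hypothetical column names
    # 'obs_ts' and 'value' in the CSV:
    #   data_itr = pd.read_csv(r'../one_million.csv', chunksize=CSV_CHUNK_SIZE,
    #                          usecols=['obs_ts', 'value'],
    #                          dtype={'value': 'float32'},
    #                          parse_dates=['obs_ts'])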
    CSV_CHUNK_SIZE = 10000
    data_itr = pd.read_csv(r'../one_million.csv', chunksize=CSV_CHUNK_SIZE)
    # each chunk is a DataFrame transformed into a list of filtered dicts
    for data_chunk in data_itr:
        # drop null values so NaN fields are never sent to elastic
        data_filter = [{k: v for k, v in m.items() if pd.notnull(v)}
                       for m in data_chunk.to_dict(orient='records')]
        # once the filtering is done, send the chunk to elastic
        es_con.upsert_index(INDEX_PREFIX, TIMESTAMP_FIELD,
                            'record', UNIQUE_ITEM_KEY, data_filter)
        # release the chunk before reading the next one
        data_filter = []
    logging.info('done')
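    # To spot-check the result afterwards (index name 'data' as configured
    # above):
    #   es_con.es.count(index='data')           # total documents indexed
    #   es_con.es.search(index='data', size=1)  # sample one document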