Skip to content

Instantly share code, notes, and snippets.

@ggnanasekaran77
Last active September 25, 2021 02:00
Show Gist options
  • Save ggnanasekaran77/a0efdfdb464dd0f333130e1995d96104 to your computer and use it in GitHub Desktop.
Multiprocessing CSV to Elastic
from multiprocessing import Pool
import time
import os
import pandas as pd
from elasticsearch import Elasticsearch, helpers
def csvToElastic(file):
    """Stream one semicolon-delimited log file into Elasticsearch in chunks.

    Reads ``file`` 100k rows at a time with pandas, derives a few extra
    columns, and bulk-indexes each chunk through the ``geoip`` ingest
    pipeline.

    Args:
        file: Path to a ``.log`` file; the last two path components are
            used to build the index name (e.g. ``csv/app/access.log`` ->
            ``csv-log-app-access``).

    Returns:
        List of ``helpers.bulk`` responses, one per chunk.
    """
    client = Elasticsearch("localhost:9200", http_compress=True)
    header_list = ["Time", "Url", "Uri", "RemoteIP"]
    esResult = []
    chunkCount = 0
    # Hoisted out of the loop: the index name depends only on `file`,
    # so there is no reason to rebuild it for every chunk.
    indexName = "csv-log-" + file.split('/')[-2] + "-" + file.split('/')[-1]
    indexName = indexName.replace('.log', '')
    for data in pd.read_csv(file, sep=';', header=None, chunksize=100000,
                            low_memory=False, encoding='utf-8',
                            escapechar='\\', usecols=[1, 11, 12, 15],
                            names=header_list):
        # Drop the ":port" suffix so only the bare IP remains (needed by geoip).
        data["RemoteIP"] = data["RemoteIP"].str.split(':', expand=True)[0]
        # Last path segment of the request URI.
        data["FileName"] = data["Uri"].str.split('/').str[-1]
        # Extract the value of the "appcode=" query parameter from the URL.
        data["AppCode"] = data["Url"].str.replace('(.*)appcode=(.*);', '\\2',
                                                  regex=True).str.split(';').str[0]
        resp = helpers.bulk(client, data.to_dict(orient='records'),
                            index=indexName, pipeline='geoip')
        print(resp)
        # BUG FIX: count the rows actually read; the final chunk is usually
        # smaller than `chunksize`, so the old `+= 100000` over-reported.
        chunkCount += len(data)
        print("Processed.... " + str(chunkCount))
        esResult.append(resp)
    return esResult
if __name__ == '__main__':
    with Pool(processes=6) as pool:
        # Recursively collect every *.log file under ./csv.
        log_root = os.getcwd() + "/csv"
        log_files = []
        for parent, _subdirs, names in os.walk(log_root):
            for name in names:
                if name.endswith(".log"):
                    log_files.append(os.path.join(parent, name))
        # Fan the files out across the worker pool and time the whole run.
        started = time.time()
        print(pool.map(csvToElastic, log_files))
        print(time.time() - started)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment