# Created January 26, 2018 01:53
# Script to ingest Reddit comments data into Elasticsearch for better searching. The "edited" field is ignored when ingesting.
import sys
import os
import re
import gc
import traceback
import tqdm
import mmap
import datetime as dt
import bz2
import json
import string
import itertools
import multiprocessing as mp
import time
from elasticsearch import Elasticsearch, TransportError
from pathlib import Path
import logging
import logging.config
# logging setup
# Logging configuration in logging.config.dictConfig schema: one formatter
# 'f' that includes the process name (records come from several worker
# processes), one stream handler 'h', and a DEBUG-level root logger.
# NOTE(review): the closing brackets for formatters/handlers/root and for
# the dict( call are not visible in this copy, and no
# logging.config.dictConfig(logging_config) call is visible either --
# confirm they exist, otherwise this configuration is never applied.
logging_config = dict(
version = 1,
formatters = {
'f': {'format':
'%(asctime)s %(name)-12s %(processName)s %(levelname)-8s %(message)s'}
handlers = {
# StreamHandler configured without a stream argument (writes to stderr).
'h': {'class': 'logging.StreamHandler',
'formatter': 'f',
'level': logging.DEBUG}
root = {
'handlers': ['h'],
'level': logging.DEBUG,
# Script-specific logger; propagate=False keeps its records from also being
# passed up to the root logger's handlers.
logger = logging.getLogger('reddit_comments_to_es')
logger.propagate = False
# File handler writing to reddit_comments_to_es.log.
# NOTE(review): as visible, fh is never given a formatter nor attached via
# logger.addHandler(fh); with propagate=False and no handler on `logger`,
# its DEBUG records would only reach logging's last-resort handler (WARNING
# and above) -- confirm the attach call exists.
fh = logging.FileHandler('reddit_comments_to_es.log')
# helper functions
def getNumberOfLines(file_path):
    """Return the number of lines in the file at *file_path*.

    A final line without a trailing newline is counted too; an empty file
    has 0 lines.  The file is opened read-only and memory-mapped with
    ``ACCESS_READ``, so no write permission is needed, and both the mapping
    and the file descriptor are released before returning.

    Note: the count is taken over the raw bytes on disk, so for a compressed
    file (e.g. ``.bz2``) it is not the number of decompressed lines.

    Args:
        file_path: path of the file to scan.

    Returns:
        The number of lines as an int.
    """
    with open(file_path, "rb") as fp:
        # mmap refuses to map a zero-length file (ValueError), so answer
        # the empty case directly.
        if os.fstat(fp.fileno()).st_size == 0:
            return 0
        with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as buf:
            # readline() returns b"" only at EOF; count the non-empty reads.
            return sum(1 for _ in iter(buf.readline, b""))
# Worker loop run in each child process: repeatedly takes a (line_no, line)
# item from in_queue, parses `line` as JSON, drops the 'edited' key and
# indexes the document with es.index(index=index_name, doc_type=doc_type).
#
# Parameters:
#   in_queue   -- queue of (line_no, line) tuples; a line of None is the
#                 exit signal.
#   es         -- Elasticsearch client used for indexing.
#   index_name -- target index name.
#   doc_type   -- document type passed to es.index.
#
# Retry scheme: a TransportError stores the error text, increments `retry`
# and keeps the same line. On the retry, a field name taken from the error
# text (json_fix_re matches "'failed to parse [<field>]'") is overwritten
# with its value from the module-level default_mappings. Once retry exceeds
# 3 the next queue item is fetched, so under repeated TransportErrors a line
# is attempted at most 4 times and then dropped (only the debug log shows it).
#
# NOTE(review): several lines look truncated or missing in this copy: the
# right-hand sides of `json_fix_field =` and `field =` (presumably a
# json_fix_re search/findall on retry_error and its captured group), the
# `try:` the except clauses below belong to, and any return/break after the
# exit signal. Confirm against the real source.
def process_line(in_queue, es, index_name, doc_type):
json_data = None
retry = 0
retry_error = ""
# Captures the field name from error text like "'failed to parse [score]'".
json_fix_re = re.compile(r'\'failed\sto\sparse\s\[(\w+)\]\'')
while True:
# Fetch a new item on the first attempt or after retries are exhausted;
# otherwise re-process the current line.
if retry == 0 or retry > 3:
line_no, line = in_queue.get()
retry = 0
# logger.debug('getting new ones {}'.format(line_no))
# exit signal
# NOTE(review): prefer `line is None`. As visible, nothing leaves the loop
# here; without a return/break the None would reach json.loads below.
if line == None:
logger.debug("completed ... ")
# parse the line and prepare the document for indexing
json_data = json.loads(line)
# 'edited' is excluded from the indexed document.
if 'edited' in json_data:
del json_data['edited']
json_fix_field =
if retry > 0 and json_fix_field:
field =
# NOTE(review): json_data[field] is read here before the `field in
# json_data` check below, so a field missing from the document raises
# KeyError (as does a field absent from default_mappings). If that KeyError
# is caught by the generic `except Exception` below, `retry` is not
# incremented and the same line is retried forever.
logger.debug('TransportError: issue with {}: updated {} to {}'.format(field,json_data[field],default_mappings[field]))
if field in json_data:
# logger.debug('Issue Json: {}'.format(json_data))
json_data[field] = default_mappings[field]
# logger.debug('Fixed Json: {}'.format(json_data))
es.index(index=index_name, doc_type=doc_type, body=json_data)
retry = 0
except TransportError as te:
# Keep the error text so the next attempt can locate the offending field.
retry_error = str(te)
logger.debug('TransportError: {}'.format(retry_error))
logger.debug('json_data: {}: {}'.format(line_no, line))
retry += 1
logger.debug('retrying ... {}'.format(str(retry)))
except Exception as e:
# Any other error is only logged (with process ids and target index);
# `retry` is left unchanged.
logger.debug('Exception in process: {}'.format(str(e)))
logger.debug('parent process:{} process id:{}'.format(os.getppid(), os.getpid()))
logger.debug('index_name: {} doc_type: {}'.format(index_name, doc_type))
# file names
# Root directory holding the raw Reddit comment dumps; the ingest loop globs
# '<root>/*/*.bz2' beneath it. Set the REDDIT_RAW_DATA_DIR environment
# variable to point elsewhere; otherwise the original location is used.
reddit_raw_file_path = os.environ.get(
    "REDDIT_RAW_DATA_DIR", "/media/bicepjai/data/reddit/reddit_data/raw_data"
)
# =================================================
# An Elasticsearch instance needs to be running before this script starts, e.g.:
# =================================================
# docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -v /home/bicepjai/Projects/docker_files/elastic-search/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /media/bicepjai/data/reddit/reddit_data/elasticsearch/data:/var/lib/elasticsearch -v /media/bicepjai/data/reddit/reddit_data/elasticsearch/logs:/var/log/elasticsearch
# NOTE: the image name is missing from the command above; append the Elasticsearch image/tag you want to run.
# Request body for es.indices.create(): index settings plus field mappings
# for the Reddit comment fields ('edited' is deliberately left out).
# NOTE(review): closing braces look missing in this copy -- "settings" is
# never closed before "mappings", and neither "mappings" nor the outer dict
# is closed. Confirm the original nesting.
index_mappings = {
"settings": {
# Tolerate values that do not match a field's mapped type instead of
# rejecting the whole document.
"index.mapping.ignore_malformed": True,
# NOTE(review): "dynamic" is a mapping parameter, not an index setting;
# confirm Elasticsearch accepts (or silently ignores) it under "settings".
"dynamic": False
# NOTE(review): fields are listed directly under "mappings"; the
# Elasticsearch 6.x create-index API expects
# {"mappings": {<doc_type>: {"properties": {...}}}} -- confirm no wrapper
# levels were lost. Elasticsearch documents null_value for keyword,
# numeric, boolean and date fields but not for "text"; confirm these text
# mappings are accepted.
"mappings": {
"subreddit": { "null_value": "null", "type": "text" },
"body": { "null_value": "null", "type": "text" },
"stickied": { "null_value": False, "type": "boolean" },
"gilded": { "null_value": 0, "type": "integer" },
"score": { "null_value": 0, "type": "integer" },
"author": { "null_value": "null", "type": "text" },
"link_id": { "null_value": "null", "type": "text" },
# NOTE(review): "epoch_millis" (here and for created_utc) assumes
# millisecond timestamps; if the dumps store Unix seconds, "epoch_second"
# would be needed -- confirm.
"retrieved_on": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" },
"subreddit_id": { "null_value": "null", "type": "text" },
"controversiality": { "null_value": 0, "type": "integer" },
"author_flair_css_class": { "null_value": "null", "type": "text" },
"author_flair_text": { "null_value": "null", "type": "text" },
"created_utc": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" },
"distinguished": { "null_value": "null", "type": "text" },
"parent_id": { "null_value": "null", "type": "text" },
"id": { "null_value": "null", "type": "text" },
# "edited": { "null_value": False, "type": "boolean" }
# Fallback value per indexed field. process_line writes the matching value
# into a document field that Elasticsearch reported it failed to parse,
# then retries the index call. Keys mirror the fields of index_mappings;
# "edited" is stripped before indexing, so it has no entry.
# NOTE(review): string fields fall back to "nul", whereas index_mappings
# declares null_value "null" for the same fields -- confirm whether the
# different spelling is intentional (e.g. as a "repaired" marker).
default_mappings = {
    "subreddit": "nul",
    "body": "nul",
    "stickied": False,
    "gilded": 0,
    "score": 0,
    "author": "nul",
    "link_id": "nul",
    "retrieved_on": 0,
    "subreddit_id": "nul",
    "controversiality": 0,
    "author_flair_css_class": "nul",
    "author_flair_text": "nul",
    "created_utc": 0,
    "distinguished": "nul",
    "parent_id": "nul",
    "id": "nul",
    # "edited": False
}
# Client used by the parent process (index creation and the info log below).
# Constructed without arguments, so it uses the client library's default
# host -- presumably localhost:9200; confirm for the installed version.
es = Elasticsearch()
# NOTE(review): this call appears truncated (unbalanced parentheses);
# presumably it logged str(es.info()) -- confirm against the real source.
logger.debug('Elasticsearch Info: %s', str(
# ====================================================================
# order all the files that needs to be read and update elastic search
# ====================================================================
# Every '<subdir>/<name>.bz2' file under the raw-data root, as path strings
# in descending lexicographic order.
all_files = sorted(
    (str(bz2_path) for bz2_path in Path(reddit_raw_file_path).glob('*/*.bz2')),
    reverse=True,
)
logger.debug('total number of files: %s', len(all_files))
# ====================================================================
# iterate thru files
# ====================================================================
# testing
# all_files = ['/home/bicepjai/Projects/ml-compete/kaggle/05_toxic/dataset/external/reddit_comments/sample.json']
# go through all the non processes or partially processed files
# Ingest each .bz2 dump in turn:
#   1. create the index for the file's parent directory;
#   2. build num_workers process_line workers sharing one bounded queue;
#   3. feed every line of the file plus one None exit sentinel per worker;
#   4. wait for the workers.
# NOTE(review): the `try:` matching the except at the bottom, and the bodies
# of the worker-creation, feeding and closing loops (presumably
# pools.append(p)/p.start(), work_q.put(num_and_line) and p.join()) are not
# visible in this copy -- confirm against the real source.
for current_file in all_files:
logger.debug('writing from file .... ')
logger.debug('current_file: %s', current_file)
# NOTE(review): current_file is a compressed .bz2 file, so this counts
# newline bytes in the compressed stream, not JSON records; the tqdm total
# below is therefore only a rough figure.
total_lines = getNumberOfLines(current_file)
logger.debug('total_lines in current_file: %s', total_lines)
# The index suffix is the file's parent directory name (assumes '/' as the
# path separator).
year_str = current_file.split('/')[-2]
# testing
# year_str = "2017"
index_name = "reddit-comments-"+year_str
doc_type = "json-comments"
logger.debug('Elastic Search index_name: %s', index_name)
logger.debug('Elastic Search doc_type: %s', doc_type)
# testing
# es.indices.delete(index=index_name, ignore=[400, 404])
# ignore=400 tolerates a 400 response -- presumably "index already exists"
# when an earlier file in the same directory created it; confirm.
es.indices.create(index=index_name, ignore=400, body=index_mappings)
# lets create multiple processes and queues
# to make the loading faster
num_workers = 11
# NOTE(review): a new Manager (its own server process) is created for every
# file and is never shut down in the visible code.
manager = mp.Manager()
# Queue with maxsize num_workers, so putting into it blocks when it is full.
work_q = manager.Queue(num_workers)
# start for worker processes
pools = []
for i in range(num_workers):
# Each worker gets its own client object, created here in the parent and
# handed to the child through the Process args.
es1 = Elasticsearch()
p = mp.Process(target=process_line, args=(work_q, es1, index_name, doc_type))
logger.debug('created workers ...')
# produce data for all the processes
with bz2.BZ2File(current_file, 'r') as f:
# testing
# with open(current_file, 'r') as f:
# One trailing None per worker: process_line treats a None line as its exit
# signal. enumerate() yields the (line_no, line) pairs workers unpack.
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in tqdm.tqdm(enumerate(iters), total=total_lines):
logger.debug('closing pool connections')
for p in pools:
except (KeyboardInterrupt, Exception) as e:
# Log the failing file together with the full traceback.
logger.debug('Exception/KeyboardInterrupt occured .... ')
logger.debug('current_file: %s', current_file)
logger.debug('Exception: %s', str(e))
logger.debug('traceback.format_exc(): \n%s', traceback.format_exc())
