Script to ingest Reddit comments data into Elasticsearch for better searching. The "edited" field is dropped when ingesting.
import sys
import os
import re
import gc
import traceback
import tqdm
import mmap
import datetime as dt
import bz2
import json
import string
import itertools
import multiprocessing as mp
import time
from elasticsearch import Elasticsearch, TransportError
from pathlib import Path
import logging
import logging.config
#=====================================================
# logging setup
#=====================================================
logging_config = dict(
    version=1,
    formatters={
        'f': {'format':
              '%(asctime)s %(name)-12s %(processName)s %(levelname)-8s %(message)s'}
    },
    handlers={
        'h': {'class': 'logging.StreamHandler',
              'formatter': 'f',
              'level': logging.DEBUG}
    },
    root={
        'handlers': ['h'],
        'level': logging.DEBUG,
    },
)
logging.config.dictConfig(logging_config)
logger = logging.getLogger('reddit_comments_to_es')
logger.propagate = False
fh = logging.FileHandler('reddit_comments_to_es.log')
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
#=====================================================
# helper functions
#=====================================================
def getNumberOfLines(file_path):
    """Count lines by memory-mapping the file (used only for the tqdm progress total)."""
    with open(file_path, "r+") as fp:
        buf = mmap.mmap(fp.fileno(), 0)
        lines = 0
        while buf.readline():
            lines += 1
    gc.collect()
    return lines
def process_line(in_queue, es, index_name, doc_type):
    """Worker: pull (line_no, line) tuples off the queue and index each comment into Elasticsearch.

    On a TransportError for a malformed field, the field is reset to its default value
    and the document is retried up to 3 times. A None line is the exit signal.
    """
    json_data = None
    retry = 0
    retry_error = ""
    json_fix_re = re.compile(r'\'failed\sto\sparse\s\[(\w+)\]\'')
    while True:
        try:
            if retry == 0 or retry > 3:
                line_no, line = in_queue.get()
                retry = 0
                # logger.debug('getting new ones {}'.format(line_no))
            # exit signal
            if line is None:
                logger.debug("completed ... ")
                return
            # parse the comment and drop the ignored field
            json_data = json.loads(line)
            if 'edited' in json_data:
                del json_data['edited']
            json_fix_field = json_fix_re.search(retry_error)
            if retry > 0 and json_fix_field:
                field = json_fix_field.group(1)
                logger.debug('TransportError: issue with {}: updated {} to {}'.format(field, json_data[field], default_mappings[field]))
                if field in json_data:
                    # logger.debug('Issue Json: {}'.format(json_data))
                    json_data[field] = default_mappings[field]
                    # logger.debug('Fixed Json: {}'.format(json_data))
            es.index(index=index_name, doc_type=doc_type, body=json_data)
            retry = 0
        except TransportError as te:
            retry_error = str(te)
            logger.debug('TransportError: {}'.format(retry_error))
            logger.debug('json_data: {}: {}'.format(line_no, line))
            retry += 1
            logger.debug('retrying ... {}'.format(str(retry)))
        except Exception as e:
            logger.debug('Exception in process: {}'.format(str(e)))
            logger.debug('parent process:{} process id:{}'.format(os.getppid(), os.getpid()))
            logger.debug('index_name: {} doc_type: {}'.format(index_name, doc_type))
#=====================================================
# file names
#=====================================================
reddit_raw_file_path = "/media/bicepjai/data/reddit/reddit_data/raw_data"
# =================================================
# elastic search instance needs to be running
# =================================================
"""
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -v /home/bicepjai/Projects/docker_files/elastic-search/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /media/bicepjai/data/reddit/reddit_data/elasticsearch/data:/var/lib/elasticsearch -v /media/bicepjai/data/reddit/reddit_data/elasticsearch/logs:/var/log/elasticsearch docker.elastic.co/elasticsearch/elasticsearch-oss:6.1.2
"""
index_mappings = {
    "settings": {
        "index.mapping.ignore_malformed": True,
        "dynamic": False
    },
    "mappings": {
        "subreddit": { "null_value": "null", "type": "text" },
        "body": { "null_value": "null", "type": "text" },
        "stickied": { "null_value": False, "type": "boolean" },
        "gilded": { "null_value": 0, "type": "integer" },
        "score": { "null_value": 0, "type": "integer" },
        "author": { "null_value": "null", "type": "text" },
        "link_id": { "null_value": "null", "type": "text" },
        "retrieved_on": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" },
        "subreddit_id": { "null_value": "null", "type": "text" },
        "controversiality": { "null_value": 0, "type": "integer" },
        "author_flair_css_class": { "null_value": "null", "type": "text" },
        "author_flair_text": { "null_value": "null", "type": "text" },
        "created_utc": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" },
        "distinguished": { "null_value": "null", "type": "text" },
        "parent_id": { "null_value": "null", "type": "text" },
        "id": { "null_value": "null", "type": "text" },
        # "edited": { "null_value": False, "type": "boolean" }
    }
}
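# For reference, the usual ES 6.x create-index body nests field definitions under the
# doc type and a "properties" key. A minimal sketch in that layout (field list
# abbreviated; not part of the original script):
"""
{
    "settings": { "index.mapping.ignore_malformed": True },
    "mappings": {
        "json-comments": {
            "dynamic": False,
            "properties": {
                "subreddit": { "type": "text" },
                "body": { "type": "text" },
                "gilded": { "type": "integer" },
                "created_utc": { "type": "date",
                                 "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }
            }
        }
    }
}
"""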
default_mappings = {
    "subreddit": "nul",
    "body": "nul",
    "stickied": False,
    "gilded": 0,
    "score": 0,
    "author": "nul",
    "link_id": "nul",
    "retrieved_on": 0,
    "subreddit_id": "nul",
    "controversiality": 0,
    "author_flair_css_class": "nul",
    "author_flair_text": "nul",
    "created_utc": 0,
    "distinguished": "nul",
    "parent_id": "nul",
    "id": "nul",
    # "edited": False
}
try:
    es = Elasticsearch()
    logger.debug('Elasticsearch Info: %s', str(es.info()))

    # ====================================================================
    # order all the files that need to be read and update Elasticsearch
    # ====================================================================
    all_files = []
    for filename in Path(reddit_raw_file_path).glob('*/*.bz2'):
        all_files += [str(filename)]
    all_files = sorted(all_files, reverse=True)
    logger.debug('total number of files: %s', len(all_files))

    # ====================================================================
    # iterate through files
    # ====================================================================
    gc.collect()

    # testing
    # all_files = ['/home/bicepjai/Projects/ml-compete/kaggle/05_toxic/dataset/external/reddit_comments/sample.json']

    # go through all the non-processed or partially processed files
    for current_file in all_files:
        logger.debug('writing from file .... ')
        logger.debug('current_file: %s', current_file)
        total_lines = getNumberOfLines(current_file)
        logger.debug('total_lines in current_file: %s', total_lines)

        year_str = current_file.split('/')[-2]
        # testing
        # year_str = "2017"
        index_name = "reddit-comments-" + year_str
        doc_type = "json-comments"
        logger.debug('Elastic Search index_name: %s', index_name)
        logger.debug('Elastic Search doc_type: %s', doc_type)

        # testing
        # es.indices.delete(index=index_name, ignore=[400, 404])
        es.indices.create(index=index_name, ignore=400, body=index_mappings)

        # create multiple processes and queues to make the loading faster
        num_workers = 11
        manager = mp.Manager()
        work_q = manager.Queue(num_workers)

        # start the worker processes
        pools = []
        for i in range(num_workers):
            es1 = Elasticsearch()
            p = mp.Process(target=process_line, args=(work_q, es1, index_name, doc_type))
            p.start()
            pools.append(p)
        logger.debug('created workers ...')

        # produce data for all the processes; trailing Nones are the exit signals
        with bz2.BZ2File(current_file, 'r') as f:
            # testing
            # with open(current_file, 'r') as f:
            iters = itertools.chain(f, (None,) * num_workers)
            for num_and_line in tqdm.tqdm(enumerate(iters), total=total_lines):
                work_q.put(num_and_line)

        logger.debug('closing pool connections')
        for p in pools:
            p.join()
        gc.collect()

except (KeyboardInterrupt, Exception) as e:
    logger.debug('Exception/KeyboardInterrupt occurred .... ')
    logger.debug('current_file: %s', current_file)
    logger.debug('Exception: %s', str(e))
    logger.debug('traceback.format_exc(): \n%s', traceback.format_exc())
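#=====================================================
# example: searching the ingested comments
#=====================================================
# A minimal sketch of how the loaded data could be queried once ingestion finishes;
# the year in the index name and the query text are assumptions, not part of the
# original script.
"""
from elasticsearch import Elasticsearch

es = Elasticsearch()
results = es.search(
    index="reddit-comments-2017",
    doc_type="json-comments",
    body={"query": {"match": {"body": "machine learning"}}},
    size=5,
)
for hit in results["hits"]["hits"]:
    print(hit["_source"]["subreddit"], hit["_source"]["body"][:80])
"""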