Skip to content

Instantly share code, notes, and snippets.

@mattweber
Last active December 13, 2015 22:19
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save mattweber/4983404 to your computer and use it in GitHub Desktop.
Using ElasticSearch To Find The Best Time To Ask Questions on StackOverflow
Use extractDocs.py to parse and index the StackOverflow posts.xml file into an existing index.
Usage: extractDocs.py [options] file
Options:
-h, --help show this help message and exit
-s SERVER, --server=SERVER
ElasticSearch Server
-i INDEX, --index=INDEX
Index name to use
-b BULKSIZE, --bulk-size=BULKSIZE
Number of docs to submit in each bulk request.
-m MAXDOCS, --max-docs=MAXDOCS
Max number of docs to index
-v, --verbose Enable verbose output
Use the setup script to execute the following steps:
- create index with parent mapping
- feed documents using bulk indexing via extractDocs.py
- optimize the index
- execute the query
Run with ./setup.sh /path/to/posts.xml
import re
import sys
import json
import httplib
from datetime import datetime
from HTMLParser import HTMLParser
from optparse import OptionParser
# pre-compiled regular expressions (raw strings so backslash classes like \s
# and \d are not treated as string escape sequences)
rowRe = re.compile(r'^\s*<row')                  # detects a <row ...> element line
attrRe = re.compile(r'(\w+)="(.*?)"')            # extracts all attributes and values
cleanupRe = re.compile(r'<[^<]+?>|[\r\n]+|\s+')  # strips out html tags and extra whitespace
tagsRe = re.compile(r'&lt;(.*?)&gt;')            # splits escaped tags into a list
intRe = re.compile(r'^\d+$')                     # determines if a field value is an integer
def http(method, path, data):
    """ Send a generic HTTP request against the ElasticSearch host and return the response object """
    # HOST is a module-level global set from the command-line options
    es = httplib.HTTPConnection(HOST)
    es.request(method, path, data)
    resp = es.getresponse()
    return resp
def bulk(data):
""" Submit Bulk ElasticSearch Request """
resp = http('POST', '/%s/_bulk' % INDEX, data)
# load response and check for errors
status = json.loads(resp.read())
for stat in status['items']:
if not stat['index']['ok']:
print json.dumps(stat)
return resp
def main(fileName):
    """ Parse StackExchange data into ElasticSearch Bulk Format

    Streams the posts.xml dump line by line, converts each <row> element into
    a JSON document, and submits them to ElasticSearch in bulk batches of
    BULK_SIZE documents. Uses the module globals HOST/INDEX/BULK_SIZE/
    MAX_DOCS/VERBOSE set by the option parser. Returns 0.
    """
    # html parser used to unescape the body and title
    html = HTMLParser()
    with open(fileName) as f:
        docs = []
        for i, line in enumerate(f):
            # skip line if not a row element (e.g. the XML header/footer)
            if rowRe.match(line) is None:
                continue
            # build the document to be indexed from the row's attributes
            doc = {}
            for field, val in attrRe.findall(line):
                # strip whitespace and skip field if empty value
                val = val.strip()
                if not val:
                    continue
                # cleanup title and body by stripping html tags and whitespace
                if field in ['Body', 'Title']:
                    val = cleanupRe.sub(' ', html.unescape(unicode(val, 'utf-8', 'ignore')))
                # make sure dates are in correct format (append UTC 'Z' suffix)
                elif field in ['CreationDate', 'LastActivityDate', 'LastEditDate']:
                    # example input: 2008-07-31T21:42:52.667
                    val = '%sZ' % val
                    # parse creation month, day, hour, and minute into
                    # separate fields so they can be faceted on individually
                    if field == 'CreationDate':
                        dateObj = datetime.strptime(val, '%Y-%m-%dT%H:%M:%S.%fZ')
                        doc['CreationMonth'] = dateObj.strftime('%B')
                        doc['CreationDay'] = dateObj.strftime('%A')
                        doc['CreationHour'] = dateObj.strftime('%H')
                        doc['CreationMinute'] = dateObj.strftime('%M')
                # split the escaped tag string into an array of tags
                elif field == 'Tags':
                    val = tagsRe.findall(val)
                # convert vals to integers if needed
                elif intRe.match(val) is not None:
                    val = int(val)
                doc[field] = val
            # create the bulk action; PostTypeId 2 is an answer, which is
            # indexed as a child of its parent question
            action = {'_id': '%s' % doc['Id'], '_type': 'question'}
            if doc['PostTypeId'] == 2:
                action['_type'] = 'answer'
                action['_parent'] = '%s' % doc['ParentId']
            # queue bulk json request: action line followed by document line
            docs.append(json.dumps({'index': action}))
            docs.append(json.dumps(doc))
            # submit bulk request once a full batch is queued
            if len(docs) == BULK_SIZE * 2:  # multiply by 2 to account for action lines
                if VERBOSE:
                    print 'Submitting %s docs...' % BULK_SIZE
                bulk('%s\n' % '\n'.join(docs))  # trailing newline so last item is processed
                docs = []
            # only index a subset of the posts; set max to -1 for all docs
            # NOTE(review): i is the file line index, not a count of indexed
            # docs — skipped non-row lines still advance it; verify intent
            if MAX_DOCS != -1 and i == MAX_DOCS + 1:
                if VERBOSE:
                    print 'Hit max document count of %s' % MAX_DOCS
                break;
        # submit any hanging requests left in the final partial batch
        if len(docs) > 0:
            if VERBOSE:
                print 'Submitting remaing %s docs in queue...' % (len(docs) / 2)
            bulk('%s\n' % '\n'.join(docs))
    return 0
if __name__ == '__main__':
    # command-line interface: extractDocs.py [options] file
    usage = 'usage: %prog [options] file'
    parser = OptionParser(usage)
    parser.add_option('-s', '--server', dest='server', default='localhost:9200', help='ElasticSearch Server')
    parser.add_option('-i', '--index', dest='index', default='stackoverflow', help='Index name to use')
    parser.add_option('-b', '--bulk-size', dest='bulkSize', type='int', default=100000, help='Number of docs to submit in each bulk request.')
    parser.add_option('-m', '--max-docs', dest='maxDocs', type='int', default=-1, help='Max number of docs to index')
    parser.add_option('-v', '--verbose', dest='verbose', action='store_true', default=False, help='Enable verbose output')
    options, args = parser.parse_args()
    # the single positional argument is the posts.xml path
    if len(args) != 1:
        parser.error('The StackOverflow posts.xml file location must be specified')
    # globals consumed by http(), bulk(), and main()
    HOST = options.server
    INDEX = options.index
    BULK_SIZE = options.bulkSize
    MAX_DOCS = options.maxDocs
    VERBOSE = options.verbose
    ret = main(args[0])
    sys.exit(ret)
#!/bin/bash
# setup.sh — end-to-end driver: recreate the index, feed the StackOverflow
# posts via extractDocs.py, re-enable refresh, optimize, then run the
# "best time to ask" query. Usage: ./setup.sh /path/to/posts.xml
HOST="localhost:9200"
INDEX="stackoverflow"
BATCH_SIZE=10000
MAX_DOCS=1500000
echo "Deleting Existing Index"
curl -XDELETE "http://${HOST}/${INDEX}"
# create the index with refresh disabled for faster bulk loading, and a
# parent/child mapping: answers are children of questions
echo -e "\n\nCreating index: ${INDEX}"
curl -XPUT "http://${HOST}/${INDEX}" -d '{
    "settings": {
        "number_of_replicas": 0,
        "refresh_interval": -1
    },
    "mappings": {
        "question": {
            "_all": {"enabled": false}
        },
        "answer": {
            "_all": {"enabled": false},
            "_parent": {
                "type": "question"
            }
        }
    }
}'
# bulk-index the posts file given as the first script argument
echo -e "\n\nFeeding Documents"
python extractDocs.py -s ${HOST} -i ${INDEX} -b ${BATCH_SIZE} -m ${MAX_DOCS} -v $1
# restore the normal refresh interval now that bulk loading is done
echo -e "\n\nEnabling index refresh interval"
curl -XPUT "http://${HOST}/${INDEX}/_settings" -d '{
    "index": {
        "refresh_interval": "1s"
    }
}'
echo -e "\n\nOptimizing index"
curl -XPOST "http://${HOST}/${INDEX}/_optimize?max_num_segments=5"
# query: answers whose parent question is tagged java AND javascript,
# faceting on creation month/day/hour/minute plus score stats
echo -e "\n\nExecuting Search"
curl -XGET "http://${HOST}/${INDEX}/_search?pretty=true" -d '{
    "size": 0,
    "query": {
        "has_parent": {
            "parent_type": "question",
            "_scope": "parents",
            "query": {
                "query_string": {
                    "query": "+java +javascript",
                    "default_field": "Tags"
                }
            }
        }
    },
    "facets": {
        "month": {
            "terms": {
                "field": "CreationMonth"
            }
        },
        "day": {
            "terms": {
                "field": "CreationDay"
            }
        },
        "hour": {
            "terms": {
                "field": "CreationHour",
                "size": 10
            }
        },
        "minute": {
            "terms": {
                "field": "CreationMinute",
                "size": 10
            }
        },
        "score": {
            "statistical": {
                "field": "Score"
            },
            "scope": "parents"
        },
        "score_by_tags": {
            "terms_stats": {
                "key_field": "Tags",
                "value_field": "Score"
            },
            "scope": "parents"
        }
    }
}'
echo -e "\n\nDone."
exit 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment