#!/usr/bin/env python3
import json
import moment
from elasticsearch import helpers
from elasticsearch import Elasticsearch
esConnString = 'http://elastic:changeme@localhost:9200'
esTo = Elasticsearch([esConnString],request_timeout=100)
## utility functions
def prettyPrint(doc):
    print(json.dumps(doc, indent=4, sort_keys=True))

def loadObjFromJSONFile(fn):
    with open(fn, 'r') as infile:
        return json.load(infile)
### Untappd Checkin download, a list of json objects representing checkins
fn = 'checkin-report_07_27_17.json'
checkins = loadObjFromJSONFile(fn)
BEER_INDEX_NAME = 'beer'
BEER_TYPE_NAME = "checkins"
BEER_PERCOLATE_NAME = "checkinspercolate"
BREWERY_COUNTRY_FIELD = "brewery_country"
LOCATION_FIELD = "location"
PERCOLATE_FIELD = "query"
SECURITY_TAGS_FIELD = "securityTags"
CREATED_AT_FIELD = "created_at"
beerBody = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        BEER_TYPE_NAME: {
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "format": "strict_date_optional_time||epoch_millis"
                },
                BREWERY_COUNTRY_FIELD: {"type": "keyword"},
                LOCATION_FIELD: {"type": "geo_point"}
            }
        },
        BEER_PERCOLATE_NAME: {
            "properties": {
                PERCOLATE_FIELD: {"type": "percolator"},
                SECURITY_TAGS_FIELD: {"type": "keyword"}
            }
        }
    }
}
## drop and recreate the index (ignore 404 in case it does not exist yet)
esTo.indices.delete(index=BEER_INDEX_NAME, ignore=[404])
esTo.indices.create(index=BEER_INDEX_NAME, body=beerBody)
## percolator query that tags every checkin with "Beer"
allBeerMatcher = {
    SECURITY_TAGS_FIELD: "Beer",
    PERCOLATE_FIELD: {
        "match_all": {}
    }
}
esTo.index(index=BEER_INDEX_NAME, doc_type=BEER_PERCOLATE_NAME, body=allBeerMatcher, refresh=True)
print("inserted all beer matcher")
## percolator query that tags checkins from United States breweries with "DomesticBeer"
domesticMatcher = {
    SECURITY_TAGS_FIELD: "DomesticBeer",
    PERCOLATE_FIELD: {
        "match": {
            BREWERY_COUNTRY_FIELD: "United States"
        }
    }
}
esTo.index(index=BEER_INDEX_NAME, doc_type=BEER_PERCOLATE_NAME, body=domesticMatcher, refresh=True)
print("inserted domestic matcher")
homeLocation = [<<LON>>, <<LAT>>]  ## [lon, lat] of the thing I am trying to protect
## percolator query that tags checkins within 100m of home with "HomeDrinking"
homeMatcher = {
    SECURITY_TAGS_FIELD: "HomeDrinking",
    PERCOLATE_FIELD: {
        "geo_distance": {
            "distance": "100m",
            LOCATION_FIELD: homeLocation
        }
    }
}
esTo.index(index=BEER_INDEX_NAME, doc_type=BEER_PERCOLATE_NAME, body=homeMatcher, refresh=True)
print("inserted home matcher")
esTo.indices.refresh(index=BEER_INDEX_NAME)
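
## optional sanity check (a sketch, not part of the indexing flow): pull back the
## registered percolator docs so the three matchers above can be verified by eye
prettyPrint(esTo.search(index=BEER_INDEX_NAME, doc_type=BEER_PERCOLATE_NAME,
                        body={"query": {"match_all": {}}}))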
## build a percolate query that returns every registered matcher this checkin satisfies
def genPercolateQuery(doc):
    return {
        "query": {
            "percolate": {
                "field": PERCOLATE_FIELD,
                "document_type": BEER_TYPE_NAME,
                "document": doc
            }
        }
    }
## the percolate queries carry a security tags field in their _source;
## grab the tags from the matching hits as a list
def tagsFromPercolateHits(returnObj):
    tags = []
    for hit in returnObj["hits"]["hits"]:
        if "_source" in hit and SECURITY_TAGS_FIELD in hit["_source"]:
            tags.append(hit["_source"][SECURITY_TAGS_FIELD])
    return tags
## convert an Untappd timestamp string into a Python datetime via the moment library
def dateFromUntappdString(dateString):
    if dateString != "":
        return moment.date(dateString, "YYYY-MM-DD HH:mm:ss").date
    return None
## transformations applied to each checkin before indexing
def prepCheckin(checkin):
    ## location: build a [lon, lat] geo_point from the venue coordinates
    if "venue_lat" in checkin and checkin["venue_lat"] is not None:
        checkin[LOCATION_FIELD] = [float(checkin["venue_lng"]), float(checkin["venue_lat"])]
    ## clean timestamp: parse created_at into an ISO-8601 @timestamp
    if CREATED_AT_FIELD in checkin:
        createdAt = dateFromUntappdString(checkin[CREATED_AT_FIELD])
        if createdAt is not None:
            checkin["@timestamp"] = createdAt.isoformat()
    ## numeric fields arrive as strings; cast them to floats
    if "beer_abv" in checkin and checkin["beer_abv"] is not None:
        checkin["beer_abv"] = float(checkin["beer_abv"])
    if "beer_ibu" in checkin and checkin["beer_ibu"] is not None:
        checkin["beer_ibu"] = float(checkin["beer_ibu"])
    return checkin
## the bulk buffer and bulk action size
bulkActions = []
writeBatchSize = 250

for doc in checkins:
    ## static transforms
    doc = prepCheckin(doc)
    ## percolate tagging: collect the security tags of every matcher this checkin hits
    returnObj = esTo.search(index=BEER_INDEX_NAME, body=genPercolateQuery(doc), size=100, _source_include=SECURITY_TAGS_FIELD)
    tags = tagsFromPercolateHits(returnObj)
    doc[SECURITY_TAGS_FIELD] = tags
    doc["securityTag_Count"] = len(tags)
    ## queue this document for insert (build up bulk actions)
    action = {
        "_index": BEER_INDEX_NAME,
        "_type": BEER_TYPE_NAME,
        "_source": doc
    }
    bulkActions.append(action)
    ## if the bulk buffer is full, execute the bulk
    actionCount = len(bulkActions)
    if actionCount >= writeBatchSize:
        helpers.bulk(esTo, bulkActions)
        bulkActions = []
        print("executed " + str(actionCount) + " actions")

## execute any remaining items in the bulk buffer
actionCount = len(bulkActions)
if actionCount > 0:
    helpers.bulk(esTo, bulkActions)
    bulkActions = []
    print("executed " + str(actionCount) + " actions")