#!/bin/python3
import json
import os
from datetime import datetime

import moment
from elasticsearch import helpers
from elasticsearch import Elasticsearch
## Elasticsearch connection. The connection string embeds credentials, so it
## can be supplied through the ES_CONN_STRING environment variable instead of
## living in source; the original local-development value is the default.
esConnString = os.environ.get('ES_CONN_STRING', 'http://elastic:changeme@localhost:9200')
esTo = Elasticsearch([esConnString], request_timeout=100)
## utility functions
def prettyPrint(doc):
    """Write ``doc`` to stdout as JSON, indented four spaces, keys sorted."""
    rendered = json.dumps(doc, sort_keys=True, indent=4)
    print(rendered)
def loadObjFromJSONFile(fn):
    """Parse and return the JSON document stored in the file at path ``fn``.

    The file is decoded explicitly as UTF-8, the required encoding for JSON
    interchange text, rather than with the platform's locale encoding. This
    makes non-ASCII content load identically on every machine.

    Raises:
        OSError: if the file cannot be opened.
        UnicodeDecodeError: if the bytes are not valid UTF-8.
        json.JSONDecodeError: if the contents are not valid JSON.
    """
    with open(fn, 'r', encoding='utf-8') as infile:
        return json.load(infile)
### Untappd check-in download: a JSON list of objects, one per check-in.
### The path can be overridden with the UNTAPPD_CHECKIN_FILE environment
### variable; the original export filename remains the default.
fn = os.environ.get('UNTAPPD_CHECKIN_FILE', 'checkin-report_07_27_17.json')
checkins = loadObjFromJSONFile(fn)
# Index and mapping-type names.
BEER_INDEX_NAME = "beer"
BEER_TYPE_NAME = "checkins"                # type that holds check-in documents
BEER_PERCOLATE_NAME = "checkinspercolate"  # type that holds the stored percolator queries

# Field names shared by the mappings, the percolator queries and the check-ins.
BREWERY_COUNTRY_FIELD = "brewery_country"
LOCATION_FIELD = "location"
PERCOLATE_FIELD = "query"
SECURITY_TAGS_FIELD = "securityTags"
CREATED_AT_FIELD = "created_at"
# Index creation body: one primary shard, no replicas, and two mapping types
# in the same index (presumably targeting Elasticsearch 5.x, which still
# allows several types per index -- confirm against the cluster version).
#
# Check-in documents: typed @timestamp, keyword brewery country, geo location.
_checkinMapping = {
    "properties": {
        "@timestamp": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis",
        },
        BREWERY_COUNTRY_FIELD: {"type": "keyword"},
        LOCATION_FIELD: {"type": "geo_point"},
    }
}
# Stored percolator queries, each carrying the security tag it assigns.
_percolateMapping = {
    "properties": {
        PERCOLATE_FIELD: {"type": "percolator"},
        SECURITY_TAGS_FIELD: {"type": "keyword"},
    }
}
beerBody = {
    "settings": {"number_of_shards": 1, "number_of_replicas": 0},
    "mappings": {
        BEER_TYPE_NAME: _checkinMapping,
        BEER_PERCOLATE_NAME: _percolateMapping,
    },
}
## Rebuild the index from scratch on every run. Delete only when it exists:
## on a fresh cluster an unconditional delete fails with HTTP 404
## (NotFoundError) before the index has ever been created.
if esTo.indices.exists(index=BEER_INDEX_NAME):
    esTo.indices.delete(index=BEER_INDEX_NAME)
esTo.indices.create(index=BEER_INDEX_NAME, body=beerBody)
## Stored percolator query that matches every check-in and tags it "Beer".
allBeerMatcher = {
    SECURITY_TAGS_FIELD: "Beer",
    PERCOLATE_FIELD: {"match_all": {}},
}
esTo.index(
    index=BEER_INDEX_NAME,
    doc_type=BEER_PERCOLATE_NAME,
    body=allBeerMatcher,
    refresh=True,
)
print("insert all beer matcher")
## Stored percolator query that tags check-ins whose brewery country matches
## "United States" as "DomesticBeer". It references BREWERY_COUNTRY_FIELD
## rather than repeating the literal, so the query cannot drift from the
## keyword field declared in the index mapping.
domesticMatcher = {
    SECURITY_TAGS_FIELD: "DomesticBeer",
    PERCOLATE_FIELD: {
        "match": {
            BREWERY_COUNTRY_FIELD: "United States"
        }
    }
}
esTo.index(index=BEER_INDEX_NAME, doc_type=BEER_PERCOLATE_NAME,
           body=domesticMatcher, refresh=True)
print("insert domestic matcher")
## Stored percolator query that tags check-ins within 100 m of home as
## "HomeDrinking". The home coordinates are the thing being protected, so they
## are read from the HOME_LON / HOME_LAT environment variables rather than
## written into source. A missing variable raises KeyError instead of
## silently registering a query for the wrong place.
## Elasticsearch geo_point arrays are [lon, lat], matching how prepCheckin
## builds the check-in location.
homeLocation = [float(os.environ["HOME_LON"]), float(os.environ["HOME_LAT"])]
homeMatcher = {
    SECURITY_TAGS_FIELD: "HomeDrinking",
    PERCOLATE_FIELD: {
        "geo_distance": {
            "distance": "100m",
            LOCATION_FIELD: homeLocation
        }
    }
}
esTo.index(index=BEER_INDEX_NAME, doc_type=BEER_PERCOLATE_NAME,
           body=homeMatcher, refresh=True)
print("insert home matcher")
## Make every stored percolator query searchable before tagging begins.
esTo.indices.refresh(index=BEER_INDEX_NAME)
def genPercolateQuery(doc):
    """Return a search body that percolates ``doc`` against the stored queries.

    The stored queries are read from PERCOLATE_FIELD, and ``doc`` is evaluated
    as if it were a document of mapping type BEER_TYPE_NAME.
    """
    percolate = {
        "field": PERCOLATE_FIELD,
        "document_type": BEER_TYPE_NAME,
        "document": doc,
    }
    return {"query": {"percolate": percolate}}
def tagsFromPercolateHits(returnObj):
    """Collect the security tag of every stored query that matched.

    Each hit in the search response ``returnObj`` is a stored percolator
    document. Its SECURITY_TAGS_FIELD value is gathered in hit order. Hits
    without a ``_source``, or whose ``_source`` lacks that field, are skipped.
    """
    hits = returnObj["hits"]["hits"]
    return [
        hit["_source"][SECURITY_TAGS_FIELD]
        for hit in hits
        if "_source" in hit and SECURITY_TAGS_FIELD in hit["_source"]
    ]
def dateFromUntappdString(dateString):
    """Parse an Untappd ``created_at`` string into a naive ``datetime``.

    The expected format is ``YYYY-MM-DD HH:mm:ss`` (strptime
    ``%Y-%m-%d %H:%M:%S``). The result is a plain ``datetime.datetime`` with
    no timezone attached, not a moment object.

    Args:
        dateString: the timestamp text. ``""`` or ``None`` means "no value".

    Returns:
        The parsed ``datetime``, or ``None`` when ``dateString`` is empty/None.

    Raises:
        ValueError: if a non-empty string does not match the expected format.
    """
    if not dateString:
        return None
    return datetime.strptime(dateString, "%Y-%m-%d %H:%M:%S")
## Transformations applied to each check-in before it is tagged and indexed
def prepCheckin(checkin):
    """Normalise one check-in dict in place and return it.

    - Sets LOCATION_FIELD to ``[lon, lat]`` floats when both ``venue_lat`` and
      ``venue_lng`` are non-None.
    - Sets ``@timestamp`` to the ISO-8601 form of CREATED_AT_FIELD when that
      value is non-empty.
    - Converts ``beer_abv`` / ``beer_ibu`` to float when they are non-None.
    """
    ## Location: needs both coordinates, not just the latitude.
    lat = checkin.get("venue_lat")
    lng = checkin.get("venue_lng")
    if lat is not None and lng is not None:
        checkin[LOCATION_FIELD] = [float(lng), float(lat)]
    ## Clean timestamp. dateFromUntappdString returns None for "", so only
    ## non-empty values are parsed; otherwise None.isoformat() would crash.
    createdAt = checkin.get(CREATED_AT_FIELD)
    if createdAt:
        checkin["@timestamp"] = dateFromUntappdString(createdAt).isoformat()
    ## Numeric beer attributes.
    for numericField in ("beer_abv", "beer_ibu"):
        if checkin.get(numericField) is not None:
            checkin[numericField] = float(checkin[numericField])
    return checkin
## Tag every check-in through the percolator, then bulk-index it in batches
## of writeBatchSize actions.
bulkActions = []
writeBatchSize = 250
for doc in checkins:
    ## static transforms
    doc = prepCheckin(doc)
    ## percolate tagging: run the check-in against every stored query and
    ## record which security tags matched
    returnObj = esTo.search(index=BEER_INDEX_NAME, body=genPercolateQuery(doc),
                            size=100, _source_include=SECURITY_TAGS_FIELD)
    tags = tagsFromPercolateHits(returnObj)
    doc[SECURITY_TAGS_FIELD] = tags
    doc["securityTag_Count"] = len(tags)
    ## queue this document for the next bulk request
    bulkActions.append({
        "_index": BEER_INDEX_NAME,
        "_type": BEER_TYPE_NAME,
        "_source": doc
    })
    ## if the bulk buffer is full, execute the bulk
    if len(bulkActions) >= writeBatchSize:
        helpers.bulk(esTo, bulkActions)
        print("executed " + str(len(bulkActions)) + " actions")
        bulkActions = []
## execute any remaining items in the bulk buffer. Test the buffer itself:
## a count captured inside the loop is stale right after a flush, and is
## never defined at all when checkins is empty.
if bulkActions:
    helpers.bulk(esTo, bulkActions)
    print("executed " + str(len(bulkActions)) + " actions")
    bulkActions = []
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment