Skip to content

Instantly share code, notes, and snippets.

@tsouza
Created June 26, 2016 03:46
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tsouza/6fe443dc017dc4b961862ee43b86fb91 to your computer and use it in GitHub Desktop.
import json
import moment
from elasticsearch import helpers
from elasticsearch import Elasticsearch
import Geohash
# Elasticsearch client shared by the index delete, the prediction query and
# the bulk writes below.
# NOTE(review): host "demo:9200" is hard-coded; adjust for your cluster.
es = Elasticsearch("demo:9200")
def prettyPrint(doc):
    """Print *doc* as indented JSON with sorted keys (debugging aid).

    Written as ``print(...)`` with a single argument so it parses and
    behaves the same under both Python 2 and Python 3.
    """
    print(json.dumps(doc, indent=4, sort_keys=True))
# --- Script configuration -------------------------------------------------
writeBatchSize = 5000  # actions accumulated before each helpers.bulk flush
fromIndex, toIndex = "bike-dc", "bike-dc_predict"  # read from / write to
# NOTE(review): this name shadows the builtin `type`, and the bulk actions
# below hard-code "logs" rather than reading it.
type = "logs"
day = 24  # hours in a day; the date histogram below uses hourly buckets

# Start from an empty destination index; HTTP 400/404 are ignored so a
# missing index does not abort the run.
es.indices.delete(index=toIndex, ignore=[400, 404])
def _holtWintersAgg(**smoothing):
    """Return a Holt-Winters ``moving_avg`` pipeline aggregation over ``count``.

    All variants share a window of day * 7 * 4 hourly buckets (four weeks),
    a multiplicative season of day * 7 buckets (one week), padding and
    minimization. Keyword arguments (alpha/beta/gamma) are appended to the
    model ``settings``.
    """
    settings = {
        "type": "mult",
        "period": day * 7,
        "pad": True,
    }
    settings.update(smoothing)
    return {
        "moving_avg": {
            "buckets_path": "count",
            "window": day * 7 * 4,
            "model": "holt_winters",
            "minimize": True,
            "settings": settings,
        }
    }


# Hourly date histogram on startDate. Each bucket carries a value_count of
# memberType plus three Holt-Winters forecasts of that count, which differ
# only in their explicit smoothing settings.
# To restrict the analysed range, add a top-level query such as:
#   "query": {"range": {"startDate": {"gte": "2015-01-01", "lt": "2015-02-01"}}}
# NOTE(review): with "minimize": True Elasticsearch tunes the model
# parameters itself; confirm whether the explicit alpha/beta/gamma matter.
predictionQuery = {
    "aggs": {
        "predict": {
            "date_histogram": {
                "field": "startDate",
                "interval": "hour",
            },
            "aggs": {
                "count": {"value_count": {"field": "memberType"}},
                "prediction1": _holtWintersAgg(),
                "prediction2": _holtWintersAgg(alpha=0.8, beta=0.1, gamma=0.8),
                "prediction3": _holtWintersAgg(alpha=0.8, beta=0.8, gamma=0.1),
            },
        }
    }
}
# Fetch only the aggregation results (no hits).
# search_type="count" was deprecated in Elasticsearch 2.0 and removed in 5.0;
# size=0 is the documented replacement and is also accepted by older clusters.
res = es.search(index=fromIndex, size=0, body=predictionQuery)
buckets = res["aggregations"]["predict"]["buckets"]

bulkActions = []
for bucket in buckets:
    # One output document per hourly bucket, stamped with the bucket's
    # formatted date string.
    doc = {
        "@timestamp": bucket["key_as_string"]
    }
    if "count" in bucket:
        doc["count"] = bucket["count"]["value"]
    for suffix in ("1", "2", "3"):
        predName = "prediction" + suffix
        # Prediction keys may be absent from some buckets (presumably the
        # earliest ones, which have no history yet) -- hence the check.
        if predName not in bucket:
            continue
        prediction = bucket[predName]["value"]
        doc[predName] = prediction
        # "surprise" = relative excess of the actual count over the forecast,
        # scaled by 10 and clipped at 0. Skipped when the forecast is zero or
        # null (division would fail and abort the whole export) or when the
        # bucket has no count.
        if prediction and "count" in doc:
            doc["surprise" + suffix] = max(
                0, 10.0 * (doc["count"] - prediction) / prediction)
    action = {
        "_index": toIndex,
        "_type": "logs",
        "_source": doc
    }
    bulkActions.append(action)
    # Flush periodically so the pending action list stays bounded.
    if len(bulkActions) >= writeBatchSize:
        helpers.bulk(es, bulkActions)
        bulkActions = []
# Write the final partial batch, if any.
if bulkActions:
    helpers.bulk(es, bulkActions)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment