Skip to content

Instantly share code, notes, and snippets.

@quiver
Created December 15, 2015 17:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quiver/57e481d5c2a5e2eb25b8 to your computer and use it in GitHub Desktop.
Save quiver/57e481d5c2a5e2eb25b8 to your computer and use it in GitHub Desktop.
Store SORACOM Air traffic to Elasticsearch Raw
# API https://dev.soracom.io/jp/docs/api/#!/Stats/get_stats_air_subscriber
"""
AWS Lambda function that retrieves traffic data from SORACOM and stores it in Elasticsearch.
"""
import datetime
import json
import os
import time

import requests
# SORACOM login credentials and the IMSI of the subscriber whose traffic
# statistics are fetched. Each value may be overridden by an environment
# variable of the same name so that real secrets do not have to live in the
# source; the literals below are the original defaults (they look like
# placeholders -- replace or override them before deploying).
SORACOM_EMAIL = os.environ.get("SORACOM_EMAIL", "dummy@example.com")
SORACOM_PASSWORD = os.environ.get("SORACOM_PASSWORD", "PASSWORD")
IMSI = os.environ.get("IMSI", "1234567890")
def get_truncated_time(now=None):
    """Return a datetime truncated to the start of its hour.

    e.g. 2015/05/12 11:22:33 => 2015/05/12 11:00:00

    Args:
        now: Optional datetime to truncate. When omitted, the current local
            time from ``datetime.datetime.now()`` is used.

    Returns:
        A datetime equal to ``now`` with minute, second and microsecond set
        to zero; all other fields are left unchanged.
    """
    if now is None:
        now = datetime.datetime.now()
    return now.replace(minute=0, second=0, microsecond=0)
# Upper bound, in seconds, for every outbound HTTP call so that an
# unresponsive endpoint cannot block the function indefinitely.
_HTTP_TIMEOUT_SECONDS = 10


def _log_response(label, response):
    """Print the status code and reason of ``response``, tagged with ``label``."""
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("result of %s: [%d]%s" % (label, response.status_code, response.reason))


def _to_unixtime(moment):
    """Convert a naive local-time datetime to integer seconds since the epoch."""
    # time.mktime interprets the tuple as local time; this replaces the
    # platform-specific strftime("%s") directive.
    return int(time.mktime(moment.timetuple()))


def _soracom_auth_headers():
    """Authenticate against the SORACOM API and return headers for later calls.

    Raises:
        requests.exceptions.RequestException: if the request fails, times
            out, or the API answers with an HTTP error status.
    """
    response = requests.post(
        "https://api.soracom.io/v1/auth",
        headers={
            "Accept": "application/json",
            "Content-type": "application/json",
        },
        data=json.dumps({
            "email": SORACOM_EMAIL,
            "password": SORACOM_PASSWORD,
        }),
        timeout=_HTTP_TIMEOUT_SECONDS,
    )
    _log_response("auth", response)
    # Fail here with the HTTP error rather than later with a KeyError on
    # 'apiKey' when the login was rejected.
    response.raise_for_status()
    auth = response.json()
    return {
        "Accept": "application/json",
        "X-Soracom-Api-Key": auth["apiKey"],
        "X-Soracom-Token": auth["token"],
    }


def _fetch_last_hour_stats(auth_headers):
    """Fetch the traffic stats of IMSI for the most recently completed hour.

    Raises:
        requests.exceptions.RequestException: if the request fails, times
            out, or the API answers with an HTTP error status.
    """
    # Read the clock once so both ends of the window come from the same
    # instant, even when the call happens right at an hour boundary.
    window_end = get_truncated_time()
    window_start = window_end - datetime.timedelta(hours=1)
    response = requests.get(
        "https://api.soracom.io/v1/stats/air/subscribers/%s" % IMSI,
        headers=auth_headers,
        params={
            "from": _to_unixtime(window_start),
            "to": _to_unixtime(window_end),
            "period": "minutes",
        },
        timeout=_HTTP_TIMEOUT_SECONDS,
    )
    _log_response("stats/air/subscribers", response)
    response.raise_for_status()
    return response.json()


def handler(event, context):
    """AWS Lambda entry point: copy the last hour of SORACOM stats to Elasticsearch.

    Logs in to the SORACOM API, fetches the per-minute traffic stats of the
    subscriber identified by IMSI for the previous full hour, and POSTs one
    JSON document per (record, plan) pair to the Elasticsearch endpoint.

    Args:
        event: Lambda invocation event (not used).
        context: Lambda runtime context (not used).

    Returns:
        None.

    Raises:
        requests.exceptions.RequestException: if authentication or the stats
            request fails or returns an HTTP error status, or if any request
            (including an Elasticsearch POST) times out or cannot connect.
            An Elasticsearch response with an error status is only logged,
            so the remaining documents are still attempted.
    """
    auth_headers = _soracom_auth_headers()
    stats = _fetch_last_hour_stats(auth_headers)

    # POST traffic stats to Elasticsearch
    for record in stats:
        for plan, stats_map in record["dataTrafficStatsMap"].items():
            document = {
                "imsi": IMSI,
                "date": record["date"],
                "unixtime": record["unixtime"],
                "plan": plan,
            }
            # Merge the plan's counters into the document as top-level fields.
            document.update(stats_map)
            response = requests.post(
                "http://DUMMY.ap-northeast-1.es.amazonaws.com/soracom/stats",
                # The body is JSON, so declare it as such.
                headers={"Content-Type": "application/json"},
                data=json.dumps(document),
                timeout=_HTTP_TIMEOUT_SECONDS,
            )
            _log_response("Elasticsearch post", response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment