Created
July 19, 2017 11:51
-
-
Save kojiisd/a8f5eb276ba16c6b31c7701cad4484a1 to your computer and use it in GitHub Desktop.
AWS LambdaでDynamoDBから取得した値に任意の集計をかける(グルーピング処理追加) ref: http://qiita.com/kojiisd/items/228acc17c41194a2f2b6
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "label_id": "id",
  "label_range": "timestamp",
  "id": [
    "sensor1",
    "sensor2"
  ],
  "aggregator": "latest",
  "time_from": "2017-04-30T22:00:00.000",
  "time_to": "2017-04-30T22:06:00.000",
  "params": {
    "range": "timestamp"
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"[{\"timestamp\": \"2017-04-30T22:05:00.000\", \"score\": 0.0, \"id\": \"sensor1\"}, {\"timestamp\": \"2017-04-30T22:06:00.000\", \"score\": 1.0, \"id\": \"sensor2\"}]" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import boto3 | |
import json | |
import decimal | |
import os | |
from boto3.dynamodb.conditions import Key | |
from aggregator.lambda_aggregator import LambdaAggregator | |
from aggregator.latest_aggregator import LatestAggregator | |
from aggregator.max_aggregator import MaxAggregator | |
from aggregator.min_aggregator import MinAggregator | |
from aggregator.sum_aggregator import SumAggregator | |
from aggregator.avg_aggregator import AvgAggregator | |
from aggregator.count_aggregator import CountAggregator | |
# DynamoDB table handle; the table name is read from the TABLE environment
# variable at import time (a missing variable raises KeyError on import).
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE'])

# Dispatch table: the "aggregator" value of an incoming event selects the
# aggregator instance that processes the queried items.
aggregator_map = {
    'latest': LatestAggregator(),
    'max': MaxAggregator(),
    'min': MinAggregator(),
    'sum': SumAggregator(),
    'avg': AvgAggregator(),
    'count': CountAggregator(),
}
def run(event, context):
    """Lambda entry point: query DynamoDB per id and aggregate the items.

    For every value in ``event['id']`` the table is queried for items whose
    ``event['label_id']`` key equals that value and whose
    ``event['label_range']`` key lies between ``event['time_from']`` and
    ``event['time_to']``. The items are then handed to the aggregator
    selected by ``event['aggregator']`` together with ``event['params']``.

    Args:
        event: Mapping with the keys validated by ``check_params``.
        context: Lambda context object; not used.

    Returns:
        A JSON string containing one aggregation result per requested id,
        in the order the ids were given.

    Raises:
        KeyError: If ``event['aggregator']`` is not a key of
            ``aggregator_map`` (only reached when at least one id is given).
    """
    check_params(event)

    result = []
    for item_id in event['id']:
        key_condition = (
            Key(event['label_id']).eq(item_id)
            & Key(event['label_range']).between(event['time_from'], event['time_to'])
        )
        query_kwargs = {
            'KeyConditionExpression': key_condition,
            'ScanIndexForward': False,
        }

        # A single Query call returns at most one page of results. Keep
        # following LastEvaluatedKey so the aggregator sees every matching
        # item instead of only the first page.
        items = []
        while True:
            res = table.query(**query_kwargs)
            items.extend(res['Items'])
            last_key = res.get('LastEvaluatedKey')
            if not last_key:
                break
            query_kwargs['ExclusiveStartKey'] = last_key

        return_response = aggregator_map[event['aggregator']].aggregate(items, event['params'])
        result.append(return_response)

    # DynamoDB numbers arrive as decimal.Decimal, which json cannot encode
    # natively; decimal_default converts them.
    return json.dumps(result, default=decimal_default)
def decimal_default(obj):
    """``json.dumps`` ``default`` hook that encodes ``decimal.Decimal``.

    Args:
        obj: An object the JSON encoder could not serialize on its own.

    Returns:
        ``float(obj)`` when ``obj`` is a ``decimal.Decimal``.

    Raises:
        TypeError: For any other type, naming the offending type so a
            serialization failure can be diagnosed.
    """
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    raise TypeError(
        "Object of type {} is not JSON serializable".format(type(obj).__name__)
    )
def check_params(params):
    """Validate that the event carries every required top-level key.

    Args:
        params: The event mapping passed to the handler.

    Returns:
        None when all required keys are present.

    Raises:
        SystemExit: With exit status 1 when one or more required keys are
            missing. The reason, including the missing key names, is
            written to stderr first.
    """
    required = (
        'label_id',
        'label_range',
        'id',
        'aggregator',
        'time_from',
        'time_to',
        'params',
    )
    missing = [name for name in required if name not in params]
    if missing:
        sys.stderr.write(
            "Parameters for label_id, label_range, id, aggregator, "
            "time_from, time_to and params are needed."
            " Missing: {}\n".format(", ".join(missing))
        )
        # Exit non-zero: a bare sys.exit() would report success (status 0)
        # for what is a validation failure.
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment