Skip to content

Instantly share code, notes, and snippets.

@yiwang
Forked from codyaray/BinaryToString.java
Created April 8, 2014 20:09
Show Gist options
  • Save yiwang/10183500 to your computer and use it in GitHub Desktop.
Save yiwang/10183500 to your computer and use it in GitHub Desktop.
// Copyright 2014 BrightTag, Inc. All rights reserved.
package com.brighttag.storm.utils;
import java.nio.charset.StandardCharsets;
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
/**
 * Converts the first field of a tuple from a byte array into a string.
 *
 * <p>The bytes are decoded as UTF-8. NOTE(review): this assumes upstream
 * producers write UTF-8 encoded text; confirm against the spout feeding this
 * function.
 *
 * <p>Emits a single value: the decoded string.
 */
public class BinaryToString extends BaseFunction {

  private static final long serialVersionUID = -8686873770270590062L;

  /**
   * Decodes field 0 of {@code tuple} and emits it as a one-value tuple.
   *
   * @param tuple input tuple whose field 0 is read with {@code getBinary(0)}
   * @param collector receives the decoded string
   */
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    // Name the charset explicitly: new String(byte[]) falls back to the JVM's
    // platform default, which can differ from one worker host to another.
    String field = new String(tuple.getBinary(0), StandardCharsets.UTF_8);
    collector.emit(new Values(field));
  }
}
// Copyright 2014 BrightTag, Inc. All rights reserved.
package com.brighttag.storm.utils;
import java.math.RoundingMode;
import backtype.storm.tuple.Values;
import com.google.common.math.DoubleMath;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
/**
 * Maps a timestamp into a bucket of size {@code interval}.
 *
 * <p>Assumes the first tuple value is a long timestamp, expressed in the same
 * unit as {@code interval}.
 *
 * <p>Outputs two string values: {@code bucketStart}, the largest multiple of
 * {@code interval} that is less than or equal to the timestamp, and
 * {@code bucketEnd}, which is {@code bucketStart + interval}.
 */
public class Bucket extends BaseFunction {

  private static final long serialVersionUID = 1042081321412192768L;

  private final long interval;

  /**
   * Creates a bucketing function.
   *
   * @param interval bucket width; must be strictly positive
   * @throws IllegalArgumentException if {@code interval} is zero or negative
   */
  public Bucket(long interval) {
    // Fail when the topology is built instead of with a division-by-zero
    // ArithmeticException on the first tuple.
    if (interval <= 0) {
      throw new IllegalArgumentException("interval must be positive, got " + interval);
    }
    this.interval = interval;
  }

  /**
   * Computes the bucket containing the timestamp in field 0 and emits its
   * bounds as strings.
   *
   * @param tuple input tuple whose field 0 is read with {@code getLong(0)}
   * @param collector receives {@code (bucketStart, bucketEnd)}
   */
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    long timestamp = tuple.getLong(0);

    // Stay in long arithmetic. Java's % takes the sign of the dividend, so
    // shift a negative remainder up by one interval to get a floored result;
    // this keeps negative timestamps in the bucket below them rather than
    // rounding toward zero, and avoids any precision loss through double.
    long offset = timestamp % interval;
    if (offset < 0) {
      offset += interval;
    }

    long bucketStart = timestamp - offset;
    long bucketEnd = bucketStart + interval;
    collector.emit(new Values(String.valueOf(bucketStart), String.valueOf(bucketEnd)));
  }
}
// Copyright 2014 BrightTag, Inc. All rights reserved.
package com.brighttag.storm.json;
import backtype.storm.tuple.Values;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
/**
 * Parses the first tuple value as a JSON string and emits one metric tuple
 * of the form (timestamp, name, value) per element.
 *
 * The input JSON must be an array of metric objects:
 * <pre>{@code
 * [
 *   {
 *     "name": [string],
 *     "timestamp": [long],
 *     "value": [long]
 *   }
 * ]
 * }</pre>
 *
 * Input that is not a JSON array is reported on standard output and produces
 * no tuples. An element that is not an object, or that lacks one of the three
 * fields, is reported and skipped; the remaining elements are still emitted.
 */
public class MetricJsonParser extends BaseFunction {

  private static final long serialVersionUID = 7592816813615529588L;

  /**
   * Parses field 0 of {@code tuple} and emits one tuple per valid metric.
   *
   * @param tuple input tuple whose field 0 is read with {@code getString(0)}
   * @param collector receives {@code (timestamp, name, value)} for each metric
   */
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    JSONArray array;
    try {
      array = new JSONArray(tuple.getString(0));
    } catch (JSONException e) {
      System.out.println("Problem with incoming JSON: " + e);
      return;
    }

    for (int i = 0; i < array.length(); i++) {
      // Handle failures per element so one malformed metric does not discard
      // the valid metrics that follow it in the same array.
      try {
        JSONObject object = array.getJSONObject(i);
        String name = object.getString("name");
        long timestamp = object.getLong("timestamp");
        long value = object.getLong("value");
        collector.emit(new Values(timestamp, name, value));
      } catch (JSONException e) {
        System.out.println("Problem with incoming JSON: element " + i + ": " + e);
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment