@thomasdarimont
Last active August 29, 2015 14:05
Example of a custom Groovy-based Spring XD aggregator that combines the messages belonging to one logical group and emits the aggregated values when a "group change" is detected.
import org.springframework.xd.tuple.TupleBuilder

enum Aggregator {
    INSTANCE

    // Timestamp of the most recent message; -1 means no message has been seen yet
    long lastTimestampSeen = -1
    // Running load per house_id; missing keys start at 0.0
    def values = [:].withDefault { key -> 0.0 }
    def emptyList = []

    def update(msg) {
        //println(msg)
        // The timestamp "ts" defines our "logical group"
        def result = emptyList
        if (lastTimestampSeen != -1 && msg.getLong("ts") > lastTimestampSeen) {
            // We are past our logical group, so we flush the aggregated values
            // Build one aggregated tuple per house
            result = []
            values.each { k, v ->
                def aggregatedTuple = TupleBuilder.tuple()
                    .put("ts", lastTimestampSeen)
                    .put("house_id", k)
                    .put("load", v)
                    .build()
                result.add(aggregatedTuple)
            }
            // Clean up the now stale aggregate data
            values.clear()
        }
        // Aggregate load per house
        values[msg.getString("house_id")] += msg.getDouble("load")
        lastTimestampSeen = msg.getLong("ts")
        return result
    }
}

Aggregator.INSTANCE.update(payload)

Stream definition

stream create --name simple --definition "http --outputType=application/x-xd-tuple | script --location=aggregate.groovy | filter --expression=!payload.isEmpty() | splitter | log" --deploy

Post HTTP data

http post --target http://localhost:9000 --contentType application/json --data "{ \"ts\":1, \"house_id\":1, \"plug_id\":0, \"load\":11}"
http post --target http://localhost:9000 --contentType application/json --data "{ \"ts\":1, \"house_id\":1, \"plug_id\":1, \"load\":21}"
http post --target http://localhost:9000 --contentType application/json --data "{ \"ts\":1, \"house_id\":1, \"plug_id\":2, \"load\":31}"
http post --target http://localhost:9000 --contentType application/json --data "{ \"ts\":1, \"house_id\":2, \"plug_id\":0, \"load\":40}"
http post --target http://localhost:9000 --contentType application/json --data "{ \"ts\":1, \"house_id\":1, \"plug_id\":3, \"load\":41}"
http post --target http://localhost:9000 --contentType application/json --data "{ \"ts\":2, \"house_id\":1, \"plug_id\":0, \"load\":50}"
http post --target http://localhost:9000 --contentType application/json --data "{ \"ts\":3, \"house_id\":2, \"plug_id\":0, \"load\":50}"

Output shows

22:44:38,588  INFO DeploymentsPathChildrenCache-0 server.ContainerRegistrar - Deploying module 'http' for stream 'simple'
22:44:38,692  INFO DeploymentsPathChildrenCache-0 server.ContainerRegistrar - Deploying module [ModuleDescriptor@16ea92ae moduleName = 'http', moduleLabel = 'http', group = 'simple', sourceChannelName = [null], sinkChannelName = [null], sinkChannelName = [null], index = 0, type = source, parameters = map['outputType' -> 'application/x-xd-tuple'], children = list[[empty]]]
22:44:39,027  INFO Deployer server.StreamDeploymentListener - Deployment status for stream 'simple': DeploymentStatus{state=deployed}
22:44:39,029  INFO Deployer server.StreamDeploymentListener - Stream Stream{name='simple'} deployment attempt complete
22:44:53,367  INFO pool-11-thread-14 sink.simple - {"id":"82f6f875-4a59-c35f-92f2-d4eee4451a84","timestamp":1409258693350,"ts":1,"house_id":"1","load":104.0}
22:44:53,368  INFO pool-11-thread-14 sink.simple - {"id":"bf65206e-46b1-201a-e3f4-34c781b33897","timestamp":1409258693357,"ts":1,"house_id":"2","load":40.0}
22:44:54,143  INFO pool-11-thread-16 sink.simple - {"id":"dfe0308e-b8d1-7603-2266-8620c5faa78e","timestamp":1409258694142,"ts":2,"house_id":"1","load":50.0}
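
The log contains three aggregated records although seven messages were posted. The following standalone sketch replays the same seven messages through the same group-change logic to show why. It is an illustration only: plain maps stand in for XD tuples, and the class name `GroupChangeAggregator` and its fields are hypothetical, not part of Spring XD or of the script above.

```groovy
// Standalone sketch of the group-change logic; plain maps replace XD tuples (assumption).
class GroupChangeAggregator {
    long lastTs = -1
    Map<String, Double> loads = [:].withDefault { 0.0d }

    // Returns the aggregates of the previous group when "ts" advances, else an empty list.
    List<Map> update(Map msg) {
        List<Map> flushed = []
        long ts = msg.ts as long
        if (lastTs != -1 && ts > lastTs) {
            loads.each { houseId, load ->
                flushed << [ts: lastTs, house_id: houseId, load: load]
            }
            loads.clear()
        }
        String key = msg.house_id as String
        loads[key] = loads[key] + (msg.load as double)
        lastTs = ts
        return flushed
    }
}

// The same seven messages as in the "http post" commands (plug_id omitted, it is not aggregated).
def messages = [
    [ts: 1, house_id: 1, load: 11],
    [ts: 1, house_id: 1, load: 21],
    [ts: 1, house_id: 1, load: 31],
    [ts: 1, house_id: 2, load: 40],
    [ts: 1, house_id: 1, load: 41],
    [ts: 2, house_id: 1, load: 50],
    [ts: 3, house_id: 2, load: 50],
]

def agg = new GroupChangeAggregator()
def emitted = messages.collectMany { agg.update(it) }
emitted.each { println it }
```

The ts=1 group is flushed when the first ts=2 message arrives (house 1: 11 + 21 + 31 + 41 = 104, house 2: 40), and the ts=2 group is flushed by the ts=3 message. The ts=3 message itself stays buffered because no later timestamp ever arrives to trigger a flush, which matches the three records in the log.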