Skip to content

Instantly share code, notes, and snippets.

@feng-tao
Last active November 7, 2015 00:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save feng-tao/0ef07fa64dc6bc278c31 to your computer and use it in GitHub Desktop.
Save feng-tao/0ef07fa64dc6bc278c31 to your computer and use it in GitHub Desktop.
public class KeyCountTask implements StreamTask, InitableTask, WindowableTask {
private KeyValueStore<String, Integer> store;
private static final String delim = ",| |:";
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
String msg = (String) envelope.getMessage();
String pageKey = "";
Integer currentCount = null;
String[] arr = msg.split(delim);
pageKey = arr[2];
if (!pageKey.equals("")) {
currentCount = store.get(pageKey);
if (currentCount == null) {
currentCount = 0;
}
store.put(pageKey, currentCount + 1);
}
}
@Override
public void init(Config config, TaskContext context) throws Exception {
store = (KeyValueStore<String, Integer>) context.getStore("page-key-tao");
}
@Override
public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
KeyValueIterator<String, Integer> iter = store.all();
while (iter.hasNext()) {
iter.next();
iter.remove();
}
iter.close();
}
}
@feng-tao
Copy link
Author

config

job.factory.class=org.apache.samza.job.local.ProcessJobFactory
task.opts=-Xmx2g -XX:+UseG1GC -XX:G1HeapRegionSize=4M -XX:+PrintGCDetails -Xloggc:logs/gc.log
stores.page-key-tao.key.serde=string
stores.page-key-tao.factory=org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory
stores.page-key-tao.msg.serde=integer
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
systems.kafka.producer.bootstrap.servers=$kafka_bootstrap_server
task.inputs=$input_topic_name
task.window.ms=90000
task.class=com.linkedin.samza.example.KeyCountTask
systems.kafka.consumer.zookeeper.connect=$LinkedIn_zookeeper
metrics.reporters=jmx,amf
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
systems.kafka.samza.msg.serde=string
systems.kafka.producer.metadata.broker.list=$LinkedIn_broker
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
task.checkpoint.replication.factor=3
task.checkpoint.system=kafka
systems.kafka.samza.key.serde=string
task.command.class=org.apache.samza.job.ShellCommandBuilder

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment