-
-
Save feng-tao/0ef07fa64dc6bc278c31 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class KeyCountTask implements StreamTask, InitableTask, WindowableTask { | |
private KeyValueStore<String, Integer> store; | |
private static final String delim = ",| |:"; | |
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { | |
String msg = (String) envelope.getMessage(); | |
String pageKey = ""; | |
Integer currentCount = null; | |
String[] arr = msg.split(delim); | |
pageKey = arr[2]; | |
if (!pageKey.equals("")) { | |
currentCount = store.get(pageKey); | |
if (currentCount == null) { | |
currentCount = 0; | |
} | |
store.put(pageKey, currentCount + 1); | |
} | |
} | |
@Override | |
public void init(Config config, TaskContext context) throws Exception { | |
store = (KeyValueStore<String, Integer>) context.getStore("page-key-tao"); | |
} | |
@Override | |
public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { | |
KeyValueIterator<String, Integer> iter = store.all(); | |
while (iter.hasNext()) { | |
iter.next(); | |
iter.remove(); | |
} | |
iter.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
config (Samza job properties for KeyCountTask)
job.factory.class=org.apache.samza.job.local.ProcessJobFactory
task.opts=-Xmx2g -XX:+UseG1GC -XX:G1HeapRegionSize=4M -XX:+PrintGCDetails -Xloggc:logs/gc.log
stores.page-key-tao.key.serde=string
stores.page-key-tao.factory=org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory
stores.page-key-tao.msg.serde=integer
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
systems.kafka.producer.bootstrap.servers=$kafka_bootstrap_server
task.inputs=$input_topic_name
task.window.ms=90000
task.class=com.linkedin.samza.example.KeyCountTask
systems.kafka.consumer.zookeeper.connect=$LinkedIn_zookeeper
metrics.reporters=jmx,amf
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
systems.kafka.samza.msg.serde=string
systems.kafka.producer.metadata.broker.list=$LinkedIn_broker
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
task.checkpoint.replication.factor=3
task.checkpoint.system=kafka
systems.kafka.samza.key.serde=string
task.command.class=org.apache.samza.job.ShellCommandBuilder