@shanthoosh
Last active April 9, 2019 23:56
Test logs for migrating the changelog manager and config to the metadata store in Samza

Samza-Yarn ApplicationMaster

Before the fix, the samza-yarn ApplicationMaster fetched the coordinator stream metadata four times, three of them redundant:

[svenkata@svenkata-ld2 usercache]$ sudo grep "Fetching system stream metadata" ./appcache/application_1554757366064_0143/container_e43_1554757365064_0142_01_000001/logs/samza-job-coordinator.log
2019-04-09 14:21:16.275 [main] KafkaSystemAdmin [INFO] Fetching system stream metadata for [__samza_coordinator_samza-count-by-device_i001] from system samzametadatasystem
2019-04-09 14:21:16.275 [main] KafkaSystemAdmin [INFO] Fetching system stream metadata for [__samza_coordinator_samza-count-by-device_i001] from system samzametadatasystem
2019-04-09 14:21:16.275 [main] KafkaSystemAdmin [INFO] Fetching system stream metadata for [__samza_coordinator_samza-count-by-device_i001] from system samzametadatasystem
2019-04-09 14:21:16.275 [main] KafkaSystemAdmin [INFO] Fetching system stream metadata for [__samza_coordinator_samza-count-by-device_i001] from system samzametadatasystem

After the fix, the samza-yarn ApplicationMaster fetches the coordinator stream metadata only once:

[svenkata@svenkata-ld2 usercache]$ sudo grep "Fetching system stream metadata" ./appcache/application_1554757366064_0143/container_e43_1554757366064_0143_01_000001/logs/samza-job-coordinator.log
2019-04-09 17:21:16.275 [main] KafkaSystemAdmin [INFO] Fetching system stream metadata for [__samza_coordinator_samza-count-by-device_i001] from system samzametadatasystem
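The grep check above can be scripted so that it fails loudly if the fetch count ever regresses. The Java sketch below is a hypothetical helper: `LogLineCounter` is not part of Samza. It assumes only one thing: each fetch logs a line containing `Fetching system stream metadata`, as in the logs above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical helper: counts log lines containing a marker, like `grep -c`. */
final class LogLineCounter {
  static final String FETCH_MARKER = "Fetching system stream metadata";

  // Counts in-memory lines that contain the marker.
  static long countMatching(List<String> lines, String marker) {
    return lines.stream().filter(line -> line.contains(marker)).count();
  }

  // Streams a log file from disk and counts lines that contain the marker.
  static long countMatching(Path logFile, String marker) throws IOException {
    try (Stream<String> lines = Files.lines(logFile, StandardCharsets.UTF_8)) {
      return lines.filter(line -> line.contains(marker)).count();
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("usage: LogLineCounter <path-to-samza-job-coordinator.log>");
      System.exit(2);
    }
    long fetches = countMatching(Paths.get(args[0]), FETCH_MARKER);
    System.out.println("coordinator stream metadata fetches: " + fetches);
    // Exit non-zero unless the metadata was fetched exactly once.
    System.exit(fetches == 1 ? 0 : 1);
  }
}
```

Against the pre-fix log above, the helper would report 4 and exit with status 1. Against the post-fix log it would report 1 and exit with status 0.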

Before the fix, the samza-yarn ApplicationMaster instantiated and registered the coordinator stream store four times:

[svenkata@svenkata-ld2 usercache]$ sudo grep "CoordinatorStreamStore" ./appcache/application_1554757366064_0143/container_e43_1554757365064_0142_01_000001/logs/samza-job-coordinator.log | grep "Registering system stream partition"
2019-04-09 14:21:16.555 [main] CoordinatorStreamStore [INFO] Registering system stream partition: SystemStreamPartition [samzametadatasystem, __samza_coordinator_samza-count-by-device_i001, 0] with offset: 7646.
2019-04-09 14:21:16.555 [main] CoordinatorStreamStore [INFO] Registering system stream partition: SystemStreamPartition [samzametadatasystem, __samza_coordinator_samza-count-by-device_i001, 0] with offset: 7646.
2019-04-09 14:21:16.555 [main] CoordinatorStreamStore [INFO] Registering system stream partition: SystemStreamPartition [samzametadatasystem, __samza_coordinator_samza-count-by-device_i001, 0] with offset: 7646.
2019-04-09 14:21:16.555 [main] CoordinatorStreamStore [INFO] Registering system stream partition: SystemStreamPartition [samzametadatasystem, __samza_coordinator_samza-count-by-device_i001, 0] with offset: 7646.

After the fix, the samza-yarn ApplicationMaster instantiates and registers the coordinator stream store only once:

[svenkata@svenkata-ld2 usercache]$ sudo grep "CoordinatorStreamStore" ./appcache/application_1554757366064_0143/container_e43_1554757366064_0143_01_000001/logs/samza-job-coordinator.log  
2019-04-09 17:21:16.267 [main] CoordinatorStreamStore [INFO] Starting the coordinator stream system consumer with config: { job.coordinator.monitor-partition-change.frequency.ms=300000, systems.samzametadatasystem.consumer.auto.offset.reset=smallest, job.coordinator.system=samzametadatasystem, systems.samzametadatasystem.producer.bootstrap.servers=localhost:16637, job.id=i001, systems.samzametadatasystem.producer.max.request.size=1048576, systems.samzametadatasystem.producer.compression.codec=gzip, systems.samzametadatasystem.producer.compression.type=none, job.name=samza-count-by-device, systems.samzametadatasystem.consumer.zookeeper.connect=localhost:12913/kafka-queuing, systems.samzametadatasystem.grestin.enabled=true, systems.samzametadatasystem.producer.request.required.acks=1}.
2019-04-09 17:21:16.555 [main] CoordinatorStreamStore [INFO] Registering system stream partition: SystemStreamPartition [samzametadatasystem, __samza_coordinator_samza-count-by-device_i001, 0] with offset: 7646.
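One way to get a single instantiation is to create the store once in the ApplicationMaster and hand the same instance to every component that reads from it. The sketch below illustrates only that idea. `SharedMetadataStore` and its `Supplier`-based bootstrap are hypothetical stand-ins and do not mirror the actual `CoordinatorStreamStore` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: bootstrap the metadata once and share the snapshot
 * with every reader, instead of each reader creating and registering its own store.
 */
final class SharedMetadataStore {
  private final Supplier<Map<String, String>> bootstrap;
  private Map<String, String> contents; // null until the first read
  private int bootstrapCount = 0;

  SharedMetadataStore(Supplier<Map<String, String>> bootstrap) {
    this.bootstrap = bootstrap;
  }

  // Bootstraps on first use only; later calls return the cached, read-only snapshot.
  synchronized Map<String, String> all() {
    if (contents == null) {
      bootstrapCount++;
      contents = Collections.unmodifiableMap(new HashMap<>(bootstrap.get()));
    }
    return contents;
  }

  synchronized int bootstrapCount() {
    return bootstrapCount;
  }

  public static void main(String[] args) {
    SharedMetadataStore store = new SharedMetadataStore(() -> {
      // Stand-in for fetching stream metadata and registering the partition.
      Map<String, String> m = new HashMap<>();
      m.put("job.name", "samza-count-by-device");
      m.put("job.id", "i001");
      return m;
    });
    // Two readers (for example, a config loader and a changelog mapping reader) share one instance.
    String jobName = store.all().get("job.name");
    String jobId = store.all().get("job.id");
    System.out.println(jobName + "_" + jobId + " bootstrapped " + store.bootstrapCount() + " time(s)");
  }
}
```

Running `main` prints `samza-count-by-device_i001 bootstrapped 1 time(s)`. The second `all()` call reuses the snapshot instead of bootstrapping again.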

Changelog Partition Mapping

To verify that the changelog partition mapping was read correctly, it was logged to stdout. Below is the changelog partition mapping as read in two consecutive runs of a test hello-samza job; both runs show the same mapping:

First run:

Container: container_e43_1554757366064_0135_01_000001 on localhost:1158
LogAggregationType: LOCAL
===========================================================================================
LogType:stdout
LogLastModifiedTime:Tue Apr 09 17:05:00 +0000 2019
LogLength:2049
LogContents:
home_dir=/export/content/data/appcache/application_1554757366064_0135/container_e43_1554757366064_0135_01_000001
changelogPartitionMapping
{Partition 7=17, Partition 6=16, Partition 9=19, Partition 8=18, Partition 16=8, Partition 15=7, Partition 14=6, Partition 1=1, Partition 0=0, Partition 13=5, Partition 3=13, Partition 19=11, Partition 2=12, Partition 18=10, Partition 5=15, Partition 4=14, Partition 17=9, Partition 12=4, Partition 11=3, Partition 10=2}

Second run:

Container: container_e43_1554757366064_0143_01_000001 on localhost:1158
LogAggregationType: LOCAL
===========================================================================================
LogType:stdout
LogLastModifiedTime:Tue Apr 09 17:21:17 +0000 2019
LogLength:2049
LogContents:
home_dir=/export/content/data/appcache/application_1554757366064_0143/container_e43_1554757366064_0143_01_000001
changelogPartitionMapping
{Partition 7=17, Partition 6=16, Partition 9=19, Partition 8=18, Partition 16=8, Partition 15=7, Partition 14=6, Partition 1=1, Partition 0=0, Partition 13=5, Partition 3=13, Partition 19=11, Partition 2=12, Partition 18=10, Partition 5=15, Partition 4=14, Partition 17=9, Partition 12=4, Partition 11=3, Partition 10=2}
End of LogType:stdout.This log file belongs to a running container (container_e43_1554757366064_0143_01_000001) and so may not be complete.
***********************************************************************
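Comparing the two printed mappings by eye is error-prone. A small hypothetical helper can parse the printed `changelogPartitionMapping` line into a sorted map and then compare the runs. The inputs in `main` are shortened excerpts of the mapping above. The second one is deliberately reordered to show that the comparison does not depend on print order.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: parses "{Partition 7=17, ...}" into a sorted map. */
final class ChangelogMappingCheck {
  // One "Partition <task partition>=<changelog partition>" entry.
  private static final Pattern ENTRY = Pattern.compile("Partition (\\d+)=(\\d+)");

  static Map<Integer, Integer> parse(String printed) {
    Map<Integer, Integer> mapping = new TreeMap<>();
    Matcher m = ENTRY.matcher(printed);
    while (m.find()) {
      mapping.put(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
    }
    return mapping;
  }

  public static void main(String[] args) {
    // Shortened excerpts of the logged mapping, for illustration only.
    String firstRun = "{Partition 7=17, Partition 6=16, Partition 1=1, Partition 0=0}";
    String secondRun = "{Partition 0=0, Partition 1=1, Partition 6=16, Partition 7=17}";
    boolean same = parse(firstRun).equals(parse(secondRun));
    System.out.println("mapping unchanged across runs: " + same);
  }
}
```

Running `main` prints `mapping unchanged across runs: true`.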

Checkpoint Tool

Output of running the checkpoint tool, which reads the job configuration from the coordinator stream:

/export/apps/jdk/JDK-1_8_0_121/bin/java -javaagent:/export/apps/xtools/LNKD-intellij-2018.3/lib/idea_rt.jar=38316:/export/apps/xtools/LNKD-intellij-2018.3/bin -Dfile.encoding=UTF-8 -classpath /export/apps/jdk/JDK-1_8_0_121/jre/lib/charsets.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/deploy.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/cldrdata.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/dnsns.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/jaccess.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/jfxrt.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/localedata.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/nashorn.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/sunec.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/sunjce_provider.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/sunpkcs11.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/ext/zipfs.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/javaws.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/jce.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/jfr.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/jfxswt.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/jsse.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/management-agent.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/plugin.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/resources.jar:/export/apps/jdk/JDK-1_8_0_121/jre/lib/rt.jar:/home/svenkata/code/samza/samza-1/samza/out/production/samza-kafka_2.11:/home/svenkata/.m2/repository/org/scala-lang/scala-library/2.11.8/scala-library-2.11.8.jar:/home/svenkata/.m2/repository/org/scala-lang/scala-reflect/2.11.8/scala-reflect-2.11.8.jar:/home/svenkata/code/samza/samza-1/samza/out/production/samza-api:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.apache.commons/commons-lang3/3.4/5fe28b9518e58819180a43a850fbc0dd24b7c050/commons-lang3-3.4.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.codehaus.jackson/jackson-mapper-asl/1.9.13/1ee2f2bed0e5dd29d1cb155a166e6f8d50bbddb7/jackson-mapper-asl-1.9.13.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.google.guava/guava/23.0/c947004bb13d18182be60077ade044099e4f26f1/guava-23.0.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.7/2b8019b6249bb05d81d3a3094e468753e2b21311/slf4j-api-1.7.7.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/io.dropwizard.metrics/metrics-core/3.1.2/224f03afd2521c6c94632f566beb1bb5ee32cf07/metrics-core-3.1.2.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.codehaus.jackson/jackson-core-asl/1.9.13/3c304d70f42f832e0a86d45bd437f692129299a4/jackson-core-asl-1.9.13.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.google.code.findbugs/jsr305/1.3.9/40719ea6961c0cb6afaeb6a921eaa1f6afd4cfdf/jsr305-1.3.9.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.google.errorprone/error_prone_annotations/2.0.18/5f65affce1684999e2f4024983835efc3504012e/error_prone_annotations-2.0.18.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.google.j2objc/j2objc-annotations/1.1/ed28ded51a8b1c6b112568def5f4b455e6809019/j2objc-annotations-1.1.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.codehaus.mojo/animal-sniffer-annotations/1.14/775b7e22fb10026eed3f86e8dc556dfafe35f2d5/animal-sniffer-annotations-1.14.jar:/home/svenkata/code/samza/samza-1/samza/out/production/samza-core_2.11:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.101tec/zkclient/0.8/c0f700a4a3b386279d7d8dd164edecbe836cbfdb/zkclient-0.8.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/net.sf.jopt-simple/jopt-simple/3.2/d625f12ba08083c8c16dcedd5396ec730e9e77ab/jopt-simple-3.2.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.apache.commons/commons-collections4/4.0/da217367fd25e88df52ba79e47658d4cf928b0d1/commons-collections4-4.0.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.eclipse.jetty/jetty-webapp/9.2.7.v20150116/c425e57423123c89ed72e76df8f5d0332f05ecee/jetty-webapp-9.2.7.v20150116.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.scala-lang/scala-library/2.11.8/ddd5a8bced249bedd86fb4578a39b9fb71480573/scala-library-2.11.8.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/net.jodah/failsafe/1.1.0/b5c3d6166e974194eeacea21c3d07e707bad4ba4/failsafe-1.1.0.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.6.1/bd245d6746cdd4e6203e976e21d597a46f115802/slf4j-log4j12-1.6.1.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.apache.zookeeper/zookeeper/3.4.6/1b2502e29da1ebaade2357cd1de35a855fa3755/zookeeper-3.4.6.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.eclipse.jetty/jetty-xml/9.2.7.v20150116/81008da0080e82fd3c8e6d094a1241ce33ca7c6d/jetty-xml-9.2.7.v20150116.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.eclipse.jetty/jetty-servlet/9.2.7.v20150116/4c11b757c34627b21e493201a3173ed12ebe04a6/jetty-servlet-9.2.7.v20150116.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/jline/jline/0.9.94/99a18e9a44834afdebc467294e1138364c207402/jline-0.9.94.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/io.netty/netty/3.7.0.Final/7a8c35599c68c0bf383df74469aa3e03d9aca87/netty-3.7.0.Final.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.eclipse.jetty/jetty-util/9.2.7.v20150116/2fb9c6172e5704b1b6d8609c5e9dc15db3367d7/jetty-util-9.2.7.v20150116.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.eclipse.jetty/jetty-security/9.2.7.v20150116/f3f257d952bbe493a6a187d95395464fce9926fe/jetty-security-9.2.7.v20150116.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/junit/junit/3.8.1/99129f16442844f6a4a11ae22fbbee40b14d774f/junit-3.8.1.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.eclipse.jetty/jetty-server/9.2.7.v20150116/dc072b171e8b7ad5762c765c54bdf4e3b2198225/jetty-server-9.2.7.v20150116.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/javax.servlet/javax.servlet-api/3.1.0/3cd63d075497751784b2fa84be59432f4905bf7c/javax.servlet-api-3.1.0.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.eclipse.jetty/jetty-http/9.2.7.v20150116/ac6c19ab7daaffe86622f2d78e4cbeff1a674e85/jetty-http-9.2.7.v20150116.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.eclipse.jetty/jetty-io/9.2.7.v20150116/11bd06fca00dd0c27a25aa0ba46a7c2cf19527d5/jetty-io-9.2.7.v20150116.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/log4j/log4j/1.2.16/7999a63bfccbc7c247a9aea10d83d4272bd492c6/log4j-1.2.16.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka_2.11/2.0.1/74edf9dcfe1b298422dcf133e8f02b64caa9c2ed/kafka_2.11-2.0.1.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-clients/2.0.1/db87f87e64714faff9c90fdc97a06c7d8e79b672/kafka-clients-2.0.1.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-databind/2.9.7/e6faad47abd3179666e89068485a1b88a195ceb7/jackson-databind-2.9.7.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.yammer.metrics/metrics-core/2.2.0/f82c035cfa786d3cbec362c38c22a5f5b1bc8724/metrics-core-2.2.0.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.scala-lang/scala-reflect/2.11.12/2bb23c13c527566d9828107ca4108be2a2c06f01/scala-reflect-2.11.12.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.typesafe.scala-logging/scala-logging_2.11/3.9.0/e0dba06b4a763a0e2208182b264421baedbb0df/scala-logging_2.11-3.9.0.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.lz4/lz4-java/1.4.1/ad89b11ac280a2992d65e078af06f6709f1fe2fc/lz4-java-1.4.1.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.7.1/d5190b41f3de61e3b83d692322d58630252bc8c3/snappy-java-1.1.7.1.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-annotations/2.9.0/7c10d545325e3a6e72e06381afe469fd40eb701/jackson-annotations-2.9.0.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-core/2.9.7/4b7f0e0dc527fab032e9800ed231080fdc3ac015/jackson-core-2.9.7.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.25/da76ca59f6a57ee3102f8f9bd9cee742973efa8a/slf4j-api-1.7.25.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/net.sf.jopt-simple/jopt-simple/5.0.4/4fdac2fbe92dfad86aa6e9301736f6b4342a3f5c/jopt-simple-5.0.4.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.scala-lang/scala-library/2.11.12/bf5534e6fec3d665bd6419c952a929a8bdd4b591/scala-library-2.11.12.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/com.101tec/zkclient/0.10/c54d4b5a5e89af75a80b6d5857400165ce5188d0/zkclient-0.10.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.apache.zookeeper/zookeeper/3.4.13/31e9937541cef95c4585b547eb2dbd34d3a76f1c/zookeeper-3.4.13.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.apache.yetus/audience-annotations/0.5.0/55762d3191a8d6610ef46d11e8cb70c7667342a3/audience-annotations-0.5.0.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.25/110cefe2df103412849d72ef7a67e4e91e4266b4/slf4j-log4j12-1.7.25.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/log4j/log4j/1.2.17/5af35056b4d257e4b64b9e8069c0746e8b08629f/log4j-1.2.17.jar:/home/svenkata/.gradle/caches/modules-2/files-2.1/io.netty/netty/3.10.6.Final/18ed04a0e502896552854926e908509db2987a00/netty-3.10.6.Final.jar org.apache.samza.checkpoint.CheckpointTool --config-path /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/deploy/samza/config/stream-table-join-example.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.6.1/bd245d6746cdd4e6203e976e21d597a46f115802/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/svenkata/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.25/110cefe2df103412849d72ef7a67e4e91e4266b4/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.samza.config.factories.PropertiesConfigFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Configuration read from the coordinator stream
{streams.pageview-profile-table-joiner-1-partition_by-join.samza.system=kafka, tables.profile-table.provider.factory=org.apache.samza.storage.kv.LocalTableProviderFactory, systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory, job.default.system=kafka, job.id=1, serializers.registry.JsonSerdeV2-f362d7e6-b9fc-43b3-bdf6-8ae995f9795d.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgA/c2FtemEuZXhhbXBsZXMuY29va2Jvb2suU3RyZWFtVGFibGVKb2luRXhhbXBsZSRFbnJpY2hlZFBhZ2VWaWV3AAAAAAAAAAAAAAB4cA==, systems.kafka.default.stream.replication.factor=1, stores.profile-table.key.serde=StringSerde-a6c40315-d580-4798-8d3b-667d76cb4ac6, streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority=2147483647, streams.enriched-pageview-join-output.samza.system=kafka, streams.pageview-join-input.samza.msg.serde=JsonSerdeV2-34905f34-c2d8-450b-9b88-d8bd52cbb4af, app.run.id=1554848023767-93005864, job.name=pageview-profile-table-joiner, streams.profile-table-input.samza.msg.serde=JsonSerdeV2-401d6ca8-f676-47b9-9a25-eaef00143021, streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde=JsonSerdeV2-34905f34-c2d8-450b-9b88-d8bd52cbb4af, systems.kafka.producer.bootstrap.servers=localhost:9092, streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages=true, streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default=oldest, systems.kafka.consumer.zookeeper.connect=localhost:2181, serializers.registry.StringSerde-a6c40315-d580-4798-8d3b-667d76cb4ac6.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, streams.pageview-join-input.samza.system=kafka, streams.pageview-profile-table-joiner-1-partition_by-join.samza.physical.name=pageview-profile-table-joiner-1-partition_by-join, 
serializers.registry.JsonSerdeV2-401d6ca8-f676-47b9-9a25-eaef00143021.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA==, streams.pageview-profile-table-joiner-1-partition_by-join.samza.intermediate=true, samza.internal.execution.plan={"jobs":[{"jobName":"pageview-profile-table-joiner","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":["pageview-profile-table-joiner-1-join-6"]},{"streamId":"profile-table-input","nextOperatorIds":["pageview-profile-table-joiner-1-map-2"]},{"streamId":"pageview-join-input","nextOperatorIds":["pageview-profile-table-joiner-1-partition_by-join"]}],"outputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},{"streamId":"enriched-pageview-join-output","nextOperatorIds":[]}],"operators":{"pageview-profile-table-joiner-1-partition_by-join":{"opId":"pageview-profile-table-joiner-1-partition_by-join","opCode":"PARTITION_BY","sourceLocation":"StreamTableJoinExample.java:127","outputStreamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},"pageview-profile-table-joiner-1-map-2":{"opId":"pageview-profile-table-joiner-1-map-2","opCode":"MAP","sourceLocation":"StreamTableJoinExample.java:123","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-3"]},"pageview-profile-table-joiner-1-send_to-3":{"opId":"pageview-profile-table-joiner-1-send_to-3","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:124","nextOperatorIds":[]},"pageview-profile-table-joiner-1-send_to-7":{"opId":"pageview-profile-table-joiner-1-send_to-7","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:129","outputStreamId":"enriched-pageview-join-output","nextOperatorIds":[]},"pageview-profile-table-joiner-1-join-6":{"opId":"pageview-profile-table-joiner-1-join-6","tableId":"profile-table","opCode":"JOIN","sourceLocation":"StreamTableJoinExample.java:128","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-7"]}}}}],"sourceStreams":{"profile-table-input":{"streamSpec":{"id":"profile-table-input","systemName":"kafka","physicalName":"profile-table-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]},"pageview-join-input":{"streamSpec":{"id":"pageview-join-input","systemName":"kafka","physicalName":"pageview-join-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]}},"sinkStreams":{"enriched-pageview-join-output":{"streamSpec":{"id":"enriched-pageview-join-output","systemName":"kafka","physicalName":"enriched-pageview-join-output","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":[]}},"intermediateStreams":{"pageview-profile-table-joiner-1-partition_by-join":{"streamSpec":{"id":"pageview-profile-table-joiner-1-partition_by-join","systemName":"kafka","physicalName":"pageview-profile-table-joiner-1-partition_by-join","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":["pageview-profile-table-joiner"]}},"tables":{"profile-table":{"id":"profile-table","providerFactory":"org.apache.samza.storage.kv.LocalTableProviderFactory"}},"applicationName":"pageview-profile-table-joiner","applicationId":"1"}, task.inputs=kafka.pageview-profile-table-joiner-1-partition_by-join,kafka.profile-table-input,kafka.pageview-join-input, streams.pageview-profile-table-joiner-1-partition_by-join.replication.factor=1, streams.pageview-profile-table-joiner-1-partition_by-join.samza.key.serde=StringSerde-3f736e53-060d-403d-b692-0564751904d9, stores.profile-table.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory, task.window.ms=-1, job.factory.class=org.apache.samza.job.yarn.YarnJobFactory, 
yarn.package.path=file:///home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/target/hello-samza-1.0.2-SNAPSHOT-dist.tar.gz, serializers.registry.JsonSerdeV2-34905f34-c2d8-450b-9b88-d8bd52cbb4af.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAlc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5QYWdlVmlldwAAAAAAAAAAAAAAeHA=, streams.profile-table-input.samza.system=kafka, serializers.registry.StringSerde-3f736e53-060d-403d-b692-0564751904d9.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, app.mode=STREAM, streams.enriched-pageview-join-output.samza.msg.serde=JsonSerdeV2-f362d7e6-b9fc-43b3-bdf6-8ae995f9795d, app.class=samza.examples.cookbook.StreamTableJoinExample, stores.profile-table.msg.serde=JsonSerdeV2-401d6ca8-f676-47b9-9a25-eaef00143021, job.container.count=2}
Current checkpoint for task: Partition 0 1200

Process finished with exit code 0
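The configuration printed by the checkpoint tool is a flat key-value map. To illustrate how it can be consumed, the hypothetical helper below splits the `task.inputs` value into system and stream names. It assumes the usual `system.stream` form, in which the system name contains no dot. `TaskInputs` and `SystemStreamName` are illustrative names, not Samza classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a task.inputs value into (system, stream) pairs. */
final class TaskInputs {
  // Simple value type; not Samza's SystemStream.
  static final class SystemStreamName {
    final String system;
    final String stream;

    SystemStreamName(String system, String stream) {
      this.system = system;
      this.stream = stream;
    }

    @Override
    public String toString() {
      return system + " / " + stream;
    }
  }

  // Splits "system.stream,system.stream" at the first '.' of each entry.
  static List<SystemStreamName> parse(String taskInputs) {
    List<SystemStreamName> result = new ArrayList<>();
    for (String raw : taskInputs.split(",")) {
      String entry = raw.trim();
      if (entry.isEmpty()) {
        continue;
      }
      int dot = entry.indexOf('.');
      if (dot <= 0 || dot == entry.length() - 1) {
        throw new IllegalArgumentException("Expected system.stream but got: " + entry);
      }
      result.add(new SystemStreamName(entry.substring(0, dot), entry.substring(dot + 1)));
    }
    return result;
  }

  public static void main(String[] args) {
    // Value copied from the configuration printed above.
    String value = "kafka.pageview-profile-table-joiner-1-partition_by-join,"
        + "kafka.profile-table-input,kafka.pageview-join-input";
    for (SystemStreamName ssn : parse(value)) {
      System.out.println(ssn);
    }
  }
}
```

Because the split happens at the first dot, a stream name that itself contains dots stays intact.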

ProcessJobFactory

Local deployment of a Samza job within LinkedIn works correctly with the fix. The following logs show that the coordinator stream is read only once:

grep 'CoordinatorStreamStore'  /export/content/apps/samza-count-by-device/i001/logs/samza-count-by-device-runner.out
2019-04-09 16:10:14.531 [main] CoordinatorStreamStore [INFO] Starting the coordinator stream system consumer with config: {Nuked since it contains linkedin specific secrets}
2019-04-09 16:10:15.401 [main] CoordinatorStreamStore [INFO] Registering system stream partition: SystemStreamPartition [samzametadatasystem, __samza_coordinator_samza-count-by-device_i001, 0] with offset: 31779.
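The same single-read check can be applied to the runner log. The hypothetical helper below extracts offsets from `CoordinatorStreamStore` registration lines. Its regular expression is written against the message format quoted in these logs. That format is an assumption, not a stable Samza contract.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: collects offsets from coordinator stream registration lines. */
final class RegistrationCheck {
  // Groups: system, stream, partition, offset.
  private static final Pattern REGISTRATION = Pattern.compile(
      "Registering system stream partition: SystemStreamPartition "
          + "\\[([^,]+), ([^,]+), (\\d+)\\] with offset: (\\d+)\\.");

  static List<Long> registeredOffsets(List<String> lines) {
    List<Long> offsets = new ArrayList<>();
    for (String line : lines) {
      Matcher m = REGISTRATION.matcher(line);
      if (m.find()) {
        offsets.add(Long.parseLong(m.group(4)));
      }
    }
    return offsets;
  }

  public static void main(String[] args) {
    // Line copied from the runner log above.
    List<String> lines = Arrays.asList(
        "2019-04-09 16:10:15.401 [main] CoordinatorStreamStore [INFO] Registering system stream partition: "
            + "SystemStreamPartition [samzametadatasystem, __samza_coordinator_samza-count-by-device_i001, 0] "
            + "with offset: 31779.");
    List<Long> offsets = registeredOffsets(lines);
    System.out.println(offsets.size() == 1
        ? "registered once, offset " + offsets.get(0)
        : "registered " + offsets.size() + " times");
  }
}
```

Running `main` prints `registered once, offset 31779`.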