Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save shanthoosh/88ef289cb98d6d3be9db9fb76bc04472 to your computer and use it in GitHub Desktop.
Save shanthoosh/88ef289cb98d6d3be9db9fb76bc04472 to your computer and use it in GitHub Desktop.
Logs to demonstrate that ApplicatoinMaster ignores configurations with serialized null values from the coordinator stream.
Published the samza reserved configurations with serialized null values into coordinator stream:
```java
╰─λ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __samza_coordinator_pageview-profile-table-joiner_1 --property print.key=true --property key.separator="-" --from-beginning | grep "set-config"
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.system"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"kafka"},"username":"svenkata","timestamp":1556154205737}
[1,"set-config","tables.profile-table.provider.factory"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"org.apache.samza.storage.kv.LocalTableProviderFactory"},"username":"svenkata","timestamp":1556154205791}
[1,"set-config","systems.kafka.samza.factory"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"org.apache.samza.system.kafka.KafkaSystemFactory"},"username":"svenkata","timestamp":1556154205792}
[1,"set-config","job.default.system"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"kafka"},"username":"svenkata","timestamp":1556154205792}
[1,"set-config","job.id"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"1"},"username":"svenkata","timestamp":1556154205792}
[1,"set-config","serializers.registry.StringSerde-32e63a08-f16c-43ac-8261-d24f4b4badef.samza.serialized.instance"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg="},"username":"svenkata","timestamp":1556154205793}
[1,"set-config","systems.kafka.default.stream.replication.factor"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"1"},"username":"svenkata","timestamp":1556154205793}
[1,"set-config","stores.profile-table.key.serde"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"StringSerde-32e63a08-f16c-43ac-8261-d24f4b4badef"},"username":"svenkata","timestamp":1556154205794}
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"2147483647"},"username":"svenkata","timestamp":1556154205794}
[1,"set-config","serializers.registry.JsonSerdeV2-e4f7373f-da63-4176-a9e8-c4c404e8f925.samza.serialized.instance"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA=="},"username":"svenkata","timestamp":1556154205795}
[1,"set-config","streams.enriched-pageview-join-output.samza.system"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"kafka"},"username":"svenkata","timestamp":1556154205795}
[1,"set-config","streams.pageview-join-input.samza.msg.serde"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"JsonSerdeV2-d13051cf-8262-4e94-962c-96c3b36fcbfb"},"username":"svenkata","timestamp":1556154205796}
[1,"set-config","app.run.id"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"1556154204508-063c4569"},"username":"svenkata","timestamp":1556154205796}
[1,"set-config","job.name"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"pageview-profile-table-joiner"},"username":"svenkata","timestamp":1556154205797}
[1,"set-config","streams.profile-table-input.samza.msg.serde"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"JsonSerdeV2-e4f7373f-da63-4176-a9e8-c4c404e8f925"},"username":"svenkata","timestamp":1556154205797}
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"JsonSerdeV2-d13051cf-8262-4e94-962c-96c3b36fcbfb"},"username":"svenkata","timestamp":1556154205798}
[1,"set-config","systems.kafka.producer.bootstrap.servers"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"localhost:9092"},"username":"svenkata","timestamp":1556154205798}
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"true"},"username":"svenkata","timestamp":1556154205799}
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"oldest"},"username":"svenkata","timestamp":1556154205799}
[1,"set-config","systems.kafka.consumer.zookeeper.connect"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"localhost:2181"},"username":"svenkata","timestamp":1556154205799}
// Samza specific configuration keys with value serialized to null and stored in coordinator stream.
[1,"set-config","serializers.registry.JsonSerdeV2-a6eef306-2c02-4bdd-b005-b8d9e34431b4.samza.serialized.instance"]-null
[1,"set-config","serializers.registry.JsonSerdeV2-fc28efcb-dc7b-4a8c-bb27-8544e84753a0.samza.serialized.instance"]-null
[1,"set-config","samza.autoscaling.server.url"]-null
[1,"set-config","serializers.registry.StringSerde-85e21381-bf13-4212-8047-63de5f488ca0.samza.serialized.instance"]-null
[1,"set-config","serializers.registry.StringSerde-52d71216-187a-4fe4-a362-13faaf079dfc.samza.serialized.instance"]-null
[1,"set-config","serializers.registry.JsonSerdeV2-20ea4875-8d9d-4d06-9c4b-b699af3c35e8.samza.serialized.instance"]-null
[1,"set-config","serializers.registry.last-message-interaction-v2-specific-record-to-json.class"]-null
[1,"set-config","serializers.registry.recipient-offline-score-specific-record-to-json.class"]-null
[1,"set-config","stores.last-message-interaction-v2.changelog"]-null
[1,"set-config","stores.last-message-interaction-v2.factory"]-null
[1,"set-config","stores.last-message-interaction-v2.key.serde"]-null
[1,"set-config","stores.last-message-interaction-v2.rocksdb.ttl.ms"]-null
[1,"set-config","stores.last-message-interaction-v2.msg.serde"]-null
[1,"set-config","stores.recipient-offline-score.msg.serde"]-null
[1,"set-config","stores.recipient-offline-score.factory"]-null
[1,"set-config","stores.recipient-offline-score.key.class"]-null
[1,"set-config","stores.recipient-offline-score.changelog.replication.factor"]-null
[1,"set-config","stores.recipient-offline-score.changelog.replication.factor"]-null
[1,"set-config","stores.recipient-offline-score.key.serde"]-null
[1,"set-config","stores.recipient-offline-score.changelog"]-null
```
Without the patch, if we try to run the job, the SamzaContainer fails with the exception:
```java
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza (latest ✚1)
╰─λ tail -n 150 ./deploy/yarn/logs/userlogs/application_1556153599011_0007/container_1556153599011_0007_01_000002/samza-container-0.log | head -n 100 0 < 19:01:59
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-04-25 19:00:30.081 [main] ConsumerConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config.
2019-04-25 19:00:30.081 [main] AppInfoParser [INFO] Kafka version : 2.0.1
2019-04-25 19:00:30.081 [main] AppInfoParser [INFO] Kafka commitId : fa14705e51bd2ce5
2019-04-25 19:00:30.081 [main] AppInfoParser [WARN] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka_consumer-pageview_profile_table_joiner-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
at org.apache.samza.system.kafka.KafkaSystemConsumer.createKafkaConsumerImpl(KafkaSystemConsumer.java:146)
at org.apache.samza.system.kafka.KafkaSystemFactory.getConsumer(KafkaSystemFactory.scala:57)
at org.apache.samza.container.SamzaContainer$$anonfun$14.apply(SamzaContainer.scala:223)
at org.apache.samza.container.SamzaContainer$$anonfun$14.apply(SamzaContainer.scala:219)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.AbstractSet.map(Set.scala:47)
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:219)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:93)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75)
2019-04-25 19:00:30.082 [main] KafkaSystemFactory [INFO] Created kafka consumer for system kafka, clientId kafka_consumer-pageview_profile_table_joiner-1: org.apache.kafka.clients.consumer.KafkaConsumer@2a8d39c4
2019-04-25 19:00:30.082 [main] KafkaConsumerProxy [INFO] Creating KafkaConsumerProxy with systeName=kafka, clientId=kafka_consumer-pageview_profile_table_joiner-1, metricsName=kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:00:30.082 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Created proxy consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:00:30.083 [main] KafkaSystemFactory [INFO] Created samza system consumer for system kafka, config {streams.pageview-profile-table-joiner-1-partition_by-join.samza.system=kafka, tables.profile-table.provider.factory=org.apache.samza.storage.kv.LocalTableProviderFactory, stores.recipient-offline-score.key.class=null, systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory, stores.recipient-offline-score.msg.serde=null, random-key=null, stores.recipient-offline-score.factory=null, systems.kafka.default.stream.replication.factor=1, stores.profile-table.key.serde=StringSerde-3bcc5e3b-5346-4aea-be61-b40848f2bbd3, serializers.registry.StringSerde-a158545f-60db-44e5-b8f4-60dfe4a95f2b.samza.serialized.instance=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority=2147483647, stores.recipient-offline-score.key.serde=null, streams.pageview-join-input.samza.msg.serde=JsonSerdeV2-ac719c4d-352b-4b2f-af94-b012aa43e42f, stores.recipient-offline-score.changelog=null, serializers.registry.JsonSerdeV2-4ed15e46-b2c2-4fdf-be27-944aee04360e.samza.serialized.instance=null, app.run.id=1556244016574-255e6fcb, serializers.registry.JsonSerdeV2-a6eef306-2c02-4bdd-b005-b8d9e34431b4.samza.serialized.instance=null, job.name=pageview-profile-table-joiner, serializers.registry.JsonSerdeV2-55b1464f-2840-4104-be32-e378cfdae26d.samza.serialized.instance=null, systems.kafka.producer.bootstrap.servers=localhost:9092, serializers.registry.StringSerde-85e21381-bf13-4212-8047-63de5f488ca0.samza.serialized.instance=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default=oldest, serializers.registry.StringSerde-52d71216-187a-4fe4-a362-13faaf079dfc.samza.serialized.instance=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.physical.name=pageview-profile-table-joiner-1-partition_by-join, stores.last-message-interaction-v2.key.serde=null, serializers.registry.JsonSerdeV2-2f3f4c41-7d3c-4240-b1fb-52cddba57693.samza.serialized.instance=null, samza.internal.execution.plan={"jobs":[{"jobName":"pageview-profile-table-joiner","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":["pageview-profile-table-joiner-1-join-6"]},{"streamId":"profile-table-input","nextOperatorIds":["pageview-profile-table-joiner-1-map-2"]},{"streamId":"pageview-join-input","nextOperatorIds":["pageview-profile-table-joiner-1-partition_by-join"]}],"outputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},{"streamId":"enriched-pageview-join-output","nextOperatorIds":[]}],"operators":{"pageview-profile-table-joiner-1-partition_by-join":{"opId":"pageview-profile-table-joiner-1-partition_by-join","opCode":"PARTITION_BY","sourceLocation":"StreamTableJoinExample.java:127","outputStreamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},"pageview-profile-table-joiner-1-map-2":{"opId":"pageview-profile-table-joiner-1-map-2","opCode":"MAP","sourceLocation":"StreamTableJoinExample.java:123","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-3"]},"pageview-profile-table-joiner-1-send_to-3":{"opId":"pageview-profile-table-joiner-1-send_to-3","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:124","nextOperatorIds":[]},"pageview-profile-table-joiner-1-send_to-7":{"opId":"pageview-profile-table-joiner-1-send_to-7","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:129","outputStreamId":"enriched-pageview-join-output","nextOperatorIds":[]},"pageview-profile-table-joiner-1-join-6":{"opId":"pageview-profile-table-joiner-1-join-6","tableId":"profile-table","opCode":"JOIN","sourceLocation":"StreamTableJoinExample.java:128","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-7"]}}}}],"sourceStreams":{"profile-table-input":{"streamSpec":{"id":"profile-table-input","systemName":"kafka","physicalName":"profile-table-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]},"pageview-join-input":{"streamSpec":{"id":"pageview-join-input","systemName":"kafka","physicalName":"pageview-join-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]}},"sinkStreams":{"enriched-pageview-join-output":{"streamSpec":{"id":"enriched-pageview-join-output","systemName":"kafka","physicalName":"enriched-pageview-join-output","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":[]}},"intermediateStreams":{"pageview-profile-table-joiner-1-partition_by-join":{"streamSpec":{"id":"pageview-profile-table-joiner-1-partition_by-join","systemName":"kafka","physicalName":"pageview-profile-table-joiner-1-partition_by-join","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":["pageview-profile-table-joiner"]}},"tables":{"profile-table":{"id":"profile-table","providerFactory":"org.apache.samza.storage.kv.LocalTableProviderFactory"}},"applicationName":"pageview-profile-table-joiner","applicationId":"1"}, streams.pageview-profile-table-joiner-1-partition_by-join.replication.factor=1, serializers.registry.StringSerde-ae153054-890f-422a-b98c-02c18761dc08.samza.serialized.instance=null, stores.profile-table.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory, job.factory.class=org.apache.samza.job.yarn.YarnJobFactory, yarn.package.path=file:///home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza/target/hello-samza-1.1.0-dist.tar.gz, stores.last-message-interaction-v2.changelog=null, serializers.registry.JsonSerdeV2-ac719c4d-352b-4b2f-af94-b012aa43e42f.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAlc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5QYWdlVmlldwAAAAAAAAAAAAAAeHA=, stores.last-message-interaction-v2.factory=null, app.mode=STREAM, stores.recipient-offline-score.changelog.replication.factor=null, app.class=samza.examples.cookbook.StreamTableJoinExample, serializers.registry.JsonSerdeV2-72395b28-b1a4-40f8-9203-2fe24128b971.samza.serialized.instance=null, serializers.registry.JsonSerdeV2-5cb36170-834d-4a93-8085-993d7b24714c.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA==, job.container.count=1, serializers.registry.JsonSerdeV2-ab33ac1d-459a-4fcb-a9c4-cf2009db8d3c.samza.serialized.instance=null, job.default.system=kafka, job.id=1, serializers.registry.JsonSerdeV2-aa46a488-ac4b-4a64-ba94-8391491aacfc.samza.serialized.instance=null, serializers.registry.StringSerde-3bcc5e3b-5346-4aea-be61-b40848f2bbd3.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, serializers.registry.JsonSerdeV2-fc28efcb-dc7b-4a8c-bb27-8544e84753a0.samza.serialized.instance=null, samza.autoscaling.server.url=null, serializers.registry.StringSerde-d221cb99-c56e-4924-9699-6f12ebddb40e.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, streams.enriched-pageview-join-output.samza.system=kafka, streams.profile-table-input.samza.msg.serde=JsonSerdeV2-5cb36170-834d-4a93-8085-993d7b24714c, streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde=JsonSerdeV2-ac719c4d-352b-4b2f-af94-b012aa43e42f, stores.last-message-interaction-v2.rocksdb.ttl.ms=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages=true, systems.kafka.consumer.zookeeper.connect=localhost:2181, stores.last-message-interaction-v2.msg.serde=null, streams.pageview-join-input.samza.system=kafka, serializers.registry.StringSerde-60bbf2c3-0787-4399-8aed-4e2632d32656.samza.serialized.instance=null, serializers.registry.JsonSerdeV2-20ea4875-8d9d-4d06-9c4b-b699af3c35e8.samza.serialized.instance=null, serializers.registry.last-message-interaction-v2-specific-record-to-json.class=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.intermediate=true, task.inputs=kafka.pageview-profile-table-joiner-1-partition_by-join,kafka.profile-table-input,kafka.pageview-join-input, streams.pageview-profile-table-joiner-1-partition_by-join.samza.key.serde=StringSerde-d221cb99-c56e-4924-9699-6f12ebddb40e, task.window.ms=-1, serializers.registry.recipient-offline-score-specific-record-to-json.class=null, streams.profile-table-input.samza.system=kafka, streams.enriched-pageview-join-output.samza.msg.serde=JsonSerdeV2-6d178166-b28d-4513-b620-e466f9f12a59, serializers.registry.StringSerde-409f4a88-7605-4d43-830e-c27e9ad3baed.samza.serialized.instance=null, stores.profile-table.msg.serde=JsonSerdeV2-5cb36170-834d-4a93-8085-993d7b24714c, serializers.registry.JsonSerdeV2-6d178166-b28d-4513-b620-e466f9f12a59.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgA/c2FtemEuZXhhbXBsZXMuY29va2Jvb2suU3RyZWFtVGFibGVKb2luRXhhbXBsZSRFbnJpY2hlZFBhZ2VWaWV3AAAAAAAAAAAAAAB4cA==}: kafka:kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:00:30.083 [main] SamzaContainer$ [INFO] Got system consumers: Set(kafka)
2019-04-25 19:00:30.084 [main] JobPlanner [WARN] job.id is a deprecated configuration, use app.id instead.
2019-04-25 19:00:30.084 [main] JobPlanner [WARN] job.name is a deprecated configuration, use use app.name instead.
2019-04-25 19:00:30.085 [main] KafkaSystemFactory [INFO] Creating kafka producer for system kafka, producerClientId kafka_producer-pageview_profile_table_joiner-1
2019-04-25 19:00:30.086 [main] SamzaContainer$ [INFO] Got system producers: Set(kafka)
2019-04-25 19:00:30.089 [main] Util$ [ERROR] Unable to create an instance for class null.
java.lang.NullPointerException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.samza.util.Util$.getObj(Util.scala:56)
at org.apache.samza.container.SamzaContainer$$anonfun$18.apply(SamzaContainer.scala:255)
at org.apache.samza.container.SamzaContainer$$anonfun$18.apply(SamzaContainer.scala:250)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Set$Set2.foreach(Set.scala:128)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.AbstractSet.map(Set.scala:47)
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:250)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:93)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75)
2019-04-25 19:00:30.089 [main] CoordinatorStreamStore [INFO] Stopping the coordinator stream system consumer.
2019-04-25 19:00:30.094 [main] KafkaSystemProducer [INFO] Stopping producer for system: kafka
2019-04-25 19:00:30.094 [main] KafkaProducer [INFO] [Producer clientId=kafka_producer-pageview_profile_table_joiner-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-04-25 19:00:30.095 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Stopping Samza kafkaConsumer
2019-04-25 19:00:30.095 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Stopping proxy consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:00:30.096 [main] KafkaConsumerProxy [INFO] Shutting down KafkaConsumerProxy poll thread Samza KafkaConsumerProxy Poll Thread-1 - kafka for consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:00:30.376 [Samza KafkaConsumerProxy Poll Thread-1 - kafka] KafkaConsumerProxy [INFO] KafkaConsumerProxy for system kafka has stopped.
2019-04-25 19:00:30.376 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Closing kafkaSystemConsumer org.apache.kafka.clients.consumer.KafkaConsumer@26df6e3a
2019-04-25 19:00:30.379 [main] SamzaUncaughtExceptionHandler [ERROR] Uncaught exception in thread main.
java.lang.NullPointerException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.samza.util.Util$.getObj(Util.scala:56)
at org.apache.samza.container.SamzaContainer$$anonfun$18.apply(SamzaContainer.scala:255)
at org.apache.samza.container.SamzaContainer$$anonfun$18.apply(SamzaContainer.scala:250)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Set$Set2.foreach(Set.scala:128)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.AbstractSet.map(Set.scala:47)
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:250)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:93)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75)
2019-04-25 19:00:30.391 [main] ThreadUtil [INFO] Thread dump from uncaught exception handler.
"kafka-admin-client-thread | adminclient-2" Id=23 RUNNABLE (in native)
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
```
After deploying the change, from the logs we can see that the configurations with null values are ignored in the ApplicationMaster.
```
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-1/samza (master↑1 ✚1)
╰─λ ./gradlew publishToMavenLocal
To honour the JVM settings for this build a new JVM will be forked. Please consider using the daemon: https://docs.gradle.org/2.8/userguide/gradle_daemon.html.
:samza-api:generatePomFileForMavenJavaPublication
:samza-api:compileJava UP-TO-DATE
:samza-api:processResources UP-TO-DATE
:samza-api:classes UP-TO-DATE
:samza-api:jar UP-TO-DATE
:samza-api:publishMavenJavaPublicationToMavenLocal
:samza-api:publishToMavenLocal
:samza-autoscaling_2.11:generatePomFileForMavenJavaPublication
:samza-core_2.11:compileJava UP-TO-DATE
:samza-core_2.11:compileScala
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:298: warning: non-variable type argument Any in type org.apache.samza.serializers.NoOpSerde[Any] is unchecked since it is eliminated by erasure
[ant:scalac] .filter(!_.isInstanceOf[NoOpSerde[Any]])
[ant:scalac] ^
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:317: warning: non-variable type argument Any in type org.apache.samza.serializers.NoOpSerde[Any] is unchecked since it is eliminated by erasure
[ant:scalac] .filter(!_.isInstanceOf[NoOpSerde[Any]])
[ant:scalac] ^
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:71: warning: trait ClosableTask in package task is deprecated: see corresponding Javadoc for more information.
[ant:scalac] val isClosableTask = task.isInstanceOf[ClosableTask]
[ant:scalac] ^
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:258: warning: trait ClosableTask in package task is deprecated: see corresponding Javadoc for more information.
[ant:scalac] if (task.isInstanceOf[ClosableTask]) {
[ant:scalac] ^
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:261: warning: trait ClosableTask in package task is deprecated: see corresponding Javadoc for more information.
[ant:scalac] task.asInstanceOf[ClosableTask].close
[ant:scalac] ^
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:352: warning: method onCheckpoint in trait CheckpointListener is deprecated: see corresponding Javadoc for more information.
[ant:scalac] checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava))
[ant:scalac] ^
[ant:scalac] 6 warnings found
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-core_2.11:processResources UP-TO-DATE
:samza-core_2.11:classes
:samza-core_2.11:jar
:samza-autoscaling_2.11:compileJava UP-TO-DATE
:samza-autoscaling_2.11:compileScala
:samza-autoscaling_2.11:processResources UP-TO-DATE
:samza-autoscaling_2.11:classes
:samza-autoscaling_2.11:jar UP-TO-DATE
:samza-autoscaling_2.11:publishMavenJavaPublicationToMavenLocal
:samza-autoscaling_2.11:publishToMavenLocal
:samza-aws_2.11:generatePomFileForMavenJavaPublication
:samza-aws_2.11:compileJava
:samza-aws_2.11:processResources UP-TO-DATE
:samza-aws_2.11:classes
:samza-aws_2.11:jar UP-TO-DATE
:samza-aws_2.11:publishMavenJavaPublicationToMavenLocal
:samza-aws_2.11:publishToMavenLocal
:samza-azure_2.11:generatePomFileForMavenJavaPublication
:samza-azure_2.11:compileJava
Note: /home/svenkata/code/samza/samza-1/samza/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-azure_2.11:processResources UP-TO-DATE
:samza-azure_2.11:classes
:samza-azure_2.11:jar UP-TO-DATE
:samza-azure_2.11:publishMavenJavaPublicationToMavenLocal
:samza-azure_2.11:publishToMavenLocal
:samza-core_2.11:generatePomFileForMavenJavaPublication
:samza-core_2.11:publishMavenJavaPublicationToMavenLocal
:samza-core_2.11:publishToMavenLocal
:samza-elasticsearch_2.11:generatePomFileForMavenJavaPublication
:samza-elasticsearch_2.11:compileJava
:samza-elasticsearch_2.11:processResources UP-TO-DATE
:samza-elasticsearch_2.11:classes
:samza-elasticsearch_2.11:jar UP-TO-DATE
:samza-elasticsearch_2.11:publishMavenJavaPublicationToMavenLocal
:samza-elasticsearch_2.11:publishToMavenLocal
:samza-hdfs_2.11:generatePomFileForMavenJavaPublication
:samza-yarn_2.11:compileJava UP-TO-DATE
:samza-yarn_2.11:compileScala
Note: /home/svenkata/code/samza/samza-1/samza/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-yarn_2.11:processResources UP-TO-DATE
:samza-yarn_2.11:classes
:samza-yarn_2.11:lesscss UP-TO-DATE
:samza-yarn_2.11:jar UP-TO-DATE
:samza-hdfs_2.11:compileJava UP-TO-DATE
:samza-hdfs_2.11:compileScala
:samza-hdfs_2.11:processResources UP-TO-DATE
:samza-hdfs_2.11:classes
:samza-hdfs_2.11:jar UP-TO-DATE
:samza-hdfs_2.11:publishMavenJavaPublicationToMavenLocal
:samza-hdfs_2.11:publishToMavenLocal
:samza-kafka_2.11:generatePomFileForMavenJavaPublication
:samza-kafka_2.11:compileJava UP-TO-DATE
:samza-kafka_2.11:compileScala
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala:81: warning: class ZkUtils in package utils is deprecated: This is an internal class that is no longer used by Kafka and will be removed in a future release. Please use org.apache.kafka.clients.admin.AdminClient instead.
[ant:scalac] val connectZk: () => ZkUtils) extends Logging {
[ant:scalac] ^
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala:110: warning: object ZkUtils in package utils is deprecated: This is an internal class that is no longer used by Kafka and will be removed in a future release. Please use org.apache.kafka.clients.admin.AdminClient instead.
[ant:scalac] ZkUtils(zkClient, isZkSecurityEnabled = false).getAllTopics()
[ant:scalac] ^
[ant:scalac] two warnings found
Note: /home/svenkata/code/samza/samza-1/samza/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-kafka_2.11:processResources UP-TO-DATE
:samza-kafka_2.11:classes
:samza-kafka_2.11:jar UP-TO-DATE
:samza-kafka_2.11:publishMavenJavaPublicationToMavenLocal
:samza-kafka_2.11:publishToMavenLocal
:samza-kv-inmemory_2.11:generatePomFileForMavenJavaPublication
:samza-kv_2.11:compileJava UP-TO-DATE
:samza-kv_2.11:compileScala
Note: /home/svenkata/code/samza/samza-1/samza/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-kv_2.11:processResources UP-TO-DATE
:samza-kv_2.11:classes
:samza-kv_2.11:jar UP-TO-DATE
:samza-kv-inmemory_2.11:compileJava UP-TO-DATE
:samza-kv-inmemory_2.11:compileScala
:samza-kv-inmemory_2.11:processResources UP-TO-DATE
:samza-kv-inmemory_2.11:classes
:samza-kv-inmemory_2.11:jar UP-TO-DATE
:samza-kv-inmemory_2.11:publishMavenJavaPublicationToMavenLocal
:samza-kv-inmemory_2.11:publishToMavenLocal
:samza-kv-rocksdb_2.11:generatePomFileForMavenJavaPublication
:samza-kv-rocksdb_2.11:compileJava UP-TO-DATE
:samza-kv-rocksdb_2.11:compileScala
Note: /home/svenkata/code/samza/samza-1/samza/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-kv-rocksdb_2.11:processResources UP-TO-DATE
:samza-kv-rocksdb_2.11:classes
:samza-kv-rocksdb_2.11:jar UP-TO-DATE
:samza-kv-rocksdb_2.11:publishMavenJavaPublicationToMavenLocal
:samza-kv-rocksdb_2.11:publishToMavenLocal
:samza-kv_2.11:generatePomFileForMavenJavaPublication
:samza-kv_2.11:publishMavenJavaPublicationToMavenLocal
:samza-kv_2.11:publishToMavenLocal
:samza-log4j2_2.11:generatePomFileForMavenJavaPublication
:samza-log4j2_2.11:compileJava
Processing annotations
Annotations processed
Processing annotations
No elements to process
Note: /home/svenkata/code/samza/samza-1/samza/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-log4j2_2.11:processResources UP-TO-DATE
:samza-log4j2_2.11:classes
:samza-log4j2_2.11:jar UP-TO-DATE
:samza-log4j2_2.11:publishMavenJavaPublicationToMavenLocal
:samza-log4j2_2.11:publishToMavenLocal
:samza-log4j_2.11:generatePomFileForMavenJavaPublication
:samza-log4j_2.11:compileJava
Note: /home/svenkata/code/samza/samza-1/samza/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-log4j_2.11:processResources UP-TO-DATE
:samza-log4j_2.11:classes
:samza-log4j_2.11:jar UP-TO-DATE
:samza-log4j_2.11:publishMavenJavaPublicationToMavenLocal
:samza-log4j_2.11:publishToMavenLocal
:samza-rest_2.11:generatePomFileForMavenJavaPublication
:samza-shell:compileJava UP-TO-DATE
:samza-shell:processResources UP-TO-DATE
:samza-shell:classes UP-TO-DATE
:samza-shell:jar UP-TO-DATE
:samza-rest_2.11:compileJava
Note: /home/svenkata/code/samza/samza-1/samza/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
:samza-rest_2.11:processResources UP-TO-DATE
:samza-rest_2.11:classes
:samza-rest_2.11:jar UP-TO-DATE
:samza-rest_2.11:publishMavenJavaPublicationToMavenLocal
:samza-rest_2.11:publishToMavenLocal
:samza-shell:generatePomFileForMavenJavaPublication
:samza-shell:shellTarGz UP-TO-DATE
:samza-shell:publishMavenJavaPublicationToMavenLocal
:samza-shell:publishToMavenLocal
:samza-sql-shell_2.11:generatePomFileForMavenJavaPublication
:samza-sql_2.11:compileJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-sql_2.11:processResources UP-TO-DATE
:samza-sql_2.11:classes
:samza-sql_2.11:jar UP-TO-DATE
:samza-tools_2.11:compileJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
:samza-tools_2.11:processResources UP-TO-DATE
:samza-tools_2.11:classes
:samza-tools_2.11:jar UP-TO-DATE
:samza-sql-shell_2.11:compileJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
:samza-sql-shell_2.11:processResources UP-TO-DATE
:samza-sql-shell_2.11:classes
:samza-sql-shell_2.11:jar UP-TO-DATE
:samza-sql-shell_2.11:publishMavenJavaPublicationToMavenLocal
:samza-sql-shell_2.11:publishToMavenLocal
:samza-sql_2.11:generatePomFileForMavenJavaPublication
:samza-sql_2.11:publishMavenJavaPublicationToMavenLocal
:samza-sql_2.11:publishToMavenLocal
:samza-test_2.11:generatePomFileForMavenJavaPublication
:samza-test_2.11:compileJava UP-TO-DATE
:samza-test_2.11:compileScala
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:samza-test_2.11:processResources UP-TO-DATE
:samza-test_2.11:classes
:samza-test_2.11:jar UP-TO-DATE
:samza-test_2.11:publishMavenJavaPublicationToMavenLocal
:samza-test_2.11:publishToMavenLocal
:samza-tools_2.11:generatePomFileForMavenJavaPublication
:samza-tools_2.11:publishMavenJavaPublicationToMavenLocal
:samza-tools_2.11:publishToMavenLocal
:samza-yarn_2.11:generatePomFileForMavenJavaPublication
:samza-yarn_2.11:publishMavenJavaPublicationToMavenLocal
:samza-yarn_2.11:publishToMavenLocal
BUILD SUCCESSFUL
Total time: 46.454 secs
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza (latest ✚1)
╰─λ grep 'Ignoring it.' ./deploy/yarn/logs/userlogs/application_1556153599011_0008/container_1556153599011_0008_01_000001/samza-job-coordinator.log 0 < 19:12:25
╰─λ grep 'Ignoring it' deploy/yarn/logs/userlogs/application_1556153599011_0009/container_1556153599011_0009_01_000001/samza-job-coordinator.log 0 < 09:52:53
2019-04-26 09:52:48.646 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.key.class is empty or null. Ignoring it.
2019-04-26 09:52:48.646 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.msg.serde is empty or null. Ignoring it.
2019-04-26 09:52:48.647 [main] CoordinatorStreamUtil$ [INFO] Value for key: random-key is empty or null. Ignoring it.
2019-04-26 09:52:48.647 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.factory is empty or null. Ignoring it.
2019-04-26 09:52:48.647 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-a158545f-60db-44e5-b8f4-60dfe4a95f2b.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.648 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.key.serde is empty or null. Ignoring it.
2019-04-26 09:52:48.648 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.changelog is empty or null. Ignoring it.
2019-04-26 09:52:48.649 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-4ed15e46-b2c2-4fdf-be27-944aee04360e.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.649 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-a6eef306-2c02-4bdd-b005-b8d9e34431b4.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.649 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-55b1464f-2840-4104-be32-e378cfdae26d.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.650 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-85e21381-bf13-4212-8047-63de5f488ca0.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.650 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-52d71216-187a-4fe4-a362-13faaf079dfc.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.651 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.key.serde is empty or null. Ignoring it.
2019-04-26 09:52:48.651 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-2f3f4c41-7d3c-4240-b1fb-52cddba57693.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.652 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-ae153054-890f-422a-b98c-02c18761dc08.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.652 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.changelog is empty or null. Ignoring it.
2019-04-26 09:52:48.652 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.factory is empty or null. Ignoring it.
2019-04-26 09:52:48.653 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.changelog.replication.factor is empty or null. Ignoring it.
2019-04-26 09:52:48.653 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-72395b28-b1a4-40f8-9203-2fe24128b971.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.654 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-ab33ac1d-459a-4fcb-a9c4-cf2009db8d3c.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.654 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-aa46a488-ac4b-4a64-ba94-8391491aacfc.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.654 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-fc28efcb-dc7b-4a8c-bb27-8544e84753a0.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.655 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.rocksdb.ttl.ms is empty or null. Ignoring it.
2019-04-26 09:52:48.656 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.msg.serde is empty or null. Ignoring it.
2019-04-26 09:52:48.656 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-60bbf2c3-0787-4399-8aed-4e2632d32656.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.656 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-20ea4875-8d9d-4d06-9c4b-b699af3c35e8.samza.serialized.instance is empty or null. Ignoring it.
2019-04-26 09:52:48.657 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.last-message-interaction-v2-specific-record-to-json.class is empty or null. Ignoring it.
2019-04-26 09:52:48.657 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.recipient-offline-score-specific-record-to-json.class is empty or null. Ignoring it.
2019-04-26 09:52:48.658 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-409f4a88-7605-4d43-830e-c27e9ad3baed.samza.serialized.instance is empty or null. Ignoring it.
```
From logs, we can conclude that the SamzaContainer had started up successfully and the configuration-map read in the SamzaContainer did not have any string key with null value:
```java
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza (latest ✚1)
╰─λ tail -n 250 ./deploy/yarn/logs/userlogs/application_1556153599011_0008/container_1556153599011_0008_01_000002/samza-container-0.log 0 < 19:14:12
2019-04-25 19:11:07.220 [main] SamzaContainer$ [INFO] Using configuration: {streams.pageview-profile-table-joiner-1-partition_by-join.samza.system=kafka, tables.profile-table.provider.factory=org.apache.samza.storage.kv.LocalTableProviderFactory, systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory, job.default.system=kafka, serializers.registry.StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, job.id=1, serializers.registry.JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA==, systems.kafka.default.stream.replication.factor=1, stores.profile-table.key.serde=StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585, streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority=2147483647, streams.enriched-pageview-join-output.samza.system=kafka, serializers.registry.JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAlc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5QYWdlVmlldwAAAAAAAAAAAAAAeHA=, streams.pageview-join-input.samza.msg.serde=JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053, app.run.id=1556244654191-a2723d2d, job.name=pageview-profile-table-joiner, streams.profile-table-input.samza.msg.serde=JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, systems.kafka.producer.bootstrap.servers=localhost:9092, streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde=JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053, serializers.registry.JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgA/c2FtemEuZXhhbXBsZXMuY29va2Jvb2suU3RyZWFtVGFibGVKb2luRXhhbXBsZSRFbnJpY2hlZFBhZ2VWaWV3AAAAAAAAAAAAAAB4cA==, serializers.registry.StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages=true, streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default=oldest, systems.kafka.consumer.zookeeper.connect=localhost:2181, streams.pageview-join-input.samza.system=kafka, streams.pageview-profile-table-joiner-1-partition_by-join.samza.physical.name=pageview-profile-table-joiner-1-partition_by-join, samza.internal.execution.plan={"jobs":[{"jobName":"pageview-profile-table-joiner","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":["pageview-profile-table-joiner-1-join-6"]},{"streamId":"profile-table-input","nextOperatorIds":["pageview-profile-table-joiner-1-map-2"]},{"streamId":"pageview-join-input","nextOperatorIds":["pageview-profile-table-joiner-1-partition_by-join"]}],"outputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},{"streamId":"enriched-pageview-join-output","nextOperatorIds":[]}],"operators":{"pageview-profile-table-joiner-1-partition_by-join":{"opId":"pageview-profile-table-joiner-1-partition_by-join","opCode":"PARTITION_BY","sourceLocation":"StreamTableJoinExample.java:127","outputStreamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},"pageview-profile-table-joiner-1-map-2":{"opId":"pageview-profile-table-joiner-1-map-2","opCode":"MAP","sourceLocation":"StreamTableJoinExample.java:123","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-3"]},"pageview-profile-table-joiner-1-send_to-3":{"opId":"pageview-profile-table-joiner-1-send_to-3","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:124","nextOperatorIds":[]},"pageview-profile-table-joiner-1-send_to-7":{"opId":"pageview-profile-table-joiner-1-send_to-7","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:129","outputStreamId":"enriched-pageview-join-output","nextOperatorIds":[]},"pageview-profile-table-joiner-1-join-6":{"opId":"pageview-profile-table-joiner-1-join-6","tableId":"profile-table","opCode":"JOIN","sourceLocation":"StreamTableJoinExample.java:128","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-7"]}}}}],"sourceStreams":{"profile-table-input":{"streamSpec":{"id":"profile-table-input","systemName":"kafka","physicalName":"profile-table-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]},"pageview-join-input":{"streamSpec":{"id":"pageview-join-input","systemName":"kafka","physicalName":"pageview-join-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]}},"sinkStreams":{"enriched-pageview-join-output":{"streamSpec":{"id":"enriched-pageview-join-output","systemName":"kafka","physicalName":"enriched-pageview-join-output","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":[]}},"intermediateStreams":{"pageview-profile-table-joiner-1-partition_by-join":{"streamSpec":{"id":"pageview-profile-table-joiner-1-partition_by-join","systemName":"kafka","physicalName":"pageview-profile-table-joiner-1-partition_by-join","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":["pageview-profile-table-joiner"]}},"tables":{"profile-table":{"id":"profile-table","providerFactory":"org.apache.samza.storage.kv.LocalTableProviderFactory"}},"applicationName":"pageview-profile-table-joiner","applicationId":"1"}, streams.pageview-profile-table-joiner-1-partition_by-join.samza.intermediate=true, task.inputs=kafka.pageview-profile-table-joiner-1-partition_by-join,kafka.profile-table-input,kafka.pageview-join-input, streams.pageview-profile-table-joiner-1-partition_by-join.replication.factor=1, streams.pageview-profile-table-joiner-1-partition_by-join.samza.key.serde=StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9, stores.profile-table.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory, job.factory.class=org.apache.samza.job.yarn.YarnJobFactory, yarn.package.path=file:///home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza/target/hello-samza-1.1.0-dist.tar.gz, task.window.ms=-1, streams.profile-table-input.samza.system=kafka, app.mode=STREAM, streams.enriched-pageview-join-output.samza.msg.serde=JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad, app.class=samza.examples.cookbook.StreamTableJoinExample, stores.profile-table.msg.serde=JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, job.container.count=1}
2019-04-25 19:11:07.333 [main] ConsumerConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config.
2019-04-25 19:11:07.333 [main] AppInfoParser [INFO] Kafka version : 2.0.1
2019-04-25 19:11:07.333 [main] AppInfoParser [INFO] Kafka commitId : fa14705e51bd2ce5
2019-04-25 19:11:07.333 [main] AppInfoParser [WARN] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka_consumer-pageview_profile_table_joiner-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
at org.apache.samza.system.kafka.KafkaSystemConsumer.createKafkaConsumerImpl(KafkaSystemConsumer.java:146)
at org.apache.samza.system.kafka.KafkaSystemFactory.getConsumer(KafkaSystemFactory.scala:57)
at org.apache.samza.container.SamzaContainer$$anonfun$14.apply(SamzaContainer.scala:223)
at org.apache.samza.container.SamzaContainer$$anonfun$14.apply(SamzaContainer.scala:219)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.AbstractSet.map(Set.scala:47)
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:219)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:93)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75)
2019-04-25 19:11:07.334 [main] KafkaSystemFactory [INFO] Created kafka consumer for system kafka, clientId kafka_consumer-pageview_profile_table_joiner-1: org.apache.kafka.clients.consumer.KafkaConsumer@4c398c80
2019-04-25 19:11:07.334 [main] KafkaConsumerProxy [INFO] Creating KafkaConsumerProxy with systeName=kafka, clientId=kafka_consumer-pageview_profile_table_joiner-1, metricsName=kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.334 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Created proxy consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.334 [main] KafkaSystemFactory [INFO] Created samza system consumer for system kafka, config {streams.pageview-profile-table-joiner-1-partition_by-join.samza.system=kafka, tables.profile-table.provider.factory=org.apache.samza.storage.kv.LocalTableProviderFactory, systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory, job.default.system=kafka, serializers.registry.StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, job.id=1, serializers.registry.JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA==, systems.kafka.default.stream.replication.factor=1, stores.profile-table.key.serde=StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585, streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority=2147483647, streams.enriched-pageview-join-output.samza.system=kafka, serializers.registry.JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAlc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5QYWdlVmlldwAAAAAAAAAAAAAAeHA=, streams.pageview-join-input.samza.msg.serde=JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053, app.run.id=1556244654191-a2723d2d, job.name=pageview-profile-table-joiner, streams.profile-table-input.samza.msg.serde=JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, systems.kafka.producer.bootstrap.servers=localhost:9092, streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde=JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053, serializers.registry.JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgA/c2FtemEuZXhhbXBsZXMuY29va2Jvb2suU3RyZWFtVGFibGVKb2luRXhhbXBsZSRFbnJpY2hlZFBhZ2VWaWV3AAAAAAAAAAAAAAB4cA==, serializers.registry.StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages=true, streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default=oldest, systems.kafka.consumer.zookeeper.connect=localhost:2181, streams.pageview-join-input.samza.system=kafka, streams.pageview-profile-table-joiner-1-partition_by-join.samza.physical.name=pageview-profile-table-joiner-1-partition_by-join, samza.internal.execution.plan={"jobs":[{"jobName":"pageview-profile-table-joiner","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":["pageview-profile-table-joiner-1-join-6"]},{"streamId":"profile-table-input","nextOperatorIds":["pageview-profile-table-joiner-1-map-2"]},{"streamId":"pageview-join-input","nextOperatorIds":["pageview-profile-table-joiner-1-partition_by-join"]}],"outputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},{"streamId":"enriched-pageview-join-output","nextOperatorIds":[]}],"operators":{"pageview-profile-table-joiner-1-partition_by-join":{"opId":"pageview-profile-table-joiner-1-partition_by-join","opCode":"PARTITION_BY","sourceLocation":"StreamTableJoinExample.java:127","outputStreamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},"pageview-profile-table-joiner-1-map-2":{"opId":"pageview-profile-table-joiner-1-map-2","opCode":"MAP","sourceLocation":"StreamTableJoinExample.java:123","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-3"]},"pageview-profile-table-joiner-1-send_to-3":{"opId":"pageview-profile-table-joiner-1-send_to-3","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:124","nextOperatorIds":[]},"pageview-profile-table-joiner-1-send_to-7":{"opId":"pageview-profile-table-joiner-1-send_to-7","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:129","outputStreamId":"enriched-pageview-join-output","nextOperatorIds":[]},"pageview-profile-table-joiner-1-join-6":{"opId":"pageview-profile-table-joiner-1-join-6","tableId":"profile-table","opCode":"JOIN","sourceLocation":"StreamTableJoinExample.java:128","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-7"]}}}}],"sourceStreams":{"profile-table-input":{"streamSpec":{"id":"profile-table-input","systemName":"kafka","physicalName":"profile-table-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]},"pageview-join-input":{"streamSpec":{"id":"pageview-join-input","systemName":"kafka","physicalName":"pageview-join-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]}},"sinkStreams":{"enriched-pageview-join-output":{"streamSpec":{"id":"enriched-pageview-join-output","systemName":"kafka","physicalName":"enriched-pageview-join-output","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":[]}},"intermediateStreams":{"pageview-profile-table-joiner-1-partition_by-join":{"streamSpec":{"id":"pageview-profile-table-joiner-1-partition_by-join","systemName":"kafka","physicalName":"pageview-profile-table-joiner-1-partition_by-join","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":["pageview-profile-table-joiner"]}},"tables":{"profile-table":{"id":"profile-table","providerFactory":"org.apache.samza.storage.kv.LocalTableProviderFactory"}},"applicationName":"pageview-profile-table-joiner","applicationId":"1"}, streams.pageview-profile-table-joiner-1-partition_by-join.samza.intermediate=true, task.inputs=kafka.pageview-profile-table-joiner-1-partition_by-join,kafka.profile-table-input,kafka.pageview-join-input, streams.pageview-profile-table-joiner-1-partition_by-join.replication.factor=1, streams.pageview-profile-table-joiner-1-partition_by-join.samza.key.serde=StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9, stores.profile-table.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory, job.factory.class=org.apache.samza.job.yarn.YarnJobFactory, yarn.package.path=file:///home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza/target/hello-samza-1.1.0-dist.tar.gz, task.window.ms=-1, streams.profile-table-input.samza.system=kafka, app.mode=STREAM, streams.enriched-pageview-join-output.samza.msg.serde=JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad, app.class=samza.examples.cookbook.StreamTableJoinExample, stores.profile-table.msg.serde=JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, job.container.count=1}: kafka:kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.335 [main] SamzaContainer$ [INFO] Got system consumers: Set(kafka)
2019-04-25 19:11:07.336 [main] JobPlanner [WARN] job.id is a deprecated configuration, use app.id instead.
2019-04-25 19:11:07.336 [main] JobPlanner [WARN] job.name is a deprecated configuration, use use app.name instead.
2019-04-25 19:11:07.336 [main] KafkaSystemFactory [INFO] Creating kafka producer for system kafka, producerClientId kafka_producer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.337 [main] SamzaContainer$ [INFO] Got system producers: Set(kafka)
2019-04-25 19:11:07.340 [main] SamzaContainer$ [INFO] Got serdes from factories: Set()
2019-04-25 19:11:07.357 [main] SamzaContainer$ [INFO] Got serdes from serialized instances: Set(StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9, StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585, JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad, JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053)
2019-04-25 19:11:07.393 [main] SamzaContainer$ [INFO] Got change log system streams: Map()
2019-04-25 19:11:07.396 [main] SamzaContainer$ [INFO] Got intermediate streams: List(pageview-profile-table-joiner-1-partition_by-join)
2019-04-25 19:11:07.400 [main] SamzaContainer$ [INFO] Setting up JVM metrics.
2019-04-25 19:11:07.405 [main] SamzaContainer$ [INFO] Setting up message chooser.
2019-04-25 19:11:07.428 [main] DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=true
2019-04-25 19:11:07.433 [main] SamzaContainer$ [INFO] Setting up metrics reporters.
2019-04-25 19:11:07.435 [main] SamzaContainer$ [INFO] Got metrics reporters: Set()
2019-04-25 19:11:07.436 [main] SamzaContainer$ [INFO] Got security manager: null
2019-04-25 19:11:07.437 [main] SamzaContainer$ [INFO] Got checkpoint manager: null
2019-04-25 19:11:07.438 [main] SamzaContainer$ [INFO] Got checkpointListeners : Map()
2019-04-25 19:11:07.443 [main] SamzaContainer$ [INFO] Got offset manager: org.apache.samza.checkpoint.OffsetManager@2c3dec30
2019-04-25 19:11:07.451 [main] SamzaContainer$ [INFO] Got storage engines: Set(profile-table)
2019-04-25 19:11:07.451 [main] SamzaContainer$ [INFO] Got single thread mode: false
2019-04-25 19:11:07.452 [main] SamzaContainer$ [INFO] Got thread pool size: 0
2019-04-25 19:11:07.452 [main] TaskFactoryUtil [INFO] Got an AsyncStreamTask implementation.
2019-04-25 19:11:07.455 [main] SamzaContainer$ [INFO] Got default storage engine base directory: /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state
2019-04-25 19:11:07.456 [main] SamzaContainer$ [INFO] Got base directory for non logged data stores: /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state
2019-04-25 19:11:07.456 [main] SamzaContainer$ [WARN] No override was provided for logged store base directory. This disables local state re-use on application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment variable in all machines running the Samza container or configure job.logged.store.base.dir for your application
2019-04-25 19:11:07.457 [main] SamzaContainer$ [INFO] Got base directory for logged data stores: /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state
2019-04-25 19:11:07.469 [main] ContainerStorageManager [INFO] Starting with changelogSystemStreams = {} sideInputSystemStreams = {}
2019-04-25 19:11:07.551 [main] RocksDbOptionsHelper [INFO] Using prepareForBulkLoad for restore to /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0
2019-04-25 19:11:07.564 [main] ContainerStorageManager [INFO] Created store profile-table for task Partition 0 in mode BulkLoad
2019-04-25 19:11:07.606 [main] SamzaContainer$ [INFO] Got task SSPs: Set(SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0], SystemStreamPartition [kafka, pageview-join-input, 0], SystemStreamPartition [kafka, profile-table-input, 0])
2019-04-25 19:11:07.607 [main] SamzaContainer$ [INFO] Got task side input SSPs: Set()
2019-04-25 19:11:07.613 [main] TableManager [INFO] Added 1 tables
2019-04-25 19:11:07.613 [main] SamzaContainer$ [INFO] Got table manager
2019-04-25 19:11:07.620 [main] RunLoopFactory [INFO] Got window milliseconds: -1.
2019-04-25 19:11:07.620 [main] RunLoopFactory [INFO] Got commit milliseconds: 60000.
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Got taskMaxConcurrency: 1.
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Got asyncCommitEnabled: false.
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Got callbackTimeout: -1.
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Got maxIdleMs: 10.
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Run loop in asynchronous mode.
2019-04-25 19:11:07.630 [main] NoThrottlingDiskQuotaPolicy [INFO] Using a no throttling disk quota policy
2019-04-25 19:11:07.631 [main] SamzaContainer$ [INFO] Disk quotas disabled because polling interval is not set (container.disk.poll.interval.ms)
2019-04-25 19:11:07.632 [main] SamzaContainer$ [INFO] Samza container setup complete.
2019-04-25 19:11:07.633 [main] ContainerLaunchUtil [INFO] Got execution environment container id: container_1556153599011_0008_01_000002
2019-04-25 19:11:07.635 [main] ContainerHeartbeatMonitor [INFO] Starting ContainerHeartbeatMonitor
2019-04-25 19:11:07.637 [main] SamzaContainer [INFO] Starting container.
2019-04-25 19:11:07.637 [main] ContainerLaunchUtil [INFO] Before starting the container.
2019-04-25 19:11:07.643 [main] JmxServer [INFO] According to Util.getLocalHost.getHostName we are 172.21.224.112
2019-04-25 19:11:07.690 [main] JmxServer [INFO] Started JmxServer registry port=35836 server port=37870 url=service:jmx:rmi://localhost:37870/jndi/rmi://localhost:35836/jmxrmi
2019-04-25 19:11:07.691 [main] JmxServer [INFO] If you are tunneling, you might want to try JmxServer registry port=35836 server port=37870 url=service:jmx:rmi://172.21.224.112:37870/jndi/rmi://172.21.224.112:35836/jmxrmi
2019-04-25 19:11:07.692 [main] SamzaContainer [INFO] Registering task instance metrics.
2019-04-25 19:11:07.694 [main] SamzaContainer [INFO] Starting JVM metrics.
2019-04-25 19:11:07.694 [main] SamzaContainer [INFO] Starting metrics reporters.
2019-04-25 19:11:07.695 [main] SamzaContainer [INFO] Starting admin multiplexer.
2019-04-25 19:11:07.696 [main] SamzaContainer [INFO] Registering task instances with offsets.
2019-04-25 19:11:07.698 [main] SamzaContainer [INFO] Starting offset manager.
2019-04-25 19:11:07.707 [main] OffsetManager [INFO] Successfully loaded last processed offsets: {}
2019-04-25 19:11:07.708 [main] OffsetManager [INFO] Successfully loaded starting offsets: Map(Partition 0 -> Map(SystemStreamPartition [kafka, profile-table-input, 0] -> 0, SystemStreamPartition [kafka, pageview-join-input, 0] -> 0, SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0] -> 0))
2019-04-25 19:11:07.709 [main] SamzaContainer [INFO] Starting container storage manager.
2019-04-25 19:11:07.709 [main] ContainerStorageManager [INFO] Store Restore started
2019-04-25 19:11:07.711 [main] ContainerStorageManager [INFO] Got non logged storage partition directory as /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0
2019-04-25 19:11:07.711 [main] ContainerStorageManager [INFO] Got logged storage partition directory as /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0
2019-04-25 19:11:07.711 [main] ContainerStorageManager [INFO] Deleting logged storage partition directory /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0
2019-04-25 19:11:07.713 [main] ContainerStorageManager [INFO] Using non logged storage partition directory: /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0 for store: profile-table
2019-04-25 19:11:07.713 [main] ContainerStorageManager [INFO] Validating change log streams: {}
2019-04-25 19:11:07.714 [main] ContainerStorageManager [INFO] Got change log stream metadata: {}
2019-04-25 19:11:07.715 [main] ContainerStorageManager [INFO] Assigning oldest change log offsets for taskName Partition 0 : {}
2019-04-25 19:11:07.716 [Samza Restore Thread-0] ContainerStorageManager [INFO] Starting stores in task instance Partition 0
2019-04-25 19:11:07.754 [Samza Restore Thread-0] ContainerStorageManager [INFO] Stopped persistent stores {profile-table=org.apache.samza.storage.kv.KeyValueStorageEngine@37a0659a}
2019-04-25 19:11:07.756 [main] ContainerStorageManager [INFO] Re-created store profile-table in read-write mode for task Partition 0 because it a persistent store
2019-04-25 19:11:07.756 [main] ContainerStorageManager [INFO] Store Restore complete
2019-04-25 19:11:07.757 [main] SamzaContainer [INFO] Starting table manager in task instance Partition 0
2019-04-25 19:11:07.758 [main] LocalTableProvider [INFO] Initializing table provider for table profile-table
2019-04-25 19:11:07.758 [main] LocalTableProvider [INFO] Initialized backing store for table profile-table
2019-04-25 19:11:07.758 [main] SamzaContainer [INFO] Starting host statistics monitor
2019-04-25 19:11:07.760 [main] SamzaContainer [INFO] Registering task instances with producers.
2019-04-25 19:11:07.762 [main] SamzaContainer [INFO] Starting producer multiplexer.
2019-04-25 19:11:07.763 [main] ProducerConfig [INFO] ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id = kafka_producer-pageview_profile_table_joiner-1
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 10
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2019-04-25 19:11:07.766 [main] AppInfoParser [INFO] Kafka version : 2.0.1
2019-04-25 19:11:07.766 [main] AppInfoParser [INFO] Kafka commitId : fa14705e51bd2ce5
2019-04-25 19:11:07.766 [main] AppInfoParser [WARN] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=kafka_producer-pageview_profile_table_joiner-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:451)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:272)
at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$1.apply(KafkaSystemFactory.scala:75)
at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$1.apply(KafkaSystemFactory.scala:74)
at org.apache.samza.system.kafka.KafkaSystemProducer.start(KafkaSystemProducer.scala:53)
at org.apache.samza.system.SystemProducers$$anonfun$start$2.apply(SystemProducers.scala:41)
at org.apache.samza.system.SystemProducers$$anonfun$start$2.apply(SystemProducers.scala:41)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at org.apache.samza.system.SystemProducers.start(SystemProducers.scala:41)
at org.apache.samza.container.SamzaContainer.startProducers(SamzaContainer.scala:974)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:750)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:139)
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75)
2019-04-25 19:11:07.767 [main] SamzaContainer [INFO] Initializing stream tasks.
2019-04-25 19:11:07.785 [main] OperatorImplGraph [INFO] SystemStream [system=kafka, stream=pageview-profile-table-joiner-1-partition_by-join] has 1 producer tasks.
2019-04-25 19:11:07.797 [main] SamzaContainer [INFO] Registering task instances with consumers.
2019-04-25 19:11:07.801 [main] SamzaContainer [INFO] Starting consumer multiplexer.
2019-04-25 19:11:07.804 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Consumer subscribes to [pageview-profile-table-joiner-1-partition_by-join-0, profile-table-input-0, pageview-join-input-0]
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: fetchThresholdBytes = -1; fetchThreshold=10000; numPartitions=3, perPartitionFetchThreshold=3333, perPartitionFetchThresholdBytes(0 if disabled)=0
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] Updating the consumer fetch offsets of topic partition: pageview-profile-table-joiner-1-partition_by-join-0 to 0.
2019-04-25 19:11:07.805 [main] KafkaConsumerProxy [INFO] Adding new topicPartition SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0] with offset 0 to queue for consumer consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] Updating the consumer fetch offsets of topic partition: profile-table-input-0 to 0.
2019-04-25 19:11:07.805 [main] KafkaConsumerProxy [INFO] Adding new topicPartition SystemStreamPartition [kafka, profile-table-input, 0] with offset 0 to queue for consumer consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] Updating the consumer fetch offsets of topic partition: pageview-join-input-0 to 0.
2019-04-25 19:11:07.805 [main] KafkaConsumerProxy [INFO] Adding new topicPartition SystemStreamPartition [kafka, pageview-join-input, 0] with offset 0 to queue for consumer consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Starting proxy consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.805 [main] KafkaConsumerProxy [INFO] Starting KafkaConsumerProxy polling thread for consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1
2019-04-25 19:11:07.805 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] KafkaConsumerProxy [INFO] Starting consumer poll thread Samza KafkaConsumerProxy Poll Thread-2 - kafka for system kafka
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Consumer started
2019-04-25 19:11:07.806 [main] TieredPriorityChooser [INFO] Starting priority chooser with priorities: Map(SystemStream [system=kafka, stream=pageview-join-input] -> 0, SystemStream [system=kafka, stream=pageview-profile-table-joiner-1-partition_by-join] -> 2147483647, SystemStream [system=kafka, stream=profile-table-input] -> 0)
2019-04-25 19:11:07.807 [main] TieredPriorityChooser [INFO] Priority chooser has a default chooser: org.apache.samza.system.chooser.RoundRobinChooser@59221b97
2019-04-25 19:11:07.808 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] Metadata [INFO] Cluster ID: 4lG0x-HeQPiUY-OWu7MHFg
2019-04-25 19:11:07.810 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] KafkaConsumerProxy [INFO] Initial lag for SSP SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0] is 0 (end=0, startOffset=0)
2019-04-25 19:11:07.810 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] KafkaConsumerProxy [INFO] Initial lag for SSP SystemStreamPartition [kafka, profile-table-input, 0] is 0 (end=0, startOffset=0)
2019-04-25 19:11:07.811 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] KafkaConsumerProxy [INFO] Initial lag for SSP SystemStreamPartition [kafka, pageview-join-input, 0] is 0 (end=0, startOffset=0)
2019-04-25 19:11:07.820 [main] SamzaContainer [INFO] Entering run loop.
2019-04-25 19:11:07.821 [main] ContainerLaunchUtil [INFO] Container Started
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment