Last active
April 26, 2019 16:54
-
-
Save shanthoosh/88ef289cb98d6d3be9db9fb76bc04472 to your computer and use it in GitHub Desktop.
Logs to demonstrate that ApplicationMaster ignores configurations with serialized null values from the coordinator stream.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Published the Samza reserved configurations with serialized null values into the coordinator stream: | |
```java | |
╰─λ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __samza_coordinator_pageview-profile-table-joiner_1 --property print.key=true --property key.separator="-" --from-beginning | grep "set-config" | |
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.system"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"kafka"},"username":"svenkata","timestamp":1556154205737} | |
[1,"set-config","tables.profile-table.provider.factory"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"org.apache.samza.storage.kv.LocalTableProviderFactory"},"username":"svenkata","timestamp":1556154205791} | |
[1,"set-config","systems.kafka.samza.factory"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"org.apache.samza.system.kafka.KafkaSystemFactory"},"username":"svenkata","timestamp":1556154205792} | |
[1,"set-config","job.default.system"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"kafka"},"username":"svenkata","timestamp":1556154205792} | |
[1,"set-config","job.id"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"1"},"username":"svenkata","timestamp":1556154205792} | |
[1,"set-config","serializers.registry.StringSerde-32e63a08-f16c-43ac-8261-d24f4b4badef.samza.serialized.instance"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg="},"username":"svenkata","timestamp":1556154205793} | |
[1,"set-config","systems.kafka.default.stream.replication.factor"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"1"},"username":"svenkata","timestamp":1556154205793} | |
[1,"set-config","stores.profile-table.key.serde"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"StringSerde-32e63a08-f16c-43ac-8261-d24f4b4badef"},"username":"svenkata","timestamp":1556154205794} | |
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"2147483647"},"username":"svenkata","timestamp":1556154205794} | |
[1,"set-config","serializers.registry.JsonSerdeV2-e4f7373f-da63-4176-a9e8-c4c404e8f925.samza.serialized.instance"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA=="},"username":"svenkata","timestamp":1556154205795} | |
[1,"set-config","streams.enriched-pageview-join-output.samza.system"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"kafka"},"username":"svenkata","timestamp":1556154205795} | |
[1,"set-config","streams.pageview-join-input.samza.msg.serde"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"JsonSerdeV2-d13051cf-8262-4e94-962c-96c3b36fcbfb"},"username":"svenkata","timestamp":1556154205796} | |
[1,"set-config","app.run.id"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"1556154204508-063c4569"},"username":"svenkata","timestamp":1556154205796} | |
[1,"set-config","job.name"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"pageview-profile-table-joiner"},"username":"svenkata","timestamp":1556154205797} | |
[1,"set-config","streams.profile-table-input.samza.msg.serde"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"JsonSerdeV2-e4f7373f-da63-4176-a9e8-c4c404e8f925"},"username":"svenkata","timestamp":1556154205797} | |
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"JsonSerdeV2-d13051cf-8262-4e94-962c-96c3b36fcbfb"},"username":"svenkata","timestamp":1556154205798} | |
[1,"set-config","systems.kafka.producer.bootstrap.servers"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"localhost:9092"},"username":"svenkata","timestamp":1556154205798} | |
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"true"},"username":"svenkata","timestamp":1556154205799} | |
[1,"set-config","streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"oldest"},"username":"svenkata","timestamp":1556154205799} | |
[1,"set-config","systems.kafka.consumer.zookeeper.connect"]-{"host":"172.21.224.112","source":"job-runner","values":{"value":"localhost:2181"},"username":"svenkata","timestamp":1556154205799} | |
// Samza-specific configuration keys whose values were serialized to null and stored in the coordinator stream. | |
[1,"set-config","serializers.registry.JsonSerdeV2-a6eef306-2c02-4bdd-b005-b8d9e34431b4.samza.serialized.instance"]-null | |
[1,"set-config","serializers.registry.JsonSerdeV2-fc28efcb-dc7b-4a8c-bb27-8544e84753a0.samza.serialized.instance"]-null | |
[1,"set-config","samza.autoscaling.server.url"]-null | |
[1,"set-config","serializers.registry.StringSerde-85e21381-bf13-4212-8047-63de5f488ca0.samza.serialized.instance"]-null | |
[1,"set-config","serializers.registry.StringSerde-52d71216-187a-4fe4-a362-13faaf079dfc.samza.serialized.instance"]-null | |
[1,"set-config","serializers.registry.JsonSerdeV2-20ea4875-8d9d-4d06-9c4b-b699af3c35e8.samza.serialized.instance"]-null | |
[1,"set-config","serializers.registry.last-message-interaction-v2-specific-record-to-json.class"]-null | |
[1,"set-config","serializers.registry.recipient-offline-score-specific-record-to-json.class"]-null | |
[1,"set-config","stores.last-message-interaction-v2.changelog"]-null | |
[1,"set-config","stores.last-message-interaction-v2.factory"]-null | |
[1,"set-config","stores.last-message-interaction-v2.key.serde"]-null | |
[1,"set-config","stores.last-message-interaction-v2.rocksdb.ttl.ms"]-null | |
[1,"set-config","stores.last-message-interaction-v2.msg.serde"]-null | |
[1,"set-config","stores.recipient-offline-score.msg.serde"]-null | |
[1,"set-config","stores.recipient-offline-score.factory"]-null | |
[1,"set-config","stores.recipient-offline-score.key.class"]-null | |
[1,"set-config","stores.recipient-offline-score.changelog.replication.factor"]-null | |
[1,"set-config","stores.recipient-offline-score.changelog.replication.factor"]-null | |
[1,"set-config","stores.recipient-offline-score.key.serde"]-null | |
[1,"set-config","stores.recipient-offline-score.changelog"]-null | |
``` | |
Without the patch, if we try to run the job, the SamzaContainer fails with the exception: | |
```java | |
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza (latest ✚1) | |
╰─λ tail -n 150 ./deploy/yarn/logs/userlogs/application_1556153599011_0007/container_1556153599011_0007_01_000002/samza-container-0.log | head -n 100 0 < 19:01:59 | |
ssl.keymanager.algorithm = SunX509 | |
ssl.keystore.location = null | |
ssl.keystore.password = null | |
ssl.keystore.type = JKS | |
ssl.protocol = TLS | |
ssl.provider = null | |
ssl.secure.random.implementation = null | |
ssl.trustmanager.algorithm = PKIX | |
ssl.truststore.location = null | |
ssl.truststore.password = null | |
ssl.truststore.type = JKS | |
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer | |
2019-04-25 19:00:30.081 [main] ConsumerConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config. | |
2019-04-25 19:00:30.081 [main] AppInfoParser [INFO] Kafka version : 2.0.1 | |
2019-04-25 19:00:30.081 [main] AppInfoParser [INFO] Kafka commitId : fa14705e51bd2ce5 | |
2019-04-25 19:00:30.081 [main] AppInfoParser [WARN] Error registering AppInfo mbean | |
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka_consumer-pageview_profile_table_joiner-1 | |
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) | |
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) | |
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596) | |
at org.apache.samza.system.kafka.KafkaSystemConsumer.createKafkaConsumerImpl(KafkaSystemConsumer.java:146) | |
at org.apache.samza.system.kafka.KafkaSystemFactory.getConsumer(KafkaSystemFactory.scala:57) | |
at org.apache.samza.container.SamzaContainer$$anonfun$14.apply(SamzaContainer.scala:223) | |
at org.apache.samza.container.SamzaContainer$$anonfun$14.apply(SamzaContainer.scala:219) | |
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) | |
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) | |
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94) | |
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) | |
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47) | |
at scala.collection.SetLike$class.map(SetLike.scala:92) | |
at scala.collection.AbstractSet.map(Set.scala:47) | |
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:219) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:93) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71) | |
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75) | |
2019-04-25 19:00:30.082 [main] KafkaSystemFactory [INFO] Created kafka consumer for system kafka, clientId kafka_consumer-pageview_profile_table_joiner-1: org.apache.kafka.clients.consumer.KafkaConsumer@2a8d39c4 | |
2019-04-25 19:00:30.082 [main] KafkaConsumerProxy [INFO] Creating KafkaConsumerProxy with systeName=kafka, clientId=kafka_consumer-pageview_profile_table_joiner-1, metricsName=kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:00:30.082 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Created proxy consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:00:30.083 [main] KafkaSystemFactory [INFO] Created samza system consumer for system kafka, config {streams.pageview-profile-table-joiner-1-partition_by-join.samza.system=kafka, tables.profile-table.provider.factory=org.apache.samza.storage.kv.LocalTableProviderFactory, stores.recipient-offline-score.key.class=null, systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory, stores.recipient-offline-score.msg.serde=null, random-key=null, stores.recipient-offline-score.factory=null, systems.kafka.default.stream.replication.factor=1, stores.profile-table.key.serde=StringSerde-3bcc5e3b-5346-4aea-be61-b40848f2bbd3, serializers.registry.StringSerde-a158545f-60db-44e5-b8f4-60dfe4a95f2b.samza.serialized.instance=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority=2147483647, stores.recipient-offline-score.key.serde=null, streams.pageview-join-input.samza.msg.serde=JsonSerdeV2-ac719c4d-352b-4b2f-af94-b012aa43e42f, stores.recipient-offline-score.changelog=null, serializers.registry.JsonSerdeV2-4ed15e46-b2c2-4fdf-be27-944aee04360e.samza.serialized.instance=null, app.run.id=1556244016574-255e6fcb, serializers.registry.JsonSerdeV2-a6eef306-2c02-4bdd-b005-b8d9e34431b4.samza.serialized.instance=null, job.name=pageview-profile-table-joiner, serializers.registry.JsonSerdeV2-55b1464f-2840-4104-be32-e378cfdae26d.samza.serialized.instance=null, systems.kafka.producer.bootstrap.servers=localhost:9092, serializers.registry.StringSerde-85e21381-bf13-4212-8047-63de5f488ca0.samza.serialized.instance=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default=oldest, serializers.registry.StringSerde-52d71216-187a-4fe4-a362-13faaf079dfc.samza.serialized.instance=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.physical.name=pageview-profile-table-joiner-1-partition_by-join, stores.last-message-interaction-v2.key.serde=null, 
serializers.registry.JsonSerdeV2-2f3f4c41-7d3c-4240-b1fb-52cddba57693.samza.serialized.instance=null, samza.internal.execution.plan={"jobs":[{"jobName":"pageview-profile-table-joiner","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":["pageview-profile-table-joiner-1-join-6"]},{"streamId":"profile-table-input","nextOperatorIds":["pageview-profile-table-joiner-1-map-2"]},{"streamId":"pageview-join-input","nextOperatorIds":["pageview-profile-table-joiner-1-partition_by-join"]}],"outputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},{"streamId":"enriched-pageview-join-output","nextOperatorIds":[]}],"operators":{"pageview-profile-table-joiner-1-partition_by-join":{"opId":"pageview-profile-table-joiner-1-partition_by-join","opCode":"PARTITION_BY","sourceLocation":"StreamTableJoinExample.java:127","outputStreamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},"pageview-profile-table-joiner-1-map-2":{"opId":"pageview-profile-table-joiner-1-map-2","opCode":"MAP","sourceLocation":"StreamTableJoinExample.java:123","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-3"]},"pageview-profile-table-joiner-1-send_to-3":{"opId":"pageview-profile-table-joiner-1-send_to-3","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:124","nextOperatorIds":[]},"pageview-profile-table-joiner-1-send_to-7":{"opId":"pageview-profile-table-joiner-1-send_to-7","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:129","outputStreamId":"enriched-pageview-join-output","nextOperatorIds":[]},"pageview-profile-table-joiner-1-join-6":{"opId":"pageview-profile-table-joiner-1-join-6","tableId":"profile-table","opCode":"JOIN","sourceLocation":"StreamTableJoinExample.java:128","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-7"]}}}}],"sourceStreams":{"profile-table-input":{"streamSpec":{"id":"profile-table-input","sys
temName":"kafka","physicalName":"profile-table-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]},"pageview-join-input":{"streamSpec":{"id":"pageview-join-input","systemName":"kafka","physicalName":"pageview-join-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]}},"sinkStreams":{"enriched-pageview-join-output":{"streamSpec":{"id":"enriched-pageview-join-output","systemName":"kafka","physicalName":"enriched-pageview-join-output","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":[]}},"intermediateStreams":{"pageview-profile-table-joiner-1-partition_by-join":{"streamSpec":{"id":"pageview-profile-table-joiner-1-partition_by-join","systemName":"kafka","physicalName":"pageview-profile-table-joiner-1-partition_by-join","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":["pageview-profile-table-joiner"]}},"tables":{"profile-table":{"id":"profile-table","providerFactory":"org.apache.samza.storage.kv.LocalTableProviderFactory"}},"applicationName":"pageview-profile-table-joiner","applicationId":"1"}, streams.pageview-profile-table-joiner-1-partition_by-join.replication.factor=1, serializers.registry.StringSerde-ae153054-890f-422a-b98c-02c18761dc08.samza.serialized.instance=null, stores.profile-table.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory, job.factory.class=org.apache.samza.job.yarn.YarnJobFactory, yarn.package.path=file:///home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza/target/hello-samza-1.1.0-dist.tar.gz, stores.last-message-interaction-v2.changelog=null, serializers.registry.JsonSerdeV2-ac719c4d-352b-4b2f-af94-b012aa43e42f.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAlc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5QYWdlVmlldwAAAAAAAAAAAAAAeHA=, 
stores.last-message-interaction-v2.factory=null, app.mode=STREAM, stores.recipient-offline-score.changelog.replication.factor=null, app.class=samza.examples.cookbook.StreamTableJoinExample, serializers.registry.JsonSerdeV2-72395b28-b1a4-40f8-9203-2fe24128b971.samza.serialized.instance=null, serializers.registry.JsonSerdeV2-5cb36170-834d-4a93-8085-993d7b24714c.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA==, job.container.count=1, serializers.registry.JsonSerdeV2-ab33ac1d-459a-4fcb-a9c4-cf2009db8d3c.samza.serialized.instance=null, job.default.system=kafka, job.id=1, serializers.registry.JsonSerdeV2-aa46a488-ac4b-4a64-ba94-8391491aacfc.samza.serialized.instance=null, serializers.registry.StringSerde-3bcc5e3b-5346-4aea-be61-b40848f2bbd3.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, serializers.registry.JsonSerdeV2-fc28efcb-dc7b-4a8c-bb27-8544e84753a0.samza.serialized.instance=null, samza.autoscaling.server.url=null, serializers.registry.StringSerde-d221cb99-c56e-4924-9699-6f12ebddb40e.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, streams.enriched-pageview-join-output.samza.system=kafka, streams.profile-table-input.samza.msg.serde=JsonSerdeV2-5cb36170-834d-4a93-8085-993d7b24714c, streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde=JsonSerdeV2-ac719c4d-352b-4b2f-af94-b012aa43e42f, stores.last-message-interaction-v2.rocksdb.ttl.ms=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages=true, systems.kafka.consumer.zookeeper.connect=localhost:2181, stores.last-message-interaction-v2.msg.serde=null, 
streams.pageview-join-input.samza.system=kafka, serializers.registry.StringSerde-60bbf2c3-0787-4399-8aed-4e2632d32656.samza.serialized.instance=null, serializers.registry.JsonSerdeV2-20ea4875-8d9d-4d06-9c4b-b699af3c35e8.samza.serialized.instance=null, serializers.registry.last-message-interaction-v2-specific-record-to-json.class=null, streams.pageview-profile-table-joiner-1-partition_by-join.samza.intermediate=true, task.inputs=kafka.pageview-profile-table-joiner-1-partition_by-join,kafka.profile-table-input,kafka.pageview-join-input, streams.pageview-profile-table-joiner-1-partition_by-join.samza.key.serde=StringSerde-d221cb99-c56e-4924-9699-6f12ebddb40e, task.window.ms=-1, serializers.registry.recipient-offline-score-specific-record-to-json.class=null, streams.profile-table-input.samza.system=kafka, streams.enriched-pageview-join-output.samza.msg.serde=JsonSerdeV2-6d178166-b28d-4513-b620-e466f9f12a59, serializers.registry.StringSerde-409f4a88-7605-4d43-830e-c27e9ad3baed.samza.serialized.instance=null, stores.profile-table.msg.serde=JsonSerdeV2-5cb36170-834d-4a93-8085-993d7b24714c, serializers.registry.JsonSerdeV2-6d178166-b28d-4513-b620-e466f9f12a59.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgA/c2FtemEuZXhhbXBsZXMuY29va2Jvb2suU3RyZWFtVGFibGVKb2luRXhhbXBsZSRFbnJpY2hlZFBhZ2VWaWV3AAAAAAAAAAAAAAB4cA==}: kafka:kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:00:30.083 [main] SamzaContainer$ [INFO] Got system consumers: Set(kafka) | |
2019-04-25 19:00:30.084 [main] JobPlanner [WARN] job.id is a deprecated configuration, use app.id instead. | |
2019-04-25 19:00:30.084 [main] JobPlanner [WARN] job.name is a deprecated configuration, use use app.name instead. | |
2019-04-25 19:00:30.085 [main] KafkaSystemFactory [INFO] Creating kafka producer for system kafka, producerClientId kafka_producer-pageview_profile_table_joiner-1 | |
2019-04-25 19:00:30.086 [main] SamzaContainer$ [INFO] Got system producers: Set(kafka) | |
2019-04-25 19:00:30.089 [main] Util$ [ERROR] Unable to create an instance for class null. | |
java.lang.NullPointerException | |
at java.lang.Class.forName0(Native Method) | |
at java.lang.Class.forName(Class.java:264) | |
at org.apache.samza.util.Util$.getObj(Util.scala:56) | |
at org.apache.samza.container.SamzaContainer$$anonfun$18.apply(SamzaContainer.scala:255) | |
at org.apache.samza.container.SamzaContainer$$anonfun$18.apply(SamzaContainer.scala:250) | |
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) | |
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) | |
at scala.collection.immutable.Set$Set2.foreach(Set.scala:128) | |
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) | |
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47) | |
at scala.collection.SetLike$class.map(SetLike.scala:92) | |
at scala.collection.AbstractSet.map(Set.scala:47) | |
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:250) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:93) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71) | |
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75) | |
2019-04-25 19:00:30.089 [main] CoordinatorStreamStore [INFO] Stopping the coordinator stream system consumer. | |
2019-04-25 19:00:30.094 [main] KafkaSystemProducer [INFO] Stopping producer for system: kafka | |
2019-04-25 19:00:30.094 [main] KafkaProducer [INFO] [Producer clientId=kafka_producer-pageview_profile_table_joiner-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. | |
2019-04-25 19:00:30.095 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Stopping Samza kafkaConsumer | |
2019-04-25 19:00:30.095 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Stopping proxy consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:00:30.096 [main] KafkaConsumerProxy [INFO] Shutting down KafkaConsumerProxy poll thread Samza KafkaConsumerProxy Poll Thread-1 - kafka for consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:00:30.376 [Samza KafkaConsumerProxy Poll Thread-1 - kafka] KafkaConsumerProxy [INFO] KafkaConsumerProxy for system kafka has stopped. | |
2019-04-25 19:00:30.376 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Closing kafkaSystemConsumer org.apache.kafka.clients.consumer.KafkaConsumer@26df6e3a | |
2019-04-25 19:00:30.379 [main] SamzaUncaughtExceptionHandler [ERROR] Uncaught exception in thread main. | |
java.lang.NullPointerException | |
at java.lang.Class.forName0(Native Method) | |
at java.lang.Class.forName(Class.java:264) | |
at org.apache.samza.util.Util$.getObj(Util.scala:56) | |
at org.apache.samza.container.SamzaContainer$$anonfun$18.apply(SamzaContainer.scala:255) | |
at org.apache.samza.container.SamzaContainer$$anonfun$18.apply(SamzaContainer.scala:250) | |
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) | |
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) | |
at scala.collection.immutable.Set$Set2.foreach(Set.scala:128) | |
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) | |
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47) | |
at scala.collection.SetLike$class.map(SetLike.scala:92) | |
at scala.collection.AbstractSet.map(Set.scala:47) | |
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:250) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:93) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71) | |
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75) | |
2019-04-25 19:00:30.391 [main] ThreadUtil [INFO] Thread dump from uncaught exception handler. | |
"kafka-admin-client-thread | adminclient-2" Id=23 RUNNABLE (in native) | |
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) | |
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) | |
``` | |
After deploying the change, from the logs we can see that the configurations with null values are ignored in the ApplicationMaster. | |
``` | |
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-1/samza (master↑1 ✚1) | |
╰─λ ./gradlew publishToMavenLocal | |
To honour the JVM settings for this build a new JVM will be forked. Please consider using the daemon: https://docs.gradle.org/2.8/userguide/gradle_daemon.html. | |
:samza-api:generatePomFileForMavenJavaPublication | |
:samza-api:compileJava UP-TO-DATE | |
:samza-api:processResources UP-TO-DATE | |
:samza-api:classes UP-TO-DATE | |
:samza-api:jar UP-TO-DATE | |
:samza-api:publishMavenJavaPublicationToMavenLocal | |
:samza-api:publishToMavenLocal | |
:samza-autoscaling_2.11:generatePomFileForMavenJavaPublication | |
:samza-core_2.11:compileJava UP-TO-DATE | |
:samza-core_2.11:compileScala | |
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:298: warning: non-variable type argument Any in type org.apache.samza.serializers.NoOpSerde[Any] is unchecked since it is eliminated by erasure | |
[ant:scalac] .filter(!_.isInstanceOf[NoOpSerde[Any]]) | |
[ant:scalac] ^ | |
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:317: warning: non-variable type argument Any in type org.apache.samza.serializers.NoOpSerde[Any] is unchecked since it is eliminated by erasure | |
[ant:scalac] .filter(!_.isInstanceOf[NoOpSerde[Any]]) | |
[ant:scalac] ^ | |
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:71: warning: trait ClosableTask in package task is deprecated: see corresponding Javadoc for more information. | |
[ant:scalac] val isClosableTask = task.isInstanceOf[ClosableTask] | |
[ant:scalac] ^ | |
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:258: warning: trait ClosableTask in package task is deprecated: see corresponding Javadoc for more information. | |
[ant:scalac] if (task.isInstanceOf[ClosableTask]) { | |
[ant:scalac] ^ | |
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:261: warning: trait ClosableTask in package task is deprecated: see corresponding Javadoc for more information. | |
[ant:scalac] task.asInstanceOf[ClosableTask].close | |
[ant:scalac] ^ | |
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:352: warning: method onCheckpoint in trait CheckpointListener is deprecated: see corresponding Javadoc for more information. | |
[ant:scalac] checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava)) | |
[ant:scalac] ^ | |
[ant:scalac] 6 warnings found | |
Note: Some input files use or override a deprecated API. | |
Note: Recompile with -Xlint:deprecation for details. | |
Note: Some input files use unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-core_2.11:processResources UP-TO-DATE | |
:samza-core_2.11:classes | |
:samza-core_2.11:jar | |
:samza-autoscaling_2.11:compileJava UP-TO-DATE | |
:samza-autoscaling_2.11:compileScala | |
:samza-autoscaling_2.11:processResources UP-TO-DATE | |
:samza-autoscaling_2.11:classes | |
:samza-autoscaling_2.11:jar UP-TO-DATE | |
:samza-autoscaling_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-autoscaling_2.11:publishToMavenLocal | |
:samza-aws_2.11:generatePomFileForMavenJavaPublication | |
:samza-aws_2.11:compileJava | |
:samza-aws_2.11:processResources UP-TO-DATE | |
:samza-aws_2.11:classes | |
:samza-aws_2.11:jar UP-TO-DATE | |
:samza-aws_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-aws_2.11:publishToMavenLocal | |
:samza-azure_2.11:generatePomFileForMavenJavaPublication | |
:samza-azure_2.11:compileJava | |
Note: /home/svenkata/code/samza/samza-1/samza/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java uses or overrides a deprecated API. | |
Note: Recompile with -Xlint:deprecation for details. | |
Note: Some input files use unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-azure_2.11:processResources UP-TO-DATE | |
:samza-azure_2.11:classes | |
:samza-azure_2.11:jar UP-TO-DATE | |
:samza-azure_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-azure_2.11:publishToMavenLocal | |
:samza-core_2.11:generatePomFileForMavenJavaPublication | |
:samza-core_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-core_2.11:publishToMavenLocal | |
:samza-elasticsearch_2.11:generatePomFileForMavenJavaPublication | |
:samza-elasticsearch_2.11:compileJava | |
:samza-elasticsearch_2.11:processResources UP-TO-DATE | |
:samza-elasticsearch_2.11:classes | |
:samza-elasticsearch_2.11:jar UP-TO-DATE | |
:samza-elasticsearch_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-elasticsearch_2.11:publishToMavenLocal | |
:samza-hdfs_2.11:generatePomFileForMavenJavaPublication | |
:samza-yarn_2.11:compileJava UP-TO-DATE | |
:samza-yarn_2.11:compileScala | |
Note: /home/svenkata/code/samza/samza-1/samza/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java uses unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-yarn_2.11:processResources UP-TO-DATE | |
:samza-yarn_2.11:classes | |
:samza-yarn_2.11:lesscss UP-TO-DATE | |
:samza-yarn_2.11:jar UP-TO-DATE | |
:samza-hdfs_2.11:compileJava UP-TO-DATE | |
:samza-hdfs_2.11:compileScala | |
:samza-hdfs_2.11:processResources UP-TO-DATE | |
:samza-hdfs_2.11:classes | |
:samza-hdfs_2.11:jar UP-TO-DATE | |
:samza-hdfs_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-hdfs_2.11:publishToMavenLocal | |
:samza-kafka_2.11:generatePomFileForMavenJavaPublication | |
:samza-kafka_2.11:compileJava UP-TO-DATE | |
:samza-kafka_2.11:compileScala | |
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala:81: warning: class ZkUtils in package utils is deprecated: This is an internal class that is no longer used by Kafka and will be removed in a future release. Please use org.apache.kafka.clients.admin.AdminClient instead. | |
[ant:scalac] val connectZk: () => ZkUtils) extends Logging { | |
[ant:scalac] ^ | |
[ant:scalac] /home/svenkata/code/samza/samza-1/samza/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala:110: warning: object ZkUtils in package utils is deprecated: This is an internal class that is no longer used by Kafka and will be removed in a future release. Please use org.apache.kafka.clients.admin.AdminClient instead. | |
[ant:scalac] ZkUtils(zkClient, isZkSecurityEnabled = false).getAllTopics() | |
[ant:scalac] ^ | |
[ant:scalac] two warnings found | |
Note: /home/svenkata/code/samza/samza-1/samza/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java uses or overrides a deprecated API. | |
Note: Recompile with -Xlint:deprecation for details. | |
Note: Some input files use unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-kafka_2.11:processResources UP-TO-DATE | |
:samza-kafka_2.11:classes | |
:samza-kafka_2.11:jar UP-TO-DATE | |
:samza-kafka_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-kafka_2.11:publishToMavenLocal | |
:samza-kv-inmemory_2.11:generatePomFileForMavenJavaPublication | |
:samza-kv_2.11:compileJava UP-TO-DATE | |
:samza-kv_2.11:compileScala | |
Note: /home/svenkata/code/samza/samza-1/samza/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java uses unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-kv_2.11:processResources UP-TO-DATE | |
:samza-kv_2.11:classes | |
:samza-kv_2.11:jar UP-TO-DATE | |
:samza-kv-inmemory_2.11:compileJava UP-TO-DATE | |
:samza-kv-inmemory_2.11:compileScala | |
:samza-kv-inmemory_2.11:processResources UP-TO-DATE | |
:samza-kv-inmemory_2.11:classes | |
:samza-kv-inmemory_2.11:jar UP-TO-DATE | |
:samza-kv-inmemory_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-kv-inmemory_2.11:publishToMavenLocal | |
:samza-kv-rocksdb_2.11:generatePomFileForMavenJavaPublication | |
:samza-kv-rocksdb_2.11:compileJava UP-TO-DATE | |
:samza-kv-rocksdb_2.11:compileScala | |
Note: /home/svenkata/code/samza/samza-1/samza/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java uses unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-kv-rocksdb_2.11:processResources UP-TO-DATE | |
:samza-kv-rocksdb_2.11:classes | |
:samza-kv-rocksdb_2.11:jar UP-TO-DATE | |
:samza-kv-rocksdb_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-kv-rocksdb_2.11:publishToMavenLocal | |
:samza-kv_2.11:generatePomFileForMavenJavaPublication | |
:samza-kv_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-kv_2.11:publishToMavenLocal | |
:samza-log4j2_2.11:generatePomFileForMavenJavaPublication | |
:samza-log4j2_2.11:compileJava | |
Processing annotations | |
Annotations processed | |
Processing annotations | |
No elements to process | |
Note: /home/svenkata/code/samza/samza-1/samza/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java uses unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-log4j2_2.11:processResources UP-TO-DATE | |
:samza-log4j2_2.11:classes | |
:samza-log4j2_2.11:jar UP-TO-DATE | |
:samza-log4j2_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-log4j2_2.11:publishToMavenLocal | |
:samza-log4j_2.11:generatePomFileForMavenJavaPublication | |
:samza-log4j_2.11:compileJava | |
Note: /home/svenkata/code/samza/samza-1/samza/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java uses unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-log4j_2.11:processResources UP-TO-DATE | |
:samza-log4j_2.11:classes | |
:samza-log4j_2.11:jar UP-TO-DATE | |
:samza-log4j_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-log4j_2.11:publishToMavenLocal | |
:samza-rest_2.11:generatePomFileForMavenJavaPublication | |
:samza-shell:compileJava UP-TO-DATE | |
:samza-shell:processResources UP-TO-DATE | |
:samza-shell:classes UP-TO-DATE | |
:samza-shell:jar UP-TO-DATE | |
:samza-rest_2.11:compileJava | |
Note: /home/svenkata/code/samza/samza-1/samza/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java uses or overrides a deprecated API. | |
Note: Recompile with -Xlint:deprecation for details. | |
:samza-rest_2.11:processResources UP-TO-DATE | |
:samza-rest_2.11:classes | |
:samza-rest_2.11:jar UP-TO-DATE | |
:samza-rest_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-rest_2.11:publishToMavenLocal | |
:samza-shell:generatePomFileForMavenJavaPublication | |
:samza-shell:shellTarGz UP-TO-DATE | |
:samza-shell:publishMavenJavaPublicationToMavenLocal | |
:samza-shell:publishToMavenLocal | |
:samza-sql-shell_2.11:generatePomFileForMavenJavaPublication | |
:samza-sql_2.11:compileJava | |
Note: Some input files use or override a deprecated API. | |
Note: Recompile with -Xlint:deprecation for details. | |
Note: Some input files use unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-sql_2.11:processResources UP-TO-DATE | |
:samza-sql_2.11:classes | |
:samza-sql_2.11:jar UP-TO-DATE | |
:samza-tools_2.11:compileJava | |
Note: Some input files use or override a deprecated API. | |
Note: Recompile with -Xlint:deprecation for details. | |
:samza-tools_2.11:processResources UP-TO-DATE | |
:samza-tools_2.11:classes | |
:samza-tools_2.11:jar UP-TO-DATE | |
:samza-sql-shell_2.11:compileJava | |
Note: Some input files use or override a deprecated API. | |
Note: Recompile with -Xlint:deprecation for details. | |
:samza-sql-shell_2.11:processResources UP-TO-DATE | |
:samza-sql-shell_2.11:classes | |
:samza-sql-shell_2.11:jar UP-TO-DATE | |
:samza-sql-shell_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-sql-shell_2.11:publishToMavenLocal | |
:samza-sql_2.11:generatePomFileForMavenJavaPublication | |
:samza-sql_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-sql_2.11:publishToMavenLocal | |
:samza-test_2.11:generatePomFileForMavenJavaPublication | |
:samza-test_2.11:compileJava UP-TO-DATE | |
:samza-test_2.11:compileScala | |
Note: Some input files use unchecked or unsafe operations. | |
Note: Recompile with -Xlint:unchecked for details. | |
:samza-test_2.11:processResources UP-TO-DATE | |
:samza-test_2.11:classes | |
:samza-test_2.11:jar UP-TO-DATE | |
:samza-test_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-test_2.11:publishToMavenLocal | |
:samza-tools_2.11:generatePomFileForMavenJavaPublication | |
:samza-tools_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-tools_2.11:publishToMavenLocal | |
:samza-yarn_2.11:generatePomFileForMavenJavaPublication | |
:samza-yarn_2.11:publishMavenJavaPublicationToMavenLocal | |
:samza-yarn_2.11:publishToMavenLocal | |
BUILD SUCCESSFUL | |
Total time: 46.454 secs | |
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza (latest ✚1) | |
╰─λ grep 'Ignoring it.' ./deploy/yarn/logs/userlogs/application_1556153599011_0008/container_1556153599011_0008_01_000001/samza-job-coordinator.log 0 < 19:12:25 | |
╰─λ grep 'Ignoring it' deploy/yarn/logs/userlogs/application_1556153599011_0009/container_1556153599011_0009_01_000001/samza-job-coordinator.log 0 < 09:52:53 | |
2019-04-26 09:52:48.646 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.key.class is empty or null. Ignoring it. | |
2019-04-26 09:52:48.646 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.msg.serde is empty or null. Ignoring it. | |
2019-04-26 09:52:48.647 [main] CoordinatorStreamUtil$ [INFO] Value for key: random-key is empty or null. Ignoring it. | |
2019-04-26 09:52:48.647 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.factory is empty or null. Ignoring it. | |
2019-04-26 09:52:48.647 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-a158545f-60db-44e5-b8f4-60dfe4a95f2b.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.648 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.key.serde is empty or null. Ignoring it. | |
2019-04-26 09:52:48.648 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.changelog is empty or null. Ignoring it. | |
2019-04-26 09:52:48.649 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-4ed15e46-b2c2-4fdf-be27-944aee04360e.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.649 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-a6eef306-2c02-4bdd-b005-b8d9e34431b4.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.649 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-55b1464f-2840-4104-be32-e378cfdae26d.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.650 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-85e21381-bf13-4212-8047-63de5f488ca0.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.650 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-52d71216-187a-4fe4-a362-13faaf079dfc.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.651 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.key.serde is empty or null. Ignoring it. | |
2019-04-26 09:52:48.651 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-2f3f4c41-7d3c-4240-b1fb-52cddba57693.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.652 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-ae153054-890f-422a-b98c-02c18761dc08.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.652 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.changelog is empty or null. Ignoring it. | |
2019-04-26 09:52:48.652 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.factory is empty or null. Ignoring it. | |
2019-04-26 09:52:48.653 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.recipient-offline-score.changelog.replication.factor is empty or null. Ignoring it. | |
2019-04-26 09:52:48.653 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-72395b28-b1a4-40f8-9203-2fe24128b971.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.654 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-ab33ac1d-459a-4fcb-a9c4-cf2009db8d3c.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.654 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-aa46a488-ac4b-4a64-ba94-8391491aacfc.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.654 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-fc28efcb-dc7b-4a8c-bb27-8544e84753a0.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.655 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.rocksdb.ttl.ms is empty or null. Ignoring it. | |
2019-04-26 09:52:48.656 [main] CoordinatorStreamUtil$ [INFO] Value for key: stores.last-message-interaction-v2.msg.serde is empty or null. Ignoring it. | |
2019-04-26 09:52:48.656 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-60bbf2c3-0787-4399-8aed-4e2632d32656.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.656 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.JsonSerdeV2-20ea4875-8d9d-4d06-9c4b-b699af3c35e8.samza.serialized.instance is empty or null. Ignoring it. | |
2019-04-26 09:52:48.657 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.last-message-interaction-v2-specific-record-to-json.class is empty or null. Ignoring it. | |
2019-04-26 09:52:48.657 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.recipient-offline-score-specific-record-to-json.class is empty or null. Ignoring it. | |
2019-04-26 09:52:48.658 [main] CoordinatorStreamUtil$ [INFO] Value for key: serializers.registry.StringSerde-409f4a88-7605-4d43-830e-c27e9ad3baed.samza.serialized.instance is empty or null. Ignoring it. | |
``` | |
From the logs below, we can conclude that the SamzaContainer started up successfully and that the configuration map read by the SamzaContainer did not contain any key with a null value: | |
```java | |
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza (latest ✚1) | |
╰─λ tail -n 250 ./deploy/yarn/logs/userlogs/application_1556153599011_0008/container_1556153599011_0008_01_000002/samza-container-0.log 0 < 19:14:12 | |
2019-04-25 19:11:07.220 [main] SamzaContainer$ [INFO] Using configuration: {streams.pageview-profile-table-joiner-1-partition_by-join.samza.system=kafka, tables.profile-table.provider.factory=org.apache.samza.storage.kv.LocalTableProviderFactory, systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory, job.default.system=kafka, serializers.registry.StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, job.id=1, serializers.registry.JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA==, systems.kafka.default.stream.replication.factor=1, stores.profile-table.key.serde=StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585, streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority=2147483647, streams.enriched-pageview-join-output.samza.system=kafka, serializers.registry.JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAlc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5QYWdlVmlldwAAAAAAAAAAAAAAeHA=, streams.pageview-join-input.samza.msg.serde=JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053, app.run.id=1556244654191-a2723d2d, job.name=pageview-profile-table-joiner, streams.profile-table-input.samza.msg.serde=JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, systems.kafka.producer.bootstrap.servers=localhost:9092, streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde=JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053, 
serializers.registry.JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgA/c2FtemEuZXhhbXBsZXMuY29va2Jvb2suU3RyZWFtVGFibGVKb2luRXhhbXBsZSRFbnJpY2hlZFBhZ2VWaWV3AAAAAAAAAAAAAAB4cA==, serializers.registry.StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages=true, streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default=oldest, systems.kafka.consumer.zookeeper.connect=localhost:2181, streams.pageview-join-input.samza.system=kafka, streams.pageview-profile-table-joiner-1-partition_by-join.samza.physical.name=pageview-profile-table-joiner-1-partition_by-join, samza.internal.execution.plan={"jobs":[{"jobName":"pageview-profile-table-joiner","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":["pageview-profile-table-joiner-1-join-6"]},{"streamId":"profile-table-input","nextOperatorIds":["pageview-profile-table-joiner-1-map-2"]},{"streamId":"pageview-join-input","nextOperatorIds":["pageview-profile-table-joiner-1-partition_by-join"]}],"outputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},{"streamId":"enriched-pageview-join-output","nextOperatorIds":[]}],"operators":{"pageview-profile-table-joiner-1-partition_by-join":{"opId":"pageview-profile-table-joiner-1-partition_by-join","opCode":"PARTITION_BY","sourceLocation":"StreamTableJoinExample.java:127","outputStreamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},"pageview-profile-table-joiner-1-map-2":{"opId":"pageview-profile-table-joiner-1-map-2","opCode":"MAP","so
urceLocation":"StreamTableJoinExample.java:123","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-3"]},"pageview-profile-table-joiner-1-send_to-3":{"opId":"pageview-profile-table-joiner-1-send_to-3","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:124","nextOperatorIds":[]},"pageview-profile-table-joiner-1-send_to-7":{"opId":"pageview-profile-table-joiner-1-send_to-7","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:129","outputStreamId":"enriched-pageview-join-output","nextOperatorIds":[]},"pageview-profile-table-joiner-1-join-6":{"opId":"pageview-profile-table-joiner-1-join-6","tableId":"profile-table","opCode":"JOIN","sourceLocation":"StreamTableJoinExample.java:128","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-7"]}}}}],"sourceStreams":{"profile-table-input":{"streamSpec":{"id":"profile-table-input","systemName":"kafka","physicalName":"profile-table-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]},"pageview-join-input":{"streamSpec":{"id":"pageview-join-input","systemName":"kafka","physicalName":"pageview-join-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]}},"sinkStreams":{"enriched-pageview-join-output":{"streamSpec":{"id":"enriched-pageview-join-output","systemName":"kafka","physicalName":"enriched-pageview-join-output","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":[]}},"intermediateStreams":{"pageview-profile-table-joiner-1-partition_by-join":{"streamSpec":{"id":"pageview-profile-table-joiner-1-partition_by-join","systemName":"kafka","physicalName":"pageview-profile-table-joiner-1-partition_by-join","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":["pageview-profile-table-joiner"]}},"tables":{"profile-table":{"id":"profile-table","providerFactory":"org.apache.samza.storage.kv.LocalTableProviderFactory"}},"applicationName":"pageview-profile-table-joiner","ap
plicationId":"1"}, streams.pageview-profile-table-joiner-1-partition_by-join.samza.intermediate=true, task.inputs=kafka.pageview-profile-table-joiner-1-partition_by-join,kafka.profile-table-input,kafka.pageview-join-input, streams.pageview-profile-table-joiner-1-partition_by-join.replication.factor=1, streams.pageview-profile-table-joiner-1-partition_by-join.samza.key.serde=StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9, stores.profile-table.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory, job.factory.class=org.apache.samza.job.yarn.YarnJobFactory, yarn.package.path=file:///home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza/target/hello-samza-1.1.0-dist.tar.gz, task.window.ms=-1, streams.profile-table-input.samza.system=kafka, app.mode=STREAM, streams.enriched-pageview-join-output.samza.msg.serde=JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad, app.class=samza.examples.cookbook.StreamTableJoinExample, stores.profile-table.msg.serde=JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, job.container.count=1} | |
2019-04-25 19:11:07.333 [main] ConsumerConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config. | |
2019-04-25 19:11:07.333 [main] AppInfoParser [INFO] Kafka version : 2.0.1 | |
2019-04-25 19:11:07.333 [main] AppInfoParser [INFO] Kafka commitId : fa14705e51bd2ce5 | |
2019-04-25 19:11:07.333 [main] AppInfoParser [WARN] Error registering AppInfo mbean | |
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka_consumer-pageview_profile_table_joiner-1 | |
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) | |
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) | |
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596) | |
at org.apache.samza.system.kafka.KafkaSystemConsumer.createKafkaConsumerImpl(KafkaSystemConsumer.java:146) | |
at org.apache.samza.system.kafka.KafkaSystemFactory.getConsumer(KafkaSystemFactory.scala:57) | |
at org.apache.samza.container.SamzaContainer$$anonfun$14.apply(SamzaContainer.scala:223) | |
at org.apache.samza.container.SamzaContainer$$anonfun$14.apply(SamzaContainer.scala:219) | |
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) | |
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) | |
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94) | |
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) | |
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47) | |
at scala.collection.SetLike$class.map(SetLike.scala:92) | |
at scala.collection.AbstractSet.map(Set.scala:47) | |
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:219) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:93) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71) | |
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75) | |
2019-04-25 19:11:07.334 [main] KafkaSystemFactory [INFO] Created kafka consumer for system kafka, clientId kafka_consumer-pageview_profile_table_joiner-1: org.apache.kafka.clients.consumer.KafkaConsumer@4c398c80 | |
2019-04-25 19:11:07.334 [main] KafkaConsumerProxy [INFO] Creating KafkaConsumerProxy with systeName=kafka, clientId=kafka_consumer-pageview_profile_table_joiner-1, metricsName=kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.334 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Created proxy consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.334 [main] KafkaSystemFactory [INFO] Created samza system consumer for system kafka, config {streams.pageview-profile-table-joiner-1-partition_by-join.samza.system=kafka, tables.profile-table.provider.factory=org.apache.samza.storage.kv.LocalTableProviderFactory, systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory, job.default.system=kafka, serializers.registry.StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, job.id=1, serializers.registry.JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAkc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5Qcm9maWxlAAAAAAAAAAAAAAB4cA==, systems.kafka.default.stream.replication.factor=1, stores.profile-table.key.serde=StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585, streams.pageview-profile-table-joiner-1-partition_by-join.samza.priority=2147483647, streams.enriched-pageview-join-output.samza.system=kafka, serializers.registry.JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgAlc2FtemEuZXhhbXBsZXMuY29va2Jvb2suZGF0YS5QYWdlVmlldwAAAAAAAAAAAAAAeHA=, streams.pageview-join-input.samza.msg.serde=JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053, app.run.id=1556244654191-a2723d2d, job.name=pageview-profile-table-joiner, streams.profile-table-input.samza.msg.serde=JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, systems.kafka.producer.bootstrap.servers=localhost:9092, streams.pageview-profile-table-joiner-1-partition_by-join.samza.msg.serde=JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053, 
serializers.registry.JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLkpzb25TZXJkZVYyfnqWnLch0OMCAAFMAAVjbGF6enQAEUxqYXZhL2xhbmcvQ2xhc3M7eHB2cgA/c2FtemEuZXhhbXBsZXMuY29va2Jvb2suU3RyZWFtVGFibGVKb2luRXhhbXBsZSRFbnJpY2hlZFBhZ2VWaWV3AAAAAAAAAAAAAAB4cA==, serializers.registry.StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9.samza.serialized.instance=rO0ABXNyAChvcmcuYXBhY2hlLnNhbXphLnNlcmlhbGl6ZXJzLlN0cmluZ1NlcmRlRRMYxG2+VrsCAAFMAAhlbmNvZGluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdAAFVVRGLTg=, streams.pageview-profile-table-joiner-1-partition_by-join.samza.delete.committed.messages=true, streams.pageview-profile-table-joiner-1-partition_by-join.samza.offset.default=oldest, systems.kafka.consumer.zookeeper.connect=localhost:2181, streams.pageview-join-input.samza.system=kafka, streams.pageview-profile-table-joiner-1-partition_by-join.samza.physical.name=pageview-profile-table-joiner-1-partition_by-join, samza.internal.execution.plan={"jobs":[{"jobName":"pageview-profile-table-joiner","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":["pageview-profile-table-joiner-1-join-6"]},{"streamId":"profile-table-input","nextOperatorIds":["pageview-profile-table-joiner-1-map-2"]},{"streamId":"pageview-join-input","nextOperatorIds":["pageview-profile-table-joiner-1-partition_by-join"]}],"outputStreams":[{"streamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},{"streamId":"enriched-pageview-join-output","nextOperatorIds":[]}],"operators":{"pageview-profile-table-joiner-1-partition_by-join":{"opId":"pageview-profile-table-joiner-1-partition_by-join","opCode":"PARTITION_BY","sourceLocation":"StreamTableJoinExample.java:127","outputStreamId":"pageview-profile-table-joiner-1-partition_by-join","nextOperatorIds":[]},"pageview-profile-table-joiner-1-map-2":{"opId":"pageview-profile-table-joiner-1-map-2","opCode":"MAP","so
urceLocation":"StreamTableJoinExample.java:123","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-3"]},"pageview-profile-table-joiner-1-send_to-3":{"opId":"pageview-profile-table-joiner-1-send_to-3","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:124","nextOperatorIds":[]},"pageview-profile-table-joiner-1-send_to-7":{"opId":"pageview-profile-table-joiner-1-send_to-7","opCode":"SEND_TO","sourceLocation":"StreamTableJoinExample.java:129","outputStreamId":"enriched-pageview-join-output","nextOperatorIds":[]},"pageview-profile-table-joiner-1-join-6":{"opId":"pageview-profile-table-joiner-1-join-6","tableId":"profile-table","opCode":"JOIN","sourceLocation":"StreamTableJoinExample.java:128","nextOperatorIds":["pageview-profile-table-joiner-1-send_to-7"]}}}}],"sourceStreams":{"profile-table-input":{"streamSpec":{"id":"profile-table-input","systemName":"kafka","physicalName":"profile-table-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]},"pageview-join-input":{"streamSpec":{"id":"pageview-join-input","systemName":"kafka","physicalName":"pageview-join-input","partitionCount":1},"sourceJobs":[],"targetJobs":["pageview-profile-table-joiner"]}},"sinkStreams":{"enriched-pageview-join-output":{"streamSpec":{"id":"enriched-pageview-join-output","systemName":"kafka","physicalName":"enriched-pageview-join-output","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":[]}},"intermediateStreams":{"pageview-profile-table-joiner-1-partition_by-join":{"streamSpec":{"id":"pageview-profile-table-joiner-1-partition_by-join","systemName":"kafka","physicalName":"pageview-profile-table-joiner-1-partition_by-join","partitionCount":1},"sourceJobs":["pageview-profile-table-joiner"],"targetJobs":["pageview-profile-table-joiner"]}},"tables":{"profile-table":{"id":"profile-table","providerFactory":"org.apache.samza.storage.kv.LocalTableProviderFactory"}},"applicationName":"pageview-profile-table-joiner","ap
plicationId":"1"}, streams.pageview-profile-table-joiner-1-partition_by-join.samza.intermediate=true, task.inputs=kafka.pageview-profile-table-joiner-1-partition_by-join,kafka.profile-table-input,kafka.pageview-join-input, streams.pageview-profile-table-joiner-1-partition_by-join.replication.factor=1, streams.pageview-profile-table-joiner-1-partition_by-join.samza.key.serde=StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9, stores.profile-table.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory, job.factory.class=org.apache.samza.job.yarn.YarnJobFactory, yarn.package.path=file:///home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/samza-hello-samza/target/hello-samza-1.1.0-dist.tar.gz, task.window.ms=-1, streams.profile-table-input.samza.system=kafka, app.mode=STREAM, streams.enriched-pageview-join-output.samza.msg.serde=JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad, app.class=samza.examples.cookbook.StreamTableJoinExample, stores.profile-table.msg.serde=JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, job.container.count=1}: kafka:kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.335 [main] SamzaContainer$ [INFO] Got system consumers: Set(kafka) | |
2019-04-25 19:11:07.336 [main] JobPlanner [WARN] job.id is a deprecated configuration, use app.id instead. | |
2019-04-25 19:11:07.336 [main] JobPlanner [WARN] job.name is a deprecated configuration, use use app.name instead. | |
2019-04-25 19:11:07.336 [main] KafkaSystemFactory [INFO] Creating kafka producer for system kafka, producerClientId kafka_producer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.337 [main] SamzaContainer$ [INFO] Got system producers: Set(kafka) | |
2019-04-25 19:11:07.340 [main] SamzaContainer$ [INFO] Got serdes from factories: Set() | |
2019-04-25 19:11:07.357 [main] SamzaContainer$ [INFO] Got serdes from serialized instances: Set(StringSerde-bed6774e-71e2-48e7-bd1f-ea76888dd3f9, StringSerde-aeb99141-fcbe-40a0-a3fb-1c418beaf585, JsonSerdeV2-f194c542-e199-44cd-9a1d-6bf862208410, JsonSerdeV2-f763931f-bb77-4c94-81c5-32398cb9faad, JsonSerdeV2-7b09027d-762f-4f55-a393-163a41500053) | |
2019-04-25 19:11:07.393 [main] SamzaContainer$ [INFO] Got change log system streams: Map() | |
2019-04-25 19:11:07.396 [main] SamzaContainer$ [INFO] Got intermediate streams: List(pageview-profile-table-joiner-1-partition_by-join) | |
2019-04-25 19:11:07.400 [main] SamzaContainer$ [INFO] Setting up JVM metrics. | |
2019-04-25 19:11:07.405 [main] SamzaContainer$ [INFO] Setting up message chooser. | |
2019-04-25 19:11:07.428 [main] DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=true | |
2019-04-25 19:11:07.433 [main] SamzaContainer$ [INFO] Setting up metrics reporters. | |
2019-04-25 19:11:07.435 [main] SamzaContainer$ [INFO] Got metrics reporters: Set() | |
2019-04-25 19:11:07.436 [main] SamzaContainer$ [INFO] Got security manager: null | |
2019-04-25 19:11:07.437 [main] SamzaContainer$ [INFO] Got checkpoint manager: null | |
2019-04-25 19:11:07.438 [main] SamzaContainer$ [INFO] Got checkpointListeners : Map() | |
2019-04-25 19:11:07.443 [main] SamzaContainer$ [INFO] Got offset manager: org.apache.samza.checkpoint.OffsetManager@2c3dec30 | |
2019-04-25 19:11:07.451 [main] SamzaContainer$ [INFO] Got storage engines: Set(profile-table) | |
2019-04-25 19:11:07.451 [main] SamzaContainer$ [INFO] Got single thread mode: false | |
2019-04-25 19:11:07.452 [main] SamzaContainer$ [INFO] Got thread pool size: 0 | |
2019-04-25 19:11:07.452 [main] TaskFactoryUtil [INFO] Got an AsyncStreamTask implementation. | |
2019-04-25 19:11:07.455 [main] SamzaContainer$ [INFO] Got default storage engine base directory: /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state | |
2019-04-25 19:11:07.456 [main] SamzaContainer$ [INFO] Got base directory for non logged data stores: /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state | |
2019-04-25 19:11:07.456 [main] SamzaContainer$ [WARN] No override was provided for logged store base directory. This disables local state re-use on application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment variable in all machines running the Samza container or configure job.logged.store.base.dir for your application | |
2019-04-25 19:11:07.457 [main] SamzaContainer$ [INFO] Got base directory for logged data stores: /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state | |
2019-04-25 19:11:07.469 [main] ContainerStorageManager [INFO] Starting with changelogSystemStreams = {} sideInputSystemStreams = {} | |
2019-04-25 19:11:07.551 [main] RocksDbOptionsHelper [INFO] Using prepareForBulkLoad for restore to /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0 | |
2019-04-25 19:11:07.564 [main] ContainerStorageManager [INFO] Created store profile-table for task Partition 0 in mode BulkLoad | |
2019-04-25 19:11:07.606 [main] SamzaContainer$ [INFO] Got task SSPs: Set(SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0], SystemStreamPartition [kafka, pageview-join-input, 0], SystemStreamPartition [kafka, profile-table-input, 0]) | |
2019-04-25 19:11:07.607 [main] SamzaContainer$ [INFO] Got task side input SSPs: Set() | |
2019-04-25 19:11:07.613 [main] TableManager [INFO] Added 1 tables | |
2019-04-25 19:11:07.613 [main] SamzaContainer$ [INFO] Got table manager | |
2019-04-25 19:11:07.620 [main] RunLoopFactory [INFO] Got window milliseconds: -1. | |
2019-04-25 19:11:07.620 [main] RunLoopFactory [INFO] Got commit milliseconds: 60000. | |
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Got taskMaxConcurrency: 1. | |
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Got asyncCommitEnabled: false. | |
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Got callbackTimeout: -1. | |
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Got maxIdleMs: 10. | |
2019-04-25 19:11:07.621 [main] RunLoopFactory [INFO] Run loop in asynchronous mode. | |
2019-04-25 19:11:07.630 [main] NoThrottlingDiskQuotaPolicy [INFO] Using a no throttling disk quota policy | |
2019-04-25 19:11:07.631 [main] SamzaContainer$ [INFO] Disk quotas disabled because polling interval is not set (container.disk.poll.interval.ms) | |
2019-04-25 19:11:07.632 [main] SamzaContainer$ [INFO] Samza container setup complete. | |
2019-04-25 19:11:07.633 [main] ContainerLaunchUtil [INFO] Got execution environment container id: container_1556153599011_0008_01_000002 | |
2019-04-25 19:11:07.635 [main] ContainerHeartbeatMonitor [INFO] Starting ContainerHeartbeatMonitor | |
2019-04-25 19:11:07.637 [main] SamzaContainer [INFO] Starting container. | |
2019-04-25 19:11:07.637 [main] ContainerLaunchUtil [INFO] Before starting the container. | |
2019-04-25 19:11:07.643 [main] JmxServer [INFO] According to Util.getLocalHost.getHostName we are 172.21.224.112 | |
2019-04-25 19:11:07.690 [main] JmxServer [INFO] Started JmxServer registry port=35836 server port=37870 url=service:jmx:rmi://localhost:37870/jndi/rmi://localhost:35836/jmxrmi | |
2019-04-25 19:11:07.691 [main] JmxServer [INFO] If you are tunneling, you might want to try JmxServer registry port=35836 server port=37870 url=service:jmx:rmi://172.21.224.112:37870/jndi/rmi://172.21.224.112:35836/jmxrmi | |
2019-04-25 19:11:07.692 [main] SamzaContainer [INFO] Registering task instance metrics. | |
2019-04-25 19:11:07.694 [main] SamzaContainer [INFO] Starting JVM metrics. | |
2019-04-25 19:11:07.694 [main] SamzaContainer [INFO] Starting metrics reporters. | |
2019-04-25 19:11:07.695 [main] SamzaContainer [INFO] Starting admin multiplexer. | |
2019-04-25 19:11:07.696 [main] SamzaContainer [INFO] Registering task instances with offsets. | |
2019-04-25 19:11:07.698 [main] SamzaContainer [INFO] Starting offset manager. | |
2019-04-25 19:11:07.707 [main] OffsetManager [INFO] Successfully loaded last processed offsets: {} | |
2019-04-25 19:11:07.708 [main] OffsetManager [INFO] Successfully loaded starting offsets: Map(Partition 0 -> Map(SystemStreamPartition [kafka, profile-table-input, 0] -> 0, SystemStreamPartition [kafka, pageview-join-input, 0] -> 0, SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0] -> 0)) | |
2019-04-25 19:11:07.709 [main] SamzaContainer [INFO] Starting container storage manager. | |
2019-04-25 19:11:07.709 [main] ContainerStorageManager [INFO] Store Restore started | |
2019-04-25 19:11:07.711 [main] ContainerStorageManager [INFO] Got non logged storage partition directory as /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0 | |
2019-04-25 19:11:07.711 [main] ContainerStorageManager [INFO] Got logged storage partition directory as /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0 | |
2019-04-25 19:11:07.711 [main] ContainerStorageManager [INFO] Deleting logged storage partition directory /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0 | |
2019-04-25 19:11:07.713 [main] ContainerStorageManager [INFO] Using non logged storage partition directory: /tmp/hadoop-svenkata/nm-local-dir/usercache/svenkata/appcache/application_1556153599011_0008/container_1556153599011_0008_01_000002/state/profile-table/Partition_0 for store: profile-table | |
2019-04-25 19:11:07.713 [main] ContainerStorageManager [INFO] Validating change log streams: {} | |
2019-04-25 19:11:07.714 [main] ContainerStorageManager [INFO] Got change log stream metadata: {} | |
2019-04-25 19:11:07.715 [main] ContainerStorageManager [INFO] Assigning oldest change log offsets for taskName Partition 0 : {} | |
2019-04-25 19:11:07.716 [Samza Restore Thread-0] ContainerStorageManager [INFO] Starting stores in task instance Partition 0 | |
2019-04-25 19:11:07.754 [Samza Restore Thread-0] ContainerStorageManager [INFO] Stopped persistent stores {profile-table=org.apache.samza.storage.kv.KeyValueStorageEngine@37a0659a} | |
2019-04-25 19:11:07.756 [main] ContainerStorageManager [INFO] Re-created store profile-table in read-write mode for task Partition 0 because it a persistent store | |
2019-04-25 19:11:07.756 [main] ContainerStorageManager [INFO] Store Restore complete | |
2019-04-25 19:11:07.757 [main] SamzaContainer [INFO] Starting table manager in task instance Partition 0 | |
2019-04-25 19:11:07.758 [main] LocalTableProvider [INFO] Initializing table provider for table profile-table | |
2019-04-25 19:11:07.758 [main] LocalTableProvider [INFO] Initialized backing store for table profile-table | |
2019-04-25 19:11:07.758 [main] SamzaContainer [INFO] Starting host statistics monitor | |
2019-04-25 19:11:07.760 [main] SamzaContainer [INFO] Registering task instances with producers. | |
2019-04-25 19:11:07.762 [main] SamzaContainer [INFO] Starting producer multiplexer. | |
2019-04-25 19:11:07.763 [main] ProducerConfig [INFO] ProducerConfig values: | |
acks = 1 | |
batch.size = 16384 | |
bootstrap.servers = [localhost:9092] | |
buffer.memory = 33554432 | |
client.id = kafka_producer-pageview_profile_table_joiner-1 | |
compression.type = none | |
connections.max.idle.ms = 540000 | |
enable.idempotence = false | |
interceptor.classes = [] | |
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer | |
linger.ms = 10 | |
max.block.ms = 60000 | |
max.in.flight.requests.per.connection = 1 | |
max.request.size = 1048576 | |
metadata.max.age.ms = 300000 | |
metric.reporters = [] | |
metrics.num.samples = 2 | |
metrics.recording.level = INFO | |
metrics.sample.window.ms = 30000 | |
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner | |
receive.buffer.bytes = 32768 | |
reconnect.backoff.max.ms = 1000 | |
reconnect.backoff.ms = 50 | |
request.timeout.ms = 30000 | |
retries = 2147483647 | |
retry.backoff.ms = 100 | |
sasl.client.callback.handler.class = null | |
sasl.jaas.config = null | |
sasl.kerberos.kinit.cmd = /usr/bin/kinit | |
sasl.kerberos.min.time.before.relogin = 60000 | |
sasl.kerberos.service.name = null | |
sasl.kerberos.ticket.renew.jitter = 0.05 | |
sasl.kerberos.ticket.renew.window.factor = 0.8 | |
sasl.login.callback.handler.class = null | |
sasl.login.class = null | |
sasl.login.refresh.buffer.seconds = 300 | |
sasl.login.refresh.min.period.seconds = 60 | |
sasl.login.refresh.window.factor = 0.8 | |
sasl.login.refresh.window.jitter = 0.05 | |
sasl.mechanism = GSSAPI | |
security.protocol = PLAINTEXT | |
send.buffer.bytes = 131072 | |
ssl.cipher.suites = null | |
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] | |
ssl.endpoint.identification.algorithm = https | |
ssl.key.password = null | |
ssl.keymanager.algorithm = SunX509 | |
ssl.keystore.location = null | |
ssl.keystore.password = null | |
ssl.keystore.type = JKS | |
ssl.protocol = TLS | |
ssl.provider = null | |
ssl.secure.random.implementation = null | |
ssl.trustmanager.algorithm = PKIX | |
ssl.truststore.location = null | |
ssl.truststore.password = null | |
ssl.truststore.type = JKS | |
transaction.timeout.ms = 60000 | |
transactional.id = null | |
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer | |
2019-04-25 19:11:07.766 [main] AppInfoParser [INFO] Kafka version : 2.0.1 | |
2019-04-25 19:11:07.766 [main] AppInfoParser [INFO] Kafka commitId : fa14705e51bd2ce5 | |
2019-04-25 19:11:07.766 [main] AppInfoParser [WARN] Error registering AppInfo mbean | |
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=kafka_producer-pageview_profile_table_joiner-1 | |
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) | |
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) | |
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) | |
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) | |
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:451) | |
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:272) | |
at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$1.apply(KafkaSystemFactory.scala:75) | |
at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$1.apply(KafkaSystemFactory.scala:74) | |
at org.apache.samza.system.kafka.KafkaSystemProducer.start(KafkaSystemProducer.scala:53) | |
at org.apache.samza.system.SystemProducers$$anonfun$start$2.apply(SystemProducers.scala:41) | |
at org.apache.samza.system.SystemProducers$$anonfun$start$2.apply(SystemProducers.scala:41) | |
at scala.collection.Iterator$class.foreach(Iterator.scala:893) | |
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) | |
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) | |
at org.apache.samza.system.SystemProducers.start(SystemProducers.scala:41) | |
at org.apache.samza.container.SamzaContainer.startProducers(SamzaContainer.scala:974) | |
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:750) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:139) | |
at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:71) | |
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:75) | |
2019-04-25 19:11:07.767 [main] SamzaContainer [INFO] Initializing stream tasks. | |
2019-04-25 19:11:07.785 [main] OperatorImplGraph [INFO] SystemStream [system=kafka, stream=pageview-profile-table-joiner-1-partition_by-join] has 1 producer tasks. | |
2019-04-25 19:11:07.797 [main] SamzaContainer [INFO] Registering task instances with consumers. | |
2019-04-25 19:11:07.801 [main] SamzaContainer [INFO] Starting consumer multiplexer. | |
2019-04-25 19:11:07.804 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Consumer subscribes to [pageview-profile-table-joiner-1-partition_by-join-0, profile-table-input-0, pageview-join-input-0] | |
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: fetchThresholdBytes = -1; fetchThreshold=10000; numPartitions=3, perPartitionFetchThreshold=3333, perPartitionFetchThresholdBytes(0 if disabled)=0 | |
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] Updating the consumer fetch offsets of topic partition: pageview-profile-table-joiner-1-partition_by-join-0 to 0. | |
2019-04-25 19:11:07.805 [main] KafkaConsumerProxy [INFO] Adding new topicPartition SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0] with offset 0 to queue for consumer consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] Updating the consumer fetch offsets of topic partition: profile-table-input-0 to 0. | |
2019-04-25 19:11:07.805 [main] KafkaConsumerProxy [INFO] Adding new topicPartition SystemStreamPartition [kafka, profile-table-input, 0] with offset 0 to queue for consumer consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] Updating the consumer fetch offsets of topic partition: pageview-join-input-0 to 0. | |
2019-04-25 19:11:07.805 [main] KafkaConsumerProxy [INFO] Adding new topicPartition SystemStreamPartition [kafka, pageview-join-input, 0] with offset 0 to queue for consumer consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Starting proxy consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.805 [main] KafkaConsumerProxy [INFO] Starting KafkaConsumerProxy polling thread for consumerProxy-kafka-kafka_consumer-pageview_profile_table_joiner-1 | |
2019-04-25 19:11:07.805 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] KafkaConsumerProxy [INFO] Starting consumer poll thread Samza KafkaConsumerProxy Poll Thread-2 - kafka for system kafka | |
2019-04-25 19:11:07.805 [main] KafkaSystemConsumer [INFO] kafka:kafka_consumer-pageview_profile_table_joiner-1: Consumer started | |
2019-04-25 19:11:07.806 [main] TieredPriorityChooser [INFO] Starting priority chooser with priorities: Map(SystemStream [system=kafka, stream=pageview-join-input] -> 0, SystemStream [system=kafka, stream=pageview-profile-table-joiner-1-partition_by-join] -> 2147483647, SystemStream [system=kafka, stream=profile-table-input] -> 0) | |
2019-04-25 19:11:07.807 [main] TieredPriorityChooser [INFO] Priority chooser has a default chooser: org.apache.samza.system.chooser.RoundRobinChooser@59221b97 | |
2019-04-25 19:11:07.808 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] Metadata [INFO] Cluster ID: 4lG0x-HeQPiUY-OWu7MHFg | |
2019-04-25 19:11:07.810 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] KafkaConsumerProxy [INFO] Initial lag for SSP SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0] is 0 (end=0, startOffset=0) | |
2019-04-25 19:11:07.810 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] KafkaConsumerProxy [INFO] Initial lag for SSP SystemStreamPartition [kafka, profile-table-input, 0] is 0 (end=0, startOffset=0) | |
2019-04-25 19:11:07.811 [Samza KafkaConsumerProxy Poll Thread-2 - kafka] KafkaConsumerProxy [INFO] Initial lag for SSP SystemStreamPartition [kafka, pageview-join-input, 0] is 0 (end=0, startOffset=0) | |
2019-04-25 19:11:07.820 [main] SamzaContainer [INFO] Entering run loop. | |
2019-04-25 19:11:07.821 [main] ContainerLaunchUtil [INFO] Container Started | |
``` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment