Created
January 11, 2019 03:20
Testing the input stream expansion for stateful samza jobs
Samza job name: StreamTableJoinExample
Samza application implementation: https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
Task to SystemStreamPartition (SSP) assignment before the partition expansion:
```
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza (latest ✚5)
╰─λ grep -i 'grouped' /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/deploy/yarn/logs/userlogs/application_1547094655534_0006/container_1547094655534_0006_01_000001/samza-job-coordinator.log
2019-01-10 18:53:57.567 [main] JobModelManager$ [INFO] SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@58326051 has grouped the SystemStreamPartitions into 1 tasks with the following taskNames: {Partition 0=[SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0], SystemStreamPartition [kafka, pageview-join-input, 0], SystemStreamPartition [kafka, profile-table-input, 0]]}
```
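
The grouping reported in this log line can be reproduced with a minimal sketch, assuming that GroupByPartition places every SystemStreamPartition with the same partition id into one task named `Partition <id>`. The `group_by_partition` helper and the tuple representation of an SSP are illustrative names, not Samza's API.

```python
from collections import defaultdict


def group_by_partition(ssps):
    """Group (system, stream, partition) tuples into tasks keyed by partition id."""
    tasks = defaultdict(list)
    for system, stream, partition in ssps:
        # Hypothetical task naming that mirrors the "Partition 0" name in the log.
        tasks[f"Partition {partition}"].append((system, stream, partition))
    return dict(tasks)


# The three SSPs listed in the log before the expansion.
ssps = [
    ("kafka", "pageview-profile-table-joiner-1-partition_by-join", 0),
    ("kafka", "pageview-join-input", 0),
    ("kafka", "profile-table-input", 0),
]

tasks = group_by_partition(ssps)
for task_name, assigned in tasks.items():
    print(task_name, assigned)
```

With one partition per stream, all three SSPs end up in a single task, which matches the "1 tasks" reported by the job coordinator.
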
Increase the partition count of the input topic pageview-join-input to 2 using the kafka-topics.sh tool:
```
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza (latest ✚5)
╰─λ ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic pageview-join-input --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
```
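
The warning printed by the tool matters for keyed topics because a producer typically chooses a partition as a hash of the key modulo the partition count, so the same key can land in a different partition once the count changes. The sketch below illustrates this with `zlib.crc32` as a stand-in hash (an assumption; Kafka's default partitioner uses a different hash function) and made-up keys.

```python
import zlib


def partition_for(key: str, partition_count: int) -> int:
    """Pick a partition as hash(key) % partition_count (stand-in hash, not Kafka's)."""
    return zlib.crc32(key.encode("utf-8")) % partition_count


# Hypothetical message keys, used only for illustration.
keys = [f"member-{i}" for i in range(8)]

before = {k: partition_for(k, 1) for k in keys}  # 1 partition: everything in 0
after = {k: partition_for(k, 2) for k in keys}   # 2 partitions: keys are spread

# Keys whose partition changed after the expansion.
moved = [k for k in keys if before[k] != after[k]]
print("keys that moved to a new partition:", moved)
```

Any state a job built for a moved key stays with the task that consumed the old partition, which is why partition expansion needs special handling for stateful jobs.
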
The Samza ApplicationMaster (AM) detected the change in partition count and shut itself down without unregistering from the YARN ResourceManager, which allowed YARN to start a new AM attempt. Here are the relevant AM logs:
```
2019-01-10 19:03:59.841 [Samza-StreamPartitionCountMonitor-0] KafkaSystemAdmin [INFO] SystemStream partition counts for system kafka: {pageview-profile-table-joiner-1-partition_by-join=SystemStreamMetadata [streamName=pageview-profile-table-joiner-1-partition_by-join, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}], profile-table-input=SystemStreamMetadata [streamName=profile-table-input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}], pageview-join-input=SystemStreamMetadata [streamName=pageview-join-input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=1]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}]}
2019-01-10 19:03:59.842 [Samza-StreamPartitionCountMonitor-0] StreamPartitionCountMonitor [WARN] Change of partition count detected in stream SystemStream [system=kafka, stream=pageview-join-input]. old partition count: 1, current partition count: 2
2019-01-10 19:03:59.842 [Samza-StreamPartitionCountMonitor-0] StreamPartitionCountMonitor [ERROR] Shutting down (stateful) or restarting (stateless) the job since current partition count 2 is greater than the old partition count 1 for stream SystemStream [system=kafka, stream=pageview-join-input].
2019-01-10 19:03:59.917 [main] ClusterBasedJobCoordinator [ERROR] Exception thrown in the JobCoordinator loop {}
org.apache.samza.coordinator.PartitionChangeException: Input topic partition count changes detected.
at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.lambda$getPartitionCountMonitor$4(ClusterBasedJobCoordinator.java:320)
at org.apache.samza.coordinator.StreamPartitionCountMonitor.updatePartitionCountMetric(StreamPartitionCountMonitor.java:204)
at org.apache.samza.coordinator.StreamPartitionCountMonitor$1.run(StreamPartitionCountMonitor.java:138)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2019-01-10 19:03:59.922 [main] ContainerProcessManager [INFO] Invoked stop of the Samza container process manager
2019-01-10 19:04:01.334 [main] ContainerProcessManager [INFO] Stopped container allocator
2019-01-10 19:04:01.335 [main] ContainerProcessManager [INFO] Stopped metrics reporters
2019-01-10 19:04:01.335 [main] YarnClusterResourceManager [INFO] Stopping AM client
2019-01-10 19:04:01.336 [main] SamzaYarnAppMasterLifecycle [INFO] Shutting down SamzaAppStatus: UNDEFINED yarn status: UNDEFINED
2019-01-10 19:04:01.337 [main] SamzaYarnAppMasterLifecycle [INFO] Not unregistering AM from the RM. This will enable RM retries
2019-01-10 19:04:01.338 [main] YarnClusterResourceManager [INFO] Stopping the AM service
```
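
The decision logged by StreamPartitionCountMonitor comes down to comparing the previously known partition count of each input stream with the current one. A minimal sketch of that comparison follows; the function and variable names are hypothetical and this is not Samza's implementation.

```python
def find_expanded_streams(old_counts, new_counts):
    """Return {stream: (old, new)} for every stream whose partition count grew."""
    return {
        stream: (old, new_counts[stream])
        for stream, old in old_counts.items()
        # A stream missing from new_counts is treated as unchanged.
        if new_counts.get(stream, old) > old
    }


# Partition counts taken from the AM log: only pageview-join-input changed.
old_counts = {
    "pageview-profile-table-joiner-1-partition_by-join": 1,
    "profile-table-input": 1,
    "pageview-join-input": 1,
}
new_counts = {
    "pageview-profile-table-joiner-1-partition_by-join": 1,
    "profile-table-input": 1,
    "pageview-join-input": 2,
}

expanded = find_expanded_streams(old_counts, new_counts)
for stream, (old, new) in expanded.items():
    print(f"{stream}: old partition count {old}, current partition count {new}")
```
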
Task to SystemStreamPartition grouping after the partition expansion:
```
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza (latest ✚5)
╰─λ grep -i 'grouped' /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/deploy/yarn/logs/userlogs/application_1547094655534_0006/container_1547094655534_0006_02_000001/samza-job-coordinator.log
2019-01-10 19:04:05.060 [main] JobModelManager$ [INFO] SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@6492fab5 has grouped the SystemStreamPartitions into 1 tasks with the following taskNames: {Partition 0=[SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0], SystemStreamPartition [kafka, pageview-join-input, 0], SystemStreamPartition [kafka, pageview-join-input, 1], SystemStreamPartition [kafka, profile-table-input, 0]]}
```
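
In this log line the new partition 1 of pageview-join-input lands in the existing task `Partition 0` rather than in a new task. One way to obtain such an assignment, assuming the expanded partition count is a multiple of the previous count and keys are partitioned by hash modulo partition count, is to give each partition to the task that owned `partition % previous_count`. The sketch below illustrates that idea with hypothetical names; it is not taken from Samza's code.

```python
from collections import defaultdict


def group_with_expansion(ssps, previous_counts):
    """Assign each SSP to the task that owned partition % previous_count."""
    tasks = defaultdict(list)
    for system, stream, partition in ssps:
        prev = previous_counts.get(stream)
        # Streams with no recorded previous count fall back to their own partition id.
        owner = partition % prev if prev else partition
        tasks[f"Partition {owner}"].append((system, stream, partition))
    return dict(tasks)


# Partition counts before the expansion (all streams had one partition).
previous_counts = {
    "pageview-profile-table-joiner-1-partition_by-join": 1,
    "pageview-join-input": 1,
    "profile-table-input": 1,
}

# SSPs after the expansion, as listed in the log.
ssps = [
    ("kafka", "pageview-profile-table-joiner-1-partition_by-join", 0),
    ("kafka", "pageview-join-input", 0),
    ("kafka", "pageview-join-input", 1),
    ("kafka", "profile-table-input", 0),
]

tasks = group_with_expansion(ssps, previous_counts)
for task_name, assigned in tasks.items():
    print(task_name, assigned)
```

Keeping the task count fixed means the state built by task `Partition 0` remains with the task that now also consumes the new partition.
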
Task to SystemStreamPartition assignments stored in the coordinator stream after the partition expansion:
```
╰─λ deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic __samza_coordinator_pageview-profile-table-joiner_1 --from-beginning --property print.key=true --property key.separator="-" | grep taskNames
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-profile-table-joiner-1-partition_by-join\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175237662}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175237665}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"profile-table-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175237668}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-profile-table-joiner-1-partition_by-join\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845128}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845139}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":1,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845142}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"profile-table-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845144}
```
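
Because the stream names themselves contain `-`, splitting these console-consumer lines on the key separator is unreliable. A small sketch that instead decodes the JSON key first and then the JSON value, and keeps the most recent assignment per SSP, could look like this. The helper names are illustrative and the two sample lines are copied from the output above.

```python
import json


def parse_assignment(line: str):
    """Parse one 'key-value' line into (ssp, task_names, timestamp)."""
    decoder = json.JSONDecoder()
    key, end = decoder.raw_decode(line)            # JSON array key at the start
    value, _ = decoder.raw_decode(line, end + 1)   # skip the one-character "-" separator
    ssp = json.loads(key[2])                       # the SSP is JSON nested in a string
    task_names = json.loads(value["values"]["taskNames"])
    return (ssp["system"], ssp["stream"], ssp["partition"]), task_names, value["timestamp"]


lines = [
    r'[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175237665}',
    r'[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":1,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845142}',
]

# Later messages overwrite earlier ones for the same SSP.
latest = {}
for line in lines:
    ssp, task_names, timestamp = parse_assignment(line)
    latest[ssp] = task_names

for ssp, task_names in latest.items():
    print(ssp, "->", task_names)
```
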