Testing input stream partition expansion for stateful Samza jobs
Samza job name: StreamTableJoinExample
Samza application implementation: https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
Task to SSP assignment before the partition expansion:
```
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza (latest ✚5)
╰─λ grep -i 'grouped' /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/deploy/yarn/logs/userlogs/application_1547094655534_0006/container_1547094655534_0006_01_000001/samza-job-coordinator.log 0 < 19:13:09
2019-01-10 18:53:57.567 [main] JobModelManager$ [INFO] SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@58326051 has grouped the SystemStreamPartitions into 1 tasks with the following taskNames: {Partition 0=[SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0], SystemStreamPartition [kafka, pageview-join-input, 0], SystemStreamPartition [kafka, profile-table-input, 0]]}
```
Increase the partition count of the input topic pageview-join-input to 2 using the kafka-topics.sh tool:
```
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza (latest ✚5)
╰─λ ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic pageview-join-input, --partitions 2 0 < 19:02:06
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
```
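The warning printed by kafka-topics.sh matters for a stateful job because a keyed producer usually picks the partition from a hash of the key modulo the partition count. The following is a minimal sketch of that effect, not Kafka's partitioner: `partition_for` is a hypothetical helper, `zlib.crc32` stands in for the real hash function, and the `member-N` keys are made up for illustration.

```python
import zlib


def partition_for(key: str, partition_count: int) -> int:
    """Hypothetical hash-modulo partitioner (crc32 is a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % partition_count


# Made-up keys, only to illustrate the remapping.
keys = [f"member-{i}" for i in range(8)]

before = {k: partition_for(k, 1) for k in keys}  # 1 partition: everything lands on 0
after = {k: partition_for(k, 2) for k in keys}   # 2 partitions: keys may land on 1

moved = [k for k in keys if before[k] != after[k]]
print("keys that moved to a new partition:", moved)
```

Under this assumption, any key that now lands on partition 1 previously landed on partition 0, so its state lives with the task that owned partition 0.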
The Samza ApplicationMaster (AM) detected the change in partition count and shut itself down without unregistering from the ResourceManager, so YARN restarted the AM as a new attempt. Here are the relevant AM logs:
```
2019-01-10 19:03:59.841 [Samza-StreamPartitionCountMonitor-0] KafkaSystemAdmin [INFO] SystemStream partition counts for system kafka: {pageview-profile-table-joiner-1-partition_by-join=SystemStreamMetadata [streamName=pageview-profile-table-joiner-1-partition_by-join, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}], profile-table-input=SystemStreamMetadata [streamName=profile-table-input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}], pageview-join-input=SystemStreamMetadata [streamName=pageview-join-input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=1]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}]}
2019-01-10 19:03:59.842 [Samza-StreamPartitionCountMonitor-0] StreamPartitionCountMonitor [WARN] Change of partition count detected in stream SystemStream [system=kafka, stream=pageview-join-input]. old partition count: 1, current partition count: 2
2019-01-10 19:03:59.842 [Samza-StreamPartitionCountMonitor-0] StreamPartitionCountMonitor [ERROR] Shutting down (stateful) or restarting (stateless) the job since current partition count 2 is greater than the old partition count 1 for stream SystemStream [system=kafka, stream=pageview-join-input].
2019-01-10 19:03:59.917 [main] ClusterBasedJobCoordinator [ERROR] Exception thrown in the JobCoordinator loop {}
org.apache.samza.coordinator.PartitionChangeException: Input topic partition count changes detected.
at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.lambda$getPartitionCountMonitor$4(ClusterBasedJobCoordinator.java:320)
at org.apache.samza.coordinator.StreamPartitionCountMonitor.updatePartitionCountMetric(StreamPartitionCountMonitor.java:204)
at org.apache.samza.coordinator.StreamPartitionCountMonitor$1.run(StreamPartitionCountMonitor.java:138)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2019-01-10 19:03:59.922 [main] ContainerProcessManager [INFO] Invoked stop of the Samza container process manager
2019-01-10 19:04:01.334 [main] ContainerProcessManager [INFO] Stopped container allocator
2019-01-10 19:04:01.335 [main] ContainerProcessManager [INFO] Stopped metrics reporters
2019-01-10 19:04:01.335 [main] YarnClusterResourceManager [INFO] Stopping AM client
2019-01-10 19:04:01.336 [main] SamzaYarnAppMasterLifecycle [INFO] Shutting down SamzaAppStatus: UNDEFINED yarn status: UNDEFINED
2019-01-10 19:04:01.337 [main] SamzaYarnAppMasterLifecycle [INFO] Not unregistering AM from the RM. This will enable RM retries
2019-01-10 19:04:01.338 [main] YarnClusterResourceManager [INFO] Stopping the AM service
```
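The check reported by StreamPartitionCountMonitor in the log above can be pictured as a comparison of the partition counts recorded at startup against the counts currently reported by the system. This is a rough sketch of that idea only, not Samza's code: `PartitionChangeError` and `check_partition_counts` are hypothetical names, and the dictionaries simply restate the counts from the log.

```python
class PartitionChangeError(Exception):
    """Hypothetical stand-in for the exception seen in the AM log."""


def check_partition_counts(old_counts, current_counts):
    """Raise if any stream has more partitions now than at job start."""
    for stream, old in old_counts.items():
        current = current_counts.get(stream, old)
        if current > old:
            raise PartitionChangeError(
                f"{stream}: old partition count {old}, "
                f"current partition count {current}"
            )


# Counts taken from the log: only pageview-join-input grew from 1 to 2.
old = {"pageview-join-input": 1, "profile-table-input": 1}
current = {"pageview-join-input": 2, "profile-table-input": 1}

try:
    check_partition_counts(old, current)
except PartitionChangeError as err:
    print("partition change detected:", err)
```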
Task to SSP assignment after the partition expansion (the new partition 1 of pageview-join-input is assigned to the existing task Partition 0):
```
╭─svenkata at svenkata-ld2 in /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza (latest ✚5)
╰─λ grep -i 'grouped' /home/svenkata/code/samza/samza-hello-samza/samza-hello-samza/deploy/yarn/logs/userlogs/application_1547094655534_0006/container_1547094655534_0006_02_000001/samza-job-coordinator.log 0 < 19:12:59
2019-01-10 19:04:05.060 [main] JobModelManager$ [INFO] SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@6492fab5 has grouped the SystemStreamPartitions into 1 tasks with the following taskNames: {Partition 0=[SystemStreamPartition [kafka, pageview-profile-table-joiner-1-partition_by-join, 0], SystemStreamPartition [kafka, pageview-join-input, 0], SystemStreamPartition [kafka, pageview-join-input, 1], SystemStreamPartition [kafka, profile-table-input, 0]]}
```
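One mapping that reproduces the grouping in the log above is to fold each partition of an expanded stream back onto the task that owned `partition % previous_partition_count`. The sketch below assumes that rule; it is not taken from the GroupByPartition source, and `group_by_partition` and `previous_counts` are hypothetical names.

```python
from collections import defaultdict


def group_by_partition(ssps, previous_counts):
    """Group (system, stream, partition) tuples into tasks.

    Assumption: a partition of an expanded stream is folded back onto the
    task for `partition % previous_count`; other streams use their own
    partition id as the task id.
    """
    tasks = defaultdict(list)
    for system, stream, partition in ssps:
        previous = previous_counts.get(stream)
        task_partition = partition % previous if previous else partition
        tasks[f"Partition {task_partition}"].append((system, stream, partition))
    return dict(tasks)


ssps_after = [
    ("kafka", "pageview-profile-table-joiner-1-partition_by-join", 0),
    ("kafka", "pageview-join-input", 0),
    ("kafka", "pageview-join-input", 1),
    ("kafka", "profile-table-input", 0),
]
# pageview-join-input had 1 partition before the expansion.
grouping = group_by_partition(ssps_after, {"pageview-join-input": 1})
for task, assigned in grouping.items():
    print(task, assigned)
```

With a previous count of 1, partition 1 maps to `1 % 1 = 0`, so all four SSPs end up in the single task Partition 0, as in the log.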
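Task to SSP assignments stored in the coordinator stream after the partition expansion. The first three records were written at the initial deployment (18:53:57) and the last four after the restart (19:04:05); only the latter include partition 1 of pageview-join-input, mapped to task Partition 0: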
```
╰─λ deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic __samza_coordinator_pageview-profile-table-joiner_1 --from-beginning --property print.key=true --property key.separator="-" | grep taskNames 0 < 19:17:13
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-profile-table-joiner-1-partition_by-join\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175237662}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175237665}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"profile-table-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175237668}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-profile-table-joiner-1-partition_by-join\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845128}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845139}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":1,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845142}
[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"profile-table-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845144}
```
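Because the coordinator stream is an append-only log, the effective assignment is the last record written for each SSP. The following sketch parses console-consumer lines in the format shown above and keeps the latest task list per SSP. `latest_assignments` is a hypothetical helper, and the parsing relies on the `-` key separator used in the command above; the two sample lines are copied from the output.

```python
import json


def latest_assignments(lines):
    """Return {(system, stream, partition): [task names]} from consumer output."""
    result = {}
    for line in lines:
        # The key is a JSON array and the value a JSON object, joined by "-".
        key_text, sep, value_text = line.partition("]-{")
        if not sep:
            continue  # not a key/value record (e.g. a warning line)
        key = json.loads(key_text + "]")
        value = json.loads("{" + value_text)
        if key[1] != "set-task-partition-assignment":
            continue
        ssp = json.loads(key[2])  # the SSP is itself JSON-encoded in the key
        tasks = json.loads(value["values"]["taskNames"])
        # Later records overwrite earlier ones for the same SSP.
        result[(ssp["system"], ssp["stream"], ssp["partition"])] = tasks
    return result


sample = [
    r'[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":0,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845139}',
    r'[1,"set-task-partition-assignment","{\"system\":\"kafka\",\"partition\":1,\"stream\":\"pageview-join-input\"}"]-{"host":"172.21.224.112","source":"SamzaContainer","values":{"taskNames":"[\"Partition 0\"]"},"username":"svenkata","timestamp":1547175845142}',
]
assignments = latest_assignments(sample)
for ssp, tasks in assignments.items():
    print(ssp, "->", tasks)
```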