Skip to content

Instantly share code, notes, and snippets.

@kishorenc
Last active December 18, 2015 08:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kishorenc/89eb18276c51b1478a18 to your computer and use it in GitHub Desktop.
Save kishorenc/89eb18276c51b1478a18 to your computer and use it in GitHub Desktop.
Samza 0.10 job does not shut down properly
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.coordinator.system=kafka
job.coordinator.replication.factor=1
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9082
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.commit.ms=60000
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
# Interval, in milliseconds, between window() calls
task.window.ms=20000
serializers.registry.object.class=org.apache.samza.serializers.SerializableSerdeFactory
serializers.registry.bytes.class=org.apache.samza.serializers.ByteSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
job.name=dedup-export-job-1
task.class=spike.streaming.samza.DedupExportTask
task.inputs=kafka.DedupMapping
systems.kafka.streams.DedupMapping.samza.offset.default=oldest
systems.kafka.streams.DedupMapping.samza.key.serde=string
systems.kafka.streams.DedupMapping.samza.msg.serde=bytes
package streaming.samza
import java.nio.charset.StandardCharsets

import org.apache.samza.config.Config
import org.apache.samza.job.JobRunner
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.TaskCoordinator.RequestScope
import org.apache.samza.task._
/**
 * Samza stream task that logs every incoming message and, whenever `window()` fires,
 * asks the task coordinator to shut down the current task.
 *
 * NOTE(review): the job properties next to this class set
 * `task.class=spike.streaming.samza.DedupExportTask`, while this file declares
 * `package streaming.samza` — confirm which fully qualified name is correct.
 */
class DedupExportTask extends StreamTask with WindowableTask with InitableTask with ClosableTask {

  /** Logs the task name; no state is initialized. */
  override final def init(config: Config, taskContext: TaskContext): Unit =
    println(s"Initializing task... taskContext.getTaskName = ${taskContext.getTaskName}")

  /**
   * Prints the message key and the message payload decoded as text.
   *
   * The job config uses the `string` serde for keys and the `bytes` serde for messages,
   * so the key is cast to `String` and the message to `Array[Byte]`. A null payload is
   * printed as "null" instead of failing with a NullPointerException.
   */
  override def process(incomingMessageEnvelope: IncomingMessageEnvelope,
                       messageCollector: MessageCollector,
                       taskCoordinator: TaskCoordinator): Unit = {
    val key = incomingMessageEnvelope.getKey.asInstanceOf[String]
    // Decode with an explicit charset: `new String(bytes)` depends on the JVM's platform
    // default. Assumes payloads are UTF-8 text — TODO confirm with the producer.
    val value = Option(incomingMessageEnvelope.getMessage.asInstanceOf[Array[Byte]])
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .orNull
    println(s"process called: key = $key - value = $value")
  }

  /**
   * Invoked every `task.window.ms`; logs and requests shutdown of this task on each call.
   */
  override def window(messageCollector: MessageCollector, taskCoordinator: TaskCoordinator): Unit = {
    println("On window...")
    taskCoordinator.shutdown(RequestScope.CURRENT_TASK)
  }

  /** Logs that the task is being closed; no resources are held. */
  override final def close(): Unit =
    println("Close called...")
}
/**
 * Local entry point that launches the dedup export job through Samza's `JobRunner`.
 *
 * Any command-line arguments are forwarded to `JobRunner.main` unchanged. With no
 * arguments, it falls back to [[dedupJobArgs]], which loads the bundled properties file.
 *
 * Uses an explicit `main` instead of the `App` trait so that `dedupJobArgs` is initialized
 * eagerly, not deferred via `DelayedInit`.
 *
 * NOTE(review): `JobRunner.main` may return while the job's threads are still running. If
 * the JVM does not exit after the task shuts down, check for lingering non-daemon threads
 * (e.g. RMI/JMX).
 */
object DedupExportJob {

  /** Default `JobRunner` arguments: properties-file config from the project's resources. */
  val dedupJobArgs: Array[String] = Array(
    "--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory",
    "--config-path=src/main/resources/dedup_job.properties"
  )

  def main(args: Array[String]): Unit = {
    println("Starting the job...")
    JobRunner.main(if (args.nonEmpty) args else dedupJobArgs)
  }
}
% jstack 10465 ✹ ✭
2015-12-18 13:52:10
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.45-b08 mixed mode):
"Attach Listener" daemon prio=10 tid=0x00007f5800001000 nid=0x2992 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"DestroyJavaVM" prio=10 tid=0x00007f5834013800 nid=0x28e3 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"RMI RenewClean-[192.168.0.12:59762]" daemon prio=10 tid=0x00007f57a8001800 nid=0x2921 in Object.wait() [0x00007f5829b90000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007d727fba0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
- locked <0x00000007d727fba0> (a java.lang.ref.ReferenceQueue$Lock)
at sun.rmi.transport.DGCClient$EndpointEntry$RenewCleanThread.run(DGCClient.java:535)
at java.lang.Thread.run(Thread.java:744)
"RMI Scheduler(0)" daemon prio=10 tid=0x00007f5834b83800 nid=0x2920 waiting on condition [0x00007f5829c91000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000784d4de08> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
"GC Daemon" daemon prio=10 tid=0x00007f5834b72000 nid=0x291e in Object.wait() [0x00007f5829e93000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000784a16c48> (a sun.misc.GC$LatencyLock)
at sun.misc.GC$Daemon.run(GC.java:117)
- locked <0x0000000784a16c48> (a sun.misc.GC$LatencyLock)
"RMI Reaper" prio=10 tid=0x00007f5834b70800 nid=0x291d in Object.wait() [0x00007f5829f94000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000784a1e620> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
- locked <0x0000000784a1e620> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
at sun.rmi.transport.ObjectTable$Reaper.run(ObjectTable.java:351)
at java.lang.Thread.run(Thread.java:744)
"RMI TCP Accept-59762" daemon prio=10 tid=0x00007f5834b6d800 nid=0x291c runnable [0x00007f582a095000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
at java.net.ServerSocket.implAccept(ServerSocket.java:530)
at java.net.ServerSocket.accept(ServerSocket.java:498)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:388)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:360)
at java.lang.Thread.run(Thread.java:744)
"RMI TCP Accept-0" daemon prio=10 tid=0x00007f5834b5a800 nid=0x291b runnable [0x00007f582a196000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
at java.net.ServerSocket.implAccept(ServerSocket.java:530)
at java.net.ServerSocket.accept(ServerSocket.java:498)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:388)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:360)
at java.lang.Thread.run(Thread.java:744)
"kafka-producer-network-thread | samza_producer-dedup_export_job_1-1-1450426742993-18" daemon prio=10 tid=0x00007f5834b03800 nid=0x290f runnable [0x00007f582b5ca000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
- locked <0x0000000784afb420> (a sun.nio.ch.Util$2)
- locked <0x0000000784afb410> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000784afadb8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at org.apache.kafka.common.network.Selector.select(Selector.java:328)
at org.apache.kafka.common.network.Selector.poll(Selector.java:218)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:744)
"SAMZA-BROKER-PROXY-BrokerProxy thread pointed at localhost:9082 for client samza_consumer-dedup_export_job_1-1-1450426742989-15" daemon prio=10 tid=0x00007f5834aea800 nid=0x290e runnable [0x00007f582b4c8000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
- locked <0x0000000784a994d0> (a sun.nio.ch.Util$2)
- locked <0x0000000784a994c0> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000784a99278> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
- locked <0x0000000784a9a790> (a java.lang.Object)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
- locked <0x0000000784a9a920> (a sun.nio.ch.SocketAdaptor$SocketInputStream)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
- locked <0x0000000784ab48a8> (a java.lang.Object)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x0000000784a99138> (a java.lang.Object)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
at java.lang.Thread.run(Thread.java:744)
"SAMZA-BROKER-PROXY-BrokerProxy thread pointed at localhost:9082 for client samza_consumer-dedup_export_job_1-1-1450426741900-7" daemon prio=10 tid=0x00007f5834a64000 nid=0x290c runnable [0x00007f582b3c7000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
- locked <0x0000000784ae2430> (a sun.nio.ch.Util$2)
- locked <0x0000000784ae2420> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000784adff70> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
- locked <0x0000000784ae02a0> (a java.lang.Object)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
- locked <0x0000000784ae03c0> (a sun.nio.ch.SocketAdaptor$SocketInputStream)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
- locked <0x0000000784a23290> (a java.lang.Object)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x0000000784a23420> (a java.lang.Object)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
at java.lang.Thread.run(Thread.java:744)
"metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007f5834abd800 nid=0x2905 waiting on condition [0x00007f582b7cc000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000784ab6d80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
"metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007f5834a97000 nid=0x2904 waiting on condition [0x00007f582b6cb000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000784ab6d80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
"Monitor Ctrl-Break" daemon prio=10 tid=0x00007f58346db800 nid=0x28fc runnable [0x00007f58301b1000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
at java.net.ServerSocket.implAccept(ServerSocket.java:530)
at java.net.ServerSocket.accept(ServerSocket.java:498)
at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:90)
at java.lang.Thread.run(Thread.java:744)
"Service Thread" daemon prio=10 tid=0x00007f583409f800 nid=0x28fa runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" daemon prio=10 tid=0x00007f583409d800 nid=0x28f9 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" daemon prio=10 tid=0x00007f583409a800 nid=0x28f8 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=10 tid=0x00007f5834090000 nid=0x28f7 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=10 tid=0x00007f5834079800 nid=0x28f1 in Object.wait() [0x00007f583888a000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000784a4ef90> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
- locked <0x0000000784a4ef90> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:189)
"Reference Handler" daemon prio=10 tid=0x00007f5834075800 nid=0x28f0 in Object.wait() [0x00007f583898b000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000784a4ea50> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:503)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
- locked <0x0000000784a4ea50> (a java.lang.ref.Reference$Lock)
"VM Thread" prio=10 tid=0x00007f5834073000 nid=0x28ed runnable
"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f5834029800 nid=0x28e8 runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f583402b000 nid=0x28e9 runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f583402d000 nid=0x28ea runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f583402f000 nid=0x28eb runnable
"VM Periodic Task Thread" prio=10 tid=0x00007f58340aa800 nid=0x28fb waiting on condition
JNI global references: 168
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment