Skip to content

Instantly share code, notes, and snippets.

@zcox
Last active August 29, 2015 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zcox/eb3d578a83cdbe636d13 to your computer and use it in GitHub Desktop.
Save zcox/eb3d578a83cdbe636d13 to your computer and use it in GitHub Desktop.
import org.apache.samza.container.{SamzaContainer, RunLoop}
import org.apache.samza.job.local.ThreadJob
import java.util.concurrent.atomic.AtomicBoolean
import org.slf4j.LoggerFactory
/** Utilities for clean Samza container shutdown. Note that these utilities depend on private, internal implementation details of various Samza components,
* accessed via Java reflection, and will not work if these details change in Samza releases after 0.8.0. See also https://issues.apache.org/jira/browse/SAMZA-506.
*
* If you have a SamzaContainer instance, you can just call addShutdownHook(container) and on jvm shutdown (i.e. SIGTERM) the container will be cleanly shut down.
*
* If you have a ThreadJob instance (e.g. integration tests) you can just call requestShutdown(getSamzaContainer(job)) to request a clean shut down.
*/
object SamzaContainerShutdown {
val logger = LoggerFactory.getLogger(getClass)
/** Global flag that signals shutdown is complete. The shutdown hook blocks until either this flag is true, or a timeout occurs. You should set this to true after SamzaContainer.run returns. */
val shutdownIsComplete = new AtomicBoolean(false)
def getSamzaContainer(job: ThreadJob): SamzaContainer = {
val field = classOf[ThreadJob].getDeclaredField("org$apache$samza$job$local$ThreadJob$$runnable") //used javap to find this name
field.get(job).asInstanceOf[SamzaContainer]
}
def getRunLoop(container: SamzaContainer): RunLoop = {
val field = classOf[SamzaContainer].getDeclaredField("runLoop")
field.setAccessible(true)
val runLoop = field.get(container).asInstanceOf[RunLoop]
field.setAccessible(false)
runLoop
}
def setShutdownNowTrue(runLoop: RunLoop): Unit = {
val field = classOf[RunLoop].getDeclaredField("shutdownNow")
field.setAccessible(true)
field.setBoolean(runLoop, true)
field.setAccessible(false)
logger.debug("Set RunLoop.shutdownNow = true")
}
def requestCommitForAllTasks(runLoop: RunLoop): Unit = {
val taskCommitRequestsField = classOf[RunLoop].getDeclaredField("org$apache$samza$container$RunLoop$$taskCommitRequests") //used javap to find the actual name of this field
taskCommitRequestsField.setAccessible(true)
taskCommitRequestsField.set(runLoop, runLoop.taskInstances.keys)
taskCommitRequestsField.setAccessible(false)
logger.debug(s"Set RunLoop.taskCommitRequests = ${runLoop.taskInstances.keys}")
}
/** Requests the container to shut down, and optionally requests all tasks to commit their offsets. This method returns immediately; it does not block until shutdown is complete. */
def requestShutdown(container: SamzaContainer, requestCommit: Boolean = true): Unit = {
val runLoop = getRunLoop(container)
if (requestCommit) requestCommitForAllTasks(runLoop) //this will request commits, but if RunLoop is past where it checks for requested commits, they won't happen
setShutdownNowTrue(runLoop)
}
/** Registers a jvm shutdown hook that will cleanly shutdown the specified container. */
def addShutdownHook(container: SamzaContainer, requestCommit: Boolean = true) = Runtime.getRuntime.addShutdownHook(new Thread {
override def run() {
logger.info("Container shutdown hook started...")
requestShutdown(container, requestCommit)
val start = System.currentTimeMillis
val timeout = 10000l //TODO should be configurable
def elapsed = System.currentTimeMillis - start
def timedOut = elapsed > timeout
logger.debug("going to start blocking until either shutdown is complete or timeout")
while (!(shutdownIsComplete.get || timedOut)) {
logger.debug("sleeping for 100 msec...")
Thread.sleep(100)
}
if (timedOut) logger.info("Container shutdown hook timed out :(")
logger.info("Container shutdown hook finished")
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment