Last active
August 29, 2015 14:14
-
-
Save zcox/eb3d578a83cdbe636d13 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.samza.container.{SamzaContainer, RunLoop} | |
import org.apache.samza.job.local.ThreadJob | |
import java.util.concurrent.atomic.AtomicBoolean | |
import org.slf4j.LoggerFactory | |
/** Utilities for clean Samza container shutdown. Note that these utilities depend on private, internal implementation details of various Samza components,
  * accessed via Java reflection, and will not work if these details change in Samza releases after 0.8.0. See also https://issues.apache.org/jira/browse/SAMZA-506.
  *
  * If you have a SamzaContainer instance, you can just call addShutdownHook(container) and on jvm shutdown (i.e. SIGTERM) the container will be cleanly shut down.
  *
  * If you have a ThreadJob instance (e.g. integration tests) you can just call requestShutdown(getSamzaContainer(job)) to request a clean shutdown.
  */
object SamzaContainerShutdown {
  val logger = LoggerFactory.getLogger(getClass)

  /** Global flag that signals shutdown is complete. The shutdown hook blocks until either this flag is true, or a timeout occurs.
    * You should set this to true after SamzaContainer.run returns. This is a single flag shared by every hook registered in this JVM.
    */
  val shutdownIsComplete = new AtomicBoolean(false)

  /** Looks up the declared (possibly private) field `name` on `cls`, makes it accessible, applies `f` to it, and then
    * restores the field's accessibility flag. The flag is restored even when `f` throws.
    *
    * @param cls  class that declares the field
    * @param name JVM-level field name (Scala may mangle private names, e.g. `a$b$C$$field`)
    * @param f    action to perform with the accessible field
    * @return the result of `f`
    * @throws NoSuchFieldException if `cls` declares no field called `name` (e.g. after a Samza upgrade)
    */
  private def withAccessibleField[A](cls: Class[_], name: String)(f: java.lang.reflect.Field => A): A = {
    val field = cls.getDeclaredField(name)
    field.setAccessible(true)
    try f(field) finally field.setAccessible(false)
  }

  /** Extracts the SamzaContainer that a ThreadJob runs, by reading ThreadJob's internal runnable field. */
  def getSamzaContainer(job: ThreadJob): SamzaContainer =
    //used javap to find this name
    withAccessibleField(classOf[ThreadJob], "org$apache$samza$job$local$ThreadJob$$runnable") { field =>
      field.get(job).asInstanceOf[SamzaContainer]
    }

  /** Reads the RunLoop held in the container's internal `runLoop` field. */
  def getRunLoop(container: SamzaContainer): RunLoop =
    withAccessibleField(classOf[SamzaContainer], "runLoop") { field =>
      field.get(container).asInstanceOf[RunLoop]
    }

  /** Sets the RunLoop's internal `shutdownNow` flag to true. */
  def setShutdownNowTrue(runLoop: RunLoop): Unit = {
    withAccessibleField(classOf[RunLoop], "shutdownNow") { field =>
      field.setBoolean(runLoop, true)
    }
    logger.debug("Set RunLoop.shutdownNow = true")
  }

  /** Overwrites the RunLoop's internal commit-request set with the names of all of its task instances. */
  def requestCommitForAllTasks(runLoop: RunLoop): Unit = {
    // keySet (rather than keys) so the value stored in the field is statically a Set
    val allTaskNames = runLoop.taskInstances.keySet
    //used javap to find the actual name of this field
    withAccessibleField(classOf[RunLoop], "org$apache$samza$container$RunLoop$$taskCommitRequests") { field =>
      field.set(runLoop, allTaskNames)
    }
    logger.debug(s"Set RunLoop.taskCommitRequests = $allTaskNames")
  }

  /** Requests the container to shut down, and optionally requests all tasks to commit their offsets. This method returns immediately; it does not block until shutdown is complete.
    *
    * @param container     the container to stop
    * @param requestCommit whether to request a commit for every task before requesting shutdown
    */
  def requestShutdown(container: SamzaContainer, requestCommit: Boolean = true): Unit = {
    val runLoop = getRunLoop(container)
    //this will request commits, but if RunLoop is past where it checks for requested commits, they won't happen
    if (requestCommit) requestCommitForAllTasks(runLoop)
    setShutdownNowTrue(runLoop)
  }

  /** Blocks the calling thread until [[shutdownIsComplete]] is true or `timeoutMillis` has elapsed, polling every 100 ms.
    * Uses the monotonic System.nanoTime clock so wall-clock adjustments do not affect the wait.
    */
  private def awaitShutdownComplete(timeoutMillis: Long): Unit = {
    val timeoutNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
    val start = System.nanoTime
    // compare a difference of nanoTime values, which is safe against numeric overflow
    def timedOut: Boolean = System.nanoTime - start > timeoutNanos
    logger.debug("going to start blocking until either shutdown is complete or timeout")
    while (!shutdownIsComplete.get && !timedOut) {
      logger.debug("sleeping for 100 msec...")
      Thread.sleep(100)
    }
    // Decide based on the flag, not the clock: shutdown may have completed just as the deadline passed.
    if (!shutdownIsComplete.get) logger.info("Container shutdown hook timed out :(")
  }

  /** Registers a jvm shutdown hook that will cleanly shutdown the specified container.
    *
    * The hook requests shutdown (see [[requestShutdown]]) and then waits until [[shutdownIsComplete]] is set, or until
    * `timeoutMillis` elapses. If the reflective shutdown request itself fails, the error is logged and the hook does not wait.
    *
    * @param container     the container to stop on JVM shutdown
    * @param requestCommit whether to request a commit for every task before requesting shutdown
    * @param timeoutMillis maximum time, in milliseconds, the hook waits for [[shutdownIsComplete]]
    */
  def addShutdownHook(container: SamzaContainer, requestCommit: Boolean = true, timeoutMillis: Long = 10000L): Unit =
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit = {
        logger.info("Container shutdown hook started...")
        val shutdownRequested =
          try {
            requestShutdown(container, requestCommit)
            true
          } catch {
            // e.g. NoSuchFieldException/ClassCastException when Samza internals differ from 0.8.0
            case scala.util.control.NonFatal(e) =>
              logger.error("Container shutdown hook failed to request shutdown", e)
              false
          }
        if (shutdownRequested) awaitShutdownComplete(timeoutMillis)
        logger.info("Container shutdown hook finished")
      }
    })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment