Skip to content

Instantly share code, notes, and snippets.

@bennadel
Created August 1, 2020 12:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bennadel/32f9cbfe32b92302b055a5568fa57911 to your computer and use it in GitHub Desktop.
Save bennadel/32f9cbfe32b92302b055a5568fa57911 to your computer and use it in GitHub Desktop.
Using A Task CFThread To Run And Restart Daemon CFThreads Indefinitely In Lucee CFML 5.3.6.61
component
	output = false
	hint = "I setup the application settings and event handlers."
	{

	// Application identity, plus a per-application mapping that points "/vendor" at
	// the vendor folder sitting next to this file.
	this.name = "TaskThreadTesting";
	this.directory = getDirectoryFromPath( getCurrentTemplatePath() );
	this.mappings = {
		"/vendor": "#this.directory#vendor/"
	};

	// ---
	// EVENT-HANDLER METHODS.
	// ---

	/**
	* I run once while the application is being boot-strapped. I create the Redis
	* pool, the Consumer and the background-thread runner, cache all three in the
	* application scope, and then hand the Consumer to the runner.
	*/
	public void function onApplicationStart() {

		debug( "Bootstrapping Lucee CFML application." );

		application.redisPool = new RedisPool( "localhost", 6379 );
		application.consumer = new Consumer( application.redisPool );
		application.backgroundThreadRunner = new BackgroundThreadRunner();

		// The runner calls the Consumer's .run() method from inside a background
		// thread, and calls it again in a fresh thread should that thread end.
		var runner = application.backgroundThreadRunner;

		runner.runInBackground( application.consumer );

	}

	/**
	* I run once while the application is being shut-down.
	*
	* CAUTION: This function only has access to PUBLIC METHODS on this component,
	* which is why debug() is declared PUBLIC below.
	*
	* @applicationScope I am the scope associated with the running application.
	*/
	public void function onApplicationEnd( required struct applicationScope ) {

		debug( "Shutting down Lucee CFML application." );

		// Stop the background runner (when one was created), otherwise its task
		// would continue to run in the background after the application is gone.
		applicationScope.backgroundThreadRunner?.stop();

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I write the given message, prefixed with the component label, to the server
	* error-output stream.
	*
	* @message I am message being logged.
	*/
	public void function debug( required string message ) {

		var line = "[ Application ]: #message#";

		systemOutput( line, true, true );

	}

}
component
output = false
hint = "I provide a means to run persistent background threads."
{
/**
* I set up the internal state of a new background-thread runner.
*
* @sleepDuration I am the duration, in milliseconds, that the Task thread will sleep in between checks to see if the runnables are running.
*/
public void function init( numeric sleepDuration = 1000 ) {

	// How long the TASK THREAD pauses between two consecutive checks on the
	// runnables.
	variables.sleepDuration = arguments.sleepDuration;

	// Key under which this component instance is published in the Server scope so
	// that the TASK thread can find it.
	variables.taskThreadID = ( "background_task_thread_" & createUniqueId() );

	// Base name for the exclusive named locks used by this instance:
	// ---
	// * "#lockID#.task" - high-level lock around every entry point into the runner.
	// * "#lockID#.entry" - low-level lock around work on an individual runnable.
	// ---
	variables.lockID = ( "lock_" & createUniqueId() );

	// One entry per managed Runnable; each Runnable gets its own CFThread wrapper.
	variables.entries = [];

}
// ---
// PUBLIC METHODS.
// ---
/**
* I ensure that each Runnable is actually Running. If the runner has been stopped
* (its Server-scope key has been removed), I do nothing.
*
* CAUTION: This is to be used by the TASK THREAD ONLY! This has to be public, but is
* not meant for public consumption.
*/
public void function _ensureRunnablesAreRunning() {

	lock
		type = "exclusive"
		name = "#lockID#.task"
		timeout = 60
		throwOnTimeout = true
	{
		// The task thread checks the server key BEFORE calling in here, which is
		// outside of this lock. stop() removes that key while holding this same
		// lock, so the task thread can be parked on the lock while stop() runs.
		// Re-checking the key now that we own the lock prevents us from restarting
		// runnables that stop() has just asked to shut down.
		if ( ! server.keyExists( taskThreadID ) ) {

			debug( "Background thread runner has been stopped; skipping runnable check." );
			return;

		}

		debug( "Ensuring runnables are running." );

		for ( var entry in entries ) {

			try {

				ensureRunnableIsRunning( entry );

			} catch ( any error ) {

				// A failure on one entry (for example a lock timeout, or the
				// wrapper thread failing to spawn) is logged and must not keep
				// the remaining entries from being checked.
				debug( "Error ensuring runnable with ID #entry.runnableID#: #serializeJson( error )#." );

			}

		}

	} // END: Task-lock.

}
/**
* I register the given Runnable so that it gets run in the background. Every
* registered Runnable receives its own CFThread wrapper, and a single TASK thread
* watches over all of them.
*
* @runnable I am the Runnable being managed in the background.
*/
public void function runInBackground( required any runnable ) {

	lock
		type = "exclusive"
		name = "#lockID#.task"
		timeout = 60
		throwOnTimeout = true
	{
		// Make sure the monitoring task exists before we give it work to do.
		ensureTaskThread();

		// The entry starts without a thread; the wrapper thread is spawned later,
		// when the task thread calls back into this runner.
		var newEntry = {
			runnable: arguments.runnable,
			runnableID: createUniqueId(),
			runnableThread: nullValue()
		};

		arrayAppend( entries, newEntry );

	} // END: Task-lock.

}
/**
* I shut down the background TASK thread and ask every managed Runnable to stop.
*/
public void function stop() {

	lock
		type = "exclusive"
		name = "#lockID#.task"
		timeout = 60
		throwOnTimeout = true
	{
		debug( "Stopping background thread runner." );

		// Removing the server-key is the signal that tells the TASK thread to STOP
		// CALLING back into this background runner.
		structDelete( server, taskThreadID );

		var entryCount = arrayLen( entries );

		for ( var i = 1 ; i <= entryCount ; i++ ) {

			var entry = entries[ i ];

			// A failure to stop one runnable is logged and does not prevent the
			// remaining runnables from being stopped.
			try {

				ensureRunnableIsStopped( entry );

			} catch ( any error ) {

				debug( "Error stopping runnable with ID #entry.runnableID#: #serializeJson( error )#." );

			}

		}

	} // END: Task-lock.

}
// ---
// PRIVATE METHODS.
// ---
/**
* I write the given message, prefixed with the component label, to the server
* error-output stream.
*
* @message I am message being logged.
*/
private void function debug( required string message ) {

	var line = "[ BackgroundRunner ]: #message#";

	systemOutput( line, true, true );

}
/**
* I ensure that the given entry is running. If the entry has no wrapper thread yet,
* or its wrapper thread reports a status of TERMINATED or COMPLETED, I spawn a new
* wrapper thread that calls entry.runnable.run() and I store that thread back into
* the entry. Otherwise, I leave the entry alone.
*
* @entry I am the runnable entry being started / monitored.
*/
private void function ensureRunnableIsRunning( required struct entry ) {
// All work on individual entries is serialized through the shared entry-level
// lock. A lock timeout raises an error (throwOnTimeout) for the caller to handle.
lock
type = "exclusive"
name = "#lockID#.entry"
timeout = 60
throwOnTimeout = true
{
// Each runnable entry receives its own daemon CFThread wrapper. If this
// thread has not yet been spawned - or, has been terminated / completed - we
// need to spawn it and track it.
if (
isNull( entry.runnableThread ) ||
( entry.runnableThread.status == "TERMINATED" ) ||
( entry.runnableThread.status == "COMPLETED" )
) {
// If a previous wrapper thread exists and carries an "error" key, log that
// error before replacing the thread.
if ( entry.runnableThread?.keyExists( "error" ) ?: false ) {
debug( "Runnable ended in error: #serializeJson( entry.runnableThread.error )#" );
}
debug( "Starting runnable with ID #entry.runnableID#" );
// NOTE: Since we may end-up creating the "same thread" over time, every spawn
// gets a fresh unique suffix. Per the original author's note, Lucee will
// otherwise throw an error about duplicate threads in the same request.
var threadName = ( taskThreadID & "_" & entry.runnableID & "_" & createUniqueId() );
// Spawn the wrapper CFThread for this Runnable - each runnable runs inside
// its own asynchronous thread context.
// --
// CAUTION (original author's note): In Lucee CFML, the "entry" attribute here
// is NOT DEEP-CLONED when it is passed into the CFThread context. This is a
// deviation from the way Adobe ColdFusion works, which will deep-clone the
// instance.
// --
// NOTE(review): the type attribute below is spelled "deamon". Confirm which
// spellings the target Lucee version accepts before changing it.
thread
type = "deamon"
name = threadName
entry = entry
{
// There's no easy way to terminate a CFThread across different
// page requests by name (as such the threadTerminate() function
// won't work). To overcome this, we expose a stopRunnable()
// function ON THE WRAPPER THREAD that turns around and stops the
// underlying Runnable. This should allow the wrapper CFThread to
// exit naturally.
thread.stopRunnable = () => {
entry.runnable.stop();
};
// NOTE: We don't need a try/catch around this because any error that
// gets attached to the ended thread is logged the next time this
// method runs for the entry (see top of this IF-statement).
entry.runnable.run();
}
// Store this CFThread instance back into the entry. This way, its status can
// be inspected the next time this method runs for the entry.
entry.runnableThread = cfthread[ threadName ];
} // END: If.
} // END: Entry-Lock.
}
/**
* I ask the given entry to stop by calling the stopRunnable() function exposed on
* its wrapper thread. If the entry has no wrapper thread, the call is skipped by
* the safe-navigation operator.
*
* @entry I am the runnable entry being stopped.
*/
private void function ensureRunnableIsStopped( required struct entry ) {
// All work on individual entries is serialized through the shared entry-level
// lock. A lock timeout raises an error (throwOnTimeout) for the caller to handle.
lock
type = "exclusive"
name = "#lockID#.entry"
timeout = 60
throwOnTimeout = true
{
debug( "Stopping runnable with ID #entry.runnableID#" );
// We can't actually terminate CFThreads across different requests (at least
// not that the original author could find). As such, we can only ask the
// CFThread to stop its own internal Runnable using the function that was
// exposed on the thread when it was spawned.
// NOTE(review): stopRunnable is attached inside the thread body, so it may
// not exist yet on a thread that has only just been spawned - confirm.
entry.runnableThread?.stopRunnable();
} // END: Entry-Lock.
}
/**
* I ensure that the TASK THREAD is running in the background. If this runner is
* already registered in the Server scope, I return without doing anything.
* Otherwise, I register the runner (and the current application context) in the
* Server scope and spawn the task thread that periodically calls back into
* _ensureRunnablesAreRunning().
*/
private void function ensureTaskThread() {
lock
type = "exclusive"
name = "#lockID#.task"
timeout = 60
throwOnTimeout = true
{
// Task Threads are very "sticky" - per the original author, even if you delete
// them from the Lucee Admin, they will continue to run in the background. As
// such, the Server-scope key doubles as the "already started" marker: if it is
// present, exit the whole function without spawning another task.
if ( server.keyExists( taskThreadID ) ) {
return;
}
// When we start the TASK THREAD, Lucee will serialize the data associated
// with the thread and then store it to disk (as far as the original author
// could tell). Lucee then deserializes the task data and runs the task in the
// background based on the retry intervals. As such, the TASK THREAD runs
// COMPLETELY OUTSIDE THE CURRENT APPLICATION CONTEXT of our main application.
// To wire these two contexts back together, the SERVER SCOPE is used as the
// communication tunnel. And, in order to do that, we have to keep track of
// the APPLICATION CONTEXT so that the TASK THREAD can re-attach itself to the
// current application during its execution.
// --
// READ MORE: https://dev.lucee.org/t/using-native-java-threading-in-lucee/471/14
server[ taskThreadID ] = {
runner: this,
applicationContext: getPageContext().getApplicationContext()
};
} // END: Task-lock.
// The TASK THREAD uses an internal loop that MOSTLY keeps it going. However, if
// it crashes, we need to have Lucee try to restart it. That is what the
// retryInterval is for. Since we want this to be a "persistent" background
// thread, the number of tries is set to the largest Java int, with 5 seconds
// between tries.
var retryInterval = [{
tries: createObject( "java", "java.lang.Integer" ).MAX_VALUE,
interval: createTimeSpan( 0, 0, 0, 5 )
}];
// The taskThreadID and sleepDuration attributes hand the two values that the
// thread body needs into the thread.
thread
type = "task"
name = "task.#taskThreadID#"
retryInterval = retryInterval
taskThreadID = taskThreadID
sleepDuration = sleepDuration
{
// Since the TASK THREAD doesn't have access to the parent VARIABLE scope
// (remember this is running completely outside the application), we need to
// give the thread its own version of the debug() method.
function taskDebug( required string message ) {
systemOutput( "[ Task Thread ]: #message#", true, true );
}
// CAUTION (original author's note): Errors in a Task Thread seem to get
// swallowed-up by the ether if they are not explicitly caught and logged!
try {
// Look up the runner and application context that were published in the
// Server scope before this thread was spawned.
var runner = server[ taskThreadID ].runner;
var applicationContext = server[ taskThreadID ].applicationContext;
// The TASK THREAD runs in a completely separate application context /
// page context. As such, in order to call the following "Runner" in an
// expected state, we have to re-attach the current TASK application
// context to the application context in which the Runner was defined.
getPageContext().setApplicationContext( applicationContext );
// Since we want to implement persistent background threads, we just
// want to keep looping here forever. And, if this thread crashes, Lucee
// will restart it based on the retryInterval.
while ( true ) {
// If the server key for this background processor has been deleted,
// it means the background processing is being shut-down. As such,
// let's exit out of this Task thread.
if ( ! server.keyExists( taskThreadID ) ) {
return;
}
// An error raised by the runner is logged and the loop carries on, so a
// single failed check does not end the task thread.
try {
runner._ensureRunnablesAreRunning();
} catch ( any runnerError ) {
taskDebug( "Error while ensuring runnables: #serializeJson( runnerError )#" );
}
sleep( sleepDuration );
}
} catch ( any taskError ) {
// Any other error (for example, a failed Server-scope lookup above) is
// logged here. It is not re-thrown.
taskDebug( "Error while wiring-up task thread internals: #serializeJson( taskError )#" );
}
} // END: CFThread.
}
}
component
	output = false
	hint = "I am a persistent background consumer that processes messages from a specific Redis list."
	{

	/**
	* I initialize the background Consumer using the given Redis pool. This Consumer
	* exposes a run() method which is intended to run indefinitely in the background of
	* the application.
	*
	* @redisPool I am the Redis connection pool.
	* @redisListKey I am the Redis list key that will be monitored and consumed. Defaults to "ben.demo".
	* @blockTimeout I am the number of seconds each blocking pop waits for a list item before returning. Defaults to 5. The running-flag is only re-checked between pops, so keep this small enough for stop() to take effect promptly.
	*/
	public void function init(
		required any redisPool,
		string redisListKey = "ben.demo",
		numeric blockTimeout = 5
		) {

		variables.redisPool = arguments.redisPool;

		// This is the Redis KEY that we will be monitoring and consuming.
		variables.redisListKey = arguments.redisListKey;

		// This is how long (in seconds) each blocking pop waits for a list item.
		variables.blockTimeout = arguments.blockTimeout;

		// I determine if the Consumer should continue processing the Redis list. This
		// flag allows us to halt the background processing after it has begun.
		variables.isRunning = false;

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I kick-off the Consumer processing. This method is intended to run indefinitely
	* (or, at least until the stop() method is called). A list item with the value
	* "crash" makes this method throw an error of type "OopsCrash" (demo behavior).
	*/
	public void function run() {

		isRunning = true;

		// The point of the Consumer is to run indefinitely, looking for and processing
		// values in the given Redis list. As such, we're going to keep looping until the
		// running-flag is turned-off.
		while ( isRunning ) {

			debug( "Looking for messages in Redis (blocking operation)." );

			// The blpop (Blocking List Pop) command will block the current thread while
			// waiting for a list-item to become available. If it reaches the configured
			// timeout without finding a list item, a NULL value will be returned. The
			// timeout is cast to a Java int so that the value reaches the Redis client
			// as an integer regardless of how the argument was supplied.
			var result = redisPool.withRedis(
				( redis ) => {

					return( redis.blpop( javaCast( "int", blockTimeout ), [ redisListKey ] ) );

				}
			);

			// No list item was found within the timeout.
			if ( isNull( result ) ) {

				debug( "No messages found after block-timeout." );

			// A list item was found! Woot!
			} else {

				// The result holds the list name first and the popped value second.
				var listName = result[ 1 ];
				var listItemValue = result[ 2 ];

				debug( "Message found:" );
				debug( "" );
				debug( " #listItemValue#" );
				debug( "" );

				// FOR THE DEMO, to make things a little more interesting, we're going
				// to crash the Consumer thread if the list value was "crash". This will
				// give the background-task an opportunity to restart it and make things
				// a bit more fun :D
				if ( listItemValue == "crash" ) {

					throw( type = "OopsCrash", message = "UnexpectedValue" );

				}

			}

		} // While-loop.

		debug( "Messaging processing stopped." );

	}

	/**
	* I stop the background processing of the Consumer. The run() loop notices the
	* change the next time it checks the running-flag (ie, after the current blocking
	* pop returns).
	*/
	public void function stop() {

		isRunning = false;

		debug( "Stopping processing of the Redis list." );

	}

	// ---
	// PRIVATE METHODS.
	// ---

	/**
	* I log the given message to the server error-output stream.
	*
	* @message I am message being logged.
	*/
	private void function debug( required string message ) {

		systemOutput( "[ Consumer ]: #message#", true, true );

	}

}
<cfscript>

	// Default the form field so that the check below also works when the page is
	// requested without a submitted message.
	param name="form.message" type="string" default="";

	// A non-empty message gets appended to the Redis list.
	if ( len( form.message ) ) {

		application.redisPool.withRedis(
			function( redis ) {

				redis.rpush( "ben.demo", [ form.message ] );

			}
		);

	}

</cfscript>
<cfoutput>
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<link rel="stylesheet" type="text/css" href="./producer.css">
</head>
<body>
<h1>
Produce a Message
</h1>
<form
method="post"
action="#encodeForHtmlAttribute( cgi.script_name )#">
<strong>Message</strong>:
<input
type="text"
name="message"
size="30"
autofocus
autocomplete="off"
/>
<button type="submit">
Send Message
</button>
</form>
<p>
<a href="./stop.cfm">Stop application</a>
</p>
</body>
</html>
</cfoutput>
component
	output = false
	hint = "I provide a simple, callback-based wrapper for the Jedis connection pool."
	{

	/**
	* I create the Jedis connection pool for the given Redis host and port.
	*
	* @redisHost I am the Redis host name.
	* @redisPort I am the Redis port being monitored (probably 6379).
	*/
	public void function init(
		required string redisHost,
		required numeric redisPort
		) {

		// These JAR files are handed to createObject() (see javaNew() below) as the
		// source of the Jedis classes.
		variables.jedisJarPaths = [
			expandPath( "/vendor/apache/commons-pool2-2.2.jar" ),
			expandPath( "/vendor/jedis/lib/jedis-2.6.0.jar" )
		];

		var poolConfigClass = javaNew( "redis.clients.jedis.JedisPoolConfig" );

		variables.jedisPoolConfig = poolConfigClass.init();

		var poolClass = javaNew( "redis.clients.jedis.JedisPool" );

		variables.jedisPool = poolClass.init(
			variables.jedisPoolConfig,
			arguments.redisHost,
			arguments.redisPort
		);

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I take a connection from the pool, pass it to the given callback, and return
	* whatever the callback returns. The connection is closed afterwards, whether the
	* callback returns normally or throws.
	*
	* @callback I am the callback function to which the Redis connection is passed.
	*/
	public any function withRedis( required function callback ) {

		var connection = variables.jedisPool.getResource();

		try {

			return( callback( connection ) );

		} finally {

			connection?.close();

		}

	}

	// ---
	// PRIVATE METHODS.
	// ---

	/**
	* I create the Java Class proxy with the given name, using the JAR files that
	* were collected in init().
	*
	* @className I am the Java Class proxy to create.
	*/
	private any function javaNew( required string className ) {

		var classProxy = createObject( "java", arguments.className, variables.jedisJarPaths );

		return( classProxy );

	}

}
[ Application ]: Bootstrapping Lucee CFML application.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Starting runnable with ID 2f
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]: hello
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]: world
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]: how
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]: is
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]: it
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]: going?
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]: crash
[ Consumer ]:
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Runnable ended in error:
{"Message":"UnexpectedValue" ..... truncated for blog .....}
[ BackgroundRunner ]: Starting runnable with ID 2f
[ Consumer ]: Looking for messages in Redis (blocking operation).
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ BackgroundRunner ]: Ensuring runnables are running.
[ Consumer ]: Message found:
[ Consumer ]:
[ Consumer ]: woot
[ Consumer ]:
[ Consumer ]: Looking for messages in Redis (blocking operation).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment