Created
August 1, 2020 12:30
-
-
Save bennadel/32f9cbfe32b92302b055a5568fa57911 to your computer and use it in GitHub Desktop.
Using A Task CFThread To Run And Restart Daemon CFThreads Indefinitely In Lucee CFML 5.3.6.61
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
component | |
output = false | |
hint = "I setup the application settings and event handlers." | |
{ | |
this.name = "TaskThreadTesting"; | |
this.directory = getDirectoryFromPath( getCurrentTemplatePath() ); | |
this.mappings = { | |
"/vendor" = "#this.directory#vendor/" | |
}; | |
// --- | |
// EVENT-HANDLER METHODS. | |
// --- | |
/** | |
* I get called once when the application is being boot-strapped. This method is | |
* naturally single-threaded / synchronized by the ColdFusion runtime. | |
*/ | |
public void function onApplicationStart() { | |
debug( "Bootstrapping Lucee CFML application." ); | |
application.redisPool = new RedisPool( "localhost", 6379 ); | |
application.consumer = new Consumer( application.redisPool ); | |
application.backgroundThreadRunner = new BackgroundThreadRunner(); | |
// The Consumer has a .run() method, which is what the background-runner will | |
// call from inside a "persistent" thread. If the thread dies or ends, then the | |
// background-runner will call .run() again inside a new thread. | |
application.backgroundThreadRunner.runInBackground( application.consumer ); | |
} | |
/** | |
* I get called once when the application is being shut-down. | |
* | |
* CAUTION: This function only has access to PUBLIC METHODS on this component. As | |
* such, we had to make the debug() method PUBLIC in order to use it here. | |
* | |
* @applicationScope I am the scope associated with the running application. | |
*/ | |
public void function onApplicationEnd( required struct applicationScope ) { | |
debug( "Shutting down Lucee CFML application." ); | |
// If we're shutting down the application, let's kill the background thread | |
// runner or the task will continue to run in the background. | |
applicationScope.backgroundThreadRunner?.stop(); | |
} | |
// --- | |
// PUBLIC METHODS. | |
// --- | |
/** | |
* I log the given message to the server error-output stream. | |
* | |
* @message I am message being logged. | |
*/ | |
public void function debug( required string message ) { | |
systemOutput( "[ Application ]: #message#", true, true ); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
component | |
output = false | |
hint = "I provide a means to run persistent background threads." | |
{ | |
/** | |
* I initialize the background-thread runner. | |
* | |
* @sleepDuration I am the duration, in milliseconds, that the Task thread will sleep in between checks to see if the runnables are running. | |
*/ | |
public void function init( numeric sleepDuration = 1000 ) { | |
// I determine how long the TASK THREAD will sleep before checking back with the | |
// background-runner to make sure that all runnables are running. | |
variables.sleepDuration = arguments.sleepDuration; | |
// I am the ID that will associate this ColdFusion component instance with the | |
// TASK thread via the Server scope. | |
taskThreadID = ( "background_task_thread_" & createUniqueId() ); | |
// Dealing with asynchronous processing requires locking in order to cut down on | |
// race-conditions. We'll use this as our base lock ID for exclusive named locks. | |
// We'll use two different locks: | |
// --- | |
// * "#lockID#.task" - This is the high-level lock used to synchronize all access | |
// points into the background task runner. | |
// * "#lockID#.entry" - This is the low-level lock used to synchronize all access | |
// points to the individual runnables. | |
// --- | |
lockID = ( "lock_" & createUniqueId() ); | |
// I keep track of the Runnable interfaces that this background thread runner is | |
// managing. Each Runnable will get its own CFThread wrapper. | |
entries = []; | |
} | |
// --- | |
// PUBLIC METHODS. | |
// --- | |
/** | |
* I ensure that each Runnable is actually Running. | |
* | |
* CAUTION: This is to be used by the TASK THREAD ONLY! This has to be public, but is | |
* not meant for public consumption. | |
*/ | |
public void function _ensureRunnablesAreRunning() { | |
lock | |
type = "exclusive" | |
name = "#lockID#.task" | |
timeout = 60 | |
throwOnTimeout = true | |
{ | |
debug( "Ensuring runnables are running." ); | |
for ( var entry in entries ) { | |
try { | |
ensureRunnableIsRunning( entry ); | |
} catch ( any error ) { | |
// If the ensuring operation throws an error, it would be because the | |
// underlying CFLock timed-out or because the daemon CFThread could | |
// not be spawned. | |
debug( "Error ensuring runnable with ID #entry.runnableID#: #serializeJson( error )#." ); | |
} | |
} | |
} // END: Task-lock. | |
} | |
/** | |
* I start running the given Runnable in the background using a persistent background | |
* thread. All Runnables are given their own daemon CFThread wrapper, all of which are | |
* monitored by a single TASK thread. | |
* | |
* @runnable I am the Runnable being managed in the background. | |
*/ | |
public void function runInBackground( required any runnable ) { | |
lock | |
type = "exclusive" | |
name = "#lockID#.task" | |
timeout = 60 | |
throwOnTimeout = true | |
{ | |
ensureTaskThread(); | |
// This runnable will be "activated" during the execution of the task thread. | |
entries.append({ | |
runnable: runnable, | |
runnableID: createUniqueId(), | |
runnableThread: nullValue() | |
}); | |
} // END: Task-lock. | |
} | |
/** | |
* I stop the background TASK thread and any Runnable instances that it is managing. | |
*/ | |
public void function stop() { | |
lock | |
type = "exclusive" | |
name = "#lockID#.task" | |
timeout = 60 | |
throwOnTimeout = true | |
{ | |
debug( "Stopping background thread runner." ); | |
// By deleting the server-key, it will tell the TASK thread to STOP CALLING | |
// back into this background runner. | |
server.delete( taskThreadID ); | |
for ( var entry in entries ) { | |
try { | |
ensureRunnableIsStopped( entry ); | |
} catch ( any error ) { | |
debug( "Error stopping runnable with ID #entry.runnableID#: #serializeJson( error )#." ); | |
} | |
} | |
} // END: Task-lock. | |
} | |
// --- | |
// PRIVATE METHODS. | |
// --- | |
/** | |
* I log the given message to the server error-output stream. | |
* | |
* @message I am message being logged. | |
*/ | |
private void function debug( required string message ) { | |
systemOutput( "[ BackgroundRunner ]: #message#", true, true ); | |
} | |
/** | |
* I ensure that the given entry is running. | |
* | |
* @entry I am the runnable entry being started / monitored. | |
*/ | |
private void function ensureRunnableIsRunning( required struct entry ) { | |
lock | |
type = "exclusive" | |
name = "#lockID#.entry" | |
timeout = 60 | |
throwOnTimeout = true | |
{ | |
// Each runnable entry will receive its own deamon CFThread wrapper. If this | |
// thread has not yet been spawned - or, has been terminated / completed - we | |
// need to spawn it and track it. | |
if ( | |
isNull( entry.runnableThread ) || | |
( entry.runnableThread.status == "TERMINATED" ) || | |
( entry.runnableThread.status == "COMPLETED" ) | |
) { | |
// If the deamon CFThread wrapper exists and ended in error, let's log | |
// the error. | |
if ( entry.runnableThread?.keyExists( "error" ) ?: false ) { | |
debug( "Runnable ended in error: #serializeJson( entry.runnableThread.error )#" ); | |
} | |
debug( "Starting runnable with ID #entry.runnableID#" ); | |
// NOTE: Since we may end-up creating the "same thread" over time, we | |
// have to make sure it has a server-unique name otherwise Lucee will | |
// throw an error about duplicate threads in the same request. | |
var threadName = ( taskThreadID & "_" & entry.runnableID & "_" & createUniqueId() ); | |
// Spawn the deamon CFThread for this Runnable - each runnable runs | |
// inside its own asynchronous thread context. | |
// -- | |
// CAUTION: In Lucee CFML, the "entry" attribute here is NOT DEEP-CLONED | |
// when it is passed into the CFThread context. This is a deviation from | |
// the way Adobe ColdFusion works, which will deep-clone the instance. | |
thread | |
type = "deamon" | |
name = threadName | |
entry = entry | |
{ | |
// There's no easy way to terminate a CFThread across different | |
// page requests by name (as such the threadTerminate() function | |
// won't work). To overcome this, we're going to expose a stop() | |
// method ON THE DAEMON THREAD that will turn around and stop the | |
// underlying Runnable. This should allow the daemon CFThread to | |
// exit naturally. | |
thread.stopRunnable = () => { | |
entry.runnable.stop(); | |
}; | |
// NOTE: We don't need a try/catch around this because we are already | |
// going to log an errors that get attached to terminated threads | |
// (see top of this IF-statement). | |
entry.runnable.run(); | |
} | |
// Store this CFThread instance back into the entry. This way, we can | |
// monitor the state of the runnable wrapper as part of the background | |
// runner execution. | |
entry.runnableThread = cfthread[ threadName ]; | |
} // END: If. | |
} // END: Entry-Lock. | |
} | |
/** | |
* I ensure that the given entry is stopped / stopping. | |
* | |
* @entry I am the runnable entry being stopped. | |
*/ | |
private void function ensureRunnableIsStopped( required struct entry ) { | |
lock | |
type = "exclusive" | |
name = "#lockID#.entry" | |
timeout = 60 | |
throwOnTimeout = true | |
{ | |
debug( "Stopping runnable with ID #entry.runnableID#" ); | |
// We can't actually terminate CFThreads across different requests (at least | |
// not that I could find). As such, we can only ask the CFThread to stop its | |
// own internal Runnable using the Function that we exposed on the Thread. | |
entry.runnableThread?.stopRunnable(); | |
} // END: Entry-Lock. | |
} | |
/** | |
* I ensure that the TASK THREAD is running in the background. | |
*/ | |
private void function ensureTaskThread() { | |
lock | |
type = "exclusive" | |
name = "#lockID#.task" | |
timeout = 60 | |
throwOnTimeout = true | |
{ | |
// Task Threads are very "sticky" - even if you delete them from the Lucee | |
// Admin, they will continue to run in the background. As such, let's make | |
// sure we don't currently have one already defined. | |
if ( server.keyExists( taskThreadID ) ) { | |
return; | |
} | |
// When we start the TASK THREAD, Lucee will serialize the data associated | |
// with the thread and then store it to disk (as far as I can tell). Lucee | |
// then deserializes the task data and runs the task in the background based | |
// on the retry intervals. As such, the TASK THREAD runs COMPLETELY OUTSIDE | |
// THE CURRENT APPLICATION CONTEXT of our main application. To wire these two | |
// contexts back together, we're going to use the SERVER SCOPE as the | |
// communication tunnel. And, in order to do that, we have to keep track of | |
// the APPLICATION CONTEXT so that we can re-attach the TASK THREAD to the | |
// current application during its execution. | |
// -- | |
// READ MORE: https://dev.lucee.org/t/using-native-java-threading-in-lucee/471/14 | |
server[ taskThreadID ] = { | |
runner: this, | |
applicationContext: getPageContext().getApplicationContext() | |
}; | |
} // END: Task-lock. | |
// The TASK THREAD is going to use an internal loop that MOSTLY keeps it going. | |
// However, if it crashes, we need to have Lucee try to restart it. That is what | |
// the retryInterval is for. Since we want this to be a "persistent" background | |
// thread, we want the retries to converge of "infinite". | |
var retryInterval = [{ | |
tries: createObject( "java", "java.lang.Integer" ).MAX_VALUE, | |
interval: createTimeSpan( 0, 0, 0, 5 ) | |
}]; | |
thread | |
type = "task" | |
name = "task.#taskThreadID#" | |
retryInterval = retryInterval | |
taskThreadID = taskThreadID | |
sleepDuration = sleepDuration | |
{ | |
// Since the TASK THREAD doesn't have access to the parent VARIABLE scope | |
// (remember this is running completely outside the application), we need to | |
// give the thread its own version of the debug() method. | |
function taskDebug( required string message ) { | |
systemOutput( "[ Task Thread ]: #message#", true, true ); | |
} | |
// CAUTION: Errors in a Task Thread seem to get swallowed-up by the ether if | |
// they are not explicitly caught and logged! | |
try { | |
var runner = server[ taskThreadID ].runner; | |
var applicationContext = server[ taskThreadID ].applicationContext; | |
// The TASK THREAD runs in a completely separate application context / | |
// page context. As such, in order to call the following "Runner" in an | |
// expected state, we have to re-attach the current TASK application | |
// context to the application context in which the Runner was defined. | |
getPageContext().setApplicationContext( applicationContext ); | |
// Since we want to implement persistent background threads, we just | |
// want to keep looping here forever. And, if this thread crashes, Lucee | |
// will restart it based on the retryInterval. | |
while ( true ) { | |
// If the server key for this background processor has been deleted, | |
// it means the background processing is being shut-down. As such, | |
// let's exit out of this Task thread. | |
if ( ! server.keyExists( taskThreadID ) ) { | |
return; | |
} | |
try { | |
runner._ensureRunnablesAreRunning(); | |
} catch ( any runnerError ) { | |
taskDebug( "Error while ensuring runnables: #serializeJson( runnerError )#" ); | |
} | |
sleep( sleepDuration ); | |
} | |
} catch ( any taskError ) { | |
taskDebug( "Error while wiring-up task thread internals: #serializeJson( taskError )#" ); | |
} | |
} // END: CFThread. | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
component | |
output = false | |
hint = "I am a persistent background consumer that processes messages from a specific Redis list." | |
{ | |
/** | |
* I initialize the background Consumer using the given Redis pool. This Consumer | |
* exposes a run() method which is intended to run indefinitely in the background of | |
* the application. | |
* | |
* @redisPool I am the Redis connection pool. | |
*/ | |
public void function init( required any redisPool ) { | |
variables.redisPool = arguments.redisPool; | |
// I determine if the Consumer should continue processing the Redis list. This | |
// flag allows us to halt the background processing after it has begun. | |
isRunning = false; | |
// This is the Redis KEY that we will be monitoring and consuming. | |
redisListKey = "ben.demo"; | |
} | |
// --- | |
// PUBLIC METHODS. | |
// --- | |
/** | |
* I kick-off the Consumer processing. This method is intended to run indefinitely | |
* (or, at least until the stop() method is called). | |
*/ | |
public void function run() { | |
isRunning = true; | |
// The point of the Consumer is to run indefinitely, looking for and processing | |
// values in the given Redis list. As such, we're going to keep looping until the | |
// running-flag is turned-off. | |
while ( isRunning ) { | |
debug( "Looking for messages in Redis (blocking operation)." ); | |
// The blpop (Blocking List Pop) command will block the current thread while | |
// waiting for a list-item to become available. If it reaches the timeout | |
// (which is 5-seconds in this case) without finding a list, a NULL value | |
// will be returned. | |
var result = redisPool.withRedis( | |
( redis ) => { | |
return( redis.blpop( 5, [ redisListKey ] ) ); | |
} | |
); | |
// No list item was found within the timeout. | |
if ( isNull( result ) ) { | |
debug( "No messages found after block-timeout." ); | |
// A list item was found! Woot! | |
} else { | |
var listName = result[ 1 ]; | |
var listItemValue = result[ 2 ]; | |
debug( "Message found:" ); | |
debug( "" ); | |
debug( " #listItemValue#" ); | |
debug( "" ); | |
// FOR THE DEMO, to make things a little more interesting, we're going | |
// to crash the Consumer thread if the list value was "crash". This will | |
// give the background-task an opportunity to restart it and make things | |
// a bit more fun :D | |
if ( listItemValue == "crash" ) { | |
throw( type = "OopsCrash", message = "UnexpectedValue" ); | |
} | |
} | |
} // While-loop. | |
debug( "Messaging processing stopped." ); | |
} | |
/** | |
* I stop the background processing of the Consumer. | |
*/ | |
public void function stop() { | |
isRunning = false; | |
debug( "Stopping processing of the Redis list." ); | |
} | |
// --- | |
// PRIVATE METHODS. | |
// --- | |
/** | |
* I log the given message to the server error-output stream. | |
* | |
* @message I am message being logged. | |
*/ | |
private void function debug( required string message ) { | |
systemOutput( "[ Consumer ]: #message#", true, true ); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<cfscript> | |
param name="form.message" type="string" default=""; | |
// If there is a message, let's push it into the Redis list. | |
if ( form.message.len() ) { | |
application.redisPool.withRedis( | |
( redis ) => { | |
redis.rpush( "ben.demo", [ form.message ] ); | |
} | |
); | |
} | |
</cfscript> | |
<cfoutput> | |
<!doctype html> | |
<html lang="en"> | |
<head> | |
<meta charset="utf-8"> | |
<link rel="stylesheet" type="text/css" href="./producer.css"> | |
</head> | |
<body> | |
<h1> | |
Produce a Message | |
</h1> | |
<form | |
method="post" | |
action="#encodeForHtmlAttribute( cgi.script_name )#"> | |
<strong>Message</strong>: | |
<input | |
type="text" | |
name="message" | |
size="30" | |
autofocus | |
autocomplete="off" | |
/> | |
<button type="submit"> | |
Send Message | |
</button> | |
</form> | |
<p> | |
<a href="./stop.cfm">Stop application</a> | |
</p> | |
</body> | |
</html> | |
</cfoutput> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
component | |
output = false | |
hint = "I provide a simple, callback-based wrapper for the Jedis connection pool." | |
{ | |
/** | |
* I initialize the Jedis connection pool to connect to the given Redis host. | |
* | |
* @redisHost I am the Redis host name. | |
* @redisPort I am the Redis port being monitored (probably 6379). | |
*/ | |
public void function init( | |
required string redisHost, | |
required numeric redisPort | |
) { | |
// In Lucee CFML, we can provide a set of custom JAR files to the createObject() | |
// function that will create an isolated Java Class Loader. This is so awesome! | |
jedisJarPaths = [ | |
expandPath( "/vendor/apache/commons-pool2-2.2.jar" ), | |
expandPath( "/vendor/jedis/lib/jedis-2.6.0.jar" ) | |
]; | |
jedisPoolConfig = javaNew( "redis.clients.jedis.JedisPoolConfig" ) | |
.init() | |
; | |
jedisPool = javaNew( "redis.clients.jedis.JedisPool" ) | |
.init( jedisPoolConfig, redisHost, redisPort ) | |
; | |
} | |
// --- | |
// PUBLIC METHODS. | |
// --- | |
/** | |
* I get a Jedis connection (to the Redis database) and pass it the given callback. | |
* Any data returned from the callback execution is propagated back up to the calling | |
* context. The connection to Redis is managed transparently to the callback. | |
* | |
* @callback I am the callback function to which the Redis connection is passed. | |
*/ | |
public any function withRedis( required function callback ) { | |
var redis = jedisPool.getResource(); | |
try { | |
return( callback( redis ) ); | |
} finally { | |
redis?.close(); | |
} | |
} | |
// --- | |
// PRIVATE METHODS. | |
// --- | |
/** | |
* I create the Java Class proxy with the given name (using the custom JAR files as | |
* the source code). | |
* | |
* @className I am the Java Class proxy to create. | |
*/ | |
private any function javaNew( required string className ) { | |
return( createObject( "java", className, jedisJarPaths ) ); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[ Application ]: Bootstrapping Lucee CFML application. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Starting runnable with ID 2f | |
[ Consumer ]: Looking for messages in Redis (blocking operation). | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ Consumer ]: Message found: | |
[ Consumer ]: | |
[ Consumer ]: hello | |
[ Consumer ]: | |
[ Consumer ]: Looking for messages in Redis (blocking operation). | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ Consumer ]: Message found: | |
[ Consumer ]: | |
[ Consumer ]: world | |
[ Consumer ]: | |
[ Consumer ]: Looking for messages in Redis (blocking operation). | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ Consumer ]: Message found: | |
[ Consumer ]: | |
[ Consumer ]: how | |
[ Consumer ]: | |
[ Consumer ]: Looking for messages in Redis (blocking operation). | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ Consumer ]: Message found: | |
[ Consumer ]: | |
[ Consumer ]: is | |
[ Consumer ]: | |
[ Consumer ]: Looking for messages in Redis (blocking operation). | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ Consumer ]: Message found: | |
[ Consumer ]: | |
[ Consumer ]: it | |
[ Consumer ]: | |
[ Consumer ]: Looking for messages in Redis (blocking operation). | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ Consumer ]: Message found: | |
[ Consumer ]: | |
[ Consumer ]: going? | |
[ Consumer ]: | |
[ Consumer ]: Looking for messages in Redis (blocking operation). | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ Consumer ]: Message found: | |
[ Consumer ]: | |
[ Consumer ]: crash | |
[ Consumer ]: | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Runnable ended in error: | |
{"Message":"UnexpectedValue" ..... truncated for blog .....} | |
[ BackgroundRunner ]: Starting runnable with ID 2f | |
[ Consumer ]: Looking for messages in Redis (blocking operation). | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ BackgroundRunner ]: Ensuring runnables are running. | |
[ Consumer ]: Message found: | |
[ Consumer ]: | |
[ Consumer ]: woot | |
[ Consumer ]: | |
[ Consumer ]: Looking for messages in Redis (blocking operation). |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment