Skip to content

Instantly share code, notes, and snippets.

@tshevchuk
Last active April 4, 2024 13:59
Show Gist options
  • Star 11 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tshevchuk/c6568601912843c50f97d965d4ce09e2 to your computer and use it in GitHub Desktop.
Save tshevchuk/c6568601912843c50f97d965d4ce09e2 to your computer and use it in GitHub Desktop.
Utility class to enqueue Queueable Jobs.

Have you ever met the Too many queueable jobs added to the queue: 2 exception? In such case this little framework can be interesting for you.

Utility class to enqueue Queueable Jobs. It is intended to be used as a replacement for System.enqueJob() method. Advantages in comparison with the system method:

  • Allows to enqueue Queueable job even if you have run out of governon limit on number of Queueable jobs. It can be helpful in case you need to enqueue multiple Queueable jobs from Apex Batch Job, future method, or Queueable Job
  • Better error handling. Error message contains full stack trace, there is possibility to retry eecution of failed Queueable Jobs.

The Queueable Job should support serialization to JSON string. JSON.serializePretty() and JSON.deserialize() are used to serialize the job to Custom Field Params__c on Async_Request__c

The Queueable Jobs should be derived from BaseQueueable abstract class. In case you need to enqueue some existing job, you can create QueueableWrapper which is derived from BaseQueueable. It should take the Queueable job as a constructor parameter.

How to use it?

Create Apex class which is derived from the BaseQueueable

public with sharing class MyQueueable extends BaseQueueable {
    public override void doExecute(QueueableContext context) {
        System.debug('doExecute()');
    }
}

Enqueue the Queueable class:

List<BaseQueueable> queueables = new List<BaseQueueable> {
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(), new MyQueueable(),
    new MyQueueable()
};
QueueableManager.enqueue(queueables);

It is recommended to derive all Queueable Jobs in your project from BaseQueueable class and use QueueableManager to start the new Queueable jobs. In such case there will higher chances that some specific Queueable Job will be picked up by the framework even if limits for Queueable Job are exceeded in current transaction.

To restart failed jobs, you can change status on corresponding Async Request records from Error to null and execute QueueableManager.enqueueNextJob(); method from Anonymous Apex. It is important that either CreatedById or LastModifiedById on the records is the same as Id of the user which executes the script above.

Possible Ways of Improvements

  • The framework serializes BaseQueueable to JSON format to be able to restore it later and execute corresponding Queueable Job. The length of the serialized JSON is limited by Async_Request__c.Params__c field which is 131KB. Possible workaround is to change the implementation so the large serialized JSON is written to file instead of Large Text Area field.
  • Implement alternative approaches to enqueue Async Tasks even if limits for Queueable Jobs exceeded. Possible option is to create static Boolean variable hasEnqueuedAnyJobInCurrentTransaction inside QueueableManager to indicate if QueueableManager was used previously to enqueue any job. In case limits for Queueable Jobs exceeded and the framework was not called previously, than either future method or Schedulable can be used as a backup mechanism to enqueue the jobs under current user. Alternative approach is using Transaction Finalizers (Beta) for it.
  • Implement Schedulable job to monitor status of Async Requests and send corresponding email
  • Use Transaction Finalizers (Beta) for error handling, enqueuening the next Queueable Job from Async Request, removing Async Request record for successfully completed Queueable Job.

References

Implementation ideas were taken from:

Custom Object with the following fields:
* Apex_Class_Name__c - Text(255)
* Params__c - Long Text Area(131072)
* Status__c - Picklist (New, Enqueued, Error)
* Async_Apex_Job_Id__c - Text(18) (Unique Case Insensitive)
* Error_Message__c - Long Text Area(32768)
/**
* @description Abstract class to be used for Queueable Jobs implementation. It removes completed Async_Request__c
* records, populates detailed error message for failed jobs, and enqueues next pending Queueable jobs
*/
public abstract with sharing class BaseQueueable implements Queueable {
public void execute(QueueableContext context) {
Savepoint savepoint = Database.setSavepoint();
try {
doExecute(context);
delete [SELECT Id FROM Async_Request__c WHERE Async_Apex_Job_Id__c = :context.getJobId()];
} catch (Exception ex) {
Database.rollback(savepoint);
Async_Request__c asyncRequest =
[SELECT Id FROM Async_Request__c WHERE Async_Apex_Job_Id__c = :context.getJobId()];
asyncRequest.Status__c = 'Error';
asyncRequest.Error_Message__c =
ex.getTypeName() + ': ' + ex.getMessage() + '; cause: ' + ex.getCause() + '; line number: '
+ ex.getLineNumber() + '; stack trace: ' + ex.getStackTraceString();
update asyncRequest;
}
QueueableManager.enqueueNextJob();
}
/**
* @description This method should be overridden and implemented instead of execute() method of Queueable job.
*/
public abstract void doExecute(QueueableContext context);
}
/**
* @description Utility class to enqueue Queueable Jobs. It is intended to be used as a replacement for
* System.enqueJob() method. Advantages in comparison with the system method:
* * Allows to enqueue Queueable job even if you have ran out of governon limit on number of Queueable jobs. It can
* be helpful in case you need to enqueue multiple Queueable jobs from Apex Batch Job, future method, or Queueable Job
* * Better error handling. Error message contains full stack trace
*
* The Queueable Job should support serialization to JSON string. JSON.serializePretty() and JSON.deserialize() are used
* to serialize the job to Custom Field Params__c on Async_Request__c
*
* The Queueable Jobs should be derived from BaseQueueable abstract class. In case you need to enqueue some existing job,
* you can create QueueableWrapper which is derived from BaseQueueable. It should take the Queueable job as a
* constructor parameter.
*/
public with sharing class QueueableManager {
public static void enqueue(BaseQueueable queueable) {
enqueue(new List<BaseQueueable>{queueable});
}
public static void enqueue(List<BaseQueueable> queueables) {
List<Async_Request__c> asyncRequestsToInsert = new List<Async_Request__c>();
Integer availableLimit = Limits.getLimitQueueableJobs() - Limits.getQueueableJobs();
for (BaseQueueable queueable : queueables) {
Async_Request__c asyncRequest = new Async_Request__c(
Apex_Class_Name__c = getObjectType(queueable),
Params__c = JSON.serializePretty(queueable)
);
if (availableLimit > 0) {
asyncRequest.Async_Apex_Job_Id__c = System.enqueueJob(queueable);
asyncRequest.Status__c = 'Enqueued';
availableLimit--;
} else {
asyncRequest.Status__c = 'New';
}
asyncRequestsToInsert.add(asyncRequest);
}
insert asyncRequestsToInsert;
}
/**
* @description Enqueues the next Queueable Job which is serialized to Async_Request__c object. It is
* intended to be called from execute() method when logic of current Queueable Job is completed.
*/
public static void enqueueNextJob() {
if (
Limits.getLimitQueueableJobs() == Limits.getQueueableJobs()
|| Limits.getLimitQueries() == Limits.getQueries()
|| Limits.getLimitQueryRows() == Limits.getQueryRows()
|| Limits.getLimitDmlRows() <= Limits.getDmlRows() + 3
|| Limits.getLimitDmlStatements() <= Limits.getDmlStatements() + 3
|| Limits.getLimitCpuTime() <= Limits.getCpuTime() + 500
) {
return;
}
Savepoint savepoint;
try {
Id currentUserId = UserInfo.getUserId();
List<Async_Request__c> asyncRequests =
[
SELECT Apex_Class_Name__c, Params__c
FROM Async_Request__c
WHERE Status__c = 'New' AND (CreatedById = :currentUserId OR LastModifiedById = :currentUserId)
LIMIT 1
FOR UPDATE
]
;
if (asyncRequests.isEmpty()) {
return;
}
Async_Request__c asyncRequest = asyncRequests[0];
BaseQueueable queueable;
try {
queueable =
(BaseQueueable) JSON.deserialize(
asyncRequest.Params__c, Type.forName(asyncRequest.Apex_Class_Name__c)
)
;
} catch (Exception ex) {
asyncRequest.Status__c = 'Error';
asyncRequest.Error_Message__c =
ex.getTypeName() + ': ' + ex.getMessage() + '; cause: ' + ex.getCause() + '; line number: '
+ ex.getLineNumber() + '; stack trace: ' + ex.getStackTraceString()
;
update asyncRequest;
enqueueNextJob();
return;
}
savepoint = Database.setSavepoint();
Id jobId = System.enqueueJob(queueable);
asyncRequest.Status__c = 'Enqueued';
asyncRequest.Async_Apex_Job_Id__c = jobId;
update asyncRequest;
} catch(Exception ex) {
if (savepoint != null) {
Database.rollback(savepoint);
}
System.debug(
ex.getTypeName() + ': ' + ex.getMessage() + '; cause: ' + ex.getCause() + '; line number: '
+ ex.getLineNumber() + '; stack trace string: ' + ex.getStackTraceString()
);
}
}
/**
* @description Workaround to get object type as a string dynamically
* Source: https://salesforce.stackexchange.com/a/219558/47929
*/
private static String getObjectType(Object obj) {
String result = 'DateTime';
try {
DateTime typeCheck = (DateTime) obj;
}
catch(System.TypeException typeException) {
String message = typeException.getMessage().substringAfter('Invalid conversion from runtime type ');
result = message.substringBefore(' to Datetime');
}
return result;
}
}
@MichaelPaisner
Copy link

MichaelPaisner commented Jan 24, 2023

Hi,
Love the Async_Request__c approach. I've implemented it, and it works really well most of the time. One particular use case did cause an issue. The "FOR UPDATE" flag didn't prevent two threads from grabbing the same record which resulted in two processes performing the same operation.

To address this, we added a thread attribute in order to prevent this from happening. Here are the additions and changes:

Async_Request__c Object:
Added a new number field: Thread__c

BaseQueueable class:
NEW: public integer thread;
CHANGE:
WAS: QueueableManager.enqueueNextJob();
IS: QueueableManager.enqueueNextJob(this.thread);

QueueableManager Class:
CHANGED for loop within enqueue method:

    integer threadCount = 0;
    integer threadLimit = availableLimit;
    for (BaseQueueable queueable : queueables) {
        queueable.thread = threadCount;
        String paramsJSON = JSON.serialize(queueable);
        Async_Request__c asyncRequest = new Async_Request__c(
            Apex_Class_Name__c = getObjectType(queueable),
            Params__c = paramsJSON.length() > 131072 ? paramsJSON.substring(0,131071) : paramsJSON,
            Thread__c = threadCount
        );
        if (availableLimit > 0) {
            asyncRequest.Async_Apex_Job_Id__c = System.enqueueJob(queueable);
            asyncRequest.Status__c = 'Enqueued';
            availableLimit--;
        }
        asyncRequestsToInsert.add(asyncRequest);
        threadCount++;
        if (threadCount >= threadLimit) threadCount = 0;

    }
    insert asyncRequestsToInsert;

CHANGED enqueueNextJob Method:

public static void enqueueNextJob(Integer currentThread) {

            List<Async_Request__c> asyncRequests =
                [
                    SELECT Apex_Class_Name__c, Params__c, Thread__c
                    FROM Async_Request__c
                    WHERE Status__c = 'New' AND (CreatedById = :currentUserId OR LastModifiedById = :currentUserId) AND
                        Thread__c =: currentThread
                    LIMIT 1
                    FOR UPDATE
                ];
            Async_Request__c asyncRequest = asyncRequests[0];
            BaseQueueable queueable;
            try {
                queueable =
                    (BaseQueueable) JSON.deserialize(
                        asyncRequest.Params__c, Type.forName(asyncRequest.Apex_Class_Name__c)
                    )
                ;
                queueable.thread = currentThread;
            } catch (Exception ex) {
                asyncRequest.Status__c = 'Error';
                asyncRequest.Error_Message__c =
                    ex.getTypeName() + ': ' + ex.getMessage() + '; cause: ' + ex.getCause() + '; line number: '
                        + ex.getLineNumber() + '; stack trace: ' + ex.getStackTraceString()
                ;
                update asyncRequest;
                
                enqueueNextJob(currentThread);
                return;
            }

I thought the "FOR UPDATE" operation would prevent this type of issue, but it doesn't seem to be working for us.

With the above code changes, now as each queueable job finishes, it only grabs the next job with the same thread number. This prevents multiple processes from grabbing the same record.

Hope this helps!
Mike

@trineshsfdc
Copy link

@MichaelPaisner @tshevchuk Can we have a sample to see how it can be called from a Trigger.? Its urgent pl.

@trineshsfdc
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment