Skip to content

Instantly share code, notes, and snippets.

@enreeco
Forked from scottbcovert/!Queueable Apex
Created February 29, 2016 08:57
Show Gist options
  • Save enreeco/dcce839c57132e1b8483 to your computer and use it in GitHub Desktop.
Save enreeco/dcce839c57132e1b8483 to your computer and use it in GitHub Desktop.
Gist of Centralized Async Handling via Queueable Apex; for accompanying presentation see http://scottbcovert.github.io/queueable-apex NOTE: The following source alone will not compile as it is one piece of a larger Force.com development framework available at https://github.com/scottbcovert/Centralized-Salesforce-Dev-Framework
Gist of Centralized Async Handling via Queueable Apex
For accompanying presentation see http://scottbcovert.github.io/queueable-apex
NOTE: The following source alone will not compile as it is one piece of a larger Force.com development framework available at https://github.com/scottbcovert/Centralized-Salesforce-Dev-Framework
/**
* @author Scott Covert
* @date 2/21/2016
* @description Defines the Handler for making Callouts from Apex. Request and response payloads are
*              stored as Attachments on the CalloutRequest__c record, and the status, sent time and
*              duration of every attempt are written back to that record.
*/
public class CalloutHandler {
    /** Constant to represent the default callout time limit (in milliseconds) */
    private static final Integer DEFAULT_CALLOUT_TIME = 10000;
    /** Constant to represent the smallest timeout (in milliseconds) that is passed to HttpRequest.setTimeout */
    private static final Integer MIN_CALLOUT_TIME = 1;
    /** Constant representing the content type for JSON */
    private static final String JSON_CONTENT_TYPE = 'application/json';
    /** Constant representing the OK status for callout requests */
    private static final String OK_STATUS = 'OK';
    /** Constant representing the KO status for callout requests */
    private static final String KO_STATUS = 'KO';
    /** Constant representing the failed status for callout requests */
    private static final String FAILED_STATUS = 'Failed';
    /** Constant representing the queued status for callout requests */
    public static final String QUEUED_STATUS = 'Queued';
    /** Constant to represent the maximum callout time limit (in milliseconds) */
    public static final Integer MAX_CALLOUT_TIME = 120000;
    /** Constant representing the attachment name used to store callout requests */
    public static final String REQUEST_NAME = 'request';
    /** Constant representing the attachment name used to store callout responses */
    public static final String RESPONSE_NAME = 'response';
    /** Constant representing the GET HTTP Method */
    public static final String GET_METHOD = 'GET';
    /** Constant representing the POST HTTP Method */
    public static final String POST_METHOD = 'POST';
    /** Constant representing the PUT HTTP Method */
    public static final String PUT_METHOD = 'PUT';
    /** Constant representing the PATCH HTTP Method */
    public static final String PATCH_METHOD = 'PATCH';
    /** Constant representing the DELETE HTTP Method */
    public static final String DELETE_METHOD = 'DELETE';
    /** Constant representing the endpoint for callouts to use; over time additional endpoints can be stored in other member variables for different callout request types */
    public static final String ENDPOINT = '';

    /**
    * @author Scott Covert
    * @date 2/21/2016
    * @description Executes Apex Callout Asynchronously; does nothing when the kill switch is enabled
    * @param Id Id of the callout request to be made
    */
    @future(callout=true)
    public static void sendAsyncCallout(Id crId)
    {
        // Verify kill switch is not enabled
        if (ConfigService.KillSwitchEnabled) return;
        // Query for Callout Request; the single-row assignment fails if no record matches crId
        CalloutRequest__c cr = [SELECT Id, AsyncRequest__c, Endpoint__c, Method__c, Timeout__c, Status__c, Sent__c FROM CalloutRequest__c WHERE Id = :crId LIMIT 1];
        // Send Callout
        sendCallout(cr);
    }

    /**
    * @author Scott Covert
    * @date 2/23/2016
    * @description Executes Apex Callout Synchronously. Any exception raised while building or sending
    *              the request is recorded as a failure on the callout request instead of being rethrown;
    *              the callout request record is updated in either case.
    * @param CalloutRequest__c Callout request to be made
    */
    public static void sendCallout(CalloutRequest__c cr)
    {
        // Capture the start time so the duration can be recorded once the attempt has finished
        Long start = System.now().getTime();
        try {
            HttpRequest request = new HttpRequest();
            request.setMethod(cr.Method__c);
            request.setEndpoint(cr.Endpoint__c);
            request.setTimeout(calculateTimeout(cr));
            // The most recently created 'request' attachment, if there is one, holds the payload
            List<Attachment> requestBody = [SELECT Id, CreatedDate, ParentId, Name, ContentType, Body FROM Attachment WHERE Name = :REQUEST_NAME AND ParentId = :cr.Id ORDER BY CreatedDate DESC LIMIT 1];
            if (!requestBody.isEmpty()) request.setBody(requestBody[0].Body.toString());
            HttpResponse response = new Http().send(request);
            processCalloutResponse(response, cr);
        }
        catch (Exception ex){
            processFailure(cr, ex);
        }
        // DML on the callout request happens only after the callout has been attempted
        updateCalloutRequest(cr, start);
    }

    /**
    * @author Scott Covert
    * @date 2/21/2016
    * @description Creates Request Body Attachments for Callout Requests
    * @param String Serialized JSON string to be used as HTTP request payload
    * @param Id Corresponding CalloutRequest Id to attach request body to
    */
    public static void createRequestBody(String jsonPayload, Id crId)
    {
        Attachment requestBodyAttachment = new Attachment(ParentId=crId,Name=REQUEST_NAME,Body=Blob.valueOf(jsonPayload),ContentType=JSON_CONTENT_TYPE);
        insert requestBodyAttachment;
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Processes timeout to be used for HTTP callout and updates callout request record as needed.
    *              A missing or non-positive Timeout__c is replaced by the default; a value above the maximum
    *              is replaced by the maximum. Timeout__c is expressed in seconds.
    * @param CalloutRequest__c The callout request to be sent
    * @return Integer Timeout (in milliseconds) to be used for HTTP callout
    */
    private static Integer calculateTimeout(CalloutRequest__c cr)
    {
        if (cr.Timeout__c == null || cr.Timeout__c <= 0)
        {
            cr.Timeout__c = DEFAULT_CALLOUT_TIME / 1000;
            return DEFAULT_CALLOUT_TIME;
        }
        if ((cr.Timeout__c * 1000) > MAX_CALLOUT_TIME)
        {
            cr.Timeout__c = MAX_CALLOUT_TIME / 1000;
            return MAX_CALLOUT_TIME;
        }
        // A positive timeout below one millisecond would convert to 0, which setTimeout rejects
        return Math.max(MIN_CALLOUT_TIME, Integer.valueOf(cr.Timeout__c * 1000));
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Processes HTTP callout responses and updates callout request fields accordingly;
    *              only a 200 status code is recorded as OK, every other code as KO
    * @param HttpResponse Response of an HTTP callout
    * @param CalloutRequest__c Callout request that was made
    */
    private static void processCalloutResponse(HttpResponse response, CalloutRequest__c cr)
    {
        cr.Status__c = (response.getStatusCode() == 200) ? OK_STATUS : KO_STATUS;
        // Create response body attachment
        createResponseBody(response.getBody(), cr.Id);
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Processes failure encountered in attempting to perform HTTP callout; the current
    *              diagnostic log, when available, is stored as the response attachment
    * @param CalloutRequest__c Callout request that should have been made
    * @param Exception System Exception encountered while attempting to perform HTTP callout
    */
    private static void processFailure(CalloutRequest__c cr, Exception ex)
    {
        cr.Status__c = FAILED_STATUS;
        DiagnosticsInstrumentation.DebugException(ex);
        if (DiagnosticsInstrumentation.CurrentLog() != null) createResponseBody('Diagnostic Log\n' + DiagnosticsInstrumentation.CurrentLog(), cr.Id);
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Calculates callout duration (in seconds) and updates callout request
    * @param CalloutRequest__c Callout request that was made/attempted
    * @param Long Start time of callout request (milliseconds since the epoch)
    */
    private static void updateCalloutRequest(CalloutRequest__c cr, Long start)
    {
        Datetime finished = System.now();
        cr.Sent__c = finished;
        // Divide by a decimal literal so that fractions of a second are kept instead of truncated
        cr.Duration__c = (finished.getTime() - start) / 1000.0;
        update cr;
    }

    /**
    * @author Scott Covert
    * @date 2/23/2016
    * @description Creates Response Body Attachments for Callout Requests; nothing is stored when the
    *              body is null or empty
    * @param String HTTP response body
    * @param Id Corresponding CalloutRequest Id to attach response body to
    */
    private static void createResponseBody(String httpResponse, Id crId)
    {
        // An attachment cannot be saved without a body; skipping it keeps a body-less
        // response from being turned into a failure by the resulting exception
        if (String.isEmpty(httpResponse)) return;
        Attachment responseBodyAttachment = new Attachment(ParentId=crId,Name=RESPONSE_NAME,Body=Blob.valueOf(httpResponse),ContentType=JSON_CONTENT_TYPE);
        insert responseBodyAttachment;
    }
}
/**
* @author Scott Covert
* @date 2/24/2016
* @description Sample entry point for the callout framework; run from the Dev Console with 'CalloutHelper.queueCallout();'
*/
public with sharing class CalloutHelper {
    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Builds a callout request aimed at CalloutHandler.ENDPOINT and hands it to
    *              QueueableHandler so that a queued job performs the callout
    */
    public static void queueCallout(){
        // No HTTP method is set here; per the original author's note it defaults to GET
        CalloutRequest__c calloutRequest = new CalloutRequest__c();
        calloutRequest.Endpoint__c = CalloutHandler.ENDPOINT;
        // The job type tells the queue handler that this request is a callout
        String jobType = QueueableHandler.CALLOUT_TOKEN;
        QueueableHandler.startCalloutJob(jobType, calloutRequest);
        // Alternative: use the job type below instead to have the async request perform the callout itself asynchronously once processed
        // QueueableHandler.startCalloutJob(QueueableHandler.ASYNC_TOKEN + QueueableHandler.CALLOUT_TOKEN, cr);
    }
}
/**
* @author Scott Covert
* @date 2/21/2016
* @description Queueable entry point that looks up the QueueableHandler class by name at run time
*              and delegates execution to it; callouts are allowed from this job
*/
public class QueueableDispatcher Implements Queueable, Database.AllowsCallouts {

    /**
    * @author Scott Covert
    * @date 2/21/2016
    * @description Contract that a class must implement for the dispatcher to delegate to it
    */
    public Interface IQueueableDispatched
    {
        void execute(QueueableContext sc);
    }

    /**
    * @author Scott Covert
    * @date 2/21/2016
    * @description Instantiates QueueableHandler by name and runs it; does nothing when the type cannot be resolved
    * @param QueueableContext Current queueable context
    */
    public void execute(QueueableContext sc)
    {
        Type handlerType = Type.forName('QueueableHandler');
        // Nothing to delegate to when the handler class cannot be found
        if (handlerType == null) {
            return;
        }
        IQueueableDispatched handler = (IQueueableDispatched) handlerType.newInstance();
        handler.execute(sc);
    }
}
/**
* @author Scott Covert
* @date 2/21/2016
* @description Defines the Handler for Queueable Apex. Each execution processes at most one
*              AsyncRequest__c record and then chains a new job to continue with the rest of the queue.
*/
public class QueueableHandler implements QueueableDispatcher.IQueueableDispatched {
    /** Stores whether or not the current transaction has already queued an async job for future processing */
    public static Boolean asyncJobQueued = false;
    /** Stores the id of the async job queued by this transaction */
    private static Id queuedJobId;
    /** Constant representing the token that signifies the queueable apex is to perform a callout */
    public static final String CALLOUT_TOKEN = 'Callout:';
    /** Constant representing the token that signifies the queueable apex job itself should be processed asynchronously */
    public static final String ASYNC_TOKEN = 'Async';

    /**
    * @author Scott Covert
    * @date 2/21/2016
    * @description Execute Queueable Apex: flushes the queue when the kill switch is enabled, otherwise
    *              processes the earliest scheduled async request and chains the next job
    * @param QueueableContext Current queueable context
    */
    public void execute(QueueableContext sc)
    {
        // Verify kill switch is not enabled
        if (ConfigService.KillSwitchEnabled)
        {
            flushAsyncQueue();
            return;
        }
        // Async request to be processed; stays null if a failure occurs before one has been locked
        AsyncRequest__c ar;
        // Processed async requests ready for deletion
        Map<Id,AsyncRequest__c> arsToDelete = new Map<Id,AsyncRequest__c>();
        try{
            // Query for the earliest scheduled async request that still needs to be processed
            List<AsyncRequest__c> asyncRequests = [SELECT Id, AsyncType__c, JobId__c, ScheduledTime__c, Params__c, Attempts__c, MaxAttempts__c, Aborted__c FROM AsyncRequest__c WHERE IsDeleted = false AND Aborted__c = false ORDER BY ScheduledTime__c ASC LIMIT 1];
            // End chaining if all asynchronous requests have been or are being processed
            if (asyncRequests.isEmpty()) return;
            // Unfortunately the ORDER BY clause is not allowed while pessimistically locking rows in SOQL, so the query must be repeated to prevent concurrency issues
            // MaxAttempts__c must be selected here because it is read when the attempt is recorded
            asyncRequests = [SELECT Id, AsyncType__c, JobId__c, ScheduledTime__c, Params__c, Attempts__c, MaxAttempts__c, Aborted__c FROM AsyncRequest__c WHERE Aborted__c = false AND Id = :asyncRequests[0].Id LIMIT 1 FOR UPDATE];
            // Since the first SOQL query ordered by scheduled time was not row locking we must verify again that asyncRequests is not empty
            if (asyncRequests.isEmpty()) return;
            // Set async request to be processed
            ar = asyncRequests[0];
            // Process async request
            processAsyncRequest(ar);
            // Mark async request for deletion
            arsToDelete.put(ar.Id,ar);
        }
        catch(Exception ex){
            // Long-running processes could mean locking errors are hit in lieu of concurrency errors; additionally a runtime error could occur while processing the async request
            // In this case the async request record will not be marked for deletion; leaving the job at the beginning of the queue to be processed again later
            DiagnosticsInstrumentation.DebugException(ex);
        }
        // Update/Delete Async Request as needed
        updateOrDeleteAsyncRequest(ar, arsToDelete);
        // Chain job to process additional async requests
        chainJob();
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Enqueues Queueable Apex Job (at most once per transaction) and inserts an async request for it
    * @param String Type of asynchronous job to be queued
    * @return AsyncRequest__c The inserted async request
    */
    public static AsyncRequest__c startJob(String asyncJobType)
    {
        // Only enqueue a new job as needed
        if (!asyncJobQueued)
        {
            asyncJobQueued = true;
            queuedJobId = System.enqueueJob(new QueueableDispatcher());
        }
        // Add the async request to the queue
        AsyncRequest__c ar = new AsyncRequest__c(JobId__c = queuedJobId, AsyncType__c = asyncJobType);
        insert ar;
        return ar;
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Enqueues Queueable Apex Job and inserts the callout request linked to its async request
    * @param String Type of asynchronous job to be queued
    * @param CalloutRequest__c Callout request to be performed by the asynchronous job; should have HTTP request info populated but should not yet be inserted to the database
    */
    public static void startCalloutJob(String asyncJobType, CalloutRequest__c cr)
    {
        // Add the async request to the queue
        AsyncRequest__c ar = startJob(asyncJobType);
        // Insert a related callout request to store HTTP request/response info
        cr.AsyncRequest__c = ar.Id;
        insert cr;
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Flushes Async Queue by marking every pending async request as aborted
    */
    private void flushAsyncQueue()
    {
        try
        {
            // Pessimistic locking SOQL query prevents other async jobs from processing async requests
            Map<Id,AsyncRequest__c> asyncRequestsToFlush = new Map<Id,AsyncRequest__c>([SELECT Id, Aborted__c FROM AsyncRequest__c WHERE IsDeleted = false AND Aborted__c = false FOR UPDATE]);
            if (!asyncRequestsToFlush.isEmpty())
            {
                for (AsyncRequest__c ar : asyncRequestsToFlush.values()) ar.Aborted__c = true;
                Database.update(asyncRequestsToFlush.values());
            }
        }
        catch(Exception ex)
        {
            // Locking error could occur if some async requests are currently being processed
            // In this case, the other async job will chain itself once it's done and the queue will be flushed then
            DiagnosticsInstrumentation.DebugException(ex);
        }
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Processes async request based on type; this is where specific business logic will be added.
    *              Currently only callout requests are handled: the queued callout request related to the
    *              async request is sent either through a future method or synchronously.
    * @param AsyncRequest__c Async request to be processed
    */
    private void processAsyncRequest(AsyncRequest__c ar)
    {
        if (!ar.AsyncType__c.contains(CALLOUT_TOKEN)) return;
        // Find the callout request that is still queued for this async request
        List<CalloutRequest__c> calloutRequests = [SELECT Id, AsyncRequest__c, Endpoint__c, Method__c, Timeout__c, Status__c, Sent__c FROM CalloutRequest__c WHERE AsyncRequest__c = :ar.Id AND Status__c = :CalloutHandler.QUEUED_STATUS LIMIT 1];
        if (calloutRequests.isEmpty()) return;
        if (ar.AsyncType__c.contains(ASYNC_TOKEN)) CalloutHandler.sendAsyncCallout(calloutRequests[0].Id);
        else CalloutHandler.sendCallout(calloutRequests[0]);
    }

    /**
    * @author Scott Covert
    * @date 2/24/2016
    * @description Increments async request attempts counter and aborts the job if needed or deletes the job altogether
    * @param AsyncRequest__c Async request that was processed; null when no request could be locked
    * @param Map Map of Async Requests to be deleted from the queue
    */
    private void updateOrDeleteAsyncRequest(AsyncRequest__c ar, Map<Id,AsyncRequest__c> arsToDelete)
    {
        // Only bother to update async request if it is not set to be deleted
        // This also must be done after processing of the async request in case a synchronous callout is made
        // (SF does not allow DML prior to a callout in the same transaction - http://bit.ly/1QeV6dC)
        // ar is null when the failure happened before a request was locked, so there is no attempt to record
        if (ar != null && !arsToDelete.containsKey(ar.Id))
        {
            // Increment attempts counter, treating an unset counter as zero
            ar.Attempts__c = (ar.Attempts__c == null ? 0 : ar.Attempts__c) + 1;
            // This async request has failed the maximum number of times allowed; abort
            if (ar.MaxAttempts__c != null && ar.Attempts__c >= ar.MaxAttempts__c) ar.Aborted__c = true;
            Database.update(ar);
        }
        if (!arsToDelete.isEmpty())
        {
            // Delete executed AsyncRequests and empty them from the recycling bin
            Database.delete(arsToDelete.values());
            Database.emptyRecycleBin(arsToDelete.values());
        }
    }

    /**
    * @author Scott Covert
    * @date 2/22/2016
    * @description Chains Queueable Apex
    */
    @future
    private static void chainJob()
    {
        // This method for chaining the queued apex job is itself asynchronous in case the completed job made a callout
        // (Normally making a callout prevents chaining from being possible - http://sforce.co/1RnhPD9)
        System.enqueueJob(new QueueableDispatcher());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment