Skip to content

Instantly share code, notes, and snippets.

@jerieljan
Created Aug 7, 2014
Embed
What would you like to do?
Dealing with long-running, processor-intensive tasks happen, and sometimes, requests for the same tasks get triggered and your system ends up attempting to process the -same- thing multiple times. JobManager is an attempt to resolve that by taking note of each Task<Result> as Jobs, which takes care of assigning future reattempts into existing Jo…
/*
* Job
* v1.0
* June 18, 2014
*
* Copyright 2014, Jeriel Jan del Prado
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jerieljan.app.jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A Job wraps around a {@link Task}
* Jobs are designed to be pointed by multiple sources that require the result of a single task.
*
* @see Task
* @param <T> the expected result type of the Task.
*
* @author jerieljan
*/
public class Job<T> {
public static final int SLEEP_INTERVAL = 250;
private static final Logger log = LoggerFactory.getLogger(Job.class);
private final Task<T> ongoingJob;
private final Object lock = new Object();
private AtomicBoolean isRunning = new AtomicBoolean(false);
private AtomicBoolean isFinished = new AtomicBoolean(false);
private T result = null;
private Exception caughtException = null;
/**
* Creates a new Job with the given Task.
*
* Note that it is advised to use
* {@link JobManager#newJob(Task)} to
* create new Jobs within the context of a JobManager.
*
* @param taskToPerform the task to perform.
*/
public Job(Task<T> taskToPerform) {
this.ongoingJob = taskToPerform;
}
/**
* Starts the task within the job.
* Contrary to its name, this method is marked private.
* Use {@link #get()} to start the inner Task.
*
* @return the result of the task.
* @throws Exception any exceptions encountered in the task.
*/
private T start() throws Exception {
T result = ongoingJob.call();
synchronized (lock) {
isRunning.set(false);
isFinished.set(true);
}
return result;
}
/**
* Gets the Task ID of this Job.
*
* @return task id
*/
public String getTaskIdentifier() {
return ongoingJob.getTaskIdentifier();
}
/**
* Performs the task included in the Job and returns the result.
*
* Get is designed to work when called by multiple classes simultaneously. Only the first thread will be accommodated to process the task.
* Once complete, all classes calling this method will receive the intended output.
*
* This method will block the currently running thread to those who call it, similar to Future's get() call.
*
* @return the result of the task.
* @throws Exception any exceptions encountered in the task.
*/
public T get() throws Exception {
//The running state and finished state determines whether the task has been performed or not.
//Atomicity is important so that other classes calling this method will be subjected to the loop trap below.
if (isRunning.compareAndSet(false, true) && !isFinished.get()) {
try {
result = start();
} catch (Exception inProcessException) {
//When an exception is caught, save its reference first before throwing again.
caughtException = inProcessException;
throw caughtException;
}
}
//This loop trap keeps other callers at bay while the task is currently being processed.
while (isRunning.get() && !isFinished.get()) {
//We throw Exceptions anyway, so we don't need to catch this.
log.trace("This job is currently being processed by another thread, waiting for result...");
Thread.sleep(SLEEP_INTERVAL);
}
//Once a result has been resolved, return it (or any exceptions that may have occurred.)
if (caughtException != null) {
throw caughtException;
}
return result;
}
public boolean isFinished() {
return isFinished.get();
}
}
/*
* JobManager
* v1.0
* June 18, 2014
*
* Copyright 2014, Jeriel Jan del Prado
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jerieljan.app.jobs;
import java.util.ArrayList;
import java.util.List;
/**
* JobManager maintains a pool of running {@link Job}s.
*/
public class JobManager {
private final List<Job> ongoingJobs;
private final List<Job> jobsToPurge;
private final Object lock = new Object();
/**
* Starts the Job Manager.
*/
public JobManager() {
ongoingJobs = new ArrayList<>();
jobsToPurge = new ArrayList<>();
}
/**
* Starts a new Job.
* If the job is already running, it will be reused.
*
* @param taskToPerform
* @param <T>
* @return
*/
public <T> Job newJob(Task<T> taskToPerform) {
Job runningJob = getRunningJob(taskToPerform.getTaskIdentifier());
//If the job is found, reuse it. Else, create it!
if (runningJob == null) {
runningJob = new Job<>(taskToPerform);
ongoingJobs.add(runningJob);
}
return runningJob;
}
/**
* Returns the running jobs (that are not yet finished) in the job pool.
* Note that this operation also expunges finished jobs, in order to clean the job pool.
*
* @param taskId the ID of the task to retrieve.
*
* @return
*/
private Job getRunningJob(String taskId) {
Job target = null;
synchronized (lock) {
for (Job runningJob : ongoingJobs) {
if (runningJob.isFinished()) {
jobsToPurge.add(runningJob);
} else if (runningJob.getTaskIdentifier().contentEquals(taskId)) {
target = runningJob;
break;
}
}
ongoingJobs.removeAll(jobsToPurge);
jobsToPurge.clear();
}
return target;
}
}
/*
* Task
* v1.0
* June 18, 2014
*
* Copyright 2014, Jeriel Jan del Prado
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jerieljan.app.jobs;
import java.util.concurrent.Callable;
/**
* A Task is a glorified {@link java.util.concurrent.Callable} that has an identifier.
* The identifier is used to compare multiple tasks together and to verify if they have the same parameters and tasks.
*
* @param <T>
*/
public abstract class Task<T> implements Callable<T> {
private final String taskIdentifier;
protected Task(String taskIdentifier) {
this.taskIdentifier = taskIdentifier;
}
/**
* A convenience static call to generate a task identifier, based on the parameters provided.
*
* @param objects
*
* @return
*/
public static String generateId(String methodName, Object... objects) {
String result = methodName + ": ";
for (Object object : objects) {
result = result.concat(object.toString());
}
return result;
}
/**
* Retrieves the task identifier of this Task.
* <p/>
* Note that task identifiers do not serve as unique identifiers. Task identifiers are used to
* compare tasks that perform the same parameters, similar to how MD5 hashes are used to
* validate file content equality.
*
* @return
*/
public String getTaskIdentifier() {
return taskIdentifier;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment