Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Tapestry Workers & Tasks
package com.anjlab.spelling.web.services.workers;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.tapestry5.ioc.Messages;
import org.apache.tapestry5.ioc.ReloadAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.anjlab.spelling.web.entities.Task;
import com.anjlab.spelling.web.services.atmosphere.WebSocketProgressMonitor;
import com.anjlab.spelling.web.services.managers.TaskManager;
public abstract class AbstractWorker implements Worker, ReloadAware {
@Inject
private volatile Messages messages;
private volatile WorkerStatus status = WorkerStatus.IDLE;
private volatile boolean paused;
protected volatile boolean taskCanceled;
protected volatile Task currentTask;
@Inject
protected volatile TaskManager taskManager;
protected volatile WebSocketProgressMonitor progressMonitor;
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
private WorkerRoutine routine;
public void setRoutine(WorkerRoutine routine) {
this.routine = routine;
}
public boolean shutdownImplementationForReload()
{
// Support live class reloading during development cycles
if (owningThread != null)
{
routine.workerReloading();
owningThread.interrupt();
}
return true;
}
private volatile Thread owningThread;
private static final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
public WorkerStatus getStatus() {
return status;
}
public WebSocketProgressMonitor getProgressMonitor() {
return progressMonitor;
}
private volatile boolean shuttingDown = false;
public void shutdown()
{
shuttingDown = true;
if (owningThread != null)
{
owningThread.interrupt();
}
}
public boolean isShuttingDown()
{
return shuttingDown;
}
public void checkForNewTask()
{
try {
queue.offer(new Object(), 100, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
logger.error("Interrupted exception", e);
}
}
public boolean togglePaused()
{
paused = !paused;
return paused;
}
public boolean cancelTask(int taskId)
{
Task task = currentTask;
boolean cancelingCurrentTask = task != null && task.getId() == taskId;
if (cancelingCurrentTask && this.taskCanceled)
{
internalCancelTask();
return true;
}
if (!cancelingCurrentTask)
{
return false;
}
internalCancelTask();
return true;
}
private void internalCancelTask()
{
taskCanceled = true;
if (owningThread != null)
{
owningThread.interrupt();
}
}
public boolean isTaskCancelled() {
return taskCanceled;
}
public Task currentTask() {
return currentTask;
}
public void cleanup()
{
// Clear interrupted state
Thread.interrupted();
taskCanceled = false;
progressMonitor = null;
currentTask = null;
if (owningThread == null) {
owningThread = Thread.currentThread();
owningThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaugth exception in worker", e);
}
});
}
}
public void run() throws InterruptedException, RuntimeException
{
while (paused || (!taskCanceled && (currentTask = findTask()) == null))
{
status = paused ? WorkerStatus.PAUSED : WorkerStatus.IDLE;
// Ignore result. Queue only used for synchronization purposes
queue.take();
}
if (taskCanceled)
{
return;
}
status = WorkerStatus.RUNNING;
try {
progressMonitor = new WebSocketProgressMonitor(currentTask, messages, taskManager);
progressMonitor.pending();
runTask();
taskManager.deleteTask(currentTask.getId());
}
catch (Throwable e)
{
if (e instanceof InterruptedException)
{
if (currentTask != null)
{
taskManager.deleteTask(currentTask.getId());
}
throw (InterruptedException) e;
}
if (currentTask != null)
{
taskManager.markAsError(currentTask.getId(), e);
}
throw new RuntimeException(e);
}
finally
{
if (progressMonitor != null)
{
progressMonitor.destroy();
}
}
}
protected abstract void runTask() throws InterruptedException;
private Task findTask()
{
// OptimisticLockException may be thrown on commit?
return taskManager.findTaskForWorker(getTaskType());
}
public void onExit()
{
this.status = WorkerStatus.DOWN;
}
}
package com.anjlab.spelling.web.services;
// Imports omitted
public class AppModule
{
@Contribute(WorkerManager.class)
public static void defineWorkers(Configuration<Worker> conf)
{
conf.addInstance(GitPullWorker.class);
conf.addInstance(GitCloneWorker.class);
conf.addInstance(SpellcheckWorker.class);
conf.addInstance(CorrectionsWorker.class);
conf.addInstance(GitPullRequestWorker.class);
}
@Startup
public static void startup(ParallelExecutor executor,
final WorkerManager workerManager,
final Logger logger,
TaskManager taskManager,
@Symbol(START_WORKERS) boolean startWorkers,
RegistryShutdownHub shutdownHub)
{
if (!startWorkers)
{
return;
}
shutdownHub.addRegistryWillShutdownListener(new Runnable()
{
public void run()
{
for (Worker worker : workerManager.workers())
{
worker.shutdown();
}
}
});
for (final Worker worker : workerManager.workers())
{
executor.invoke(new WorkerRoutine(logger, worker, taskManager, workerManager));
}
}
}
package com.anjlab.spelling.web.entities;
import gumi.builders.UrlBuilder;
import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.Date;
import java.util.List;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.ManyToOne;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import javax.persistence.Version;
import org.apache.tapestry5.ioc.Messages;
@Entity
public class Task {
public static final String TASK_PARAMETER_OAUTH2_TOKEN = "oAuth2Token";
public static final String TASK_PARAMETER_TASK_COMPLETED_EVENT_URL = "taskCompletedEventUrl";
public static final String COMMITTER_NAME = "committerName";
public static final String COMMITTER_EMAIL = "committerEmail";
public static final String TASK_PARAMETER_NOTIFY_GITHUB_ABOUT_NEW_TYPOS = "notifyAboutNewTypos";
public static final String TASK_PARAMETER_PULL_REQUEST_MESSAGE = "pullRequestMessage";
public enum TaskStatus {
PENDING,
RUNNING,
ERROR,
CANCELED,
COMPLETED
}
public enum TaskType {
GIT_CLONE,
GIT_PULL,
SPELLCHECK,
BRANCH_CORRECTIONS,
GIT_PULL_REQUEST
}
@Id @GeneratedValue(strategy=GenerationType.TABLE)
private Integer id;
@Version
private int version;
@Temporal(TemporalType.TIMESTAMP)
@Column(name="created_at", nullable = false)
private Date createdAt;
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private TaskStatus status;
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private TaskType type;
@Temporal(TemporalType.TIMESTAMP)
@Column(name="started_at")
private Date startedAt;
@ManyToOne(fetch=FetchType.LAZY, optional=false)
private Project project;
@Column(length=4096)
private String parameters;
@Column(name="status_message")
private String statusMessage;
public Task() {
createdAt = new Date();
status = TaskStatus.PENDING;
}
public Task(Project project, TaskType type)
{
this();
this.project = project;
this.type = type;
}
public Date getCreatedAt() {
return createdAt;
}
public void setCreatedAt(Date createdAt) {
this.createdAt = createdAt;
}
public Integer getId() {
return id;
}
public TaskStatus getStatus() {
return status;
}
public void setStatus(TaskStatus status) {
this.status = status;
}
public Project getProject() {
return project;
}
public void setProject(Project project) {
this.project = project;
}
public TaskType getType() {
return type;
}
public void setType(TaskType type) {
this.type = type;
}
public void setStartedAt(Date startedAt) {
this.startedAt = startedAt;
}
public Date getStartedAt() {
return startedAt;
}
public String getParameters() {
return parameters;
}
void setParameters(String parameters) {
this.parameters = parameters;
}
public void addParameter(String kay, String value)
{
String params = getParameters();
UrlBuilder builder =
UrlBuilder.fromString(params == null ? "task://" : params, paramsCharset())
.addParameter(kay, value);
setParameters(builder.toString());
}
private Charset paramsCharset() {
return Charset.forName("UTF-8");
}
public String getParameter(String key)
{
String params = getParameters();
if (params == null)
{
return null;
}
List<String> list = UrlBuilder.fromString(params, paramsCharset()).queryParameters.get(key);
return list == null || list.size() == 0 ? null : list.get(0);
}
public boolean hasParameter(String key)
{
String params = getParameters();
if (params == null)
{
return false;
}
return UrlBuilder.fromString(params, paramsCharset()).queryParameters.containsKey(key);
}
public String getTitle(Messages messages) {
return messages.get(getType().toString());
}
public String getActivityTitle(Messages messages) {
StringBuilder activityTitle = new StringBuilder(
messages.format("TASK_" + getStatus().toString(), getTitle(messages)));
if (getStatusMessage() != null)
{
activityTitle.append(": ").append(getStatusMessage());
}
return activityTitle.toString();
}
public boolean isInErrorState()
{
return TaskStatus.ERROR == getStatus();
}
public boolean isInPendingState()
{
return TaskStatus.PENDING == getStatus();
}
public boolean isInRunningState()
{
return TaskStatus.RUNNING == getStatus();
}
public boolean isInFiniteState() {
switch (getStatus()) {
case ERROR:
case COMPLETED:
case CANCELED:
return true;
default:
return false;
}
}
public void setStatusMessage(String statusMessage) {
this.statusMessage = statusMessage;
}
public String getStatusMessage() {
return statusMessage;
}
@Override
public String toString()
{
return MessageFormat.format(
"Task(id={0}, type={1}, status={2})", id, type, status);
}
}
package com.anjlab.spelling.web.services.managers.impl;
import java.util.List;
import org.apache.tapestry5.ioc.Messages;
import org.apache.tapestry5.ioc.annotations.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.anjlab.spelling.web.entities.Project;
import com.anjlab.spelling.web.entities.Task;
import com.anjlab.spelling.web.entities.Task.TaskStatus;
import com.anjlab.spelling.web.entities.Task.TaskType;
import com.anjlab.spelling.web.services.atmosphere.WebSocketProgressMonitor;
import com.anjlab.spelling.web.services.atmosphere.WebSocketProgressMonitor.ProgressEvent;
import com.anjlab.spelling.web.services.dao.TaskDAO;
import com.anjlab.spelling.web.services.managers.SecurityManager;
import com.anjlab.spelling.web.services.managers.TaskManager;
import com.anjlab.spelling.web.services.managers.TaskManager.TaskProgress.ActivityStatus;
import com.anjlab.spelling.web.services.managers.WorkerManager;
import com.anjlab.spelling.web.services.workers.Worker;
public class TaskManagerImpl implements TaskManager {
private static final Logger logger = LoggerFactory.getLogger(TaskManagerImpl.class);
@Inject private TaskDAO taskDAO;
@Inject private WorkerManager workerManager;
@Inject private Messages messages;
@Inject private SecurityManager securityManager;
public Task findTaskForWorker(TaskType taskType)
{
Task task = taskDAO.findTaskForWorker(taskType);
if (task == null)
{
return null;
}
if (!taskDAO.tryLockTask(task.getId()))
{
// Another task was scheduled for this project, or task was canceled, etc.
return null;
}
return task;
}
public void deleteTask(int taskId)
{
taskDAO.deleteTask(taskId);
}
public void markAsError(int taskId, Throwable t)
{
taskDAO.updateStatus(taskId, TaskStatus.ERROR, t.getMessage());
}
public void markAsCanceled(int taskId)
{
taskDAO.updateStatus(taskId, TaskStatus.CANCELED, null);
}
public boolean submitTask(Task newTask)
{
Project project = newTask.getProject();
if (project == null)
{
throw new IllegalArgumentException("No project specified for new task");
}
TaskType taskType = newTask.getType();
if (taskType == null)
{
throw new IllegalArgumentException("Task type not specified for new task");
}
if (isProjectHasTasksOfType(project.getId(), taskType))
{
// Prevent double submit
return false;
}
if (taskDAO.hasErrorTasks(project.getId()))
{
logger.warn("Task wasn't submitted because project {} has tasks in error status", project.getId());
return false;
}
if (!newTask.hasParameter(Task.TASK_PARAMETER_OAUTH2_TOKEN))
{
String oAuth2Token = securityManager.getCurrentUserGitHubOAuth2Token();
newTask.addParameter(Task.TASK_PARAMETER_OAUTH2_TOKEN, oAuth2Token);
}
taskDAO.saveTask(newTask);
return true;
}
public List<Task> findTasks(int projectId)
{
return taskDAO.findTasks(projectId);
}
public Task findById(int taskId)
{
return taskDAO.findById(taskId);
}
public void cleanupTasksAfterError(int projectId)
{
taskDAO.deleteTasksExceptErrorState(projectId);
}
public TaskProgress getTaskProgress(Task task)
{
Worker worker = workerManager.getWorker(task.getId());
if (worker == null)
{
return defaultProgress(task);
}
Task workerTask = worker.currentTask();
if (workerTask != null && workerTask.getId().equals(task.getId()))
{
WebSocketProgressMonitor progressMonitor = worker.getProgressMonitor();
if (progressMonitor == null)
{
return defaultProgress(task);
}
ProgressEvent lastEvent = progressMonitor.getLastEvent();
if (lastEvent == null)
{
return defaultProgress(task);
}
return new TaskProgress(lastEvent.getActivityProgress(), lastEvent.getActivityTitle(), ActivityStatus.RUNNING);
}
return defaultProgress(task);
}
private TaskProgress defaultProgress(Task task)
{
int progress = task.getStatus() == TaskStatus.ERROR ? 100 : 0;
ActivityStatus activityStatus = task.getStatus() == TaskStatus.RUNNING ? ActivityStatus.STUCK : ActivityStatus.RUNNING;
StringBuilder activityTitle = new StringBuilder(task.getActivityTitle(messages));
if (activityStatus != ActivityStatus.RUNNING)
{
activityTitle.append(" (").append(messages.get(activityStatus.toString())).append(")");
}
return new TaskProgress(progress, activityTitle.toString(), activityStatus);
}
public boolean isProjectHasTasksOfType(int projectId, TaskType taskType)
{
return taskDAO.hasTasksOfType(projectId, taskType);
}
public boolean hasErrorTasks(int projectId)
{
return taskDAO.hasErrorTasks(projectId);
}
public boolean hasAnyTasks(int projectId)
{
return taskDAO.hasAnyTasks(projectId);
}
public Task findTask(int projectId, TaskType taskType)
{
return taskDAO.findTask(projectId, taskType);
}
}
package com.anjlab.spelling.web.services.workers;
import com.anjlab.spelling.web.entities.Task;
import com.anjlab.spelling.web.entities.Task.TaskType;
import com.anjlab.spelling.web.services.atmosphere.WebSocketProgressMonitor;
public interface Worker {
public enum WorkerStatus {
IDLE, RUNNING, DOWN, PAUSED
}
WorkerStatus getStatus();
WebSocketProgressMonitor getProgressMonitor();
void run() throws InterruptedException;
Task currentTask();
boolean cancelTask(int taskId);
boolean isTaskCancelled();
void checkForNewTask();
TaskType getTaskType();
void cleanup();
void onExit();
void shutdown();
boolean isShuttingDown();
void setRoutine(WorkerRoutine routine);
boolean togglePaused();
}
package com.anjlab.spelling.web.services.managers.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.tapestry5.ioc.annotations.Inject;
import com.anjlab.spelling.web.entities.Task;
import com.anjlab.spelling.web.services.managers.TaskManager;
import com.anjlab.spelling.web.services.managers.WorkerManager;
import com.anjlab.spelling.web.services.workers.Worker;
public class WorkerManagerImpl implements WorkerManager {
@Inject
private TaskManager taskManager;
private final List<Worker> workers;
public WorkerManagerImpl(Collection<Worker> workers) {
this.workers = new ArrayList<Worker>();
for (Worker worker : workers) {
registerWorker(worker);
}
}
public void registerWorker(Worker worker) {
workers.add(worker);
}
public Iterable<Worker> workers() {
return workers;
}
public Worker getWorker(int taskId) {
for (Worker worker : workers) {
Task task = worker.currentTask();
if (task != null && task.getId() == taskId) {
return worker;
}
}
return null;
}
public void cancelTask(int taskId) {
boolean canceled = false;
Worker worker = getWorker(taskId);
if (worker != null) {
canceled = worker.cancelTask(taskId);
}
// In case if no worker running this task we should cancel it here
if (!canceled) {
taskManager.deleteTask(taskId);
} else {
taskManager.markAsCanceled(taskId);
}
}
public void submitTask(Task newTask)
{
if (taskManager.submitTask(newTask))
{
taskSubmitted(newTask);
}
}
private void taskSubmitted(Task task)
{
for (Worker worker : workers)
{
if (worker.getTaskType().equals(task.getType()))
{
checkForNewTask(worker);
}
}
}
public void taskCompleted(Worker worker)
{
for (Worker worker2 : workers)
{
if (worker2 != worker)
{
checkForNewTask(worker2);
}
}
}
public void togglePaused()
{
for (Worker worker : workers)
{
boolean paused = worker.togglePaused();
if (!paused)
{
checkForNewTask(worker);
}
}
}
private void checkForNewTask(Worker worker)
{
worker.checkForNewTask();
}
}
package com.anjlab.spelling.web.services.workers;
import org.apache.tapestry5.ioc.Invokable;
import org.slf4j.Logger;
import com.anjlab.spelling.web.entities.Task;
import com.anjlab.spelling.web.services.managers.TaskManager;
import com.anjlab.spelling.web.services.managers.WorkerManager;
public class WorkerRoutine implements Invokable<Object> {
private final Logger logger;
private final Worker worker;
private final TaskManager taskManager;
private final WorkerManager workerManager;
private volatile boolean workerReloading;
public WorkerRoutine(Logger logger, Worker worker, TaskManager taskManager, WorkerManager workerManager) {
this.logger = logger;
this.worker = worker;
this.taskManager = taskManager;
this.workerManager = workerManager;
}
public void workerReloading()
{
this.workerReloading = true;
}
public Object invoke() {
while (true)
{
try
{
workerReloading = false;
worker.setRoutine(this);
worker.cleanup();
worker.run();
workerManager.taskCompleted(worker);
}
catch (InterruptedException e)
{
logger.error("Interrupted exception", e);
if (!workerReloading && (!worker.isTaskCancelled() || worker.isShuttingDown()))
{
logger.info("Task wasn't canceled. Exiting...");
Thread.currentThread().interrupt();
break;
}
else
{
logger.info("Worker task was interrupted, resuming to next task");
Thread.interrupted();
workerManager.taskCompleted(worker);
}
}
catch (Exception e)
{
logger.error("Worker " + worker.getTaskType() + " failed with exception", e);
if (e.getMessage().contains("Cannot open connection")
|| e.getMessage().contains("Unable to locate a single EntityManager"))
{
throw new RuntimeException(e);
}
Task task = worker.currentTask();
if (task != null)
{
int projectId = task.getProject().getId();
try
{
taskManager.cleanupTasksAfterError(projectId);
}
catch (Exception ex)
{
logger.error("Error performing tasks cleanup for project " + projectId, ex);
}
}
}
finally
{
logger.debug("Worker routine iteration completed");
if (worker.isShuttingDown())
{
break;
}
}
}
logger.warn("Exiting worker routine");
worker.onExit();
return null;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment