Skip to content

Instantly share code, notes, and snippets.

@nilebox
Created August 14, 2017 00:17
Show Gist options
  • Save nilebox/fbf90b871113506f1b865c54d417d1c0 to your computer and use it in GitHub Desktop.
Save nilebox/fbf90b871113506f1b865c54d417d1c0 to your computer and use it in GitHub Desktop.
package queue;
import com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcDaoSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class PostgresQueueDao extends NamedParameterJdbcDaoSupport implements QueueDao {
private static final Logger log = LoggerFactory.getLogger(PostgresQueueDao.class);
public PostgresQueueDao(JdbcTemplate jdbcTemplate) {
setJdbcTemplate(jdbcTemplate);
}
private static final String INSERT_TASK_SQL;
private static final String BATCH_INSERT_TASKS_SQL;
static {
String insertTaskSQL = "INSERT INTO tasks(queue_name, task, process_time, actor)" +
" VALUES (:queue_name, :task, now() + :process_time_delay_sec * INTERVAL '1 SECOND', :actor)";
INSERT_TASK_SQL = insertTaskSQL + " RETURNING id";
BATCH_INSERT_TASKS_SQL = insertTaskSQL;
}
@Override
public TaskId enqueue(@Nonnull Queue queue, Optional<String> task, Optional<String> actor, Duration firstExecutionDelay) {
return enqueue(queue.getQueueName(), task, actor, firstExecutionDelay);
}
@Override
public int batchEnqueue(@Nonnull Queue queue, @Nonnull List<String> tasks, Optional<String> actor, @Nonnull Duration firstExecutionDelay) {
log.info("batchEnqueue(): queue={}, tasks count={}, actor={}, firstExecutionDelay={}", queue.getQueueName(), tasks.size(), actor, firstExecutionDelay);
final MapSqlParameterSource[] params = tasks.stream()
.map(task -> convertTaskToSqlParams(queue.getQueueName(), task, actor, firstExecutionDelay))
.toArray(size -> new MapSqlParameterSource[size]);
int result = getNamedParameterJdbcTemplate().batchUpdate(BATCH_INSERT_TASKS_SQL, params).length;
log.info("batchEnqueue(): queue={}, enqueued count={} of {}", queue.getQueueName(), result, tasks.size());
return result;
}
private MapSqlParameterSource convertTaskToSqlParams(@Nonnull String queueName, @Nullable String task, Optional<String> actor, Duration firstExecutionDelay) {
return new MapSqlParameterSource("queue_name", queueName)
.addValue("task", task)
.addValue("process_time_delay_sec", firstExecutionDelay.toStandardSeconds().getSeconds())
.addValue("actor", actor.orElse(null));
}
public TaskId enqueue(@Nonnull String queueName, String task, Duration firstExecutionDelay) {
return enqueue(queueName, Optional.of(task), Optional.empty(), firstExecutionDelay);
}
public TaskId enqueue(@Nonnull String queueName, Optional<String> task, Optional<String> actor, Duration firstExecutionDelay) {
log.info("enqueue(): queue=" + queueName + ", task=***, actor=" + actor
+ ", firstExecutionDelay=" + firstExecutionDelay);
TaskId taskId = new TaskId(getNamedParameterJdbcTemplate().queryForObject(INSERT_TASK_SQL,
convertTaskToSqlParams(queueName, task.orElse(null), actor, firstExecutionDelay), Long.class));
log.info("enqueue(): queue=" + queueName + ", id=" + taskId.asLong() + ", task=***, actor=" + actor
+ ", firstExecutionDelay=" + firstExecutionDelay);
return taskId;
}
private static final String NEXT_TASK_SQL = "SELECT out_id as id, out_task as task, out_attempt as attempt, " +
"out_actor as actor, out_create_time as create_time " +
"FROM nextTask(:queue_name, :is_delay_linear, :linear_delay_in_seconds) WHERE out_id IS NOT NULL;";
public Optional<TaskRecord> nextTask(@Nonnull Queue queue, @Nonnull Optional<Duration> linearDelay) {
//log.info("nextTask(): queue=" + queue.getQueueName() + ", linearDelay=" + linearDelay);
return getNamedParameterJdbcTemplate().query(NEXT_TASK_SQL,
new MapSqlParameterSource("queue_name", queue.getQueueName())
.addValue("is_delay_linear", linearDelay.isPresent())
.addValue("linear_delay_in_seconds", linearDelay.orElse(Duration.ZERO).toStandardSeconds().getSeconds()),
new RowMapper<TaskRecord>() {
@Override
public TaskRecord mapRow(ResultSet rs, int rowNum) throws SQLException {
return new TaskRecord(rs.getLong("id"), rs.getString("task"), rs.getLong("attempt"),
rs.getString("actor"), rs.getTimestamp("create_time"));
}
}).stream().findFirst();
}
private static final String DELETE_ALL_PENDING_TASKS_SQL = "DELETE FROM tasks WHERE process_time > now() AND attempt > 1";
public void deleteAllPendingTasks() {
log.info("deleteAllPendingTasks");
getJdbcTemplate().update(DELETE_ALL_PENDING_TASKS_SQL);
}
private static final String DELETE_TASK_SQL = "DELETE FROM tasks WHERE queue_name = :queue_name AND id = :id";
public void deleteTask(@Nonnull Queue queue, @Nonnull TaskId taskId) {
log.info("deleteTask(): queue=" + queue.getQueueName() + ", taskId=" + taskId);
getNamedParameterJdbcTemplate().update(DELETE_TASK_SQL,
ImmutableMap.of("queue_name", queue.getQueueName(), "id", taskId.asLong()));
}
private static final String DELETE_TASKS_BY_ACTOR_SQL = "DELETE FROM tasks WHERE queue_name = :queue_name AND actor = :actor";
public void deleteTasksByActor(@Nonnull Queue queue, @Nonnull String actor) {
log.info("deleteTasksByActor(): queue=" + queue.getQueueName() + ", actor=" + actor);
getNamedParameterJdbcTemplate().update(DELETE_TASKS_BY_ACTOR_SQL,
ImmutableMap.of("queue_name", queue.getQueueName(), "actor", actor));
}
private static final String UPDATE_TASK_SQL = "UPDATE tasks\n" +
"SET\n" +
" process_time = now() + :next_execution_delay_in_seconds * INTERVAL '1 SECOND',\n" +
" task = :task,\n" +
" attempt = 0\n" +
"WHERE id = :id";
public void reenqueue(@Nonnull TaskRecord task, @Nonnull Duration nextExecutionDelay) {
log.info("reenqueue(): task=***, nextExecutionDelay=" + nextExecutionDelay);
getNamedParameterJdbcTemplate().update(UPDATE_TASK_SQL, ImmutableMap.of("id", task.getId().asLong(),
"task", task.getTask(), "next_execution_delay_in_seconds", nextExecutionDelay.toStandardSeconds().getSeconds()));
}
private static final String IS_TASK_EXISTS_SQL = "SELECT count(*) FROM tasks" +
" WHERE queue_name = :queue_name AND actor = :actor";
public boolean isTaskExist(@Nonnull Queue queue, @Nonnull String actor) {
log.info("hasTaskForGivenActor(): queue=" + queue.getQueueName() + ", actor=" + actor);
Integer taskCount = getNamedParameterJdbcTemplate().queryForObject(IS_TASK_EXISTS_SQL, ImmutableMap.of("queue_name", queue.getQueueName(),
"actor", actor), Integer.class);
return taskCount != null && taskCount > 0;
}
@Override
public void resetAllPendingTasks() {
getJdbcTemplate().execute("UPDATE tasks SET process_time=now(), attempt=0 WHERE attempt > 0");
}
public Map<String, Long> getPendingTasksCount() {
final Map<String, Long> pendingTasks = new LinkedHashMap<>();
getJdbcTemplate().query("SELECT queue_name, COUNT(1) FROM tasks WHERE (attempt > 1) OR (attempt = 1 AND process_time < now()) " +
"GROUP BY queue_name ORDER BY queue_name", new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
pendingTasks.put(rs.getString(1), rs.getLong(2));
}
});
return pendingTasks;
}
public Map<String, Long> getAllTasksCount() {
final Map<String, Long> tasks = new LinkedHashMap<>();
getJdbcTemplate().query("SELECT queue_name, COUNT(1) FROM tasks GROUP BY queue_name ORDER BY queue_name", new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
tasks.put(rs.getString(1), rs.getLong(2));
}
});
return tasks;
}
public Map<String, List<QueueSummary>> getPendingTasks() {
final Map<String, List<QueueSummary>> result = new HashMap<>();
getJdbcTemplate().query("SELECT t.id, t.queue_name, substring(t.task for 2000) as task, t.process_time, t.attempt, t.create_time, t.actor " +
"FROM tasks t JOIN (SELECT id, row_number() OVER (PARTITION BY queue_name ORDER BY id desc) rn \n" +
"FROM tasks WHERE (attempt > 1) OR (attempt = 1 AND process_time < now())) task_ids " +
"on task_ids.id=t.id WHERE task_ids.rn<=10", new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
String name = rs.getString("queue_name");
if (!result.containsKey(name)) {
result.put(name, new ArrayList<>(10));
}
result.get(name).add(new QueueSummary(rs.getString("id"), name, rs.getString("task"),
QueueUtils.formatMoscowDateTime(rs.getTimestamp("process_time")), rs.getString("attempt"),
QueueUtils.formatMoscowDateTime(rs.getTimestamp("create_time")), rs.getString("actor")));
}
});
return result;
}
/**
* Delete tasks from the queue.
*
* @param queueName Name of the queue.
*/
@Override
public void clearQueue(String queueName) {
String sql = "delete from tasks where queue_name = ?";
getJdbcTemplate().update(sql, queueName);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment