Skip to content

Instantly share code, notes, and snippets.

@asabo-rapid7
Last active January 9, 2023 14:21
Show Gist options
  • Save asabo-rapid7/b71bf9ef4d51ccd024465e42ed1aa8fa to your computer and use it in GitHub Desktop.
Save asabo-rapid7/b71bf9ef4d51ccd024465e42ed1aa8fa to your computer and use it in GitHub Desktop.
package com.logentries.mq;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.logentries.api.Batch;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class ChannelAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelAdapter.class);
private final String waitQueue;
private final String workQueue;
private final JedisPool jedisPool;
/**
* instantiates ChannelAdapter, in charge of communication with messaging
* service.
*
* @param jedisPool
* - provider of Jedis connections, if null means let
* ChannelAdapter create one by itself using data from AppConfig
*/
public ChannelAdapter(JedisPool jedisPool) {
this.jedisPool = jedisPool;
this.waitQueue = "TestSample:waitQueue";
this.workQueue = "TestSample:workQueue";
}
/* implementation of conversion Batch object into json presentation */
private String batchToJson(Batch job) {
return job.toString();
}
/* implementation of conversion json into Batch presentation */
private Batch jsonToBatch(String batchJson) {
return null;// new Batch(batchJson);
}
/**
* sends batch job to a queue for further processing.
*
* @param job
* task that will be serialized and sent to queue
* @return true if job has been successfully queued
*/
public boolean sendJobToWaitQueue(Batch job) {
LOGGER.debug("Trying to push job to queue: " + job.toString());
String jobJson = batchToJson(job);
Jedis instance = null;
try {
instance = this.jedisPool.getResource();
// left push to a wait queue
instance.lpush(waitQueue, jobJson);
LOGGER.debug("Job successfully published to channel {} {}", waitQueue, jobJson);
return true;
} catch (Exception e) {
LOGGER.error("Problem while publishing message to a channel", e);
return false;
} finally {
instance.close();
}
}
/**
* checks if there is job available waiting in a 'wait' queue. If there is
* job waiting in a queue, it will be transfered into 'work' queue and
* returned back.
*
* @return Batch if available for work, otherwise null
*/
public Batch checkIfJobAvailable() {
String jobJson = null;
Jedis instance = null;
Batch job = null;
try {
instance = this.jedisPool.getResource();
// trying to pick up new job from 'wait' queue and transfer it to
// 'work' queue in single transaction
String message = instance.rpoplpush(waitQueue, workQueue);
if (message == null) {
return null;
}
job = jsonToBatch(message);
if (job == null) {
return job;
}
LOGGER.debug("Job successfully transferred to 'work' queue:{} json:{}", workQueue, jobJson);
return job;
} catch (Exception e) {
LOGGER.error("Problem while checking new job message", e);
return null;
} finally {
instance.close();
}
}
/**
* makes sure ChannelAdapter will stop its activities in a secure manner,
* closing all connections.
*/
public void stopActivities() {
this.jedisPool.close();
}
/**
* assures all needed for a job to be returned successfully to a 'wait'
* queue. removes job from 'work' queue,
*
* @return information if transaction succeeded or not.
*/
public boolean returnJobBackToWaitQueue(Batch job) {
boolean res = false;
String jobId = job.getId();
LOGGER.info("Returning job {} back to 'wait' queue", jobId);
// 1. remove it from working queue
res = removeJobFromWorkQueue(job);
if (!res) {
LOGGER.error("Failed to take job off 'work' queue; id: {}", jobId);
return res;
}
// 2. back to wait queue
res = sendJobToWaitQueue(job);
return res;
}
/*
* searches for dedicated job on queue and removes it off queue, send back
* true if removal successful
*/
private boolean removeJobFromWorkQueue(Batch job) {
if (job.getId() == null) {
LOGGER.warn("Got null ID of batch to remove off queue?!? Buckets: {}", job.getBuckets());
return false;
}
String batchJson = batchToJson(job);
Jedis instance = null;
Long res = -1L;
try {
instance = this.jedisPool.getResource();
res = instance.lrem(workQueue, 1, batchJson);
} catch (Exception e) {
LOGGER.warn("Problem while removing job {} off queue {} Ex:{}", job.getId(), workQueue, e);
} finally {
instance.close();
}
return res == 1;
}
/**
* lists all jobs that are currently in progress.
*/
public List<Batch> getJobsInProgress() {
Jedis instance = null;
List<String> res;
try {
instance = this.jedisPool.getResource();
LOGGER.debug("Trying to read all elements in {} queue", workQueue);
res = instance.lrange(workQueue, 0, -1);
List<Batch> jobs = new ArrayList<>(res.size());
for (String json : res) {
Batch job = jsonToBatch(json);
jobs.add(job);
}
return jobs;
} catch (Exception e) {
LOGGER.warn("Problem while listing job list of all elements, queue:{}", workQueue, e);
return null;
} finally {
if (instance != null) {
instance.close();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment