Last active
January 9, 2023 14:21
-
-
Save asabo-rapid7/b71bf9ef4d51ccd024465e42ed1aa8fa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.logentries.mq; | |
import java.util.ArrayList; | |
import java.util.List; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.logentries.api.Batch; | |
import redis.clients.jedis.Jedis; | |
import redis.clients.jedis.JedisPool; | |
public class ChannelAdapter { | |
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelAdapter.class); | |
private final String waitQueue; | |
private final String workQueue; | |
private final JedisPool jedisPool; | |
/** | |
* instantiates ChannelAdapter, in charge of communication with messaging | |
* service. | |
* | |
* @param jedisPool | |
* - provider of Jedis connections, if null means let | |
* ChannelAdapter create one by itself using data from AppConfig | |
*/ | |
public ChannelAdapter(JedisPool jedisPool) { | |
this.jedisPool = jedisPool; | |
this.waitQueue = "TestSample:waitQueue"; | |
this.workQueue = "TestSample:workQueue"; | |
} | |
/* implementation of conversion Batch object into json presentation */ | |
private String batchToJson(Batch job) { | |
return job.toString(); | |
} | |
/* implementation of conversion json into Batch presentation */ | |
private Batch jsonToBatch(String batchJson) { | |
return null;// new Batch(batchJson); | |
} | |
/** | |
* sends batch job to a queue for further processing. | |
* | |
* @param job | |
* task that will be serialized and sent to queue | |
* @return true if job has been successfully queued | |
*/ | |
public boolean sendJobToWaitQueue(Batch job) { | |
LOGGER.debug("Trying to push job to queue: " + job.toString()); | |
String jobJson = batchToJson(job); | |
Jedis instance = null; | |
try { | |
instance = this.jedisPool.getResource(); | |
// left push to a wait queue | |
instance.lpush(waitQueue, jobJson); | |
LOGGER.debug("Job successfully published to channel {} {}", waitQueue, jobJson); | |
return true; | |
} catch (Exception e) { | |
LOGGER.error("Problem while publishing message to a channel", e); | |
return false; | |
} finally { | |
instance.close(); | |
} | |
} | |
/** | |
* checks if there is job available waiting in a 'wait' queue. If there is | |
* job waiting in a queue, it will be transfered into 'work' queue and | |
* returned back. | |
* | |
* @return Batch if available for work, otherwise null | |
*/ | |
public Batch checkIfJobAvailable() { | |
String jobJson = null; | |
Jedis instance = null; | |
Batch job = null; | |
try { | |
instance = this.jedisPool.getResource(); | |
// trying to pick up new job from 'wait' queue and transfer it to | |
// 'work' queue in single transaction | |
String message = instance.rpoplpush(waitQueue, workQueue); | |
if (message == null) { | |
return null; | |
} | |
job = jsonToBatch(message); | |
if (job == null) { | |
return job; | |
} | |
LOGGER.debug("Job successfully transferred to 'work' queue:{} json:{}", workQueue, jobJson); | |
return job; | |
} catch (Exception e) { | |
LOGGER.error("Problem while checking new job message", e); | |
return null; | |
} finally { | |
instance.close(); | |
} | |
} | |
/** | |
* makes sure ChannelAdapter will stop its activities in a secure manner, | |
* closing all connections. | |
*/ | |
public void stopActivities() { | |
this.jedisPool.close(); | |
} | |
/** | |
* assures all needed for a job to be returned successfully to a 'wait' | |
* queue. removes job from 'work' queue, | |
* | |
* @return information if transaction succeeded or not. | |
*/ | |
public boolean returnJobBackToWaitQueue(Batch job) { | |
boolean res = false; | |
String jobId = job.getId(); | |
LOGGER.info("Returning job {} back to 'wait' queue", jobId); | |
// 1. remove it from working queue | |
res = removeJobFromWorkQueue(job); | |
if (!res) { | |
LOGGER.error("Failed to take job off 'work' queue; id: {}", jobId); | |
return res; | |
} | |
// 2. back to wait queue | |
res = sendJobToWaitQueue(job); | |
return res; | |
} | |
/* | |
* searches for dedicated job on queue and removes it off queue, send back | |
* true if removal successful | |
*/ | |
private boolean removeJobFromWorkQueue(Batch job) { | |
if (job.getId() == null) { | |
LOGGER.warn("Got null ID of batch to remove off queue?!? Buckets: {}", job.getBuckets()); | |
return false; | |
} | |
String batchJson = batchToJson(job); | |
Jedis instance = null; | |
Long res = -1L; | |
try { | |
instance = this.jedisPool.getResource(); | |
res = instance.lrem(workQueue, 1, batchJson); | |
} catch (Exception e) { | |
LOGGER.warn("Problem while removing job {} off queue {} Ex:{}", job.getId(), workQueue, e); | |
} finally { | |
instance.close(); | |
} | |
return res == 1; | |
} | |
/** | |
* lists all jobs that are currently in progress. | |
*/ | |
public List<Batch> getJobsInProgress() { | |
Jedis instance = null; | |
List<String> res; | |
try { | |
instance = this.jedisPool.getResource(); | |
LOGGER.debug("Trying to read all elements in {} queue", workQueue); | |
res = instance.lrange(workQueue, 0, -1); | |
List<Batch> jobs = new ArrayList<>(res.size()); | |
for (String json : res) { | |
Batch job = jsonToBatch(json); | |
jobs.add(job); | |
} | |
return jobs; | |
} catch (Exception e) { | |
LOGGER.warn("Problem while listing job list of all elements, queue:{}", workQueue, e); | |
return null; | |
} finally { | |
if (instance != null) { | |
instance.close(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment