
Kestrel Producer/Consumer example in Java

gistfile1.java
package net.arjones.sandbox;

import net.lag.configgy.Config;
import net.lag.kestrel.PersistentQueue;

public class KestrelTest {

    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();

        // using memory only
        config.setBool("journal", false);

        PersistentQueue queue = new PersistentQueue("", "", config.copy());

        // publish 100 messages
        for (int i = 0; i < 100; i++) {
            String value = "Hello World from Java #" + i;
            // boolean success =
            queue.add(value.getBytes(), 0);
            // System.out.println("Success: " + success);
        }

        final int CONSUMERS = 4;
        Thread[] threads = new Thread[CONSUMERS];

        // start all consumers; each one drains the same shared queue
        for (int i = 0; i < CONSUMERS; i++) {
            threads[i] = new Thread(new QueueConsumer(queue));
            threads[i].start();
        }

        // wait for every consumer to finish
        for (int i = 0; i < CONSUMERS; i++) {
            threads[i].join();
        }

        // Flushing the queue removes all messages
        // queue.flush();
        System.out.println("Closing the queue");
        queue.close();
    }

}
gistfile2.java
package net.arjones.sandbox;

import java.util.Date;

import scala.Option;
import net.lag.kestrel.PersistentQueue;
import net.lag.kestrel.QItem;

public class QueueConsumer implements Runnable {

    private final PersistentQueue queue;

    public QueueConsumer(PersistentQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            Option<QItem> qitem = queue.remove();

            // finish execution when there is no payload
            if (null == qitem || qitem.isEmpty()) break;

            // Consume message
            QItem item = qitem.get();
            String message = new String(item.data());
            System.out.println(String.format("\nThread: %s\nQueued at: %s\nMessage: %s\n",
                    Thread.currentThread().getId(), new Date(item.addTime()), message));

            try {
                // simulate some work per message
                System.out.print(".");
                Thread.sleep(300);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
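
To try the same publish-then-drain pattern without the Kestrel jar on the classpath, here is a minimal sketch that swaps in the JDK's ConcurrentLinkedQueue for PersistentQueue. This is a stand-in and not Kestrel's API: the class name QueueDrainSketch and the method publishAndDrain are made up for illustration, and nothing here is journaled or persisted. It only shows the control flow used above: fill the queue first, start several consumers, and let each one stop as soon as the queue hands back nothing.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in demo: a JDK queue replaces Kestrel's PersistentQueue.
final class QueueDrainSketch {

    // Publishes `messages` payloads, drains them with `consumers` threads,
    // and returns how many payloads were consumed in total.
    static int publishAndDrain(int messages, int consumers) {
        final ConcurrentLinkedQueue<byte[]> queue = new ConcurrentLinkedQueue<>();

        // publish everything before any consumer starts
        for (int i = 0; i < messages; i++) {
            queue.add(("Hello World from Java #" + i).getBytes());
        }

        final AtomicInteger consumed = new AtomicInteger();
        Thread[] threads = new Thread[consumers];

        // start all consumers
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                // poll() returns null once the queue is empty, which ends the loop
                while (queue.poll() != null) {
                    consumed.incrementAndGet();
                }
            });
            threads[i].start();
        }

        // join all consumers
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                break;
            }
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + publishAndDrain(100, 4));
    }
}
```

Because all messages are published before the consumers start, an empty poll() is a safe stop signal here. With a producer that keeps running, a consumer would need a different way to know when to stop.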

Hi arjones!

Unfortunately, this example doesn't compile with the latest Kestrel library (I just downloaded kestrel-2.1.4.jar). Could you help me with it? I would really appreciate it.

Maria
