Skip to content

Instantly share code, notes, and snippets.

@arjones
Created January 18, 2011 13:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save arjones/784413 to your computer and use it in GitHub Desktop.
Save arjones/784413 to your computer and use it in GitHub Desktop.
Kestrel Producer/Consumer example in Java
package net.arjones.sandbox;
import net.lag.configgy.Config;
import net.lag.kestrel.PersistentQueue;
/**
 * Small demo: fills a Kestrel {@link PersistentQueue} with text messages and
 * then lets several {@link QueueConsumer} threads drain it concurrently.
 */
public class KestrelTest {

    /** How many messages are published before any consumer is started. */
    private static final int MESSAGE_COUNT = 100;

    /** How many consumer threads are started against the queue. */
    private static final int CONSUMER_COUNT = 4;

    /**
     * Builds the queue, publishes the messages, runs the consumers to
     * completion and finally closes the queue.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting for a consumer thread to finish
     */
    public static void main(String[] args) throws InterruptedException {
        // Journal switched off: the queue is meant to be memory only.
        Config settings = new Config();
        settings.setBool("journal", false);

        PersistentQueue queue = new PersistentQueue("", "", settings.copy());

        publish(queue);
        consume(queue);

        // Note: queue.flush() is deliberately not called here; per the original
        // author's note, flushing removes all messages from the queue.
        System.out.println("Closing the queue");
        queue.close();
    }

    /** Adds MESSAGE_COUNT numbered greetings to the queue; the result of add() is ignored. */
    private static void publish(PersistentQueue queue) {
        int index = 0;
        while (index < MESSAGE_COUNT) {
            String text = "Hello World from Java #" + index;
            queue.add(text.getBytes(), 0);
            index++;
        }
    }

    /** Starts CONSUMER_COUNT consumer threads, then waits for each of them in start order. */
    private static void consume(PersistentQueue queue) throws InterruptedException {
        Thread[] workers = new Thread[CONSUMER_COUNT];

        for (int slot = 0; slot < workers.length; slot++) {
            Thread worker = new Thread(new QueueConsumer(queue));
            workers[slot] = worker;
            worker.start();
        }

        for (Thread worker : workers) {
            worker.join();
        }
    }
}
package net.arjones.sandbox;
import java.util.Date;
import scala.Option;
import net.lag.kestrel.PersistentQueue;
import net.lag.kestrel.QItem;
/**
 * Runnable that repeatedly removes items from a shared {@link PersistentQueue},
 * prints each one, and stops as soon as the queue yields no item or the
 * running thread is interrupted.
 */
public class QueueConsumer implements Runnable {

    private final PersistentQueue queue;

    /**
     * @param queue the queue this consumer removes items from
     */
    public QueueConsumer(PersistentQueue queue) {
        this.queue = queue;
    }

    /**
     * Drains the queue one item at a time, pausing 300 ms after each item.
     * Returns when {@code remove()} yields nothing, or when the thread is
     * interrupted during the pause (the interrupt status is preserved).
     */
    @Override
    public void run() {
        while (true) {
            Option<QItem> qitem = queue.remove();
            // finish execution when there is no payload
            if (null == qitem || qitem.isEmpty()) {
                break;
            }

            // Unwrap once and reuse, instead of calling get() for every field.
            QItem item = qitem.get();
            String message = new String(item.data());
            System.out.println(String.format("\nThread: %s\nQueued at: %s\nMessage: %s\n",
                    Thread.currentThread().getId(), new Date(item.addTime()), message));

            try {
                System.out.print(".");
                Thread.sleep(300);
            } catch (InterruptedException e) {
                // An interrupt is a request to stop: restore the flag so callers
                // can observe it, and leave the loop instead of carrying on.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
@mariagrineva
Copy link

Hi arjones!

Unfortunately, this example doesn't compile with the latest Kestrel lib (I just downloaded kestrel-2.1.4.jar). Could you help me with it? I would really appreciate it.

Maria

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment