Skip to content

@arjones /gist:784413
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Kestrel Producer/Consumer example in Java
package net.arjones.sandbox;
import net.lag.configgy.Config;
import net.lag.kestrel.PersistentQueue;
/**
 * Minimal Kestrel producer/consumer demo.
 *
 * <p>Publishes a fixed number of messages onto a {@link PersistentQueue}
 * configured with journaling switched off, then starts several
 * {@code QueueConsumer} threads and waits for all of them to finish before
 * closing the queue.
 */
public class KestrelTest {

    /** Number of messages published before the consumers are started. */
    private static final int MESSAGE_COUNT = 100;

    /** Number of consumer threads started against the queue. */
    private static final int CONSUMER_COUNT = 4;

    /**
     * Runs the demo: publish, consume on several threads, then close the queue.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting for a consumer thread to finish
     */
    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        // using memory only
        config.setBool("journal", false);

        PersistentQueue queue = new PersistentQueue("", "", config.copy());
        try {
            // publish the messages
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String value = "Hello World from Java #" + i;
                // NOTE(review): add() presumably reports success as a boolean and the
                // second argument (0) presumably means "no expiry" - confirm against
                // the Kestrel version in use.
                queue.add(value.getBytes(), 0);
            }

            Thread[] threads = new Thread[CONSUMER_COUNT];

            // start all
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                threads[i] = new Thread(new QueueConsumer(queue));
                threads[i].start();
            }

            // join all
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                threads[i].join();
            }

            // Flushing the queue removes all messages; call queue.flush() here if
            // leftovers should be discarded before closing.
        } finally {
            // Close even when publishing fails or join() is interrupted, so the
            // queue is never left open on an error path.
            System.out.println("Closing the queue");
            queue.close();
        }
    }
}
package net.arjones.sandbox;
import java.util.Date;
import scala.Option;
import net.lag.kestrel.PersistentQueue;
import net.lag.kestrel.QItem;
/**
 * Drains a {@link PersistentQueue}, printing each message it removes.
 *
 * <p>The consumer keeps removing items until the queue hands back no item,
 * pausing briefly after each message. It also stops early if its thread is
 * interrupted during that pause.
 */
public class QueueConsumer implements Runnable {

    /** Pause between two consecutive messages, in milliseconds. */
    private static final long PAUSE_MILLIS = 300L;

    private final PersistentQueue queue;

    /**
     * Creates a consumer bound to the given queue.
     *
     * @param queue the queue to remove items from
     */
    public QueueConsumer(PersistentQueue queue) {
        this.queue = queue;
    }

    /**
     * Removes and prints items until the queue returns no item or the current
     * thread is interrupted.
     */
    @Override
    public void run() {
        while (true) {
            Option<QItem> qitem = queue.remove();

            // finish execution when there is no payload
            if (null == qitem || qitem.isEmpty()) {
                break;
            }

            // Unwrap once and reuse, instead of calling get() for every field.
            QItem item = qitem.get();

            // Consume message
            String message = new String(item.data());
            System.out.println(String.format("\nThread: %s\nQueued at: %s\nMessage: %s\n",
                    Thread.currentThread().getId(), new Date(item.addTime()), message));

            try {
                System.out.print(".");
                Thread.sleep(PAUSE_MILLIS);
            } catch (InterruptedException e) {
                // An interrupt is a request to stop: restore the flag so the
                // owner of this thread can observe it, and leave the loop
                // instead of carrying on consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
@mariagrineva

Hi arjones!

Unfortunately, this example doesn't compile with the latest Kestrel lib. (I just downloaded kestrel-2.1.4.jar). Could you help me with it? I would really appreciate it.

Maria

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.