Skip to content

Instantly share code, notes, and snippets.

@eestolano
Created April 16, 2012 18:35
Show Gist options
  • Save eestolano/2400564 to your computer and use it in GitHub Desktop.
Save eestolano/2400564 to your computer and use it in GitHub Desktop.
InterruptedException when closing DistributedQueue
import java.util.concurrent.TimeUnit;
import com.netflix.curator.framework.*;
import com.netflix.curator.framework.recipes.queue.QueueBuilder;
import com.netflix.curator.framework.recipes.queue.DistributedQueue;
import com.netflix.curator.framework.recipes.queue.BlockingQueueConsumer;
import com.netflix.curator.framework.recipes.queue.QueueSerializer;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.framework.state.ConnectionStateListener;
import com.netflix.curator.retry.RetryNTimes;
public class MinimalExample {
public static class SimpleListener implements ConnectionStateListener {
public void stateChanged(CuratorFramework client,
ConnectionState newState) {
System.out.printf("New connection state is %s\n", newState.toString());
}
}
public static class SimpleStringSerializer implements QueueSerializer<String> {
public String deserialize(byte[] bytes) {
return new String(bytes);
}
public byte[] serialize(String s) {
return s.getBytes();
}
}
public static class ProducerThread extends Thread {
DistributedQueue q;
ProducerThread(DistributedQueue q) {
this.q = q;
}
public void run() {
try {
q.put("test-data");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
static CuratorFramework client = null;
public static void main(String args[]) throws Exception {
try {
client = CuratorFrameworkFactory.newClient("localhost:2181",
1000,
1000,
new RetryNTimes(2, 500));
client.start();
System.out.println("Enqueueing...");
SimpleListener l = new SimpleListener();
client.getConnectionStateListenable().addListener(l);
BlockingQueueConsumer<String> qc = new BlockingQueueConsumer<String>(l);
QueueBuilder qb = QueueBuilder.builder(client,
qc,
new SimpleStringSerializer(),
"/test");
DistributedQueue q = qb.buildQueue();
q.start();
new ProducerThread(q).run();
while(!q.flushPuts(5000, TimeUnit.MILLISECONDS)) {
System.out.printf("Waiting for enqueueing to finish...");
}
while (qc.size() < 1) {
System.out.println("Waiting for consumer.");
Thread.sleep(1000);
}
System.out.printf("Items in queue: %d\n", qc.size());
q.close();
System.out.println("Done.");
} finally {
System.out.println("Closing client...");
if (client != null) {
client.close();
}
System.out.println("Closed client.");
}
}
}
Apr 16, 2012 11:30:01 AM com.netflix.curator.framework.imps.CuratorFrameworkImpl start
INFO: Starting
2012-04-16 11:30:01,418 - INFO [main:Environment@97] - Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT
2012-04-16 11:30:01,421 - INFO [main:Environment@97] - Client environment:host.name=192.168.1.104
2012-04-16 11:30:01,422 - INFO [main:Environment@97] - Client environment:java.version=1.6.0_31
2012-04-16 11:30:01,423 - INFO [main:Environment@97] - Client environment:java.vendor=Apple Inc.
2012-04-16 11:30:01,423 - INFO [main:Environment@97] - Client environment:java.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
2012-04-16 11:30:01,424 - INFO [main:Environment@97] - Client environment:java.class.path=jars/curator-client-1.0.8.jar:jars/curator-framework-1.0.8.jar:jars/curator-recipes-1.0.8.jar:jars/guava-11.0.1.jar:jars/job.jar:jars/log4j-1.2.15.jar:jars/slf4j-api-1.6.3.jar:jars/zookeeper-3.3.4.jar:conf/
2012-04-16 11:30:01,424 - INFO [main:Environment@97] - Client environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java
2012-04-16 11:30:01,425 - INFO [main:Environment@97] - Client environment:java.io.tmpdir=/var/folders/69/7ch13cqn0cb_kw563f4x8yncs4xk04/T/
2012-04-16 11:30:01,425 - INFO [main:Environment@97] - Client environment:java.compiler=<NA>
2012-04-16 11:30:01,426 - INFO [main:Environment@97] - Client environment:os.name=Mac OS X
2012-04-16 11:30:01,427 - INFO [main:Environment@97] - Client environment:os.arch=x86_64
2012-04-16 11:30:01,427 - INFO [main:Environment@97] - Client environment:os.version=10.7.3
2012-04-16 11:30:01,428 - INFO [main:Environment@97] - Client environment:user.name=eestolano
2012-04-16 11:30:01,428 - INFO [main:Environment@97] - Client environment:user.home=/Users/eestolano
2012-04-16 11:30:01,428 - INFO [main:Environment@97] - Client environment:user.dir=/Users/eestolano/git/cluster/clusteros
2012-04-16 11:30:01,430 - INFO [main:ZooKeeper@379] - Initiating client connection, connectString=localhost:2181 sessionTimeout=1000 watcher=com.netflix.curator.ConnectionState@2fcac6db
2012-04-16 11:30:01,452 - INFO [main-SendThread():ClientCnxn$SendThread@1061] - Opening socket connection to server localhost/127.0.0.1:2181
Enqueueing...
2012-04-16 11:30:01,472 - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@950] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2012-04-16 11:30:01,485 - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@739] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x136b7a41c1e000a, negotiated timeout = 4000
Apr 16, 2012 11:30:01 AM com.netflix.curator.framework.state.ConnectionStateManager addStateChange
INFO: State change: CONNECTED
New connection state is CONNECTED
Waiting for consumer.
Items in queue: 1
Done.
Closing client...
Apr 16, 2012 11:30:02 AM com.netflix.curator.framework.recipes.queue.DistributedQueue runLoop
SEVERE: Exception caught in background handler
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:485)
at com.netflix.curator.framework.recipes.queue.DistributedQueue.runLoop(DistributedQueue.java:421)
at com.netflix.curator.framework.recipes.queue.DistributedQueue.access$100(DistributedQueue.java:62)
at com.netflix.curator.framework.recipes.queue.DistributedQueue$2.call(DistributedQueue.java:187)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
2012-04-16 11:30:02,533 - INFO [main:ZooKeeper@544] - Session: 0x136b7a41c1e000a closed
Closed client.
2012-04-16 11:30:02,535 - INFO [main-EventThread:ClientCnxn$EventThread@521] - EventThread shut down
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment