Created
April 16, 2012 18:35
-
-
Save eestolano/2400564 to your computer and use it in GitHub Desktop.
InterruptedException when closing DistributedQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit; | |
import com.netflix.curator.framework.*; | |
import com.netflix.curator.framework.recipes.queue.QueueBuilder; | |
import com.netflix.curator.framework.recipes.queue.DistributedQueue; | |
import com.netflix.curator.framework.recipes.queue.BlockingQueueConsumer; | |
import com.netflix.curator.framework.recipes.queue.QueueSerializer; | |
import com.netflix.curator.framework.state.ConnectionState; | |
import com.netflix.curator.framework.state.ConnectionStateListener; | |
import com.netflix.curator.retry.RetryNTimes; | |
public class MinimalExample { | |
public static class SimpleListener implements ConnectionStateListener { | |
public void stateChanged(CuratorFramework client, | |
ConnectionState newState) { | |
System.out.printf("New connection state is %s\n", newState.toString()); | |
} | |
} | |
public static class SimpleStringSerializer implements QueueSerializer<String> { | |
public String deserialize(byte[] bytes) { | |
return new String(bytes); | |
} | |
public byte[] serialize(String s) { | |
return s.getBytes(); | |
} | |
} | |
public static class ProducerThread extends Thread { | |
DistributedQueue q; | |
ProducerThread(DistributedQueue q) { | |
this.q = q; | |
} | |
public void run() { | |
try { | |
q.put("test-data"); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
static CuratorFramework client = null; | |
public static void main(String args[]) throws Exception { | |
try { | |
client = CuratorFrameworkFactory.newClient("localhost:2181", | |
1000, | |
1000, | |
new RetryNTimes(2, 500)); | |
client.start(); | |
System.out.println("Enqueueing..."); | |
SimpleListener l = new SimpleListener(); | |
client.getConnectionStateListenable().addListener(l); | |
BlockingQueueConsumer<String> qc = new BlockingQueueConsumer<String>(l); | |
QueueBuilder qb = QueueBuilder.builder(client, | |
qc, | |
new SimpleStringSerializer(), | |
"/test"); | |
DistributedQueue q = qb.buildQueue(); | |
q.start(); | |
new ProducerThread(q).run(); | |
while(!q.flushPuts(5000, TimeUnit.MILLISECONDS)) { | |
System.out.printf("Waiting for enqueueing to finish..."); | |
} | |
while (qc.size() < 1) { | |
System.out.println("Waiting for consumer."); | |
Thread.sleep(1000); | |
} | |
System.out.printf("Items in queue: %d\n", qc.size()); | |
q.close(); | |
System.out.println("Done."); | |
} finally { | |
System.out.println("Closing client..."); | |
if (client != null) { | |
client.close(); | |
} | |
System.out.println("Closed client."); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Apr 16, 2012 11:30:01 AM com.netflix.curator.framework.imps.CuratorFrameworkImpl start | |
INFO: Starting | |
2012-04-16 11:30:01,418 - INFO [main:Environment@97] - Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT | |
2012-04-16 11:30:01,421 - INFO [main:Environment@97] - Client environment:host.name=192.168.1.104 | |
2012-04-16 11:30:01,422 - INFO [main:Environment@97] - Client environment:java.version=1.6.0_31 | |
2012-04-16 11:30:01,423 - INFO [main:Environment@97] - Client environment:java.vendor=Apple Inc. | |
2012-04-16 11:30:01,423 - INFO [main:Environment@97] - Client environment:java.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home | |
2012-04-16 11:30:01,424 - INFO [main:Environment@97] - Client environment:java.class.path=jars/curator-client-1.0.8.jar:jars/curator-framework-1.0.8.jar:jars/curator-recipes-1.0.8.jar:jars/guava-11.0.1.jar:jars/job.jar:jars/log4j-1.2.15.jar:jars/slf4j-api-1.6.3.jar:jars/zookeeper-3.3.4.jar:conf/ | |
2012-04-16 11:30:01,424 - INFO [main:Environment@97] - Client environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java | |
2012-04-16 11:30:01,425 - INFO [main:Environment@97] - Client environment:java.io.tmpdir=/var/folders/69/7ch13cqn0cb_kw563f4x8yncs4xk04/T/ | |
2012-04-16 11:30:01,425 - INFO [main:Environment@97] - Client environment:java.compiler=<NA> | |
2012-04-16 11:30:01,426 - INFO [main:Environment@97] - Client environment:os.name=Mac OS X | |
2012-04-16 11:30:01,427 - INFO [main:Environment@97] - Client environment:os.arch=x86_64 | |
2012-04-16 11:30:01,427 - INFO [main:Environment@97] - Client environment:os.version=10.7.3 | |
2012-04-16 11:30:01,428 - INFO [main:Environment@97] - Client environment:user.name=eestolano | |
2012-04-16 11:30:01,428 - INFO [main:Environment@97] - Client environment:user.home=/Users/eestolano | |
2012-04-16 11:30:01,428 - INFO [main:Environment@97] - Client environment:user.dir=/Users/eestolano/git/cluster/clusteros | |
2012-04-16 11:30:01,430 - INFO [main:ZooKeeper@379] - Initiating client connection, connectString=localhost:2181 sessionTimeout=1000 watcher=com.netflix.curator.ConnectionState@2fcac6db | |
2012-04-16 11:30:01,452 - INFO [main-SendThread():ClientCnxn$SendThread@1061] - Opening socket connection to server localhost/127.0.0.1:2181 | |
Enqueueing... | |
2012-04-16 11:30:01,472 - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@950] - Socket connection established to localhost/127.0.0.1:2181, initiating session | |
2012-04-16 11:30:01,485 - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@739] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x136b7a41c1e000a, negotiated timeout = 4000 | |
Apr 16, 2012 11:30:01 AM com.netflix.curator.framework.state.ConnectionStateManager addStateChange | |
INFO: State change: CONNECTED | |
New connection state is CONNECTED | |
Waiting for consumer. | |
Items in queue: 1 | |
Done. | |
Closing client... | |
Apr 16, 2012 11:30:02 AM com.netflix.curator.framework.recipes.queue.DistributedQueue runLoop | |
SEVERE: Exception caught in background handler | |
java.lang.InterruptedException | |
at java.lang.Object.wait(Native Method) | |
at java.lang.Object.wait(Object.java:485) | |
at com.netflix.curator.framework.recipes.queue.DistributedQueue.runLoop(DistributedQueue.java:421) | |
at com.netflix.curator.framework.recipes.queue.DistributedQueue.access$100(DistributedQueue.java:62) | |
at com.netflix.curator.framework.recipes.queue.DistributedQueue$2.call(DistributedQueue.java:187) | |
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) | |
at java.util.concurrent.FutureTask.run(FutureTask.java:138) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) | |
at java.lang.Thread.run(Thread.java:680) | |
2012-04-16 11:30:02,533 - INFO [main:ZooKeeper@544] - Session: 0x136b7a41c1e000a closed | |
Closed client. | |
2012-04-16 11:30:02,535 - INFO [main-EventThread:ClientCnxn$EventThread@521] - EventThread shut down |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment