Skip to content

Instantly share code, notes, and snippets.

@tennessine
Created May 22, 2014 09:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tennessine/3e0e67d5d12ab6638009 to your computer and use it in GitHub Desktop.
Save tennessine/3e0e67d5d12ab6638009 to your computer and use it in GitHub Desktop.
Thrift: Bidirectional Async RPC
Source on GitHub: http://github.com/JoelPM/BidiThrift
A reader posted to the thrift-user mailing list wondering if it was possible for a Thrift RPC server to send messages to the client. The responses indicated that this could be accomplished by polling the server for updates or hosting another Thrift server in the client that could receive RPCs from the server (requires opening another port on the client and handling firewall issues). I responded with a technique I'd used for accomplishing something similar and the responses made me think maybe this would be worth writing up and posting some example code.
Here's my email to the mailing list that describes what I'm doing:
I think I've done something similar to what you're trying to do, and as long as you can commit to using only async messages it's possible to pull it off without having to start a server on the client to accept RPCs from the server.
When your RPC is marked as async the server doesn't send a response and the client doesn't try to read one. So, if all your RPC calls from the client to the server are async you have effectively freed up the inbound half of the socket connection. That means that you can use it for receiving async messages from the server - the only catch is that you have to start a new thread to read and dispatch the incoming async RPC calls.
In a typical Thrift RPC system you'd create a MyService.Processor on your server and a MyService.Client on your client. To do bidirectional async message sending you'll need to go a step further and create a MyService.Client on your server for each client that connects (this can be accomplished by providing your own TProcessorFactory) and then on each client you create a MyService.Processor. (This assumes that you've gone with a generic MyService definition like you described above that has a bunch of optional messages, another option would be to define separate service definitions for the client and server.) With two clients connected the objects in existence would look something like this:
Server:
MyService.Processor mainProcessor - handles incoming async RPCs
MyService.Client clientA - used to send outgoing async RPCs to ClientA
MyService.Client clientB - used to send outgoing async RPCs to ClientB
ClientA:
MyService.Client - used to send messages to Server
MyService.Processor clientProcessor - used (by a separate thread) to process incoming async RPCs
ClientB:
MyService.Client - used to send messages to Server
MyService.Processor clientProcessor - used (by a separate thread) to process incoming async RPCs
Hopefully that explains the concept. If you need example code I can try and pull something together (it will be in Java). The nice thing about this method is that you don't have to establish two connections, so you can get around the firewall issues others have mentioned. I've been using this method on a service in production and haven't had any problems. When you have a separate thread in your client running a Processor you're basically blocking on a read, waiting for a message from the server. The benefit of this is that you're notified immediately when the server shuts down instead of having to wait until you send a message and then finding out that the TCP connection was reset.
Cheers,
Joel
Here's an example app that sends messages between clients connected to a server. It's similar to a chat app.
Thrift Definition
First, define your Thrift objects and the service. Our object and service are extremely simple:
1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/local/bin/thrift --gen java:beans:hashcode -O ../
namespace java com.joelpm.bidiMessages.generated
struct Message {
1: string clientName,
2: string message
}
service MessageService {
oneway void sendMessage(Message msg),
}
view rawservice.thrift hosted with ❤ by GitHub
In this case the service is generic enough that both the client and server will use the same service definition. We could also create a ClientMessageService and a ServerMesageService if we needed different functionality.
Server
On the server side, when a client connection is accepted we want to create a MessageService.Client object that we'll use to send messages back to the client. We can accomplish this by creating our own TProcessorFactory and using the getProcessor method as an opportunity to get access to the transport being used between the client and server:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final MessageDistributor messageDistributor = new MessageDistributor();
new Thread(messageDistributor).start();
TProcessorFactory processorFactory = new TProcessorFactory(null) {
@Override
public TProcessor getProcessor(TTransport trans) {
messageDistributor.addClient(new MessageServiceClient(trans));
return new MessageService.Processor(messageDistributor);
}
};
TServerTransport serverTransport = new TServerSocket(port);
TServer server = new TThreadPoolServer(processorFactory, serverTransport);
LOGGER.info("Server started");
server.serve();
view rawserver.java hosted with ❤ by GitHub
As you can see above, we're using the same MessageDistributor for each new processor that we create. Before we create and return the processor we create a new client and add it to the list of clients that the MessageDistributor is aware of. The server is pretty simple and you can take a look at the code to see how the MessageDistributor uses the clients to send messages back.
Client
On the client side things are a little more complex because we have to create a separate thread to read incoming messages (this is handled by the TThreadPoolServer on the server side). Here's the class that reads incoming messages:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class MessageReceiver extends ConnectionRequiredRunnable {
private final MessageService.Processor processor;
private final TProtocol protocol;
public MessageReceiver(
TProtocol protocol,
MessageService.Iface messageService,
ConnectionStatusMonitor connectionMonitor) {
super(connectionMonitor, "Message Receiver");
this.protocol = protocol;
this.processor = new MessageService.Processor(messageService);
}
@Override
public void run() {
connectWait();
while (true) {
try {
while (processor.process(protocol, protocol) == true) { }
} catch (TException e) {
disconnected();
}
}
}
}
view rawclient.java hosted with ❤ by GitHub
It extends a utility class called ConnectionRequiredRunnable that provides utility methods for handling server disconnects and reconnects, but on the whole it's pretty simple because we pass in a separate class that actually handles the incoming messages. We also create a MessageService.Client, but we wrap it in a separate thread and use a blocking queue so that other components in the system wanting to send a message can do so very quickly - or at least, have the message handed off for delivery extremely quickly.
Here's the class that handles our message sending:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class MessageSender extends ConnectionRequiredRunnable {
private final MessageService.Client client;
private final BlockingQueue<Message> msgSendQueue;
public MessageSender(
TProtocol protocol,
ConnectionStatusMonitor connectionMonitor) {
super(connectionMonitor, "Message Sender");
this.client = new MessageService.Client(protocol);
this.msgSendQueue = new LinkedBlockingQueue<Message>();
}
public void send(Message msg) {
msgSendQueue.add(msg);
}
@Override
public void run() {
connectWait();
while (true) {
try {
Message msg = msgSendQueue.take();
try {
client.sendMessage(msg);
} catch (TException e) {
// The message isn't lost, but it could end up being sent out of
// order - not ideal.
msgSendQueue.add(msg);
disconnected();
}
} catch (InterruptedException e) {
// Thread will be interrupted if connection is lost, we should wait
// for reconnection if that happens.
connectWait();
}
}
}
}
view rawMessageSender.java hosted with ❤ by GitHub
This class also extends ConnectionRequiredRunnable since it can't send messages without a connection. Here's the main method of the Client that establishes the connection to the server:
1
2
3
4
5
6
7
8
9
10
11
12
this.transport = new TSocket(server, port);
this.protocol = new TBinaryProtocol(transport);
this.connectionMonitor = new ConnectionStatusMonitor(transport);
this.sender = new MessageSender(protocol, connectionMonitor);
this.receiver = new MessageReceiver(protocol, messageHandler, connectionMonitor);
new Thread(sender).start();
new Thread(receiver).start();
this.connectionMonitor.tryOpen();
view rawclient.java hosted with ❤ by GitHub
It actually looks pretty simple since all the different pieces are organized in separate classes. The ConnectionStatusMonitor class is responsible for opening the actual connection and notifying the MessageSender and MessageReceiver when the connection has been established, at which point they'll start sending and receiving messages. If the server dies both of those processes will stop and wait until a connection has been re-established (a task the ConnectionStatusMonitor is responsible for). Here's sample output from the server:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2009-04-03 16:28:44,029 INFO main com.joelpm.bidiMessages.server.Server:43 - Server started
2009-04-03 16:28:45,814 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:36 - Added client at 127.0.0.1
2009-04-03 16:28:45,822 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client1, message:Hello there!)
2009-04-03 16:28:45,823 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client1, message:Message 0)
2009-04-03 16:28:46,807 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client1, message:Message 1)
2009-04-03 16:28:46,864 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:36 - Added client at 127.0.0.1
2009-04-03 16:28:46,895 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client2, message:Hello there!)
2009-04-03 16:28:46,897 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client2, message:Message 0)
2009-04-03 16:28:47,805 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client1, message:Message 2)
2009-04-03 16:28:47,885 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client2, message:Message 1)
2009-04-03 16:28:48,806 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client1, message:Message 3)
2009-04-03 16:28:48,885 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client2, message:Message 2)
2009-04-03 16:28:49,806 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client1, message:Message 4)
2009-04-03 16:28:49,885 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client2, message:Message 3)
^C2009-04-03 16:28:50,807 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
Message(clientName:client1, message:Message 5)
view rawserver.bash hosted with ❤ by GitHub
And here's output from client1:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ java -jar Client/target/BidiMessages.Client-0.9-jar-with-dependencies.jar client1 localhost 10101
2009-04-03 16:28:45,792 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Receiver waiting for connection to be established.
2009-04-03 16:28:45,792 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Sender waiting for connection to be established.
2009-04-03 16:28:45,803 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:48 - Message Sender notified of connection, resuming execution
2009-04-03 16:28:45,806 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:48 - Message Receiver notified of connection, resuming execution
Got msg: Message(clientName:client1, message:Hello there!)
Got msg: Message(clientName:client1, message:Message 0)
Got msg: Message(clientName:client1, message:Message 1)
Got msg: Message(clientName:client2, message:Hello there!)
Got msg: Message(clientName:client2, message:Message 0)
Got msg: Message(clientName:client1, message:Message 2)
Got msg: Message(clientName:client2, message:Message 1)
Got msg: Message(clientName:client1, message:Message 3)
Got msg: Message(clientName:client2, message:Message 2)
Got msg: Message(clientName:client1, message:Message 4)
Got msg: Message(clientName:client2, message:Message 3)
Got msg: Message(clientName:client1, message:Message 5)
2009-04-03 16:28:51,146 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:30 - Message Receiver detected a disconnect from the server.
2009-04-03 16:28:51,148 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Receiver waiting for connection to be established.
2009-04-03 16:28:51,149 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Sender waiting for connection to be established.
view rawclient1.bash hosted with ❤ by GitHub
And here's from client2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ java -jar Client/target/BidiMessages.Client-0.9-jar-with-dependencies.jar client2 localhost 10101
2009-04-03 16:28:46,851 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Sender waiting for connection to be established.
2009-04-03 16:28:46,854 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Receiver waiting for connection to be established.
2009-04-03 16:28:46,867 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:48 - Message Sender notified of connection, resuming execution
2009-04-03 16:28:46,879 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:48 - Message Receiver notified of connection, resuming execution
Got msg: Message(clientName:client2, message:Hello there!)
Got msg: Message(clientName:client2, message:Message 0)
Got msg: Message(clientName:client1, message:Message 2)
Got msg: Message(clientName:client2, message:Message 1)
Got msg: Message(clientName:client1, message:Message 3)
Got msg: Message(clientName:client2, message:Message 2)
Got msg: Message(clientName:client1, message:Message 4)
Got msg: Message(clientName:client2, message:Message 3)
Got msg: Message(clientName:client1, message:Message 5)
2009-04-03 16:28:51,146 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:30 - Message Receiver detected a disconnect from the server.
2009-04-03 16:28:51,147 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Receiver waiting for connection to be established.
2009-04-03 16:28:51,147 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Sender waiting for connection to be established.
view rawclient2.bash hosted with ❤ by GitHub
You can see that client1 and client2 paused when the server was terminated. Had the server restarted the clients would have reconnected and begun sending messages again.
The source is built using Maven and requires that you've installed libthrift.jar in your local maven repo (see the README for details). I'm also including a tgz with the compiled jar files for those who can't build the source.
Source: BidiMessages.tgz Jars: BidiMessagesJars.tgz
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment