@HelloGrayson
Last active August 29, 2015 14:24
tchannel .call api change
# raw
response = yield tchannel.call(
    argscheme="raw",
    service="someservice",
    endpoint="something",
    body="RAWRESPONSE",
    headers={
        'X-Source': 'geo'
    },
    ttl=1000
)
# json
response = yield tchannel.call(
    argscheme="json",
    service="someservice",
    endpoint="maps",
    body='{"lat": 100, "lng": 140}',
    headers={
        'X-Source': 'geo'
    },
    ttl=1000
)
# thrift
Foo = get_tchannel_thrift(FooThrift, 'fooservice')
response = yield tchannel.call(
    "some_binary",
    argscheme="thrift",
    headers={
        'X-Source': 'geo'
    },
    ttl=1000
)
response = yield tchannel.call_thrift(
    Foo.getBar(Foo.BarResponse("hi")),
    headers={
        'X-Source': 'geo'
    },
    timeout=5000
)
response = yield tchannel.as_thrift().call(
    Foo.getBar(Foo.BarResponse("hi")),
    headers={
        'X-Source': 'geo'
    },
    timeout=5000
)
response = yield tchannel.thrift.call(
    Foo.getBar(Foo.BarResponse("hi")),
    headers={
        'X-Source': 'geo'
    },
    timeout=5000
)
future = tchannel_sync.call(
    Foo.baz(True),
    headers={
        'X-Source': 'geo'
    },
    ttl=1000
)
@HelloGrayson (Author)

I love that most thrift calls would look like:

response = yield tchannel.call(Foo.baz(True))

@abhinav commented Jul 13, 2015

Foo = get_tchannel_thrift(FooThrift, 'fooservice')

Probably need a better name for that. Also, keep in mind that the service name will match the module name. So, the user will end up with:

from my_service import Foo

FooRequest = get_tchannel_thrift(Foo)

(We can't reuse the Foo name because we want the user to be able to access the types defined in Foo.)
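
Concretely (the names here are just placeholders, not settled API), the split would look like:

from my_service import Foo              # generated Thrift module; it owns the types

FooRequest = get_tchannel_thrift(Foo)   # TChannel-aware wrapper used to build calls

# Types still come from Foo, while the request object comes from FooRequest:
response = yield tchannel.call(FooRequest.getBar(Foo.BarResponse("hi")))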


Also, I think we should try to be more explicit instead of magically determining the "as" header. What are your thoughts on:

root_tchannel = TChannel(..)
# The default TChannel object defaults to "as": "raw"

tchannel = root_tchannel.as_json()
# tchannel is a proxy object that forwards calls to root_tchannel, except it always adds the "as": "json" header
# and encodes/decodes JSON

tchannel = root_tchannel.as_http()
# same as above but for HTTP

tchannel = root_tchannel.as_thrift()

Otherwise, .call will remain the same as you designed it.
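
For illustration, a minimal sketch of what one of these proxies could look like under the hood (the class name, parameter list, and Tornado usage are my assumptions, not settled API):

import json

from tornado import gen


class JsonProxy(object):
    # Hypothetical thin wrapper around a root TChannel.
    def __init__(self, root):
        self._root = root

    @gen.coroutine
    def call(self, service, endpoint, body=None, headers=None, ttl=None):
        response = yield self._root.call(
            argscheme="json",            # always send "as": "json"
            service=service,
            endpoint=endpoint,
            body=json.dumps(body),       # encode the outgoing body
            headers=headers,
            ttl=ttl,
        )
        response.body = json.loads(response.body)  # decode the incoming body
        raise gen.Return(response)

root_tchannel.as_json() would then just return something like JsonProxy(root_tchannel).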

@HelloGrayson (Author)

This is how gRPC does it:

Given the following IDL:

syntax = "proto3";

option java_package = "io.grpc.examples";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

Then you generate the client, which looks a bit like this:

import helloworld_pb2

_TIMEOUT_SECONDS = 10


def run():
  with helloworld_pb2.early_adopter_create_Greeter_stub('localhost', 50051) as stub:
    response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'), _TIMEOUT_SECONDS)
    print "Greeter client received: " + response.message


if __name__ == '__main__':
  run()

From what I can tell, when you generate a stub, it breaks the 1:1 relationship between the IDL procedure and the actual parameters. Notice how _TIMEOUT_SECONDS exists as a parameter above but not in the definition:

rpc SayHello (HelloRequest) returns (HelloReply) {}

This is probably acceptable in Protobuf because you are only allowed to have one parameter, some type of Request struct.

For Thrift, it's a bit weird for us to be adding additional params that don't map to the definition.
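
To make that concrete, here is a rough sketch (the getBar signature is hypothetical, not from any real IDL):

# Hypothetical Thrift method: BarResponse getBar(1: string name, 2: i32 count)

# A gRPC-style stub splices transport options into the method's own parameter list:
response = stub.getBar("hi", 5, _TIMEOUT_SECONDS)   # the timeout isn't in the IDL signature

# Keeping transport options on .call preserves the 1:1 mapping with the IDL:
response = yield tchannel.call(Foo.getBar("hi", 5), ttl=1000)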

sources:

@HelloGrayson (Author)

Finagle's approach is brutal:

  1. create filters
  2. add to stack
  3. write current stack to header buffer
  4. use builder to create client with stack
  5. filter stack applies to each request

So with a trace filter they can set Zipkin headers on each request.

val tracer = mock[Tracer]
//tracer.sampleTrace(any(classManifest[TraceId])) returns Some(true)
when(tracer.sampleTrace(any(classOf[TraceId]))).thenReturn(Some(true))

val filter = new TTwitterClientFilter("service", true, None, protocolFactory)
val buffer = new OutputBuffer(protocolFactory)
buffer().writeMessageBegin(
  new TMessage(ThriftTracing.CanTraceMethodName, TMessageType.CALL, 0))
val options = new thrift.ConnectionOptions
options.write(buffer())
buffer().writeMessageEnd()

val tracing = new TracingFilter[ThriftClientRequest, Array[Byte]](tracer, "TTwitterClientFilterTest")
val service = mock[Service[ThriftClientRequest, Array[Byte]]]
val _request = ArgumentCaptor.forClass(classOf[ThriftClientRequest])
when(service(_request.capture)).thenReturn(Future(Array[Byte]()))

val stack = tracing andThen filter
stack(new ThriftClientRequest(buffer.toArray, false), service)

val header = new thrift.RequestHeader
InputBuffer.peelMessage(_request.getValue.message, header, protocolFactory)

assert(header.isSampled)

sources:

@jc-fireball

Wow. No Finagle.

@abhinav commented Jul 28, 2015

Based on discussion earlier, this is how we expect the client-side streaming API to look:


Response streaming

We'll introduce a stream method on the TChannel object and its argscheme-specific proxies that will accept the same arguments as call. Instead of a standard response, it'll return a streaming response object which provides a .read() method.

response = tchannel.stream(service='foo', endpoint='bar', body='listItems')
headers = response.headers
try:
    while True:
        chunk = yield response.read()
        process(chunk)
except EndOfStream:
    pass

Request streaming

We'll have a body_producer parameter on both call and stream. body_producer may be passed in lieu of the body. It will be a function that accepts a write function and calls it to write to the stream. The function must be a coroutine, or return a future that resolves to None when it finishes writing.

@gen.coroutine
def producer(write):
    for line in some_file:
        yield write(line)

response = yield tchannel.call(endpoint='foo', service='bar', body_producer=producer)

# Bidirectional streaming:

response = yield tchannel.stream(endpoint='foo', service='bar', body_producer=producer)
try:
    while True:
        chunk = yield response.read()
        process(chunk)
except EndOfStream:
    pass

@blampe commented Jul 28, 2015

Suggestion: make the streamed response iterable, e.g. for thing in response.
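
Since reads are asynchronous, each iteration step would probably still yield a future. A rough sketch of how that could sit on top of .read() (none of this is real API):

from tornado import gen


class IterableStream(object):
    # Hypothetical wrapper: iterating hands back one read() future per chunk.
    def __init__(self, response):
        self._response = response        # the streaming response from .stream()

    def __iter__(self):
        return self

    def next(self):                      # Python 2 iterator protocol
        return self._response.read()     # a future; EndOfStream surfaces on yield


@gen.coroutine
def consume(response):
    try:
        for future_chunk in IterableStream(response):
            chunk = yield future_chunk   # resolves to the next chunk
            process(chunk)
    except EndOfStream:
        pass

The same protocol could of course live on the streaming response itself, which gives the literal "for thing in response" form (each item being a future to yield).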

@HelloGrayson (Author)

@blampe +1
