Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
gRPC bidirectional streams in Go
package main
import (
"log"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "github.com/jzelinskie/grpc/simple"
)
// main dials the SimpleService server on localhost:6000, opens the SimpleRPC
// stream and sends the same message every two seconds until a send fails,
// then closes the sending side of the stream.
func main() {
	conn, err := grpc.Dial("localhost:6000", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("failed to connect: %s", err)
	}
	defer conn.Close()

	client := pb.NewSimpleServiceClient(conn)
	stream, err := client.SimpleRPC(context.Background())
	if err != nil {
		// Without this check a failed call leaves stream nil and the first
		// Send below would panic.
		log.Fatalf("failed to open stream: %s", err)
	}

	waitc := make(chan struct{})
	msg := &pb.SimpleData{Msg: "sup"}
	go func() {
		// Closing waitc is what lets main get past the receive below.
		defer close(waitc)
		for {
			log.Println("Sleeping...")
			time.Sleep(2 * time.Second)
			log.Println("Sending msg...")
			if err := stream.Send(msg); err != nil {
				// The stream is unusable after a failed send; stop looping.
				log.Printf("failed to send: %s", err)
				return
			}
		}
	}()
	<-waitc
	if err := stream.CloseSend(); err != nil {
		log.Printf("failed to close stream: %s", err)
	}
}
package main
import (
"io"
"log"
"net"
"google.golang.org/grpc"
pb "github.com/jzelinskie/grpc/simple"
)
// simpleServer is the stateless handler type that main registers with the
// gRPC server as the SimpleService implementation.
type simpleServer struct {
}
// SimpleRPC handles one SimpleRPC stream by logging the Msg of every
// SimpleData it receives.
//
// It returns nil once Recv reports io.EOF and returns any other Recv error
// unchanged. Nothing is sent back to the client on the stream.
func (s *simpleServer) SimpleRPC(stream pb.SimpleService_SimpleRPCServer) error {
	log.Println("Started stream")
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		// Log only after the error checks so that end-of-stream and failed
		// receives are not reported as received values.
		log.Println("Received value")
		log.Println("Got " + in.Msg)
	}
}
// main registers a simpleServer with a new gRPC server and serves it on TCP
// port 6000. It exits with a fatal log message if listening or serving fails.
func main() {
	grpcServer := grpc.NewServer()
	pb.RegisterSimpleServiceServer(grpcServer, &simpleServer{})

	l, err := net.Listen("tcp", ":6000")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Println("Listening on tcp://localhost:6000")

	// Serve blocks for the lifetime of the server; surface its error rather
	// than exiting silently with status 0.
	if err := grpcServer.Serve(l); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
// Code generated by protoc-gen-go.
// source: simple.proto
// DO NOT EDIT!
/*
Package simple is a generated protocol buffer package.
It is generated from these files:
simple.proto
It has these top-level messages:
SimpleData
*/
package simple
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// SimpleData is the only top-level message in simple.proto and the element
// type sent and received on the SimpleRPC stream.
type SimpleData struct {
// Msg is protobuf field 1 ("msg"); the json tag omits it when empty.
Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"`
}
// Reset overwrites m with the zero SimpleData value.
func (m *SimpleData) Reset() { *m = SimpleData{} }
// String returns the proto.CompactTextString rendering of m.
func (m *SimpleData) String() string { return proto.CompactTextString(m) }
// ProtoMessage is an empty marker method.
func (*SimpleData) ProtoMessage() {}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// Client API for SimpleService service

// SimpleServiceClient is the client-side interface to SimpleService.
type SimpleServiceClient interface {
// SimpleRPC opens the SimpleRPC stream and returns a handle for sending
// and receiving SimpleData values on it.
SimpleRPC(ctx context.Context, opts ...grpc.CallOption) (SimpleService_SimpleRPCClient, error)
}
// simpleServiceClient implements SimpleServiceClient on top of a single
// gRPC client connection.
type simpleServiceClient struct {
	cc *grpc.ClientConn
}

// NewSimpleServiceClient returns a SimpleServiceClient that issues its calls
// over cc.
func NewSimpleServiceClient(cc *grpc.ClientConn) SimpleServiceClient {
	return &simpleServiceClient{cc: cc}
}

// SimpleRPC starts a new client stream for the SimpleRPC method and wraps it
// in the typed Send/Recv helper. A failure to create the stream is returned
// as-is with a nil stream.
func (c *simpleServiceClient) SimpleRPC(ctx context.Context, opts ...grpc.CallOption) (SimpleService_SimpleRPCClient, error) {
	// SimpleRPC is the first (and only) entry in the service's stream table.
	desc := &_SimpleService_serviceDesc.Streams[0]
	cs, err := grpc.NewClientStream(ctx, desc, c.cc, "/simple.SimpleService/SimpleRPC", opts...)
	if err != nil {
		return nil, err
	}
	return &simpleServiceSimpleRPCClient{ClientStream: cs}, nil
}
// SimpleService_SimpleRPCClient is the client's view of a SimpleRPC stream:
// typed Send and Recv for SimpleData plus the embedded grpc.ClientStream.
type SimpleService_SimpleRPCClient interface {
Send(*SimpleData) error
Recv() (*SimpleData, error)
grpc.ClientStream
}
// simpleServiceSimpleRPCClient adds typed SimpleData helpers to an embedded
// grpc.ClientStream.
type simpleServiceSimpleRPCClient struct {
	grpc.ClientStream
}

// Send writes m to the stream via the embedded stream's SendMsg.
func (x *simpleServiceSimpleRPCClient) Send(m *SimpleData) error {
	return x.SendMsg(m)
}

// Recv reads the next message from the stream into a fresh SimpleData.
// On error it returns a nil message together with that error.
func (x *simpleServiceSimpleRPCClient) Recv() (*SimpleData, error) {
	msg := &SimpleData{}
	err := x.RecvMsg(msg)
	if err != nil {
		return nil, err
	}
	return msg, nil
}
// Server API for SimpleService service

// SimpleServiceServer is the interface a SimpleService implementation must
// satisfy to be passed to RegisterSimpleServiceServer.
type SimpleServiceServer interface {
// SimpleRPC is called with the server side of a SimpleRPC stream; its
// return value is passed back to grpc by the stream handler.
SimpleRPC(SimpleService_SimpleRPCServer) error
}
// RegisterSimpleServiceServer attaches srv to s as the implementation of the
// SimpleService described by the package's service descriptor.
func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) {
	desc := &_SimpleService_serviceDesc
	s.RegisterService(desc, srv)
}
// _SimpleService_SimpleRPC_Handler is the stream handler referenced by the
// service descriptor. It asserts srv to SimpleServiceServer, wraps the raw
// stream in the typed helper, and forwards the call to SimpleRPC.
func _SimpleService_SimpleRPC_Handler(srv interface{}, stream grpc.ServerStream) error {
	impl := srv.(SimpleServiceServer)
	typed := &simpleServiceSimpleRPCServer{ServerStream: stream}
	return impl.SimpleRPC(typed)
}
// SimpleService_SimpleRPCServer is the server's view of a SimpleRPC stream:
// typed Send and Recv for SimpleData plus the embedded grpc.ServerStream.
type SimpleService_SimpleRPCServer interface {
Send(*SimpleData) error
Recv() (*SimpleData, error)
grpc.ServerStream
}
// simpleServiceSimpleRPCServer adds typed SimpleData helpers to an embedded
// grpc.ServerStream.
type simpleServiceSimpleRPCServer struct {
	grpc.ServerStream
}

// Send writes m to the stream via the embedded stream's SendMsg.
func (x *simpleServiceSimpleRPCServer) Send(m *SimpleData) error {
	return x.SendMsg(m)
}

// Recv reads the next message from the stream into a fresh SimpleData.
// On error it returns a nil message together with that error.
func (x *simpleServiceSimpleRPCServer) Recv() (*SimpleData, error) {
	msg := &SimpleData{}
	err := x.RecvMsg(msg)
	if err != nil {
		return nil, err
	}
	return msg, nil
}
// _SimpleService_serviceDesc is the descriptor passed to grpc when the
// service is registered. It declares no unary methods and a single stream,
// SimpleRPC, marked as streaming in both directions. The client stub
// addresses that entry as Streams[0], so its position must not change.
var _SimpleService_serviceDesc = grpc.ServiceDesc{
ServiceName: "simple.SimpleService",
HandlerType: (*SimpleServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SimpleRPC",
Handler: _SimpleService_SimpleRPC_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
}
// Schema for package simple: one streaming service and one message.
syntax = "proto3";
package simple;
// SimpleService has a single RPC whose request and response are both
// streams of SimpleData.
service SimpleService {
rpc SimpleRPC (stream SimpleData) returns (stream SimpleData) {}
}
// SimpleData carries one string field, msg, with field number 1.
message SimpleData {
string msg = 1;
}
@eufat

This comment has been minimized.

Copy link

eufat commented Mar 12, 2019

Nice example, but it is not yet truly "bidirectional", since the server never sends anything back to the client on the stream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.