Skip to content

Instantly share code, notes, and snippets.

@jzelinskie
Last active October 27, 2021 03:37
Show Gist options
  • Save jzelinskie/10ceca82f4f5085c106d to your computer and use it in GitHub Desktop.
Save jzelinskie/10ceca82f4f5085c106d to your computer and use it in GitHub Desktop.
gRPC bidirectional streams in Go
package main
import (
"log"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "github.com/jzelinskie/grpc/simple"
)
// main dials the SimpleService server on localhost:6000, opens the SimpleRPC
// stream, and sends the same message every two seconds. It keeps sending until
// a send fails, then closes the send side of the stream and returns.
func main() {
	conn, err := grpc.Dial("localhost:6000", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("failed to connect: %s", err)
	}
	defer conn.Close()

	client := pb.NewSimpleServiceClient(conn)
	stream, err := client.SimpleRPC(context.Background())
	if err != nil {
		// log.Fatalf exits without running deferred calls, so close explicitly.
		conn.Close()
		log.Fatalf("failed to open stream: %s", err)
	}

	// Keyed literal: stays correct if fields are ever added to SimpleData.
	msg := &pb.SimpleData{Msg: "sup"}

	// done is closed by the sender goroutine when it stops, which unblocks
	// main so the stream can be shut down instead of hanging forever.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			log.Println("Sleeping...")
			time.Sleep(2 * time.Second)
			log.Println("Sending msg...")
			if err := stream.Send(msg); err != nil {
				log.Printf("failed to send: %s", err)
				return
			}
		}
	}()
	<-done

	if err := stream.CloseSend(); err != nil {
		log.Printf("failed to close send side: %s", err)
	}
}
package main
import (
"io"
"log"
"net"
"google.golang.org/grpc"
pb "github.com/jzelinskie/grpc/simple"
)
// simpleServer is the stateless receiver type on which the SimpleRPC handler
// is defined.
type simpleServer struct{}
// SimpleRPC handles one SimpleRPC stream. It logs every message received from
// the client until the stream reports io.EOF, in which case it returns nil, or
// any other receive error, which is returned to the caller unchanged.
func (s *simpleServer) SimpleRPC(stream pb.SimpleService_SimpleRPCServer) error {
	log.Println("Started stream")
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			// No more messages from the client; finish without an error.
			return nil
		}
		if err != nil {
			return err
		}
		// Log only after the error checks so that EOF and failed receives
		// are not reported as received values.
		log.Println("Received value")
		log.Println("Got " + in.Msg)
	}
}
// main registers simpleServer as the SimpleService implementation, listens on
// TCP port 6000 and serves gRPC requests. It exits with a fatal log message if
// listening or serving fails.
func main() {
	grpcServer := grpc.NewServer()
	pb.RegisterSimpleServiceServer(grpcServer, &simpleServer{})

	l, err := net.Listen("tcp", ":6000")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Println("Listening on tcp://localhost:6000")

	// Serve blocks until it fails; surface that failure instead of exiting
	// silently with status 0.
	if err := grpcServer.Serve(l); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
// Code generated by protoc-gen-go.
// source: simple.proto
// DO NOT EDIT!
/*
Package simple is a generated protocol buffer package.
It is generated from these files:
simple.proto
It has these top-level messages:
SimpleData
*/
package simple
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// SimpleData is the single top-level message generated from simple.proto.
type SimpleData struct {
// Msg is the string payload; its tag maps it to protobuf field 1 ("msg").
Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"`
}
// Reset overwrites m with a zero-valued SimpleData.
func (m *SimpleData) Reset() {
	*m = SimpleData{}
}

// String returns the result of proto.CompactTextString for m.
func (m *SimpleData) String() string {
	return proto.CompactTextString(m)
}

// ProtoMessage is an empty marker method.
func (*SimpleData) ProtoMessage() {
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// Client API for SimpleService service
// SimpleServiceClient is the client-side API of the SimpleService service.
type SimpleServiceClient interface {
// SimpleRPC opens the SimpleRPC stream and returns a handle for sending and
// receiving SimpleData messages on it.
SimpleRPC(ctx context.Context, opts ...grpc.CallOption) (SimpleService_SimpleRPCClient, error)
}
// simpleServiceClient implements SimpleServiceClient on top of a gRPC
// client connection.
type simpleServiceClient struct {
// cc is the connection on which streams are created.
cc *grpc.ClientConn
}
// NewSimpleServiceClient returns a SimpleServiceClient that issues its calls
// over cc.
func NewSimpleServiceClient(cc *grpc.ClientConn) SimpleServiceClient {
	client := &simpleServiceClient{cc: cc}
	return client
}
// SimpleRPC creates a new client stream for /simple.SimpleService/SimpleRPC
// using the first stream descriptor of the service and wraps it in the typed
// SimpleService_SimpleRPCClient. Any error from stream creation is returned
// as-is with a nil client.
func (c *simpleServiceClient) SimpleRPC(ctx context.Context, opts ...grpc.CallOption) (SimpleService_SimpleRPCClient, error) {
	desc := &_SimpleService_serviceDesc.Streams[0]
	cs, err := grpc.NewClientStream(ctx, desc, c.cc, "/simple.SimpleService/SimpleRPC", opts...)
	if err != nil {
		return nil, err
	}
	return &simpleServiceSimpleRPCClient{ClientStream: cs}, nil
}
// SimpleService_SimpleRPCClient is the client-side view of a SimpleRPC stream:
// typed Send and Recv for SimpleData plus the embedded grpc.ClientStream.
type SimpleService_SimpleRPCClient interface {
// Send writes one SimpleData message to the stream.
Send(*SimpleData) error
// Recv reads one SimpleData message from the stream.
Recv() (*SimpleData, error)
grpc.ClientStream
}
// simpleServiceSimpleRPCClient adds typed Send/Recv methods to an embedded
// grpc.ClientStream.
type simpleServiceSimpleRPCClient struct {
grpc.ClientStream
}
// Send forwards m to SendMsg on the embedded client stream and returns its
// error.
func (x *simpleServiceSimpleRPCClient) Send(m *SimpleData) error {
	// SendMsg is promoted from the embedded grpc.ClientStream.
	return x.SendMsg(m)
}
// Recv receives the next message from the embedded client stream into a fresh
// SimpleData. On failure it returns nil together with the RecvMsg error.
func (x *simpleServiceSimpleRPCClient) Recv() (*SimpleData, error) {
	var msg SimpleData
	err := x.ClientStream.RecvMsg(&msg)
	if err != nil {
		return nil, err
	}
	return &msg, nil
}
// Server API for SimpleService service
// SimpleServiceServer is the server-side API of the SimpleService service.
type SimpleServiceServer interface {
// SimpleRPC is called with the stream for one SimpleRPC call; the returned
// error is passed back by the generated handler.
SimpleRPC(SimpleService_SimpleRPCServer) error
}
// RegisterSimpleServiceServer registers srv on s under the SimpleService
// service descriptor.
func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) {
	desc := &_SimpleService_serviceDesc
	s.RegisterService(desc, srv)
}
// _SimpleService_SimpleRPC_Handler is the stream handler referenced by the
// service descriptor. It asserts srv to SimpleServiceServer (panicking if the
// assertion fails), wraps stream in the typed server stream and calls
// SimpleRPC, returning its error.
func _SimpleService_SimpleRPC_Handler(srv interface{}, stream grpc.ServerStream) error {
	impl := srv.(SimpleServiceServer)
	typed := &simpleServiceSimpleRPCServer{ServerStream: stream}
	return impl.SimpleRPC(typed)
}
// SimpleService_SimpleRPCServer is the server-side view of a SimpleRPC stream:
// typed Send and Recv for SimpleData plus the embedded grpc.ServerStream.
type SimpleService_SimpleRPCServer interface {
// Send writes one SimpleData message to the stream.
Send(*SimpleData) error
// Recv reads one SimpleData message from the stream.
Recv() (*SimpleData, error)
grpc.ServerStream
}
// simpleServiceSimpleRPCServer adds typed Send/Recv methods to an embedded
// grpc.ServerStream.
type simpleServiceSimpleRPCServer struct {
grpc.ServerStream
}
// Send forwards m to SendMsg on the embedded server stream and returns its
// error.
func (x *simpleServiceSimpleRPCServer) Send(m *SimpleData) error {
	// SendMsg is promoted from the embedded grpc.ServerStream.
	return x.SendMsg(m)
}
// Recv receives the next message from the embedded server stream into a fresh
// SimpleData. On failure it returns nil together with the RecvMsg error.
func (x *simpleServiceSimpleRPCServer) Recv() (*SimpleData, error) {
	var msg SimpleData
	err := x.ServerStream.RecvMsg(&msg)
	if err != nil {
		return nil, err
	}
	return &msg, nil
}
// _SimpleService_serviceDesc is the descriptor passed to RegisterService for
// simple.SimpleService. It declares no unary methods and a single stream,
// SimpleRPC, marked as both server- and client-streaming.
var _SimpleService_serviceDesc = grpc.ServiceDesc{
ServiceName: "simple.SimpleService",
HandlerType: (*SimpleServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SimpleRPC",
Handler: _SimpleService_SimpleRPC_Handler,
// Both flags set: messages may flow in both directions on this stream.
ServerStreams: true,
ClientStreams: true,
},
},
}
syntax = "proto3";
package simple;
// SimpleService exposes a single streaming RPC.
service SimpleService {
// SimpleRPC takes a stream of SimpleData and returns a stream of SimpleData.
rpc SimpleRPC (stream SimpleData) returns (stream SimpleData) {}
}
// SimpleData is the message exchanged on the SimpleRPC stream.
message SimpleData {
// msg is the string payload (field number 1).
string msg = 1;
}
@eufat
Copy link

eufat commented Mar 12, 2019

Nice example, but it is not yet truly "bidirectional", since the server never sends anything back to the client on the stream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment