Skip to content

Instantly share code, notes, and snippets.

@Codebreaker101
Last active September 12, 2022 11:51
Show Gist options
  • Save Codebreaker101/a7ee3d9aac4855bdb0420aa6bd46c704 to your computer and use it in GitHub Desktop.
Save Codebreaker101/a7ee3d9aac4855bdb0420aa6bd46c704 to your computer and use it in GitHub Desktop.
RSocket Go bug report
//nolint:funlen
package client_test
import (
"context"
"fmt"
"testing"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/extension"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/flux"
)
// TestRsocket starts the in-process RSocket server in a background goroutine,
// blocks until the server signals readiness by closing the channel, and then
// runs the client against it. Failures in either helper surface as panics
// rather than through t.
func TestRsocket(t *testing.T) {
	started := make(chan struct{})

	// The server goroutine is never stopped; it lives until the test binary exits.
	go rsocketServer(started)

	// Receiving from the channel unblocks once the server closes it.
	<-started

	rsocketClient()
}
// rsocketServer builds an RSocket server on TCP address ":7878" and serves it
// with a background context.
//
// readyCh is closed from the server's OnStart callback, so callers can block
// on it to learn when the server has started. Every accepted connection gets a
// socket whose request-stream handler emits the decimal strings "0" through
// "9" and then completes the stream.
//
// The function panics if Serve returns a non-nil error. A nil return from
// Serve simply ends the function instead of raising a nil panic.
func rsocketServer(readyCh chan struct{}) {
	requestStreamHandler := rsocket.RequestStream(func(request payload.Payload) flux.Flux {
		// The incoming request payload is ignored; the response is always the
		// same ten items.
		return flux.Create(func(ctx context.Context, sink flux.Sink) {
			// Complete the stream once all items have been pushed.
			defer sink.Complete()
			for i := 0; i < 10; i++ {
				// extension.TextPlain.String() is passed as the second
				// argument of payload.NewString alongside the item text.
				sink.Next(payload.NewString(fmt.Sprintf("%d", i), extension.TextPlain.String()))
			}
		})
	})

	err := rsocket.Receive().
		OnStart(func() { close(readyCh) }).
		Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			return rsocket.NewAbstractSocket(requestStreamHandler), nil
		}).
		Transport(rsocket.TCPServer().SetAddr(":7878").Build()).
		Serve(context.Background())
	// Only a real failure should abort the process; panic(nil) on a clean
	// return would crash the test binary for no reason.
	if err != nil {
		panic(err)
	}
}
// rsocketClient connects to the RSocket server at 127.0.0.1:7878, issues one
// request-stream call with an empty payload, collects the whole response via
// BlockSlice, and prints the Data() of every received payload on its own line.
// It panics if connecting or collecting the stream fails.
func rsocketClient() {
	transport := rsocket.TCPClient().
		SetHostAndPort("127.0.0.1", 7878).
		Build()

	conn, err := rsocket.Connect().
		Transport(transport).
		Start(context.Background())
	if err != nil {
		panic(err)
	}
	// The connection is released when the function returns; the Close result
	// is discarded.
	defer conn.Close()

	stream := conn.RequestStream(payload.Empty())
	items, err := stream.BlockSlice(context.Background())
	if err != nil {
		panic(err)
	}

	for _, item := range items {
		fmt.Println(item.Data())
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment