Skip to content

Instantly share code, notes, and snippets.

@hermanbanken
Created August 12, 2019 06:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hermanbanken/e676cb2224c3147b870264fa013b5760 to your computer and use it in GitHub Desktop.
Save hermanbanken/e676cb2224c3147b870264fa013b5760 to your computer and use it in GitHub Desktop.
Demuxing
type Muxed struct {
IsLast bool;
Payload []byte;
}
type State struct {
ReplyTo chan interface{}
}
// Incoming attaches [channel(buffer: 1)], if Mux finds multi-packet reply,
// it pushes [channel(buffer: 2)] with 1st packet and swaps the channel in state
func dealWithMux(packet Muxed, state State) {
if packet.IsLast {
state.ReplyTo <- packet.Payload
} else {
currentChan := state.ReplyTo
state.ReplyTo = make(chan interface{}, 2)
currentChan <- state.ReplyTo
state.ReplyTo <- packet.Payload
}
}
// ReadResponse Continuosly reads a channel, containing either bytes or
// continuations, until no more data is available. This allows for using
// buffered channels, while still allowing indefinite streams of data.
func ReadResponse(w io.Writer, stream chan interface{}) {
for {
result, hasData := <-stream
if !hasData {
return
}
continuation, isContinuation := result.(chan interface{})
if (isContinuation) {
stream = continuation
} else {
data, isData := result.([]byte)
if isData {
w.Write(data)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment