Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
golang - A blocking reader
package main
import (
"fmt"
"bytes"
"os"
"log"
"time"
"io"
"http"
"bufio"
)
// BlockReadWriter pairs a byte buffer with the channels used to talk to the
// goroutine started by NewBlockReadWriter. That goroutine is the only code in
// this file that touches buf; Read and Write communicate with it through the
// channels below.
// NOTE: NewBlockReadWriter fills this struct with a positional literal, so the
// field order here must not change.
type BlockReadWriter struct {
// buf holds bytes that have been written but not yet read.
buf *bytes.Buffer
// read carries the destination slice of a pending Read to the goroutine.
read chan []byte
// write carries the data of a Write to the goroutine.
write chan []byte
// nrc carries the byte count of a completed read back to Read.
nrc chan int
// errc carries the error of a completed read back to Read (sent after nrc).
errc chan os.Error
}
// NewBlockReadWriter returns a BlockReadWriter backed by an empty buffer and
// four unbuffered channels, and starts the goroutine that services it.
//
// The goroutine loops forever: there is no exit path from the outer loop, so
// it lives for the rest of the program. On each iteration it either accepts a
// write (appending the data to the buffer) or accepts a read request. A read
// request that finds the buffer empty keeps accepting writes until the buffer
// holds data or the write channel is closed, and only then answers the reader
// on nrc followed by errc.
func NewBlockReadWriter() *BlockReadWriter {
// Positional literal: values must stay in the struct's field order
// (buf, read, write, nrc, errc).
ret := &BlockReadWriter{bytes.NewBuffer(nil),
make(chan []byte),
make(chan []byte),
make(chan int),
make(chan os.Error)}
go func(bc *BlockReadWriter) {
for {
var readTo, write []byte
// NOTE(review): hasData is re-declared as true on every iteration, so it
// only carries state within the read case below; see the write case.
var hasData = true
// doWrite appends the current value of write to the buffer and aborts the
// whole program if the buffer reports an error.
doWrite := func() {
_, err := bc.buf.Write(write)
if err != nil {
log.Fatal("BlockReadWriter:", err)
}
}
select {
case readTo = <-bc.read:
// A reader is waiting. With an empty buffer, block on incoming writes
// until there is something to hand back or the write channel is closed.
if bc.buf.Len() == 0 {
for {
if !hasData {
break
}
// hasData becomes false only when bc.write has been closed; in that case
// write is nil and doWrite appends nothing.
write,hasData = <-bc.write
doWrite()
if !hasData {break}
if bc.buf.Len() > 0 {
break
}
}
}
// Reply to the reader: count first, then error, matching the receive
// order in Read.
nr, err := bc.buf.Read(readTo)
bc.nrc <- nr
bc.errc <- err
case write = <-bc.write:
// NOTE(review): this receive does not update hasData, and hasData was just
// initialised to true above, so this check cannot fire as written.
if !hasData {
log.Fatal("write after eof")
}
doWrite()
}
}
}(ret)
return ret
}
// Write hands a copy of data to the servicing goroutine, which appends it to
// the internal buffer. It blocks until that goroutine accepts the data, then
// reports len(data) and a nil error.
//
// The copy matters: the channel send completes as soon as the goroutine
// receives the slice, which is before the bytes are appended to the buffer.
// Sending the caller's slice directly would let the caller reuse or modify it
// while the goroutine is still reading from it.
func (bc *BlockReadWriter) Write(data []byte) (nr int, err os.Error) {
	owned := make([]byte, len(data))
	copy(owned, data)
	bc.write <- owned
	return len(data), nil
}
// Read submits b to the servicing goroutine as the destination of a read and
// waits for its answer: first the number of bytes placed in b, then the error
// that accompanied the read.
func (bc *BlockReadWriter) Read(b []byte) (nr int, err os.Error) {
	bc.read <- b
	// The goroutine replies on nrc before errc; receive in the same order.
	count := <-bc.nrc
	readErr := <-bc.errc
	return count, readErr
}
// example1 demonstrates a read racing a delayed writer: it writes a single
// 'x', starts a goroutine that writes "abcd" after a short sleep, then reads
// into a two-byte slice and prints that slice.
func example1() {
	bc := NewBlockReadWriter()
	payload := []byte("abcd")
	dst := []byte{0, 0}

	bc.Write([]byte{'x'})

	// Delayed writer; nothing waits for it to finish.
	go func() {
		fmt.Println("Waiting")
		time.Sleep(10000)
		fmt.Println("working")
		bc.Write(payload)
	}()

	fmt.Println("reading")
	bc.Read(dst)
	fmt.Println("Hello World!", string(dst))
}
// parserExample feeds an HTTP request through a BlockReadWriter line by line
// while a second goroutine parses it with http.ReadRequest, then prints the
// decoded value of the "q" query parameter.
func parserExample() {
	bc := NewBlockReadWriter()
	reqc := make(chan *http.Request)
	go func() {
		r, err := http.ReadRequest(bufio.NewReader(bc))
		if err != nil {
			// Without this check a parse failure would surface later as a
			// nil-pointer panic on req.FormValue.
			log.Fatal("parserExample:", err)
		}
		reqc <- r
	}()
	// The trailing empty string produces the blank line that ends the headers.
	for _, line := range []string{"GET /?q=a%20b%23 HTTP/1.1", "Host: google.com", ""} {
		bc.Write([]byte(line))
		bc.Write([]byte("\r\n"))
	}
	bc.Write([]byte("salamandra"))
	req := <-reqc
	fmt.Printf("q value is '%v'\n", req.FormValue("q"))
}
// main runs the two demonstrations in order: example1, then parserExample.
func main() {
example1()
parserExample()
}
@maxmcd
Copy link

maxmcd commented Aug 19, 2019

Here is an alternative setup, based on sync.Cond, that worked for me:

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// blockingReader is an in-memory byte buffer whose Read blocks until data is
// available instead of returning io.EOF when the buffer is empty.
//
// Every access to buf happens while holding cond.L, so a blockingReader can be
// shared between writer and reader goroutines.
type blockingReader struct {
	buf  bytes.Buffer // bytes written but not yet read; guarded by cond.L
	cond *sync.Cond   // broadcast by Write after it has appended to buf
}

// Compile-time check that *blockingReader is usable as an io.ReadWriter.
var _ io.ReadWriter = (*blockingReader)(nil)

// newBlockingReader returns an empty blockingReader ready for use.
func newBlockingReader() *blockingReader {
	return &blockingReader{
		cond: sync.NewCond(&sync.Mutex{}),
	}
}

// Write appends b to the buffer and wakes every goroutine blocked in Read.
// It returns the count and error reported by the underlying buffer.
func (br *blockingReader) Write(b []byte) (ln int, err error) {
	br.cond.L.Lock()
	defer br.cond.L.Unlock()

	ln, err = br.buf.Write(b)
	// Broadcasting while holding the lock guarantees a reader is either still
	// before its emptiness check or already parked in Wait, so the wakeup
	// cannot be lost.
	br.cond.Broadcast()
	return ln, err
}

// Read copies buffered bytes into b, blocking while the buffer is empty.
// A zero-length b returns (0, nil) immediately without waiting.
func (br *blockingReader) Read(b []byte) (ln int, err error) {
	if len(b) == 0 {
		return 0, nil
	}

	br.cond.L.Lock()
	defer br.cond.L.Unlock()

	// Re-check after every wakeup: another reader may have drained the buffer
	// first, or the write that woke us may have been empty.
	for br.buf.Len() == 0 {
		br.cond.Wait()
	}
	return br.buf.Read(b)
}

@ostretsov
Copy link

ostretsov commented Dec 2, 2019

@maxmcd, Thanks!

@eliezedeck
Copy link

eliezedeck commented Jan 3, 2020

Thanks @maxmcd

@ii64
Copy link

ii64 commented Nov 18, 2021

import "io"
// pipeReadWriteCloser bundles both ends of an io.Pipe into one value: Read is
// promoted from the embedded reader, Write from the embedded writer, and Close
// is defined below to shut both ends.
type pipeReadWriteCloser struct {
	*io.PipeReader
	*io.PipeWriter
}

// newPipeReadWriteCloser creates a fresh io.Pipe and wraps its two ends.
func newPipeReadWriteCloser() pipeReadWriteCloser {
	r, w := io.Pipe()
	return pipeReadWriteCloser{PipeReader: r, PipeWriter: w}
}

// Close closes the read end first. If that reports an error it is returned
// and the write end is left untouched; otherwise the result of closing the
// write end is returned.
func (c pipeReadWriteCloser) Close() error {
	if rerr := c.PipeReader.Close(); rerr != nil {
		return rerr
	}
	return c.PipeWriter.Close()
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment