Skip to content

Instantly share code, notes, and snippets.

@nabeken
Last active October 29, 2015 12:08
Show Gist options
  • Save nabeken/7f85026ccdb828f7bca7 to your computer and use it in GitHub Desktop.
Save nabeken/7f85026ccdb828f7bca7 to your computer and use it in GitHub Desktop.
Tempfile-based ReadSeekCloser implementation
package ioutils
import (
"io"
"io/ioutil"
"log"
"os"
"sync"
"sync/atomic"
)
// ReadSeekCloser is the interface that groups the basic Read, Seek and Close
// methods.
type ReadSeekCloser interface {
	io.ReadSeeker
	io.Closer
}
// ReadSeeker holds the state needed to expose a plain io.Reader through a
// seekable, tempfile-backed view: data read from sr is written to the
// temporary file through rw and handed to the caller through rs, a second
// handle on the same file.
//
// A ReadSeeker contains a sync.Mutex and sync.Once and therefore must not be
// copied; always use it through a pointer.
type ReadSeeker struct {
	// wpos and rpos are accessed with sync/atomic. They are deliberately the
	// first fields of the struct: on 32-bit platforms (386, ARM, 32-bit MIPS)
	// 64-bit atomic operations require 64-bit alignment, which is only
	// guaranteed for the first word of an allocated struct.

	// position for rw
	wpos int64
	// position for rs
	rpos int64

	// reader for source
	sr io.Reader
	// reader for tempfile-based buffer
	rs *os.File
	// writer for tempfile-based buffer
	rw *os.File

	mu sync.Mutex
	// err from sr and rw
	err error
	// whether rw is closed
	closed bool

	// buffer between sr and rw to write data to rw
	copybuf []byte
	// closed when copying is done
	copych chan struct{}
	// signaled when copy() writes data to rw
	copycond *sync.Cond

	once sync.Once
}
// Read implements io.Reader on top of the tempfile-based buffer.
//
// The first call to Read or Seek starts the background copy from the source
// into the temporary file. While that copy is still running, Read blocks until
// there is data beyond the current read position, and an io.EOF from the
// temporary file is suppressed because more data may still arrive. Once the
// copy has finished, io.EOF is returned as usual; an error recorded by the
// copy is reported once the data already buffered in the temporary file has
// been consumed.
func (rs *ReadSeeker) Read(p []byte) (int, error) {
	rs.once.Do(func() { go rs.copy() })

	rs.mu.Lock()
	defer rs.mu.Unlock()

	// Wait in a loop and re-read the positions after every wakeup: a wakeup
	// only means "something changed", not "data for this position exists".
	// rpos can be ahead of wpos after a Seek beyond the data written so far,
	// so test with >= rather than ==.
	for !rs.closed {
		rpos := atomic.LoadInt64(&rs.rpos)
		wpos := atomic.LoadInt64(&rs.wpos)
		if rpos < wpos {
			break
		}
		// we are waiting for data to be available
		log.Printf("read: waiting for data to be available: rpos:%d wpos: %d", rpos, wpos)
		rs.copycond.Wait()
		log.Printf("read: done waiting for data to be available: rpos:%d wpos: %d",
			atomic.LoadInt64(&rs.rpos), atomic.LoadInt64(&rs.wpos))
	}

	n, err := rs.rs.Read(p)
	rpos := atomic.AddInt64(&rs.rpos, int64(n))
	wpos := atomic.LoadInt64(&rs.wpos)

	if rs.closed {
		// copying is over
		log.Printf("read: copying is over: err:%v rs.err:%v rpos:%d wpos: %d", err, rs.err, rpos, wpos)
		// Surface the copy error only when the temporary file has nothing
		// left to give, so buffered data is not hidden behind the error.
		if err == io.EOF && rs.err != nil {
			err = rs.err
		}
		// we can return io.EOF after copying is over
		return n, err
	}

	log.Printf("read: copying is still ongoing: err:%v rpos:%d wpos: %d", err, rpos, wpos)
	// copying is still ongoing, so an EOF from the temporary file only means
	// "no more data yet"
	if err == io.EOF {
		err = nil
	}
	return n, err
}
// Seek implements io.Seeker by seeking the read handle of the temporary file.
//
// Like Read, the first call starts the background copy from the source. The
// tracked read position is updated only when the underlying seek succeeds; on
// failure the returned offset is not a valid position and the file offset has
// not been moved, so the previous position is kept.
//
// Seek does not take rs.mu, so it is not synchronized with a concurrent Read.
func (rs *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
	rs.once.Do(func() { go rs.copy() })

	n, err := rs.rs.Seek(offset, whence)
	if err == nil {
		atomic.StoreInt64(&rs.rpos, n)
	}

	// The counters are updated atomically by other goroutines, so they must be
	// loaded atomically here as well.
	log.Printf("seek: seek pos:%d offset:%d whence:%d err:%v rpos:%d wpos: %d",
		n, offset, whence, err, atomic.LoadInt64(&rs.rpos), atomic.LoadInt64(&rs.wpos))
	return n, err
}
// Close implements io.Closer. It closes the read handle, waits for the
// background copy to finish and removes the temporary file.
//
// If the copy has been started (by Read or Seek), Close blocks until the copy
// goroutine is done, i.e. until the source returns io.EOF or an error. If it
// was never started, Close closes the write handle itself instead of waiting
// for a goroutine that does not exist.
//
// The error from removing the temporary file takes precedence; otherwise the
// error from closing the read handle is returned.
func (rs *ReadSeeker) Close() error {
	// Nothing ever closes copych unless copy() runs. When neither Read nor
	// Seek has been called, claim the once ourselves and do the shutdown that
	// copy() would have done, without draining the source.
	rs.once.Do(func() {
		rs.mu.Lock()
		defer rs.mu.Unlock()
		if err := rs.rw.Close(); err != nil {
			rs.err = err
		}
		rs.closed = true
		close(rs.copych)
	})

	// Closing reader
	closeErr := rs.rs.Close()

	// rw is closed by copy(), which runs in another goroutine, so wait for it
	// to be finished before removing the file.
	<-rs.copych

	if err := os.Remove(rs.rw.Name()); err != nil {
		return err
	}
	return closeErr
}
// copy drains the source reader into the temporary file. It runs in its own
// goroutine, started once by Read or Seek.
//
// After every successful write it advances wpos and signals copycond while
// holding rs.mu, so a reader that has just checked the positions under the
// same mutex cannot miss the wakeup. When the source is exhausted or an error
// occurs, it closes the write handle, records the first error in rs.err, marks
// the ReadSeeker as closed, closes copych and wakes every waiting reader.
func (rs *ReadSeeker) copy() {
	var copyErr error

	for {
		nr, err := rs.sr.Read(rs.copybuf)
		if nr > 0 {
			nw, ew := rs.rw.Write(rs.copybuf[:nr])
			if ew != nil {
				copyErr = ew
				break
			}
			if nr != nw {
				copyErr = io.ErrShortWrite
				break
			}

			// Publish the new write position and signal under the mutex the
			// reader uses for its check-then-Wait sequence.
			rs.mu.Lock()
			wpos := atomic.AddInt64(&rs.wpos, int64(nw))
			// signal that data is available for read
			rs.copycond.Signal()
			rs.mu.Unlock()

			log.Printf("copy: read: n:%d rpos:%d wpos:%d", nw, atomic.LoadInt64(&rs.rpos), wpos)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			copyErr = err
			break
		}
	}

	// we're done copying so closing...
	// reader must be blocked while we're closing
	rs.mu.Lock()
	defer rs.mu.Unlock()

	rs.err = copyErr
	// Keep the first error: a failure while closing must not hide the error
	// that ended the copy.
	if err := rs.rw.Close(); err != nil && rs.err == nil {
		rs.err = err
	}
	rs.closed = true
	close(rs.copych)
	log.Printf("copy: done: rpos:%d wpos:%d", atomic.LoadInt64(&rs.rpos), atomic.LoadInt64(&rs.wpos))

	// Wake every waiter, not just one: all of them must observe closed.
	rs.copycond.Broadcast()
}
// NewReadSeeker returns a ReadSeekCloser that serves the contents of r_ from a
// temporary file created in the default temp directory.
//
// Two handles are opened on that file: one for writing the data copied from
// r_, one for the caller-facing reads and seeks. Copying does not start here;
// it is started lazily by the first Read or Seek. The caller must call Close
// on the result to remove the temporary file.
//
// An error is returned if the temporary file cannot be created or reopened; in
// the latter case the file is cleaned up before returning.
func NewReadSeeker(r_ io.Reader) (ReadSeekCloser, error) {
	rw, err := ioutil.TempFile("", "hoge")
	if err != nil {
		return nil, err
	}

	rs, err := os.Open(rw.Name())
	if err != nil {
		// Do not leak the temporary file or its descriptor. This is
		// best-effort cleanup; the open error is the one worth reporting.
		_ = rw.Close()
		_ = os.Remove(rw.Name())
		return nil, err
	}

	rsc := &ReadSeeker{
		sr: r_,
		rs: rs,
		rw: rw,
		// https://golang.org/src/io/io.go?s=12227:12287#L378
		copybuf: make([]byte, 32*1024),
		// copych is only ever closed, never sent on, so it needs no buffer:
		// closing never blocks even when nobody is waiting in Close().
		copych: make(chan struct{}),
	}
	rsc.copycond = sync.NewCond(&rsc.mu)
	return rsc, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment