Skip to content

Instantly share code, notes, and snippets.

@nilium
Created December 28, 2014 02:13
Show Gist options
  • Save nilium/b2352d8a02514301b31b to your computer and use it in GitHub Desktop.
Save nilium/b2352d8a02514301b31b to your computer and use it in GitHub Desktop.
Read pool used internally in gork to avoid spawning too many threads due to blocking IO syscalls (not sure if this is actually a concern) and to limit the number of file descriptors gork has open at once (this one is a real concern).
package gork
import (
"fmt"
"io/ioutil"
"log"
"os"
)
// Package-level configuration and state shared by the pooled file readers.
var (
	// readPoolSize is the number of ioPoolReader goroutines started by
	// initPoolReaders.
	readPoolSize = 10

	// readRequests is the send side of the channel that the ioPoolReader
	// goroutines receive requests on. It stays nil until initPoolReaders
	// has created the channel.
	readRequests chan<- ioReadRequest
)
// ioReadResult carries the outcome of an ioReadRequest back to whoever issued
// it. A successful read of an entire file sets data to the file's contents
// and leaves err nil. If anything goes wrong, err is set to the first error
// encountered and data is left nil.
//
// Errors from closing the file are never reported here; they are only
// logged. A close error can only happen after another error has already been
// reported or after the file has been read in its entirety.
type ioReadResult struct {
	// data is the complete contents of the file on success.
	data []byte

	// err is the first error encountered while opening or reading the file.
	err error
}
// ioReadRequest asks for the file at a specific path to be read. It is meant
// to be processed by an ioPoolReader goroutine, which delivers the outcome as
// an ioReadResult on the request's out channel.
type ioReadRequest struct {
	// path is the location of the file to read.
	path string

	// out is the channel the ioReadResult for this request is sent on.
	out chan ioReadResult

	// log receives messages about errors hit while processing the request.
	log *log.Logger
}
// resolve reports success: it sends an ioReadResult holding the given byte
// slice (and no error) through the request's out channel.
func (r *ioReadRequest) resolve(data []byte) {
	result := ioReadResult{data: data}
	r.out <- result
}
// reject reports failure: it sends an ioReadResult holding only the given
// error through the request's out channel.
func (r *ioReadRequest) reject(err error) {
	result := ioReadResult{err: err}
	r.out <- result
}
// force carries out the request: it opens the file at r.path, reads it to the
// end, and sends the resulting ioReadResult through the out channel.
//
// A failure to open or read the file is logged to r.log and then rejected
// with that error. A failure to close the file is only logged.
func (r *ioReadRequest) force() {
	file, openErr := os.Open(r.path)
	if openErr != nil {
		r.log.Printf("Error opening file %q: %v", r.path, openErr)
		r.reject(openErr)
		return
	}

	// Runs on the way out, i.e. after the result has already been sent.
	defer func() {
		if err := file.Close(); err != nil {
			r.log.Printf("Error closing file %q: %v", r.path, err)
		}
	}()

	contents, readErr := ioutil.ReadAll(file)
	if readErr == nil {
		r.resolve(contents)
		return
	}

	r.log.Printf("Error reading file %q: %v", r.path, readErr)
	r.reject(readErr)
}
// ioPoolReader receives ioReadRequests from reqs and processes them one at a
// time until the channel is closed. The id is only used to build the prefix
// of the logger this goroutine writes to standard error, so its messages can
// be told apart from those of other readers.
func ioPoolReader(id int, reqs <-chan ioReadRequest) {
	logger := log.New(os.Stderr, fmt.Sprintf("[reader %d] ", id), log.LstdFlags)
	defer logger.Println("Closing ioPoolReader goroutine.")

	for {
		req, open := <-reqs
		if !open {
			return
		}
		// Every request handled here logs through this reader's logger.
		req.log = logger
		req.force()
	}
}
// poolRead reads the file at the given path and returns its entire contents
// as a byte slice. If an error occurs, the returned byte slice is nil and the
// error is returned.
//
// poolRead hands the work to the pooled ioPoolReader goroutines, so no more
// files are open at a time than there are readers. The pool is started on
// first use. If it cannot be started, the error from initPoolReaders is
// returned; without that check the request would be sent on a nil channel
// and the caller would block forever.
//
// NOTE(review): the nil check and lazy start are not synchronized, so
// concurrent first calls may race on readRequests; confirm callers
// initialize the pool before using poolRead from multiple goroutines.
func poolRead(path string) ([]byte, error) {
	if readRequests == nil {
		if err := initPoolReaders(); err != nil {
			return nil, err
		}
	}

	// Buffered so the reader can deliver its result without waiting for the
	// receive below.
	out := make(chan ioReadResult, 1)
	readRequests <- ioReadRequest{path: path, out: out}
	result := <-out
	return result.data, result.err
}
// initPoolReaders starts a set of ioPoolReader goroutines to handle reading
// files. The readPoolSize global variable determines how many are started.
//
// If readPoolSize is less than 1, an error is returned and nothing else is
// touched: an existing request channel and its readers are left as they are.
// Otherwise any existing request channel is closed and replaced by a new one
// that the freshly started readers receive from.
func initPoolReaders() error {
	if readPoolSize < 1 {
		return fmt.Errorf("readPoolSize(%d) < 1: file reading is impossible (existing readers will not be killed)", readPoolSize)
	}

	// Closing the previous channel signals the readers still receiving from
	// it that no more requests will arrive.
	if old := readRequests; old != nil {
		log.Println("Closing old ioPoolReader request channel and creating a new one.")
		close(old)
	}

	requests := make(chan ioReadRequest)
	readRequests = requests

	for id := 0; id < readPoolSize; id++ {
		go ioPoolReader(id, requests)
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment