Skip to content

Instantly share code, notes, and snippets.

@abitofhelp
Last active September 15, 2018 01:15
Show Gist options
  • Save abitofhelp/b5f3eebf7e316b96b2cdc4188db2d742 to your computer and use it in GitHub Desktop.
Save abitofhelp/b5f3eebf7e316b96b2cdc4188db2d742 to your computer and use it in GitHub Desktop.
This gist implements a streaming process where a chunk of data is read from a file in a goroutine, the data is passed through a channel to another goroutine, which writes the data to a file. It uses a reference counting pool of buffers.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide.
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Package refcnt implements a reference count based []byte buffer.
package refcnt
import (
"errors"
"fmt"
)
/*************************************************** TYPES ************************************************************/
// Type Buffer maintains a []byte buffer with a reference counter. If the count is zero, the buffer can be
// returned to the pool and reused.
type Buffer struct {
// The embedded ReferenceCounter supplies the SetInstance, IncrementReferenceCount and
// DecrementReferenceCount methods for this buffer.
ReferenceCounter
// Buf is the byte slice that holds the data. The package-level reset truncates it to
// length zero (keeping its capacity) before the buffer goes back to the pool.
Buf []byte
}
/***************************************************** VAR ************************************************************/
// Variable bufferSize is the number of bytes given to each newly allocated buffer
// (1024 << 9 = 524288 bytes).
var bufferSize uint64 = 1024 << 9

// Variable bufferPool is the package-wide reference counted pool of Buffers.
// Its factory builds a Buffer around the supplied counter with a freshly allocated
// slice of bufferSize bytes; the package-level reset function is run on a Buffer
// before it is put back into the pool.
var bufferPool = NewPool(
	func(counter ReferenceCounter) ReferenceCountable {
		return &Buffer{
			ReferenceCounter: counter,
			Buf:              make([]byte, bufferSize),
		}
	},
	reset,
)
/*********************************************** EXPORTED METHODS *****************************************************/
// Function NewBuffer creates or retrieves a Buffer from the pool.
// Parameter bufSize is the number of bytes the caller needs; zero selects the
// package default (bufferSize).
// Returns a Buffer holding one reference, whose Buf has a length of bufSize bytes.
// The caller must call DecrementReferenceCount when it is done with the buffer.
func NewBuffer(bufSize uint64) *Buffer {
	// Fall back to the default instead of overwriting it: the previous code assigned
	// the zero request to the shared default, and ignored every non-zero request.
	if bufSize == 0 {
		bufSize = bufferSize
	}

	e := acquire()

	if uint64(cap(e.Buf)) < bufSize {
		// The pooled slice is too small for this request, so replace it.
		e.Buf = make([]byte, bufSize)
	} else {
		// A recycled buffer comes back with length zero; restore the usable length
		// so fresh and recycled buffers look the same to the caller.
		e.Buf = e.Buf[:bufSize]
	}
	return e
}
/******************************************* INTERNAL FUNCTIONS/METHODS ***********************************************/
// Function acquire takes the next object from bufferPool and returns it as a *Buffer.
// The type assertion panics if the pool hands back anything other than a *Buffer.
func acquire() *Buffer {
	pooled := bufferPool.Get()
	return pooled.(*Buffer)
}
// Method reset empties the slice held by the Buffer.
// It is called by the package-level reset function that is registered with the pool.
func (e *Buffer) reset() {
	// Only the length is dropped to zero; the backing array and capacity are kept.
	emptied := e.Buf[:0]
	e.Buf = emptied
}
// Function reset resets a Buffer object so it can be reused.
// The reference countable pool uses this function before putting an object back.
// Parameter i is the object to reset; it must be a *Buffer.
// Returns nil on success, or an error when i is not a *Buffer.
func reset(i interface{}) error {
	obj, ok := i.(*Buffer)
	if !ok {
		// The error used to be constructed and then discarded, so execution fell
		// through to obj.reset() with a nil *Buffer and panicked. Return it instead.
		return errors.New(fmt.Sprintf("illegal object sent to resetRefCntBuffer: %v\n", i))
	}
	obj.reset()
	return nil
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide.
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// The implementation of the pool was inspired by the following blog:
// http://www.akshaydeo.com/blog/2017/12/23/How-did-I-improve-latency-by-700-percent-using-syncPool/
// Package refcnt implements a reference count based []byte buffer.
package refcnt
import (
"reflect"
"sync"
"sync/atomic"
)
// Interface ReferenceCountable is implemented by objects that can be managed by a
// ReferenceCountedPool. ReferenceCountedPool.Get calls SetInstance and
// IncrementReferenceCount on every object it hands out.
type ReferenceCountable interface {
// Method to set the current instance, i.e. the object that is handed back to the
// pool once the reference count reaches zero.
SetInstance(i interface{})
// Method to increment the reference count
IncrementReferenceCount()
// Method to decrement the reference count
DecrementReferenceCount()
}
// Type ReferenceCountedPool defines the pool. It wraps a sync.Pool and keeps usage counters.
type ReferenceCountedPool struct {
// The underlying sync.Pool that stores the reusable objects.
pool *sync.Pool
// NOTE(review): factory is not assigned by NewPool in this file; NewPool captures its
// factory argument in the sync.Pool's New closure instead.
factory func() ReferenceCountable
// Incremented (through ReferenceCounter.released) each time an object's reference count reaches zero.
returned uint32
// Incremented each time the sync.Pool has to create a new object.
allocated uint32
// Incremented on every call to Get.
referenced uint32
}
// Function NewPool creates a new reference counting pool.
// Parameter factory builds a new pooled object around the ReferenceCounter it is given;
// it is invoked whenever the underlying sync.Pool has no object to reuse.
// Parameter reset is stored in every ReferenceCounter and is run on an object before
// it is put back into the pool.
// Returns the configured pool.
func NewPool(factory func(referenceCounter ReferenceCounter) ReferenceCountable, reset func(interface{}) error) *ReferenceCountedPool {
	p := new(ReferenceCountedPool)
	p.pool = new(sync.Pool)
	p.pool.New = func() interface{} {
		// Count the allocation and use the value returned by the atomic add as the id.
		// Re-reading p.allocated without synchronization was a data race and could
		// give two objects the same id.
		id := atomic.AddUint32(&p.allocated, 1)
		return factory(ReferenceCounter{
			count:       new(uint32),
			destination: p.pool,
			released:    &p.returned,
			reset:       reset,
			id:          id,
		})
	}
	return p
}
// Method Get takes an object from the underlying pool (creating one when necessary)
// and hands it to the caller with its reference count incremented by one.
// The type assertion panics if the pooled value does not implement ReferenceCountable.
func (p *ReferenceCountedPool) Get() ReferenceCountable {
	raw := p.pool.Get()
	item := raw.(ReferenceCountable)

	// Tell the object about itself, so that it can put itself back into the pool
	// once its reference count reaches zero.
	item.SetInstance(item)

	// Bookkeeping for Stats.
	atomic.AddUint32(&p.referenced, 1)

	item.IncrementReferenceCount()
	return item
}
// Method Stats returns the pool's counters.
// Returns a map with the keys "allocated", "referenced" and "returned", each holding a uint32.
func (p *ReferenceCountedPool) Stats() map[string]interface{} {
	// The counters are updated with atomic adds from other goroutines, so they are
	// read atomically as well; plain reads were a data race.
	return map[string]interface{}{
		"allocated":  atomic.LoadUint32(&p.allocated),
		"referenced": atomic.LoadUint32(&p.referenced),
		"returned":   atomic.LoadUint32(&p.returned),
	}
}
// Struct representing a reference counter.
// This struct is supposed to be embedded inside the object to be pooled.
// Incrementing and decrementing the references correctly is highly important, specifically around goroutines.
// The increment and decrement methods use value receivers, so they operate on copies of this
// struct; the counters are held through pointers and are therefore shared by all copies.
// NOTE(review): every field carries "-" tags for sql, json and yaml, presumably to keep the
// counter out of serialized output - confirm against the encoders in use.
type ReferenceCounter struct {
// Number of outstanding references; updated with atomic operations.
count *uint32 `sql:"-" json:"-" yaml:"-"`
// The pool that receives the instance when the count reaches zero.
destination *sync.Pool `sql:"-" json:"-" yaml:"-"`
// Points at the owning pool's "returned" counter (see NewPool).
released *uint32 `sql:"-" json:"-" yaml:"-"`
// The pooled object itself, as stored by SetInstance.
Instance interface{} `sql:"-" json:"-" yaml:"-"`
// Callback that is run on Instance before it is put back into the pool.
reset func(interface{}) error `sql:"-" json:"-" yaml:"-"`
// Value of the pool's allocation counter taken when this counter was created (see NewPool).
id uint32 `sql:"-" json:"-" yaml:"-"`
}
// Method IncrementReferenceCount atomically adds one reference to the counter.
// The receiver is a copy, but count is a pointer, so the shared counter is updated.
func (r ReferenceCounter) IncrementReferenceCount() {
	const oneReference uint32 = 1
	atomic.AddUint32(r.count, oneReference)
}
// Method DecrementReferenceCount releases one reference.
// If the reference count goes to zero, the instance is reset and put back inside the pool.
// It panics when the count is already zero (more releases than acquisitions), or when
// the reset callback reports an error.
func (r ReferenceCounter) DecrementReferenceCount() {
	for {
		current := atomic.LoadUint32(r.count)
		if current == 0 {
			// reflect.TypeOf returns nil for a nil Instance; guard against it so the
			// intended message is reported rather than a nil pointer dereference.
			typeName := "<nil>"
			if t := reflect.TypeOf(r.Instance); t != nil {
				typeName = t.String()
			}
			panic("this should not happen =>" + typeName)
		}

		// The zero check and the decrement are one atomic step. As two separate
		// operations, two racing releases could both pass the check and wrap the
		// counter around to its maximum value.
		if atomic.CompareAndSwapUint32(r.count, current, current-1) {
			if current > 1 {
				// Other references are still outstanding.
				return
			}
			break
		}
	}

	// This was the last reference: record the release, reset the instance and
	// hand it back to the pool.
	atomic.AddUint32(r.released, 1)
	if err := r.reset(r.Instance); err != nil {
		panic("error while resetting an instance => " + err.Error())
	}
	r.destination.Put(r.Instance)
	// r is a copy (value receiver), so clearing r.Instance here would have no effect.
	// The pooled object keeps its Instance; the pool's Get sets it again on reuse.
}
// Method SetInstance records the pooled object that owns this counter.
// The stored value is what is later reset and put back into the pool.
func (r *ReferenceCounter) SetInstance(i interface{}) {
	(*r).Instance = i
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Package main implements streaming of a file through a buffered channel where it is written to a new file.
// A data race condition arose in the for loop that reads data from the input file and sends it to the channel.
// I implemented a reference counting pool of buffers, to reduce the time spent allocating memory.
// A quick test with a 3.9GB binary file reduced the time from 5.3s to 3.4s.
// Package main is the entry point for the application and is responsible for configuring the environment.
package main
import (
"errors"
"fmt"
"github.com/abitofhelp/junk/refcnt"
"io"
"log"
"os"
"sync"
"time"
)
/************************************************* CONSTANTS **********************************************************/
// Const kMaxBufferSize is the maximum number of bytes that will
// be read from the file in each iteration through the reading loop.
// 1024 << 9 is 524288 bytes (512 KiB). It is the size passed to refcnt.NewBuffer by sender.
const kMaxBufferSize = 1024 << 9
// Const kMaxChannelSize is the number of buffers that can be placed
// into the buffered channel. It is passed to configureInterfaces as the channel's capacity.
const kMaxChannelSize = 17
// Const kFromPath is the file system path to the file that will be read.
// The commented-out alternatives are other input files, annotated with their sizes.
//const kFromPath = "/home/mjgardner/Downloads/Shills.rar" // 56.2GB
//const kFromPath = "/home/mjgardner/Downloads/sit.exe" // 220MB
const kFromPath = "/home/mjgardner/Downloads/ABigFile.zip" // 3.9GB
// Const kToPath is the file system path for where the data from the buffered channel will be written.
// The file is created by configureInterfaces.
const kToPath = "/home/mjgardner/Downloads/ANewBigFile.zip"
/**********************************************************************************************************************/
/**************************************************** MAIN ************************************************************/
// Function configureInterfaces creates and configures the interfaces used by the application, such as the file system
// and channel.
// Parameter channelSize is the number of buffers that can be placed into the buffered channel.
// Parameter fromPath is the file system path to the file that will be read.
// Parameter toPath is the file system path to the file that will be created.
// Returns a configured channel, input file handle, and output file handle, and no error instance on success; Otherwise,
// all items will be nil, except for the error. The error is logged here before it is returned, so the caller does
// not need to log it again.
func configureInterfaces(channelSize uint64, fromPath string, toPath string) (chan *refcnt.Buffer, *os.File, *os.File, error) {
	// Creating the output file truncates it, so refuse to use the input file as the output file.
	// This is a best-effort check on the path text only; it does not resolve links.
	if fromPath == toPath {
		err := errors.New("the input and output paths must be different")
		log.Print(err)
		return nil, nil, nil, err
	}

	// Open the file at fromPath for reading...
	ifile, err := os.Open(fromPath)
	if err != nil {
		// log.Fatal would exit right here and make the documented error return unreachable.
		log.Print(err)
		return nil, nil, nil, err
	}

	// Create the output file at toPath for writing...
	ofile, err := os.Create(toPath)
	if err != nil {
		// Do not leak the input file that is already open; the create error is the one reported.
		ifile.Close()
		log.Print(err)
		return nil, nil, nil, err
	}

	// Create the channel that will be used by the file reader and file writer goroutines.
	// make never returns a nil channel, so there is nothing to check.
	ch := make(chan *refcnt.Buffer, channelSize)

	return ch, ifile, ofile, nil
}
// Function main is the entry point for the application and is responsible for configuring its environment.
// It opens the input and output files, starts one goroutine that reads the input file into the channel and
// one that writes the channel's data to the output file, waits for both, and prints the elapsed time.
func main() {
	// Variable wg is main's WaitGroup, which detects when all of the goroutines that were launched have completed.
	var wg sync.WaitGroup

	// Start our timer...
	start := time.Now()

	// Create and configure the interfaces used by the application, such as the file system and channel.
	ch, ifile, ofile, err := configureInterfaces(kMaxChannelSize, kFromPath, kToPath)
	if err != nil {
		// Error has already been logged. Exit with a failure status; nothing has been deferred yet.
		os.Exit(1)
	}

	// Automatically close the files when exiting.
	defer ifile.Close()
	defer func() {
		// A failed close of a file that was written to can mean lost data, so report it.
		if err := ofile.Close(); err != nil {
			log.Print(err)
		}
	}()

	// Print (not Println) so that go vet does not reject the trailing newline; the output is unchanged.
	fmt.Print("Starting processing...\n\n")

	// File writing goroutine.
	wg.Add(1)
	go func(ch <-chan *refcnt.Buffer, of *os.File) {
		defer wg.Done()
		// Receive the input file's data through the buffered channel.
		receiver(ch, of)
	}(ch, ofile)

	// File reading goroutine
	wg.Add(1)
	go func(ch chan<- *refcnt.Buffer, ifile *os.File) {
		defer wg.Done()
		// Write the input file's data to the buffered channel.
		if err := sender(ifile, ch); err != nil {
			// Do not silently drop the sender's result.
			log.Print(err)
		}
	}(ch, ifile)

	// Wait here until all goroutines have completed their work.
	wg.Wait()

	// Show the duration.
	fmt.Printf("\nDone processing...\nElapsed: %s", time.Since(start))
}
// Function sender writes the input file's data to the buffered channel.
// Parameter ifile is the input file's handle.
// Parameter ch is the buffered channel to which data will be written. It is closed when the end of
// the file is reached, which tells the receiver that no more data is coming.
// Returns nil once the whole file has been sent. A read error other than io.EOF is fatal: it is
// logged with log.Fatal, which terminates the program.
func sender(ifile *os.File, ch chan<- *refcnt.Buffer) error {
	// Cumulative counters
	nBytes := uint64(0)
	nChunks := uint64(0)

	fmt.Println("\tSender is starting to read the input file...")

	// Loop through the input file reading chunks of data, which is sent over the channel.
	for {
		buf := refcnt.NewBuffer(kMaxBufferSize)

		// Read a chunk of data from the file, using the buffer's full capacity...
		n, err := ifile.Read(buf.Buf[:cap(buf.Buf)])

		// Did we read any data from the file? Was there an error?
		if n == 0 {
			// This buffer will not be sent, so give it back to the pool instead of leaking it.
			buf.DecrementReferenceCount()

			if err == nil {
				// No data and no error; Keep going...
				continue
			}

			if err == io.EOF {
				// End of file, so exit the loop...
				break
			}

			// Ouch! Log the error and exit. The return only satisfies the compiler.
			log.Fatal(err)
			return err
		}

		// Make the slice's length equal to the number of bytes read. The receiver writes
		// len(Buf) bytes; without this it would write a full buffer for a short final
		// chunk, and nothing at all from a recycled buffer whose length is zero.
		buf.Buf = buf.Buf[:n]

		// Update the cumulative counters.
		nChunks++
		nBytes += uint64(n)

		// Send the data over the channel. The receiver releases the buffer.
		ch <- buf
	}

	// Signal the receiving goroutines that there is no more data.
	fmt.Println("\tSender is closing the channel to signal the receiver that no more data is coming, and exiting...")
	close(ch)

	// When there is no more data to process, display the sender's status.
	fmt.Printf("\tSent:\t\tnBytes: %d, nChunks: %d\n", nBytes, nChunks)

	return nil
}
// Function receiver receives the input file's data through the buffered channel and writes it to the output file.
// Parameter ch is the buffered channel from which data will be read.
// Parameter of is the output file's handle.
// It returns once the channel has been closed and drained. A write error is fatal: it is logged
// with log.Fatal, which terminates the program.
func receiver(ch <-chan *refcnt.Buffer, of *os.File) {
	// Cumulative counters
	nBytes := uint64(0)
	nChunks := uint64(0)

	fmt.Println("\tReceiver is waiting for data in the channel, so it can write it to the output file...")

	// While there is data to read in the channel, we will get it and write it to the output file.
	for data := range ch {
		// Write the chunk to the output file.
		written, err := of.Write(data.Buf)
		if err != nil {
			// Ignoring a failed write would silently produce a corrupt copy. Returning
			// instead would leave the sender blocked on a channel nobody reads, so
			// terminate the program, as is done for read errors.
			log.Fatal(err)
		}

		// Update the cumulative counters with what was actually written...
		nBytes += uint64(written)
		nChunks++

		// The data has been consumed, so release the buffer back to the pool.
		data.DecrementReferenceCount()
	}

	fmt.Println("\tReceiver has emptied the channel and is exiting...")

	// When there is no more data to process, display the receiver's status.
	fmt.Printf("\tReceived:\tnBytes: %d, nChunks: %d\n", nBytes, nChunks)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment