Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save abitofhelp/5ff65b14ac030cffed0cbdeac6ba8449 to your computer and use it in GitHub Desktop.
Save abitofhelp/5ff65b14ac030cffed0cbdeac6ba8449 to your computer and use it in GitHub Desktop.
This gist implements a streaming copy: one goroutine reads a file in chunks and passes each chunk through a buffered channel to a second goroutine, which writes the data to a new file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Package main implements streaming of a file through a buffered channel where it is written to a new file.
// A data race arose in the for loop that reads data from the input file and sends it to the channel: while a single
// buffer was shared, the next read could overwrite a chunk that the receiver had not yet written.
// I moved the allocation of the buffer to the top of the loop body, so every chunk gets its own buffer, to resolve the data race.
// I've created a companion gist, "go-stream-file-between-goroutines-with-pipe", that resolves the race issue using a
// FIFO pipe. It worked, but the price was that it was slower than using a channel.
// A quick test with a 3.9GB binary file required 5.3s with a channel, and 12.6s with a pipe.
// Package main is the entry point for the application and is responsible for configuring the environment.
package main
import (
"errors"
"fmt"
"io"
"log"
"os"
"sync"
"time"
)
/************************************************* CONSTANTS **********************************************************/
// Tunables and file locations for the streaming copy.
const (
	// kMaxBufferSize is the largest number of bytes read from the input file in one iteration of the
	// reading loop, and therefore the largest chunk that is sent over the channel.
	kMaxBufferSize = 4096
	// kMaxChannelSize is the capacity of the buffered channel, counted in []byte chunks.
	kMaxChannelSize = 100
	// kFromPath is the file system path to the file that will be read.
	kFromPath = "/home/mjgardner/Downloads/ABigFile.zip"
	// kToPath is the file system path where the data from the buffered channel will be written.
	kToPath = "/home/mjgardner/Downloads/ANewBigFile.zip"
)
/**********************************************************************************************************************/
/**************************************************** MAIN ************************************************************/
// Function configureInterfaces creates and configures the interfaces used by the application: the buffered channel
// that carries chunks from the reading goroutine to the writing goroutine, the input file, and the output file.
// Parameter channelSize is the number of []byte chunks that can be queued in the buffered channel.
// Parameter fromPath is the file system path to the file that will be read.
// Parameter toPath is the file system path to the file that will be created (os.Create truncates an existing file).
// Returns the channel, the open input file, the open output file, and a nil error on success. On failure every other
// result is nil, any file opened along the way has been closed again, and the error names the step that failed.
// The error is returned rather than logged; reporting it is the caller's job.
func configureInterfaces(channelSize uint64, fromPath string, toPath string) (chan []byte, *os.File, *os.File, error) {
	// Open the file at fromPath for reading...
	ifile, err := os.Open(fromPath)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("opening input file: %v", err)
	}
	// Refuse to copy a file onto itself: os.Create would truncate the input before a single byte had been read,
	// destroying the source. If either Stat fails (e.g. toPath does not exist yet) the check is simply skipped.
	if iinfo, ierr := ifile.Stat(); ierr == nil {
		if oinfo, oerr := os.Stat(toPath); oerr == nil && os.SameFile(iinfo, oinfo) {
			_ = ifile.Close() // read-only handle; ignoring this close error cannot lose data
			return nil, nil, nil, errors.New("input and output paths refer to the same file")
		}
	}
	// Create the output file at toPath for writing...
	ofile, err := os.Create(toPath)
	if err != nil {
		_ = ifile.Close() // don't leak the input descriptor when the output cannot be created
		return nil, nil, nil, fmt.Errorf("creating output file: %v", err)
	}
	// make never returns a nil channel, so unlike the files there is no failure to check for here.
	ch := make(chan []byte, channelSize)
	return ch, ifile, ofile, nil
}

// Function main is the entry point for the application: it opens the files, streams kFromPath into kToPath through
// the buffered channel using one reading and one writing goroutine, and prints the elapsed time.
// Setup failures, an error returned by sender, and a failed Close of the output file are logged and terminate the
// process with a non-zero status via log.Fatal.
func main() {
	// Variable wg is main's WaitGroup, which detects when all of the goroutines that were launched have completed.
	var wg sync.WaitGroup
	// Start our timer...
	start := time.Now()
	// Create and configure the interfaces used by the application, such as the file system and channel.
	ch, ifile, ofile, err := configureInterfaces(kMaxChannelSize, kFromPath, kToPath)
	if err != nil {
		// configureInterfaces only returns the error, so report it here and exit with a non-zero status.
		log.Fatal(err)
	}
	// The input is read-only, so ignoring the error from this deferred Close cannot lose data.
	defer ifile.Close()
	// Print rather than Println: Println with a trailing "\n" in its argument is flagged by go vet.
	fmt.Print("Starting processing...\n\n")
	// sendErr is written by the reading goroutine and read only after wg.Wait(), which orders the two accesses.
	var sendErr error
	// File writing goroutine.
	wg.Add(1)
	go func(ch <-chan []byte, of *os.File) {
		defer wg.Done()
		// Receive the input file's data through the buffered channel.
		receiver(ch, of)
	}(ch, ofile)
	// File reading goroutine.
	wg.Add(1)
	go func(ch chan<- []byte, ifile *os.File) {
		defer wg.Done()
		// Write the input file's data to the buffered channel.
		sendErr = sender(ifile, ch)
	}(ch, ifile)
	// Wait here until all goroutines have completed their work.
	wg.Wait()
	if sendErr != nil {
		log.Fatal(sendErr)
	}
	// Close the output explicitly and check the result: a failed Close on a written file can mean lost data.
	if err = ofile.Close(); err != nil {
		log.Fatal(err)
	}
	// Show the duration.
	fmt.Printf("\nDone processing...\nElapsed: %s\n", time.Since(start))
}
// Function sender reads the input file in chunks of at most kMaxBufferSize bytes and sends each chunk over the
// buffered channel, then closes the channel so the receiver knows no more data is coming.
// Parameter ifile is the input file's handle.
// Parameter ch is the buffered channel to which data will be written; sender closes it after the last chunk.
// Returns nil once the whole file has been sent. A read error other than io.EOF is logged via log.Fatal, which
// terminates the process, so a non-nil error is never actually returned to the caller.
func sender(ifile *os.File, ch chan<- []byte) error {
	// Cumulative counters.
	var nBytes, nChunks uint64
	fmt.Println("\tSender is starting to read the input file...")
	// Loop through the input file reading chunks of data, which are sent over the channel.
	for {
		// Allocate a fresh buffer for every chunk. A slice sent on the channel may still be waiting to be
		// written by the receiver, so reusing one buffer would let the next Read overwrite queued data.
		buf := make([]byte, kMaxBufferSize)
		n, err := ifile.Read(buf)
		// io.Reader may return data together with an error (including io.EOF), so send the bytes first
		// and only then look at the error.
		if n > 0 {
			nChunks++
			nBytes += uint64(n)
			ch <- buf[:n]
		}
		if err == io.EOF {
			// End of file, so exit the loop...
			break
		}
		if err != nil {
			// Ouch! Log the error and exit.
			log.Fatal(err)
		}
		// n == 0 with a nil error is permitted by io.Reader; just read again.
	}
	// Signal the receiving goroutine that there is no more data.
	fmt.Println("\tSender is closing the channel to signal the receiver that no more data is coming, and exiting...")
	close(ch)
	// When there is no more data to process, display the sender's status.
	fmt.Printf("\tSent:\t\tnBytes: %d, nChunks: %d\n", nBytes, nChunks)
	return nil
}
// Function receiver drains the buffered channel, writing every chunk to the output file in the order received,
// until the sender closes the channel; it then prints its byte and chunk totals.
// Parameter ch is the buffered channel from which data will be read.
// Parameter of is the output file's handle.
// A write error is logged via log.Fatal, terminating the process: carrying on would silently produce a truncated
// or corrupted copy, and returning early would leave the sender blocked on a full channel.
func receiver(ch <-chan []byte, of *os.File) {
	// Cumulative counters.
	var nBytes, nChunks uint64
	fmt.Println("\tReceiver is waiting for data in the channel, so it can write it to the output file...")
	// While there is data to read in the channel, get it and write it to the output file.
	for data := range ch {
		n, err := of.Write(data)
		if err != nil {
			log.Fatal(err)
		}
		// Update the cumulative counters...
		nBytes += uint64(n)
		nChunks++
	}
	fmt.Println("\tReceiver has emptied the channel and is exiting...")
	// When there is no more data to process, display the receiver's status.
	fmt.Printf("\tReceived:\tnBytes: %d, nChunks: %d\n", nBytes, nChunks)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment