Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save abitofhelp/40d22fd85fdd6b412b4527ba5bc54a48 to your computer and use it in GitHub Desktop.
Save abitofhelp/40d22fd85fdd6b412b4527ba5bc54a48 to your computer and use it in GitHub Desktop.
This gist implements a streaming process in which one goroutine reads a file in chunks and writes each chunk to an in-memory pipe (Go's io.Pipe), and a second goroutine reads the data from the pipe and writes it to a new file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Package main implements streaming of a file through an in-memory FIFO pipe (io.Pipe), from which the data is
// written to a new file. It is the entry point for the application and is responsible for configuring the environment.
// Using a pipe avoids having to allocate a new buffer on every pass through the read loop, which eliminates a data
// race between reading data from the file into the buffer and sending that buffer over a channel. However, using a
// pipe is slower than a channel. A quick test with a 3.9GB binary file required 5.3s with a channel, and 12.6s with
// a pipe.
// I've created a companion gist, "go-stream-file-between-goroutines-with-channel", for comparison.
package main
import (
"bufio"
"fmt"
"io"
"log"
"os"
"sync"
"time"
)
/************************************************* CONSTANTS **********************************************************/
// Configuration constants for the streaming copy.
const (
	// kMaxBufferSize is the maximum number of bytes that will be read from the
	// file in each iteration through the reading loop (32 KiB).
	kMaxBufferSize = 32 * 1024

	// kFromPath is the file system path to the file that will be read.
	// Other inputs that have been used with this program:
	//   "/home/mjgardner/Downloads/abc.rar" // 56.2GB
	//   "/home/mjgardner/Downloads/def.exe" // 220MB
	kFromPath = "/home/mjgardner/Downloads/ghi.zip" // 3.9GB

	// kToPath is the file system path of the file that the streamed data is
	// written to.
	kToPath = "/home/mjgardner/Downloads/ANewBigFile.zip"
)
/**********************************************************************************************************************/
/**************************************************** MAIN ************************************************************/
// main is the entry point for the application and is responsible for configuring its environment.
// It opens the input and output files, starts one goroutine that reads the input file into the pipe
// (sender) and one that drains the pipe into the output file (receiver), waits for both, and then
// prints the elapsed time. The process exits with a non-zero status when configuration fails, when
// either goroutine reports an error, or when the output file cannot be closed cleanly.
func main() {
	// wg is main's WaitGroup, which detects when all of the goroutines that were launched have completed.
	var wg sync.WaitGroup

	// Start our timer...
	start := time.Now()

	// Create and configure the interfaces used by the application, such as the file system and pipe.
	pw, pr, ifile, ofile, err := configureInterfaces(kFromPath, kToPath)
	if err != nil {
		// The error has already been logged by configureInterfaces. Nothing has been
		// opened that needs closing, so exit directly with a failure status instead
		// of returning, which would report success to the caller of the program.
		os.Exit(1)
	}

	// Print is used instead of Println because the message carries its own
	// trailing blank line; Println with a trailing "\n" is rejected by go vet.
	fmt.Print("Starting processing...\n\n")

	// Results of the two goroutines. Each variable is written by exactly one
	// goroutine and read only after wg.Wait, so no extra locking is needed.
	var sendErr, recvErr error

	// File writing goroutine.
	wg.Add(1)
	go func(r *io.PipeReader, of *os.File) {
		defer wg.Done()
		// Receive the input file's data through the FIFO pipe.
		recvErr = receiver(r, of)
	}(pr, ofile)

	// File reading goroutine.
	wg.Add(1)
	go func(w *io.PipeWriter, ifile *os.File) {
		defer wg.Done()
		// Write the input file's data to the FIFO pipe.
		sendErr = sender(ifile, w)
	}(pw, ifile)

	// Wait here until all goroutines have completed their work.
	wg.Wait()

	failed := false
	if sendErr != nil {
		log.Print(sendErr)
		failed = true
	}
	if recvErr != nil {
		log.Print(recvErr)
		failed = true
	}

	// The input file was only read, so a failure to close it cannot lose data.
	_ = ifile.Close()
	// The output file was written, and Close can report an error, so it is
	// checked rather than discarded in a defer.
	if err := ofile.Close(); err != nil {
		log.Printf("closing %s: %v", kToPath, err)
		failed = true
	}
	if failed {
		os.Exit(1)
	}

	// Show the duration.
	fmt.Printf("\nDone processing...\nElapsed: %s", time.Since(start))
}
// configureInterfaces creates and configures the interfaces used by the application, such as the file system
// and pipe.
// Parameter fromPath is the file system path to the file that will be read.
// Parameter toPath is the file system path to the file that will be created.
// Returns a configured pipe writer and reader, input file handle, and output file handle,
// and no error instance on success; Otherwise, the error is logged and
// all items will be nil, except for the error.
func configureInterfaces(fromPath string, toPath string) (*io.PipeWriter, *io.PipeReader, *os.File, *os.File, error) {
	// Open the file at fromPath for reading...
	ifile, err := os.Open(fromPath)
	if err != nil {
		// Log and return instead of log.Fatal: exiting here would make the
		// documented error return unreachable and take the decision away
		// from the caller, which expects the error to be logged already.
		log.Print(err)
		return nil, nil, nil, nil, err
	}

	// Create the output file at toPath for writing...
	ofile, err := os.Create(toPath)
	if err != nil {
		// Do not leak the input file that was opened above.
		ifile.Close()
		log.Print(err)
		return nil, nil, nil, nil, err
	}

	// Create the FIFO pipe last; this cannot fail, so no cleanup path is needed for it.
	pr, pw := io.Pipe()

	return pw, pr, ifile, ofile, nil
}
// sender writes the input file's data to the FIFO pipe.
// Parameter ifile is the input file's handle.
// Parameter pw is the pipe's writer where data from the input file will be written.
// Returns nil after the whole file has been sent and the pipe has been closed.
// If reading the file fails, the error is forwarded to the reading side of the
// pipe with CloseWithError, so the reader sees the failure instead of a clean
// end of stream, and it is also returned. If writing to the pipe fails (the
// reading side has been closed), the transfer stops and that error is returned.
func sender(ifile *os.File, pw *io.PipeWriter) error {
	// Safety net: guarantees that the reading side is released on every return
	// path. Closing again after an earlier Close or CloseWithError is harmless.
	defer pw.Close()

	// Cumulative counters
	nBytes := uint64(0)
	nChunks := uint64(0)

	// The buffer for data that is read from the file. It can be reused for every
	// chunk because pw.Write does not return until the reader has consumed the data.
	buf := make([]byte, kMaxBufferSize)

	fmt.Println("\tSender is starting to read the input file...")

	// Loop through the input file reading chunks of data, which is sent over the pipe.
	for {
		// Read a chunk of data from the file...
		n, err := ifile.Read(buf)

		// Send whatever was read before looking at the error, so that data
		// returned together with an error is not dropped.
		if n > 0 {
			if _, werr := pw.Write(buf[:n]); werr != nil {
				// The reader has gone away; reading the rest of the file is pointless.
				return fmt.Errorf("sender: writing to pipe: %w", werr)
			}
			// Update the cumulative counters.
			nChunks++
			nBytes += uint64(n)
		}

		if err == io.EOF {
			// End of file, so exit the loop...
			break
		}
		if err != nil {
			// Hand the failure to the reader so it does not mistake a truncated
			// stream for a complete one.
			pw.CloseWithError(err)
			return fmt.Errorf("sender: reading input file: %w", err)
		}
		// No data and no error; keep going...
	}

	// Signal the receiving goroutine that there is no more data.
	fmt.Println("\tSender is closing the channel to signal the receiver that no more data is coming, and exiting...")
	pw.Close()

	// When there is no more data to process, display the sender's status.
	fmt.Printf("\tSent:\t\tnBytes: %d, nChunks: %d\n", nBytes, nChunks)

	return nil
}
// receiver receives the input file's data through the FIFO pipe and writes it to the output file.
// Parameter pr is the pipe's reader from which file data will be read.
// Parameter of is the output file's handle.
// Returns nil once the pipe has reported end of stream and all of the data has
// been flushed to the output file. Any error while reading from the pipe,
// writing to the output file, or flushing is logged with log.Fatal, which
// terminates the process.
func receiver(pr *io.PipeReader, of *os.File) error {
	// Cumulative counters
	nBytes := uint64(0)
	nChunks := uint64(0)

	// Buffered writing to the output file.
	writer := bufio.NewWriter(of)

	// Buffer used for each chunk of data from the pipe.
	buf := make([]byte, kMaxBufferSize)

	fmt.Println("\tReceiver is waiting for data in the channel, so it can write it to the output file...")

	// While there is data to read in the pipe, get it and write it to the output file.
	for {
		n, err := pr.Read(buf)

		// Consume whatever was read before looking at the error.
		if n > 0 {
			// Write the chunk to the output file.
			if _, werr := writer.Write(buf[:n]); werr != nil {
				// Ouch! Log the error and exit. The return keeps the function
				// correct should the logging call ever stop exiting.
				log.Fatal(werr)
				return werr
			}
			// Update the cumulative counters...
			nBytes += uint64(n)
			nChunks++
		}

		if err == io.EOF {
			// End of stream, so exit the loop...
			break
		}
		if err != nil {
			// Ouch! Log the error and exit.
			log.Fatal(err)
			return err
		}
		// No data and no error; keep going...
	}

	// Push everything still held in the bufio.Writer out to the file. Without
	// this, data that was small enough to stay in the writer's buffer (such as
	// a short final chunk) would never reach the output file.
	if err := writer.Flush(); err != nil {
		log.Fatal(err)
		return err
	}

	fmt.Println("\tReceiver has emptied the pipe and is exiting...")

	// When there is no more data to process, display the receiver's status.
	fmt.Printf("\tReceived:\tnBytes: %d, nChunks: %d\n", nBytes, nChunks)

	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment