Skip to content

Instantly share code, notes, and snippets.

@abitofhelp
Last active July 28, 2018 20:53
Show Gist options
  • Save abitofhelp/5c4ac0629c2cf384b21764ee98d5b96b to your computer and use it in GitHub Desktop.
Save abitofhelp/5c4ac0629c2cf384b21764ee98d5b96b to your computer and use it in GitHub Desktop.
This gist uses a directed acyclic graph of goroutines to implement an extract, transform, and load ("ETL") process for files.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Package main implements an ETL process where a file is read, and its contents streamed to a fan-out consisting
// of two, parallel goroutines: One that calculates the MD5 checksum and passes it along to the fan-in stage; The
// other saves the content stream to a new file and passes the file's metadata to the fan-in stage. The fan-in stage
// is the sink for the ETL process. It merges the metadata from the fan-out operations into a JSON object, which is
// persisted to a local file.
// The main function is the entry point for the application and is responsible for configuring the environment.
package main
import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"
"sync"
"time"
)
/************************************************* CONSTANTS **********************************************************/
// Const kMaxBufferSize is the maximum number of bytes that will
// be read from the file in each iteration through the reading loop.
const kMaxBufferSize = 4096
// Const kFromPath is the file system path to the file that will be processed.
const kFromPath = "/home/mjgardner/Downloads/ABigFile.zip"
// Const kToPath is the file system path for where the processed file will be persisted.
const kToPath = "/home/mjgardner/Downloads/ANewBigFile.zip"
// Const kJsonPath is the file system path to the metadata for the processed file.
const kJsonPath = "/home/mjgardner/Downloads/ANewBigFile.json"
/**********************************************************************************************************************/
/*************************************************** TYPES ************************************************************/
// Type WriteChunksFunction defines the handler that will be invoked when doWrite is not nil.
// It receives one chunk of the file's contents and returns the number of bytes it
// consumed — which must equal len(buf), or the caller reports an error — plus any error.
type WriteChunksFunction func(buf []byte) (nBytesWritten uint64, err error)
/**********************************************************************************************************************/
/**************************************************** MAIN ************************************************************/
// Function main is the entry point for the application. It times the ETL run,
// waits for every goroutine launched by the pipeline to finish, and then
// reports the elapsed duration.
func main() {
	// WaitGroup shared by every stage so main can detect overall completion.
	var wg sync.WaitGroup

	// Start the clock before kicking off the pipeline.
	startedAt := time.Now()
	doIt(&wg)

	// Block here until all of the pipeline goroutines have completed.
	wg.Wait()

	// Report how long the whole run took.
	fmt.Printf("Elapsed: %s", time.Since(startedAt))
}
// Function doIt wires the ETL pipeline together: the source stage reads the
// file, the fan-out stage writes and checksums it in parallel, and the fan-in
// stage merges the results into a JSON metadata file.
func doIt(wg *sync.WaitGroup) {
	// Buffered channels that carry the file's chunks to the two fan-out
	// consumers.
	filerCh := make(chan []byte, 4096)
	checksumCh := make(chan []byte, 4096)

	// Unbuffered channels that carry each fan-out stage's result to the sink.
	fileInfoCh := make(chan os.FileInfo)
	checksumOutCh := make(chan string)

	filerCh, checksumCh = sourceStageProcessing(filerCh, checksumCh, wg)
	configureFanOut(filerCh, checksumCh, fileInfoCh, checksumOutCh, wg)
	configureFanIn(fileInfoCh, checksumOutCh, wg)
}
// Function sourceStageProcessing launches the source stage of the pipeline.
// It reads the file at kFromPath in kMaxBufferSize chunks and fans each chunk
// out to filerCh (file writer) and checksumCh (MD5 calculator). When the whole
// file has been read, it closes both channels so the downstream stages can
// drain their remaining values and return.
// The channels are returned unchanged for the caller's convenience.
func sourceStageProcessing(filerCh chan []byte, checksumCh chan []byte, wg *sync.WaitGroup) (chan []byte, chan []byte) {
	wg.Add(1)
	go func(path string) {
		defer wg.Done()
		// Read the file in chunks and fan each chunk out to both consumers.
		nBytesRead, nChunks, checksum, err := readFileInChunks(path, kMaxBufferSize, func(buf []byte) (uint64, error) {
			// BUG FIX: readFileInChunks reuses a single internal buffer on
			// every iteration, and these channels are buffered — so buf must
			// be copied before it is sent, or the consumers can observe a
			// chunk that has already been overwritten by a later read.
			chunk := make([]byte, len(buf))
			copy(chunk, buf)
			filerCh <- chunk
			checksumCh <- chunk
			return uint64(len(chunk)), nil
		})
		fmt.Printf("Sent:\t\tnBytesRead: %d, nChunks: %d, Checksum: %s, Error: %v\n", nBytesRead, nChunks, checksum, err)
		// Close the channels since there are no more values to send. This
		// signals the receivers that, once the channels are drained, they can
		// return and the application can terminate.
		close(filerCh)
		close(checksumCh)
	}(kFromPath)
	return filerCh, checksumCh
}
func configureFanOut(
filerCh chan []byte,
checksumCh chan []byte,
fileInfoCh chan os.FileInfo,
checksumOutCh chan string,
wg *sync.WaitGroup) {
// Fan-out the file contents to two goroutines running in parallel: one will write the stream to a new file and
// the other will calculate its MD5 checksum.
wg.Add(1)
go func(fiIn <-chan []byte, filePath string, fiOut chan<- os.FileInfo) {
defer wg.Done()
filer(fiIn, filePath, fiOut)
}(filerCh, kToPath, fileInfoCh)
wg.Add(1)
go func(csIn <-chan []byte, csOut chan<- string) {
defer wg.Done()
checksumer(csIn, csOut)
}(checksumCh, checksumOutCh)
}
func configureFanIn(fileInfoCh chan os.FileInfo, checksumOutCh chan string, wg *sync.WaitGroup) {
wg.Add(1)
go func(fiIn <-chan os.FileInfo, csIn <-chan string, filePath string, jsonPath string) {
defer wg.Done()
jsonizer(fiIn, csIn, filePath, jsonPath)
}(fileInfoCh, checksumOutCh, kToPath, kJsonPath)
}
// Function jsonizer is the fan-in (sink) stage of the ETL pipeline. It waits
// for the written file's os.FileInfo and for the MD5 checksum, merges them
// into a FileInfo value, and persists that value as indented JSON at
// toJsonPath.
// NOTE(review): on error this stage returns without any result; the pipeline
// has no error channel, so failures are only reported to stdout — confirm.
func jsonizer(fileInfoInCh <-chan os.FileInfo, checksumInCh <-chan string, toPath string, toJsonPath string) {
	// Each receive blocks until the corresponding fan-out stage finishes.
	fi := <-fileInfoInCh
	checksum := <-checksumInCh
	ffi := NewFileInfo(fi, toPath, checksum)
	// Marshal with indentation so the persisted JSON is human-readable.
	jsonString, err := json.MarshalIndent(ffi, "", " ")
	if err != nil {
		// BUG FIX: the original discarded this error with a blank identifier.
		fmt.Println("Jsonizer: marshal error:", err)
		return
	}
	jfile, err := os.Create(toJsonPath)
	if err != nil {
		// BUG FIX: the original swallowed this error silently.
		fmt.Println("Jsonizer: create error:", err)
		return
	}
	// Automatically close the file when exiting this method.
	defer jfile.Close()
	content := string(jsonString)
	// BUG FIX: the original ignored the WriteString error.
	if _, err := jfile.WriteString(content); err != nil {
		fmt.Println("Jsonizer: write error:", err)
		return
	}
	fmt.Println("Jsonizer: ", content, "\nJson Path: ", toJsonPath)
}
// Function checksumer is the fan-out stage that computes the MD5 checksum of
// the streamed file contents. It drains checksumInCh, feeding every chunk
// into the hasher, and when the channel is closed it sends the hex-encoded
// digest on checksumOutCh so the fan-in stage can merge it into the
// file's metadata.
func checksumer(checksumInCh <-chan []byte, checksumOutCh chan<- string) {
	// Create the MD5 hasher so we can calculate the MD5 value on the fly.
	// (md5.New returns a hash already in its initial state, so the original's
	// extra Reset call was redundant and has been removed.)
	hasher := md5.New()
	nBytesRead := uint64(0)
	nChunks := uint64(0)
	for buffer := range checksumInCh {
		// hash.Hash.Write never returns an error, so it is safe to ignore.
		hasher.Write(buffer)
		// Update the cumulative counters...
		nBytesRead += uint64(len(buffer))
		nChunks++
	}
	// Generate the MD5 hex string and pass it to the fan-in stage...
	checksum := hex.EncodeToString(hasher.Sum(nil))
	checksumOutCh <- checksum
	fmt.Printf("Checksumer:\tnBytesRead: %d, nChunks: %d, Checksum: %s\n", nBytesRead, nChunks, checksum)
}
func filer(filerInCh <-chan []byte, toPath string, fileInfoOutCh chan<- os.FileInfo) {
ofile, err := os.Create(toPath)
defer ofile.Close()
if err != nil {
return
}
nBytesRead := uint64(0)
nChunks := uint64(0)
for buffer := range filerInCh {
bytesRead := len(buffer)
ofile.Write(buffer)
// Update the cumulative counters...
nBytesRead += uint64(bytesRead)
nChunks++
}
// Get the file's info from the open file...
fInfo, err := ofile.Stat()
// Pass the file info along to the next goroutine...
fileInfoOutCh <- fInfo
fmt.Printf("Receiver:\tnBytesRead: %d, nChunks: %d\n", nBytesRead, nChunks)
}
// Function readFileInChunks reads the contents of the file at fromPath in
// maxBufSize-byte chunks, calculates the MD5 hash on the fly, and optionally
// hands each chunk to doWrite so the data can be written somewhere else.
// Parameter fromPath is the full path to the file that will be read.
// Parameter maxBufSize is the maximum number of bytes that will be read from
// the file in each iteration through the reading loop.
// Parameter doWrite, when non-nil, receives each chunk; it must report that it
// consumed exactly len(buf) bytes or an error is returned.
// Returns (number of bytes read, number of chunks processed, checksum hex
// string, nil) on success; otherwise (0, 0, "", error).
// Remarks: whatever doWrite has done is not cleaned up when an error occurs,
// unless doWrite does it itself.
func readFileInChunks(
	fromPath string,
	maxBufSize uint64,
	doWrite WriteChunksFunction) (nBytesRead uint64, nChunks uint64, checksum string, err error) {
	if fromPath == "" {
		return 0, 0, "", fmt.Errorf("the path to the file that will be read " +
			"cannot be an empty string")
	}
	if maxBufSize == 0 {
		return 0, 0, "", fmt.Errorf("the size of the buffer used for reading from " +
			"the file cannot be zero")
	}
	// Open the file at fromPath for reading...
	file, err := os.Open(fromPath)
	if err != nil {
		return 0, 0, "", err
	}
	// Automatically close the file when exiting this method.
	defer file.Close()
	// Create the buffer that will be used for reading a chunk of maxBufSize bytes at a time.
	buffer := make([]byte, maxBufSize)
	// Create the MD5 hasher so we can calculate the MD5 value on the fly...
	hasher := md5.New()
	// Loop through the file, reading chunks of data, and providing each chunk to doWrite().
	for {
		// Read a chunk of bytes from the file. If the length of the file is
		// not a whole multiple of the buffer size, the last iteration will
		// read the remaining number of bytes into the buffer.
		bytesRead, rerr := file.Read(buffer)
		// BUG FIX: process any bytes returned BEFORE inspecting the error —
		// per the io.Reader contract, a Read may return n > 0 together with
		// io.EOF, and the original dropped those final bytes.
		if bytesRead > 0 {
			// Update the cumulative counters...
			nBytesRead += uint64(bytesRead)
			nChunks++
			// Update the MD5 hasher...
			hasher.Write(buffer[:bytesRead])
			if doWrite != nil {
				bytesWritten, werr := doWrite(buffer[:bytesRead])
				if werr != nil {
					return 0, 0, "", werr
				}
				if bytesWritten != uint64(bytesRead) {
					// BUG FIX: the original misspelled "written" and ended the
					// error string with a newline.
					return 0, 0, "", fmt.Errorf("the number of bytes read %d does not "+
						"equal the number of bytes written %d", bytesRead, bytesWritten)
				}
			}
		}
		if rerr != nil {
			if rerr != io.EOF {
				return 0, 0, "", rerr
			}
			// All done reading the file.
			break
		}
	}
	// Generate the MD5 hex string...
	checksum = hex.EncodeToString(hasher.Sum(nil))
	return nBytesRead, nChunks, checksum, nil
}
/**********************************************************************************************************************/
/************************************************** FILEINFO **********************************************************/
// Type FileInfo
type FileInfo struct {
Name string
Size uint64
Mode os.FileMode
ModTime time.Time
IsDir bool
Checksum string
Path string
}
// Function NewFileInfo
func NewFileInfo(finfo os.FileInfo, path string, checksum string) FileInfo {
ffi := FileInfo{
Name: finfo.Name(),
Size: uint64(finfo.Size()),
Mode: finfo.Mode(),
ModTime: finfo.ModTime().UTC(),
IsDir: finfo.IsDir(),
Checksum: checksum,
Path: path}
return ffi
}
/**********************************************************************************************************************/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment