Skip to content

Instantly share code, notes, and snippets.

@kevinschoon
Last active February 19, 2016 22:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevinschoon/b0bb23c92b51e6d8d60d to your computer and use it in GitHub Desktop.
Save kevinschoon/b0bb23c92b51e6d8d60d to your computer and use it in GitHub Desktop.
Demonstration of concurrency using Go routines with shared lock plus error and signal handling.
/*
Demonstration of concurrency using goroutines that share a lock, with error and signal handling.
*/
package main
import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"os/signal"
"strings"
"sync"
"time"
)
// PollingInterval is the period of each worker's ticker: a tenth of a
// second, so every worker attempts ten reads per second.
const PollingInterval time.Duration = time.Second / 10

// WorkerCount is the number of worker goroutines that main launches.
const WorkerCount int = 3
// Reader picks random words from a dictionary file and remembers every word
// it has accepted so far. A single Reader is shared by all workers; Read
// serialises access with the embedded lock.
//
// The zero value is ready to use and reads /usr/share/dict/words.
type Reader struct {
	words []string     // Words accepted so far, in the order they were added
	lock  sync.RWMutex // Guards every field; Read takes the write lock
	path  string       // Dictionary file to load; empty selects /usr/share/dict/words
	dict  []string     // Non-empty lines of the dictionary file, loaded on first use
	rng   *rand.Rand   // Private random source, created on first use
}

// Read picks a random word from the dictionary and appends it to the list of
// accepted words, printing a progress line prefixed with workerName.
//
// It returns an error when the dictionary file cannot be read, when the file
// contains no words, or when the chosen word has been accepted before. In
// every error case the list of accepted words is left unchanged.
func (reader *Reader) Read(workerName string) error {
	reader.lock.Lock() // Only one worker may pick and record a word at a time
	defer reader.lock.Unlock()

	if err := reader.load(); err != nil {
		return err
	}

	choice := reader.dict[reader.rng.Intn(len(reader.dict))]
	for _, word := range reader.words {
		if choice == word { // Drawing the same word twice is reported as an error
			return fmt.Errorf("Word %s already exists", choice)
		}
	}

	reader.words = append(reader.words, choice)
	fmt.Printf("%s: Added %s to word list (%d of %d) words\n", workerName, choice, len(reader.words), len(reader.dict))
	return nil
}

// load prepares the random source and the dictionary. The caller must hold
// reader.lock.
//
// The file is read and split only once and then cached, instead of on every
// call. A failed load caches nothing, so the next call tries again.
func (reader *Reader) load() error {
	if reader.rng == nil {
		// Seed once; a private source avoids re-seeding the global one on every call.
		reader.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
	}
	if reader.dict != nil {
		return nil
	}

	const defaultPath = "/usr/share/dict/words" // The Unix standard words file
	path := reader.path
	if path == "" {
		path = defaultPath
	}

	data, err := ioutil.ReadFile(path)
	if err != nil { // File could not be read, e.g. it does not exist on this system
		return err
	}

	lines := strings.Split(string(data), "\n")
	dict := make([]string, 0, len(lines))
	for _, line := range lines {
		// A trailing newline (or a blank line) yields an empty entry; it is
		// not a word and must never be chosen or counted.
		if line != "" {
			dict = append(dict, line)
		}
	}
	if len(dict) == 0 {
		// Also protects rand.Intn, which panics when called with 0.
		return fmt.Errorf("no words found in %s", path)
	}

	reader.dict = dict
	return nil
}
// Worker is one concurrent unit of work. Each Worker runs in its own
// goroutine and shares a single Reader with its peers.
type Worker struct {
	name   string  // Label that prefixes this worker's output
	err    error   // Last error this worker ran into; nil if it never failed
	reader *Reader // Reader shared between all workers
}
// Run is the worker's main loop and is meant to be started in its own
// goroutine.
//
// Every time a value arrives on ticker the worker performs one Read on the
// shared reader. The loop ends when a Read fails or when a value arrives on
// shutdown; either way waitGroup.Done is called exactly once on the way out.
//
// When a Read fails, the error is stored in worker.err and WorkerCount-1
// os.Interrupt values are sent on shutdown, one for each of the other
// workers.
func (worker *Worker) Run(ticker <-chan time.Time, shutdown chan os.Signal, waitGroup *sync.WaitGroup) {
	defer waitGroup.Done() // Signal that this worker has finished, whatever the exit path
	for {
		select {
		case <-ticker: // Polling interval elapsed
			if err := worker.reader.Read(worker.name); err != nil {
				worker.err = err // Kept so the caller can inspect it after Wait
				// Notify the peers from a separate goroutine. Sending inline
				// would block this worker forever, and waitGroup.Wait with
				// it, whenever a peer has already returned or is failing at
				// the same moment and therefore never receives. Sends that
				// find no receiver stay parked in that goroutine until the
				// process exits.
				go func() {
					for i := 0; i < WorkerCount-1; i++ {
						shutdown <- os.Interrupt
					}
				}()
				return
			}
		case sig := <-shutdown: // Asked to stop, by a peer or by the operating system
			fmt.Printf("%s Caught signal: %s \n", worker.name, sig.String())
			return
		}
	}
}
// main starts WorkerCount workers that share one Reader, waits for all of
// them to finish, and exits with status 1 if any worker recorded an error.
//
// The workers stop either because one of them failed (the failing worker
// notifies its peers through the shutdown channel) or because the process
// received an interrupt (ctrl-c), which is fanned out to every worker.
func main() {
	reader := &Reader{} // Shared reader object; its zero value is ready to use

	// Buffered so that no sender ever blocks: each worker sends at most
	// WorkerCount-1 notifications when it fails, and the relay below adds at
	// most WorkerCount more.
	shutdown := make(chan os.Signal, WorkerCount*WorkerCount)

	// The signal package never blocks when delivering, so the channel handed
	// to Notify needs buffer space or an interrupt can be lost.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt) // Catch Interrupt signals (ctrl-c)

	// Relay the first interrupt to every worker. A worker consumes exactly
	// one value from shutdown before returning, so a single delivery would
	// stop only one of them.
	finished := make(chan struct{})
	go func() {
		select {
		case sig := <-interrupts:
			for i := 0; i < WorkerCount; i++ {
				shutdown <- sig
			}
		case <-finished: // All workers are done; nothing left to relay
		}
	}()

	waitGroup := &sync.WaitGroup{}
	waitGroup.Add(WorkerCount) // One Done call is expected per worker

	workers := make([]*Worker, 0, WorkerCount)      // Kept to check for errors afterwards
	tickers := make([]*time.Ticker, 0, WorkerCount) // Kept so they can be stopped afterwards
	for i := 0; i < WorkerCount; i++ {
		worker := &Worker{name: fmt.Sprintf("Worker%d", i), reader: reader}
		ticker := time.NewTicker(PollingInterval) // Unlike time.Tick, this ticker can be stopped
		workers = append(workers, worker)
		tickers = append(tickers, ticker)
		go worker.Run(ticker.C, shutdown, waitGroup)
	}

	waitGroup.Wait() // Block until every worker has called waitGroup.Done()

	// Release resources explicitly: os.Exit below would skip deferred calls.
	close(finished)
	signal.Stop(interrupts)
	for _, ticker := range tickers {
		ticker.Stop()
	}

	for _, worker := range workers { // Report the first worker that failed
		if worker.err != nil {
			fmt.Printf("%s: %s\n", worker.name, worker.err.Error())
			os.Exit(1)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment