Last active
February 19, 2016 22:01
-
-
Save kevinschoon/b0bb23c92b51e6d8d60d to your computer and use it in GitHub Desktop.
Demonstration of concurrency using Go routines with shared lock plus error and signal handling.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Demonstration of concurrency using goroutines with a shared lock, plus error and signal handling. | |
*/ | |
package main | |
import ( | |
"fmt" | |
"io/ioutil" | |
"math/rand" | |
"os" | |
"os/signal" | |
"strings" | |
"sync" | |
"time" | |
) | |
const (
	// PollingInterval is the period of the ticker handed to each worker:
	// 100ms, i.e. ten polls per second.
	PollingInterval = 100 * time.Millisecond

	// WorkerCount is the number of worker goroutines launched by main.
	WorkerCount int = 3
)
// Reader accumulates the words picked so far. A single Reader is shared by
// all workers, so every access to words must happen with lock held.
type Reader struct {
	words []string     // Words that have been picked thus far
	lock  sync.RWMutex // Guards words against concurrent access
}

/*
Read picks a random word from /usr/share/dict/words and appends it to the
reader's word list.

workerName is only used to label the progress line printed on success.

An error is returned when the words file cannot be read, when it contains no
words at all, or when the picked word is already in the list. The list is left
untouched in all three cases.
*/
func (r *Reader) Read(workerName string) error {
	const path = "/usr/share/dict/words" // The Unix standard words file

	// The file read and the random pick touch no shared state, so they are
	// done before taking the lock to keep the critical section short.
	data, err := ioutil.ReadFile(path)
	if err != nil { // File could not be read, Windows?
		return err
	}
	words := splitWords(string(data))
	if len(words) == 0 { // rand.Intn panics on 0, so bail out explicitly
		return fmt.Errorf("no words found in %s", path)
	}
	// A private generator avoids re-seeding the process-wide source on every call.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	choice := words[rng.Intn(len(words))]

	r.lock.Lock() // Only one routine at a time may inspect and modify the list
	defer r.lock.Unlock()
	for _, word := range r.words {
		if choice == word { // The word already exists, return an error
			return fmt.Errorf("Word %s already exists", choice)
		}
	}
	r.words = append(r.words, choice)
	fmt.Printf("%s: Added %s to word list (%d of %d) words\n", workerName, choice, len(r.words), len(words))
	return nil
}

// splitWords returns the non-empty lines of data with surrounding whitespace
// removed. Dropping empty lines keeps the empty string that follows a trailing
// newline from being treated as a word.
func splitWords(data string) []string {
	lines := strings.Split(data, "\n")
	words := make([]string, 0, len(lines))
	for _, line := range lines {
		if word := strings.TrimSpace(line); word != "" {
			words = append(words, word)
		}
	}
	return words
}
/* | |
Each Worker runs in it's own Go routine and shares a Reader object | |
*/ | |
type Worker struct { | |
name string // Name of this worker | |
err error // The last error the worker encountered | |
reader *Reader // Shared reader object | |
} | |
/* | |
Each worker will loop waiting for a time.Tick message specified by PollingInterval or | |
will handle a shutdown signal. If an error occurs the worker will assign it to itself and then | |
send an interrupt to all remaining workers. | |
*/ | |
func (worker *Worker) Run(ticker <-chan time.Time, shutdown chan os.Signal, waitGroup *sync.WaitGroup) { | |
defer waitGroup.Done() // Signal the worker has finished at the very end | |
for { | |
select { | |
case <-ticker: // Polling interval exceeded | |
err := worker.reader.Read(worker.name) // Perform the work | |
if err != nil { // An error occurred | |
worker.err = err // Assign error to worker | |
for i := 0; i < WorkerCount-1; i++ { // Send interrupt to other workers | |
shutdown <- os.Interrupt // TODO: Race condition exists here if a another failure occurs during this time | |
} | |
return | |
} | |
case sig := <-shutdown: // Received a shutdown signal | |
fmt.Printf("%s Caught signal: %s \n", worker.name, sig.String()) | |
return | |
} | |
} | |
} | |
func main() { | |
reader := &Reader{words: make([]string, 0), lock: sync.RWMutex{}} // Shared reader object | |
shutdown := make(chan os.Signal) // Shutdown channel | |
workers := make([]*Worker, 0) // Array of workers | |
waitGroup := &sync.WaitGroup{} // WaitGroup (block until all workers have called waitGroup.Done()) | |
waitGroup.Add(WorkerCount) // Number of calls to waitGroup.Done() to wait for | |
signal.Notify(shutdown, os.Interrupt) // Catch Interrupt signals (ctrl-c) | |
for i := 0; i < WorkerCount; i++ { // Create n workers specified by WorkerCount | |
worker := &Worker{name: fmt.Sprintf("Worker%d", i), reader: reader} // Create the workers | |
workers = append(workers, worker) // Append each worker to an array to later check for errors | |
go worker.Run(time.Tick(PollingInterval), shutdown, waitGroup) // Run the Go routine | |
} | |
waitGroup.Wait() // Wait for all workers to call waitGroup.Done() | |
for _, worker := range workers { // Check each worker for an error | |
if worker.err != nil { | |
fmt.Printf("%s: %s\n", worker.name, worker.err.Error()) | |
os.Exit(1) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment