|
package main |
|
|
|
import (
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"os"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"
)
|
|
|
// numWorkers is the number of worker goroutines that consume lines from the
// work channel concurrently.
const numWorkers = 4

// dateLen is the number of leading bytes of each log line that are treated as
// the line's date key. The content of the prefix is not validated.
// NOTE(review): presumably lines start with a 10-character date such as
// YYYY-MM-DD — confirm against the real log format.
const dateLen = 10

// ipOctet matches a single decimal octet in the range 0-255.
const ipOctet = `(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])`

// ipRegEx matches four dot-separated octets (an IPv4 dotted quad). It is
// compiled once at package scope so workers share a single compiled program.
// The pattern is unanchored, so it can also match inside a longer run of
// digits and dots.
var ipRegEx = regexp.MustCompile(ipOctet + `\.` + ipOctet + `\.` + ipOctet + `\.` + ipOctet)

// datePrefix returns the first dateLen bytes of line and true. If line is
// shorter than dateLen bytes it returns "" and false instead of panicking on
// an out-of-range slice.
func datePrefix(line string) (string, bool) {
	if len(line) < dateLen {
		return "", false
	}
	return line[:dateLen], true
}

// main reads example.log line by line, fans the lines out to numWorkers
// goroutines, and prints how many lines were seen per date prefix along with
// every IPv4-looking string found in the file.
func main() {
	file, err := os.Open("example.log")
	if err != nil {
		// Nothing useful can be done without the input file.
		log.Fatal(err)
	}
	defer file.Close()

	var (
		// wg lets main block until every worker has returned.
		wg sync.WaitGroup
		// mu guards dateDict and ipsToInvestigate, which all workers update.
		mu               sync.Mutex
		dateDict         = make(map[string]int)
		ipsToInvestigate []string
	)

	// Unbuffered channel: each send hands one line directly to an idle worker.
	workc := make(chan string)

	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func(i int) {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()

			// The range loop ends once workc is closed and drained.
			for w := range workc {
				log.Printf("Worker %d working line %s\n", i, w)

				date, hasDate := datePrefix(w)
				if !hasDate {
					// A short (or empty) line has no date prefix; it is
					// still scanned for IPs but not counted under a date.
					log.Printf("Worker %d: line %q is shorter than %d bytes; no date counted", i, w, dateLen)
				}
				matches := ipRegEx.FindAllString(w, -1)

				// Keep the critical section limited to the shared updates.
				mu.Lock()
				if hasDate {
					dateDict[date]++
				}
				ipsToInvestigate = append(ipsToInvestigate, matches...)
				mu.Unlock()

				// Simulate work that takes a variable amount of time
				// (a whole number of seconds from 0 to 4).
				time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
			}

			log.Printf("Worker %d stopping\n", i)
		}(i)
	}

	// Feed the workers one line at a time. Lines are sent in file order, but
	// may finish processing in any order.
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		workc <- scanner.Text()
	}
	// Closing the channel tells the workers that no more lines are coming.
	close(workc)

	// Scan returns false on both EOF and error, so check which one it was.
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}

	// Without this wait the program could exit while workers are mid-line.
	wg.Wait()
	log.Println("All workers stopped successfully")

	fmt.Println("===============\nRESULTS:")

	// Map iteration order is random, so sort the keys for stable output.
	dates := make([]string, 0, len(dateDict))
	for d := range dateDict {
		dates = append(dates, d)
	}
	sort.Strings(dates)
	for _, d := range dates {
		fmt.Printf("Date %s had %d occurances\n", d, dateDict[d])
	}

	fmt.Printf("IPs (length is %d):\n", len(ipsToInvestigate))
	fmt.Println(strings.Join(ipsToInvestigate, ", "))
}