Skip to content

Instantly share code, notes, and snippets.

@ankur22
Last active May 8, 2020 20:14
Show Gist options
  • Save ankur22/bdf03529e24d3fe60eabf5a4d45b0334 to your computer and use it in GitHub Desktop.
Save ankur22/bdf03529e24d3fe60eabf5a4d45b0334 to your computer and use it in GitHub Desktop.
Goroutines, Channels, Contexts, Timers, Errgroups and Waitgroups
lines := make(chan string, 1)
go func() {
defer close(lines)
for {
if !scanner.Scan() {
fmt.Println("Reader: Completed")
break
}
lines <- scanner.Text()
select {
case <-ctx.Done():
fmt.Println("Reader: Context closed")
return
default:
}
}
if err := scanner.Err(); err != nil {
fmt.Printf("reader error - %v\n", err)
}
}()
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("Sender: Context closed")
return
case l, ok := <-lines:
if !ok {
fmt.Printf("Sender: Channel closed\n", l)
cancel()
return
}
fmt.Printf("Sender: Sending %s to remote database\n", l)
}
}
}()
<-ctx.Done()
fmt.Println("Main: Context closed")
ctx, cancel := context.WithCancel(context.Background())
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Printf("Caught %s\n", sig)
cancel()
}()
go func() {
fmt.Println("Reading from file")
<-ctx.Done()
fmt.Println("Reader: Context closed")
}()
go func() {
fmt.Println("Sending to remote database")
<-ctx.Done()
fmt.Println("Sender: Context closed")
}()
<-ctx.Done()
fmt.Println("Main: Context closed")
done := ctx.Done()
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
defer close(lines)
for {
if !scanner.Scan() {
fmt.Println("Reader: Completed")
break
}
lines <- scanner.Text()
time.Sleep(time.Second)
select {
case <-done:
fmt.Println("Reader: Context closed")
return ctx.Err()
default:
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("reader error - %w", err)
}
return nil
})
timeout := time.Millisecond * 500
timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop()
eg.Go(func() error {
for {
select {
case <-done:
fmt.Println("Sender: Context closed")
return ctx.Err()
case <-timeoutTimer.C:
timeoutTimer.Reset(timeout)
fmt.Println("Sender: Write to channel is taking sometime")
case l, ok := <-lines:
if !ok {
fmt.Printf("Sender: Channel closed\n", l)
return nil
}
fmt.Printf("Sender: Sending %s to remote database\n", l)
}
}
})
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
defer close(lines)
for {
if !scanner.Scan() {
fmt.Println("Reader: Completed")
break
}
lines <- scanner.Text()
select {
case <-ctx.Done():
fmt.Println("Reader: Context closed")
return ctx.Err()
default:
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("reader error - %w", err)
}
return nil
})
eg.Go(func() error {
for {
select {
case <-ctx.Done():
fmt.Println("Sender: Context closed")
return ctx.Err()
case l, ok := <-lines:
if !ok {
fmt.Printf("Sender: Channel closed\n", l)
return nil
}
fmt.Printf("Sender: Sending %s to remote database\n", l)
}
}
})
err = eg.Wait()
if err != nil {
fmt.Printf("error from goroutine - %v\n", err)
}
eg.Go(func() error {
for l := range lines {
select {
case <-ctx.Done():
fmt.Println("Sender: Context closed")
return ctx.Err()
default:
fmt.Printf("Sender: Sending %s to remote database\n", l)
}
}
return nil
})
eg.Go(func() error {
defer close(lines)
for {
if !scanner.Scan() {
fmt.Println("Reader: Completed")
break
}
lines <- scanner.Text()
time.Sleep(time.Second) // To fake the slow writing to the lines channel
select {
case <-ctx.Done():
fmt.Println("Reader: Context closed")
return ctx.Err()
default:
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("reader error - %w", err)
}
return nil
})
eg.Go(func() error {
for {
select {
case <-ctx.Done():
fmt.Println("Sender: Context closed")
return ctx.Err()
case <-time.NewTimer(time.Millisecond * 500).C:
fmt.Println("Sender: Write to channel is taking sometime")
case l, ok := <-lines:
if !ok {
fmt.Printf("Sender: Channel closed\n", l)
return nil
}
fmt.Printf("Sender: Sending %s to remote database\n", l)
}
}
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer func() {
wg.Done()
close(lines)
}()
for {
if !scanner.Scan() {
fmt.Println("Reader: Completed")
return
}
lines <- scanner.Text()
select {
case <-ctx.Done():
fmt.Println("Reader: Context closed")
return
default:
}
}
if err := scanner.Err(); err != nil {
fmt.Printf("reader error - %v\n", err)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("Sender: Context closed")
return
case l, ok := <-lines:
if !ok {
fmt.Printf("Sender: Channel closed\n", l)
return
}
fmt.Printf("Sender: Sending %s to remote database\n", l)
}
}
}()
wg.Wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment