-
-
Save costela/bae6460b4c802f11387bcb10a77e8b07 to your computer and use it in GitHub Desktop.
package main | |
import ( | |
"context" | |
"log" | |
"net/http" | |
"sync/atomic" | |
"time" | |
) | |
var counter int64 | |
func handler(w http.ResponseWriter, r *http.Request) { | |
log.Print("Request received.") | |
defer log.Print("Request done.") | |
ctx := r.Context() // If we generate error consider context.WithCancel | |
publisherChan := streamPublisher(ctx) | |
resultChan := make(chan []byte) | |
go writeResult(resultChan, w) | |
for value := range publisherChan { | |
select { | |
case resultChan <- value: | |
case <-r.Context().Done(): // Client has closed the socket. | |
log.Print("Request is Done.") | |
close(resultChan) // Close the result channel to exit the writer. | |
return | |
case <-time.After(1 * time.Second): // The socket is not writeable in given timeout, quit | |
log.Print("Output channel is not writable for 1 second. Close and exit.") | |
close(resultChan) // Close the result channel to exit the writer. | |
return | |
} | |
} | |
} | |
func writeResult(resultChan chan []byte, w http.ResponseWriter) { | |
atomic.AddInt64(&counter, 1) | |
defer atomic.AddInt64(&counter, -1) | |
for r := range resultChan { | |
log.Printf("writing %s", r) | |
w.Write(r) | |
} | |
log.Print("Write Result is done.") | |
} | |
func streamPublisher(ctx context.Context) chan []byte { | |
dst := make(chan []byte) | |
randomContent := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.") | |
go func() { // publisher of sequential numbers | |
defer log.Print("Exit publisher routine.") | |
for { | |
select { | |
case <-ctx.Done(): // the request is closed. | |
log.Print("publisher detected ctx.Done.") | |
close(dst) | |
return | |
case dst <- randomContent: | |
// log.Print("writing") | |
} | |
} | |
}() | |
return dst | |
} | |
func main() { | |
mux := http.NewServeMux() | |
mux.HandleFunc("/", handler) | |
s := &http.Server{ | |
Addr: ":8080", | |
Handler: mux, | |
ReadTimeout: 10 * time.Second, | |
// WriteTimeout: 10 * time.Second, | |
MaxHeaderBytes: 1 << 20, | |
} | |
go func() { | |
for ; ; <-time.Tick(2 * time.Second) { | |
log.Printf("#writeResult = %d", counter) | |
} | |
}() | |
log.Fatal(s.ListenAndServe()) | |
} |
Assuming you know this much better.
Is request cancel via context ok solution?
There is your code with cancel instead of return that too works on MacOS and is not returning before releasing the writer.
package main
import (
"context"
"log"
"net/http"
"sync"
"sync/atomic"
"time"
)
const timeout = 2 * time.Second
var counter int64
func handler(w http.ResponseWriter, r *http.Request) {
log.Print("Request handler started.")
defer log.Print("Request handler done.")
var writerWait sync.WaitGroup
writerWait.Add(1)
defer writerWait.Wait()
defer log.Print("Waiting writer to exit...")
ctx, cancel := context.WithCancel(r.Context())
r = r.WithContext(ctx)
defer cancel()
defer log.Print("Cancelling request.")
publisherChan := streamPublisher(ctx)
resultChan := make(chan []byte)
defer close(resultChan)
defer log.Print("Closing writer channel.")
go writeResult(resultChan, w, &writerWait)
for value := range publisherChan {
select {
case resultChan <- value:
case <-r.Context().Done(): // Client has closed the socket.
log.Print("Request is Done.")
return
case <-time.After(timeout): // The socket is not writeable in given timeout, quit
log.Print("Output channel is not writable for timeout. Exiting....")
return
}
}
}
func writeResult(resultChan chan []byte, w http.ResponseWriter, writer *sync.WaitGroup) {
atomic.AddInt64(&counter, 1)
defer atomic.AddInt64(&counter, -1)
defer writer.Done()
for r := range resultChan {
log.Printf("writing %s", r)
w.Write(r)
}
log.Print("Writer done.")
}
func streamPublisher(ctx context.Context) chan []byte {
dst := make(chan []byte)
randomContent := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.")
go func() { // publisher of sequential numbers
defer close(dst)
defer log.Print("Exit publisher routine.")
var i uint64 = 0
for {
select {
case <-ctx.Done(): // the request is closed.
log.Print("Publisher detected request context Done. Exiting...")
return
case dst <- randomContent:
i++
}
}
}()
return dst
}
func serve(srv *http.Server) {
log.Fatal(srv.ListenAndServe())
}
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", handler)
s := &http.Server{
Addr: ":8080",
Handler: mux,
ReadTimeout: 10 * time.Second,
// WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
go func() {
for ; ; <-time.Tick(2 * time.Second) {
log.Printf("#writeResult = %d", counter)
}
}()
log.Fatal(s.ListenAndServe())
}
By the way both approaches work on MacOS i.e. the code you provided works fine on MacOS and the writeResult goes back to zero in couple of seconds. I guess other OSs may be behaving differently.
Here is the output just in case
2021/03/30 16:12:02 writing Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.
2021/03/30 16:12:02 #writeResult = 1
2021/03/30 16:12:04 Output channel is not writable for timeout. Exiting....
2021/03/30 16:12:04 Closing writer channel.
2021/03/30 16:12:04 Cancelling request.
2021/03/30 16:12:04 Waiting writer to exit...
2021/03/30 16:12:04 Publisher detected request context Done. Exiting...
2021/03/30 16:12:04 Exit publisher routine.
2021/03/30 16:12:04 #writeResult = 1
2021/03/30 16:12:06 #writeResult = 1
2021/03/30 16:12:07 Writer done.
2021/03/30 16:12:07 Request handler done.
2021/03/30 16:12:08 #writeResult = 0
Ok on Linux neither variant works. I guess it will be stuck on the slow consumer until one chunk of data is consumed and the buffers have room for one more.
It is OS dependent behavior.
PS Confirmed. Linux waits until some data gets confirmed and the next write cycles comes around. This is pretty bad.
PSS out of curiosity I will check Windows. I guess this is the underlying implementation of the IO which evidently is not good on Linux. It will be interesting to see http/2 where application layer may handle this
I does not work on Windows.
So it is simply luck I tested on MacOS first.
Original code here