Skip to content

Instantly share code, notes, and snippets.

@timothyekl
Created May 17, 2015 16:59
Show Gist options
  • Save timothyekl/5469767e2728e3b6da7b to your computer and use it in GitHub Desktop.
Save timothyekl/5469767e2728e3b6da7b to your computer and use it in GitHub Desktop.
package main
import (
"time"
)
func IntRangeGenerator(min, max int) <-chan int {
output := make(chan int)
go func() {
for i := min; i < max; i++ {
output <- i
}
close(output)
}()
return output
}
func IntFilter(input <-chan int, filter func(int) bool) <-chan int {
output := make(chan int)
go func() {
for i := range input {
if filter(i) {
output <- i
}
}
close(output)
}()
return output
}
func IntMapper(input <-chan int, mapper func(int) int) <-chan int {
output := make(chan int)
go func() {
for i := range input {
output <- mapper(i)
}
close(output)
}()
return output
}
func IntDelayer(input <-chan int, delay time.Duration) <-chan int {
output := make(chan int)
go func() {
for i := range input {
output <- i
time.Sleep(delay)
}
close(output)
}()
return output
}
func IntAggregator(input <-chan int, interval time.Duration, capacity int) <-chan int {
buffer := make(chan int, capacity)
output := make(chan int)
go func() {
// Read everything into the buffer as quickly as possible
for i := range input {
buffer <- i
}
close(buffer)
}()
go func() {
// Only push values onto the output channel once every interval
forever: for {
select {
case i, ok := <-buffer:
if !ok {
close(output)
break forever
}
output <- i
default:
time.Sleep(interval)
}
}
}()
return output
}
func main() {
generator := IntRangeGenerator(0, 100)
filter := IntFilter(generator, func(i int) bool { return i % 2 == 0 })
halver := IntMapper(filter, func(i int) int { return i / 2 })
delayer := IntDelayer(halver, 100 * time.Millisecond)
aggregator := IntAggregator(delayer, 1 * time.Second, 11)
for i := range aggregator {
println(i)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment