Skip to content

Instantly share code, notes, and snippets.

@tonyg
Created January 24, 2015 22:28
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonyg/013529ab408e837b487d to your computer and use it in GitHub Desktop.
Save tonyg/013529ab408e837b487d to your computer and use it in GitHub Desktop.
Experiments with fusable streams and transducers in Racket
#lang racket
;; Fusable Streams, after Coutts, Leshchinskiy and Stewart 2007.
;; Haskell:
;; data Stream a where Stream :: (s -> Step s a) -> s -> Stream a
;; data Step s a = Yield a s | Skip s | Done
;; Clojure transducers support:
;; - early termination
;; - completion cleanup
;; - seed computation and transformation
(require racket/generic)
(require (prefix-in builtin: racket/stream))
(define-generics streamable
(->stream streamable)
#:defaults ([builtin:stream?
(define (->stream s)
(stream (lambda (s k)
(if (builtin:stream-empty? s)
(k)
(k (builtin:stream-first s) (builtin:stream-rest s))))
s))]))
(struct stream (step state)
#:transparent
#:methods gen:streamable [(define (->stream s) s)])
(define-syntax-rule (stream-transformer (step seed) step-exp seed-exp)
(match-lambda [(app ->stream (stream step seed)) (stream step-exp seed-exp)]))
(define-syntax-rule (define-stream-transformer (head step seed) step-exp seed-exp)
(define head (stream-transformer (step seed) step-exp seed-exp)))
(define-stream-transformer [(map_s f) step seed]
(lambda (s k)
(step s (case-lambda [() (k)]
[(s1) (k s1)]
[(v s1) (k (f v) s1)])))
seed)
(define-stream-transformer [(filter_s p) step seed]
(lambda (s k)
(step s (case-lambda [() (k)]
[(s1) (k s1)]
[(v s1) (if (p v) (k v s1) (k s1))])))
seed)
(define (range_s lo hi)
(stream (lambda (n k)
(if (< n hi)
(k n (+ n 1))
(k)))
lo))
(define-stream-transformer [concat_s step seed]
(lambda (s k)
(match s
[(cons #f inner-seed)
(step inner-seed
(case-lambda [() (k)]
[(s1) (k (cons #f s1))]
[(v s1) (k (cons (->stream v) s1))]))]
[(cons (stream step1 seed1) inner-seed)
(step1 seed1
(case-lambda [() (k (cons #f inner-seed))]
[(seed2) (k (cons (stream step1 seed2) inner-seed))]
[(v seed2) (k v (cons (stream step1 seed2) inner-seed))]))]))
(cons #f seed))
(define-stream-transformer [(partition-all_s n) step seed]
(lambda (s k)
(match s
[(list 0 acc seed)
(k (reverse acc) (list n '() seed))]
[(list remaining acc seed)
(step seed
(case-lambda [() (if (null? acc)
(k)
(k (reverse acc) #f))]
[(next-seed) (k (list remaining acc next-seed))]
[(v next-seed) (k (list (- remaining 1) (cons v acc) next-seed))]))]
[#f
(k)]))
(list n '() seed))
(define ((stream-foldl kons knil) s)
(match-define (stream step seed) s)
(let loop ((seed seed) (knil knil))
(step seed
(case-lambda [() knil]
[(next-seed) (loop next-seed knil)]
[(v next-seed) (loop next-seed (kons v knil))]))))
(define stream->list (compose reverse (stream-foldl cons '())))
(define stream->set (stream-foldl (lambda (x xs) (set-add xs x)) (set)))
(stream->list ((compose (partition-all_s 4)
(filter_s (lambda (x) (> x 5)))
concat_s
(map_s (lambda (x) (list x x x))))
(range_s 0 13)))
#lang racket
;; Fusable Streams, after Coutts, Leshchinskiy and Stewart 2007.
;; Haskell:
;; data Stream a where Stream :: (s -> Step s a) -> s -> Stream a
;; data Step s a = Yield a s | Skip s | Done
;; Clojure transducers support:
;; - early termination
;; - completion cleanup
;; - seed computation and transformation
(require racket/generic)
(require (prefix-in builtin: racket/stream))
(define-generics streamable
(->stream streamable)
#:defaults ([list?
(define (->stream xs)
(stream (lambda (xs k)
(if (null? xs)
(k)
(k (car xs) (cdr xs))))
xs))]
[vector?
(define (->stream v)
(define len (vector-length v))
(stream (lambda (i k)
(if (< i len)
(k (vector-ref v i) (+ i 1))
(k)))
0))]
[builtin:stream?
(define (->stream s)
(stream (lambda (s k)
(if (builtin:stream-empty? s)
(k)
(k (builtin:stream-first s) (builtin:stream-rest s))))
s))]))
(struct stream (step state)
#:transparent
#:methods gen:streamable [(define (->stream s) s)])
(define-syntax-rule (stream-transformer (step seed) step-exp seed-exp)
(match-lambda [(app ->stream (stream step seed)) (stream step-exp seed-exp)]))
(define-syntax-rule (define-stream-transformer (head step seed) step-exp seed-exp)
(define head (stream-transformer (step seed) step-exp seed-exp)))
(define-stream-transformer [(map_s f) step seed]
(lambda (s k)
(step s (case-lambda [() (k)]
[(s1) (k s1)]
[(v s1) (k (f v) s1)])))
seed)
(define-stream-transformer [(filter_s p) step seed]
(lambda (s k)
(step s (case-lambda [() (k)]
[(s1) (k s1)]
[(v s1) (if (p v) (k v s1) (k s1))])))
seed)
(define (range_s lo hi)
(stream (lambda (n k)
(if (< n hi)
(k n (+ n 1))
(k)))
lo))
(define-stream-transformer [concat_s step seed]
(lambda (s k)
(match s
[(cons #f inner-seed)
(step inner-seed
(case-lambda [() (k)]
[(s1) (k (cons #f s1))]
[(v s1) (k (cons (->stream v) s1))]))]
[(cons (stream step1 seed1) inner-seed)
(step1 seed1
(case-lambda [() (k (cons #f inner-seed))]
[(seed2) (k (cons (stream step1 seed2) inner-seed))]
[(v seed2) (k v (cons (stream step1 seed2) inner-seed))]))]))
(cons #f seed))
(define-stream-transformer [(partition-all_s n) step seed]
(lambda (s k)
(match s
[(list 0 acc seed)
(k (reverse acc) (list n '() seed))]
[(list remaining acc seed)
(step seed
(case-lambda [() (if (null? acc)
(k)
(k (reverse acc) #f))]
[(next-seed) (k (list remaining acc next-seed))]
[(v next-seed) (k (list (- remaining 1) (cons v acc) next-seed))]))]
[#f
(k)]))
(list n '() seed))
(define ((stream-foldl kons knil) s)
(match-define (stream step seed) s)
(let loop ((seed seed) (knil knil))
(step seed
(case-lambda [() knil]
[(next-seed) (loop next-seed knil)]
[(v next-seed) (loop next-seed (kons v knil))]))))
(define stream->list (compose reverse (stream-foldl cons '())))
(define stream->set (stream-foldl (lambda (x xs) (set-add xs x)) (set)))
;; (stream->list ((compose (partition-all_s 4)
;; (filter_s (lambda (x) (> x 5)))
;; concat_s
;; (map_s (lambda (x) (list x x x))))
;; (range_s 0 13)))
;; (stream->list ((lambda (input)
;; ((match-lambda [(app ->stream (stream step seed))
;; (stream (lambda (s k)
;; (match s
;; [(list 0 acc seed)
;; (k (reverse acc) (list 4 '() seed))]
;; [(list remaining acc seed)
;; (step seed
;; (case-lambda [() (if (null? acc)
;; (k)
;; (k (reverse acc) #f))]
;; [(next-seed) (k (list remaining acc next-seed))]
;; [(v next-seed) (k (list (- remaining 1) (cons v acc) next-seed))]))]
;; [#f
;; (k)]))
;; (list 4 '() seed))])
;; ((match-lambda [(app ->stream (stream step seed))
;; (stream
;; (lambda (s k)
;; (step s (case-lambda [() (k)]
;; [(s1) (k s1)]
;; [(v s1) (if ((lambda (x) (> x 5)) v) (k v s1) (k s1))])))
;; seed)])
;; ((match-lambda [(app ->stream (stream step seed))
;; (stream
;; (lambda (s k)
;; (match s
;; [(cons #f inner-seed)
;; (step inner-seed
;; (case-lambda [() (k)]
;; [(s1) (k (cons #f s1))]
;; [(v s1) (k (cons (->stream v) s1))]))]
;; [(cons (stream step1 seed1) inner-seed)
;; (step1 seed1
;; (case-lambda [() (k (cons #f inner-seed))]
;; [(seed2) (k (cons (stream step1 seed2) inner-seed))]
;; [(v seed2) (k v (cons (stream step1 seed2) inner-seed))]))]))
;; (cons #f seed))])
;; ((match-lambda [(app ->stream (stream step seed))
;; (stream
;; (lambda (s k)
;; (step s (case-lambda [() (k)]
;; [(s1) (k s1)]
;; [(v s1) (k ((lambda (x) (list x x x)) v) s1)])))
;; seed)])
;; input)))))
;; (range_s 0 13)))
;; (stream->list (stream (lambda (s k)
;; (match s
;; [(list 0 acc seed)
;; (k (reverse acc) (list 4 '() seed))]
;; [(list remaining acc seed)
;; ((lambda (s k)
;; ((lambda (s k)
;; (match s
;; [(cons #f inner-seed)
;; ((lambda (s k)
;; ((lambda (n k)
;; (if (< n 13)
;; (k n (+ n 1))
;; (k))) s (case-lambda [() (k)]
;; [(s1) (k s1)]
;; [(v s1) (k ((lambda (x) (list x x x)) v) s1)])))
;; inner-seed
;; (case-lambda [() (k)]
;; [(s1) (k (cons #f s1))]
;; [(v s1) (k (cons (->stream v) s1))]))]
;; [(cons (stream step1 seed1) inner-seed)
;; (step1 seed1
;; (case-lambda [() (k (cons #f inner-seed))]
;; [(seed2) (k (cons (stream step1 seed2) inner-seed))]
;; [(v seed2) (k v (cons (stream step1 seed2) inner-seed))]))]))
;; s
;; (case-lambda [() (k)]
;; [(s1) (k s1)]
;; [(v s1) (if ((lambda (x) (> x 5)) v) (k v s1) (k s1))])))
;; seed
;; (case-lambda [() (if (null? acc)
;; (k)
;; (k (reverse acc) #f))]
;; [(next-seed) (k (list remaining acc next-seed))]
;; [(v next-seed) (k (list (- remaining 1) (cons v acc) next-seed))]))]
;; [#f
;; (k)]))
;; (list 4 '() (cons #f 0))))
;; (stream->list (stream (lambda (s k)
;; (match s
;; [(list 0 acc seed)
;; (k (reverse acc) (list 4 '() seed))]
;; [(list remaining acc seed)
;; (match seed
;; [(cons #f inner-seed)
;; (if (< inner-seed 13)
;; (k (list remaining acc (cons (let ((xs (list inner-seed inner-seed inner-seed)))
;; (stream (lambda (xs k)
;; (if (null? xs)
;; (k)
;; (k (car xs) (cdr xs))))
;; xs))
;; (+ inner-seed 1))))
;; (if (null? acc)
;; (k)
;; (k (reverse acc) #f)))]
;; [(cons (stream step1 seed1) inner-seed)
;; (step1 seed1
;; (case-lambda [() (k (list remaining acc (cons #f inner-seed)))]
;; [(seed2) (k (list remaining acc (cons (stream step1 seed2) inner-seed)))]
;; [(v seed2) (let ((s1 (cons (stream step1 seed2) inner-seed)))
;; (if (> v 5)
;; (k (list (- remaining 1) (cons v acc) s1))
;; (k (list remaining acc s1))))]))])]
;; [#f
;; (k)]))
;; (list 4 '() (cons #f 0))))
;; (reverse
;; (let loop ((seed (list 4 '() (cons #f 0))) (knil '()))
;; (match seed
;; [(list 0 acc seed)
;; (loop (list 4 '() seed) (cons (reverse acc) knil))]
;; [(list remaining acc seed)
;; (match seed
;; [(cons #f inner-seed)
;; (if (< inner-seed 13)
;; (loop (list remaining acc (cons (let ((xs (list inner-seed inner-seed inner-seed)))
;; (stream (lambda (xs k)
;; (if (null? xs)
;; (k)
;; (k (car xs) (cdr xs))))
;; xs))
;; (+ inner-seed 1)))
;; knil)
;; (if (null? acc)
;; knil
;; (loop #f (cons (reverse acc) knil))))]
;; [(cons (stream step1 seed1) inner-seed)
;; (step1 seed1
;; (case-lambda [() (loop (list remaining acc (cons #f inner-seed)) knil)]
;; [(seed2) (loop (list remaining acc (cons (stream step1 seed2) inner-seed)) knil)]
;; [(v seed2)
;; (let ((s1 (cons (stream step1 seed2) inner-seed)))
;; (if (> v 5)
;; (loop (list (- remaining 1) (cons v acc) s1) knil)
;; (loop (list remaining acc s1) knil)))]))])]
;; [#f knil])))
;; (reverse
;; (let loop ((seed (list 4 '() #f 0)) (knil '()))
;; (match seed
;; [(list 0 acc x1 x2)
;; (loop (list 4 '() x1 x2) (cons (reverse acc) knil))]
;; [(list remaining acc #f inner-seed)
;; (if (< inner-seed 13)
;; (loop (list remaining
;; acc
;; (let ((xs (list inner-seed inner-seed inner-seed)))
;; (stream (lambda (xs k)
;; (if (null? xs)
;; (k)
;; (k (car xs) (cdr xs))))
;; xs))
;; (+ inner-seed 1))
;; knil)
;; (if (null? acc)
;; knil
;; (loop #f (cons (reverse acc) knil))))]
;; [(list remaining acc (stream step1 seed1) inner-seed)
;; (step1 seed1
;; (case-lambda [() (loop (list remaining acc #f inner-seed) knil)]
;; [(seed2) (loop (list remaining acc (stream step1 seed2) inner-seed) knil)]
;; [(v seed2)
;; (if (> v 5)
;; (loop (list (- remaining 1) (cons v acc) (stream step1 seed2) inner-seed) knil)
;; (loop (list remaining acc (stream step1 seed2) inner-seed) knil))]))]
;; [#f knil])))
;; (reverse
;; (let loop ((seed (list 4 '() #f 0)) (knil '()))
;; (match seed
;; [(list 0 acc x1 x2)
;; (loop (list 4 '() x1 x2) (cons (reverse acc) knil))]
;; [(list remaining acc #f inner-seed)
;; (if (< inner-seed 13)
;; (loop (list remaining
;; acc
;; (list inner-seed inner-seed inner-seed)
;; (+ inner-seed 1))
;; knil)
;; (if (null? acc)
;; knil
;; (loop #f (cons (reverse acc) knil))))]
;; [(list remaining acc seed1 inner-seed)
;; ((lambda (xs k)
;; (if (null? xs)
;; (k)
;; (k (car xs) (cdr xs))))
;; seed1
;; (case-lambda [() (loop (list remaining acc #f inner-seed) knil)]
;; [(seed2) (loop (list remaining acc seed2 inner-seed) knil)]
;; [(v seed2)
;; (if (> v 5)
;; (loop (list (- remaining 1) (cons v acc) seed2 inner-seed) knil)
;; (loop (list remaining acc seed2 inner-seed) knil))]))]
;; [#f knil])))
;; (reverse
;; (let loop ((seed (list 4 '() #f 0)) (knil '()))
;; (match seed
;; [(list 0 acc x1 x2)
;; (loop (list 4 '() x1 x2) (cons (reverse acc) knil))]
;; [(list remaining acc #f inner-seed)
;; (if (< inner-seed 13)
;; (loop (list remaining
;; acc
;; (list inner-seed inner-seed inner-seed)
;; (+ inner-seed 1))
;; knil)
;; (if (null? acc)
;; knil
;; (loop #f (cons (reverse acc) knil))))]
;; [(list remaining acc seed1 inner-seed)
;; (if (null? seed1)
;; (loop (list remaining acc #f inner-seed) knil)
;; (if (> (car seed1) 5)
;; (loop (list (- remaining 1) (cons (car seed1) acc) (cdr seed1) inner-seed) knil)
;; (loop (list remaining acc (cdr seed1) inner-seed) knil)))]
;; [#f knil])))
;; (reverse
;; (let loop ((state (list 4 '() #f 0)) (knil '()))
;; (match state
;; [(list 0 segment x1 x2)
;; (loop (list 4 '() x1 x2) (cons (reverse segment) knil))]
;; [(list remaining segment #f counter)
;; (if (< counter 13)
;; (loop (list remaining
;; segment
;; (list counter counter counter)
;; (+ counter 1))
;; knil)
;; (if (null? segment)
;; knil
;; (loop #f (cons (reverse segment) knil))))]
;; [(list remaining segment repeats counter)
;; (if (null? repeats)
;; (loop (list remaining segment #f counter) knil)
;; (let ((v (car repeats)))
;; (if (> v 5)
;; (loop (list (- remaining 1) (cons v segment) (cdr repeats) counter) knil)
;; (loop (list remaining segment (cdr repeats) counter) knil))))]
;; [#f knil])))
;; (reverse
;; (let ()
;; (define (loop0 remaining segment repeats counter segments)
;; (cond
;; [(zero? remaining)
;; (loop0 4 '() repeats counter (cons (reverse segment) segments))]
;; [(eq? repeats #f)
;; (cond
;; [(< counter 13)
;; (loop0 remaining segment (list counter counter counter) (+ counter 1) segments)]
;; [(null? segment)
;; segments]
;; [else
;; (loop1 (cons (reverse segment) segments))])]
;; [(null? repeats)
;; (loop0 remaining segment #f counter segments)]
;; [else
;; (let ((v (car repeats)))
;; (if (> v 5)
;; (loop0 (- remaining 1) (cons v segment) (cdr repeats) counter segments)
;; (loop0 remaining segment (cdr repeats) counter segments)))]))
;; (define (loop1 segments)
;; segments)
;; (loop0 4 '() #f 0 '())))
(reverse
(let loop ((remaining 4) (segment '()) (repeats #f) (counter 0) (segments '()))
(cond
[(zero? remaining)
(loop 4 '() repeats counter (cons (reverse segment) segments))]
[(eq? repeats #f)
(cond
[(< counter 13)
(loop remaining segment (list counter counter counter) (+ counter 1) segments)]
[(null? segment)
segments]
[else
(cons (reverse segment) segments)])]
[(null? repeats)
(loop remaining segment #f counter segments)]
[else
(let ((v (car repeats)))
(if (> v 5)
(loop (- remaining 1) (cons v segment) (cdr repeats) counter segments)
(loop remaining segment (cdr repeats) counter segments)))])))
#lang racket
;; Fusable Streams, after Coutts, Leshchinskiy and Stewart 2007.
;; Haskell:
;; data Stream a where Stream :: (s -> Step s a) -> s -> Stream a
;; data Step s a = Yield a s | Skip s | Done
;; Clojure transducers support:
;; - early termination
;; - completion cleanup
;; - seed computation and transformation
(require racket/generic)
(require (prefix-in builtin: racket/stream))
(define-generics streamable
(->stream streamable)
#:defaults ([builtin:stream?
(define (->stream s)
(stream (lambda (s ky ks kd)
(if (builtin:stream-empty? s)
(kd)
(ky (builtin:stream-first s) (builtin:stream-rest s))))
s))]))
(struct stream (step state)
#:transparent
#:methods gen:streamable [(define (->stream s) s)])
(define-syntax-rule (stream-transformer (step seed) step-exp seed-exp)
(match-lambda [(app ->stream (stream step seed)) (stream step-exp seed-exp)]))
(define-syntax-rule (define-stream-transformer (head step seed) step-exp seed-exp)
(define head (stream-transformer (step seed) step-exp seed-exp)))
(define-stream-transformer [(map_s f) step seed]
(lambda (s ky ks kd)
(step s
(lambda (v s1) (ky (f v) s1))
ks
kd))
seed)
(define-stream-transformer [(filter_s p) step seed]
(lambda (s ky ks kd)
(step s
(lambda (v s1) (if (p v) (ky v s1) (ks s1)))
ks
kd))
seed)
(define (range_s lo hi)
(stream (lambda (n ky ks kd)
(if (< n hi)
(ky n (+ n 1))
(kd)))
lo))
(define-stream-transformer [concat_s step seed]
(lambda (s ky ks kd)
(match s
[(cons #f inner-seed)
(step inner-seed
(lambda (v s1) (ks (cons (->stream v) s1)))
(lambda (s1) (ks (cons #f s1)))
kd)]
[(cons (stream step1 seed1) inner-seed)
(step1 seed1
(lambda (v seed2) (ky v (cons (stream step1 seed2) inner-seed)))
(lambda (seed2) (ks (cons (stream step1 seed2) inner-seed)))
(lambda () (ks (cons #f inner-seed))))]))
(cons #f seed))
(define-stream-transformer [(partition-all_s n) step seed]
(lambda (s ky ks kd)
(match s
[(list 0 acc seed)
(ky (reverse acc) (list n '() seed))]
[(list remaining acc seed)
(step seed
(lambda (v next-seed) (ks (list (- remaining 1) (cons v acc) next-seed)))
(lambda (next-seed) (ks (list remaining acc next-seed)))
(lambda ()
(if (null? acc)
(kd)
(ky (reverse acc) #f))))]
[#f
(kd)]))
(list n '() seed))
(define ((stream-foldl kons knil) s)
(match-define (stream step seed) s)
(let loop ((seed seed) (knil knil))
(step seed
(lambda (v next-seed) (loop next-seed (kons v knil)))
(lambda (next-seed) (loop next-seed knil))
(lambda () knil))))
(define stream->list (compose reverse (stream-foldl cons '())))
(define stream->set (stream-foldl (lambda (x xs) (set-add xs x)) (set)))
(stream->list ((compose (partition-all_s 4)
(filter_s (lambda (x) (> x 5)))
concat_s
(map_s (lambda (x) (list x x x))))
(range_s 0 13)))
#lang racket
;; Transducers, after Hickey 2014.
(require racket/generic)
;; (transducer-init t)
;; (transducer-complete t kernel)
;; (transducer-step t kernel item)
(define (map_t f)
(lambda (t)
(case-lambda [() (t)]
[(xs) (t xs)]
[(xs x) (t xs (f x))])))
(define (filter_t f)
(lambda (t)
(case-lambda [() (t)]
[(xs) (t xs)]
[(xs x) (if (f x) (t xs x) xs)])))
(define concat_t
(lambda (t)
(case-lambda [() (t)]
[(xs) (t xs)]
[(xs x) (stream-fold t xs x)])))
(define (transduce t->t t s #:init [init (t)])
(define t1 (t->t t))
(t1 (stream-fold t1 init s)))
(define cons+
(case-lambda [() '()]
[(xs) xs]
[(xs x) (cons x xs)] ;; :-/
))
(define (into sink t->t source)
(transduce t->t cons+ source #:init sink))
(define (partition-all_t n)
(lambda (t)
(define remaining n)
(define acc '())
(case-lambda [() (t)]
[(xs) (if (null? acc)
(t xs)
(t (t xs (reverse acc))))]
[(xs x) (if (= remaining 1)
(begin0 (t xs (reverse (cons x acc)))
(set! remaining n)
(set! acc '()))
(begin0 xs
(set! remaining (- remaining 1))
(set! acc (cons x acc))))])))
(reverse (stream->list (into '()
(compose (map_t (lambda (x) (list x x x)))
concat_t
(filter_t (lambda (x) (> x 5)))
(partition-all_t 4))
(in-range 0 13))))
@dkvasnicka
Copy link

Any plans on making this into an actual library? ;)

@tonyg
Copy link
Author

tonyg commented Jul 14, 2015

No concrete plans! I don't think I understand the finer design points well enough yet.

@halyconic
Copy link

Would you be willing to release your code under a permissive license, say MIT? I would love to learn from your code - I'd like to implement transducers in Guile (under something GPL compatible)

@tonyg
Copy link
Author

tonyg commented Apr 20, 2016

Sure, @halyconic. Let's call it LGPL 3.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment