Skip to content

Instantly share code, notes, and snippets.

@samdphillips
Last active November 15, 2021 23:38
Show Gist options
  • Save samdphillips/974df804ca07d8a5323d2a67d8613fcc to your computer and use it in GitHub Desktop.
Save samdphillips/974df804ca07d8a5323d2a67d8613fcc to your computer and use it in GitHub Desktop.
Partitioning a Racket stream to streams with channels.
#lang racket/base
(require racket/async-channel
racket/promise
racket/stream)
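;; This version does not eagerly consume the source stream: the splitter only
;; takes an element from the source when a consumer has asked for one.
;; - This is not kill-safe.
;; - An infinite source stream combined with a predicate that never matches will probably exhaust memory.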
;; End-of-stream marker. It is a freshly allocated list, so it is `eq?` only
;; to itself and cannot be confused with an ordinary stream element.
(define eos (list 'eos))

;; eos? : any/c -> boolean?
;; Identity test against the end-of-stream marker.
(define eos?
  (lambda (v)
    (eq? eos v)))
;; ch-stream-cur : ch-stream? -> any/c
;; Force the promise held in the node's `cur/p` field and return its value:
;; either the node's element or the `eos` marker.
(define (ch-stream-cur st)
  (let ([cur-promise (ch-stream-cur/p st)])
    (force cur-promise)))
;; ch-stream-rest : ch-stream? -> ch-stream?
;; Return the node that follows `st`.
(define (ch-stream-rest st)
  ;; Force this node's own element first, so that it is taken from the
  ;; channel before any later node gets a chance to read from it.
  (void (ch-stream-cur st))
  (let ([rest-promise (ch-stream-rest/p st)])
    (force rest-promise)))
;; A stream node backed by two promises:
;;   cur/p  - promise for this node's element (or the `eos` marker)
;;   rest/p - promise for the following node
;; The node implements `gen:stream` on top of `ch-stream-cur` and
;; `ch-stream-rest`.
(struct ch-stream (cur/p rest/p)
  #:methods gen:stream
  [;; The node is empty exactly when its element is the end marker.
   (define (stream-empty? self)
     (eos? (ch-stream-cur self)))

   ;; Return the node's element; signal an error at the end of the stream.
   (define (stream-first self)
     (let ([head (ch-stream-cur self)])
       (if (eos? head)
           (error 'stream-first "end of stream")
           head)))

   ;; Delegate to `ch-stream-rest`, which forces the current element first.
   (define (stream-rest self)
     (ch-stream-rest self))])
;; make-ch-stream : async-channel? channel? -> ch-stream?
;; Build a stream node reading from the item channel `ach` and asking for
;; more items over the request channel `rch`. Nothing is read here: both the
;; element and the following node are wrapped in `delay/sync` promises.
(define (make-ch-stream ach rch)
  (let ([element-promise (delay/sync (get-cur-element ach rch))]
        [successor-promise (delay/sync (make-ch-stream ach rch))])
    (ch-stream element-promise successor-promise)))
;; get-cur-element : async-channel? channel? -> any/c
;; Fetch the next item for one side of the partition.
;;   ach - async channel on which items (or the end marker) arrive
;;   rch - channel on which a request for more items is sent
;; An item that is already buffered in `ach` is returned without sending a
;; request. Otherwise a request is offered on `rch` and the next item that
;; shows up on `ach` is returned.
(define (get-cur-element ach rch)
  ;; Items are boxed on the way out of the channel so that a buffered `#f`
  ;; element is distinguishable from "nothing buffered" (a failed poll also
  ;; yields `#f`). Without the box a `#f` element would be dequeued and
  ;; silently dropped.
  (define buffered-item (wrap-evt ach box))
  (unbox
   (or
    ;; Fast path: non-blocking poll of the buffer.
    (sync/timeout 0 buffered-item)
    ;; Slow path: offer a request, but keep watching the buffer as well. If
    ;; an item (or the end marker) arrives while nobody is accepting
    ;; requests, it is taken directly instead of blocking forever on the
    ;; request. `handle-evt` is used so the blocking read runs after the
    ;; synchronization has completed.
    (sync buffered-item
          (handle-evt (channel-put-evt rch #t)
                      (lambda (_accepted)
                        (box (async-channel-get ach))))))))
;; stream-partition : (any/c -> any/c) stream? -> (values stream? stream?)
;; Split `st` into two streams: the first yields the elements for which
;; `pred?` returns a true value, the second yields the others.
;;
;; A background thread (the splitter) walks the source. It only touches the
;; source while a consumer request is outstanding, so a lazy source is not
;; forced - not even for the emptiness check - before somebody asks.
;; Elements are delivered through one async channel per side (`lch`, `rch`);
;; consumers ask for more through one request channel per side (`lreq`,
;; `rreq`).
(define (stream-partition pred? st)
  (define lch (make-async-channel))
  (define rch (make-async-channel))
  (define lreq (make-channel))
  (define rreq (make-channel))

  ;; After the source is exhausted, keep accepting requests and answer each
  ;; with another end marker. A consumer that found its buffer empty and
  ;; sends its request only after the end was reached would otherwise block
  ;; forever, because nobody would be left to receive the request.
  (define (serve-after-end)
    (sync (handle-evt lreq
                      (lambda (_req)
                        (async-channel-put lch eos)
                        (serve-after-end)))
          (handle-evt rreq
                      (lambda (_req)
                        (async-channel-put rch eos)
                        (serve-after-end)))))

  ;; lwant? / rwant? record whether the left / right consumer is still
  ;; waiting for an element.
  (define (splitter st lwant? rwant?)
    (cond
      ;; Nobody is waiting: block until one side asks, leaving the source
      ;; untouched in the meantime.
      [(not (or lwant? rwant?))
       (sync (handle-evt lreq (lambda (_req) (splitter st #t #f)))
             (handle-evt rreq (lambda (_req) (splitter st #f #t))))]
      ;; Source exhausted: tell both sides, then keep serving late requests.
      [(stream-empty? st)
       (async-channel-put lch eos)
       (async-channel-put rch eos)
       (serve-after-end)]
      [else
       (define v (stream-first st))
       (cond
         ;; Delivering to one side satisfies that side's request; the other
         ;; side's pending request (if any) stays pending.
         [(pred? v)
          (async-channel-put lch v)
          (splitter (stream-rest st) #f rwant?)]
         [else
          (async-channel-put rch v)
          (splitter (stream-rest st) lwant? #f)])]))

  (thread (lambda () (splitter st #f #f)))
  (values (make-ch-stream lch lreq)
          (make-ch-stream rch rreq)))
;; Demo: partition '(1 2 3) with a predicate that prints every element it is
;; shown and then rejects it, so each examined element goes to the second
;; stream.
(define-values (s1 s2)
  (let ([print-and-reject
         (lambda (x)
           (displayln x)
           #f)])
    (stream-partition print-and-reject '(1 2 3))))

;; Query the second stream.
(stream-empty? s2)

;; The same query on the first stream is disabled with a datum comment.
#;
(stream-empty? s1)
#lang racket/base
(require racket/async-channel
racket/stream)
;; WARNING: at present this consumes the whole source stream, which is probably not what you want.
;; - This is not kill-safe.
;; - An infinite source stream combined with a predicate that never matches will probably exhaust memory.
;; Sentinel announcing the end of a partitioned stream. Being a freshly
;; allocated list, it is `eq?` to nothing but itself.
(define eos (list 'eos))

;; eos? : any/c -> boolean?
;; Is `v` the end-of-stream sentinel (compared by identity)?
(define eos?
  (lambda (v)
    (eq? eos v)))
;; Placeholder stored in a node's `cur` field until its element has been
;; read from the channel. Freshly allocated, hence `eq?` only to itself.
(define unforced (list 'unforced))

;; unforced? : ch-stream? -> boolean?
;; Note that this takes a stream node, not a value: it reports whether the
;; node's `cur` field still holds the placeholder.
(define unforced?
  (lambda (s)
    (let ([current (ch-stream-cur s)])
      (eq? current unforced))))
;; force-ch-stream! : ch-stream? -> void?
;; Make sure the node's element has been read. If the `cur` field still
;; holds the placeholder, block on the node's channel for the next item and
;; store it in the field; otherwise do nothing.
(define (force-ch-stream! stream)
  (when (unforced? stream)
    (let* ([source (ch-stream-ch stream)]
           [item (async-channel-get source)])
      (set-ch-stream-cur! stream item))))
;; A stream node backed by an async channel:
;;   ch  - the channel the elements are read from
;;   cur - this node's element, or the `unforced` placeholder until it has
;;         been read (mutable so that it can be filled in on first use)
;; Every `gen:stream` method first makes sure the element has been read.
(struct ch-stream (ch [cur #:mutable])
  #:methods gen:stream
  [;; The node is empty exactly when its element is the end marker.
   (define (stream-empty? self)
     (force-ch-stream! self)
     (eos? (ch-stream-cur self)))

   ;; Return the node's element; signal an error at the end of the stream.
   (define (stream-first self)
     (force-ch-stream! self)
     (let ([head (ch-stream-cur self)])
       (if (eos? head)
           (error 'stream-first "end of stream")
           head)))

   ;; The following node shares the channel and starts out unread. This
   ;; node's element is read first so that the order is preserved.
   (define (stream-rest self)
     (force-ch-stream! self)
     (ch-stream (ch-stream-ch self) unforced))])
;; stream-partition : (any/c -> any/c) stream? -> (values stream? stream?)
;; Split `st` into two streams: the first receives the elements for which
;; `pred?` returns a true value, the second receives the others.
;; A background thread walks the whole source without waiting for the
;; consumers, pushing each element into the async channel of its side and
;; finally an end marker into both.
(define (stream-partition pred? st)
  (define lch (make-async-channel))
  (define rch (make-async-channel))

  (define (run-splitter)
    (let loop ([remaining st])
      (if (stream-empty? remaining)
          (begin
            (async-channel-put lch eos)
            (async-channel-put rch eos))
          (let* ([v (stream-first remaining)]
                 [target (if (pred? v) lch rch)])
            (async-channel-put target v)
            (loop (stream-rest remaining))))))

  (thread run-splitter)
  (values (ch-stream lch unforced)
          (ch-stream rch unforced)))
;; Demo: partition '(1 2 3) with a predicate that prints a line each time it
;; is called and rejects every element, then print whether each of the two
;; resulting streams is empty.
(call-with-values
 (lambda ()
   (stream-partition
    (lambda (x)
      (displayln "prop")
      #f)
    '(1 2 3)))
 (lambda (x y)
   (displayln (stream-empty? x))
   (displayln (stream-empty? y))))
#lang racket/base
(require racket/async-channel
racket/promise
racket/stream)
;; - Same as before, but using promises.
;; - This is not kill-safe.
;; - An infinite source stream combined with a predicate that never matches will probably exhaust memory.
;; Marker pushed into a channel once the source stream is exhausted. It is a
;; freshly allocated list and therefore `eq?` only to itself.
(define eos (list 'eos))

;; eos? : any/c -> boolean?
;; Identity check against the end marker.
(define eos?
  (lambda (v)
    (eq? eos v)))
;; ch-stream-cur : ch-stream? -> any/c
;; Force the node's promise and return its value: the node's element or the
;; `eos` marker.
(define (ch-stream-cur st)
  (let ([element-promise (ch-stream-promise st)])
    (force element-promise)))
;; A stream node backed by an async channel:
;;   ch      - the channel the elements are read from
;;   promise - promise for this node's element (or the `eos` marker)
(struct ch-stream (ch promise)
  #:methods gen:stream
  [;; The node is empty exactly when its element is the end marker.
   (define (stream-empty? self)
     (eos? (ch-stream-cur self)))

   ;; Return the node's element; signal an error at the end of the stream.
   (define (stream-first self)
     (let ([head (ch-stream-cur self)])
       (cond
         [(eos? head) (error 'stream-first "end of stream")]
         [else head])))

   ;; Force this node's element before handing out the next node, so that
   ;; this node reads from the shared channel before any later node does.
   (define (stream-rest self)
     (ch-stream-cur self)
     (make-ch-stream (ch-stream-ch self)))])
;; make-ch-stream : async-channel? -> ch-stream?
;; Build a stream node whose element is read from `ch` on first use.
;; Nothing is read from the channel here.
(define (make-ch-stream ch)
  ;; The promise body blocks on the channel. `delay/sync` is used rather than
  ;; plain `delay` so that a second thread forcing the same node while the
  ;; read is still in progress waits for the result; a plain `delay` promise
  ;; is not meant to be forced from several threads at once.
  (ch-stream ch (delay/sync (async-channel-get ch))))
;; stream-partition : (any/c -> any/c) stream? -> (values stream? stream?)
;; Split `st` into two streams: the first receives the elements for which
;; `pred?` returns a true value, the second receives the others.
;; A background thread walks the whole source without waiting for the
;; consumers, pushing each element into the async channel of its side and
;; finally an end marker into both.
(define (stream-partition pred? st)
  (let ([lch (make-async-channel)]
        [rch (make-async-channel)])
    ;; Tell both sides that no more elements will arrive.
    (define (announce-end!)
      (async-channel-put lch eos)
      (async-channel-put rch eos))
    ;; Push one element into the channel chosen by the predicate.
    (define (route! v)
      (async-channel-put (if (pred? v) lch rch) v))
    (thread
     (lambda ()
       (let drain ([src st])
         (cond
           [(stream-empty? src) (announce-end!)]
           [else
            (route! (stream-first src))
            (drain (stream-rest src))]))))
    (values (make-ch-stream lch)
            (make-ch-stream rch))))
;; Demo: partition '(1 2 3) with a predicate that prints every element it is
;; shown and rejects it, then print whether each of the two resulting
;; streams is empty.
(call-with-values
 (lambda ()
   (stream-partition
    (lambda (x)
      (displayln x)
      #f)
    '(1 2 3)))
 (lambda (x y)
   (displayln (stream-empty? x))
   (displayln (stream-empty? y))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment