Last active
November 15, 2021 23:38
-
-
Save samdphillips/974df804ca07d8a5323d2a67d8613fcc to your computer and use it in GitHub Desktop.
Partitioning a Racket stream to streams with channels.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#lang racket/base | |
(require racket/async-channel | |
racket/promise | |
racket/stream) | |
;; This version correctly does not eagerly consume the source stream | |
;; - This is not kill-safe | |
;; - Infinite streams and non matching predicate will probably exhaust memory | |
(define eos (list 'eos)) | |
(define (eos? v) (eq? v eos)) | |
(define (ch-stream-cur st) | |
(force (ch-stream-cur/p st))) | |
(define (ch-stream-rest st) | |
(ch-stream-cur st) | |
(force (ch-stream-rest/p st))) | |
(struct ch-stream (cur/p rest/p) | |
#:methods gen:stream | |
[(define (stream-empty? st) | |
(eos? (ch-stream-cur st))) | |
(define (stream-first st) | |
(define cur (ch-stream-cur st)) | |
(when (eos? cur) | |
(error 'stream-first "end of stream")) | |
cur) | |
(define (stream-rest st) | |
(ch-stream-rest st))]) | |
(define (make-ch-stream ach rch) | |
(ch-stream | |
(delay/sync (get-cur-element ach rch)) | |
(delay/sync (make-ch-stream ach rch)))) | |
(define (get-cur-element ach rch) | |
(cond | |
[(async-channel-try-get ach) => values] | |
[else | |
(channel-put rch #t) | |
(async-channel-get ach)])) | |
(define (stream-partition pred? st) | |
(define lch (make-async-channel)) | |
(define rch (make-async-channel)) | |
(define lreq (make-channel)) | |
(define rreq (make-channel)) | |
(define (splitter st lwant? rwant?) | |
(cond | |
[(stream-empty? st) | |
(async-channel-put lch eos) | |
(async-channel-put rch eos)] | |
[(or lwant? rwant?) | |
(define v (stream-first st)) | |
(cond | |
[(pred? v) | |
(async-channel-put lch v) | |
(splitter (stream-rest st) #f rwant?)] | |
[else | |
(async-channel-put rch v) | |
(splitter (stream-rest st) lwant? #f)])] | |
[else | |
(sync (handle-evt lreq (lambda (x) (splitter st #t #f))) | |
(handle-evt rreq (lambda (x) (splitter st #f #t))))])) | |
(thread (lambda () (splitter st #f #f))) | |
(values (make-ch-stream lch lreq) | |
(make-ch-stream rch rreq))) | |
(define-values (s1 s2) | |
(stream-partition | |
(lambda (x) | |
(displayln x) | |
#f) | |
'(1 2 3))) | |
(stream-empty? s2) | |
#; | |
(stream-empty? s1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#lang racket/base | |
(require racket/async-channel | |
racket/stream) | |
;; WARNING: at present this will consume the whole stream which is probably not what you want. | |
;; - This is not kill-safe | |
;; - Infinite streams and non matching predicate will probably exhaust memory | |
(define eos (list 'eos)) | |
(define (eos? v) (eq? v eos)) | |
(define unforced (list 'unforced)) | |
(define (unforced? s) (eq? (ch-stream-cur s) unforced)) | |
(define (force-ch-stream! stream) | |
(when (unforced? stream) | |
(set-ch-stream-cur! stream (async-channel-get (ch-stream-ch stream))))) | |
(struct ch-stream (ch [cur #:mutable]) | |
#:methods gen:stream | |
[(define (stream-empty? stream) | |
(force-ch-stream! stream) | |
(eos? (ch-stream-cur stream))) | |
(define (stream-first stream) | |
(force-ch-stream! stream) | |
(when (eos? (ch-stream-cur stream)) | |
(error 'stream-first "end of stream")) | |
(ch-stream-cur stream)) | |
(define (stream-rest stream) | |
(force-ch-stream! stream) | |
(ch-stream (ch-stream-ch stream) unforced))]) | |
(define (stream-partition pred? st) | |
(define lch (make-async-channel)) | |
(define rch (make-async-channel)) | |
(define (splitter st) | |
(cond | |
[(stream-empty? st) | |
(async-channel-put lch eos) | |
(async-channel-put rch eos)] | |
[else | |
(define v (stream-first st)) | |
(async-channel-put (if (pred? v) lch rch) v) | |
(splitter (stream-rest st))])) | |
(thread (lambda () (splitter st))) | |
(values (ch-stream lch unforced) | |
(ch-stream rch unforced))) | |
(let-values ([(x y) | |
(stream-partition | |
(lambda (x) | |
(displayln "prop") | |
#f) | |
'(1 2 3))]) | |
(displayln (stream-empty? x)) | |
(displayln (stream-empty? y))) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#lang racket/base | |
(require racket/async-channel | |
racket/promise | |
racket/stream) | |
;; - Same as before but with promises | |
;; - This is not kill-safe | |
;; - Infinite streams and non matching predicate will probably exhaust memory | |
(define eos (list 'eos)) | |
(define (eos? v) (eq? v eos)) | |
(define (ch-stream-cur st) | |
(force (ch-stream-promise st))) | |
(struct ch-stream (ch promise) | |
#:methods gen:stream | |
[(define (stream-empty? st) | |
(eos? (ch-stream-cur st))) | |
(define (stream-first st) | |
(define cur (ch-stream-cur st)) | |
(when (eos? cur) | |
(error 'stream-first "end of stream")) | |
cur) | |
(define (stream-rest st) | |
(ch-stream-cur st) | |
(make-ch-stream (ch-stream-ch st)))]) | |
(define (make-ch-stream ch) | |
(ch-stream ch (delay (async-channel-get ch)))) | |
(define (stream-partition pred? st) | |
(define lch (make-async-channel)) | |
(define rch (make-async-channel)) | |
(define (splitter st) | |
(cond | |
[(stream-empty? st) | |
(async-channel-put lch eos) | |
(async-channel-put rch eos)] | |
[else | |
(define v (stream-first st)) | |
(async-channel-put (if (pred? v) lch rch) v) | |
(splitter (stream-rest st))])) | |
(thread (lambda () (splitter st))) | |
(values (make-ch-stream lch) | |
(make-ch-stream rch))) | |
(let-values ([(x y) | |
(stream-partition | |
(lambda (x) | |
(displayln x) | |
#f) | |
'(1 2 3))]) | |
(displayln (stream-empty? x)) | |
(displayln (stream-empty? y))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment