Skip to content

Instantly share code, notes, and snippets.

@Idorobots
Last active June 20, 2019 19:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Idorobots/84efbab01276ac30d9c74fd60f42c6f1 to your computer and use it in GitHub Desktop.
Save Idorobots/84efbab01276ac30d9c74fd60f42c6f1 to your computer and use it in GitHub Desktop.
Reactive streams?
;; Core record types. Every field holds a procedure (a callback); the
;; helper functions defined below fetch a field and apply it.
;;
;; NOTE(review): `define-struct` also binds the struct name itself, and the
;; file later defines functions called `subscriber`, `subscription` and
;; `publisher`. That rebinding may be rejected when this code is placed in
;; a module rather than loaded at the REPL -- confirm how the file is run.

;; A subscriber: handlers for receiving a subscription (one argument),
;; a value (one argument), an error (one argument) and completion (no
;; arguments).
(define-struct subscriber (on-subscription on-next on-error on-complete))
;; A subscription: handlers for requesting a number of values (one
;; argument) and for cancelling (no arguments).
(define-struct subscription (on-request on-cancel))
;; A publisher: handlers for attaching a subscriber (one argument),
;; publishing a value (one argument), signalling an error (one argument)
;; and signalling completion (no arguments).
(define-struct publisher (on-subscriber on-publish on-error on-complete))
;; Builds a subscription record from its two handlers.
;;   on-request : procedure of one argument, applied by `request`
;;   on-cancel  : procedure of no arguments, applied by `cancel`
(define subscription
  (lambda (on-request on-cancel)
    (make-subscription on-request on-cancel)))
;; Applies the on-request handler of subscription `sub` to `n` and
;; returns whatever that handler returns.
(define (request sub n)
  (let ((handler (subscription-on-request sub)))
    (handler n)))
;; Invokes the on-cancel handler of subscription `sub` with no arguments.
(define (cancel sub)
  (let ((handler (subscription-on-cancel sub)))
    (handler)))
;; Builds a subscriber record from its four handlers.
;;   on-subscription : procedure of one argument (the subscription)
;;   on-next         : procedure of one argument (a value)
;;   on-error        : procedure of one argument (an error)
;;   on-complete     : procedure of no arguments
(define subscriber
  (lambda (on-subscription on-next on-error on-complete)
    (make-subscriber on-subscription on-next on-error on-complete)))
;; Hands subscription `sub` to subscriber `s` by applying the
;; subscriber's on-subscription handler to it.
(define (on-subscription s sub)
  (let ((handler (subscriber-on-subscription s)))
    (handler sub)))
;; Delivers value `v` to subscriber `s` through its on-next handler.
(define (on-next s v)
  (let ((handler (subscriber-on-next s)))
    (handler v)))
;; Delivers error `e` to subscriber `s` through its on-error handler.
(define (on-error s e)
  (let ((handler (subscriber-on-error s)))
    (handler e)))
;; Signals completion to subscriber `s`: its on-complete handler is
;; invoked with no arguments.
(define (on-complete s)
  (let ((handler (subscriber-on-complete s)))
    (handler)))
;; Builds a publisher record from its four handlers.
;;   on-subscriber : procedure of one argument (a subscriber)
;;   on-publish    : procedure of one argument (a value)
;;   on-error      : procedure of one argument (an error)
;;   on-complete   : procedure of no arguments
(define publisher
  (lambda (on-subscriber on-publish on-error on-complete)
    (make-publisher on-subscriber on-publish on-error on-complete)))
;; Attaches subscriber `s` to publisher `p` by applying the publisher's
;; on-subscriber handler to it.
(define (subscribe p s)
  (let ((handler (publisher-on-subscriber p)))
    (handler s)))
;; Pushes value `v` into publisher `p` through its on-publish handler.
(define (publish p v)
  (let ((handler (publisher-on-publish p)))
    (handler v)))
;; Signals error `e` on publisher `p` through its on-error handler.
;; NOTE(review): this definition reuses the name of Racket's built-in
;; `error`; code that sees this binding cannot raise with `(error ...)`.
(define (error p e)
  (let ((handler (publisher-on-error p)))
    (handler e)))
;; Signals completion on publisher `p`: its on-complete handler is
;; invoked with no arguments.
(define (complete p)
  (let ((handler (publisher-on-complete p)))
    (handler)))
;; A useful publisher
;; Mutable per-subscriber bookkeeping used by `queueing-multi-publisher`.
;;   demand : count of values the subscriber has requested but not yet
;;            been sent (created as 0)
;;   queue  : values published while there was no demand; new values are
;;            consed onto the front, so the list is newest-first
(define-struct subscription-data (demand queue) #:mutable)
;; Creates a publisher that fans every published value out to all of its
;; current subscribers, buffering values per subscriber until that
;; subscriber has requested them.
;;
;; Behaviour of the four publisher handlers:
;;   subscribe : registers the subscriber (ignored if already registered)
;;               and hands it a subscription whose `request` adds demand
;;               and whose `cancel` unregisters the subscriber.
;;   publish   : appends the value to every subscriber's queue and then
;;               delivers as much as that subscriber's demand allows.
;;   error     : forwards the error to every subscriber, then drops them.
;;   complete  : signals completion to every subscriber, then drops them.
;;
;; All bookkeeping (queue and demand) is updated *before* a subscriber's
;; on-next handler runs, and delivery per subscriber is guarded against
;; re-entry. A subscriber may therefore call `request` (or publish to this
;; publisher) from inside on-next without receiving duplicates, losing
;; demand, or seeing values out of order.
;;
;; The handlers are run for their side effects only and return void.
(define (queueing-multi-publisher)
  ;; Each entry is (list subscriber subscription-data drain-thunk).
  (let ((subscribers '()))

    ;; Returns a thunk that delivers queued values to `s` for as long as
    ;; both demand and queued values remain.
    (define (make-drain s data)
      (let ((draining? #f))
        (define (drain-batches)
          (let* ((q (subscription-data-queue data)) ; newest first
                 (len (length q))
                 (demand (subscription-data-demand data))
                 ;; Number of values deliverable right now. Written
                 ;; without `min` so an inexact demand such as +inf.0
                 ;; does not turn the count into a float.
                 (n (if (>= demand len) len demand)))
            (when (> n 0)
              ;; Commit the new state first so that re-entrant calls
              ;; made from on-next observe consistent bookkeeping.
              (set-subscription-data-queue! data (take q (- len n)))
              (set-subscription-data-demand! data (- demand n))
              ;; The oldest `n` values sit at the tail of `q`; reverse
              ;; them to deliver in publication order.
              (for-each (lambda (v) (on-next s v))
                        (reverse (drop q (- len n))))
              ;; on-next may have added demand or values; check again.
              (drain-batches))))
        (lambda ()
          ;; A nested call (from inside on-next) only updates state; the
          ;; outer loop picks the change up on its next iteration.
          (unless draining?
            (dynamic-wind
              (lambda () (set! draining? #t))
              drain-batches
              ;; Always clear the flag, even if on-next raises.
              (lambda () (set! draining? #f)))))))

    (publisher
      ;; subscribe
      (lambda (s)
        (unless (assoc s subscribers)
          (let* ((data (make-subscription-data 0 '()))
                 (drain! (make-drain s data))
                 (sub (subscription
                        ;; request
                        (lambda (count)
                          (when (negative? count)
                            (raise-argument-error 'request
                                                  "a non-negative number"
                                                  count))
                          (set-subscription-data-demand!
                            data
                            (+ count (subscription-data-demand data)))
                          (drain!))
                        ;; cancel
                        (lambda ()
                          (set! subscribers
                                (filter (lambda (entry)
                                          (not (equal? s (car entry))))
                                        subscribers))))))
            (set! subscribers (cons (list s data drain!) subscribers))
            (on-subscription s sub))))
      ;; publish
      (lambda (v)
        (for-each (lambda (entry)
                    (let ((data (cadr entry))
                          (drain! (caddr entry)))
                      (set-subscription-data-queue!
                        data
                        (cons v (subscription-data-queue data)))
                      (drain!)))
                  subscribers))
      ;; error
      (lambda (e)
        (for-each (lambda (entry)
                    (on-error (car entry) e))
                  subscribers)
        (set! subscribers '()))
      ;; complete
      (lambda ()
        (for-each (lambda (entry)
                    (on-complete (car entry)))
                  subscribers)
        (set! subscribers '())))))
;; Actual streams
;; One stage of a stream pipeline, pairing the subscriber side (how the
;; stage receives values) with the publisher side (how it emits them).
;; Stages built in this file store '() in the side they do not have.
(define-struct stream-stage (subscriber publisher))
;; Wraps publisher `p` as a stream stage that only emits: the stage's
;; subscriber slot is filled with '().
(define (source-from-publisher p)
  (let ((no-subscriber '()))
    (make-stream-stage no-subscriber p)))
;; Creates an intermediate stream stage that applies `f` to every value
;; it receives and forwards the result downstream.
;;
;; The stage's subscriber side remembers the upstream subscription in
;; `in-sub`; its publisher side remembers the single downstream
;; subscriber in `out-sub`. Demand and cancellation coming from
;; downstream are passed straight to `in-sub`; values, errors and
;; completion coming from upstream are passed straight to `out-sub`.
;;
;; Both slots start as '(): the stage must be subscribed to its upstream
;; before downstream requests or cancels, and must have a downstream
;; subscriber before upstream delivers anything.
(define (flow f)
  (let* ((in-sub '())
         (out-sub '())
         (p (publisher
              ;; subscribe: remember the downstream subscriber and give
              ;; it a subscription that proxies to the upstream one.
              (lambda (downstream)
                (set! out-sub downstream)
                (on-subscription downstream
                                 (subscription
                                   (lambda (count)
                                     (request in-sub count))
                                   (lambda ()
                                     (cancel in-sub)))))
              ;; publish
              (lambda (v)
                (on-next out-sub v))
              ;; error: forward the error value itself (on-error takes
              ;; both the subscriber and the error).
              (lambda (e)
                (on-error out-sub e))
              ;; complete
              (lambda ()
                (on-complete out-sub))))
         (s (subscriber
              ;; on-subscription: remember the upstream subscription.
              (lambda (upstream)
                (set! in-sub upstream))
              ;; on-next: transform, then emit through our publisher.
              (lambda (v)
                (publish p (f v)))
              ;; on-error
              (lambda (e)
                (error p e))
              ;; on-complete
              (lambda ()
                (complete p)))))
    (make-stream-stage s p)))
;; Creates a terminal stream stage that applies `f` to every value it
;; receives. The stage has no publisher side (that slot holds '()).
;;
;; On subscription the sink immediately requests a very large number of
;; values, so it effectively never applies back-pressure.
;; Errors are passed to `f` as well, exactly like ordinary values.
;; Completion is accepted and ignored; the handler is `void` because
;; on-complete invokes it with no arguments.
(define (sink-foreach f)
  (make-stream-stage
    (subscriber (lambda (sub)
                  (request sub 123423425564889876543)) ; All the thing!
                f
                f
                void)
    '()))
;; Connects two stream stages: the subscriber side of `flow` is
;; subscribed to the publisher side of `source`. Returns `flow` so that
;; further stages can be chained onto it.
(define (via source flow)
  (let ((upstream (stream-stage-publisher source))
        (downstream (stream-stage-subscriber flow)))
    (subscribe upstream downstream)
    flow))
;; `to` is the same procedure as `via` under a second name; the example
;; in this file uses it for the final (sink) stage of a pipeline.
(define to via)
;; Example
;; Build a publisher, then a pipeline: source -> doubling flow -> printing sink.
;; NOTE(review): `->` is not defined in the visible part of this file; the
;; comments here assume it threads each result in as the first argument of
;; the next form, i.e. (to (via (source-from-publisher p) (flow ...)) (sink-foreach println)).
(define p (queueing-multi-publisher))
(define s (-> (source-from-publisher p)
(via (flow (lambda (v)
(* 2 v))))
(to (sink-foreach println))))
;; Under that assumption, publishing 23 reaches the sink as (* 2 23) and
;; println receives 46.
(publish p 23)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment