@rocketnia
Created September 4, 2018 18:15
Stream fusion infrastructure for Racket for discussion with Jack Firth.
#lang racket
; transducers
;
; Data structures, algorithms, and utilities for fusable stream
; manipulations.
;
; Inspired by a conversation with Jack Firth, who was using the
; terminology of "iterators," "collectors," and "transducers," with
; design goals that involved allowing certain clients to minimize
; allocations and conditional branches during their transducers' loops
; and allowing clients to selectively decide whether to enforce
; encapsulation or to omit those checks for performance.
;
; This was meant to be an exploration of ideas like those, especially
; exploring the use of Racket's `new-∀/c` and `new-∃/c` for
; encapsulation. This module does nothing in service of the
; minimization of conditional branches or the ability to omit contract
; checks for performance, but it does try to minimize allocations in a
; certain way by using multiple-value return.
; Copyright 2018 Ross Angle
;
; Licensed under the Apache License, Version 2.0 (the "License");
; you may not use this file except in compliance with the License.
; You may obtain a copy of the License at
;
; http://www.apache.org/licenses/LICENSE-2.0
;
; Unless required by applicable law or agreed to in writing,
; software distributed under the License is distributed on an
; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
; either express or implied. See the License for the specific
; language governing permissions and limitations under the License.
; TODO: Write even a single test for this module.
; This module defines a representation for fusable stream operations
; and utilities for creating, consuming, and composing them. There are
; four operations we care about composing:
;
; - Traditional Racket procedures, which take a value to a value.
; - Iterators, which take a value to a stream. (Also known as
; unfolds, anamorphisms, or coinduction.)
; - Collectors, which take a stream to a value. (Similar to folds,
; catamorphisms, or induction, but note that the stream is
; coinductive rather than inductive.)
; - Transducers, which take a stream to a stream. (Similar to
; hylomorphisms.)
;
; Our underlying representations of iterators, collectors, and
; transducers are in terms of procedures which potentially avoid many
; allocations by using multiple-value return. (TODO: However, the
; efficiency gained from this is probably lost in this implementation,
; where we wrap everything in contracts multiple times.)
;
; On top of that, we have a set of `convenient-...` utilities, which
; allow iterators, collectors, and transducers to be used as though
; they don't use multiple-value return at all.
;
; And on top of that, we have a set of `...cps...` utilities, which
; allow them to be converted back and forth to structurally typed
; encodings that might look interesting from an algebraic perspective.
; A careful read of these signatures can be the backbone for
; understanding the whole library:
;
; ; isomorphic to (iterator/c i o)
; ;
; ; Given the input value, returns the output stream, a potentially
; ; infinite stream of values.
; ;
; ; The stream is represented as a state value that consists of
; ; either nothing (because the stream has no elements) or a thunk
; ; that can compute a single element of the output stream and
; ; another state value.
; ;
; ; In more algebraic syntax, assuming all the procedures are total:
; ;
; ; i -> (fix s. 1 + o * s)
; ;
; (-> i (fix/c s (or/c #f (-> (list/c o s)))))
;
; ; isomorphic to (collector/c i o)
; ;
; ; A thunk that returns a state that consists of either the output
; ; value (if it can be determined before reading the whole input
; ; stream) or a pair of functions. The first function can be called
; ; with a single element of the input stream to get another state.
; ; The second is a thunk that can be called to signify that there
; ; are no more values in the input stream; it returns the output
; ; value.
; ;
; ; In more algebraic syntax, assuming all the procedures are total:
; ;
; ; fix s. o + (i -> s) * o
; ;
; (->
; (fix/c s
; (or/c
; (struct/c collection-state-finished o)
; (struct/c collection-state-stalled
; (list/c (-> i s) (-> o))))))
;
; ; isomorphic to (transducer/c i o)
; ;
; ; A thunk that returns a state that consists of either nothing (if
; ; the output stream has no elements regardless of the rest of the
; ; input stream), a thunk that can compute an element
; ; of the output stream and another state (if we can determine that
; ; element regardless of the rest of the input stream), or a list
; ; of two functions (if the input stream must be consulted before
; ; determining any more of the output stream).
; ;
; ; The first function can be called with a single element of the
; ; input stream to return another state.
; ;
; ; The second function is a thunk that can be called to signify
; ; that there are no more values in the input stream. It returns a
; ; potentially infinite stream of the remaining elements in the
; ; output stream. This stream is represented the same way as it is
; ; for the structurally typed iterator signature.
; ;
; ; In more algebraic syntax, assuming all the procedures are total:
; ;
; ; fix s. 1 + o * s + (i -> s) * (fix sAfter. 1 + o * sAfter)
; ;
; (->
; (fix/c s
; (or/c
; #f
; (-> (list/c o s))
; (list/c
; (-> i s)
; (->
; (fix/c s-after-end
; (or/c #f (-> (list/c o s-after-end)))))))))
;
;
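; A minimal, self-contained sketch of the structurally typed iterator
; and collector encodings described above. It does not use this
; module's contracts or conversions. The submodule name, the
; `sketch-...` structs (stand-ins for `collection-state-finished` and
; `collection-state-stalled`), and every function name here are
; hypothetical and exist only for illustration.
(module cps-encoding-sketch racket
  (provide countdown-cps cps-stream->list
           sum-until-negative-cps run-cps-collector)

  ; Hypothetical stand-ins for the two collection states.
  (struct sketch-finished (output))
  (struct sketch-stalled (fns))

  ; Iterator shape: i -> (fix s. 1 + o * s).
  ; Given `n`, the stream yields n, n-1, ..., 1.
  (define (countdown-cps n)
    (if (<= n 0)
        #f
        (lambda () (list n (countdown-cps (- n 1))))))

  ; Forces a finite stream state into a list.
  (define (cps-stream->list state)
    (match state
      [#f '()]
      [thunk
       (match (thunk)
         [(list elem new-state)
          (cons elem (cps-stream->list new-state))])]))

  ; Collector shape: a thunk returning (fix s. o + (i -> s) * o).
  ; Sums its input, finishing early at the first negative element.
  (define (sum-state total)
    (sketch-stalled
     (list
      (lambda (elem)
        (if (negative? elem)
            (sketch-finished total)
            (sum-state (+ total elem))))
      (lambda () total))))

  (define (sum-until-negative-cps)
    (sum-state 0))

  ; Feeds a list to a collector, honoring an early finish.
  (define (run-cps-collector init elems)
    (let loop ([state (init)] [elems elems])
      (match state
        [(sketch-finished output) output]
        [(sketch-stalled (list consume-next consume-end))
         (if (null? elems)
             (consume-end)
             (loop (consume-next (car elems)) (cdr elems)))]))))
; With that sketch, `(cps-stream->list (countdown-cps 3))` is
; `'(3 2 1)`, and
; `(run-cps-collector sum-until-negative-cps '(1 2 -1 100))` is `3`,
; because the collector finishes before it ever looks at `100`.
;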
; WARNING: Deep algebraic stuff ahead that's sort of a wide digression
; from describing the code in this module.
;
; With some refactoring of the algebraic formulations, it seems all of
; these interfaces are transducers between fixed points of different
; polynomials:
;
; iterator i o = constantToStreamTransducer o i
; collector i o = streamToConstantTransducer o i
; transducer i o = streamToStreamTransducer o i
; where
; trivialPolynomial a = 1
; trivial = trivialPolynomial trivial
; constantPolynomial k a = k
; constant k = constantPolynomial k (constant k)
; streamPolynomial elem a = 1 + elem * a
; stream elem = streamPolynomial elem (stream elem)
;
; ; There's only one term in the trivial polynomial, so we can
; ; follow it anytime.
; trivialToTrivialTransducer =
; 1 -> trivialPolynomial trivialToTrivialTransducer
; trivialToConstantTransducer o =
; 1 -> constantPolynomial o (trivialToConstantTransducer o)
; trivialToStreamTransducer o =
; 1 -> streamPolynomial o (trivialToStreamTransducer o)
;
; ; There's only one term in the constant polynomial, so we can
; ; follow it anytime.
; constantToTrivialTransducer i =
; i -> trivialPolynomial trivialToTrivialTransducer
; constantToConstantTransducer o i =
; i -> constantPolynomial o (trivialToConstantTransducer o)
; constantToStreamTransducer o i =
; i -> streamPolynomial o (trivialToStreamTransducer o)
;
; ; NOTE: This type is not isomorphic to 1. That's because
; ; different values of this type may consume different amounts of
; ; the input stream.
; ;
; streamToTrivialTransducer i =
;
; ; Behaviors we can follow anytime.
; trivialPolynomial (streamToTrivialTransducer i)
;
; ; Behaviors we can follow if we know exactly what term of the
; ; stream polynomial we're in, but not anytime.
; + (
; (1 -> trivialToTrivialTransducer)
; * (i -> streamToTrivialTransducer i)
; )
;
; streamToConstantTransducer o i =
;
; ; Behaviors we can follow anytime.
; constantPolynomial o (streamToConstantTransducer o i)
;
; ; Behaviors we can follow if we know exactly what term of the
; ; stream polynomial we're in, but not anytime.
; + (
; (1 -> trivialToConstantTransducer o) *
; (i -> streamToConstantTransducer o i)
; )
;
; streamToStreamTransducer o i =
;
; ; Behaviors we can follow anytime.
; streamPolynomial o (streamToStreamTransducer o i)
;
; ; Behaviors we can follow if we know exactly what term of the
; ; stream polynomial we're in, but not anytime.
; + (
; (1 -> trivialToStreamTransducer o)
; * (i -> streamToStreamTransducer o i)
; )
;
; It seems possible to coalesce all those transducers into a general
; operation like so, at least in pseudocode:
;
; iterator i o =
; generalTransducer (streamP o) (constP i) (fix a. constP i a)
; collector i o =
; generalTransducer (constP o) (streamP i) (fix a. streamP i a)
; transducer i o =
; generalTransducer (streamP o) (streamP i) (fix a. streamP i a)
; where
; constP k a = k
; streamP elem a = 1 + elem * a
;
; generalTransducer op ip iRest =
; forall (ap, bp : polynomial).
; isomorphism (ip iRest) (ap iRest + bp iRest)
; -> (
; (ap iRest -> op (generalTransducer op ap iRest))
; * (bp iRest -> op (generalTransducer op bp iRest))
; )
;
; The way we've formulated that is very general, allowing for any
; operation that partitions `(ip iRest)` into `(ap iRest)` and
; `(bp iRest)`. The transducer constructors defined in this module
; only allow two specific partitions: Partitioning into `(ip iRest)`
; and `0` (essentially not partitioning at all); and partitioning a
; stream into `1` and `(elem * iRest)`. Other possible partitions,
; like partitioning a stream into `(1 + elem)` and
; `(elem * elem * iRest)`, are not directly supported.
;
; A reasonable goal is to support a sufficient variety of partitions
; that if a desired hypothetical partition's type could be expressed
; while unrolling the input F-coalgebra's fixed point only a certain
; number of times, then a partition "nearby" to that one can actually
; be implemented. By nearby, we mean the type will not need to be
; unrolled any more times than the desired number, but that the
; partition may sometimes block on information that is unnecessary for
; making its decision.
;
; For instance, partitioning a transducer's input into `(1 + elem)`
; and `(elem * elem * iRest)` isn't directly supported by this module,
; but it is possible to generate the same output by using two
; partitions into `1` and `(elem * iRest)`, which is a program that
; can still be expressed by unrolling the stream polynomial exactly
; twice. Unfortunately, that's two partitions, so it blocks on up to
; two bits of information from the input stream, even though only one
; bit was needed. Specifically, it can't decide that the input stream
; contains fewer than two elements (`1 + elem`) unless it also sticks
; around long enough to learn exactly what number of elements the
; input stream contains (`1` for zero, or `elem` for one).
;
; By expressing a codata type as a polynomial ADT, it's
; straightforward to define a set of partitions that are "nearby" like
; this; namely, all the partitions that check whether the value is in
; one of the ADT's additive terms or not. The places where this is
; less straightforward are for types where the terms do not have
; canonical forms, or where there are HOTT-style "paths" between them;
; then it's intentionally ambiguous which ADT-style constructor a
; piece of data actually has. A common type (family) like this is the
; type (family) of finite sets.
;
; For transducers between polynomial ADTs, the creation of "nearby"
; transducer libraries could be automated. For transducers involving
; those more exotic types, it seems transducer libraries will need to
; be designed on a case-by-case basis.
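; The "nearby" partition example above can be sketched roughly as
; follows, assuming the structurally typed stream encoding (`#f` for
; an empty stream, otherwise a thunk returning an element and the
; rest). The submodule and function names are hypothetical. The point
; is only that two successive partitions into `1` and
; `(elem * iRest)` learn the exact length (zero or one) on the way to
; answering the coarser "fewer than two elements?" question.
(module nearby-partition-sketch racket
  (provide list->cps-stream classify-length fewer-than-two?)

  ; Builds a stream state from a list, for demonstration purposes.
  (define (list->cps-stream elems)
    (if (null? elems)
        #f
        (lambda ()
          (list (car elems) (list->cps-stream (cdr elems))))))

  ; First partition: `1` versus `(elem * iRest)`.
  ; Second partition, on the rest: `1` versus `(elem * iRest)` again.
  (define (classify-length state)
    (match state
      [#f 'zero]
      [thunk
       (match (thunk)
         [(list _ rest)
          (match rest
            [#f 'one]
            [_ 'two-or-more])])]))

  ; The one-bit question, answered only after both partitions.
  (define (fewer-than-two? state)
    (and (memq (classify-length state) '(zero one)) #t)))
; For example, `(classify-length (list->cps-stream '(a b c)))` is
; `'two-or-more`, and `(fewer-than-two? (list->cps-stream '(a)))` is
; `#t`.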
; NOTE: We use `define-values` a lot here, so we import `match-define`
; as `define-match` to align with that.
(require (only-in racket/match [match-define define-match]))
(require syntax/parse/define)
; Evergreen contract utilities
;
; TODO: These should probably be provided by a different module.
;
(provide forall/c exists/c fix/c)
; Efficient iterators, collectors, and transducers
(provide
(struct-out iterator)
(struct-out collector)
(struct-out transducer)
iterator-impl/c
iterator/c
collector-impl/c
collector/c
transducer-values-condition/c
transducer-values-condition-after-end/c
transducer-values-state/c
transducer-values-state-after-end/c
transducer-impl/c
transducer/c)
; Single-value-return usage of iterators, collectors, and transducers
(provide
convenient-iterator
convenient-iterator-init
convenient-iterator-produce
(struct-out collection-state-finished)
(struct-out collection-state-stalled)
collection-state?
collection-state->values
values->collection-state
convenient-collector
convenient-collector-init
convenient-collector-consume-next
convenient-collector-consume-end
(struct-out transduction-state-finished)
(struct-out transduction-state-ready)
(struct-out transduction-state-stalled)
transduction-state?
transduction-state-after-end?
transduction-state/c
transduction-state-after-end/c
transduction-state->values
values->transduction-state
convenient-transducer
convenient-transducer-init
convenient-transducer-consume-next
convenient-transducer-consume-end
convenient-transducer-produce
convenient-transducer-produce-after-end)
; Structurally typed usage of iterators, collectors, and transducers
(provide
cps->iterator
iterator->cps
cps->collector
collector->cps
cps-transduction-state-after-end/c
cps-transduction-state/c
cps->transducer
transducer->cps)
; Composition of iterators, collectors, transducers, and procedures
(provide
identity-transducer
chain-transducer-transducer
; NOTE: The `...head-transducer...` operations aren't total. The
; `iterator->head-transducer` operation can raise an error if the
; resulting transducer's input stream is empty, and the
; `head-transducer->collector` operation can raise an error if the
; original transducer's output stream is empty.
iterator->head-transducer
singleton-transducer->iterator
collector->singleton-transducer
head-transducer->collector
procedure->singleton-iterator
singleton-collector->procedure
chain-iterator-transducer
chain-transducer-collector
chain-iterator-collector
chain-procedure-iterator
chain-collector-procedure
chain-collector-iterator
chain-dynamic-dynamic
chain-dynamic-list
chain-dynamic)
; ===== Evergreen contract utilities =================================
(define-simple-macro (forall/c var:id ... contract:expr)
(let ([var (new-∀/c 'var)] ...)
contract))
(define-simple-macro (exists/c var:id ... contract:expr)
(let ([var (new-∃/c 'var)] ...)
contract))
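; A small sketch of what a `forall/c` contract does, written with the
; `let` form that `(forall/c a (-> a a))` expands to so that the
; submodule stands on its own. The submodule and function names are
; hypothetical. Arguments are sealed on the way in, so the only way to
; satisfy the range contract is to return something that was passed
; in.
(module forall-contract-sketch racket
  (provide sealed-identity sealed-cheater)

  ; Returns its sealed argument, so the range contract is satisfied.
  (define/contract sealed-identity
    (let ([a (new-∀/c 'a)])
      (-> a a))
    (lambda (x) x))

  ; Returns a value that never passed through the seal, so calling it
  ; is expected to raise a contract violation.
  (define/contract sealed-cheater
    (let ([a (new-∀/c 'a)])
      (-> a a))
    (lambda (x) 0)))
; Under these assumptions, `(sealed-identity 0)` returns `0`, while
; `(sealed-cheater 5)` raises an `exn:fail:contract` exception.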
; NOTE: This takes the same options `recursive-contract` does, and it
; passes them along unmodified.
(define-simple-macro (fix/c var:id options ... contract:expr)
(let ()
(define var
(let ([var (recursive-contract var options ...)])
contract))
var))
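; A sketch of the recursive contract that
; `(fix/c s (or/c #f (-> (list/c integer? s))))` would build, written
; out by hand so that the submodule stands on its own. The submodule
; name and everything defined in it are hypothetical.
(module fix-contract-sketch racket
  (provide int-stream/c countdown-stream broken-stream
           stream-state->list)

  ; The hand-expanded form of the `fix/c` expression above. The inner
  ; `s` is a `recursive-contract` that refers back to the outer `s`.
  (define int-stream/c
    (let ()
      (define s
        (let ([s (recursive-contract s)])
          (or/c #f (-> (list/c integer? s)))))
      s))

  ; A well-behaved stream: yields n, n-1, ..., 1.
  (define/contract (countdown-stream n)
    (-> integer? int-stream/c)
    (if (<= n 0)
        #f
        (lambda () (list n (countdown-stream (- n 1))))))

  ; A stream state whose first element violates the contract. The
  ; violation is detected only when the thunk is called.
  (define/contract broken-stream
    int-stream/c
    (lambda () (list "not an integer" #f)))

  ; Forces a finite stream state into a list.
  (define (stream-state->list state)
    (match state
      [#f '()]
      [thunk
       (match (thunk)
         [(list elem new-state)
          (cons elem (stream-state->list new-state))])])))
; Here `(stream-state->list (countdown-stream 3))` is `'(3 2 1)`, and
; calling `(broken-stream)` raises an `exn:fail:contract` exception.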
; ===== Efficient iterators, collectors, and transducers =============
; NOTE: We only make the fields mutable so we can use impersonator
; contracts for them with `struct/c`.
(struct iterator (init produce) #:mutable)
(struct collector (init consume-next consume-end) #:mutable)
(struct transducer
(init consume-next consume-end produce produce-after-end)
#:mutable)
(define/contract (iterator-impl/c state/c input/c output-elem/c)
(-> contract? contract? contract? contract?)
(struct/c iterator
(->i ([input input/c])
(values
[is-finished boolean?]
[state (is-finished) (if is-finished #f state/c)]))
(->i ([state state/c])
(values
[is-finished boolean?]
[output-elem output-elem/c]
[new-state (is-finished) (if is-finished #f state/c)]))))
(define/contract (iterator/c input/c output-elem/c)
(-> contract? contract? contract?)
(exists/c state/c (iterator-impl/c state/c input/c output-elem/c)))
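; A sketch of the multiple-value protocol that `iterator-impl/c`
; describes, using a plain stand-in struct instead of the contracted
; `iterator` so that the submodule stands on its own. The struct and
; function names are hypothetical.
(module values-iterator-sketch racket
  (provide countdown-iterator sketch-iterator->list)

  ; init:    input -> (values is-finished state)
  ; produce: state -> (values is-finished output-elem new-state)
  (struct sketch-iterator (init produce))

  ; Yields n, n-1, ..., 1. The state is the next number to yield, so
  ; no wrapper object is allocated per step.
  (define countdown-iterator
    (sketch-iterator
     (lambda (n)
       (if (<= n 0)
           (values #t #f)
           (values #f n)))
     (lambda (state)
       (define next (- state 1))
       (if (zero? next)
           (values #t state #f)
           (values #f state next)))))

  ; Drains an iterator into a list.
  (define (sketch-iterator->list ite input)
    (define-values (init-finished init-state)
      ((sketch-iterator-init ite) input))
    (let loop ([is-finished init-finished]
               [state init-state]
               [rev-elems '()])
      (if is-finished
          (reverse rev-elems)
          (let-values ([(next-finished elem next-state)
                        ((sketch-iterator-produce ite) state)])
            (loop next-finished next-state
                  (cons elem rev-elems)))))))
; With these definitions,
; `(sketch-iterator->list countdown-iterator 3)` is `'(3 2 1)`.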
(define/contract (collector-impl/c state/c input-elem/c output/c)
(-> contract? contract? contract? contract?)
(struct/c collector
(->i ()
(values
[is-finished boolean?]
[state (is-finished) (if is-finished output/c state/c)]))
(->i ([state state/c] [input-elem input-elem/c])
(values
[is-finished boolean?]
[new-state (is-finished) (if is-finished output/c state/c)]))
(-> state/c output/c)))
(define/contract (collector/c input-elem/c output/c)
(-> contract? contract? contract?)
(exists/c state/c (collector-impl/c state/c input-elem/c output/c)))
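; A sketch of the multiple-value protocol that `collector-impl/c`
; describes, again with a plain stand-in struct so that the submodule
; stands on its own. The struct and function names are hypothetical.
(module values-collector-sketch racket
  (provide first-even-collector run-sketch-collector)

  ; init:         -> (values is-finished state-or-output)
  ; consume-next: state input-elem
  ;                 -> (values is-finished state-or-output)
  ; consume-end:  state -> output
  (struct sketch-collector (init consume-next consume-end))

  ; Finishes early with the first even element, or returns `#f` if
  ; the input ends first. The state carries no information here.
  (define first-even-collector
    (sketch-collector
     (lambda () (values #f #f))
     (lambda (state elem)
       (if (even? elem)
           (values #t elem)
           (values #f state)))
     (lambda (state) #f)))

  ; Feeds a list to a collector, stopping as soon as it finishes.
  (define (run-sketch-collector col elems)
    (define-values (init-finished init-state)
      ((sketch-collector-init col)))
    (let loop ([is-finished init-finished]
               [state init-state]
               [elems elems])
      (cond
        [is-finished state]
        [(null? elems) ((sketch-collector-consume-end col) state)]
        [else
         (let-values ([(next-finished next-state)
                       ((sketch-collector-consume-next col)
                        state (car elems))])
           (loop next-finished next-state (cdr elems)))]))))
; With these definitions,
; `(run-sketch-collector first-even-collector '(1 3 4 5 6))` is `4`,
; and the elements after `4` are never consumed.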
(define/contract (transducer-values-condition/c)
(-> contract?)
(or/c 'finished 'ready 'stalled))
(define/contract (transducer-values-condition-after-end/c)
(-> contract?)
(or/c 'finished 'ready))
(define/contract
(transducer-values-state/c s-ready/c s-stalled/c condition)
(-> contract? contract? (transducer-values-condition/c) contract?)
(match condition
['finished #f]
['ready s-ready/c]
['stalled s-stalled/c]))
(define/contract
(transducer-values-state-after-end/c s-ready-after-end/c condition)
(-> contract? (transducer-values-condition-after-end/c) contract?)
(match condition
['finished #f]
['ready s-ready-after-end/c]))
(define/contract
(transducer-impl/c s-ready/c s-stalled/c s-ready-after-end/c
input-elem/c output-elem/c)
(-> contract? contract? contract? contract? contract? contract?)
(struct/c transducer
(->i ()
(values
[condition (transducer-values-condition/c)]
[state (condition)
(transducer-values-state/c
s-ready/c s-stalled/c condition)]))
(->i ([state s-stalled/c] [input-elem input-elem/c])
(values
[condition (transducer-values-condition/c)]
[new-state (condition)
(transducer-values-state/c
s-ready/c s-stalled/c condition)]))
(->i ([state s-stalled/c])
(values
[condition (transducer-values-condition-after-end/c)]
[new-state (condition)
(transducer-values-state-after-end/c
s-ready-after-end/c condition)]))
(->i ([state s-ready/c])
(values
[condition (transducer-values-condition/c)]
[output-elem output-elem/c]
[new-state (condition)
(transducer-values-state/c
s-ready/c s-stalled/c condition)]))
(->i ([state s-ready-after-end/c])
(values
[condition (transducer-values-condition-after-end/c)]
[output-elem output-elem/c]
[new-state (condition)
(transducer-values-state-after-end/c
s-ready-after-end/c condition)]))))
(define/contract (transducer/c input-elem/c output-elem/c)
(-> contract? contract? contract?)
(exists/c s-ready/c s-stalled/c s-ready-after-end/c
(transducer-impl/c s-ready/c s-stalled/c s-ready-after-end/c
input-elem/c output-elem/c)))
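; A sketch of the five-procedure protocol that `transducer-impl/c`
; describes, with a plain stand-in struct so that the submodule stands
; on its own. The struct, the example transducer, and the driver are
; hypothetical. The driver shows which procedure is called for each
; condition symbol.
(module values-transducer-sketch racket
  (provide map-then-append-transducer transduce-list)

  (struct sketch-transducer
    (init consume-next consume-end produce produce-after-end))

  ; Applies `f` to each input element, then emits `final` once the
  ; input has ended.
  (define (map-then-append-transducer f final)
    (sketch-transducer
     ; init: start out waiting for input.
     (lambda () (values 'stalled #f))
     ; consume-next: the ready state is the mapped element.
     (lambda (state elem) (values 'ready (f elem)))
     ; consume-end: one more element remains to be produced.
     (lambda (state) (values 'ready final))
     ; produce: emit the pending element, then wait for input again.
     (lambda (state) (values 'stalled state #f))
     ; produce-after-end: emit `final`, then finish.
     (lambda (state) (values 'finished state #f))))

  ; Runs a transducer over a list and collects its output.
  (define (transduce-list tra elems)
    (match-define
      (sketch-transducer
       init consume-next consume-end produce produce-after-end)
      tra)
    (define-values (init-condition init-state) (init))
    (let loop ([condition init-condition]
               [state init-state]
               [elems elems]
               [is-after-end #f]
               [rev-out '()])
      (match condition
        ['finished (reverse rev-out)]
        ['ready
         (let-values ([(next-condition elem next-state)
                       (if is-after-end
                           (produce-after-end state)
                           (produce state))])
           (loop next-condition next-state elems is-after-end
                 (cons elem rev-out)))]
        ['stalled
         (if (null? elems)
             (let-values ([(next-condition next-state)
                           (consume-end state)])
               (loop next-condition next-state elems #t rev-out))
             (let-values ([(next-condition next-state)
                           (consume-next state (car elems))])
               (loop next-condition next-state (cdr elems) #f
                     rev-out)))]))))
; With these definitions,
; `(transduce-list (map-then-append-transducer add1 'end) '(1 2 3))`
; is `'(2 3 4 end)`.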
; ====================================================================
; Single-value-return usage of iterators, collectors, and transducers
(define/contract (convenient-iterator init produce)
(forall/c s i o
(->
(-> i (or/c #f (list/c s)))
(-> s (list/c o (or/c #f (list/c s))))
(iterator-impl/c s i o)))
(iterator
(lambda (input)
(match (init input)
[#f (values #t #f)]
[(list state) (values #f state)]))
(lambda (state)
(define-match (list output-elem new-state) (produce state))
(match new-state
[#f (values #t output-elem #f)]
[(list state) (values #f output-elem state)]))))
(define/contract (convenient-iterator-init ite input)
(forall/c s i o (-> (iterator-impl/c s i o) i (or/c #f (list/c s))))
(define-match (iterator init produce) ite)
(define-values (is-finished state) (init input))
(if is-finished
#f
(list state)))
(define/contract (convenient-iterator-produce ite state)
(forall/c s i o
(-> (iterator-impl/c s i o) s (list/c o (or/c #f (list/c s)))))
(define-match (iterator init produce) ite)
(define-values (is-finished output-elem new-state) (produce state))
(list output-elem
(if is-finished
#f
(list new-state))))
; NOTE: We only make the fields mutable so we can use impersonator
; contracts for them with `struct/c`.
(struct collection-state-finished (output) #:mutable)
(struct collection-state-stalled (state) #:mutable)
(define/contract (collection-state? v)
(-> any/c boolean?)
(or
(collection-state-finished? v)
(collection-state-stalled? v)))
(define/contract (collection-state/c state/c output/c)
(-> contract? contract? contract?)
(or/c
(struct/c collection-state-finished output/c)
(struct/c collection-state-stalled state/c)))
(define/contract (collection-state->values state)
(forall/c s o
(->i ([state (collection-state/c s o)])
(values
[is-finished boolean?]
[new-state (is-finished) (if is-finished o s)])))
(match state
[(collection-state-finished output) (values #t output)]
[(collection-state-stalled state) (values #f state)]))
(define/contract (values->collection-state is-finished state)
(forall/c s o
(->i
(
[is-finished boolean?]
[state (is-finished) (if is-finished o s)])
[new-state (collection-state/c s o)]))
(if is-finished
(collection-state-finished state)
(collection-state-stalled state)))
(define/contract (convenient-collector init consume-next consume-end)
(forall/c s-stalled i o
(let ([s (collection-state/c s-stalled o)])
(-> (-> s) (-> s-stalled i s) (-> s-stalled o)
(collector-impl/c s-stalled i o))))
(collector init consume-next consume-end))
(define/contract (convenient-collector-init col)
(forall/c s-stalled i o
(let ([s (collection-state/c s-stalled o)])
(-> (collector-impl/c s-stalled i o) s)))
(define-match (collector init consume-next consume-end) col)
(define-values (is-finished new-state) (init))
(values->collection-state is-finished new-state))
(define/contract
(convenient-collector-consume-next col state input-elem)
(forall/c s-stalled i o
(let ([s (collection-state/c s-stalled o)])
(-> (collector-impl/c s-stalled i o) s-stalled i s)))
(define-match (collector init consume-next consume-end) col)
(define-values (is-finished new-state)
(consume-next state input-elem))
(values->collection-state is-finished new-state))
(define/contract (convenient-collector-consume-end col state)
(forall/c s-stalled i o
(-> (collector-impl/c s-stalled i o) s-stalled o))
(define-match (collector init consume-next consume-end) col)
(consume-end state))
; NOTE: We only make the fields mutable so we can use impersonator
; contracts for them with `struct/c`.
(struct transduction-state-finished () #:mutable)
(struct transduction-state-ready (state) #:mutable)
(struct transduction-state-stalled (state) #:mutable)
(define/contract (transduction-state? v)
(-> any/c boolean?)
(or
(transduction-state-finished? v)
(transduction-state-ready? v)
(transduction-state-stalled? v)))
(define/contract (transduction-state-after-end? v)
(-> any/c boolean?)
(or
(transduction-state-finished? v)
(transduction-state-ready? v)))
(define/contract (transduction-state/c s-ready/c s-stalled/c)
(-> contract? contract? contract?)
(or/c
(struct/c transduction-state-finished)
(struct/c transduction-state-ready s-ready/c)
(struct/c transduction-state-stalled s-stalled/c)))
(define/contract (transduction-state-after-end/c s-ready-after-end/c)
(-> contract? contract?)
(or/c
(struct/c transduction-state-finished)
(struct/c transduction-state-ready s-ready-after-end/c)))
(define/contract (transduction-state->values state)
(forall/c s-ready s-stalled
(->i ([state (transduction-state/c s-ready s-stalled)])
(values
[condition (transducer-values-condition/c)]
[new-state (condition)
(transducer-values-state/c s-ready s-stalled condition)])))
(match state
[(transduction-state-finished) (values 'finished #f)]
[(transduction-state-ready state) (values 'ready state)]
[(transduction-state-stalled state) (values 'stalled state)]))
(define/contract (values->transduction-state condition state)
(forall/c s-ready s-stalled
(->i
(
[condition (transducer-values-condition/c)]
[state (condition)
(transducer-values-state/c s-ready s-stalled condition)])
[new-state (transduction-state/c s-ready s-stalled)]))
(match condition
['finished (transduction-state-finished)]
['ready (transduction-state-ready state)]
['stalled (transduction-state-stalled state)]))
(define/contract
(convenient-transducer
init consume-next consume-end produce produce-after-end)
(forall/c s-ready s-stalled s-ready-after-end i o
(let
(
[s (transduction-state/c s-ready s-stalled)]
[s-after-end
(transduction-state-after-end/c s-ready-after-end)])
(->
(-> s)
(-> s-stalled i s)
(-> s-stalled s-after-end)
(-> s-ready (list/c o s))
(-> s-ready-after-end (list/c o s-after-end))
(transducer-impl/c s-ready s-stalled s-ready-after-end i o))))
(transducer
(lambda ()
(transduction-state->values (init)))
(lambda (state input-elem)
(transduction-state->values (consume-next state input-elem)))
(lambda (state)
(transduction-state->values (consume-end state)))
(lambda (state)
(define-match (list output-elem state-2) (produce state))
(define-values (condition state-3)
(transduction-state->values state-2))
(values condition output-elem state-3))
(lambda (state)
(define-match (list output-elem state-2)
(produce-after-end state))
(define-values (condition state-3)
(transduction-state->values state-2))
(values condition output-elem state-3))))
(define/contract (convenient-transducer-init tra)
(forall/c s-ready s-stalled s-ready-after-end i o
(let ([s (transduction-state/c s-ready s-stalled)])
(-> (transducer-impl/c s-ready s-stalled s-ready-after-end i o)
s)))
(define-match (transducer init _ _ _ _) tra)
(define-values (condition state) (init))
(values->transduction-state condition state))
(define/contract
(convenient-transducer-consume-next tra state input-elem)
(forall/c s-ready s-stalled s-ready-after-end i o
(let ([s (transduction-state/c s-ready s-stalled)])
(->
(transducer-impl/c s-ready s-stalled s-ready-after-end i o)
s-stalled
i
s)))
(define-match (transducer _ consume-next _ _ _) tra)
(define-values (condition new-state)
(consume-next state input-elem))
(values->transduction-state condition new-state))
(define/contract (convenient-transducer-consume-end tra state)
(forall/c s-ready s-stalled s-ready-after-end i o
(let
(
[s-after-end
(transduction-state-after-end/c s-ready-after-end)])
(->
(transducer-impl/c s-ready s-stalled s-ready-after-end i o)
s-stalled
s-after-end)))
(define-match (transducer _ _ consume-end _ _) tra)
(define-values (condition new-state) (consume-end state))
(values->transduction-state condition new-state))
(define/contract (convenient-transducer-produce tra state)
(forall/c s-ready s-stalled s-ready-after-end i o
(let ([s (transduction-state/c s-ready s-stalled)])
(->
(transducer-impl/c s-ready s-stalled s-ready-after-end i o)
s-ready
(list/c o s))))
(define-match (transducer _ _ _ produce _) tra)
(define-values (condition output-elem new-state) (produce state))
(list output-elem (values->transduction-state condition new-state)))
(define/contract (convenient-transducer-produce-after-end tra state)
(forall/c s-ready s-stalled s-ready-after-end i o
(let
(
[s-after-end
(transduction-state-after-end/c s-ready-after-end)])
(->
(transducer-impl/c s-ready s-stalled s-ready-after-end i o)
s-ready-after-end
(list/c o s-after-end))))
(define-match (transducer _ _ _ _ produce-after-end) tra)
(define-values (condition output-elem new-state)
(produce-after-end state))
(list output-elem (values->transduction-state condition new-state)))
; ====================================================================
; Structurally typed usage of iterators, collectors, and transducers
(define/contract (cps->iterator init)
(forall/c i o
(-> (-> i (fix/c s (or/c #f (-> (list/c o s)))))
(iterator/c i o)))
(define (->list state)
(match state
[#f #f]
[_ (list state)]))
(convenient-iterator
(lambda (input)
(->list (init input)))
(lambda (state)
(define-match (list output-elem new-state) (state))
(list output-elem (->list new-state)))))
(define/contract (iterator->cps ite)
(forall/c i o
(-> (iterator/c i o)
(-> i (fix/c s (or/c #f (-> (list/c o s)))))))
(lambda (input)
(let next ([state (convenient-iterator-init ite input)])
(match state
[#f #f]
[
(list state)
(lambda ()
(define-match (list output-elem new-state)
(convenient-iterator-produce ite state))
(list output-elem (next new-state)))]))))
(define/contract (cps->collector init)
(forall/c i o
(-> (-> (fix/c s (collection-state/c (list/c (-> i s) (-> o)) o)))
(collector/c i o)))
(convenient-collector init
(lambda (state input-elem)
(define-match (list consume-next consume-end) state)
(consume-next input-elem))
(lambda (state)
(define-match (list consume-next consume-end) state)
(consume-end))))
(define/contract (collector->cps col)
(forall/c i o
(-> (collector/c i o)
(-> (fix/c s (collection-state/c (list/c (-> i s) (-> o)) o)))))
(lambda ()
(let next ([state (convenient-collector-init col)])
(match state
[(collection-state-finished output) state]
[
(collection-state-stalled state)
(collection-state-stalled
(list
(lambda (input-elem)
(next
(convenient-collector-consume-next
col state input-elem)))
(lambda ()
(convenient-collector-consume-end col state))))]))))
(define/contract (cps-transduction-state-after-end/c output-elem/c)
(-> contract? contract?)
(fix/c s-after-end
(or/c
#f
(-> (list/c output-elem/c s-after-end)))))
(define/contract (cps-transduction-state/c input-elem/c output-elem/c)
(-> contract? contract? contract?)
(fix/c s
(or/c
#f
(-> (list/c output-elem/c s))
(list/c
(-> input-elem/c s)
(-> (cps-transduction-state-after-end/c output-elem/c))))))
(define/contract (cps->transducer init)
(forall/c i o
(-> (-> (cps-transduction-state/c i o)) (transducer/c i o)))
(define (cps->transduction-state state)
(match state
[#f (transduction-state-finished)]
[
(list consume-next consume-end)
(transduction-state-stalled state)]
[_ (transduction-state-ready state)]))
(convenient-transducer
(lambda ()
(cps->transduction-state (init)))
(lambda (state input-elem)
(define-match (list consume-next consume-end) state)
(cps->transduction-state (consume-next input-elem)))
(lambda (state)
(define-match (list consume-next consume-end) state)
(cps->transduction-state (consume-end)))
(lambda (state)
(define-match (list output-elem new-state) (state))
(list output-elem (cps->transduction-state new-state)))
(lambda (state)
(define-match (list output-elem new-state) (state))
(list output-elem (cps->transduction-state new-state)))))
(define/contract (transducer->cps tra)
(forall/c i o
(-> (transducer/c i o) (-> (cps-transduction-state/c i o))))
(lambda ()
(let next
(
[is-after-end #f]
[state (convenient-transducer-init tra)])
(match state
[(transduction-state-finished) #f]
[
(transduction-state-ready state)
(lambda ()
(define-match (list output-elem new-state)
(if is-after-end
(convenient-transducer-produce-after-end tra state)
(convenient-transducer-produce tra state)))
(list output-elem (next is-after-end new-state)))]
[
(transduction-state-stalled state)
(list
(lambda (input-elem)
(next is-after-end
(convenient-transducer-consume-next tra state input-elem)))
(lambda ()
(next #t (convenient-transducer-consume-end tra state))))]))))
; ====================================================================
; Composition of iterators, collectors, transducers, and procedures
(define/contract (identity-transducer)
(forall/c x (-> (transducer/c x x)))
(transducer
(lambda () (values 'stalled #f))
(lambda (state input-elem) (values 'ready input-elem))
(lambda (state) (values 'finished #f))
(lambda (state) (values 'stalled state #f))
(lambda (state)
(error "Internal error: Tried to produce from an identity transducer after the input ended"))))
(struct chain-tratra-state-uninitialized-ready (b-state))
(struct chain-tratra-state-initialized-ready
(a-condition a-state b-state))
(struct chain-tratra-state-after-end-ready (b-state))
(struct chain-tratra-state-stalled-stalled (a-state b-state))
(define/contract (chain-transducer-transducer a b)
(forall/c x y z
(-> (transducer/c x y) (transducer/c y z) (transducer/c x z)))
(define-match
(transducer
a-init a-consume-next a-consume-end a-produce
a-produce-after-end)
a)
(define-match
(transducer
b-init b-consume-next b-consume-end b-produce
b-produce-after-end)
b)
(define
(make-chain-tratra-state-stalled
a-is-after-end a-condition a-state b-state)
(match a-condition
['finished
(let ()
(define-values (b-condition-2 b-state-2)
(b-consume-end b-state))
(make-chain-tratra-state-after-end
b-condition-2 b-state-2))]
['ready
(let ()
(define-values
(a-condition-2 intermediate-elem a-state-2)
(if a-is-after-end
(a-produce-after-end a-state)
(a-produce a-state)))
(define-values (b-condition-2 b-state-2)
(b-consume-next b-state intermediate-elem))
(make-chain-tratra-state-initialized
a-is-after-end a-condition-2 a-state-2
b-condition-2 b-state-2))]
['stalled
(values 'stalled
(chain-tratra-state-stalled-stalled
a-state b-state))]))
(define
(make-chain-tratra-state-initialized
a-is-after-end a-condition a-state b-condition b-state)
(match b-condition
['finished (values 'finished #f)]
['ready
(values 'ready
(chain-tratra-state-initialized-ready
a-condition a-state b-state))]
['stalled
(make-chain-tratra-state-stalled
a-is-after-end a-condition a-state b-state)]))
(define (make-chain-tratra-state-after-end b-condition b-state)
(match b-condition
['finished (values 'finished #f)]
['ready
(values 'ready
(chain-tratra-state-after-end-ready b-state))]))
(define
(make-chain-tratra-state-uninitialized b-condition b-state)
(match b-condition
['finished (values 'finished #f)]
['ready
(values 'ready
(chain-tratra-state-uninitialized-ready b-state))]
['stalled
(let ()
(define-values (a-condition a-state) (a-init))
(make-chain-tratra-state-stalled
#f a-condition a-state b-state))]))
(define (process-initialized a-is-after-end state)
(match state
[
(chain-tratra-state-initialized-ready
a-condition a-state b-state)
(let ()
(define-values (b-condition-2 output-elem b-state-2)
(b-produce b-state))
(define-values (overall-condition overall-state)
(make-chain-tratra-state-initialized
a-is-after-end a-condition a-state
b-condition-2 b-state-2))
(values overall-condition output-elem overall-state))]
[
(chain-tratra-state-after-end-ready b-state)
(let ()
(define-values (b-condition-2 output-elem b-state-2)
(b-produce-after-end b-state))
(define-values (overall-condition overall-state)
(make-chain-tratra-state-after-end
b-condition-2 b-state-2))
(values overall-condition output-elem overall-state))]))
(transducer
(lambda ()
(define-values (b-condition b-state) (b-init))
(make-chain-tratra-state-uninitialized b-condition b-state))
(lambda (state input-elem)
(define-match
(chain-tratra-state-stalled-stalled a-state b-state)
state)
(define-values (a-condition-2 a-state-2)
(a-consume-next a-state input-elem))
(make-chain-tratra-state-stalled
#f a-condition-2 a-state-2 b-state))
(lambda (state)
(define-match
(chain-tratra-state-stalled-stalled a-state b-state)
state)
(define-values (a-condition-2 a-state-2)
(a-consume-end a-state))
(make-chain-tratra-state-stalled
#t a-condition-2 a-state-2 b-state))
(lambda (state)
(match state
[
(chain-tratra-state-uninitialized-ready b-state)
(let ()
(define-values (b-condition-2 output-elem b-state-2)
(b-produce b-state))
(define-values (overall-condition overall-state)
(make-chain-tratra-state-uninitialized
b-condition-2 b-state-2))
(values overall-condition output-elem overall-state))]
[_ (process-initialized #f state)]))
(lambda (state)
(process-initialized #t state))))
(define/contract (iterator->head-transducer ite)
(forall/c i o (-> (iterator/c i o) (transducer/c i o)))
(define-match (iterator init produce) ite)
(transducer
(lambda () (values 'stalled #f))
(lambda (state input-elem)
(define-values (is-finished new-state) (init input-elem))
(if is-finished
(values 'finished #f)
(values 'ready new-state)))
(lambda (state)
(error "Gave an empty input stream to a transducer created by iterator->head-transducer"))
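(lambda (state)
; NOTE: The result state needs its own name. An internal
; definition of `state` here would shadow the parameter inside
; its own right-hand side.
(define-values (is-finished output-elem new-state)
(produce state))
(if is-finished
(values 'finished output-elem #f)
(values 'ready output-elem new-state)))
(lambda (state)
(error "Internal error: Tried to produce from an iterator->head-transducer transducer after the input ended"))))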
(struct transducer-iterator-state-anticipating-one (input state))
(struct transducer-iterator-state-anticipating-zero (state))
(struct transducer-iterator-state-after-end (state))
(define/contract (singleton-transducer->iterator tra)
(forall/c i o (-> (transducer/c i o) (iterator/c i o)))
(define-match
(transducer
init consume-next consume-end produce produce-after-end)
tra)
(define (state-after-end condition state)
(match condition
['finished (values #t #f)]
['ready
(values #f (transducer-iterator-state-after-end state))]))
(define (state-anticipating-zero condition state)
(match condition
['finished (values #t #f)]
['ready
(values #f
(transducer-iterator-state-anticipating-zero state))]
['stalled
(let ()
(define-values (condition-2 state-2)
(consume-end state))
(state-after-end condition-2 state-2))]))
(define (state-anticipating-one input condition state)
(match condition
['finished (values #t #f)]
['ready
(values #f
(transducer-iterator-state-anticipating-one input state))]
['stalled
(let ()
(define-values (condition-2 state-2)
(consume-next state input))
(state-anticipating-zero condition-2 state-2))]))
(iterator
(lambda (input)
(define-values (condition state) (init))
(state-anticipating-one input condition state))
(lambda (state)
(match state
[
(transducer-iterator-state-anticipating-one input state)
(let ()
(define-values (condition output-elem state-2)
(produce state))
(define-values (is-finished new-state)
(state-anticipating-one input condition state-2))
(values is-finished output-elem new-state))]
[
(transducer-iterator-state-anticipating-zero state)
(let ()
(define-values (condition output-elem state-2)
(produce state))
(define-values (is-finished new-state)
(state-anticipating-zero condition state-2))
(values is-finished output-elem new-state))]
[
(transducer-iterator-state-after-end state)
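(let ()
; The input has already ended in this state, so this uses
; `produce-after-end` rather than `produce`.
(define-values (condition output-elem state-2)
(produce-after-end state))
(define-values (is-finished new-state)
(state-after-end condition state-2))
(values is-finished output-elem new-state))]))))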
(define/contract (collector->singleton-transducer col)
(forall/c i o (-> (collector/c i o) (transducer/c i o)))
(define-match (collector init consume-next consume-end) col)
(transducer
(lambda ()
(define-values (is-finished state) (init))
(if is-finished
(values 'ready state)
(values 'stalled state)))
(lambda (state input-elem)
(define-values (is-finished new-state)
(consume-next state input-elem))
(if is-finished
(values 'ready new-state)
(values 'stalled new-state)))
(lambda (state)
(values 'ready (consume-end state)))
(lambda (state)
(values 'finished state #f))
(lambda (state)
(values 'finished state #f))))
(define/contract (head-transducer->collector tra)
(forall/c i o (-> (transducer/c i o) (collector/c i o)))
(define-match
(transducer
init consume-next consume-end produce produce-after-end)
tra)
(define
(transducer-values->collector-values is-after-end condition state)
(match condition
['finished
(error "Produced zero values from a head-transducer->collector transducer")]
['ready
(let ()
; Once the input has ended, the transducer must be asked via
; `produce-after-end` rather than `produce`.
(define-values (condition-2 output state-2)
(if is-after-end
(produce-after-end state)
(produce state)))
(values #t output))]
['stalled
(values #f state)]))
(collector
(lambda ()
(define-values (condition state) (init))
(transducer-values->collector-values #f condition state))
(lambda (state input-elem)
(define-values (condition new-state)
(consume-next state input-elem))
(transducer-values->collector-values #f condition new-state))
(lambda (state)
(define-values (condition state-2) (consume-end state))
(define-values (is-finished state-3)
(transducer-values->collector-values #t condition state-2))
; NOTE: Here, `is-finished` must be true because `condition` is
; an "after-end" condition.
state-3)))
(define/contract (procedure->singleton-iterator proc)
(forall/c i o (-> (-> i o) (iterator/c i o)))
(iterator
(lambda (input) (values #f input))
(lambda (state) (values #t (proc state) #f))))
(define/contract (singleton-collector->procedure col)
(forall/c i o (-> (collector/c i o) (-> i o)))
(define-match (collector init consume-next consume-end) col)
(lambda (input)
(define-values (is-finished state) (init))
(if is-finished
state
(let ()
(define-values (is-finished-2 state-2)
(consume-next state input))
(if is-finished-2
state-2
(consume-end state-2))))))
(define/contract (chain-iterator-transducer a b)
(forall/c x y z
(-> (iterator/c x y) (transducer/c y z) (iterator/c x z)))
(singleton-transducer->iterator
(chain-transducer-transducer (iterator->head-transducer a) b)))
(define/contract (chain-transducer-collector a b)
(forall/c x y z
(-> (transducer/c x y) (collector/c y z) (collector/c x z)))
(head-transducer->collector
(chain-transducer-transducer
a
(collector->singleton-transducer b))))
(define/contract (chain-iterator-collector a b)
(forall/c x y z (-> (iterator/c x y) (collector/c y z) (-> x z)))
(singleton-collector->procedure
(chain-transducer-collector (iterator->head-transducer a) b)))
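; A minimal, self-contained sketch of what `chain-iterator-collector`
; appears to compute, written as a direct loop. Everything in this
; submodule is an assumption made for illustration: the structs are
; simplified stand-ins for this module's `iterator` and `collector`
; (no contracts), with the multiple-value conventions read off the
; definitions in this section, and `run-iterator-into-collector`,
; `countdown-iterator`, `sum-collector`, and `first-collector` are
; hypothetical names. Because this is a `module` submodule, it sees
; none of the enclosing module's bindings and does not affect them.
(module protocol-sketch racket/base
  (provide run-iterator-into-collector countdown-iterator
           sum-collector first-collector)
  ; init : input -> (values is-finished state)
  ; produce : state -> (values is-finished output-elem new-state)
  (struct iterator (init produce))
  ; init : -> (values is-finished state-or-result)
  ; consume-next : state elem -> (values is-finished state-or-result)
  ; consume-end : state -> result
  (struct collector (init consume-next consume-end))
  (define (run-iterator-into-collector ite col input)
    (define-values (col-done col-state) ((collector-init col)))
    (cond
      ; The collector finished without needing any input.
      [col-done col-state]
      [else
       (define-values (ite-done-0 ite-state-0)
         ((iterator-init ite) input))
       (let loop ([ite-done ite-done-0]
                  [ite-state ite-state-0]
                  [col-state col-state])
         (cond
           ; No elements are left, so the collector sees the end.
           [ite-done ((collector-consume-end col) col-state)]
           [else
            ; The produced element is fed onward even when it is
            ; reported as the last one.
            (define-values (ite-done-2 elem ite-state-2)
              ((iterator-produce ite) ite-state))
            (define-values (col-done-2 col-state-2)
              ((collector-consume-next col) col-state elem))
            (if col-done-2
                col-state-2
                (loop ite-done-2 ite-state-2 col-state-2))]))]))
  ; Produces n, n-1, ..., 1 for a nonnegative integer n.
  (define countdown-iterator
    (iterator
      (lambda (n) (values (<= n 0) n))
      (lambda (n) (values (= n 1) n (- n 1)))))
  ; Adds up every element it is given.
  (define sum-collector
    (collector
      (lambda () (values #f 0))
      (lambda (total elem) (values #f (+ total elem)))
      (lambda (total) total)))
  ; Finishes early with the first element it is given.
  (define first-collector
    (collector
      (lambda () (values #f #f))
      (lambda (state elem) (values #t elem))
      (lambda (state) (error "first-collector: empty input"))))
  ; (run-iterator-into-collector countdown-iterator sum-collector 4)
  ; => 10
  ; (run-iterator-into-collector countdown-iterator first-collector 4)
  ; => 4
  )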
(define/contract (chain-procedure-iterator a b)
(forall/c x y z (-> (-> x y) (iterator/c y z) (iterator/c x z)))
(define-match (iterator init produce) b)
(iterator (lambda (input) (init (a input))) produce))
(define/contract (chain-collector-procedure a b)
(forall/c x y z (-> (collector/c x y) (-> y z) (collector/c x z)))
(define-match (collector init consume-next consume-end) a)
(collector
(lambda ()
(define-values (is-finished state) (init))
(if is-finished
(values #t (b state))
(values #f state)))
(lambda (state input-elem)
(define-values (is-finished new-state)
(consume-next state input-elem))
(if is-finished
(values #t (b new-state))
(values #f new-state)))
(lambda (state)
(b (consume-end state)))))
(define/contract (chain-collector-iterator a b)
(forall/c x y z
(-> (collector/c x y) (iterator/c y z) (transducer/c x z)))
(chain-transducer-transducer
(collector->singleton-transducer a)
(iterator->head-transducer b)))
(define/contract (chain-dynamic-dynamic a b)
(forall/c x y z
(case->
(-> (-> x y) (-> y z) (-> x z))
(-> (iterator/c x y) (collector/c y z) (-> x z))
(-> (-> x y) (iterator/c y z) (iterator/c x z))
(-> (iterator/c x y) (transducer/c y z) (iterator/c x z))
(-> (collector/c x y) (-> y z) (collector/c x z))
(-> (transducer/c x y) (collector/c y z) (collector/c x z))
(-> (transducer/c x y) (transducer/c y z) (transducer/c x z))
(-> (collector/c x y) (iterator/c y z) (transducer/c x z))))
(cond
[(and (procedure? a) (procedure? b)) (compose b a)]
[
(and (iterator? a) (collector? b))
(chain-iterator-collector a b)]
[
(and (procedure? a) (iterator? b))
(chain-procedure-iterator a b)]
[
(and (iterator? a) (transducer? b))
(chain-iterator-transducer a b)]
[
(and (collector? a) (procedure? b))
(chain-collector-procedure a b)]
[
(and (transducer? a) (collector? b))
(chain-transducer-collector a b)]
[
(and (transducer? a) (transducer? b))
(chain-transducer-transducer a b)]
[
(and (collector? a) (iterator? b))
(chain-collector-iterator a b)]))
(define/contract (chain-dynamic-list links)
(-> (and/c pair? list?) any/c)
(define-match (cons first rest) links)
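; NOTE: `foldl` passes each next link first and the chain
; accumulated so far second. The pairwise operation is
; `chain-dynamic-dynamic`; calling the variadic `chain-dynamic`
; here would recurse back into this procedure without end.
(foldl (lambda (link acc) (chain-dynamic-dynamic acc link))
first rest))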
(define/contract (chain-dynamic link . links)
(->* (any/c) #:rest any/c any/c)
(chain-dynamic-list (cons link links)))
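; A minimal sketch of the fold order behind left-to-right chaining,
; restricted to plain procedures so that it stands alone. `foldl`
; calls its procedure as `(proc elem acc)`, so the accumulated chain
; arrives as the second argument and has to be passed first to the
; pairwise operation. `chain-two` and `chain-all` are hypothetical
; names used only in this sketch, not part of this module.
(module fold-order-sketch racket/base
  (provide chain-two chain-all)
  ; Runs `a` first and then `b`, like `(compose b a)`.
  (define (chain-two a b)
    (lambda (x) (b (a x))))
  ; Chains a nonempty list of procedures from left to right.
  (define (chain-all links)
    (foldl (lambda (link acc) (chain-two acc link))
           (car links)
           (cdr links)))
  ; ((chain-all (list add1 number->string string-length)) 99)
  ; => 3
  )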
@rocketnia

I just made this Gist public, and I'm about to link to it from a blog post.
