Skip to content

Instantly share code, notes, and snippets.

@tonyg
Last active May 22, 2019 14:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonyg/90c2c8406a24b5357aa44b8677089a90 to your computer and use it in GitHub Desktop.
Save tonyg/90c2c8406a24b5357aa44b8677089a90 to your computer and use it in GitHub Desktop.
Exploration of `replace-evt` in the context of getting a reliable signal that a complete frame of input is available.
#lang racket
;; Exploration of `replace-evt` in the context of getting a reliable
;; signal that a complete frame of input is available.
(module+ test (require rackunit))
;; Imagine a protocol, based on a TCP socket, where every two adjacent
;; bytes sent by the server are to be treated together. That is,
;; fixed-size two-byte framing.
;; If you're not careful, a server sending *one* byte followed by a
;; long delay and then the subsequent byte (or EOF) can cause your
;; client to block, even if you are using asynchronous events to
;; multiplex multiple input sources.
;; The problem arises when programs use asynchronous events to detect
;; the availability of some portion of a packet, followed by use of
;; synchronous calls to retrieve the packet itself. Unfortunately,
;; this is a standard idiom for users of Racket's `evt`-based,
;; CML-style I/O multiplexing system.
;; `read-frame` reads the next two bytes from `p`, converting them to
;; an unsigned integer using big-endian order. If one or zero bytes
;; are available, followed by end-of-file, `read-frame` returns eof.
;; Otherwise, if fewer than two bytes are available, `read-frame`
;; blocks until either eof or more bytes arrive.
;;
;; read-frame : input-port -> (or/c exact-nonnegative-integer? eof-object?)
;;
;; Blocking read of one two-byte frame, decoded as an unsigned
;; big-endian integer. A short frame (end-of-file after zero or one
;; bytes) is reported as `eof`.
(define (read-frame p)
  (let ([chunk (read-bytes 2 p)])
    (if (or (eof-object? chunk)
            (< (bytes-length chunk) 2))
        eof
        (integer-bytes->integer chunk #f #t))))
(module+ test
;; Some simple tests of `read-frame` on in-memory ports: empty input
;; and a lone byte both give eof; "aa" decodes big-endian to 24929.
;;
(check-equal? (read-frame (open-input-bytes #"")) eof)
(check-equal? (read-frame (open-input-bytes #"a")) eof)
(check-equal? (read-frame (open-input-bytes #"aa")) 24929)
;; Base delay, in seconds (it is passed to `sleep` and compared with
;; `time-thunk`'s elapsed seconds), used by the writers below.
(define *PAUSE-UNIT* 0.05)
;; `time-thunk` calls `f` and returns (list elapsed-seconds result).
(define (time-thunk f)
(let* ((start (current-inexact-milliseconds))
(result (f))
(stop (current-inexact-milliseconds)))
(list (/ (- stop start) 1000.0)
result)))
;; To test blocking, we'll use multiple threads and insert
;; artificial delays. `check-io-characteristics` trickles bytes
;; through in a background thread, and reads frames in a foreground
;; thread.
;;
;; `writer` writes to the output end of a fresh pipe, which is closed
;; once `writer` returns; `reader` is applied to the input end. The
;; reader's result must equal `expected-result`, and its elapsed time
;; must be below (`'fast`) or above (`'slow`) `threshold` seconds.
;;
(define (check-io-characteristics writer reader expected-result expected-speed threshold)
(let-values (((i o) (make-pipe)))
(thread (lambda ()
(writer o)
(close-output-port o)))
(match (time-thunk (lambda () (reader i)))
[(list t v)
(check-equal? v expected-result)
(match expected-speed
['fast (check-true (< t threshold) "Reader should be fast")]
['slow (check-true (> t threshold) "Reader should be slow")])])))
;; One byte eventually followed by EOF should yield `eof`, slowly.
(define (write:a2 o)
(write-bytes #"a" o)
(sleep (* 2 *PAUSE-UNIT*)))
(check-io-characteristics write:a2 read-frame eof 'slow *PAUSE-UNIT*)
;; One byte eventually followed by another should yield a frame, but slowly.
(define (write:a2a o)
(write-bytes #"a" o)
(sleep (* 2 *PAUSE-UNIT*))
(write-bytes #"a" o))
(check-io-characteristics write:a2a read-frame 24929 'slow *PAUSE-UNIT*)
;; Three bytes eventually followed by another should yield the first
;; of the two frames ("bb") immediately.
(define (write:bba2a o)
(write-bytes #"bba" o)
(sleep (* 2 *PAUSE-UNIT*))
(write-bytes #"a" o))
(check-io-characteristics write:bba2a read-frame 25186 'fast *PAUSE-UNIT*)
;; Three bytes eventually followed by another should yield both frames eventually.
(check-io-characteristics write:bba2a
(lambda (i) (list (read-frame i) (read-frame i)))
(list 25186 24929)
'slow
*PAUSE-UNIT*))
(module+ test
;; Now to show the problem we're considering here, where a slow
;; (potentially malicious) input can hold up processing of other
;; inputs.
;; `slow-stream` runs `writer` in a background thread against the
;; output end of a fresh pipe (closing it when `writer` returns), and
;; returns the pipe's input end.
;;
(define (slow-stream writer)
(let-values (((i o) (make-pipe)))
(thread (lambda () (writer o) (close-output-port o)))
i))
;; `multi-io/1` is like `check-io-characteristics` but for two
;; inputs. It waits on both ports at once; whenever one is ready it
;; calls the (blocking) `reader` on it, collecting non-eof results and
;; disabling a port once `reader` returns eof there. It doesn't
;; explicitly check timing of events, instead accumulating frames in
;; order as they arrive and yielding the final list. The order of
;; frames in the list implicitly encodes enough of the timing that we
;; can check for the problem.
;;
(define (multi-io/1 i1 i2 reader)
(let loop ((enable1 #t)
(enable2 #t)
(frames '()))
;; An event for one input: when `i` is ready, run `reader` and hand
;; the new enabled-flag and frame list to `k`; `never-evt` once disabled.
(define (clause enabled i k)
(if enabled
(handle-evt i
(lambda (_)
(match (reader i)
[(? eof-object?) (k #f frames)]
[v (k #t (cons v frames))])))
never-evt))
(if (or enable1 enable2)
(sync (clause enable1 i1 (lambda (enable1 frames) (loop enable1 enable2 frames)))
(clause enable2 i2 (lambda (enable2 frames) (loop enable1 enable2 frames))))
(reverse frames))))
;; This first test shows the issue. We expect `(list 25186 24929)`
;; because the "bb" frame is available before the "aa" frame is
;; complete. However, the code will yield `(list 24929 25186)`
;; because it starts reading the "aa" frame and gets stuck waiting
;; for it to finish before it can return to check for other inputs!
;;
(define (write:1bb o)
(sleep *PAUSE-UNIT*)
(write-bytes #"bb" o))
(check-equal? (multi-io/1 (slow-stream write:a2a) (slow-stream write:1bb) read-frame)
;; We really want the expected result to be (list 25186 24929),
;; but it is
(list 24929 25186)
;; as explained above.
)
;; We don't see the same with single-byte reads! The streams
;; interleave properly, because a ready port always has at least one
;; byte (or eof), so `read-byte` never blocks here.
;;
(check-equal? (multi-io/1 (slow-stream write:a2a) (slow-stream write:1bb) read-byte)
(list 97 98 98 97))
)
;; Let's try to use `replace-evt` to consume data as it becomes
;; available, without blocking other streams.
;; const-evt : any/c -> evt?
;; An always-ready event whose synchronization result is `v`.
(define (const-evt v)
  (handle-evt always-evt (const v)))
;; read-frame-evt/1 : input-port -> evt?
;;
;; First attempt: a chain of `replace-evt`s, one link per byte still
;; needed. Each link waits for `p` to be ready, reads one byte, and
;; replaces itself with the link for the remaining bytes; end-of-file
;; at any link yields `eof`. The parse state lives only in this chain,
;; so a fresh `sync` of the returned event starts again at the first
;; link.
(define (read-frame-evt/1 p)
  (define (step needed seen)
    (cond
      [(zero? needed)
       (const-evt (integer-bytes->integer (list->bytes (reverse seen)) #f #t))]
      [else
       (replace-evt p
                    (lambda (_ready)
                      (let ([b (read-byte p)])
                        (if (eof-object? b)
                            (const-evt eof)
                            (step (sub1 needed) (cons b seen))))))]))
  (step 2 '()))
(module+ test
;; First, we'll try it on a single stream:
;;
(let ((i (slow-stream write:a2a)))
(check-equal? (sync (read-frame-evt/1 i)) 24929))
;; That looks very promising! Does it work for multiple streams at once?
;;
;; `multi-io/2` is `multi-io/1` except that, instead of calling a
;; blocking reader, it asks `evt-maker` for a frame event for each
;; enabled input on every loop iteration.
;;
(define (multi-io/2 i1 i2 evt-maker)
(let loop ((enable1 #t)
(enable2 #t)
(frames '()))
(define (clause enabled i k)
(if enabled
(handle-evt (evt-maker i)
(match-lambda
[(? eof-object?) (k #f frames)]
[v (k #t (cons v frames))]))
never-evt))
(if (or enable1 enable2)
(sync (clause enable1 i1 (lambda (enable1 frames) (loop enable1 enable2 frames)))
(clause enable2 i2 (lambda (enable2 frames) (loop enable1 enable2 frames))))
(reverse frames))))
;; The answer is *no*.
;;
(check-equal? (multi-io/2 (slow-stream write:a2a) (slow-stream write:1bb) read-frame-evt/1)
;; We expect this:
;; (list 25186 24929)
;; but get:
(list 25186))
;;
;; Each time `evt-maker` (i.e. `read-frame-evt/1`) is called, it
;; creates a fresh frame-reader. So when the "aa" frame is half-way
;; through, the "bb" frame completes, which makes the `multi-io/2`
;; loop go round to the top again, creating fresh frame-reader
;; events, discarding the built-up state in the "aa" frame-reader.
;; Subsequently, the fresh "bb" reader hits end-of-file and yields
;; eof. The fresh "aa" reader sees only the second "a" byte (the
;; first was consumed by the discarded reader) followed by
;; end-of-file, so it too yields eof.
;; Let's try gathering frames by passing in pre-made read events,
;; instead of having an `evt-maker` to cons them up.
;;
;; `multi-io/3` syncs on the same two event objects on every loop
;; iteration; an input is disabled once its event yields eof.
;;
(define (multi-io/3 evt1 evt2)
(let loop ((enable1 #t)
(enable2 #t)
(frames '()))
(define (clause enabled evt k)
(if enabled
(handle-evt evt
(match-lambda
[(? eof-object?) (k #f frames)]
[v (k #t (cons v frames))]))
never-evt))
(if (or enable1 enable2)
(sync (clause enable1 evt1 (lambda (enable1 frames) (loop enable1 enable2 frames)))
(clause enable2 evt2 (lambda (enable2 frames) (loop enable1 enable2 frames))))
(reverse frames))))
(check-equal? (multi-io/3 (read-frame-evt/1 (slow-stream write:a2a))
(read-frame-evt/1 (slow-stream write:1bb)))
;; We want (list 25186 24929), but we get
(list 25186))
;;
;; It still doesn't work. Each time `sync` is called, it starts the
;; whole chain of `replace-evt`s from the beginning, even though
;; this time it's using the same actual event objects.
)
;; Let's try moving the state of the parse outside the `replace-evt`
;; and using mutation (!) instead of the quasi-functional approach
;; used in `read-frame-evt/1`.
;;
;; We will have to take special care to reset the internal state after
;; yielding our first value, because the same event object could be
;; used to parse *multiple* frames in succession!
;;
;; read-frame-evt/2 : input-port -> evt?
;;
;; Second attempt: keep the partial parse in mutable variables that
;; outlive any single `sync`, and return a single self-referential
;; `replace-evt` on `p`. Each time `p` is ready, the callback either
;; consumes one more byte and re-arms itself, or, when both bytes are
;; in hand, yields the decoded frame and resets for the next one.
;;
;; Consequences of this shape:
;;  - a completed frame is only yielded once `p` is ready *again*
;;    (more input or end-of-file), because the "frame complete" check
;;    runs inside the `replace-evt` on `p`;
;;  - on end-of-file the event yields `eof` without clearing any
;;    partially accumulated bytes.
(define (read-frame-evt/2 p)
  (define seen '())
  (define needed 2)
  ;; Decode the accumulated bytes, reset state, and yield the frame.
  (define (finish-frame!)
    (define frame
      (integer-bytes->integer (list->bytes (reverse seen)) #f #t))
    (set! seen '())
    (set! needed 2)
    (const-evt frame))
  ;; Record one more byte and wait on `p` again.
  (define (absorb-byte! b)
    (set! needed (sub1 needed))
    (set! seen (cons b seen))
    me)
  (define me
    (replace-evt p
                 (lambda (_ready)
                   (cond
                     [(zero? needed) (finish-frame!)]
                     [else
                      (let ([b (read-byte p)])
                        (if (eof-object? b)
                            (const-evt eof)
                            (absorb-byte! b)))]))))
  me)
(module+ test
;; At last! This seems to work:
;;
(check-equal? (multi-io/3 (read-frame-evt/2 (slow-stream write:a2a))
(read-frame-evt/2 (slow-stream write:1bb)))
(list 25186 24929))
;; Two-frame variants of the writers above.
(define (write:a2aa2a o)
(write:a2a o)
(write:a2a o))
(define (write:1bb2bb o)
(write:1bb o)
(sleep *PAUSE-UNIT*)
(write:1bb o))
;; ... or, at least, it works for reading SINGLE frames! We don't get
;; the ordering we expect with multi-frame streams that have embedded
;; delays. The reason: `read-frame-evt/2` only checks "frame complete"
;; inside its `replace-evt` on the port, so a finished frame is not
;; yielded until that port is ready again. B's first "bb" therefore
;; sits undelivered until B's next bytes arrive, while A's first frame
;; is delivered at once because the next "a" is already buffered:
;;
(let ((A (slow-stream write:a2aa2a))
(B (slow-stream write:1bb2bb)))
(check-equal? (multi-io/3 (read-frame-evt/2 A)
(read-frame-evt/2 B))
;; We expect (list 25186 24929 25186 24929), but get
(list 24929 25186 25186 24929)))
;; Furthermore, not only is it not really doing the right thing, the
;; ergonomics are *horrible*. We have no way to write a parsing
;; routine so that it looks like the simple blocking implementation,
;; but works in a non-blocking context.
)
;; What if we push the state into the input-port, by peeking rather
;; than reading until the whole frame is ready?
;;
;; This approach might be made to work with more effort, but as written:
;;
;; (define (read-frame-evt/3 p)
;; (define self-evt
;; (replace-evt p
;; (lambda (_)
;; (define bs (make-bytes 2))
;; (match (peek-bytes-avail! bs 0 #f p)
;; [(? eof-object?) (const-evt eof)]
;; [2 (begin0 (const-evt (integer-bytes->integer bs #f #t))
;; (read-bytes 2 p))]
;; [_ self-evt]))))
;; self-evt)
;;
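;; ... fails because peeking does not consume input: once a partial
;; frame is buffered, `p` stays ready, so `self-evt` is re-entered
;; immediately and the whole thing *spins* until a complete frame is
;; available.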
;;
;; (module+ test
;; (check-equal? (multi-io/3 (read-frame-evt/3 (slow-stream write:a2a))
;; (read-frame-evt/3 (slow-stream write:1bb)))
;; (list 25186 24929)))
;;
;; Furthermore, the code itself is not ideal. It doesn't get us any
;; closer to having a direct approach to writing non-blocking parsers.
;; Also, the read-bytes to discard the already-seen input is ugly; we
;; could try port-commit-peeked to fix that, but its interface is
;; complicated.
;; OK. I feel like the nuclear option is to Just Use Threads, and that
;; that option is problematic because of thread management,
;; exception-handling, race conditions etc etc. So before I go there,
;; let's just see if delimited continuations (in the form of
;; "algebraic" effects!) might give us what we need.
;; You'll need to have the "effects" package from
;; https://github.com/tonyg/racket-effects installed for this to work.
(require effects)
;; Effect tag for the `io` effect: `io-wait!` performs effects under
;; this tag, and `io-evt` installs the handler for it.
(define *tag* (make-effect-tag 'io))
;; This version doesn't terminate, because it ends by yielding the
;; very first frame read and never resets its state to read any
;; subsequent ones:
;;
;; (define (io-evt proc)
;; (with-effect #:deep *tag* k
;; ([evt (replace-evt evt (once-lambda (v) (k v)))])
;; (const-evt (proc))))
;;
;; where
;;
;; (define-syntax-rule (once-lambda args body ...)
;; (let ((ran? #f)
;; (result #f))
;; (lambda args
;; (when (not ran?)
;; (let ((answer (begin body ...)))
;; (set! ran? #t)
;; (set! result answer)))
;; result)))
;; This version works, but suffers a space leak with each subsequent
;; round:
;;
;; (define (io-evt proc)
;; (with-effect #:deep *tag* k
;; ([evt (replace-evt evt (once-lambda (v) (k v)))])
;; (let ((result (proc)))
;; (define next-round (once-lambda () (io-evt proc)))
;; (define consumed? #f)
;; (guard-evt (lambda ()
;; (if consumed?
;; (next-round)
;; (begin0 (const-evt result)
;; (set! consumed? #t))))))))
;; This version works, and by reusing the `guard-evt` object, avoids
;; the space leak:
;;
;; io-evt : (-> any/c) -> evt?
;;
;; Wraps `proc`, which may call `io-wait!` to suspend on an event, as a
;; reusable event whose synchronization result is `proc`'s return
;; value. Internally it is a small state machine, where `state` holds
;; a thunk producing the event to wait on next:
;;
;;  - initial state (`run-proc-with-effects`): (re)start `proc`;
;;  - "in-progress": `proc` is suspended in `io-wait!` on `evt`; wait
;;    for `evt`, then resume `proc` through its continuation `k`;
;;  - "ready": `proc` returned `result`; yield it and rearm the
;;    initial state, so a later sync runs `proc` afresh.
;;
;; Every transition hands back the same `guard-evt` (`yielding-evt`),
;; so repeated rounds reuse one wrapper object.
;;
;; NOTE: `proc` is started from inside the guard procedure, and resumed
;; from inside `replace-evt`, i.e. as part of a `sync` in which this
;; event participates. It can therefore make progress (and perform side
;; effects) even in a `sync` that ends up selecting a different event.
(define (io-evt proc)
(define state #f)
;; Each sync of `yielding-evt` dispatches to the current state thunk.
(define yielding-evt (guard-evt (lambda () (state))))
;; Switch to `next-state` and return the shared dispatching event.
(define (goto! next-state)
(set! state next-state)
yielding-evt)
(define (run-proc-with-effects)
;; NOTE(review): relies on racket-effects' `#:deep` handler catching
;; every `io-wait!` made while `proc` runs, including after `k`
;; resumes it -- confirm against the racket-effects documentation.
(with-effect #:deep *tag* k
([evt (goto! (lambda () (replace-evt evt k)))]) ;; "in-progress" state
(let ((result (proc)))
(goto! (lambda () ;; "ready" state
(set! state run-proc-with-effects)
(const-evt result))))))
(goto! run-proc-with-effects))
;; io-sync : (-> any/c) -> any/c
;; Build an `io-evt` for `proc` and synchronize on it, returning
;; `proc`'s result.
(define io-sync
  (lambda (proc)
    (define wrapped (io-evt proc))
    (sync wrapped)))
;; io-wait! : evt? -> any/c
;; Performs the `*tag*` effect with an event as its payload. Under the
;; handler installed by `io-evt`, the caller is resumed (via
;; `replace-evt evt k`) with that event's synchronization result.
;; NOTE(review): assumes racket-effects' `perform` returns a procedure
;; of the effect payload -- confirm against the library API.
(define io-wait! (perform *tag*))
;; io-evt->proc : (any/c ... -> evt?) -> (any/c ... -> any/c)
;;
;; Lifts an event constructor (such as `read-bytes-evt`) into a
;; procedure taking the same arguments. When run inside `io-evt`, that
;; procedure builds the event, waits on it via `io-wait!`, and returns
;; its synchronization result.
(define ((io-evt->proc ctor) . args)
  (io-wait! (apply ctor args)))
;; read-byte* : input-port -> (or/c byte? eof-object?)
;;
;; `read-byte` for use inside `io-evt`. When `read-byte` would block,
;; first suspend via `io-wait!` until `in` is ready, then read.
(define (read-byte* in)
  (cond
    [(byte-ready? in) (read-byte in)]
    [else
     (io-wait! in)
     (read-byte in)]))
;; read-bytes* : like `read-bytes`, for use inside `io-evt`. It takes
;; `read-bytes-evt`'s arguments (amount, port) and suspends via
;; `io-wait!` on that event, returning its result (bytes or eof).
(define read-bytes* (io-evt->proc read-bytes-evt))
(module+ test
;; `io-evt` with the port created outside `proc`; the result of `proc`
;; flows through `handle-evt` like any other event result.
(check-equal? (let ((i (open-input-bytes #"aa")))
(sync (handle-evt (io-evt (lambda ()
(list (read-byte* i)
(read-byte* i))))
(lambda (bs)
(cons 'done bs)))))
(list 'done 97 97))
;; Same, but with the port created inside `proc` itself.
(check-equal? (sync (handle-evt (io-evt (lambda ()
(define i (open-input-bytes #"aa"))
(list (read-byte* i)
(read-byte* i))))
(lambda (bs)
(cons 'done bs))))
(list 'done 97 97))
;; `read-bytes*` suspends on `read-bytes-evt` and returns its bytes.
(check-equal? (io-sync (lambda () (read-bytes* 2 (open-input-bytes #"aa"))))
(bytes 97 97)))
;; read-frame*/one-byte-at-a-time : input-port -> (or/c exact-nonnegative-integer? eof-object?)
;;
;; Frame reader for use inside `io-evt`, built from two `read-byte*`
;; calls, each of which may suspend until `p` is ready. End-of-file
;; before either byte yields `eof`.
(define (read-frame*/one-byte-at-a-time p)
  (let ([hi (read-byte* p)])
    (if (eof-object? hi)
        eof
        (let ([lo (read-byte* p)])
          (if (eof-object? lo)
              eof
              (integer-bytes->integer (bytes hi lo) #f #t))))))
;; read-frame*/two-bytes-at-once : input-port -> (or/c exact-nonnegative-integer? eof-object?)
;;
;; `read-frame` written against `read-bytes*`, so that inside `io-evt`
;; it suspends on `read-bytes-evt` rather than blocking. A short frame
;; (end-of-file after zero or one bytes) is reported as `eof`.
(define (read-frame*/two-bytes-at-once p)
  (let ([chunk (read-bytes* 2 p)])
    (if (or (eof-object? chunk)
            (< (bytes-length chunk) 2))
        eof
        (integer-bytes->integer chunk #f #t))))
(module+ test
;; Shared checks for any `io-evt`-compatible frame reader `read-frame*`.
(define (test-read-frame* read-frame*)
;; One reusable event yields successive frames, then eof on every
;; later sync.
(check-equal? (let* ((i (open-input-bytes #"aabbff"))
(e (io-evt (lambda () (read-frame* i)))))
(for/list [(i 6)] (sync e)))
(list 24929 25186 26214 eof eof eof))
;; Same edge cases as the blocking `read-frame`.
(check-equal? (io-sync (lambda () (read-frame* (open-input-bytes #"")))) eof)
(check-equal? (io-sync (lambda () (read-frame* (open-input-bytes #"a")))) eof)
(check-equal? (io-sync (lambda () (read-frame* (open-input-bytes #"aa")))) 24929)
;; Frames from two slow streams arrive in completion order.
(let ((A (slow-stream write:a2a))
(B (slow-stream write:1bb)))
(check-equal? (multi-io/3 (io-evt (lambda () (read-frame* A)))
(io-evt (lambda () (read-frame* B))))
(list 25186 24929)))
(let ((A (slow-stream write:a2aa2a))
(B (slow-stream write:1bb2bb)))
(check-equal? (multi-io/3 (io-evt (lambda () (read-frame* A)))
(io-evt (lambda () (read-frame* B))))
(list 25186 24929 25186 24929))))
(test-read-frame* read-frame*/one-byte-at-a-time)
(test-read-frame* read-frame*/two-bytes-at-once))
;; Note that this is not transactional! An `io-evt` can get *part-way*
;; through reading a frame without ever being selected, and the
;; consequences of this are observable externally.
;; But, that aside, now all we need is a way to reuse `read-frame` in
;; an `io-evt` context!
;;---------------------------------------------------------------------------
;; OK, let's finally explore the nuclear option of threads.
;; We must ensure that we can cleanly shut down a read thread when we
;; no longer want the input stream, whether or not the backing input
;; port is closed.
;; We must ensure that an exception in the read thread manifests next
;; time a read is performed.
;; We must ensure that the thread doesn't go ahead and read a
;; bajillion messages without being asked: reading should only happen
;; while a consumer is actually waiting for a frame, i.e. while some
;; `sync` on the frame event is in progress.
;; One will executor shared by all readers created by
;; `read-frame-evt/4*`, which register their cleanup wills on it, plus
;; a background thread that services it forever: `will-execute` waits
;; for a ready will and runs it.
(define executor (make-will-executor))
(void
 (thread
  (lambda ()
    (for ([_ (in-naturals)])
      (will-execute executor)))))
;; read-frame-evt/4* : input-port -> (values thread? evt?)
;;
;; Thread-based frame reader. Returns the background reader thread and
;; a reusable frame event. Synchronizing on the frame event yields the
;; next `read-frame` result from `i`, or `eof` once the reader thread
;; has finished after end-of-file. If the reader thread was terminated
;; by a raised value, that value is re-raised in the synchronizing
;; thread.
;;
;; The reader thread only starts on a frame after a `sync` on the frame
;; event has notified it (by sending that sync's nack event). A frame
;; read for an abandoned sync is kept and offered to the next one.
(define (read-frame-evt/4* i)
;; Frames are handed to the synchronizing consumer over this channel.
(define data-ch (make-channel))
;; Stays (void) on ordinary eof; otherwise holds whatever was raised in
;; the reader thread. The catch-all predicate below presumably also
;; matches the break sent by the will at the end of this function --
;; NOTE(review): confirm.
(define termination-reason (void))
(define reader-thread
(thread (lambda ()
(with-handlers [((lambda (e) #t) (lambda (e) (set! termination-reason e)))]
;; One iteration per consumer sync. `backlog` is '() or a one-element
;; list holding a frame that an abandoned sync never took.
(let loop ((backlog '()))
;; Wait until some sync on `frame-evt` sends us its nack event.
(define nack-evt (thread-receive))
;; Offer `item` until the consumer takes it or abandons this sync; in
;; the latter case keep it as backlog for the next sync.
(define (deliver item)
(sync (handle-evt nack-evt (lambda (_) (loop (list item))))
(handle-evt (channel-put-evt data-ch item) (lambda (_) (loop '())))))
(match backlog
['()
;; Nothing pending: stop if the sync is abandoned; otherwise, once
;; `i` is ready, read a whole frame (blocking this thread only).
(sync (handle-evt nack-evt (lambda (_) (loop '())))
(handle-evt i (lambda (_)
(match (read-frame i)
[(? eof-object?) (void)] ;; terminate!
[item (deliver item)]))))]
[(list item)
(deliver item)]))))))
(define frame-evt
(nack-guard-evt (lambda (nack-evt)
;; Tell the reader a sync has begun. The #f fail-thunk makes this
;; return #f instead of raising if the thread is already dead.
(thread-send reader-thread nack-evt #f)
(choice-evt data-ch
(handle-evt (thread-dead-evt reader-thread)
(lambda (_)
(if (void? termination-reason) ;; ordinary eof
eof
(raise termination-reason))))))))
;; Once `frame-evt` is otherwise unreachable and the executor runs this
;; will, break the reader thread so it shuts down.
(will-register executor frame-evt (lambda (_) (break-thread reader-thread)))
(values reader-thread frame-evt))
;; read-frame-evt/4 : input-port -> evt?
;; Like `read-frame-evt/4*`, but returns only the frame event and
;; drops the reader-thread handle.
(define (read-frame-evt/4 i)
  (call-with-values (lambda () (read-frame-evt/4* i))
                    (lambda (_reader-thread frame-evt) frame-evt)))
(module+ test
;; Two slow streams: frames arrive in completion order.
(check-equal? (multi-io/3 (read-frame-evt/4 (slow-stream write:a2a))
(read-frame-evt/4 (slow-stream write:1bb)))
(list 25186 24929))
;; The multi-frame case that `read-frame-evt/2` got wrong.
(check-equal? (multi-io/3 (read-frame-evt/4 (slow-stream write:a2aa2a))
(read-frame-evt/4 (slow-stream write:1bb2bb)))
(list 25186 24929 25186 24929))
;; One reusable event yields successive frames, then eof on every
;; later sync.
(check-equal? (let* ((i (open-input-bytes #"aabbff"))
(e (read-frame-evt/4 i)))
(for/list [(i 6)] (sync e)))
(list 24929 25186 26214 eof eof eof))
;; After the frame event becomes garbage, its will should break the
;; reader thread even though the port still holds unread frames.
(let-values (((thd evt) (read-frame-evt/4* (open-input-bytes #"aabbff"))))
(check-false (thread-dead? thd))
(check-equal? (sync evt) 24929)
(collect-garbage)
(sleep 0.1) ;; give the will-executor a chance to run and the thread a chance to terminate.
(check-true (thread-dead? thd)))
)
;; Eeeccchhhhh, I guess that's not so bad after all...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment