Last active
May 22, 2019 14:57
-
-
Save tonyg/90c2c8406a24b5357aa44b8677089a90 to your computer and use it in GitHub Desktop.
Exploration of `replace-evt` in the context of getting a reliable signal that a complete frame of input is available.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#lang racket

;; Exploration of `replace-evt` in the context of getting a reliable
;; signal that a complete frame of input is available.

;; All `test` submodule fragments in this file share this import.
(module+ test
  (require rackunit))
;; Imagine a protocol, based on a TCP socket, where every two adjacent
;; bytes sent by the server are to be treated together. That is,
;; fixed-size two-byte framing.

;; If you're not careful, a server sending *one* byte followed by a
;; long delay and then the subsequent byte (or EOF) can cause your
;; client to block, even if you are using asynchronous events to
;; multiplex multiple input sources.

;; The problem arises when programs use asynchronous events to detect
;; the availability of some portion of a packet, followed by use of
;; synchronous calls to retrieve the packet itself. Unfortunately,
;; this is a standard idiom for users of Racket's `evt`-based,
;; CML-style I/O multiplexing system.

;; `read-frame` reads the next two bytes from `p` and decodes them as
;; an unsigned big-endian integer. If end-of-file arrives before two
;; bytes have been read (after zero or one bytes), the result is `eof`.
;; While fewer than two bytes are available and eof has not been seen,
;; `read-frame` blocks.
(define (read-frame p)
  (define bs (read-bytes 2 p))
  ;; `read-bytes` gives `eof` when nothing is left, and a short byte
  ;; string when eof cuts the frame off; only a full pair is decoded.
  (if (and (bytes? bs) (= (bytes-length bs) 2))
      (integer-bytes->integer bs #f #t)
      eof))
(module+ test
  ;; Some simple tests.
  ;;
  (check-equal? (read-frame (open-input-bytes #"")) eof)
  (check-equal? (read-frame (open-input-bytes #"a")) eof)
  (check-equal? (read-frame (open-input-bytes #"aa")) 24929)

  ;; Base delay, in seconds (the unit `sleep` takes), used by the
  ;; trickling writers below.
  (define *PAUSE-UNIT* 0.05)

  ;; Calls the thunk `f`; returns (list elapsed-seconds result).
  (define (time-thunk f)
    (let* ((start (current-inexact-milliseconds))
           (result (f))
           (stop (current-inexact-milliseconds)))
      (list (/ (- stop start) 1000.0)
            result)))

  ;; To test blocking, we'll use multiple threads and insert
  ;; artificial delays. `check-io-characteristics` trickles bytes
  ;; through in a background thread, and reads frames in a foreground
  ;; thread.
  ;;
  ;; `writer` receives the output end of a fresh pipe (closed once
  ;; `writer` returns) and `reader` receives the input end. The check
  ;; passes when `reader` returns `expected-result` and took less
  ;; ('fast) or more ('slow) than `threshold` seconds.
  ;;
  (define (check-io-characteristics writer reader expected-result expected-speed threshold)
    (let-values (((i o) (make-pipe)))
      (thread (lambda ()
                (writer o)
                (close-output-port o)))
      (match (time-thunk (lambda () (reader i)))
        [(list t v)
         (check-equal? v expected-result)
         (match expected-speed
           ['fast (check-true (< t threshold) "Reader should be fast")]
           ['slow (check-true (> t threshold) "Reader should be slow")]
           ;; Report a mistyped speed clearly instead of a bare match failure.
           [other (raise-argument-error 'check-io-characteristics
                                        "(or/c 'fast 'slow)"
                                        other)])])))

  ;; One byte eventually followed by EOF should yield `eof`, slowly.
  (define (write:a2 o)
    (write-bytes #"a" o)
    (sleep (* 2 *PAUSE-UNIT*)))
  (check-io-characteristics write:a2 read-frame eof 'slow *PAUSE-UNIT*)

  ;; One byte eventually followed by another should yield a frame, but slowly.
  (define (write:a2a o)
    (write-bytes #"a" o)
    (sleep (* 2 *PAUSE-UNIT*))
    (write-bytes #"a" o))
  (check-io-characteristics write:a2a read-frame 24929 'slow *PAUSE-UNIT*)

  ;; Three bytes eventually followed by another should yield one of the two frames immediately.
  (define (write:bba2a o)
    (write-bytes #"bba" o)
    (sleep (* 2 *PAUSE-UNIT*))
    (write-bytes #"a" o))
  (check-io-characteristics write:bba2a read-frame 25186 'fast *PAUSE-UNIT*)

  ;; Three bytes eventually followed by another should yield both frames eventually.
  (check-io-characteristics write:bba2a
                            (lambda (i) (list (read-frame i) (read-frame i)))
                            (list 25186 24929)
                            'slow
                            *PAUSE-UNIT*))
(module+ test
  ;; Now to show the problem we're considering here, where a slow
  ;; (potentially malicious) input can hold up processing of other
  ;; inputs.

  ;; `slow-stream` starts `writer` on a background thread, handing it
  ;; the output end of a fresh pipe, and returns the pipe's input end.
  ;; The output end is closed once `writer` returns.
  ;;
  (define (slow-stream writer)
    (define-values (in out) (make-pipe))
    (thread (lambda ()
              (writer out)
              (close-output-port out)))
    in)

  ;; `multi-io/1` is like `check-io-characteristics` but for multiple
  ;; inputs. It doesn't explicitly check timing of events. Instead it
  ;; waits for either port to become ready, calls `reader` on that
  ;; port, and accumulates the frames in arrival order. A port is
  ;; dropped once `reader` returns eof, and the final list is returned
  ;; when both are dropped. The order of frames in the list implicitly
  ;; encodes enough of the timing that we can check for the problem.
  ;;
  (define (multi-io/1 i1 i2 reader)
    (let loop ([live1? #t] [live2? #t] [acc '()])
      ;; Event for one port; `continue` receives whether the port stays
      ;; live and the updated accumulator.
      (define (arm live? port continue)
        (if (not live?)
            never-evt
            (handle-evt port
                        (lambda (_ignored)
                          (let ([v (reader port)])
                            (if (eof-object? v)
                                (continue #f acc)
                                (continue #t (cons v acc))))))))
      (if (not (or live1? live2?))
          (reverse acc)
          (sync (arm live1? i1 (lambda (live1? acc) (loop live1? live2? acc)))
                (arm live2? i2 (lambda (live2? acc) (loop live1? live2? acc)))))))

  ;; Writer: pause one unit, then send one complete "bb" frame.
  (define (write:1bb o)
    (sleep *PAUSE-UNIT*)
    (write-bytes #"bb" o))

  ;; This first test shows the issue. We want `(list 25186 24929)`,
  ;; because the "bb" frame is available before the "aa" frame is
  ;; complete. However, the code yields `(list 24929 25186)`. It starts
  ;; reading the "aa" frame and gets stuck waiting for it to finish
  ;; before it can return to check for other inputs!
  ;;
  (check-equal? (multi-io/1 (slow-stream write:a2a) (slow-stream write:1bb) read-frame)
                ;; We really want (list 25186 24929), but get:
                (list 24929 25186))

  ;; We don't see the same with single-byte reads! The streams
  ;; interleave properly.
  ;;
  (check-equal? (multi-io/1 (slow-stream write:a2a) (slow-stream write:1bb) read-byte)
                (list 97 98 98 97)))
;; Let's try to use `replace-evt` to consume data as it becomes
;; available, without blocking other streams.

;; `const-evt` returns an event that is always ready and whose
;; synchronization result is `v`.
(define (const-evt v)
  (handle-evt always-evt
              (lambda _ v)))
;; First attempt: a chain of `replace-evt`s, one link per byte. The
;; chain waits for `p` to be ready, reads one byte, and either finishes
;; with the decoded big-endian frame or continues with the next link.
;; Eof from `p` ends the chain with `eof`.
(define (read-frame-evt/1 p)
  ;; `step` builds the event for the rest of the frame, given how many
  ;; bytes are still needed and the bytes read so far (most recent first).
  (define (step needed seen)
    (if (zero? needed)
        (let ([frame (list->bytes (reverse seen))])
          (const-evt (integer-bytes->integer frame #f #t)))
        (replace-evt p
                     (lambda (_)
                       (let ([b (read-byte p)])
                         (if (eof-object? b)
                             (const-evt eof)
                             (step (sub1 needed) (cons b seen))))))))
  (step 2 '()))
(module+ test
  ;; First, we'll try it on a single stream:
  ;;
  (let ([i (slow-stream write:a2a)])
    (check-equal? (sync (read-frame-evt/1 i)) 24929))

  ;; That looks very promising! Does it work for multiple streams at once?
  ;;
  ;; `multi-io/2` interleaves frames from two input ports. On every trip
  ;; round its loop it asks `evt-maker` for a *fresh* frame event for
  ;; each port still open, syncs on them, and records the frame that
  ;; arrives. A port is dropped once its event yields eof. The recorded
  ;; frames are returned in arrival order.
  ;;
  (define (multi-io/2 i1 i2 evt-maker)
    (let loop ([open1? #t] [open2? #t] [seen '()])
      (define (watch open? port resume)
        (if (not open?)
            never-evt
            (handle-evt (evt-maker port)
                        (lambda (v)
                          (if (eof-object? v)
                              (resume #f seen)
                              (resume #t (cons v seen)))))))
      (if (not (or open1? open2?))
          (reverse seen)
          (sync (watch open1? i1 (lambda (open1? seen) (loop open1? open2? seen)))
                (watch open2? i2 (lambda (open2? seen) (loop open1? open2? seen)))))))

  ;; The answer is *no*.
  ;;
  (check-equal? (multi-io/2 (slow-stream write:a2a) (slow-stream write:1bb) read-frame-evt/1)
                ;; We expect this:
                ;;   (list 25186 24929)
                ;; but get:
                (list 25186))
  ;;
  ;; Each time `evt-maker` (i.e. `read-frame-evt/1`) is called, it
  ;; creates a fresh frame-reader. So when the "aa" frame is half-way
  ;; through, the "bb" frame completes, which makes the `multi-io/2`
  ;; loop go round to the top again, creating fresh frame-reader
  ;; events, discarding the built-up state in the "aa" frame-reader.
  ;; Subsequently, the "bb" reader completes with eof, and the "aa"
  ;; reader is left with the odd "a" byte alone, meaning it too yields
  ;; eof.

  ;; Let's try gathering frames by passing in pre-made read events,
  ;; instead of having an `evt-maker` to cons them up.
  ;;
  ;; `multi-io/3` syncs repeatedly on the *same* two events, recording
  ;; each non-eof result in arrival order. An event is dropped once it
  ;; yields eof.
  ;;
  (define (multi-io/3 evt1 evt2)
    (let loop ([open1? #t] [open2? #t] [seen '()])
      (define (watch open? e resume)
        (if (not open?)
            never-evt
            (handle-evt e
                        (lambda (v)
                          (if (eof-object? v)
                              (resume #f seen)
                              (resume #t (cons v seen)))))))
      (if (not (or open1? open2?))
          (reverse seen)
          (sync (watch open1? evt1 (lambda (open1? seen) (loop open1? open2? seen)))
                (watch open2? evt2 (lambda (open2? seen) (loop open1? open2? seen)))))))

  (check-equal? (multi-io/3 (read-frame-evt/1 (slow-stream write:a2a))
                            (read-frame-evt/1 (slow-stream write:1bb)))
                ;; We want (list 25186 24929), but we get
                (list 25186))
  ;;
  ;; It still doesn't work. Each time `sync` is called, it starts the
  ;; whole chain of `replace-evt`s from the beginning, even though
  ;; this time it's using the same actual event objects.
  )
;; Let's try moving the state of the parse outside the `replace-evt`
;; and using mutation (!) instead of the quasi-functional approach
;; used in `read-frame-evt/1`.
;;
;; We will have to take special care to reset the internal state after
;; yielding our first value, because the same event object could be
;; used to parse *multiple* frames in succession!
;;
;; We must also yield a completed frame as soon as its last byte has
;; been read. A first draft of this function only checked for a
;; complete frame at the *top* of the `replace-evt` handler. That
;; handler runs only once `p` is ready *again*, so each frame was held
;; back until the next byte (or eof) arrived on its stream.
;;
;; NOTE(review): as with any `replace-evt` chain, the handler's side
;; effects happen during `sync` whether or not this event is finally
;; selected. If a sync completes a frame here but then picks another
;; event, that frame appears to be lost, because the state has already
;; been reset.
;;
(define (read-frame-evt/2 p)
  (define accumulator '())   ; bytes of the current frame, most recent first
  (define remaining 2)       ; bytes still needed to complete the frame
  (define self-evt
    (replace-evt p
                 (lambda (_)
                   (match (read-byte p)
                     [(? eof-object?) (const-evt eof)]
                     [b (set! remaining (- remaining 1))
                        (set! accumulator (cons b accumulator))
                        (if (zero? remaining)
                            ;; Frame complete: reset for the next frame,
                            ;; then yield this one immediately.
                            (let ([frame (list->bytes (reverse accumulator))])
                              (set! accumulator '())
                              (set! remaining 2)
                              (const-evt (integer-bytes->integer frame #f #t)))
                            ;; Still mid-frame: wait for more input.
                            self-evt)]))))
  self-evt)

(module+ test
  ;; At last! This seems to work:
  ;;
  (check-equal? (multi-io/3 (read-frame-evt/2 (slow-stream write:a2a))
                            (read-frame-evt/2 (slow-stream write:1bb)))
                (list 25186 24929))

  ;; Writer: two slow "aa" frames, each split by a two-unit pause.
  (define (write:a2aa2a o)
    (write:a2a o)
    (write:a2a o))

  ;; Writer: two "bb" frames, one unit apart after an initial pause.
  (define (write:1bb2bb o)
    (write:1bb o)
    (sleep *PAUSE-UNIT*)
    (write:1bb o))

  ;; It also gets the ordering right for multi-frame streams with
  ;; embedded delays. With the first draft described above, where a
  ;; complete frame waited for the next byte or eof on its port, this
  ;; same test produced (list 24929 25186 25186 24929) instead.
  ;;
  (let ((A (slow-stream write:a2aa2a))
        (B (slow-stream write:1bb2bb)))
    (check-equal? (multi-io/3 (read-frame-evt/2 A)
                              (read-frame-evt/2 B))
                  (list 25186 24929 25186 24929)))

  ;; Even so, the ergonomics are *horrible*. We have no way to write a
  ;; parsing routine so that it looks like the simple blocking
  ;; implementation, but works in a non-blocking context.
  )
;; What if we push the state into the input-port, by peeking rather | |
;; than reading until the whole frame is ready? | |
;; | |
;; It might be possible to make this work, but this approach:
;; | |
;; (define (read-frame-evt/3 p) | |
;; (define self-evt | |
;; (replace-evt p | |
;; (lambda (_) | |
;; (define bs (make-bytes 2)) | |
;; (match (peek-bytes-avail! bs 0 #f p) | |
;; [(? eof-object?) (const-evt eof)] | |
;; [2 (begin0 (const-evt (integer-bytes->integer bs #f #t)) | |
;; (read-bytes 2 p))] | |
;; [_ self-evt])))) | |
;; self-evt) | |
;; | |
;; ... fails because `p` never becomes unready, so the whole thing | |
;; *spins* until a complete frame is available. | |
;; | |
;; (module+ test | |
;; (check-equal? (multi-io/3 (read-frame-evt/3 (slow-stream write:a2a)) | |
;; (read-frame-evt/3 (slow-stream write:1bb))) | |
;; (list 25186 24929))) | |
;; | |
;; Furthermore, the code itself is not ideal. It doesn't get us any
;; closer to having a direct approach to writing non-blocking parsers.
;; Also, the `read-bytes` call that discards the already-peeked input
;; is ugly; we could try `port-commit-peeked` to fix that, but its
;; interface is complicated.
;; OK. I feel like the nuclear option is to Just Use Threads, and that
;; that option is problematic because of thread management,
;; exception-handling, race conditions, etc. So before I go there,
;; let's just see if delimited continuations (in the form of
;; "algebraic" effects!) might give us what we need.
;; You'll need to have the "effects" package from
;; https://github.com/tonyg/racket-effects installed for this to work.
(require effects)

;; The effect tag for I/O waits. `io-wait!` performs it, and the
;; `with-effect` handler inside `io-evt` handles it.
(define *tag*
  (make-effect-tag 'io))
;; This version doesn't terminate, because it ends by yielding the | |
;; very first frame read and never resets its state to read any | |
;; subsequent ones: | |
;; | |
;; (define (io-evt proc) | |
;; (with-effect #:deep *tag* k | |
;; ([evt (replace-evt evt (once-lambda (v) (k v)))]) | |
;; (const-evt (proc)))) | |
;; | |
;; where | |
;; | |
;; (define-syntax-rule (once-lambda args body ...) | |
;; (let ((ran? #f) | |
;; (result #f)) | |
;; (lambda args | |
;; (when (not ran?) | |
;; (let ((answer (begin body ...))) | |
;; (set! ran? #t) | |
;; (set! result answer))) | |
;; result))) | |
;; This version works, but suffers a space leak with each subsequent | |
;; round: | |
;; | |
;; (define (io-evt proc) | |
;; (with-effect #:deep *tag* k | |
;; ([evt (replace-evt evt (once-lambda (v) (k v)))]) | |
;; (let ((result (proc))) | |
;; (define next-round (once-lambda () (io-evt proc))) | |
;; (define consumed? #f) | |
;; (guard-evt (lambda () | |
;; (if consumed? | |
;; (next-round) | |
;; (begin0 (const-evt result) | |
;; (set! consumed? #t)))))))) | |
;; This version works, and by reusing the `guard-evt` object, avoids
;; the space leak.
;;
;; `io-evt` wraps `proc`, a thunk that may suspend itself with
;; `io-wait!`, as a reusable event. Syncing on the result drives `proc`
;; forward until it returns. Its return value becomes the sync result,
;; and the following sync starts `proc` afresh. The single `guard-evt`
;; below is what every state hands back; on each sync it simply runs
;; whatever thunk `current-state` holds.
(define (io-evt proc)
  (define current-state #f)
  (define the-evt
    (guard-evt (lambda () (current-state))))
  ;; Install `next` (a thunk producing an event) as the current state,
  ;; and answer the shared `the-evt`.
  (define (transition! next)
    (set! current-state next)
    the-evt)
  ;; Run `proc` from the beginning, with the *tag* effect handled.
  (define (start!)
    (with-effect #:deep *tag* k
      ;; "In-progress" state: `proc` performed `io-wait!` on `evt`. The
      ;; next sync waits for `evt` and passes its result to `k`
      ;; (presumably resuming `proc`; see the effects library's
      ;; `with-effect` documentation).
      ([evt (transition! (lambda () (replace-evt evt k)))])
      (let ([result (proc)])
        ;; "Ready" state: the next sync yields `result` and re-arms
        ;; `start!`, so the sync after that runs `proc` again.
        (transition! (lambda ()
                       (set! current-state start!)
                       (const-evt result))))))
  (transition! start!))
;; `io-sync` runs `proc` under `io-evt` and blocks the calling thread
;; until it produces a result.
(define (io-sync proc)
  (define e (io-evt proc))
  (sync e))

;; `io-wait!` performs the *tag* effect with its argument (an event) as
;; the payload. It is meant to be called from a `proc` running under
;; `io-evt`.
(define io-wait! (perform *tag*))

;; `io-evt->proc` turns an event constructor `ctor` into a procedure
;; that builds the event from its arguments and waits on it with
;; `io-wait!`.
(define ((io-evt->proc ctor) . args)
  (io-wait! (apply ctor args)))

;; `read-byte*` is `read-byte` for use under `io-evt`. When no byte (or
;; eof) is ready yet, it first waits on the port via `io-wait!`.
(define (read-byte* in)
  (unless (byte-ready? in)
    (io-wait! in))
  (read-byte in))

;; `read-bytes*` is the `io-wait!`-based counterpart of `read-bytes`,
;; built from `read-bytes-evt`.
(define read-bytes* (io-evt->proc read-bytes-evt))
(module+ test
  ;; Tags a result, so the checks can see that the `handle-evt` wrapper ran.
  (define (mark-done bs)
    (cons 'done bs))

  ;; Port created outside the `io-evt` body...
  (check-equal? (let ([i (open-input-bytes #"aa")])
                  (sync (handle-evt (io-evt (lambda ()
                                              (list (read-byte* i)
                                                    (read-byte* i))))
                                    mark-done)))
                (list 'done 97 97))

  ;; ...and inside it.
  (check-equal? (sync (handle-evt (io-evt (lambda ()
                                            (define i (open-input-bytes #"aa"))
                                            (list (read-byte* i)
                                                  (read-byte* i))))
                                  mark-done))
                (list 'done 97 97))

  ;; `read-bytes*` under `io-sync`.
  (check-equal? (io-sync (lambda () (read-bytes* 2 (open-input-bytes #"aa"))))
                (bytes 97 97)))
;; Reads a two-byte big-endian frame one byte at a time with
;; `read-byte*`. Eof before either byte yields `eof`.
(define (read-frame*/one-byte-at-a-time p)
  (define hi (read-byte* p))
  (if (eof-object? hi)
      eof
      (let ([lo (read-byte* p)])
        (if (eof-object? lo)
            eof
            (integer-bytes->integer (bytes hi lo) #f #t)))))
;; Reads a two-byte big-endian frame with a single `read-bytes*` call.
;; Eof, or a frame cut short by eof, yields `eof`.
(define (read-frame*/two-bytes-at-once p)
  (match (read-bytes* 2 p)
    [(and (? bytes? bs) (app bytes-length 2))
     (integer-bytes->integer bs #f #t)]
    [_ eof]))
(module+ test
  ;; Shared checks for any `read-frame*`-style parser run under `io-evt`.
  (define (test-read-frame* read-frame*)
    ;; One event, synced repeatedly, yields successive frames and then
    ;; keeps yielding eof.
    (check-equal? (let* ([port (open-input-bytes #"aabbff")]
                         [frames (io-evt (lambda () (read-frame* port)))])
                    (for/list ([_ (in-range 6)])
                      (sync frames)))
                  (list 24929 25186 26214 eof eof eof))

    ;; Empty, short and exact-size inputs.
    (check-equal? (io-sync (lambda () (read-frame* (open-input-bytes #"")))) eof)
    (check-equal? (io-sync (lambda () (read-frame* (open-input-bytes #"a")))) eof)
    (check-equal? (io-sync (lambda () (read-frame* (open-input-bytes #"aa")))) 24929)

    ;; A slow stream must not hold up a fast one: single frames...
    (let ([A (slow-stream write:a2a)]
          [B (slow-stream write:1bb)])
      (check-equal? (multi-io/3 (io-evt (lambda () (read-frame* A)))
                                (io-evt (lambda () (read-frame* B))))
                    (list 25186 24929)))

    ;; ...and multiple frames with embedded delays.
    (let ([A (slow-stream write:a2aa2a)]
          [B (slow-stream write:1bb2bb)])
      (check-equal? (multi-io/3 (io-evt (lambda () (read-frame* A)))
                                (io-evt (lambda () (read-frame* B))))
                    (list 25186 24929 25186 24929))))

  (test-read-frame* read-frame*/one-byte-at-a-time)
  (test-read-frame* read-frame*/two-bytes-at-once))
;; Note that this is not transactional! An `io-evt` can get *part-way* | |
;; through reading a frame without ever being selected, and the | |
;; consequences of this are observable externally. | |
;; But, that aside, now all we need is a way to reuse `read-frame` in | |
;; an `io-evt` context! | |
;;--------------------------------------------------------------------------- | |
;; OK, let's finally explore the nuclear option of threads. | |
;; We must ensure that we can cleanly shut down a read thread when we | |
;; no longer want the input stream, whether or not the backing input | |
;; port is closed. | |
;; We must ensure that an exception in the read thread manifests next | |
;; time a read is performed. | |
;; We must ensure that the thread doesn't go ahead and read a
;; bajillion messages without being asked: reading should only happen
;; when we're definitely not *un*ready (hmm) for input.
;; Will executor used by `read-frame-evt/4*` to stop reader threads.
;; A background thread services it forever.
(define executor (make-will-executor))
(void
 (thread
  (lambda ()
    (for ([_ (in-naturals)])
      (will-execute executor)))))
;; `read-frame-evt/4*` runs `read-frame` on `i` in a dedicated reader
;; thread. It returns that thread and an event that yields the next
;; frame. Once the thread has ended, the event yields `eof` if the
;; thread finished normally, or re-raises whatever the thread caught.
;; The reader only reads while some `sync` on the event is pending: each
;; sync sends it a nack event. A frame read for a sync that was then
;; abandoned is kept as a one-item backlog for the next sync. When the
;; event is garbage collected, a will on `executor` breaks the thread.
(define (read-frame-evt/4* i)
  ;; Frames pass from the reader thread to a syncing client over this channel.
  (define data-ch (make-channel))
  ;; Stays `(void)` unless the reader thread catches a raised value.
  (define termination-reason (void))

  ;; Reader loop. `backlog` is '() or a one-item list holding a frame
  ;; that has been read but not yet delivered.
  (define (reader-body)
    (let loop ([backlog '()])
      ;; Wait for a client sync; its nack event becomes ready if that
      ;; sync is abandoned or picks some other event.
      (define nack-evt (thread-receive))
      (define (deliver item)
        (sync (handle-evt nack-evt
                          (lambda (_) (loop (list item))))
              (handle-evt (channel-put-evt data-ch item)
                          (lambda (_) (loop '())))))
      (if (null? backlog)
          (sync (handle-evt nack-evt
                            (lambda (_) (loop '())))
                (handle-evt i
                            (lambda (_)
                              (let ([item (read-frame i)])
                                ;; On eof, fall through: the thread terminates.
                                (unless (eof-object? item)
                                  (deliver item))))))
          (deliver (car backlog)))))

  (define reader-thread
    (thread (lambda ()
              (with-handlers ([(lambda (e) #t)
                               (lambda (e) (set! termination-reason e))])
                (reader-body)))))

  ;; Result once the reader thread is dead.
  (define (on-reader-death _)
    (if (void? termination-reason) ;; ordinary eof
        eof
        (raise termination-reason)))

  (define frame-evt
    (nack-guard-evt
     (lambda (nack-evt)
       ;; `#f` fail-thunk: sending to an already-dead thread is ignored.
       (thread-send reader-thread nack-evt #f)
       (choice-evt data-ch
                   (handle-evt (thread-dead-evt reader-thread) on-reader-death)))))

  (will-register executor frame-evt (lambda (_) (break-thread reader-thread)))
  (values reader-thread frame-evt))
;; Like `read-frame-evt/4*`, but returns only the frame event and
;; discards the reader thread.
(define (read-frame-evt/4 i)
  (call-with-values (lambda () (read-frame-evt/4* i))
                    (lambda (_reader-thread frame-evt) frame-evt)))
(module+ test
  ;; Single frames from two slow streams arrive in completion order...
  (check-equal? (multi-io/3 (read-frame-evt/4 (slow-stream write:a2a))
                            (read-frame-evt/4 (slow-stream write:1bb)))
                (list 25186 24929))

  ;; ...as do multiple frames per stream with embedded delays.
  (check-equal? (multi-io/3 (read-frame-evt/4 (slow-stream write:a2aa2a))
                            (read-frame-evt/4 (slow-stream write:1bb2bb)))
                (list 25186 24929 25186 24929))

  ;; One event can be synced repeatedly; once the port is exhausted it
  ;; keeps yielding eof.
  (check-equal? (let ([frames (read-frame-evt/4 (open-input-bytes #"aabbff"))])
                  (for/list ([_ (in-range 6)])
                    (sync frames)))
                (list 24929 25186 26214 eof eof eof))

  ;; Once the event is unreachable, the will executor stops the reader thread.
  (let-values ([(reader frames) (read-frame-evt/4* (open-input-bytes #"aabbff"))])
    (check-false (thread-dead? reader))
    (check-equal? (sync frames) 24929)
    (collect-garbage)
    (sleep 0.1) ;; give the will-executor a chance to run and the thread a chance to terminate.
    (check-true (thread-dead? reader)))
  )
;; Eeeccchhhhh, I guess that's not so bad after all... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment