public
Created

Problem trying to do non-blocking I/O and reactor loop

  • Download Gist
bug-example.rkt
Racket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
#lang racket
 
;; 1. Run this:
;;
;; racket bug-example.rkt
;;
;; 2. Run ApacheBench:
;;
;; $ ab -n 1000 -c 20 http://localhost:8081/
;;
;; See the comments near the end of this file for a description of 3
;; problems.
;;
;; Environment:
;;
;; 1. Mac OS X Lion 10.7.4
;;
;; 2. ApacheBench Version 2.3 <$Revision: 117807 (patched to work on
;; OS X Lion using Homebrew formula as described here:
;; <http://simon.heimlicher.com/articles/2012/07/08/fix-apache-bench-ab-on-os-x-lion>
;;
;; 3. Racket 5.2.1. (Same result with 5.2, which I tried on the off
;; chance that the select => epoll/kqueue change in 5.2.1 was relevant
;; -- but seems not.)
 
;; The thread that was running when this module was instantiated.
;; `add-handler!' compares (current-thread) against it and logs an error
;; on a mismatch; `call-from-main-thread' sends its messages to it.
(define main-thread (current-thread))
 
;; Registry of pending events, keyed by the integer id handed out by
;; `add-handler!'. Each value is the caller's event wrapped with
;; `wrap-evt' so that syncing on it yields a `sync-result'.
(define h (make-hasheq)) ;exact-integer? -> evt?
;; The id the next `add-handler!' call will use; incremented on every
;; registration.
(define next-id 0)
 
;; Return the list of every currently registered (wrapped) event, in the
;; form expected by `(apply sync ...)'.
;; Contract: (-> (listof evt?))
(define events-to-sync
  (lambda ()
    (hash-values h)))
 
;; The value produced by syncing on an event registered through
;; `add-handler!': it carries the registration id, the raw `sync' result
;; and the handler to call with that result.
(struct sync-result
(id ;hash key in `h'
v ;`sync' result value; varies by event type
k)) ;the handler procedure, (any/c -> any), called with `v'
 
;; Register event `e' together with handler `k', a procedure that is
;; called with the event's `sync' result once the event is chosen.
;; Returns an opaque id that can later be given to `remove-handler!'.
;;
;; NOT THREAD-SAFE: a call from any thread other than `main-thread' is
;; only logged as an error, not prevented. Worker threads should go
;; through `call-from-main-thread' instead.
;;
;; Contract: (evt? (any/c . -> . any) . -> . any/c)
(define (add-handler! e k)
  (when (not (equal? (current-thread) main-thread))
    (log-error "add-handler! NOT called from main-thread"))
  (let ([this-id next-id])
    ;; Reserve the id first, then store the event wrapped so that its
    ;; sync result identifies both the registration and the handler.
    (set! next-id (add1 this-id))
    (hash-set! h
               this-id
               (wrap-evt e
                         (lambda (result) (sync-result this-id result k))))
    this-id))
 
;; Drop the registration identified by `id' from the registry `h'.
;; The thread-mailbox registration is permanent: asking to remove its id
;; is silently ignored. Always returns void.
;; Contract: (any/c . -> . void)
(define (remove-handler! id)
  (when (not (equal? id thread-mailbox-handler-id))
    (hash-remove! h id))
  (void))
 
;; Functions that don't offer a non-blocking variant (for example
;; `tcp-connect') must be run using a worker thread. When the worker
;; thread has a result, it could cause the handler procedure to be
;; called soon by doing (add-handler! always-evt f). However,
;; add-handler! is NOT thread-safe. Instead, the worker thread uses
;; `call-from-main-thread' to ask the main thread to call
;; `add-handler!'. `call-from-main-thread' does this using
;; `thread-send'.
;; Id of the permanent thread-mailbox registration, or #f until
;; `maybe-add-thread-mailbox-evt!' has installed it.
(define thread-mailbox-handler-id #f)

;; Install, at most once, a handler on the current thread's mailbox.
;; Each time the mailbox event fires, one message is received and
;; registered as a handler on `always-evt', so it runs on a following
;; reactor iteration.
(define (maybe-add-thread-mailbox-evt!)
  (when (not thread-mailbox-handler-id) ;not installed yet
    (let ([on-mailbox-ready
           (lambda (_)
             (displayln "thread mailbox handler called")
             (add-handler! always-evt (thread-receive)))])
      (set! thread-mailbox-handler-id
            (add-handler! (thread-receive-evt) on-mailbox-ready)))))
 
;; Ask the main thread to run `f': `f' is sent to `main-thread''s
;; mailbox with `thread-send'. Although provided for use in a worker
;; thread, also safe to call from the main thread.
;;
;; f : (any/c . -> . any) -- the procedure to hand over.
;; Returns void.
;;
;; The range contract is `void?' (the predicate). The constructor `void'
;; returns a true value for any argument, so as a contract it would
;; accept every result and check nothing.
(define/contract (call-from-main-thread f)
  ((any/c . -> . any) . -> . void?)
  (thread-send main-thread f)
  (void))
 
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; the reactor loop
 
;; Run the event loop: wait for any registered event, call its handler
;; with the sync result, drop that registration, and repeat.
;;
;; Note: This will run forever now that we have a thread mailbox
;; handler that we never remove, so events-to-sync will never
;; be the empty list.
(define (reactor-loop)
  (maybe-add-thread-mailbox-evt!) ;initialize
  (let ([pending (events-to-sync)])
    (when (pair? pending)
      (let ([fired (apply sync pending)])
        (if (sync-result? fired)
            (let ([handler (sync-result-k fired)]
                  [value (sync-result-v fired)])
              ;; The handler runs before its registration is removed.
              (handler value)
              (remove-handler! (sync-result-id fired)))
            (log-warning "reactor-loop: unknown event type from apply")))
      (reactor-loop))))
 
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
;; Register a non-blocking read on `in'. When input is available the
;; reactor calls `k' with the result of `read-bytes-avail!-evt': either
;; eof or the number of bytes that were stored at the start of `b'.
;;
;; b  : mutable byte string used as the receive buffer.
;; in : input port to read from.
;; k  : continuation called with eof or the byte count.
;; Returns void.
;;
;; The range contract is `void?' (the predicate); `void' itself accepts
;; every value and therefore checked nothing.
(define/contract (read-bytes-avail!/non-block b in k)
  ((and/c bytes? (not/c immutable?))
   input-port?
   ((or/c eof-object? exact-nonnegative-integer?) . -> . any)
   . -> . void?)
  (add-handler! (read-bytes-avail!-evt b in) k)
  (void))
 
;; Write string `s' (encoded as UTF-8) to `out' without blocking the
;; reactor, then call `k' with the total number of bytes written.
;;
;; `write-bytes-avail-evt' may write only a prefix of the bytes it is
;; given, so after each partial write a new event is registered for the
;; remaining bytes. `k' is called once, after the last byte is written.
;; Previously the unwritten tail of a partial write was silently dropped.
;;
;; s   : string to send.
;; out : output port to write to.
;; k   : continuation called with the number of bytes written.
;; Returns void.
(define/contract (write-string/non-block s out k)
  (string? output-port? (exact-nonnegative-integer? . -> . any) . -> . void?)
  (define payload (string->bytes/utf-8 s))
  (define total (bytes-length payload))
  ;; Register a write of payload[start, total) and continue from wherever
  ;; that write stopped.
  (define (write-from! start)
    (add-handler! (write-bytes-avail-evt payload out start)
                  (lambda (written)
                    (define next (+ start written))
                    (if (< next total)
                        (write-from! next)
                        (k total)))))
  (write-from! 0)
  (void))
 
;; Register a one-shot accept on `listener'. When the listener becomes
;; ready, the connection is accepted and `k' is called with its input
;; and output ports.
;;
;; listener : TCP listener to accept from.
;; k        : continuation called with the new connection's ports.
;; Returns void.
;;
;; The range contract is `void?' (the predicate); `void' itself accepts
;; every value and therefore checked nothing.
(define/contract (tcp-accept/non-block listener k)
  (tcp-listener? (input-port? output-port? . -> . any) . -> . void?)
  (add-handler! listener
                ;; Syncing on a listener yields the listener itself.
                (lambda (ready-listener)
                  (define-values (i o) (tcp-accept ready-listener))
                  (k i o)))
  (void))
 
;; Listen on `port' and run the reactor loop, calling `k' for every
;; accepted connection.
;;
;; k : called as (k in out close), where `close' is a thunk that closes
;;     both ports of the connection. `k' is invoked from the reactor
;;     (via `always-evt'), not directly from the accept handler.
;; #:port, #:hostname, #:reuse-port?, #:max-wait : passed to `tcp-listen'.
;; #:connection-timeout-secs : only echoed in the startup message; no
;;     timeout is enforced by this procedure.
;; #:limit-memory/MB : accepted but not used by this procedure.
;; Returns void once the reactor loop ends; the listener is closed first.
;;
;; Changes from the previous version:
;; - The per-connection handler catches `exn:fail?' instead of `exn?'.
;;   `exn?' also matches `exn:break', so a user break (Ctrl-C) raised
;;   while `k' was running was logged and swallowed. It now reaches the
;;   outer handler, which stops the server and closes the listener.
;; - The range contract is `void?'; `void' accepted every value.
;;
;; NOTE(review): when `k' raises, the connection's ports are left open,
;; as before -- callbacks already registered by `k' may still use them.
(define/contract (tcp-serve/non-block k
                                      #:port [port 80]
                                      #:hostname [hostname #f]
                                      #:reuse-port? [reuse-port? #f]
                                      #:limit-memory/MB [limit-memory/MB 50]
                                      #:max-wait [max-wait 40]
                                      #:connection-timeout-secs [timeout 60])
  (((input-port? output-port? (-> any) . -> . any))
   (#:port exact-nonnegative-integer?
    #:hostname (or/c #f string?)
    #:reuse-port? boolean?
    #:limit-memory/MB exact-positive-integer?
    #:max-wait exact-positive-integer?
    #:connection-timeout-secs exact-positive-integer?)
   . ->* .
   void?)

  (let ([listener (tcp-listen port max-wait reuse-port? hostname)])
    ;; Register a one-shot accept; its handler re-registers itself so the
    ;; listener is always being waited on.
    (define (accept)
      (tcp-accept/non-block
       listener
       (lambda (in out)
         (add-handler!
          always-evt
          (lambda (_)
            ;; (file-stream-buffer-mode in 'none)
            ;; (file-stream-buffer-mode out 'none)
            ;; exn:fail? (not exn?) so that breaks are not swallowed here.
            (with-handlers ([exn:fail? fail-handler])
              (k in
                 out
                 (lambda ()
                   (close-input-port in)
                   (close-output-port out))))))
         (accept))))
    ;; Any exception escaping the loop (including a break) is logged and
    ;; ends the server; the listener is closed below.
    (with-handlers ([exn? fail-handler])
      (displayln (format "serve: port=~a hostname=~a timeout=~a"
                         port hostname timeout))
      (accept)
      (reactor-loop))
    (log-info "closing tcp listener")
    (tcp-close listener))
  (void))
 
;; Exception handler used with `with-handlers': logs the exception,
;; including its stack trace, at error level.
(define fail-handler
  (lambda (exn)
    (log-error (exn->string exn #t))))
 
;; Render exception `exn' as a log message.
;;
;; exn          : the exception to describe.
;; stack-trace? : when true, append a "Stack:" section with one line per
;;                context frame (procedure name, source, line).
;; Returns a string.
;; Contract: (exn? boolean? . -> . string?)
;;
;; The frame lines are concatenated with `string-append'. Formatting the
;; list of lines with "~a" printed the list itself: a parenthesized,
;; space-separated blob, and "()" for an empty trace.
(define (exn->string exn stack-trace?)
  ;; One context frame is a pair (name-or-#f . srcloc-or-#f).
  (define (frame->line frame)
    (define name (car frame))
    (define loc (cdr frame))
    (format "'~a' ~a ~a\n"
            (if name name "")
            (if loc (srcloc-source loc) "")
            (if loc (srcloc-line loc) "")))
  (define (stack-trace-string exn)
    (apply string-append
           (map frame->line
                (continuation-mark-set->context
                 (exn-continuation-marks exn)))))
  (format "Thread will exit due to exception: \"~a\"~a"
          (exn-message exn)
          (if stack-trace?
              (string-append "\nStack:\n" (stack-trace-string exn))
              "")))
 
 
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
;; Start the example server on port 8081. Each connection is read until
;; the first blank line (or eof) and is then answered with a fixed,
;; empty HTML page, after which the connection is closed.
(tcp-serve/non-block
 #:port 8081
 #:reuse-port? #f
 #:max-wait 20
 (lambda (in out k)
   ;; Send the canned response, then close the connection via `k'.
   (define (write-response req)
     ;;(log-debug (format "write-response. req=~a" req))
     (write-string/non-block (string-append "HTTP/1.0 200 OK\r\n"
                                            "Connection: close\r\n"
                                            "\r\n"
                                            "<html></html>")
                             out
                             (lambda (_) (k))))
   ;; Read until the first double EOL.
   (define (read-request)
     (define buf (make-bytes 1024 0))
     ;; `so-far' holds every request byte received up to now.
     (define (read-more! so-far)
       (read-bytes-avail!/non-block
        buf in
        (lambda (n)
          ;; PROBLEM 1: Very often (many, many times) there will
          ;; be an error, "peek-bytes-avail!: input port is
          ;; closed".
          ;;
          ;; I also tried using regexp-match-evt but got a similar
          ;; error re the input port being closed.
          ;;
          ;; PROBLEM 2: Sometimes (very rarely, with `ab -c 20') this
          ;; will be a channel? instead of the documented
          ;; eof-object? or integer?
          ;;
          ;; PROBLEM 3: Even more rarely, racket will abend with
          ;; "Process scheme segmentation fault: 11".
          (if (eof-object? n)
              (write-response so-far)
              (let ([request (bytes-append so-far (subbytes buf 0 n))])
                (if (regexp-match? #rx"(?:\r\n\r\n|\n\n|\r\r)$" request)
                    (write-response request)
                    (read-more! request)))))))
     (read-more! #""))
   (read-request)))

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.