Created
August 3, 2012 22:52
-
-
Save greghendershott/3252353 to your computer and use it in GitHub Desktop.
Problem trying to do non-blocking I/O and reactor loop
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#lang racket | |
;; 1. Run this: | |
;; | |
;; racket bug-example.rkt | |
;; | |
;; 2. Run ApacheBench: | |
;; | |
;; $ ab -n 1000 -c 20 http://localhost:8081/ | |
;; | |
;; See the comments near the end of this file for a description of 3 | |
;; problems. | |
;; | |
;; Environment: | |
;; | |
;; 1. Mac OS X Lion 10.7.4 | |
;; | |
;; 2. ApacheBench Version 2.3 <$Revision: 117807 (patched to work on | |
;; OS X Lion using Homebrew formula as described here: | |
;; <http://simon.heimlicher.com/articles/2012/07/08/fix-apache-bench-ab-on-os-x-lion> | |
;; | |
;; 3. Racket 5.2.1. (Same result with 5.2, which I tried on the off | |
;; chance that the select => epoll/kqueue change in 5.2.1 was relevant | |
;; -- but seems not.) | |
(define main-thread (current-thread)) | |
(define h (make-hasheq)) ;exact-integer? -> evt? | |
(define next-id 0) | |
(define (events-to-sync) | |
;; (-> (listof evt?)) | |
(hash-values h)) | |
(struct sync-result | |
(id ;hash key in `h' | |
v ;`sync' result value; varies by event type | |
k)) ;the handler procedure, (any/c -> any), called with `v' | |
;; Add an event and a handler (a function to be called when the event | |
;; is ready). Return an id, which is an opaque value that can be used | |
;; with `remove-handler!'. THIS IS NOT THREAD-SAFE: See | |
;; call-from-main-thread instead. | |
(define (add-handler! e k) | |
;; (evt? (any/c . -> . any) . -> . any/c) | |
(unless (equal? (current-thread) main-thread) | |
(log-error "add-handler! NOT called from main-thread")) | |
(define id next-id) | |
;;(log-debug (format "add-handler! id=~a" id)) | |
(set! next-id (add1 next-id)) | |
(hash-set! h | |
id | |
(wrap-evt e | |
(lambda (v) (sync-result id v k)))) | |
id) | |
(define (remove-handler! id) | |
;; (any/c . -> . void) | |
(void (unless (equal? id thread-mailbox-handler-id) | |
;;(log-debug (format "remove-handler! id=~a" id)) | |
(hash-remove! h id)))) | |
;; Functions that don't offer non-blocking (for example `tcp-connect') | |
;; must be run using a worker thread. When the worker thread has a | |
;; result, it could cause the handler procedure to be called soon by | |
;; doing (add-handler! always-evt f). However, add-handler! is NOT | |
;; thread-safe. Instead, the worker thread uses | |
;; `call-from-main-thread' to ask the main thread to call | |
;; `add-handler!'. How `call-main-thread' does this is using | |
;; `thread-send'. | |
(define thread-mailbox-handler-id #f) | |
(define (maybe-add-thread-mailbox-evt!) | |
(unless thread-mailbox-handler-id ;need to init? | |
(define (thread-mailbox-event-handler _) | |
(displayln "thread mailbox handler called") | |
(add-handler! always-evt (thread-receive))) | |
(set! thread-mailbox-handler-id | |
(add-handler! (thread-receive-evt) thread-mailbox-event-handler)))) | |
;; Although provided for use in a worker thread, also safe to call | |
;; from the main thread. | |
(define/contract (call-from-main-thread f) | |
((any/c . -> . any) . -> . void) | |
(thread-send main-thread f) | |
(void)) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; the reactor loop | |
;; Note: This will run forever now that we have a thread mailbox | |
;; handler that we never remove, so events-to-sync will never | |
;; be the empty list. | |
(define (reactor-loop) | |
(maybe-add-thread-mailbox-evt!) ;initialize | |
(define es (events-to-sync)) | |
(unless (empty? es) | |
;; (match-define (sync-result id v k) (apply sync es)) | |
;; (k v) | |
;; (remove-handler! id) | |
(define e (apply sync es)) | |
(cond | |
[(sync-result? e) | |
((sync-result-k e) (sync-result-v e)) | |
(remove-handler! (sync-result-id e))] | |
[else | |
(log-warning "reactor-loop: unknown event type from apply")]) | |
(reactor-loop))) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(define/contract (read-bytes-avail!/non-block b in k) | |
((and/c bytes? (not/c immutable?)) | |
input-port? | |
((or/c eof-object? exact-nonnegative-integer?) . -> . any) | |
. -> . void) | |
(add-handler! (read-bytes-avail!-evt b in) k) | |
(void)) | |
(define/contract (write-string/non-block s out k) | |
(string? output-port? (exact-nonnegative-integer? . -> . any) . -> . void) | |
(void (add-handler! (write-bytes-avail-evt (string->bytes/utf-8 s) out) k))) | |
(define/contract (tcp-accept/non-block listener k) | |
(tcp-listener? (input-port? output-port? . -> . any) . -> . void) | |
(void (add-handler! listener (lambda (listener) | |
(define-values (i o) (tcp-accept listener)) | |
(k i o))))) | |
(define/contract (tcp-serve/non-block k | |
#:port [port 80] | |
#:hostname [hostname #f] | |
#:reuse-port? [reuse-port? #f] | |
#:limit-memory/MB [limit-memory/MB 50] | |
#:max-wait [max-wait 40] | |
#:connection-timeout-secs [timeout 60]) | |
(((input-port? output-port? (-> any) . -> . any)) | |
(#:port exact-nonnegative-integer? | |
#:hostname (or/c #f string?) | |
#:reuse-port? boolean? | |
#:limit-memory/MB exact-positive-integer? | |
#:max-wait exact-positive-integer? | |
#:connection-timeout-secs exact-positive-integer?) | |
. ->* . | |
void) | |
(let ([listener (tcp-listen port max-wait reuse-port? hostname)]) | |
(with-handlers ([exn? fail-handler]) | |
(displayln (format "serve: port=~a hostname=~a timeout=~a" | |
port hostname timeout)) | |
(define (accept) | |
(tcp-accept/non-block listener | |
(lambda (in out) | |
(add-handler! | |
always-evt | |
(lambda (_) | |
;; (file-stream-buffer-mode in 'none) | |
;; (file-stream-buffer-mode out 'none) | |
(with-handlers ([exn? fail-handler]) | |
(k in | |
out | |
(lambda () | |
(close-input-port in) | |
(close-output-port out) | |
))))) | |
(accept) | |
))) | |
(accept) | |
(reactor-loop)) | |
(log-info "closing tcp listener") | |
(tcp-close listener)) | |
(void)) | |
(define (fail-handler exn) | |
(log-error (exn->string exn #t))) | |
(define (exn->string exn stack-trace?) | |
;; (exn? boolean? . -> . string?) | |
(define (stack-trace-string exn) | |
(format "~a" | |
(map (lambda (x) | |
(format "'~a' ~a ~a\n" | |
(if (car x) (car x) "") | |
(if (cdr x) (srcloc-source (cdr x)) "") | |
(if (cdr x) (srcloc-line (cdr x)) ""))) | |
(continuation-mark-set->context | |
(exn-continuation-marks exn))))) | |
(format "Thread will exit due to exception: \"~a\"~a" | |
(exn-message exn) | |
(if stack-trace? | |
(string-append "\nStack:\n" (stack-trace-string exn)) | |
""))) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(tcp-serve/non-block | |
#:port 8081 | |
#:reuse-port? #f | |
#:max-wait 20 | |
(lambda (in out k) | |
(define (read-request) ;read until first double EOL | |
(define cum #"") | |
(define b (make-bytes 1024 0)) | |
(define (read-more!) | |
(read-bytes-avail!/non-block | |
b in (lambda (e) | |
;; PROBLEM 1: Very often (many, many time) there will | |
;; be an error, "peek-bytes-avail!: input port is | |
;; closed". | |
;; | |
;; I also tried using regexp-match-evt but got a simlar | |
;; error re the input port being closed. | |
;; | |
;; PROBLEM 2: Sometimes (very rarely, with `ab -c 20' this | |
;; will be a channel? instead of the documented | |
;; eof-object? or integer? | |
;; | |
;; PROBLEM 3: Even more rarely, racket will abend with | |
;; "Process scheme segmentation fault: 11". | |
(cond | |
[(eof-object? e) | |
(write-response cum)] | |
[else | |
(set! cum (bytes-append cum (subbytes b 0 e))) | |
(cond | |
[(regexp-match? #rx"(?:\r\n\r\n|\n\n|\r\r)$" cum) | |
(write-response cum)] | |
[else (read-more!)])])))) | |
(read-more!)) | |
(define (write-response req) | |
;;(log-debug (format "write-response. req=~a" req)) | |
(write-string/non-block (string-append "HTTP/1.0 200 OK\r\n" | |
"Connection: close\r\n" | |
"\r\n" | |
"<html></html>") | |
out | |
(lambda (_) (k)))) | |
(read-request))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment