Skip to content

Instantly share code, notes, and snippets.

@greghendershott
Created August 3, 2012 22:52
Show Gist options
  • Save greghendershott/3252353 to your computer and use it in GitHub Desktop.
Save greghendershott/3252353 to your computer and use it in GitHub Desktop.
Problem trying to do non-blocking I/O and reactor loop
#lang racket
;; 1. Run this:
;;
;; racket bug-example.rkt
;;
;; 2. Run ApacheBench:
;;
;; $ ab -n 1000 -c 20 http://localhost:8081/
;;
;; See the comments near the end of this file for a description of 3
;; problems.
;;
;; Environment:
;;
;; 1. Mac OS X Lion 10.7.4
;;
;; 2. ApacheBench Version 2.3 <$Revision: 117807 $> (patched to work on
;; OS X Lion using the Homebrew formula described here:
;; <http://simon.heimlicher.com/articles/2012/07/08/fix-apache-bench-ab-on-os-x-lion>)
;;
;; 3. Racket 5.2.1. (Same result with 5.2, which I tried on the off
;; chance that the select => epoll/kqueue change in 5.2.1 was relevant
;; -- but seems not.)
;; The thread that was current when this definition was evaluated.
;; `add-handler!' compares (current-thread) against it and logs an error
;; on a mismatch; `call-from-main-thread' sends its messages to it.
(define main-thread (current-thread))
;; Registry of pending events, keyed by the id handed out by
;; `add-handler!'. Each value is the `wrap-evt' built there.
(define h (make-hasheq)) ;exact-integer? -> evt?
;; The id the next call to `add-handler!' will use; it is incremented on
;; every call.
(define next-id 0)
;; events-to-sync : (-> (listof evt?))
;; Returns a list of every event currently stored in the registry `h'
;; (the keys are dropped; only the wrapped events are returned).
(define (events-to-sync)
  (hash-map h (lambda (_id evt) evt)))
;; The value produced by an event registered through `add-handler!': the
;; `wrap-evt' there packages the raw `sync' result together with the id
;; and handler so that `reactor-loop' can call the handler and then
;; remove the registration.
(struct sync-result
(id ;hash key in `h'
v ;`sync' result value; varies by event type
k)) ;the handler procedure, (any/c -> any), called with `v'
;; add-handler! : (evt? (any/c . -> . any) . -> . any/c)
;;
;; Registers event `e' together with handler `k' in the registry `h' and
;; returns the freshly allocated id, which can later be passed to
;; `remove-handler!'. The stored event is `e' wrapped so that its
;; synchronization result becomes a `sync-result' carrying the id, the
;; raw result and `k'.
;;
;; NOT thread-safe: if called from a thread other than `main-thread' an
;; error is logged (the registration still happens). Worker threads
;; should use `call-from-main-thread' instead.
(define (add-handler! e k)
  (unless (equal? (current-thread) main-thread)
    (log-error "add-handler! NOT called from main-thread"))
  (let ([id next-id])
    (set! next-id (add1 next-id))
    (hash-set! h
               id
               (wrap-evt e (lambda (v) (sync-result id v k))))
    id))
;; remove-handler! : (any/c . -> . void)
;;
;; Drops the registration stored under `id' from `h'. The id of the
;; thread-mailbox handler is deliberately ignored, so that handler stays
;; registered. Always returns void.
(define (remove-handler! id)
  (cond
    [(equal? id thread-mailbox-handler-id) (void)]
    [else
     (hash-remove! h id)
     (void)]))
;; Functions that don't offer a non-blocking variant (for example
;; `tcp-connect') must be run using a worker thread. When the worker
;; thread has a result, it could cause the handler procedure to be
;; called soon by doing (add-handler! always-evt f). However,
;; add-handler! is NOT thread-safe. Instead, the worker thread uses
;; `call-from-main-thread' to ask the main thread to call
;; `add-handler!'. `call-from-main-thread' does this using
;; `thread-send'.
;; Id of the thread-mailbox handler, or #f while it has not been
;; registered yet. `remove-handler!' refuses to remove this id.
(define thread-mailbox-handler-id #f)

;; maybe-add-thread-mailbox-evt! : (-> void)
;;
;; On the first call, registers a handler on (thread-receive-evt) and
;; remembers its id in `thread-mailbox-handler-id'; later calls do
;; nothing. When the handler runs it prints a trace line, takes one
;; message from the current thread's mailbox with `thread-receive' and
;; registers that message as a handler on `always-evt'.
(define (maybe-add-thread-mailbox-evt!)
  (when (not thread-mailbox-handler-id)
    (set! thread-mailbox-handler-id
          (add-handler!
           (thread-receive-evt)
           (lambda (_)
             (displayln "thread mailbox handler called")
             (add-handler! always-evt (thread-receive)))))))
;; call-from-main-thread : ((any/c . -> . any) . -> . void?)
;;
;; Sends the one-argument procedure `f' to `main-thread' with
;; `thread-send'. The thread-mailbox handler installed by
;; `maybe-add-thread-mailbox-evt!' receives it there and registers it on
;; `always-evt'. Intended for worker threads, but also safe to call from
;; the main thread itself. Returns void.
;;
;; The range contract is `void?': the bare procedure `void' used as a
;; predicate returns a true value for every input, so it checked nothing.
(define/contract (call-from-main-thread f)
  ((any/c . -> . any) . -> . void?)
  (thread-send main-thread f)
  (void))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; the reactor loop
;; Note: This will run forever now that we have a thread mailbox
;; handler that we never remove, so events-to-sync will never
;; be the empty list.
;; reactor-loop : (-> void)
;;
;; Repeatedly: make sure the thread-mailbox handler is registered, `sync'
;; on every registered event, call the chosen event's handler with the
;; synchronization result, and then remove that registration. A
;; synchronization result that is not a `sync-result' is logged as a
;; warning. The loop ends only when no events are registered.
(define (reactor-loop)
  (let loop ()
    (maybe-add-thread-mailbox-evt!) ;initialize
    (define pending (events-to-sync))
    (when (pair? pending)
      (match (apply sync pending)
        [(sync-result id v k)
         ;; Handler first, removal second -- same order as always.
         (k v)
         (remove-handler! id)]
        [_
         (log-warning "reactor-loop: unknown event type from apply")])
      (loop))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; read-bytes-avail!/non-block
;;
;; Registers a `read-bytes-avail!-evt' for the mutable byte string `b'
;; and input port `in'. When the reactor loop selects that event, `k' is
;; called with the event's result (contracted here to be eof or a byte
;; count). Returns void immediately.
;;
;; The range contract is `void?': the bare procedure `void' used as a
;; predicate returns a true value for every input, so it checked nothing.
(define/contract (read-bytes-avail!/non-block b in k)
  ((and/c bytes? (not/c immutable?))
   input-port?
   ((or/c eof-object? exact-nonnegative-integer?) . -> . any)
   . -> . void?)
  (add-handler! (read-bytes-avail!-evt b in) k)
  (void))
;; write-string/non-block
;;
;; Writes the UTF-8 encoding of `s' to `out' without blocking the
;; reactor loop, then calls `k' with the total number of bytes written.
;; Returns void immediately.
;;
;; A single `write-bytes-avail-evt' may write only a prefix of the
;; bytes. Previously `k' was called after that first partial write, so
;; any remaining bytes were silently dropped. Now a new event is
;; registered for the unwritten tail until everything has been written.
;;
;; The range contract is `void?': the bare procedure `void' used as a
;; predicate returns a true value for every input, so it checked nothing.
(define/contract (write-string/non-block s out k)
  (string? output-port? (exact-nonnegative-integer? . -> . any) . -> . void?)
  (define bs (string->bytes/utf-8 s))
  (define total (bytes-length bs))
  ;; Register a write for bs[start, total); continue from wherever the
  ;; port stopped accepting bytes.
  (define (write-from! start)
    (add-handler! (write-bytes-avail-evt bs out start)
                  (lambda (n)
                    (define next (+ start n))
                    (if (< next total)
                        (write-from! next)
                        (k total)))))
  (write-from! 0)
  (void))
;; tcp-accept/non-block
;;
;; Registers `listener' itself as the event. When the reactor loop
;; selects it, the connection is accepted with `tcp-accept' and `k' is
;; called with the resulting input and output ports. Returns void
;; immediately.
;;
;; The range contract is `void?': the bare procedure `void' used as a
;; predicate returns a true value for every input, so it checked nothing.
(define/contract (tcp-accept/non-block listener k)
  (tcp-listener? (input-port? output-port? . -> . any) . -> . void?)
  (add-handler! listener
                (lambda (ready-listener)
                  (define-values (i o) (tcp-accept ready-listener))
                  (k i o)))
  (void))
;; tcp-serve/non-block
;;
;; Opens a TCP listener and runs the reactor loop, calling `k' once per
;; accepted connection with the connection's input port, its output
;; port, and a thunk that closes both ports.
;;
;; Keyword arguments:
;;   #:port, #:hostname, #:reuse-port?, #:max-wait
;;       passed to `tcp-listen'.
;;   #:connection-timeout-secs
;;       currently only printed in the startup line.
;;   #:limit-memory/MB
;;       accepted but not used by this body.
;;
;; Any exception escaping the accept/reactor section is logged through
;; `fail-handler'; after that the listener is closed and void is
;; returned.
;;
;; The range contract is `void?': the bare procedure `void' used as a
;; predicate returns a true value for every input, so it checked nothing.
(define/contract (tcp-serve/non-block k
                                      #:port [port 80]
                                      #:hostname [hostname #f]
                                      #:reuse-port? [reuse-port? #f]
                                      #:limit-memory/MB [limit-memory/MB 50]
                                      #:max-wait [max-wait 40]
                                      #:connection-timeout-secs [timeout 60])
  (((input-port? output-port? (-> any) . -> . any))
   (#:port exact-nonnegative-integer?
    #:hostname (or/c #f string?)
    #:reuse-port? boolean?
    #:limit-memory/MB exact-positive-integer?
    #:max-wait exact-positive-integer?
    #:connection-timeout-secs exact-positive-integer?)
   . ->* .
   void?)
  (let ([listener (tcp-listen port max-wait reuse-port? hostname)])
    (with-handlers ([exn? fail-handler])
      (displayln (format "serve: port=~a hostname=~a timeout=~a"
                         port hostname timeout))
      ;; Register interest in the next incoming connection. Each accepted
      ;; connection schedules the user's handler via `always-evt' and
      ;; then re-arms the listener.
      (define (accept)
        (tcp-accept/non-block
         listener
         (lambda (in out)
           (add-handler!
            always-evt
            (lambda (_)
              ;; Exceptions raised while `k' itself runs are logged here
              ;; and do not reach the outer handler.
              (with-handlers ([exn? fail-handler])
                (k in
                   out
                   (lambda ()
                     (close-input-port in)
                     (close-output-port out))))))
           (accept))))
      (accept)
      (reactor-loop))
    (log-info "closing tcp listener")
    (tcp-close listener))
  (void))
;; fail-handler : (exn? . -> . any)
;; Exception handler used with `with-handlers': logs the exception,
;; including its stack trace, at error level via `exn->string'.
(define fail-handler
  (lambda (exn)
    (log-error (exn->string exn #t))))
;; exn->string : (exn? boolean? . -> . string?)
;;
;; Formats `exn' as a log message containing its `exn-message'. When
;; `stack-trace?' is true, a "Stack:" section follows with one line per
;; context frame: the quoted procedure name, the source, and the line
;; number (each left empty when unavailable).
;;
;; Frames are concatenated directly. Previously the list of frame
;; strings was rendered with (format "~a" list), which printed it as a
;; list: wrapped in parentheses and with a stray space before every
;; frame after the first.
(define (exn->string exn stack-trace?)
  ;; One context frame is a pair: (name-or-#f . srcloc-or-#f).
  (define (frame->string frame)
    (define name (car frame))
    (define loc (cdr frame))
    (format "'~a' ~a ~a\n"
            (or name "")
            (if loc (srcloc-source loc) "")
            (if loc (srcloc-line loc) "")))
  (define (stack-trace-string exn)
    (apply string-append
           (map frame->string
                (continuation-mark-set->context
                 (exn-continuation-marks exn)))))
  (format "Thread will exit due to exception: \"~a\"~a"
          (exn-message exn)
          (if stack-trace?
              (string-append "\nStack:\n" (stack-trace-string exn))
              "")))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Start the example server on port 8081. For every connection: read
;; until the first blank line (or EOF), answer with a fixed, minimal
;; HTTP response, then close the connection.
(tcp-serve/non-block
 #:port 8081
 #:reuse-port? #f
 #:max-wait 20
 (lambda (in out close-connection)
   ;; Everything received so far on this connection.
   (define received #"")
   ;; Scratch buffer filled by each non-blocking read.
   (define chunk (make-bytes 1024 0))

   ;; Send the canned response; close the connection once it is written.
   ;; The request bytes are accepted but not inspected.
   (define (write-response req)
     (write-string/non-block (string-append "HTTP/1.0 200 OK\r\n"
                                            "Connection: close\r\n"
                                            "\r\n"
                                            "<html></html>")
                             out
                             (lambda (_) (close-connection))))

   ;; Called with the result of one read.
   ;;
   ;; PROBLEM 1: Very often (many, many times) there will be an error,
   ;; "peek-bytes-avail!: input port is closed".
   ;;
   ;; I also tried using regexp-match-evt but got a similar error about
   ;; the input port being closed.
   ;;
   ;; PROBLEM 2: Sometimes (very rarely, with `ab -c 20') the argument
   ;; will be a channel? instead of the documented eof-object? or
   ;; integer?.
   ;;
   ;; PROBLEM 3: Even more rarely, racket will abend with
   ;; "Process scheme segmentation fault: 11".
   (define (on-read result)
     (cond
       [(eof-object? result)
        (write-response received)]
       [else
        (set! received (bytes-append received (subbytes chunk 0 result)))
        ;; Stop at the first double EOL; otherwise keep reading.
        (if (regexp-match? #rx"(?:\r\n\r\n|\n\n|\r\r)$" received)
            (write-response received)
            (read-more!))]))

   ;; Ask the reactor for the next batch of input.
   (define (read-more!)
     (read-bytes-avail!/non-block chunk in on-read))

   (read-more!)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment