Skip to content

Instantly share code, notes, and snippets.

@NalaGinrut
Created June 30, 2017 08:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NalaGinrut/5a5d08ce7a9b6d5610f73dd269a6f242 to your computer and use it in GitHub Desktop.
Save NalaGinrut/5a5d08ce7a9b6d5610f73dd269a6f242 to your computer and use it in GitHub Desktop.
(use-modules (artanis server epoll)
(artanis utils)
(ice-9 rdelim)
(ice-9 match)
(ice-9 suspendable-ports)
(ice-9 control))
(install-suspendable-ports!)
(define (call-with-sigint x y) (%call-with-sigint x y))
(define %call-with-sigint
(if (not (provided? 'posix))
(lambda (thunk handler-thunk) (thunk))
(lambda (thunk handler-thunk)
(let ((handler #f))
(catch 'interrupt
(lambda ()
(dynamic-wind
(lambda ()
(set! handler
(sigaction SIGINT (lambda (sig) (throw 'interrupt)))))
thunk
(lambda ()
(if handler
;; restore Scheme handler, SIG_IGN or SIG_DFL.
(sigaction SIGINT (car handler) (cdr handler))
;; restore original C handler.
(sigaction SIGINT #f)))))
(lambda (k . _) (handler-thunk)))))))
(define *error-event* (logior EPOLLRDHUP EPOLLHUP))
(define *read-event* EPOLLIN)
(define (gen-read-event) (logior *read-event* EPOLLET))
(define *rw-event* (logior EPOLLIN EPOLLOUT))
(define (gen-rw-event) (logior *error-event* EPOLLET *rw-event*))
(define *write-event* EPOLLOUT)
(define (gen-write-event) (logior *error-event* EPOLLET *write-event*))
(define *work-table* (make-hash-table))
(define *event-set* (make-epoll-event-set))
(define epfd (epoll-create1 0))
(define listen-fd)
(define listen-socket)
(define (async-read-waiter port)
(display "Async read!\n")
;;(format #t "would break ~a~%" port)
(abort-to-prompt 'serve-one-request (port->fdes port)))
(define (async-write-waiter port)
(display "Async write!\n")
;;(format #t "would break ~a~%" port)
(abort-to-prompt 'serve-one-request (port->fdes port)))
(define (make-listen-port family addr port)
(let ((sock (socket family SOCK_STREAM 0)))
;; Causes the port to be released immediately after the socket is closed.
(setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
(fcntl sock F_SETFL (logior O_NONBLOCK (fcntl sock F_GETFL 0)))
(bind sock family addr port)
(listen sock 5)
(sigaction SIGPIPE SIG_IGN)
sock))
(define (multicast-msg msg)
(hash-for-each
(lambda (fd _)
(let ((port (car (fdes->ports fd))))
(display msg port)
(force-output port)))
*work-table*))
(define (register-connecting-socket conn-port)
(format #t "register ~a as RW event~%" conn-port)
(epoll-ctl epfd EPOLL_CTL_ADD (port->fdes conn-port)
(make-epoll-event (port->fdes conn-port) (gen-rw-event))))
(define (socket-is-connected? sock)
(let ((e (getsockopt sock SOL_SOCKET SO_ERROR)))
(format #t "SO_ERROR is ~a~%" e)
(zero? e)))
(define (release-connecting-socket reason username conn-port)
(format #t "releasing ~a because of ~a~%" conn-port reason)
(when (= (port->fdes listen-socket) (port->fdes conn-port))
(error "BUG here!"))
(epoll-ctl epfd EPOLL_CTL_DEL (port->fdes conn-port) #f)
(hash-remove! *work-table* (port->fdes conn-port))
(when (eq? reason 'quit)
(format conn-port "Goodbye ~a~%~!" (or username "noname")))
#;
(when (not (port-closed? conn-port))
(cond
((socket-is-connected? conn-port)
(format #t "~a is still connected, shutdown it!~%" conn-port)
(shutdown conn-port 2))
(else
(format #t "~a has been disconnected!~%" conn-port)))
)
(format #t "Now closing ~a~%" conn-port)
;;(when (not (port-closed? conn-port)) (close-port conn-port))
(shutdown conn-port 2)
)
;; client-connection is conn-socket
(define (read-from-1234 client-connection)
(define username #f)
(catch
#t
(lambda ()
(display "start now!\n")
(let ((client-details (cdr client-connection))
(conn-port (car client-connection)))
(register-connecting-socket conn-port)
(format #t "client-connection: ~a~%" client-connection)
(setsockopt conn-port SOL_SOCKET SO_KEEPALIVE 1)
;;(setsockopt conn-port SOL_SOCKET SO_REUSEADDR 1)
(fcntl conn-port F_SETFL (logior O_NONBLOCK (fcntl conn-port F_GETFL 0)))
(setvbuf conn-port 'block)
(setsockopt conn-port SOL_SOCKET SO_SNDBUF (* 12 1024))
(format #t "Got new client connection: ~a~%" client-details)
(format conn-port "Hello guest, please type your name~%~!")
(let lp ()
(cond
((or (port-closed? conn-port)
(not (socket-is-connected? conn-port))
(eof-object? (peek-char conn-port)))
(release-connecting-socket 'quit username conn-port))
((not username)
(let ((name (string-trim-both (read-line conn-port)
(lambda (c) (member c '(#\sp #\return))))))
(cond
((string-null? name)
(format conn-port "On come on! Give me your name!~%~!")
(lp))
(else
(set! username name)
(format conn-port "Hello ~a, there're ~a people here~%~!" username (hash-count (const #t) *work-table*))
(lp)))))
(else
(let* ((str (read-line conn-port))
(msg (format #f "~a: ~a~%" username str)))
(display msg)
(multicast-msg msg)
(lp)))))))
(lambda e
(format #t "Exception: ~a~%" e)
(release-connecting-socket 'exception username (car client-connection)))))
(define (go)
(let* ((s (make-listen-port AF_INET 0 #;INADDR_LOOPBACK 1234
))
(listen-event (make-epoll-event (port->fdes s) (gen-read-event))))
(sigaction SIGPIPE SIG_IGN) ; FIXME: should we remove the related threads?
(epoll-ctl epfd EPOLL_CTL_ADD (port->fdes s) listen-event)
(set! listen-socket s)
(catch
#t
(lambda ()
(try-it))
(lambda e
(format #t "General error happened!~%~!")
(format #t "~a~%" e)))))
(define (run-task task)
(call-with-prompt
'serve-one-request
(lambda ()
(format #t "now we run this task: ~a~%" task)
(task))
(lambda (k fd)
(format #t "task ~a said: I need schedule!~%" fd)
(hash-set! *work-table* fd k))))
(define (serve-one-request client)
(define (continuatble-work?)
(cond
((and (pair? client) (number? (car client))) ; this type is generated from epoll
(hash-ref *work-table* (car client)))
(else (error continuatble-work? "Wrong client" client))))
(format #t "wt: ~a~%" (hash-map->list cons *work-table*))
(format #t "Now get client ~a~%" client)
(cond
((continuatble-work?)
;;=> run-task
=> (lambda (task)
(format #t "let's continue ~a~%" client)
(run-task task)))
(else
(format #t "new task ~a~%" client)
(let* ((conn-port (accept (car (fdes->ports (car client)))))
(new-task (lambda ()
(read-from-1234 conn-port))))
(setvbuf (car conn-port) 'block)
(setsockopt (car conn-port) SOL_SOCKET SO_SNDBUF (* 12 1024))
(run-task new-task)))))
(define-public (is-peer-shutdown? e)
(not (zero? (logand (cdr e) EPOLLRDHUP))))
(define (try-it)
(display "enter!\n")
(format #t "listen socket is ~a~%" listen-socket)
(format #t "is-closed? (~a)~%" (port-closed? listen-socket))
(display "success!\n")
(parameterize ((current-read-waiter async-read-waiter)
(current-write-waiter async-write-waiter))
(let ((el (epoll-wait epfd *event-set* 3000)))
(format #t "Waiting for request...now there's ~a requests~%" (length el))
(format #t "wt: ~a~%" (hash-map->list cons *work-table*))
(for-each
(lambda (e)
(call-with-sigint
(lambda ()
(cond
((is-peer-shutdown? e)
(release-connecting-socket 'peer-shutdown "unknown" (car (fdes->ports (car e)))))
(else (serve-one-request e))))
(lambda ()
(format #t "I'm interrupted ~a!~%" e)
(release-connecting-socket 'interrupt "unknown" (car (fdes->ports (car e)))))))
el)
(try-it))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment