Created
July 25, 2012 23:19
-
-
Save yamasushi/3179295 to your computer and use it in GitHub Desktop.
イテレータの反転をスレッドで実装すると、プロデューサ・コンシューマパターンになる。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
; イテレータの反転をスレッドで実装すると、プロデューサ・コンシューマパターンになる。 | |
(use rfc.http) | |
(use gauche.threads) | |
(use gauche.generator) | |
(use util.queue) | |
(use gauche.vport) | |
(use gauche.uvector) | |
(use sxml.ssax) | |
;(use htmlprag) | |
;(use pretty-print) | |
;(use web-helper) | |
; producer ..... yield手続きを引数にもつ手続き | |
; consumer ..... ジェネレータを引数にもつ手続き | |
(define (producer-consumer produce consume :key (queue-size 100)) | |
(define (make-producer mtq) | |
(^ [] | |
(guard (e [else | |
(print (standard-error-port) (ref e 'message) ) | |
(enqueue/wait! mtq (eof-object)) ] ) | |
(produce (cut enqueue/wait! mtq <>)) | |
(enqueue/wait! mtq (eof-object)) | |
) ) ) | |
(define (make-consumer mtq) | |
(^ [] | |
(guard (e [else (print (standard-error-port) (ref e 'message) ) ]) | |
(consume (generate (^(yield) | |
(let loop [(xc (dequeue/wait! mtq))] | |
(if (eof-object? xc) | |
(yield (eof-object)) | |
(begin | |
(yield xc) | |
(loop (dequeue/wait! mtq) ) ) ) ) ) ) ) | |
) ) ) | |
(and-let* [ ( mtq (make-mtqueue :max-length queue-size)) | |
( c (make-thread (make-consumer mtq ) ) ) | |
( p (make-thread (make-producer mtq )))] | |
(let [(tc (thread-start! c)) | |
(tp (thread-start! p)) ] | |
(thread-join! tp) | |
(thread-join! tc) ; <----consumerの値を返す | |
) ) ) | |
; produce ..... 出力ポートを引数にもつ手続き(データ供給) | |
; consume ..... 入力ポートを引数にもつ手続き(データ消費) | |
(define (producer-consumer-with-port produce consume :key (queue-size 100) (buffer-size 8000) ) | |
(producer-consumer | |
;producer | |
(^(yield) | |
(let1 outp (make <buffered-output-port> | |
:buffer-size buffer-size | |
:flush (^ (u8v flag) | |
(yield (u8vector-copy u8v)) | |
(uvector-length u8v) ) ) | |
(produce outp) | |
(yield (eof-object)) | |
) ) | |
;consumer | |
(^g (let1 inp (make <buffered-input-port> | |
:buffer-size buffer-size | |
:fill (^(u8v-dst) | |
(let1 u8v-src (g) | |
(if (eof-object? u8v-src) | |
0 | |
(let [(ndst (uvector-length u8v-dst)) | |
(nsrc (uvector-length u8v-src ))] | |
(if (>= ndst nsrc ) | |
(begin | |
(u8vector-copy! u8v-dst 0 u8v-src ) | |
;#?= u8v-dst | |
nsrc ) | |
(begin | |
; ここには来ないはずだが念の為 | |
(error "ndst < nsrc") ) ) ) ) ) ) ) | |
(consume inp) | |
) ) | |
:queue-size queue-size | |
) ) | |
(define (call-with-input-http http-param proc :key (queue-size 100) (buffer-size 8000) ) | |
(producer-consumer-with-port | |
;producer | |
(^(o) (apply http-get (append http-param `(:sink ,o :flusher ,(^ _ (flush o) #t) ) ) ) ) | |
;consumer | |
(^(i) (proc i) ) | |
:queue-size queue-size | |
:buffer-size buffer-size | |
) ) | |
;(define (call-with-input-http-uri uri proc :key (queue-size 100)) | |
; (receive (host path) (uri->param uri) | |
; (call-with-input-http | |
; (list host path) proc :queue-size queue-size ))) | |
(define (main args) | |
#?= args | |
(call-with-input-http | |
(cdr args) | |
(^p | |
;(print (port->string p)) | |
(print (ssax:xml->sxml p '() ) ) | |
;(print (html->sxml p)) | |
) | |
:queue-size 100 | |
)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment