-
-
Save Bogdanp/b1edba407b5f7ab8794e0cb5ac197d34 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#lang racket/base | |
(#%declare #:unsafe) | |
(require (for-syntax racket/base) | |
(only-in racket/flonum ->fl) | |
racket/format | |
racket/match | |
racket/place | |
racket/unsafe/ops | |
racket/require | |
(filtered-in | |
(lambda (name) | |
(regexp-replace #rx"^unsafe-" name "")) | |
racket/unsafe/ops)) | |
(provide | |
handle) | |
(define (fnv32a bs start end) | |
(for/fold ([h 2166136261]) | |
([b (in-bytes bs start end)]) | |
(fx*/wraparound (fxxor h b) 16777619))) | |
(struct state (n lo hi sum) | |
#:mutable) | |
(define (empty-state) | |
(state 0 +inf.0 -inf.0 0.0)) | |
(define (update-state! s degrees) | |
(match-define (state n lo hi sum) s) | |
(when (fl< degrees lo) (set-state-lo! s degrees)) | |
(when (fl> degrees hi) (set-state-hi! s degrees)) | |
(set-state-n! s (fx+ n 1)) | |
(set-state-sum! s (fl+ sum degrees))) | |
(define (update-state!* s other-n other-lo other-hi other-sum) | |
(match-define (state n lo hi sum) s) | |
(when (fl< other-lo lo) (set-state-lo! s other-lo)) | |
(when (fl> other-hi hi) (set-state-hi! s other-hi)) | |
(set-state-n! s (fx+ n other-n)) | |
(set-state-sum! s (fl+ sum other-sum))) | |
(define (process-shard filename shards shard) | |
(define N 1000) ;; stations | |
(define keys (make-vector N #f)) | |
(define states (make-vector N #f)) | |
(define (get-idx bs start end) | |
(define len (fx- end start)) | |
(let loop ([idx (fxmodulo (fnv32a bs start end) N)]) | |
(define k | |
(vector-ref keys idx)) | |
(cond | |
[(not k) | |
(begin0 idx | |
(vector-set! keys idx (subbytes bs start end)) | |
(vector-set! states idx (empty-state)))] | |
[(and (fx= len (unsafe-bytes-length k)) | |
(bytes-equal? k bs start)) | |
idx] | |
[else | |
(loop (fxmodulo (fx+ idx 1) N))]))) | |
(define bs (make-bytes (fx* 10 1024 1024))) ;; must be large enough to fit the longest line | |
(define in (open-input-file filename)) | |
(let loop ([i 0] | |
[len (read-bytes! bs in)] | |
[start 0]) | |
(define next-pos | |
(bytes-find-pos bs newline-code start len)) | |
(cond | |
[next-pos | |
(when (fx= (fxmodulo i shards) shard) | |
(define semi-pos (bytes-find-pos bs semi-code start next-pos)) | |
(define location-idx (get-idx bs start semi-pos)) | |
(define degrees (bytes->flonum bs (fx+ semi-pos 1) next-pos)) | |
(update-state! (vector-ref states location-idx) degrees)) | |
(loop (fx+ i 1) len (fx+ next-pos 1))] | |
[(< start len) | |
(define rem (fx- len start)) | |
(bytes-copy! bs 0 bs start len) | |
(define n-read | |
(read-bytes! bs in rem)) | |
(loop i (fx+ rem n-read) 0)] | |
[else | |
(define n-read | |
(read-bytes! bs in)) | |
(if (eof-object? n-read) | |
(for/list ([k (in-vector keys)] | |
[s (in-vector states)] | |
#:when k) | |
(match-define (state n lo hi sum) s) | |
`(,k ,n ,lo ,hi ,sum)) | |
(loop i n-read 0))]))) | |
(define (handle pch) | |
(match-define `(,filename ,shards ,shard ,out-ch) | |
(place-channel-get pch)) | |
(place-channel-put out-ch (process-shard filename shards shard))) | |
(define semi-code (char->integer #\;)) | |
(define zero-code (char->integer #\0)) | |
(define minus-code (char->integer #\-)) | |
(define period-code (char->integer #\.)) | |
(define newline-code (char->integer #\newline)) | |
(define (bytes-equal? a b b-start) | |
(for/and ([b0 (in-bytes a)] | |
[b1 (in-bytes b b-start)]) | |
(fx= b0 b1))) | |
(define (bytes-find-pos bs needle start end) | |
(and (fx< start end) | |
(if (fx= (bytes-ref bs start) needle) | |
start | |
(bytes-find-pos bs needle (fx+ start 1) end)))) | |
(define (bytes->flonum bs start end) | |
(if (fx= (bytes-ref bs start) minus-code) | |
(fl- (bytes->flonum* bs (fx+ start 1) end)) | |
(bytes->flonum* bs start end))) | |
(define (bytes->flonum* bs start end) | |
(for/fold ([n (fl+ 0.0)] | |
[d (fl+ 0.0)] | |
#:result (if (fl> d 0.0) | |
(fl/ n (flexpt 10.0 (fl- d 1.0))) | |
(fl+ n))) | |
([b (in-bytes bs start end)]) | |
(cond | |
[(fx= b period-code) | |
(values n (fl+ 1.0))] | |
[(fl> d 0.0) | |
(values | |
(fl+ (fl* n 10.0) (->fl (fx- b zero-code))) | |
(fl+ d 1.0))] | |
[else | |
(values | |
(fl+ (fl* n 10.0) (->fl (fx- b zero-code))) | |
d)]))) | |
(define (~n n) | |
(~r #:precision '(= 1) n)) | |
(define (reduce-states xs) | |
(define states (make-hash)) | |
(for ([x (in-list xs)]) | |
(match-define `(,location ,n ,lo ,hi ,sum) x) | |
(update-state!* (hash-ref! states location empty-state) n lo hi sum)) | |
(display "{") | |
(for ([(k idx) (in-indexed (in-list (sort (hash-keys states) bytes<?)))]) | |
(unless (fx= idx 0) | |
(display ", ")) | |
(match-define (state n lo hi sum) | |
(hash-ref states k)) | |
(printf "~a=~a/~a/~a" k (~n lo) (~n (fl/ sum (->fl n))) (~n hi))) | |
(displayln "}")) | |
(define (main filename) | |
(define N (quotient (processor-count) 2)) | |
(define channels | |
(for/list ([i (in-range N)]) | |
(define pch (dynamic-place (syntax-source #'here) 'handle)) | |
(define-values (res-ch-in res-ch-out) | |
(place-channel)) | |
(begin0 res-ch-in | |
(place-channel-put pch `(,filename ,N ,i ,res-ch-out))))) | |
(time | |
(reduce-states | |
(apply append (map place-channel-get channels))))) | |
(module+ main | |
(require racket/cmdline) | |
(command-line | |
#:args [filename] | |
(main filename))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment