Skip to content

Instantly share code, notes, and snippets.

@kristianlm
Last active January 13, 2022 21:08
Show Gist options
  • Save kristianlm/4e65e04dcd5727ff0125a3149b07cf21 to your computer and use it in GitHub Desktop.
Save kristianlm/4e65e04dcd5727ff0125a3149b07cf21 to your computer and use it in GitHub Desktop.

chicken-avro

My latest and greatest personal experiment.

TODO

  • write: binary values
    • primitives
    • complex
  • write: propagate output port to all writers
  • write: buffer output, for each avro block, with item counter
  • write: compression deflate
  • write: compression zstandard
  • schema: sexp-schema -> json-schema
  • schema: json-schema -> sexp-schema
  • read: binary values
  • read: schema projection
  • test: steal tests from fastavro
;;; trying to implement avro binary encoding
;;; as described here:
;;; https://avro.apache.org/docs/current/spec.html#map_encoding
;;;
(import chicken.string chicken.bitwise chicken.io chicken.port
(only chicken.memory.representation number-of-bytes)
(only chicken.random random-bytes)
zstd medea matchable test)
;; Print ARGS with `print`, but onto the current error port rather than the
;; current output port.
(define (debug . args)
  (let ((emit (lambda () (apply print args))))
    (with-output-to-port (current-error-port) emit)))
;; Redefine `test-group` so that any (test-group ...) form expands to #f and
;; its body is dropped.
;; NOTE(review): presumably this is meant to silence the inline tests in the
;; files included below -- confirm.
(define-syntax test-group (syntax-rules () ((_ rest ...) #f)))
(include "schema.scm")
(include "binary.scm")
(include "reader.scm")
;; Fixed 16-character sync marker string.
;; NOTE(review): avro-writer binds its own local sync-marker with the same
;; value, so this top-level binding may be unused -- confirm.
(define sync-marker "1234567890123456")
;; Sample schema in short s-expression form: each field is given as
;; (name type).
(define sschema `(record myrec (fields (name string)
(age int))))
;; TODO:
;; - convert any short-form types into full JSON form (record name ...) etc
;; Sample schema in alist form (objects as alists, arrays as vectors): a
;; record "user" with a string field, a long field and a map-of-string field.
(define schema
;;(string->json schema*)
`((type . "record")
(name . "user")
(fields . #( ((name . "name")
(type . "string"))
((name . "age")
(type . "long"))
((name . "scores")
(type . ((type . "map")
(values . "string"))))))))
;; Create a writer for an Avro object container file on OUTPUT-PORT.
;;
;; The file header (magic, a metadata map holding `avro.codec` and
;; `avro.schema`, then the sync marker) is written immediately.
;; SCHEMA is the schema in alist form; it is serialised with `json->string`
;; for the header and passed to `schema-writer` to build the datum writer.
;; COMPRESSION is 'null (default) or 'zstandard; anything else is an error.
;;
;; Returns a procedure of one argument X:
;;   #:flush -> write the buffered block (item count, byte size, data,
;;              sync marker) and start a new one
;;   #:count -> number of items buffered in the current block
;;   #:block -> number of bytes buffered for the current block so far
;;   #:close -> flush a non-empty block, release the compressor and close
;;              OUTPUT-PORT
;;   other   -> encode X with the schema writer into the current block; the
;;              block is flushed automatically once it holds 10000 items
(define (avro-writer output-port schema #!key (compression 'null))
  (let* ((w (schema-writer schema))
         (sync-marker "1234567890123456")
         (zb (open-output-string)) ;; bytes of the block being built (compressed, if enabled)
         (up #f)                   ;; compressing port feeding zb, or #f when uncompressed
         (block-count 0))
    ;; (Re)create the compressing port on top of the current zb.
    (define (up!)
      (set! up (case compression
                 ((null) #f)
                 ((zstandard) (compressing-output-port zb))
                 ;; the message now names the symbol that is actually accepted
                 (else (error "unknown compression (null zstandard)" compression)))))
    ;; Close the compressing port, if any, so its pending output reaches zb
    ;; and it is not left open.
    (define (finish-compression!)
      (when up
        (close-output-port up)
        (set! up #f)))
    ;; Write the buffered block to output-port and reset the block state.
    ;; With REOPEN? false no new compressing port is created (used on close,
    ;; where a fresh one would never be closed again).
    (define (flush! #!optional (reopen? #t))
      (finish-compression!) ;; flush compressed block, if needed
      (with-output-to-port output-port
        (lambda ()
          (let ((block (get-output-string zb)))
            (write-vint block-count)           ;; number of entries
            (write-vint (string-length block)) ;; bytes
            (display block)
            (display sync-marker)
            (set! zb (open-output-string))
            (when reopen? (up!))
            (set! block-count 0)))))
    (up!)
    ;; file header
    (with-output-to-port output-port
      (lambda ()
        (write-magic)
        (write-map `((avro.codec . ,(symbol->string compression))
                     (avro.schema . ,(json->string schema)))
                   write-bytes)
        (display sync-marker)))
    (lambda (x)
      (cond ((eq? #:flush x) (flush!))
            ((eq? #:count x) block-count)
            ((eq? #:block x)
             (when up (flush-output up))
             (string-length (get-output-string zb)))
            ((eq? #:close x)
             (if (= 0 block-count)
                 (finish-compression!) ;; nothing to write, but do not leave the compressor open
                 (flush! #f))
             (close-output-port output-port))
            (else
             (with-output-to-port (or up zb) (lambda () (w x)))
             (set! block-count (+ 1 block-count))
             (when (>= block-count 10000)
               (flush!)))))))
;; Demo: write the records (klm n "cake" "hemmelig") for n = 0..100 to
;; "chicken.avro" using zstandard compression, then close the writer.
(call-with-output-file "chicken.avro"
  (lambda (of)
    (let* ((klm-schema `((type . "record")
                         (name . "klm")
                         (fields . #(((name . "foo") (type . "int"))
                                     ((name . "bar") (type . "string"))
                                     ((name . "secret") (type . "string"))))))
           (w (avro-writer of klm-schema compression: 'zstandard)))
      (do ((n 0 (+ n 1)))
          ((> n 100))
        (w `(klm ,n "cake" "hemmelig")))
      (w #:close))))
;; proudly stolen from Chust's protobuf egg.
;; TODO: implement read-string! for speed
;; Wrap input port IN in a new input port that hands out at most LIMIT
;; characters and reports end-of-file afterwards. Closing the wrapper closes
;; IN only when CLOSE-ORIG? is true.
(define (make-limited-input-port in limit close-orig?)
  (define (exhausted?) (<= limit 0))
  (make-input-port
   ;; read-char: consume one unit of the budget per character requested
   (lambda ()
     (cond ((exhausted?) #!eof)
           (else
            (set! limit (- limit 1))
            (read-char in))))
   ;; char-ready?
   (lambda ()
     (and (not (exhausted?))
          (char-ready? in)))
   ;; close
   (lambda ()
     (cond (close-orig? (close-input-port in))
           (else (void))))
   ;; peek-char: looking ahead does not touch the budget
   (lambda ()
     (cond ((exhausted?) #!eof)
           (else (peek-char in))))))
;; Open an Avro object container file on input port IP and return a thunk
;; that yields the next record on each call, or #!eof when no block is left.
;;
;; The header is consumed up front: the 4-character magic is checked, the
;; metadata map is read, its `avro.schema` entry is parsed with `read-json`
;; and turned into a datum reader with `schema-reader`, and the 16-byte sync
;; marker is remembered.
;; NOTE(review): the `avro.codec` metadata entry is never consulted here, so
;; block data is handed to the datum reader as-is -- compressed files are
;; presumably not readable yet; confirm.
(define (avro-reader ip)
(let ((magic (read-string 4 ip)))
(unless (equal? "Obj\x01" magic)
(error "invalid magic" magic)))
(define metadata (read-map read-bytes ip))
(define schema
(receive (json rest)
(read-json (alist-ref 'avro.schema metadata))
json))
(define reader (schema-reader schema))
(define sync-marker (assert-size 16 (read-string 16 ip)))
;; state of the block currently being read
(define block-count 0)
(define block-size 0)
(define block-port #f)
;; Read the next block header (item count, byte size) and set up a port
;; limited to block-size characters. Returns #f when the count read is the
;; eof object, #t otherwise.
(define (block-next)
(set! block-count (read-vint ip))
(if (eof-object? block-count)
#f
(begin
(set! block-size (read-vint ip))
(set! block-port (make-limited-input-port ip block-size #f))
#t)))
;; Check that the current block has been consumed completely, then read the
;; 16 characters that follow it and compare them with the header's sync
;; marker; a mismatch only produces a warning.
(define (block-close)
(debug "block-close")
(let ((rest (read-string 64 block-port)))
(unless (eof-object? rest) ;; TODO: don't read, skip for speed & safety
(error "expected end of block, got " rest)))
(let ((sm (read-string 16 ip)))
(if (equal? sync-marker sm)
(debug "block ok")
(warning "expecting sync-marker" sm))))
;; The thunk returned to the caller: decode one record from the current
;; block, moving on to the next block when the current one is used up.
(define (avro-read-record)
(debug "READING record " block-count)
(cond ((eof-object? block-count) #!eof)
((> block-count 0)
(set! block-count (- block-count 1))
(reader block-port))
(else
(when block-port (block-close))
(if (block-next)
(avro-read-record) ;; continue in next block
#!eof))))
avro-read-record)
;; Demo: open "weather.avro" and print the result of nine successive reads
;; to the error port.
(with-input-from-file "weather.avro"
  (lambda ()
    (let ((next-record (avro-reader (current-input-port))))
      (do ((i 0 (+ i 1)))
          ((= i 9))
        (debug (next-record))))))
;;; needs schema.scm
;;;
;;; some inspiration taken from
;;; https://github.com/confluentinc/avro-c-packaging/blob/master/src/encoding_binary.c#L67-L79
(import test)
;; (wots expr ...) evaluates the expressions with the current output port
;; redirected to a string and returns that string.
(define-syntax wots
  (syntax-rules ()
    ((_ expr ...)
     (with-output-to-string
       (lambda () expr ...)))))
;; (wifs input expr ...) evaluates the expressions with the current input
;; port reading from the string INPUT and returns the last value.
(define-syntax wifs
  (syntax-rules ()
    ((_ input expr ...)
     (with-input-from-string input
       (lambda () expr ...)))))
;; Zigzag-encode the integer N: the value shifted left by one bit, xor-ed
;; with N arithmetically shifted right by 63 bits.
(define (zigzag n)
  (let ((doubled (arithmetic-shift n 1))
        (sign-fill (arithmetic-shift n -63)))
    (bitwise-xor doubled sign-fill)))
;; Inverse of zigzag: the lowest bit of N selects the sign, the remaining
;; bits carry the magnitude.
(define (unzigzag n)
  (let ((sign-fill (- (bitwise-and n 1)))
        (halved (arithmetic-shift n -1)))
    (bitwise-xor sign-fill halved)))
;; Round-trip checks: unzigzag must undo zigzag for 0 and for small positive
;; and negative integers.
(test-group
"zigzag"
(test 0 (unzigzag (zigzag 0)))
(test -1 (unzigzag (zigzag -1)))
(test 1 (unzigzag (zigzag 1)))
(test -2 (unzigzag (zigzag -2)))
(test 2 (unzigzag (zigzag 2)))
(test -3 (unzigzag (zigzag -3)))
(test 3 (unzigzag (zigzag 3))))
;; primitive types
;; Write the integer N to the current output port as a zigzag-encoded
;; variable-length integer: seven bits per byte, least significant group
;; first, with the high bit set on every byte except the last.
(define (write-vint n)
  (let emit ((rest (zigzag n)))
    (cond ((< rest #x80)
           (write-byte rest))
          (else
           (write-byte (bitwise-ior #x80 (bitwise-and rest #x7f)))
           (emit (arithmetic-shift rest -7))))))
;; Expected single-byte encodings of 0, -1, 1, -2, 2, -3 and 3.
(test-group
"write-vint"
(test "\x00" (wots (write-vint 0)))
(test "\x01" (wots (write-vint -1)))
(test "\x02" (wots (write-vint 1)))
(test "\x03" (wots (write-vint -2)))
(test "\x04" (wots (write-vint 2)))
(test "\x05" (wots (write-vint -3)))
(test "\x06" (wots (write-vint 3))))
;; Read a zigzag-encoded variable-length integer from PORT (default: the
;; current input port).
;; Returns the eof object when the port is at end-of-file before the first
;; byte; signals an error when end-of-file occurs in the middle of a value.
(define (read-vint #!optional port)
  (let ((in (or port (current-input-port))))
    (let next ((acc 0)
               (shift 0))
      (let ((byte (read-byte in)))
        (cond ((eof-object? byte)
               (if (= shift 0)
                   byte
                   (error "premature eof while reading vint")))
              (else
               ;; low seven bits are payload, the high bit means "more follows"
               (let ((acc (bitwise-ior acc
                                       (arithmetic-shift
                                        (bitwise-and #b01111111 byte) shift))))
                 (if (zero? (bitwise-and #b10000000 byte))
                     (unzigzag acc)
                     (next acc (+ shift 7))))))))))
;; Decoding checks for the single-byte encodings \x00 .. \x06, read from the
;; current input port.
(test-group
"read-vint"
(test 0 (wifs "\x00" (read-vint)))
(test -1 (wifs "\x01" (read-vint)))
(test 1 (wifs "\x02" (read-vint)))
(test -2 (wifs "\x03" (read-vint)))
(test 2 (wifs "\x04" (read-vint)))
(test -3 (wifs "\x05" (read-vint)))
(test 3 (wifs "\x06" (read-vint))))
;; Write V as a single byte to the current output port: 0 for #f, 1 for any
;; other value.
(define (write-boolean v)
  (write-byte (if v 1 0)))
;; Read one byte from PORT (default: the current input port) and return #f
;; when it is zero, #t otherwise.
;; PORT is optional so that this reader can be called like read-vint and
;; read-bytes, i.e. either with no argument or with an explicit port.
;; Signals an error when the port is at end-of-file.
(define (read-boolean #!optional port)
  (let ((byte (read-byte (or port (current-input-port)))))
    (when (eof-object? byte)
      (error "premature eof while reading boolean"))
    (not (zero? byte))))
;; Booleans are one byte: #f <-> \x00, #t -> \x01; when reading, any
;; non-zero byte (here \x04) counts as #t.
(test-group
"boolean"
(test "\x00" (wots (write-boolean #f)))
(test "\x01" (wots (write-boolean #t)))
(test #f (wifs "\x00" (read-boolean)))
(test #t (wifs "\x01" (read-boolean)))
(test #t (wifs "\x04" (read-boolean))))
;; Placeholders: float and double encoding/decoding are not implemented yet.
;; Each of these procedures signals a "TODO" error when called.
(define (write-float x) (error "TODO write float"))
(define (read-float x) (error "TODO read float"))
(define (write-double x) (error "TODO write double"))
(define (read-double x) (error "TODO read double"))
;; Write STR to the current output port as a length-prefixed byte sequence:
;; its string-length as a vint, followed by the string itself.
(define (write-bytes str)
  (let ((len (string-length str)))
    (write-vint len)
    (write-string str)))
;; Return STR unchanged when it occupies exactly N bytes; otherwise signal a
;; "premature end of file" error that reports both sizes.
(define (assert-size n str)
  (let ((actual (number-of-bytes str)))
    (unless (= n actual)
      (error (conc "premature end of file, expecting "
                   n " bytes but got " actual)
             str))
    str))
;; Read a length-prefixed byte sequence from PORT (default: the current
;; input port): a vint length, then that many characters, checked with
;; assert-size.
(define (read-bytes #!optional port)
  (let* ((in (or port (current-input-port)))
         (len (read-vint in)))
    (assert-size len (read-string len in))))
;; complex types
;; Placeholders: enum and array encoding/decoding are not implemented yet.
;; Each of these procedures signals a "TODO" error when called.
(define (write-enum x type) (error "TODO write enum"))
(define (read-enum x type) (error "TODO read enum"))
(define (write-array x type) (error "TODO write array"))
(define (read-array x type) (error "TODO read array"))
;; Write one block of key/value pairs to the current output port: the number
;; of pairs in the alist X as a vint, then for each pair its key (symbols are
;; converted to strings) via write-bytes and its value via WRITE-TYPE.
(define (write-pairs x write-type)
  (define (key->string key)
    (if (symbol? key)
        (symbol->string key)
        key))
  (write-vint (length x))
  (for-each (lambda (entry)
              (write-bytes (key->string (car entry)))
              (write-type (cdr entry)))
            x))
;; Write the alist X as a map to the current output port: one block holding
;; all pairs (values encoded with WRITE-TYPE), followed by a zero count that
;; ends the map.
;; An empty map is written as the terminating zero count only. Emitting an
;; (empty) pair block first would produce two zero counts, and a reader stops
;; at the first one, leaving a stray byte in the stream.
;; TODO: support negative size with bytecount
(define (write-map x write-type)
  (unless (null? x)
    (write-pairs x write-type))
  (write-vint 0))
;; Read COUNT key/value pairs from PORT (default: the current input port).
;; Keys are read with read-bytes and converted to symbols, values are read
;; with READ-TYPE. Returns an alist with the pairs in reverse reading order
;; (the pair read last comes first); a COUNT of zero or less yields '().
(define (read-pairs count read-type #!optional port)
  (let ((in (or port (current-input-port))))
    (let loop ((remaining count)
               (result '()))
      (if (> remaining 0)
          ;; let*, not let: the key must be consumed from the port before the
          ;; value, and let leaves the evaluation order of its bindings
          ;; unspecified
          (let* ((key (read-bytes in))
                 (val (read-type in)))
            (loop (- remaining 1)
                  (cons (cons (string->symbol key) val) result)))
          result))))
;; Read a map from PORT (default: the current input port) and return it as an
;; alist in reading order. Values are decoded with READ-TYPE.
;;
;; A map is a series of blocks, each starting with a count; a count of zero
;; ends the map. A negative count means that the count is followed by the
;; block's size in bytes and that its absolute value is the number of pairs;
;; the size is read and ignored here.
;; Signals an error when end-of-file is hit where a count is expected.
(define (read-map read-type #!optional port)
  (let ((in (or port (current-input-port))))
    (let loop ((result '()))
      (let ((count (read-vint in)))
        (cond ((eof-object? count)
               (error "premature eof while reading map"))
              ((zero? count)
               (reverse result)) ;; todo: do something less memory intensive here?
              ((negative? count)
               (read-vint in) ;; block size in bytes, not needed for sequential reading
               (loop (append (read-pairs (- count) read-type in) result)))
              (else
               (loop (append (read-pairs count read-type in) result))))))))
;; Round trip: a two-entry map written with write-map must be read back
;; unchanged by read-map.
(test-group
"map"
(let ((m `((one . "1") (two . "2"))))
(test
m
(wifs (wots (write-map m write-bytes))
(read-map read-bytes)))))
;; Placeholders: union and fixed encoding are not implemented yet.
;; write-union signals its "TODO" error first, so the write-vint call after
;; it is currently never reached.
(define (write-union unioni x) (error "TODO ")
(write-vint unioni))
(define (write-fixed x) (error "TODO"))
;; container
;; Emit the four-character file magic ("Obj" followed by the byte 1) on the
;; current output port.
(define (write-magic)
  (write-string "Obj\x01"))
(begin
  ;; Build a writer procedure for SCHEMA (alist form, or a type-name string).
  ;; The returned procedure takes one datum and writes its binary encoding to
  ;; the current output port.
  ;;
  ;; Supported: "int", "long", "bytes", "string", "boolean", "float" and
  ;; "double" (the last two are still TODO stubs), records, maps, and alists
  ;; such as ((type . "long")) whose type is resolved recursively.
  ;; A record datum has the form (record-name field-value ...), with the
  ;; values in schema field order.
  ;; NAMES is accepted but not used.
  ;; Signals an error for array types (TODO) and for unknown schema forms.
  (define (schema-writer schema #!optional names)
    (cond ((equal? schema "int") write-vint)
          ((equal? schema "long") write-vint)
          ((equal? schema "bytes") write-bytes)
          ((equal? schema "string") write-bytes)
          ((equal? schema "boolean") write-boolean)
          ((equal? schema "float") write-float)
          ((equal? schema "double") write-double)
          ((pair? schema)
           (let ((type (alist-ref 'type schema)))
             (cond
              ((equal? type "record")
               (let ()
                 (define (field-writer field)
                   (schema-writer (cond ((alist-ref 'type field) => values)
                                        (else (error "field has no type" field)))))
                 (let* ((rname (schema-record-name schema))
                        (fields (cond ((alist-ref 'fields schema) => vector->list)
                                      (else (error "record has no fields" schema))))
                        (writers (map field-writer fields)))
                   (lambda (x)
                     (unless (and (pair? x)
                                  (eq? rname (car x)))
                       (error (conc "expecting record `(" rname " ...)")
                              x schema))
                     (let ((x0 x))
                       ;; walk field writers and field values in lockstep
                       (let loop ((writers writers)
                                  (x (cdr x0)))
                         (if (pair? writers)
                             (if (pair? x)
                                 (let ((w (car writers)))
                                   (w (car x))
                                   (loop (cdr writers) (cdr x)))
                                 ;; the leading record name is not an item,
                                 ;; so it is excluded from the reported count
                                 (error (conc "record '" rname " expecting " (length fields)
                                              " items, got " (- (length x0) 1))
                                        x0)))))))))
              ((equal? type "map")
               (let ((writer (schema-writer
                              (or (alist-ref 'values schema)
                                  (error "map type is missing values" schema)))))
                 (lambda (x)
                   (write-map x writer))))
              ((equal? type "array") (error "TODO: array type"))
              ;; resolve type recursively
              ;; (eg {"type": "long"} even though "long" is sufficient)
              (else (schema-writer type)))))
          (else (error "unknown schema form" schema))))
  ;; Scratch check: encode one sample "user" record into a string.
  (with-output-to-string
    (lambda ()
      ((schema-writer `((type . "record")
                        (name . "user")
                        (fields . #( ((name . "name")
                                      (type . "string"))
                                     ((name . "age")
                                      (type . "long"))
                                     ((name . "scores")
                                      (type . ((type . "map")
                                               (values . "string"))))))))
       `(user "king" 2 ((cake . "blah") (color . "red")))))))
# Helper script: write two map-of-string records to 'weather.avro' with
# fastavro, uncompressed (codec='null').
from fastavro import writer, reader, parse_schema
# Top-level schema is a map whose values are strings.
parsed_schema = parse_schema({
'type' : "map",
'values' : 'string'
})
# Each record is one single-entry map.
records = [
{u'cat': u'AAAA'},
{u'mouse': u'BBBB'}
]
with open('weather.avro', 'wb') as out:
writer(out, parsed_schema, records, codec='null')
;; Build a reader procedure for SCHEMA (alist form, or a type-name string).
;; The returned procedure takes an input port and returns one decoded datum.
;;
;; Supported: "int", "long", "bytes", "string", "boolean", "float" and
;; "double" (the last two are still TODO stubs), records, maps, and alists
;; such as ((type . "long")) whose type is resolved recursively.
;; A record is returned as (record-name field-value ...), with the values in
;; schema field order.
;; NAMES is accepted but not used.
;; Signals an error for array types (TODO) and for unknown schema forms.
(define (schema-reader schema #!optional names)
  (cond ((equal? schema "int") read-vint)
        ((equal? schema "long") read-vint)
        ((equal? schema "bytes") read-bytes)
        ((equal? schema "string") read-bytes)
        ;; read-boolean reads from the current input port, so redirect it to
        ;; the port handed to the reader
        ((equal? schema "boolean")
         (lambda (port)
           (with-input-from-port port read-boolean)))
        ((equal? schema "float") read-float)
        ((equal? schema "double") read-double)
        ((pair? schema)
         (let ((type (alist-ref 'type schema)))
           (cond
            ((equal? type "record")
             (let ()
               (define (field-reader field)
                 (schema-reader (cond ((alist-ref 'type field) => values)
                                      (else (error "field has no type"
                                                   field)))))
               (let* ((rname (schema-record-name schema))
                      (fields (cond ((alist-ref 'fields schema) => vector->list)
                                    (else (error "record has no fields" schema))))
                      (readers (map field-reader fields)))
                 (lambda (port)
                   ;; collect field values in reverse behind the record name,
                   ;; then restore the order at the end
                   (let loop ((readers readers)
                              (result `(,rname)))
                     (if (pair? readers)
                         (let ((r (car readers)))
                           (loop (cdr readers)
                                 (cons (r port) result)))
                         (reverse result)))))))
            ((equal? type "map")
             (let ((reader (schema-reader
                            (or (alist-ref 'values schema)
                                (error "map type is missing values" schema)))))
               (lambda (port)
                 (read-map reader port))))
            ((equal? type "array") (error "TODO: array type"))
            ;; resolve type recursively
            ;; (eg {"type": "long"} even though "long" is sufficient)
            (else (schema-reader type)))))
        (else (error "unknown schema form" schema))))
;; Round-trip checks: a datum encoded by schema-writer must be decoded to the
;; same datum by schema-reader, for a long, a map of longs and a record.
(test-group
 "schema-reader"
 ;; Encode ITEM according to SCHEMA and return the resulting string.
 (define (raw schema item)
   (with-output-to-string
     (lambda ()
       (let ((w (schema-writer schema)))
         (w item)))))
 (let ((s `((type . "long"))))
   (test
    "schema-reader long"
    12345
    (call-with-input-string
     (raw s 12345)
     (schema-reader s))))
 (let ((s `((type . "map") (values . "long"))))
   (test
    ;; was labelled "schema-reader long", which made reports ambiguous
    "schema-reader map"
    `((field1 . 1001)
      (field2 . 1002))
    (call-with-input-string
     (raw s `((field1 . 1001)
              (field2 . 1002)))
     (schema-reader s))))
 (let ((s `((type . "record") (name . "myrec")
            (fields . #(((name . "field1")
                         (type . "string"))
                        ((name . "field2")
                         (type . "long")))))))
   (test
    "schema-reader record"
    `(myrec "hello" 100)
    (call-with-input-string
     (raw s `(myrec "hello" 100))
     (schema-reader s)))))
# Helper script: read 'chicken.avro' with the avro package and print every
# record it contains.
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
reader = DataFileReader(open("chicken.avro", "rb"), DatumReader())
for user in reader:
print(user)
reader.close()
;; Return the `name` entry of the record schema alist SCHEMA as a symbol;
;; signal an error when there is no such entry.
;; TODO: include namespace too
(define (schema-record-name schema)
  (let ((name (alist-ref 'name schema)))
    (if name
        (string->symbol name)
        (error "record has no name" schema))))
;; schema-normalize converts a schema given in short s-expression form into
;; the alist form:
;;   - a bare symbol becomes the type-name string;
;;   - (record NAME . REST) becomes ((type . "record") (name . "NAME") . REST)
;;     and is normalized again;
;;   - an alist whose type is "record" gets its `fields` entry turned into a
;;     vector if it is not one already;
;;   - the other short forms (enum, array, map, union, fixed) are still TODO
;;     and signal an error, as does any other list starting with a symbol;
;;   - everything else is returned unchanged.
;; Clause order matters: the specific ('record ...) etc. patterns must come
;; before the catch-all "list starting with a symbol" pattern.
;; NOTE(review): the individual field descriptions inside `fields` are not
;; converted here -- presumably still to be done; confirm.
(begin
(define (schema-normalize schema)
(match schema
;; primitive types
((? symbol? type) (symbol->string type))
;; complex
(('record name . rest)
(schema-normalize
`((type . "record")
(name . ,(symbol->string name))
,@rest)))
(('enum . rest) (error "TODO"))
(('array name . rest) (error "TODO"))
(('map name . rest) (error "TODO"))
(('union name . rest) (error "TODO"))
(('fixed name . rest) (error "TODO"))
(((? symbol? type) . rest) (error "unknown type" type schema ))
(else
(cond ((and (pair? schema)
(pair? (car schema))
(equal? "record" (alist-ref 'type schema)))
(let ()
(define (->vector x)
(if (vector? x) x (list->vector x)))
(alist-update 'fields (->vector (alist-ref 'fields schema)) schema)))
(else schema)))))
;; Scratch check: normalize an empty record.
(schema-normalize `(record foo (fields))))
;; A short-form record without fields must normalize to the alist form with
;; an empty `fields` vector.
(test-group "schema-normalize"
(test `((type . "record")
(name . "foo")
(fields . #()))
(schema-normalize `(record foo (fields)))))
# Helper script: write one map datum to 'users.avro' with the avro package.
# The active schema is a map whose values are a union of string and float;
# the commented-out parts are earlier experiments with other schemas/data.
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
# schema = avro.schema.parse("""
# {"namespace": "example.avro",
# "type": "record",
# "name": "Record",
# "fields": [
# {"name": "string", "type": "string"},
# {"name": "bool", "type": "bool"},
# {"name": "favorite_color", "type": ["string", "null"]}
# ]
# }""")
schema = avro.schema.parse("""
{
"namespace": "example.avro",
"type": "map",
"values": ["string", "float"]
}
""")
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
# A single datum: a map mixing float and string values.
writer.append({"key1": 3.14,
"key2": "3.14",
"key3": 3.14,
"key4": 3.14})
# writer.append({"name": "Alyssa", "favorite_number": 256})
# writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
#writer.append(["heisann hoppsann"]);
# writer.append(True);
# writer.append(False);
# writer.append(True);
# writer.append(False);
# writer.append(1.0);
# writer.append(2.0);
# writer.append(3.0);
# writer.append(4.0);
writer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment