Skip to content

Instantly share code, notes, and snippets.

@jarppe
Last active May 22, 2024 12:04
Show Gist options
  • Save jarppe/f235fde2172e6e3ff091b63c7beada1a to your computer and use it in GitHub Desktop.
Save jarppe/f235fde2172e6e3ff091b63c7beada1a to your computer and use it in GitHub Desktop.
Streaming json
;; deps.edn for the streaming JSON example.
;; :paths lists the source directory; :deps pins the version of each library.
{:paths ["src/main"]
:deps {org.clojure/clojure {:mvn/version "1.12.0-alpha11"}
;; Provides ring.middleware.params, used to parse the query string.
ring/ring-core {:mvn/version "1.12.1"}
;; Provides ring.adapter.jetty9, the HTTP server adapter.
info.sunng/ring-jetty9-adapter {:mvn/version "0.33.1"}
;; Provides jsonista.core, used to encode each row as JSON bytes.
metosin/jsonista {:mvn/version "0.3.8"}}}
(ns streaming-json.main
(:gen-class)
(:require [jsonista.core :as json]
[ring.adapter.jetty9 :as jetty]
[ring.middleware.params :as params])
(:import (java.io PipedOutputStream
PipedInputStream)
(java.util.zip GZIPOutputStream)))
(set! *warn-on-reflection* true)
;;
;; ------------------------------------------------------------------------------
;; DB simulator:
;; ------------------------------------------------------------------------------
;;
;; Total number of rows available in the simulated DB.
(def max-data 10000)

;; Simulated DB query: produces up to `length` rows starting at `offset`,
;; never going past `max-data`. Each row is a map with the row `:id` and two
;; random integers in [0, 100) under `:foo` and `:bar`. Returns `nil` when
;; there are no rows left in the requested range.
(defn get-data-from-db [offset length]
  (let [end (min (+ offset length) max-data)]
    (seq (for [id (range offset end)]
           {:id  id
            :foo (rand-int 100)
            :bar (rand-int 100)}))))
;;
;; ------------------------------------------------------------------------------
;; Streaming JSON support:
;; ------------------------------------------------------------------------------
;;
;; Encodes a string as UTF-8 bytes.
(defn- utf8-bytes ^bytes [^String s]
  (.getBytes s java.nio.charset.StandardCharsets/UTF_8))

;; Pre-encoded structural JSON tokens written around and between the rows.
(def ^"[B" json-array-begin (utf8-bytes "["))
(def ^"[B" json-value-separator (utf8-bytes ","))
(def ^"[B" json-array-end (utf8-bytes "]"))
;; Builds a sequence of all rows by calling `(handler offset batch-size)`
;; repeatedly. The first batch is fetched immediately; every following batch is
;; fetched lazily, starting at the offset advanced by the size of the previous
;; batch. Returns `nil` as soon as `handler` yields no rows.
(defn row-seq [offset batch-size handler]
  (when-let [batch (seq (handler offset batch-size))]
    (lazy-cat batch
              (row-seq (+ offset (count batch))
                       batch-size
                       handler))))
(defn streaming-response
  "Starts producing a gzip-compressed JSON array on a background thread and
  returns the `PipedInputStream` from which the compressed bytes can be read.

  `handler` is called as `(handler offset batch-size)` (through `row-seq`)
  until it returns no rows; every row is encoded with
  `json/write-value-as-bytes`.

  The second argument is a map with optional string keys:

    \"limit\"      - maximum number of rows to emit (default `Long/MAX_VALUE`)
    \"offset\"     - offset given to the first `handler` call (default 0)
    \"batch-size\" - number of rows requested per `handler` call (default 100)

  A key that is missing, or present with a `nil` value, uses its default."
  [handler {:strs [limit offset batch-size]}]
  (let [;; `:or` defaults only cover absent keys. A key that is present with a
        ;; nil value (e.g. the result of a failed parse) would otherwise reach
        ;; `take` / `+` as nil and fail inside the background thread.
        limit       (or limit Long/MAX_VALUE)
        offset      (or offset 0)
        batch-size  (or batch-size 100)
        output-pipe (PipedOutputStream.)
        input-pipe  (PipedInputStream. output-pipe 4096)
        output      (GZIPOutputStream. output-pipe 4096)]
    (future
      ;; `with-open` closes the gzip stream (and with it the pipe) even when
      ;; `handler`, the JSON encoding or a write throws. Without that the
      ;; reading side is never signalled that the stream has ended, and the
      ;; compressor's resources are not released.
      (with-open [out output]
        (.write out json-array-begin)
        (doseq [^"[B" chunk (->> (row-seq offset batch-size handler)
                                 (sequence (comp (take limit)
                                                 (map json/write-value-as-bytes)
                                                 (interpose json-value-separator))))]
          (.write out chunk))
        (.write out json-array-end)))
    input-pipe))
;;
;; ------------------------------------------------------------------------------
;; Ring handler and server:
;; ------------------------------------------------------------------------------
;;
;; Ring handler: responds with the rows from `get-data-from-db` as a
;; gzip-compressed JSON array.
;;
;; Query parameters are forwarded to `streaming-response` as longs. Only
;; parameters whose value is a single string that parses as a long are kept:
;; `parse-long` throws on non-string input (a repeated query parameter is
;; delivered as a vector by `wrap-params`) and returns nil for unparsable
;; text, so such values are dropped instead of failing the request or being
;; passed on as nil.
(defn handler [req]
  (let [options (into {}
                      (keep (fn [[k v]]
                              (when-let [n (and (string? v) (parse-long v))]
                                [k n])))
                      (:query-params req))]
    {:status  200
     :headers {"content-type"     "application/json"
               "content-encoding" "gzip"}
     :body    (streaming-response get-data-from-db options)}))
;; Starts a Jetty server for `handler` (wrapped with query-parameter parsing)
;; on 127.0.0.1:8800 and returns whatever `jetty/run-jetty` returns. With
;; `:join? false` the call does not block the calling thread.
(defn start-server! []
  (let [app     (params/wrap-params handler)
        options {:host  "127.0.0.1"
                 :port  8800
                 :join? false}]
    (jetty/run-jetty app options)))
;; Stops `server` when one is given; does nothing for a falsy argument.
;; Always returns nil.
(defn stop-server! [server]
  (if-not server
    nil
    (do (jetty/stop-server server)
        nil)))
;;
;; ------------------------------------------------------------------------------
;; Main:
;; ------------------------------------------------------------------------------
;;
;; Holds the currently running server, or nil when none is running.
(defonce server (atom nil))

;; Stops the running server (if any), starts a new one, stores it in the
;; `server` atom and returns it.
;;
;; Stopping and starting a server are side effects, so they must not live in
;; a function passed to `swap!`, which may run that function more than once.
;; The restart is serialized with a lock and the atom is updated with `reset!`.
(defn restart! []
  (locking server
    (stop-server! @server)
    ;; Forget the stopped instance first, so a failing start does not leave a
    ;; dead server in the atom.
    (reset! server nil)
    (reset! server (start-server!))))
(defn -main
  "Program entry point: (re)starts the HTTP server. Command line arguments
  are ignored."
  [& _]
  (restart!))
(comment
(restart!)
;
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment