Skip to content

Instantly share code, notes, and snippets.

@nivekuil
Last active March 16, 2021 23:37
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nivekuil/75981cfb573caeb2818e01bdddfc46ed to your computer and use it in GitHub Desktop.
Save nivekuil/75981cfb573caeb2818e01bdddfc46ed to your computer and use it in GitHub Desktop.
crux scylla/cassandra doc store
(ns app.crux.scylla
(:require [crux.codec :as codec]
[crux.db]
[crux.document-store :as ds]
[crux.io :refer [with-nippy-thaw-all]]
[crux.system :as sys]
[crux.memory]
[taoensso.nippy :as nippy]
[taoensso.timbre :as log]
[qbits.alia :as alia]
[promesa.core :as p]))
(defn id->bytes [id]
(crux.memory/->on-heap (codec/->id-buffer id)))
(defprotocol CruxAsyncDocumentStore
(submit-docs-async [this id-and-docs])
(-fetch-docs-async [this ids]))
(defrecord ScyllaDocumentStore [session select insert delete]
crux.db/DocumentStore
(submit-docs [this id-and-docs]
@(submit-docs-async this id-and-docs))
(-fetch-docs [this ids]
(into {} @(-fetch-docs-async this ids)))
CruxAsyncDocumentStore
(submit-docs-async [this id-and-docs]
(-> (fn [[id doc]]
(if (codec/evicted-doc? doc)
(alia/execute-async session delete
{:values [(id->bytes id)]
:idempotent? true
:consistency-level :local-quorum})
(alia/execute-async session insert
{:values [(id->bytes id) (nippy/fast-freeze doc)]
:idempotent? true
:consistency-level :local-quorum})))
(map id-and-docs)
p/all))
(-fetch-docs-async [this ids]
(with-nippy-thaw-all
(-> (fn [id]
(p/let [result (alia/execute-async session select
{:values [(id->bytes id)]
:idempotent? true
:consistency-level :one})]
(let [{:keys [current-page]} result
[{:keys [value]}] current-page
doc (some-> value crux.memory/->on-heap nippy/fast-thaw)]
(when doc [id doc]))))
(map ids)
p/all))))
(defn ->document-store {::sys/deps {:document-cache 'crux.cache/->cache}
::sys/args {:keyspace {:doc "keyspace"
:required? true}
:addresses {:doc "addresses"
:required? true}
:table {:doc "table name"
:required? true}
:local-dc {:doc "local datacenter"}}}
[{:keys [document-cache keyspace addresses local-dc table] :as opts}]
(let [session (alia/session {:contact-points addresses
:load-balancing-local-datacenter (or local-dc "datacenter1")})]
#_(alia/execute session (format "
CREATE KEYSPACE IF NOT EXISTS %s
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};" keyspace))
(alia/execute session (format "USE %s;" keyspace))
(alia/execute session (format "CREATE TABLE IF NOT EXISTS %s (
key blob,
value blob,
PRIMARY KEY (key));" table))
(let [select (alia/prepare session (format "SELECT key, value FROM %s WHERE key = ?;" table))
insert (alia/prepare session (format "INSERT INTO %s (key, value) VALUES (?, ?);" table))
delete (alia/prepare session (format "DELETE FROM %s WHERE key = ?;" table))]
(ds/->cached-document-store
(assoc opts
:document-cache document-cache
:document-store (->ScyllaDocumentStore session select insert delete))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment