-
-
Save si14/e7bc3c17ce3316567c03 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn process [config system repo] | |
(debug "processing " (str (:owner-login repo) "/") (:name repo)) | |
(let [gh (safe-get system :github) | |
datomic (safe-get system :datomic) | |
sc-c (get-in @(safe-get system :user-cs-register) | |
[(:user-id repo) :sc-c]) | |
user (get-user-by-id datomic (:user-id repo)) | |
notes (get-notes gh datomic user repo) | |
datomic-notes (mapv (partial entities/prepare-record :note) notes) | |
;; TODO(si14): this will break when GH will allow custom permissions | |
hooked-repo (ensure-hook config system gh user repo) | |
selected-repo (assoc hooked-repo :status :selected)] | |
(drop-old-notes datomic repo) | |
(upsert :repository datomic (assoc selected-repo :notes datomic-notes)) | |
(when sc-c | |
(go (>! sc-c (t/->ReposUpdateSC [selected-repo])) | |
(>! sc-c (t/->NotesUpdateSC (:gh-id repo) notes)))))) | |
(defn check-undone [config system queue-c] | |
(let [datomic (safe-get system :datomic) | |
query '[:find ?eid | |
:in $ | |
:where | |
[?eid :repository/status :repository.status/processing ?tx] | |
[?tx :db/txInstant ?inst] | |
;; older than 30 minutes | |
[(codenotes.entities/older-than ?inst 1800)]] | |
db (d/db datomic) | |
repos (mapv (comp t/map->Repository | |
entities/prepare-transaction-results) | |
(entities/query-entities query db))] | |
(debug "found unprocessed repos older than 30 minutes:" repos) | |
(go (doseq [repo repos] | |
(>! queue-c repo))))) | |
;; TODO(si14): processor can (should?) be a separate thread | |
(defn start [config system] | |
(let [queue-c (safe-get-in system [:processor-channels :queue]) | |
control-c (safe-get-in system [:processor-channels :control]) | |
db-poller-c (start-db-poller config system)] | |
(go (loop [] | |
(let [[val c] (alts! [control-c queue-c db-poller-c] :priority true)] | |
(condp = c | |
queue-c (do | |
(try | |
(process config system val) | |
(catch Exception e | |
(error "got exception in processor loop" | |
(with-out-str | |
(stacktrace/print-stack-trace e))))) | |
(recur)) | |
db-poller-c (do | |
(debug val) | |
(try | |
(check-undone config system queue-c) | |
(catch Exception e | |
(error "got exception in check-undone loop" | |
(with-out-str | |
(stacktrace/print-stack-trace e))))) | |
(recur)) | |
control-c (debug "stopping processor"))))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment