Skip to content

Instantly share code, notes, and snippets.

@imakira
Last active December 11, 2025 18:46
Show Gist options
  • Select an option

  • Save imakira/4b537a13ecd8c1816427068d10777565 to your computer and use it in GitHub Desktop.

Select an option

Save imakira/4b537a13ecd8c1816427068d10777565 to your computer and use it in GitHub Desktop.
Watching directories and files recursively in Clojure using the JDK WatchService and core.async
(ns net.coruscation.cerulean.server.watch-service
(:require
[clojure.core.async :as a])
(:import
[java.io File]
[java.nio.file
Path
StandardWatchEventKinds
WatchEvent
WatchEvent$Kind
WatchService]))
;; Buffer size of the event channel that `watch` creates. The value in effect
;; at the time `watch` is called is used; being dynamic, it can be rebound
;; with `binding` around that call.
(def ^:dynamic *chan-size* 512)
(defn register
  "Registers the directory `path` with `watch-service` for create, delete,
  modify and overflow events. Returns whatever `Path.register` returns
  (the `WatchKey` for this registration)."
  [^Path path ^WatchService watch-service]
  (let [event-kinds (into-array WatchEvent$Kind
                                [StandardWatchEventKinds/ENTRY_CREATE
                                 StandardWatchEventKinds/ENTRY_DELETE
                                 StandardWatchEventKinds/ENTRY_MODIFY
                                 StandardWatchEventKinds/OVERFLOW])]
    (.register path watch-service event-kinds)))
(defn watch
  "Recursively watches directories for file-system changes.

  Each element of `paths` is a path string. Every directory found at or
  beneath each path is registered with a fresh `WatchService`; directories
  created later are registered as soon as their creation event is seen, and
  a synthetic `:entry-create` event is emitted for everything already inside
  them.

  Returns a vector `[resp-chan cancel-chan]`:

  * `resp-chan` (buffer size `*chan-size*`) receives maps
    `{:kind k :path p}` where `k` is `:entry-create`, `:entry-modify`,
    `:entry-delete` or `:overflow`, and `p` is the `java.nio.file.Path` of
    the affected entry. Overflow events carry no entry name, so for them `p`
    is the watched directory whose events may have been lost. The channel is
    closed when the watcher stops, whether by cancellation or because the
    worker failed.
  * `cancel-chan` stops the watcher when a truthy value is put on it or when
    it is closed. Putting `false` is ignored.

  The returned vector carries `{:debug {:worker .. :sentinel .. :stopped? ..}}`
  as metadata for inspection in tests."
  [& paths]
  (let [resp-chan (a/chan *chan-size*)
        cancel-chan (a/chan 1)
        stopped? (atom false)
        worker
        (future
          (let [^WatchService watch-service
                (.newWatchService (java.nio.file.FileSystems/getDefault))]
            (try
              ;; Initial registration happens inside the try so the watch
              ;; service is closed even if a path is invalid or unreadable.
              (doseq [^String p paths
                      ^File dir (file-seq (.toFile (Path/of p (into-array String []))))
                      :when (.isDirectory dir)]
                (register (.toPath dir) watch-service))
              (while (not @stopped?)
                (let [^java.nio.file.WatchKey watch-key (.take watch-service)
                      ^Path parent-dir (.watchable watch-key)]
                  (doseq [^WatchEvent event (.pollEvents watch-key)]
                    (let [^WatchEvent$Kind kind (.kind event)]
                      (if (= kind StandardWatchEventKinds/OVERFLOW)
                        ;; OVERFLOW events have a null context; resolving it
                        ;; against the parent would throw and kill the worker.
                        (a/>!! resp-chan {:kind :overflow
                                          :path parent-dir})
                        (let [^Path event-path (.context event)
                              ^Path resolved-path (.resolve parent-dir event-path)]
                          (a/>!! resp-chan
                                 {:kind (case (.name kind)
                                          "ENTRY_CREATE" :entry-create
                                          "ENTRY_MODIFY" :entry-modify
                                          "ENTRY_DELETE" :entry-delete)
                                  :path resolved-path})
                          (when (and (= kind StandardWatchEventKinds/ENTRY_CREATE)
                                     (.isDirectory (.toFile resolved-path)))
                            (register resolved-path watch-service)
                            (doseq [^File subpath-file (rest (file-seq (.toFile resolved-path)))]
                              (when (.isDirectory subpath-file)
                                (register (.toPath subpath-file) watch-service))
                              ;; Entries created before the new directory was
                              ;; registered produce no native event, so report
                              ;; them here; without it `watch-subdir-test`
                              ;; would fail.
                              (a/>!! resp-chan
                                     {:kind :entry-create
                                      :path (.toPath subpath-file)})))))))
                  ;; Re-arm the key exactly once, after all of its pending
                  ;; events were handled (also when there were none).
                  (.reset watch-key)))
              ;; Interruption is the normal shutdown path (see `sentinel`).
              (catch InterruptedException _)
              (finally
                (.close watch-service)
                ;; Closing is idempotent; doing it here lets consumers see
                ;; end-of-stream if the worker dies unexpectedly.
                (a/close! resp-chan)))))
        sentinel (future
                   ;; Keep waiting while `false` is received. A truthy value
                   ;; or nil (channel closed by the caller) ends the wait;
                   ;; looping on nil would busy-spin forever.
                   (loop []
                     (when (false? (a/<!! cancel-chan))
                       (recur)))
                   (reset! stopped? true)
                   (future-cancel worker)
                   (a/close! resp-chan)
                   (a/close! cancel-chan))]
    (with-meta [resp-chan cancel-chan]
      {:debug {:worker worker
               :sentinel sentinel
               :stopped? stopped?}})))
(ns net.coruscation.cerulean.server.watch-service-test
(:require
[clojure.core.async :as a]
[clojure.java.io :as io]
[clojure.java.shell :as sh]
[clojure.test :refer [deftest is testing]]
[net.coruscation.cerulean.server.watch-service :as subject])
(:import
[java.nio.file Path]))
;; Relative path of the directory under which the tests in this namespace
;; create their files and subdirectories.
(def ^:dynamic *test-dir* "test/resource/watch_service_test")
;; Runs when the namespace is loaded: create the directory (and any missing
;; parents) so the tests can watch it.
(.mkdirs (io/file *test-dir*))
;; Repeats five times: start a watcher, create one file, expect exactly that
;; creation event, then verify that `false` on the cancel channel is ignored
;; and `true` shuts down both background futures.
(deftest watch-test
  (testing "simple test"
    (let [demo-path (str *test-dir* "/demo")
          pause! (fn [] (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 100))]
      (dotimes [_ 5]
        ;; Start from a clean slate so `touch` produces a create event.
        (.delete (io/file demo-path))
        (let [watcher (subject/watch *test-dir*)
              [resp-chan cancel-chan] watcher
              {:keys [worker sentinel stopped?]} (:debug (meta watcher))]
          (pause!)
          (sh/sh "touch" demo-path)
          (pause!)
          (let [{:keys [kind path]} (a/poll! resp-chan)]
            (is (= kind :entry-create))
            (is (= (.toString ^Path path) demo-path)))
          ;; A `false` on the cancel channel must not stop the watcher.
          (a/>!! cancel-chan false)
          (pause!)
          (is (not (future-cancelled? worker)))
          ;; A `true` stops everything.
          (a/>!! cancel-chan true)
          (pause!)
          (is (true? (future-done? worker)))
          (is (true? (future-done? sentinel)))
          (is (true? @stopped?)))))))
;; Creates a nested directory tree and a file inside it after the watcher has
;; started, then checks that the last reported event is for that file.
(deftest watch-subdir-test
  ;; `testing` takes a description string as its first argument; without one
  ;; the whole body would be consumed as the context value.
  (testing "directories created after the watch started are watched recursively"
    (let [test-dir (str *test-dir* "/subdir-test")
          expected-file (str test-dir "/s1/s2/s3/file")]
      (.mkdirs (io/file test-dir))
      (let [[resp cancel] (subject/watch test-dir)]
        (try
          (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 200)
          (.mkdirs (io/file (str test-dir "/s1/s2/s3")))
          (.createNewFile (io/file expected-file))
          (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 200)
          ;; Cancelling closes `resp`, which lets `a/into` complete.
          (a/>!! cancel true)
          ;; The expected path is derived from `test-dir` so the assertion
          ;; still holds when `*test-dir*` is rebound.
          (is (= expected-file
                 (some-> (a/<!! (a/into [] resp)) last :path str)))
          (finally
            ;; Make sure the watcher is stopped (a no-op if it already is)
            ;; before removing the files it was watching.
            (a/>!! cancel true)
            (doseq [file (reverse (file-seq (io/file test-dir)))]
              (.delete file))))))))
;; Creates 10000 files in a watched directory and checks that one event per
;; file was delivered.
(deftest watch-test2
  (testing "(kinda) heavier test"
    (let [test-dir (str *test-dir* "/test2")]
      (.mkdirs (io/file test-dir))
      ;; Remove leftovers from a previous run.
      (doseq [f (reverse (rest (file-seq (io/file test-dir))))]
        (.delete f))
      (let [[resp cancel] (subject/watch test-dir)]
        (try
          (let [cancel-receiver (a/chan 1)
                ;; Collects events until told to stop via `cancel-receiver`
                ;; or until `resp` is closed (nil), so it can never spin on a
                ;; closed channel.
                results-future (future
                                 (loop [results (transient [])]
                                   (a/alt!!
                                     resp ([value]
                                           (if (some? value)
                                             (recur (conj! results value))
                                             (persistent! results)))
                                     cancel-receiver ([_] (persistent! results)))))
                simple-file (str test-dir "/demo")]
            (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 100)
            (doseq [i (range 0 10000)]
              (is (true? (.createNewFile (io/file (str simple-file i))))))
            (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 1000)
            (a/>!! cancel-receiver true)
            ;; Wrapped in `is`; a bare `=` would be evaluated and discarded.
            (is (= 10000 (count @results-future))))
          (finally
            ;; Stop the watcher so its thread and WatchService are released.
            (a/>!! cancel true)
            (doseq [f (rest (file-seq (io/file test-dir)))]
              (.delete f))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment