Last active
December 11, 2025 18:46
-
-
Save imakira/4b537a13ecd8c1816427068d10777565 to your computer and use it in GitHub Desktop.
Clojure watching Directories/Files using jdk WatchService and core.async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| (ns net.coruscation.cerulean.server.watch-service | |
| (:require | |
| [clojure.core.async :as a]) | |
| (:import | |
| [java.io File] | |
| [java.nio.file | |
| Path | |
| StandardWatchEventKinds | |
| WatchEvent | |
| WatchEvent$Kind | |
| WatchService])) | |
| (def ^:dynamic *chan-size* 512) | |
| (defn register [^Path path ^WatchService watch-service] | |
| (.register path | |
| watch-service | |
| (into-array WatchEvent$Kind | |
| [StandardWatchEventKinds/ENTRY_CREATE | |
| StandardWatchEventKinds/ENTRY_DELETE | |
| StandardWatchEventKinds/ENTRY_MODIFY | |
| StandardWatchEventKinds/OVERFLOW]))) | |
| (defn watch [& paths] | |
| (let [resp-chan (a/chan *chan-size*) | |
| cancel-chan (a/chan 1) | |
| stopped? (atom false) | |
| worker | |
| (future | |
| (let [paths (map (fn [p] | |
| (Path/of p (into-array String []))) | |
| paths) | |
| watch-service (-> (java.nio.file.FileSystems/getDefault) | |
| (.newWatchService))] | |
| (doseq [path paths] | |
| (doseq [^File subpath-path (file-seq (.toFile path))] | |
| (when (.isDirectory subpath-path) | |
| (register (.toPath subpath-path) | |
| watch-service)))) | |
| (try | |
| (while (not @stopped?) | |
| (let [watch-key (.take watch-service) | |
| events (.pollEvents watch-key) | |
| ^Path parent-dir (.watchable watch-key)] | |
| (doseq [^WatchEvent event events] | |
| (let [^WatchEvent$Kind kind (.kind event) | |
| ^Path event-path (.context event) | |
| ^Path resolved-path (.resolve parent-dir event-path)] | |
| (a/>!! resp-chan | |
| {:kind (case (.name kind) | |
| "ENTRY_CREATE" :entry-create | |
| "ENTRY_MODIFY" :entry-modify | |
| "ENTRY_DELETE" :entry-delete | |
| "OVERFLOW" :overflow) | |
| :path resolved-path}) | |
| (when (and (= kind StandardWatchEventKinds/ENTRY_CREATE) | |
| (.isDirectory (.toFile resolved-path))) | |
| (register resolved-path | |
| watch-service) | |
| (doseq [^File subpath-file (rest (file-seq (.toFile resolved-path)))] | |
| (when (.isDirectory subpath-file) | |
| (register (.toPath subpath-file) | |
| watch-service)) | |
| ;; without it | |
| ;; `watch-subdir-test` would fail | |
| (a/>!! resp-chan | |
| {:kind :entry-create | |
| :path (.toPath subpath-file)}))) | |
| (.reset watch-key))))) | |
| (catch InterruptedException _) | |
| (finally | |
| (.close watch-service))))) | |
| sentinel (future | |
| (while (not (a/<!! cancel-chan))) | |
| (reset! stopped? true) | |
| (future-cancel worker) | |
| (a/close! resp-chan) | |
| (a/close! cancel-chan))] | |
| (with-meta [resp-chan cancel-chan] | |
| {:debug {:worker worker | |
| :sentinel sentinel | |
| :stopped? stopped?}}))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| (ns net.coruscation.cerulean.server.watch-service-test | |
| (:require | |
| [clojure.core.async :as a] | |
| [clojure.java.io :as io] | |
| [clojure.java.shell :as sh] | |
| [clojure.test :refer [deftest is testing]] | |
| [net.coruscation.cerulean.server.watch-service :as subject]) | |
| (:import | |
| [java.nio.file Path])) | |
| (def ^:dynamic *test-dir* "test/resource/watch_service_test") | |
| (.mkdirs (io/file *test-dir*)) | |
| (deftest watch-test | |
| (testing "simple test" | |
| (dotimes [n 5] | |
| (let [simple-file (str *test-dir* "/demo")] | |
| (.delete (io/file simple-file)) | |
| (let [result (subject/watch *test-dir*) | |
| [resp-chan cancel-chan] result] | |
| (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 100) | |
| (sh/sh "touch" simple-file) | |
| (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 100) | |
| (let [event (a/poll! resp-chan)] | |
| (is (= (:kind event) | |
| :entry-create)) | |
| (is (= (-> event :path Path/.toString) | |
| simple-file))) | |
| (a/>!! cancel-chan false) | |
| (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 100) | |
| (is (not (future-cancelled? (-> result | |
| meta | |
| :debug | |
| :worker)))) | |
| (a/>!! cancel-chan true) | |
| (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 100) | |
| (is (true? (future-done? (-> result | |
| meta | |
| :debug | |
| :worker)))) | |
| (is (true? (future-done? (-> result | |
| meta | |
| :debug | |
| :sentinel)))) | |
| (is (true? @(-> result | |
| meta | |
| :debug | |
| :stopped?)))))))) | |
| (deftest watch-subdir-test | |
| (testing | |
| (let [test-dir (str *test-dir* "/subdir-test")] | |
| (.mkdirs (io/file test-dir)) | |
| (let [[resp cancel] (subject/watch test-dir)] | |
| (try | |
| (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 200) | |
| (.mkdirs (io/file (str test-dir "/s1/s2/s3"))) | |
| (.createNewFile (io/file (str test-dir "/s1/s2/s3/file"))) | |
| (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 200) | |
| (a/>!! cancel true) | |
| (is (= (.toString (:path (last (a/<!! (a/into [] resp))))) | |
| "test/resource/watch_service_test/subdir-test/s1/s2/s3/file")) | |
| (finally | |
| (doseq [file (reverse (file-seq (io/file test-dir)))] | |
| (.delete file)) | |
| (a/>!! cancel true))))))) | |
| (deftest watch-test2 | |
| (testing "(kinda) heavier test" | |
| (let [test-dir (str *test-dir* "/test2")] | |
| (.mkdirs (io/file test-dir)) | |
| (doseq [f (reverse (rest (file-seq (io/file test-dir))))] | |
| (.delete f)) | |
| (try (let [[resp cancel] (subject/watch test-dir) | |
| cancel-receiver (a/chan 1) | |
| results-future (future | |
| (loop [end? false | |
| results (transient [])] | |
| (if end? | |
| (persistent! results) | |
| (a/alt!! | |
| resp ([value](recur false (conj! results value))) | |
| cancel-receiver ([_] (recur true results)))))) | |
| simple-file (str test-dir "/demo")] | |
| (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 100) | |
| (doseq [i (range 0 10000)] | |
| (is (true? (.createNewFile (io/file (str simple-file i)))))) | |
| (.sleep java.util.concurrent.TimeUnit/MILLISECONDS 1000) | |
| (a/>!! cancel-receiver true) | |
| (= 10000 (count @results-future))) | |
| (finally | |
| (doseq [f (rest (file-seq (io/file test-dir)))] | |
| (.delete f))))))) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment