Skip to content

Instantly share code, notes, and snippets.

@danlentz
Last active August 29, 2015 14:14
Show Gist options
  • Save danlentz/301a0f056ffd9c6605a9 to your computer and use it in GitHub Desktop.
Save danlentz/301a0f056ffd9c6605a9 to your computer and use it in GitHub Desktop.
Thread-safe Monotonic Clock for Clojure
(ns clock)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Lock-Free Thread-safe Monotonic Clock
;; Dan Lentz <http://github.com/danlentz>
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; We use epoch stamp (milliseconds) and zero-pad the low order bits
;; of millisecond stamp and provide atomic incremental
;; uuids-this- second subcounter on the low order bits, which
;; guarantee that two timestamps never collide regardless of clock
;; precision.
;;
;; 113914335216380000 (+ (* (epoch-time) 10000) 100103040000000000)
;; 113914335216380001 first contending timestamp
;; 113914335216380002 second contending timestamp
;; ... and so forth
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:const +subcounter-resolution+ 9999)
(deftype State [^short seqid ^long millis])
(let [-state- (atom (->State 0 0))]
(defn monotonic-time ^long []
(let [^State new-state
(swap! -state-
(fn [^State current-state]
(loop [time-now (System/currentTimeMillis)]
(if-not (= (.millis current-state) time-now)
(->State 0 time-now)
(let [tt (.seqid current-state)]
(if (< tt +subcounter-resolution+)
(->State (inc tt) time-now)
(recur (System/currentTimeMillis))))))))]
(+ (.seqid new-state)
(+ (* (.millis new-state) 10000) 100103040000000000)))))
@sbocq
Copy link

sbocq commented Feb 1, 2015

I liked the 3rd revision better because there was less plumbing. I noticed too it was not thread-safe and thought it was a good fit for the STM so I decided to give it a shot:

(defn monotonic-time ^long []
  (let [ts (fn []
             (+ (* (System/currentTimeMillis) 10000) 100103040000000000))]
    (dosync
     (let [^long time-now (ts)]
       (ensure last-time)
       (if (= time-now @last-time)
         (if (< @stamps-this-tick +tick-resolution+)
           (+ time-now (alter stamps-this-tick inc))
           (recur))
         (do
           (ref-set stamps-this-tick 0)
           (ref-set last-time time-now)
           time-now))))))

I didn't find a documented way to rollback/retry a transaction (I still have a lot to learn on Clojure) but it seems 'recur does the job because 'dosync wraps its @Body in a fn. I tested it using this simple test:

(let [cores 8
      rs (atom (vec (repeat cores nil)))
      run (fn [n]
            (fn []
              (let [r (repeatedly 10000 monotonic-time)]
                (swap! rs #(assoc % n r)))))
      ts (for [i (range cores)]
           (Thread. (run i)))]
  (doseq [t ts]
    (.start t))
  (doseq [t ts]
    (.join t))
  (let [r (apply concat @rs)]
    (assert (count r) (count (distinct r))))) 

and it passes on my dual-core FWIW.

@danlentz
Copy link
Author

danlentz commented Feb 1, 2015

Very interesting. My latest revision (I guess number 10 :) is thread safe and uses 'swap' which should be very efficient as compared with full blown STM. But also, are you sure that your 'recur' is really restarting the transaction efficiently rather than simply recurring back to the top of monotonic-time and starting a whole new transaction? It would be interesting to know its performance relative to the swap technique. @spocq

@sbocq
Copy link

sbocq commented Feb 1, 2015

This is what 'dosync does ultimately:

(runInTransaction (fn [] ~@body))

and according the definition of 'recur it can't jump beyond a fn so I'd conclude it is definitely restarting within the transaction.

I didn't really thought about performance. Indeed, the STM has certainly some more overhead. But in your latest version the whole body is executed within a locked region on the atom, which increases the chances to suspend threads in case of contention. But maybe this is just nitpicking because the perf is likely dominated by the syscall anyway... All I know is that making such microbenchmarks can be tricky :)

Here is an updated version with one deref:

(let [max-stamps 9999
      state (ref [0 0])]
  (defn monotonic-time ^long []
    (let [ts (fn []
               (+ (* (System/currentTimeMillis) 10000) 100103040000000000))]
      (dosync
       (let [^long time-now (ts)
             [^long last-time stamps] (ensure state)]
         (if (= time-now last-time)
           (if (< stamps max-stamps)
             (let [stamps' (inc stamps)]
               (ref-set state [last-time stamps'])
               (+ time-now stamps'))
             (recur))
           (do
             (ref-set state [time-now 0])
             time-now)))))))

P.S.: My github handle is @sbocq :)

@danlentz
Copy link
Author

danlentz commented Feb 2, 2015

Why would my implementation be locking? the whole purpose of swaps and atoms is lock-free concurrency.
Actually performance is of interest to me here because i use this monotonic clock to support very efficient concurrent generation of v1 uuid's: http://github.com/danlentz/clj-uuid

@sbocq

@sbocq
Copy link

sbocq commented Feb 2, 2015

Ah, you're right! I see in the doc that f might be retried multiple times in 'swap! and indeed the implementation uses a CAS operation.

Out of curiosity, I made a Java implementation:

import java.util.concurrent.atomic.AtomicReference;

public class Mono {

    private static final int MAX_STAMPS = 9999;

    private static final class Pair {
        public final long time;
        public final int stamps;

        public Pair(long time, int stamps) {
            this.time = time;
            this.stamps = stamps;
        }

        public final long now() {
            return time + stamps;
        }

        @Override
        public final boolean equals(Object other) {
            if (other == this) {
                return true;
            } else {
                Pair o = (Pair) other;
                return o.time == time && o.stamps == stamps;
            }
        }
    }

    private static final AtomicReference<Pair> state =
        new AtomicReference<>(new Pair(0,0));

    public static final long next() {
        for (;;) {
            long now =
                (System.currentTimeMillis() * 10000) + 100103040000000000L;

            Pair last = state.get();
            if (now == last.time) {
                if (last.stamps < MAX_STAMPS) {
                    Pair next = new Pair(now, last.stamps + 1);
                    if (state.compareAndSet(last, next))
                        return next.now();
                }
            } else {
                Pair next = new Pair(now, 0);
                if (state.compareAndSet(last, next))
                    return now;
            }
        }
    }
}

and did a quick dirty benchmark on a lein REPL with "-server" using:

(time (let [cores 8
            rs (atom (vec (repeat cores nil)))
            run (fn [n]
                  (fn []
                    (dotimes [_ 10000]
                      (Mono/next))
                    (swap! rs #(assoc % n n))))
            ts (for [i (range cores)]
                 (Thread. (run i)))]
        (doseq [t ts]
          (.start t))
        (doseq [t ts]
          (.join t))))

And here are the timings:

  • STM: 843ms
  • atom: 240ms
  • java: 150ms

The STM implementation is indeed 4 times slower but the Java one is about 1.5 times faster. I don't know if the Clojure code can be improved much beyond what you did. I haven't reached the chapter on Clojure's performance yet but at least I learned a few other things today :)

@danlentz

@danlentz
Copy link
Author

danlentz commented Feb 3, 2015

@sbocq this might make it a little more efficient

@sbocq
Copy link

sbocq commented Feb 3, 2015

@danlentz revision 12 runs also about the same speed (250ms) in my simple benchmark.

And without going through any hoops this idiomatic one takes also about 250ms!

(let [max-stamps 9999
      state (atom [0 0])]
  (defn monotonic-time ^long []
    (let [[now stamps] (swap! state
                              (fn [[last-time stamps]]
                                (let [now (System/currentTimeMillis)]
                                  (if (= now last-time)
                                    (if (< stamps max-stamps)
                                      [now (inc stamps)]
                                      (recur state))
                                    [now 0]))))]
      (+ (* now 10000) 100103040000000000 stamps))))

@danlentz
Copy link
Author

danlentz commented Feb 3, 2015

interesting discussion, @sbocq! Thanks for joining in!

@sbocq
Copy link

sbocq commented Feb 3, 2015

@danlentz Thanks! I'm looking forward for more ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment