@reborg
Created December 15, 2017 14:42
split-by transducer
; Stats by burst with the split-by transducer.
; The input for this example is a (potentially large) collection of events.
; A simplified list of tab separated events would appear like the following:
; 2017-05-04T13:08:57Z\tmsg1
; 2017-05-04T13:10:52Z\tmsg2
; 2017-05-04T13:13:33Z\tmsg3
; 2017-05-04T23:14:10Z\tmsg4
; 2017-05-04T23:16:23Z\tmsg5
; Events are ordered by timestamp. We want the count of events (or other metrics) within a "burst".
; A burst is a group of contiguous events in which consecutive events are never more than n seconds apart.
; If two consecutive events are more than n seconds apart, they belong to different bursts and are counted separately.
(require '[clojure.string :as s])
(import '[java.io BufferedReader FileReader Reader StringReader File]
        '[java.time Duration Instant]
        '[java.time.format DateTimeFormatter]
        '[java.time.temporal TemporalAccessor])
(defn burst?
  "Check whether two timestamps are at most BURST seconds apart."
  [ts1 ts2 burst]
  (let [^TemporalAccessor t1 (Instant/from (.parse (DateTimeFormatter/ISO_INSTANT) ts1))
        ^TemporalAccessor t2 (Instant/from (.parse (DateTimeFormatter/ISO_INSTANT) ts2))]
    (<= (Math/abs (.getSeconds (Duration/between t2 t1))) burst)))
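; A quick REPL check of the arithmetic behind burst? (a sketch using only java.time;
; Instant/parse reads the same ISO-8601 instant format as ISO_INSTANT, and
; "gap-seconds" is a hypothetical name used only for this illustration):
(import '[java.time Duration Instant])
(def gap-seconds
  (.getSeconds (Duration/between (Instant/parse "2017-05-04T13:08:57Z")
                                 (Instant/parse "2017-05-04T13:10:52Z"))))
; gap-seconds is 115, so these two events share a burst when n is 120 but not when n is 60.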
(defn split-by
  "Splits the input each time f, a predicate of the
  previous and current value, returns false."
  [f]
  (fn [rf]
    (let [a (java.util.ArrayList.)]
      (fn
        ([] (rf))
        ([result]
         (rf (if (.isEmpty a)
               result
               (let [v (vec (.toArray a))]
                 (.clear a)
                 (unreduced (rf result v))))))
        ([result input]
         (if (or (.isEmpty a) (f (.get a (dec (.size a))) input))
           (do (.add a input) result)
           (let [v (vec (.toArray a))]
             (.clear a)
             (let [ret (rf result v)]
               (when-not (reduced? ret) (.add a input))
               ret))))))))
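; For checking split-by on small inputs, an eager model of the same grouping can help.
; This is only a sketch: "split-by-eager" is a hypothetical helper, not part of the gist,
; and it builds the whole result in memory rather than working as a transducer.
(defn split-by-eager
  "Groups coll into vectors, starting a new group whenever (f prev curr) is false."
  [f coll]
  (reduce (fn [groups x]
            (let [group (peek groups)]
              (if (and group (f (peek group) x))
                (conj (pop groups) (conj group x)) ; extend the current group
                (conj groups [x]))))               ; start a new group
          []
          coll))
; (split-by-eager #(<= (- %2 %1) 1) [1 2 3 7 8 12]) gives [[1 2 3] [7 8] [12]],
; and (into [] (split-by #(<= (- %2 %1) 1)) [1 2 3 7 8 12]) yields the same groups.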
(defn process [^Reader r fsplit fmetrics]
  (let [br (BufferedReader. r)
        lines (line-seq br)]
    (transduce
      (comp
        (map s/trim)
        (remove s/blank?)
        (map #(s/split % #"\t"))
        (split-by fsplit)
        (map fmetrics))
      (completing conj! #(do (.close br) (persistent! %)))
      (transient [])
      lines)))
(def events "2017-05-04T13:08:57Z\tmsg1\n
2017-05-04T13:09:52Z\tmsg2\n
2017-05-04T13:11:03Z\tmsg3\n
2017-05-04T23:13:10Z\tmsg4\n
2017-05-04T23:13:23Z\tmsg5")
;; Count of events for each burst. A burst ends when the next event happens more than 2 minutes after the previous one.
(process
  (StringReader. events)
  #(burst? (get %1 0) (get %2 0) 120)
  count)
;; [3 2]
; Now for a more interesting challenge. Let's grab the lastfm dataset of played tracks,
; a ~20GB, user-grouped, timestamped tsv:
; curl -O http://mtg.upf.edu/static/datasets/last.fm/lastfm-dataset-1K.tar.gz; tar xvfz lastfm-dataset-1K.tar.gz
; We want to see which listening session has the highest number of tracks. A listening session ends when
; no track is played within 3 hours of the previous one. This should take less than 1 minute to run.
(let [threehours (* 60 60 3)
      splitf #(burst? (get %1 1) (get %2 1) threehours)
      events (FileReader. (File. "lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv"))]
  (->>
    (process events splitf count)
    (sort >)
    (take 10)))
; (11211 10310 8046 7258 7106 6751 6230 6181 5736 5360)
; The longest uninterrupted session (no gap longer than 3 hours) had 11211 played songs. By tweaking "fmetrics" you can go
; deeper into the analysis, for example by checking the frequencies of the songs played.
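; For instance, a sketch of an alternative "fmetrics" that reports the most played artist
; of a session instead of its size. It assumes the artist name is the 4th tab-separated
; column (index 3), as the file name suggests; "top-artist" is a hypothetical helper,
; not part of the original gist.
(defn top-artist
  "Returns [artist-name play-count] for the most played artist in a session,
  where a session is a non-empty vector of rows already split on tabs."
  [session]
  (->> session
       (map #(get % 3))     ; pick the assumed artist-name column
       frequencies          ; artist-name -> number of plays
       (apply max-key val))) ; entry with the highest play count
; Passing top-artist to process in place of count would then yield one
; [artist plays] pair per session.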
matanox commented Dec 16, 2017

Cool stuff. Why the transient and how would the conj! be used by the transducing process?

reborg commented Jan 25, 2018

Sorry, I didn't see your comment before, @matanster. The transient is there for performance: transduce builds the vector gradually and no intermediate stage of that process is visible to anyone else, so the accumulator can be a transient throughout and go back to persistent as the last step. conj! is there because the reducing function receives a transient vector as its accumulator.
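
As an illustration of the point above, here is a minimal sketch that is not taken from the gist: the same transient pattern applied to a trivial `(map inc)` transducer, next to `into`, which uses a transient internally when the target is a vector. The names `manual` and `via-into` are made up for this example.

```clojure
;; Hand-rolled version: the accumulator is a transient vector, so the
;; reducing step must be conj!, and the completing step makes it persistent again.
(def manual
  (transduce (map inc)
             (completing conj! persistent!)
             (transient [])
             (range 5)))

;; into performs the same transient/persistent! round trip internally for vectors.
(def via-into (into [] (map inc) (range 5)))

(= manual via-into [1 2 3 4 5]) ;; => true
```

The gist uses the hand-rolled form because its completing step also needs to close the reader.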
