@gerritjvv
gerritjvv / gist:5955996
Created July 9, 2013 09:30
Count Frequencies of a column in a tsv file
(require 'clojure.string 'clojure.java.io)

(defn column-n
  "Returns a function that extracts column n from a record, splitting on the sep regex."
  [n sep]
  (fn [record]
    ((clojure.string/split (str record) sep) n)))

(defn file-freq [file column sep]
  ;; count how often each value occurs in the given column of file
  (with-open [rdr (clojure.java.io/reader file)]
    (frequencies (map (column-n column sep) (line-seq rdr)))))
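A hypothetical usage sketch (the file name, column index and tab pattern below are made up, not from the gist):

;; count how often each value occurs in column index 2 (zero based) of a tab separated file
(file-freq "data.tsv" 2 #"\t")
;; => a map of column value to count, e.g. {"US" 42, "GB" 10}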
@gerritjvv
gerritjvv / Buffered IO - DB - Stream PATTERN
Last active December 20, 2015 11:59
This pattern helps to convert a connected resource (i.e. a resource accessed via some kind of connection) into a lazy sequence. The idea is to fill an in-memory buffer with data from a connection that is opened and closed, then consume from the buffer, and when the buffer is empty fill it again using another connection.
(defn buffered-select
  "Creates a lazy sequence of messages for this datasource. f-select is called
   with the current position whenever the in-memory buffer runs empty."
  [f-select init-pos]
  (letfn [(m-seq [buff pos]
            ;; refill the buffer via f-select when it is empty; a nil result ends the seq
            (if-let [buff2 (if (empty? buff) (f-select pos) buff)]
              (cons (first buff2)
                    (lazy-seq (m-seq (rest buff2) (inc pos))))))]
    (m-seq nil init-pos)))
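A hypothetical usage sketch: f-select below stands in for a real open-select-close connection function and simply pages through a vector five items at a time (all names and numbers here are made up):

(def data (vec (range 20)))

(defn f-select [pos]
  ;; returns nil once the data is exhausted, which ends the lazy sequence
  (when (< pos (count data))
    (subvec data pos (min (count data) (+ pos 5)))))

(take 7 (buffered-select f-select 0))
;; => (0 1 2 3 4 5 6)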
@gerritjvv
gerritjvv / gist:6239976
Last active December 21, 2015 03:08
Clojure core.async schedule fixed delay actions
;; Print "hi" every 2 seconds:
(require '[clojure.core.async :refer [go <! timeout]])

(defn t []
  (go (loop [] (<! (timeout 2000)) (prn "hi") (recur))))
;or as a macro
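The macro itself is cut off in this preview; one possible shape, reusing the same go/timeout loop (a sketch, not necessarily the gist's macro):

(defmacro fixed-delay
  "Runs body every delay-ms milliseconds inside a go block."
  [delay-ms & body]
  `(go (loop []
         (<! (timeout ~delay-ms))
         ~@body
         (recur))))

;; e.g. (fixed-delay 2000 (prn "hi"))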
@gerritjvv
gerritjvv / gist:6508101
Created September 10, 2013 11:29
Connection as a Sequence: sleep and retry on nil. Let's say you have a connection that you want to stream results from. The connection can also return no data, in which case you would want to sleep and retry until data is available. A complicated solution would be to do this with loop/recur, but a more elegant pattern can be created using map and filter.
;for testing purposes lets simulate a connection that will return nil or 1
(defn connect-and-get [] (rand-nth [nil 1 1 1 nil 1 nil 1 nil]))
(def select-data (repeatedly connect-and-get))
(defn sleep-while-nil [ts s]
  ;; sleep ts milliseconds whenever a nil is seen, then drop the nils from the result
  (filter (complement nil?)
          (map (fn [x] (if-not x (Thread/sleep ts)) x) s)))
(take 10 (sleep-while-nil 100 select-data))
@gerritjvv
gerritjvv / queue offer
Created November 2, 2013 22:54
An offer function that will time out if the offer cannot be made
(use 'clojure.core.async)
(defn offer
  "Blocks until the message can be placed on the channel and returns true,
   otherwise times out after timeout-ms and returns false"
  [ch msg timeout-ms]
  (not
    (nil?
      (first (alts!! [(go (>! ch msg) msg) (timeout timeout-ms)])))))
;; let's test
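;; (hypothetical test, not the gist's original)
(def c (chan 1))
(offer c :a 100)  ;; => true, the buffered channel has room
(offer c :b 100)  ;; => false after ~100 ms, the buffer is full and nothing is reading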
@gerritjvv
gerritjvv / buff-chan
Last active January 1, 2016 15:29
There are many times where you need to buffer up a series of results and then perform an operation on them, and if the count is not reached within a predefined timeout, perform the operation with the results collected so far. For my use case I'm writing a kafka producer and want messages received by a send function to be buffered before sending to kafka. Th…
(require '[clojure.core.async :refer [go alts! >! <! >!! <!! chan timeout]])
(defn buffered-chan
  "Reads from ch-source; when either the timeout expires or buffer-count items
   have been read, the collected items are sent to the channel that's returned
   from this function"
  ([ch-source buffer-count timeout-ms]
   (buffered-chan ch-source buffer-count timeout-ms 1))
  ([ch-source buffer-count timeout-ms buffer-or-n]
   (let [ch-target (chan buffer-or-n)]
     (go
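       ;; NOTE: the gist preview is truncated at this point; the loop below is a
       ;; reconstruction sketch, not necessarily the original implementation.
       ;; It collects values until buffer-count is reached or timeout-ms passes,
       ;; then puts the collected vector onto ch-target.
       (loop [buf []]
         (let [[v ch] (alts! [ch-source (timeout timeout-ms)])]
           (cond
             ;; ch-source closed: flush any remaining items and stop
             (and (= ch ch-source) (nil? v))
             (when (seq buf) (>! ch-target buf))
             ;; got a value: flush when the buffer is full, otherwise keep collecting
             (= ch ch-source)
             (let [buf2 (conj buf v)]
               (if (>= (count buf2) buffer-count)
                 (do (>! ch-target buf2) (recur []))
                 (recur buf2)))
             ;; timeout fired: flush whatever has been collected so far
             :else
             (do (when (seq buf) (>! ch-target buf))
                 (recur []))))))
     ch-target)))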
@gerritjvv
gerritjvv / clojure async channel bridge
Created December 31, 2013 16:58
A simple thing to do, but it requires repeated typing. All it does is move values from channel-a to channel-b. This function is implemented in https://github.com/gerritjvv/fun-utils
(require '[clojure.core.async :refer [chan go >! <! >!! <!!]])
(require '[clojure.core.async :as async])
(defn chan-bridge
  "With three args: maps map-f over ch-source and copies the result to ch-target.
   With two args: in a loop, reads from ch-source and writes to ch-target;
   this function returns immediately and returns ch-target."
  ([ch-source map-f ch-target]
   (chan-bridge (async/map map-f [ch-source]) ch-target))
  ([ch-source ch-target]
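   ;; The gist preview is truncated here; the loop below is a reconstruction
   ;; sketch (the real implementation lives in fun-utils, linked above).
   (go
     (loop []
       (when-let [v (<! ch-source)]
         (>! ch-target v)
         (recur))))
   ch-target))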
@gerritjvv
gerritjvv / convert.sh
Last active January 25, 2016 20:39
Convert mkv to avi
for f in ./*.mkv; do
echo "convert $f"
ffmpeg -i "$f" -f avi -c:v mpeg4 -b:v 4000k -c:a libmp3lame -b:a 320k "${f%.*}.avi"
done
from mymem import cache_set, cache_get
########################
#### private functions
def chunk_seq(file_name, chunk_size=1048576):
    '''
    Create a lazy sequence of byte chunks, each up to chunk_size bytes (default 1048576)
    '''
    # NOTE: the preview is truncated here; this body is a minimal sketch
    with open(file_name, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            yield chunk
@gerritjvv
gerritjvv / kafka-clj.pool-impl.clj
Created June 15, 2016 17:15
Performance of ConcurrentLinkedQueue + Semaphore vs atom + persistent_vector for queue implementation
(ns
  ^{:doc "Concurrent keyed pool implementation using ConcurrentHashMap and ConcurrentLinkedQueue"}
  kafka-clj.pool-impl
  (:use criterium.core)
  (:import (java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap Semaphore ExecutorService Executors TimeUnit)))
;; add [criterium "0.4.4"] to your project.clj file
;; then use run-test-cc and run-test-a
;;
;; Results for both ConcurrentLinkedQueue + Semaphore and for atom + vector access are the same
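The benchmark code itself is cut off in this listing. Below is a minimal sketch of the two queue flavours being compared, assuming the imports from the ns form above; the function names are illustrative and are not the gist's run-test-cc / run-test-a helpers.

;; ConcurrentLinkedQueue + Semaphore: the semaphore counts available items so a
;; take can block for a bounded time instead of spinning.
(defn cc-queue []
  {:queue (ConcurrentLinkedQueue.) :sem (Semaphore. 0)})

(defn cc-offer! [{:keys [queue sem]} v]
  (.offer ^ConcurrentLinkedQueue queue v)
  (.release ^Semaphore sem))

(defn cc-poll! [{:keys [queue sem]} timeout-ms]
  ;; block up to timeout-ms for an item to become available, then take it
  (when (.tryAcquire ^Semaphore sem (long timeout-ms) TimeUnit/MILLISECONDS)
    (.poll ^ConcurrentLinkedQueue queue)))

;; atom + persistent vector: push with conj, pop the head with compare-and-set!
(defn a-queue [] (atom []))

(defn a-offer! [q v] (swap! q conj v))

(defn a-poll! [q]
  (loop []
    (let [v @q]
      (when (seq v)
        (if (compare-and-set! q v (subvec v 1))
          (first v)
          (recur))))))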