Skip to content

Instantly share code, notes, and snippets.

View lbradstreet's full-sized avatar

Lucas Bradstreet lbradstreet

View GitHub Profile
(ns onyx.peer.serializer-test
(:require [taoensso.nippy :as nippy])
(:import [org.agrona.concurrent UnsafeBuffer]))
;; LazyVal bundles three fields: key-buffer and val-buffer, both type-hinted
;; as Agrona UnsafeBuffer, and an un-hinted val-cached slot.
;; None of the fields is declared mutable, so val-cached is fixed at
;; construction time.
;; NOTE(review): val-cached presumably holds the deserialized form of
;; val-buffer; nothing visible here reads or populates it -- confirm intent.
(deftype LazyVal [^UnsafeBuffer key-buffer ^UnsafeBuffer val-buffer val-cached])
(def vs (object-array [(->LazyVal (UnsafeBuffer. ^bytes (nippy/fast-freeze 5))
(UnsafeBuffer. ^bytes (nippy/fast-freeze {:a :7}))
nil)
(->LazyVal (UnsafeBuffer. ^bytes (nippy/fast-freeze [:a :b]))
(ns onyx.peer.serializer-test
(:require [taoensso.nippy :as nippy])
(:import [org.agrona.concurrent UnsafeBuffer]))
;; LazyVal bundles three fields: key-buffer and val-buffer, both type-hinted
;; as Agrona UnsafeBuffer, and an un-hinted val-cached slot.
;; None of the fields is declared mutable, so val-cached is fixed at
;; construction time.
;; NOTE(review): val-cached presumably holds the deserialized form of
;; val-buffer; nothing visible here reads or populates it -- confirm intent.
(deftype LazyVal [^UnsafeBuffer key-buffer ^UnsafeBuffer val-buffer val-cached])
(def vs (object-array [(->LazyVal (UnsafeBuffer. ^bytes (nippy/fast-freeze 5))
(UnsafeBuffer. ^bytes (nippy/fast-freeze {:a :7}))
nil)
(->LazyVal (UnsafeBuffer. ^bytes (nippy/fast-freeze [:a :b]))
(ns ^:no-doc onyx.messaging.int-object-map
(:import [org.agrona.collections Int2ObjectHashMap
Int2ObjectHashMap$KeyIterator Int2ObjectHashMap$EntryIterator]
[clojure.lang ILookup Associative IPersistentMap Counted ISeq]))
;; Protocol declaring a single one-argument operation, `clone`, dispatched on
;; its receiver.
;; NOTE(review): presumably `clone` returns a copy of the receiving map; no
;; implementation is visible in this excerpt -- confirm against the deftype
;; that extends it.
(defprotocol IInt2ObjectMap
(clone [this]))
(deftype IntObjectMap [^Int2ObjectHashMap m]
clojure.lang.IPersistentMap
(defn after-batch [{:keys [onyx.core/results onyx.core/task] :as event} lifecycle]
(let [tree-new (mapv (fn [{:keys [root leaves] :as message}]
(assoc message
:leaves
(mapv (fn [leaf]
(let [input-segment-context (:onyx/context (:message root))
output-segment-context (:onyx/context (:message leaf))
context-path {:path (conj (vec (:path input-segment-context)) task)}]
(assoc-in leaf
[:message :onyx/context]
(defn my-inc
  "Returns `segment` with the number stored under :n replaced by its
  successor. All other keys of `segment` are carried over unchanged."
  [{:keys [n] :as segment}]
  (let [next-n (inc n)]
    (assoc segment :n next-n)))
(defn after-batch [{:keys [onyx.core/results onyx.core/task] :as event} lifecycle]
(let [additional-context task
tree-new (mapv (fn [{:keys [root leaves] :as message}]
(assoc message
:leaves
(mapv (fn [leaf]
(let [input-segment-context (:context (:message root))]
Command `n Control file /var/folders/d8/6x6y27ln2f702g56jzzh9h780000gn/T/aeron-lucas/cnc.dat
18:58:12 - Aeron Stat
=========================
0: 0 - Bytes sent
1: 0 - Bytes received
2: 0 - Failed offers to ReceiverProxy
3: 0 - Failed offers to SenderProxy
4: 0 - Failed offers to DriverConductorProxy
5: 0 - NAKs sent
6: 0 - NAKs received
23 value 0
27 value 0
31 value 0
32 value 0
33 value 0
37 value 0
38 value 0
50 value 0
54 value 0
61 value 0
01:21:40 - Aeron Stat
=========================
0: 1,239,712 - Bytes sent
1: 1,219,616 - Bytes received
2: 0 - Failed offers to ReceiverProxy
3: 0 - Failed offers to SenderProxy
4: 0 - Failed offers to DriverConductorProxy
5: 0 - NAKs sent
6: 0 - NAKs received
7: 1,156 - Status Messages sent
22:24:54 - Aeron Stat
22:24:55 - Aeron Stat
22:24:56 - Aeron Stat
22:24:57 - Aeron Stat
22:24:58 - Aeron Stat
22:24:59 - Aeron Stat
22:25:00 - Aeron Stat
27: 0 - pub-lmt: 3 -1178955183 741114144 aeron:udp?endpoint=localhost:40002
28: 0 - snd-pos: 3 -1178955183 741114144 aeron:udp?endpoint=localhost:40002
22:25:01 - Aeron Stat
This file has been truncated, but you can view the full file.
Command `n Control file /private/var/folders/d8/6x6y27ln2f702g56jzzh9h780000gn/T/aeron-lucas/cnc.dat
19:10:03 - Aeron Stat
=========================
0: 480 - Bytes sent
1: 64 - Bytes received
2: 0 - Failed offers to ReceiverProxy
3: 0 - Failed offers to SenderProxy
4: 0 - Failed offers to DriverConductorProxy
5: 0 - NAKs sent
6: 0 - NAKs received