Skip to content

Instantly share code, notes, and snippets.

@hiredman
Created December 8, 2021 19:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hiredman/c6868603eb9bf3620f2b89acfaef623e to your computer and use it in GitHub Desktop.
Save hiredman/c6868603eb9bf3620f2b89acfaef623e to your computer and use it in GitHub Desktop.
;; single file web app
;; run the following to start the process
;; clj -J-Xmx500M -J-XX:-OmitStackTraceInFastThrow -J-Dclojure.server.repl="{:port 9457 :accept clojure.core.server/repl}" -Sdeps '{:deps {net.sourceforge.htmlunit/htmlunit {:mvn/version "2.53.0"} org.clojure/data.json {:mvn/version "2.4.0"} com.github.seancorfield/next.jdbc {:mvn/version "1.2.731"} org.postgresql/postgresql {:mvn/version "42.2.24"} org.clojure/clojurescript {:mvn/version "1.10.879"} io.undertow/undertow-core {:mvn/version "2.2.12.Final"} reagent/reagent {:mvn/version "1.1.0"} cljsjs/react {:mvn/version "17.0.2-0"} cljsjs/react-dom {:mvn/version "17.0.2-0"} com.cognitect/transit-clj {:mvn/version "1.0.324"} com.cognitect/transit-cljs {:mvn/version "0.8.269"}}}' -M -i src/packages.cljc -e '(packages/main)'
(ns packages
#?(:cljs (:require-macros [packages :as m]))
(:require
#?@(:clj ([clojure.data.json :as json]
[next.jdbc :as jdbc]
[cljs.build.api :as cljs]
[clojure.java.io :as io]
[clojure.repl :refer :all]
[clojure.data :refer [diff]])
:cljs ([reagent.core :as r]
[reagent.dom :as rdom]))
[cognitect.transit :as transit])
#?(:clj (:import (io.undertow.util Headers)
(io.undertow.websockets.core AbstractReceiveListener
WebSockets
WebSocketChannel
BufferedTextMessage)
(io.undertow.websockets WebSocketProtocolHandshakeHandler
WebSocketConnectionCallback)
(io.undertow Undertow)
(io.undertow.server HttpHandler
HttpServerExchange)
(java.util UUID)
(java.util.logging Logger
Level
ConsoleHandler
SimpleFormatter
LogRecord)
(java.io File)
(java.nio ByteBuffer)
(java.time Instant)
(java.time.format DateTimeFormatter)
(java.util.concurrent TimeUnit
Executor
SubmissionPublisher
Flow$Subscription
Flow$Subscriber
CompletableFuture
ScheduledExecutorService
Executors)
(java.net URLEncoder
URL)
(com.gargoylesoftware.htmlunit WebClient
BrowserVersion))))
(comment
;; a remote REPL that does not have the source file available can load it like this
(clojure.lang.Compiler/load
(java.io.StringReader. (slurp ""))
"packages.cljc"
"packages.cljc")
)
(def port 9456)
#?(:clj (def carrier-url nil))
;; Builds the carrier's public tracking-page URL for a tracking number.
;; Dispatches on the carrier name; no :default method is defined.
(defmulti carrier-url
  (fn [carrier _tracking] carrier))

(defmethod carrier-url "fedex" [_ tracking-number]
  (str "https://www.fedex.com/fedextrack/?trknbr=" tracking-number))

(defmethod carrier-url "ups" [_ tracking-number]
  (str "https://www.ups.com/track?loc=en_US&tracknum=" tracking-number))

(defmethod carrier-url "usps" [_ tracking-number]
  (str "https://tools.usps.com/go/TrackConfirmAction?tLabels=" tracking-number))

(defmethod carrier-url "dhl" [_ tracking-number]
  (str "https://webtrack.dhlglobalmail.com/?trackingnumber=" tracking-number))
#?(:clj
(do
(alias 'm (ns-name *ns*))
;; The java.util.logging logger named "clojure", configured once (defonce).
;; Every record is written to the console as a single namespaced-map-looking
;; line of the form #:log{:z ... :v ... :n ... :l ... :m ... <extra pairs>}.
(defonce ^Logger logger (doto (Logger/getLogger "clojure")
;; keep records away from the parent logger's handlers
(.setUseParentHandlers false)
(.addHandler
(doto (ConsoleHandler.)
(.setLevel Level/ALL)
(.setFormatter
(proxy [SimpleFormatter] []
(format [^LogRecord record]
(let [sb (StringBuilder.)]
(.append sb "#:log{")
;; :z - the record's timestamp rendered as an Instant string
(.append sb ":z ")
(.append sb (pr-str (str (Instant/ofEpochMilli (.getMillis record)))))
;; (.append sb " :b ")
;; (.append sb (format "%02d" (.getSequenceNumber record)))
;; (.append sb " :c ")
;; (.append sb (format "%02d" (.getThreadID record)))
;; :v - the level name, lower-cased and written as a keyword
(.append sb " :v :")
(.append sb (.toLowerCase (.getName (.getLevel record))))
;; :n - the record's source class name, appended without quoting
(.append sb " :n ")
(.append sb (.getSourceClassName record))
;; :l - the record's source method name, or the text nil when it is empty
(.append sb " :l ")
(let [x (.getSourceMethodName record)]
(if (empty? x)
(.append sb "nil")
(.append sb (.getSourceMethodName record))))
;; :m - the message, printed readably
(.append sb " :m ")
(.append sb (pr-str (.getMessage record)))
;; every map found among the record's parameters contributes its entries;
;; keys that have no namespace are written with a "_" namespace (:_/name)
(doseq [p (seq (.getParameters record))
:when (map? p)
[k v] (seq p)]
(doto sb
(.append " ")
(cond->
(namespace k) (.append (pr-str k))
(not (namespace k)) (-> (.append ":_/") (.append (name k))))
(.append " ")
(.append (pr-str v))))
;; an attached Throwable is printed on its own line after :thrown
(when-let [t (.getThrown record)]
(.append sb " :thrown \n")
(.append sb (pr-str t)))
(.append sb "}\n")
(str sb)))))))))
(defmacro def-levels
  "Expands to a `do` holding one `def` per level name (severe, warning, info,
  config, fine, finer, finest). Each var is called log-level-<name>.

  `level-lookup` is an unevaluated list form. The level name, as a string, is
  spliced in right after its first element, so (-> .toUpperCase Level/parse)
  becomes (-> \"severe\" .toUpperCase Level/parse) for the severe level."
  [level-lookup]
  (let [[head & tail] level-lookup
        level-names (map name '[severe warning info config fine finer finest])]
    `(do
       ~@(map (fn [level-name]
                `(def ~(symbol (str "log-level-" level-name))
                   (~head ~level-name ~@tail)))
              level-names))))
(def-levels (-> .toUpperCase Level/parse))
;; Defines one logging macro per level (severe, warning, info, ...) without
;; writing seven defmacros: a plain function is interned under the level's
;; name and its var is then flagged as a macro. Because the function is used
;; as a macro it receives the two implicit macro arguments &form and &env
;; explicitly.
(doseq [level '[severe warning info config fine finer finest]]
(intern *ns*
level
(fn f
;; (level msg) - same as (level msg {})
([&form &env msg]
(f &form &env msg {}))
;; (level msg parameters) - parameters is either a map of context or,
;; when it is not a map, is type-hinted as a Throwable
([&form &env msg parameters]
;; the namespace being compiled at expansion time, as a string
(let [ns (name (ns-name *ns*))]
`(let [p# ~parameters]
(if (map? p#)
(log-context ~(symbol (str "log-level-" (name level)))
~ns
;; line number taken from the reader metadata of the calling form
~(str (:line (meta &form)))
(print-str ~msg)
p#)
(log-error ~(symbol (str "log-level-" (name level)))
~ns
~(str (:line (meta &form)))
(print-str ~msg)
^Throwable p#)))))))
;; turn the freshly interned function into a macro
(.setMacro ^clojure.lang.Var (ns-resolve *ns* level)))
(defn log-context
  "Logs `msg` at `level` through the shared logger with `context` attached as
  the record's single parameter. `ns` and `line` are passed in the source
  class and source method positions of Logger.logp."
  [^Level level ^String ns ^String line ^String msg ^Object context]
  (. logger (logp level ns line msg context)))
(defn log-error
  "Logs `msg` at `level` through the shared logger with `error` attached as
  the record's Throwable. `ns` and `line` are passed in the source class and
  source method positions of Logger.logp."
  [^Level level ^String ns ^String line ^String msg ^Throwable error]
  (. logger (logp level ns line msg error)))
)
:cljs
(do
;; TODO needs improvement
;; Browser-side level functions, one per level name. Each takes
;; [ns line msg obj context?]. Only "info" with context renders the
;; #:log{...} line on the JS console; every other case prn's its arguments
;; (context? makes no difference there).
(m/def-levels ((let [print-plain (fn [ns line msg obj context?]
                                   (prn ns line msg obj))
                     print-info (fn [ns line msg obj context?]
                                  (if-not context?
                                    (prn ns line msg obj)
                                    (let [parts (js/Array.)]
                                      (doto parts
                                        (.push "#:log{")
                                        (.push " :v :info")
                                        (.push " :n ")
                                        (.push ns)
                                        (.push " :l ")
                                        (.push line)
                                        (.push " :m ")
                                        (.push (pr-str msg)))
                                      ;; keys without a namespace are shown as :_/name
                                      (doseq [[k v] obj]
                                        (.push parts " ")
                                        (if (namespace k)
                                          (.push parts (pr-str k))
                                          (doto parts
                                            (.push ":_/")
                                            (.push (name k))))
                                        (.push parts " ")
                                        (.push parts (pr-str v)))
                                      (.push parts "}")
                                      (.info js/console (.join parts " ")))))]
                 {"severe" print-plain
                  "warning" print-plain
                  "info" print-info
                  "config" print-plain
                  "fine" print-plain
                  "finer" print-plain
                  "finest" print-plain})))
(defn log-context
  "Calls the level function `level` with the message details and `context`,
  flagging the call as carrying a context map."
  [level ns line msg context]
  (let [context? true]
    (level ns line msg context context?)))
(defn log-error
  "Calls the level function `level` with the message details and the error
  object, flagging the call as not carrying a context map."
  [level ns line msg context]
  (let [context? false]
    (level ns line msg context context?)))
))
#?(:clj
(do
(set! *warn-on-reflection* true)
;; the logging macros depend on form meta data from the reader,
;; which is missing for macro generated forms, so this gets
;; weird.
;; (log-time form) / (log-time m form)
;; Evaluates form, logs an "elapsed time" info record carrying :log/time
;; (elapsed milliseconds) and returns form's value. In the two-argument
;; arity m must be a literal map: the macro assoc's :log/time into it at
;; expansion time.
;; The expansion is an immediately invoked fn whose arguments are
;; (System/nanoTime) followed by form, so the start time is taken before
;; form runs and the log call in the fn body runs after it.
(defmacro log-time
([form]
(let [start (gensym)
result (gensym)]
(list
(list `fn [start result]
;; the generated info call reuses the metadata of the log-time form so the
;; logging macro still finds a line number
(with-meta (list `info "elapsed time" {:log/time `(/ (- (System/nanoTime) ~start) 1e6)}) (meta &form))
result)
'(System/nanoTime)
form)))
([m form]
(let [start (gensym)
result (gensym)]
(list
(list `fn [start result]
(with-meta (list `info "elapsed time" (assoc m :log/time `(/ (- (System/nanoTime) ~start) 1e6))) (meta &form))
result)
'(System/nanoTime)
form))))
;; path of this source file as bound while it was first loaded
(defonce file *file*)
;; server state; the code in this file keeps :channels and :data in it
(defonce state (atom {}))
;; last Throwable caught by one of the catch-all handlers, kept for inspection
(defonce most-recent-error (atom nil))
;; number of HTTP requests handled; a plain def, so reloading the file resets it
(def requests-count (atom 0))
;; single-threaded scheduler
(defonce ^ScheduledExecutorService sched (Executors/newScheduledThreadPool 1))
;; publisher that package-data snapshots are submitted to
(defonce ^SubmissionPublisher pub (SubmissionPublisher.))
;; NOTE(review): the database location and user are hard-coded here
(def ds (jdbc/get-datasource "jdbc:postgresql://192.168.1.229/postgres?user=postgres"))
(comment
(->> (jdbc/execute! ds ["select * from package_event order by package_id,at"])
(partition-by :package_event/package_id)
(mapcat (fn [x] (partition-all 2 1 (map (juxt (comp bigdec #(/ (Math/round (* % 10)) 10) :package_event/lat)
(comp bigdec #(/ (Math/round (* % 10)) 10) :package_event/lon)) x))))
(filter #(= 2 (count %)))
(reduce (fn [s [a b]] (update-in s [a b] (fnil inc 0))) {}))
)
(defn package-data
  "Runs the package overview query and returns its rows: one row per package
  joined with its earliest and most recent package_event, the great-circle
  speed between those two events in km/h (kph) and the whole days since the
  most recent event. The query is timed with log-time.

  kph is 0 when both events are at the same place. It is NULL when the two
  events carry the same timestamp: the divisor is wrapped in nullif so that
  case no longer raises a division-by-zero error that would fail the whole
  listing."
  []
  (info "package data")
  (log-time
   {:what :package-data}
   (jdbc/execute! ds ["
with
most_recent_evts as (select * from package_event where package_event.at in (select max(at) from package_event group by package_id) and package_event.id in (select max(id) from package_event group by package_id, at)),
earliest_evts as (select * from package_event where package_event.at in (select min(at) from package_event group by package_id) and package_event.id in (select min(id) from package_event group by package_id, at))
select
package.id,
carrier,
earliest_evts.place as first_place,
earliest_evts.at as first_date,
most_recent_evts.place as last_place,
most_recent_evts.at as last_date,
most_recent_evts.status as last_status,
tracking,
active,
case
when earliest_evts.place = most_recent_evts.place then 0
else
(asin(
sqrt(
pow(sin(((earliest_evts.lat - most_recent_evts.lat) * PI())/180/2), 2) +
pow(sin(((earliest_evts.lon - most_recent_evts.lon) * PI())/180/2), 2) *
cos((most_recent_evts.lat * PI())/180) * cos((earliest_evts.lat * PI())/180))) * 2 * 6371) /
nullif((extract(epoch from most_recent_evts.at) - extract(epoch from earliest_evts.at)) / 60 / 60, 0)
end kph,
extract(days from now() - most_recent_evts.at) days_ago
from package
left join most_recent_evts
on most_recent_evts.package_id = package.id
left join earliest_evts
on earliest_evts.package_id = package.id
order by active desc, most_recent_evts.at desc;
"])))
;; Starts (once) a self-rescheduling task that, every 60 seconds, submits a
;; fresh package-data snapshot to pub when pub has at least one subscriber.
;; The first tick runs synchronously while this form is evaluated.
(defonce timed-publish
  ((fn tick [^ScheduledExecutorService s]
     ;; Re-arm first so a failure below cannot break the chain. The Runnable
     ;; hint picks the schedule(Runnable, long, TimeUnit) overload at compile
     ;; time; a Clojure fn is both Runnable and Callable.
     (.schedule s ^Runnable (partial tick s) (long (* 1000 60)) TimeUnit/MILLISECONDS)
     (try
       (when (pos? (.getNumberOfSubscribers pub))
         (.submit pub (package-data)))
       ;; without this catch the executor would silently swallow the error
       (catch Throwable t
         (reset! most-recent-error t)
         (severe "whoops" t))))
   sched))
(defn geocode
  "Looks `place` up through the Nominatim search endpoint and returns the
  parsed JSON response."
  [place]
  (let [query (URLEncoder/encode place "utf-8")
        endpoint (URL. (format "https://nominatim.openstreetmap.org/search?q=%s&format=json"
                               query))]
    (-> endpoint
        slurp
        json/read-str)))
(defn timezone-offset
  "Fetches the teleport.org location document for `lat`,`lon` and returns the
  \"total_offset_min\" value of the first map in the response that has that
  key, or nil when no map has it."
  [lat lon]
  (let [endpoint (URL.
                  (format "https://api.teleport.org/api/locations/%s,%s/?embed=location:nearest-cities/location:nearest-city/city:timezone/tz:offsets-now"
                          lat
                          lon))
        document (json/read-str (slurp endpoint))
        ;; walk every nested collection of the response, outermost first
        offsets-node (some (fn [node]
                             (when (and (map? node)
                                        (contains? node "total_offset_min"))
                               node))
                           (tree-seq coll? seq document))]
    (get offsets-node "total_offset_min")))
(defmulti apply-to-carrier-events (fn [carrier tracking f] carrier))
;; FedEx: load the tracking page in a headless browser, wait a minute, click
;; every button whose first child reads "Expand History", wait another
;; second, then hand f every node of the page (depth first). f runs while the
;; browser is still open.
(defmethod apply-to-carrier-events "fedex" [carrier tracking f]
  (with-open [client (WebClient.)]
    (let [children (comp seq #(.getChildren %))
          page (.getPage client (carrier-url carrier tracking))]
      (Thread/sleep (* 60 1000))
      (doseq [button (.getElementsByTagName page "button")
              :when (= (.asText (.getFirstChild button))
                       "Expand History")]
        (.click button))
      (Thread/sleep 1000)
      (f (tree-seq children children page)))))
;; UPS: load the tracking page (script errors tolerated), wait a minute,
;; click the element with id "st_App_View_Details", then hand f every node of
;; the first table inside the first ups-shipment-progress element.
(defmethod apply-to-carrier-events "ups" [carrier tracking f]
  (with-open [client (WebClient. BrowserVersion/FIREFOX)]
    (.setThrowExceptionOnScriptError (.getOptions client) false)
    (let [children (comp seq #(.getChildren %))
          page (.getPage client (carrier-url carrier tracking))]
      (Thread/sleep (* 60 1000))
      (.click (.getElementById page "st_App_View_Details"))
      (let [progress (first (.getElementsByTagName page "ups-shipment-progress"))
            table (first (.getElementsByTagName progress "table"))]
        (f (tree-seq children children table))))))
;; USPS: load the tracking page (script errors tolerated), wait a minute,
;; re-read the page currently shown in the window, click every anchor whose
;; class contains "see-all", wait five seconds, then hand f the children of
;; the div found three div-levels below the element "trackingHistory_1".
;; The info calls "a".."d" trace progress through the steps.
(defmethod apply-to-carrier-events "usps" [carrier tracking f]
  (info "a" {})
  (with-open [client (WebClient. BrowserVersion/FIREFOX)]
    (.setThrowExceptionOnScriptError (.getOptions client) false)
    (info "b" {:carrier carrier :tracking tracking})
    (let [initial-page (.getPage client (carrier-url carrier tracking))]
      (info "c" {})
      (Thread/sleep (* 60 1000))
      (let [page (-> initial-page .getEnclosingWindow .getEnclosedPage)
            ;; first child of node that is a div
            first-div (fn [node]
                        (->> (.getChildren node)
                             (filter #(= "div" (.getNodeName %)))
                             (first)))]
        (info "d" {})
        (doseq [elem (.getElementsByTagName page "a")
                :when (some-> elem
                              (.getAttributes)
                              (get "class")
                              (.getNodeValue)
                              (.contains "see-all"))]
          (.click elem))
        (Thread/sleep (* 5 1000))
        (f (.getChildren
            (-> page
                (.getElementById "trackingHistory_1")
                first-div
                first-div
                first-div)))))))
;; DHL: load the tracking page (script errors tolerated) and hand f the
;; children of the first <ol> whose class contains "timeline".
(defmethod apply-to-carrier-events "dhl" [carrier tracking f]
  (with-open [client (WebClient. BrowserVersion/FIREFOX)]
    (.setThrowExceptionOnScriptError (.getOptions client) false)
    (let [children (comp seq #(.getChildren %))
          timeline-list? (fn [node]
                           (and (some-> node
                                        (.getAttributes)
                                        (get "class")
                                        (.getNodeValue)
                                        (.contains "timeline"))
                                (= "ol" (.getNodeName node))))
          page (.getPage client (carrier-url carrier tracking))
          timeline (first (filter timeline-list?
                                  (tree-seq children children page)))]
      (f (seq (.getChildren timeline))))))
(def observe-carrier nil)
(defmulti observe-carrier (fn [id carrier tracking] carrier))
;; FedEx: reduce over the scraped page nodes and insert one package_event row
;; per travel-history details row. The accumulator holds
;;   :date   - text of the most recent date row seen
;;   :places - cache of geocoding results keyed by the place text
;; and is the value returned.
(defmethod observe-carrier "fedex" [id carrier tracking]
(apply-to-carrier-events
carrier
tracking
(fn [nodes]
(reduce
(fn [accum elem]
;; a date row only updates :date for the detail rows that follow it
(cond (some-> (.getAttributes elem)
(get "class")
(.getNodeValue)
(.contains "travel-history-table__scan-event-date-row"))
(assoc accum :date (some-> elem .getFirstChild .getFirstChild .asText))
(some-> (.getAttributes elem)
(get "class")
(.getNodeValue)
(.contains "travel-history-table__scan-event-details-row"))
(let [place (first (for [elem (.getChildren elem)
:when (some-> (.getAttributes elem)
(get "class")
(.getNodeValue)
(.contains "travel-history-table__location"))]
(-> elem .asText)))
;; reuse the cached lookup for this place, otherwise geocode it and attach
;; a timezone offset (in minutes) to every candidate returned
place-data (when (seq place)
(or (get-in accum [:places place])
(for [pd (geocode place)
:let [minutes-offset
(timezone-offset (-> pd (get "lat")) (-> pd (get "lon")))]]
(assoc pd :tz-minutes-offset minutes-offset))))
time (first (for [elem (.getChildren elem)
:when (some-> (.getAttributes elem)
(get "class")
(.getNodeValue)
(.contains "travel-history-table__time-stamp"))]
(-> elem .asText)))
date (:date accum)
;; only the first geocoding candidate is used from here on
tz-minutes-offset (:tz-minutes-offset (first place-data))
;; one second pause per details row
;; NOTE(review): presumably rate limiting for the lookup services - confirm
_ (Thread/sleep 1000)]
;; rows without a place are skipped entirely
(if-not (seq place)
accum
(do
(try
(jdbc/execute-one!
ds
["insert into package_event(package_id,lat,lon,tz_minutes_offset,place,at,status)
values (?,?,?,?,?,?,?)"
id
(Double/parseDouble (-> place-data first (get "lat")))
(Double/parseDouble (-> place-data first (get "lon")))
tz-minutes-offset
place
;; the timestamp is only built when an offset is known, otherwise nil is
;; passed for the at column; the offset is rendered as +HHmm / -HHmm
(when tz-minutes-offset
(java.sql.Timestamp.
(.toEpochMilli
(Instant/from
(.parse (DateTimeFormatter/ofPattern "EEEE, LLLL d, y h:m a XXXX")
(format "%s %s %+03d%02d" date time (quot tz-minutes-offset 60)
(Math/abs (rem tz-minutes-offset 60))))))))
;; status text: the event-status element nested in status-and-details
(first (for [elem (.getChildren elem)
:when (some-> (.getAttributes elem)
(get "class")
(.getNodeValue)
(.contains "travel-history-table__status-and-details"))
elem (.getChildren elem)
:when (some-> (.getAttributes elem)
(get "class")
(.getNodeValue)
(.contains "travel-history-table__event-status"))]
(-> elem .asText)))])
;; any failure while building or running the insert is remembered and
;; logged, then the walk continues with the next node
(catch Throwable t
(reset! most-recent-error t)
(severe "whoops" t)))
(-> accum
(assoc-in [:places place] place-data)))))
:else
accum))
{}
nodes))))
;; UPS: reduce over the nodes of the shipment-progress table and insert one
;; package_event row per event. td cells are consumed in pairs: a td seen
;; while :time is unset supplies the date and time, the next td supplies the
;; status and place. The accumulator holds :date, :time and :places (cache of
;; geocoding results keyed by place text).
(defmethod observe-carrier "ups" [id carrier tracking]
(apply-to-carrier-events
carrier
tracking
(fn [nodes]
(reduce
(fn [accum elem]
(cond (and (= "td" (.getNodeName elem))
(nil? (:time accum)))
;; first cell of a pair: its direct text children are date and time
(let [[date time] (map
#(.getTextContent %)
(filter
#(= "#text" (.getNodeName %))
(.getChildren elem)))]
(assoc accum :date date :time time))
(and (= "td" (.getNodeName elem))
(:time accum))
(do
;; second cell of a pair: the first two non-blank text nodes anywhere below
;; it are taken as status and place
(let [[status place] (->> (tree-seq (comp seq #(.getChildren %)) (comp seq #(.getChildren %)) elem)
(filter #(= "#text" (.getNodeName %)))
(map #(some-> % .getTextContent .trim))
(filter seq))]
;; these asserts sit outside the try below, so a failure leaves the reduce
(assert status)
(assert place)
;; "Label Created" rows are not stored; only the pair state is reset
(if (= status "Label Created")
(assoc accum :time nil)
;; reuse the cached lookup for this place, otherwise geocode it and attach
;; a timezone offset (in minutes) to every candidate returned
(let [pd (when (seq place)
(or (get-in accum [:places place])
(for [pd (geocode place)
:let [minutes-offset (timezone-offset (-> pd (get "lat"))
(-> pd (get "lon")))]]
(assoc pd :tz-minutes-offset minutes-offset))))]
(try
(jdbc/execute-one!
ds
["insert into package_event(package_id,lat,lon,tz_minutes_offset,place,at,status)
values (?,?,?,?,?,?,?)"
id
(Double/parseDouble (-> pd first (get "lat")))
(Double/parseDouble (-> pd first (get "lon")))
(:tz-minutes-offset (first pd))
place
;; the timestamp is only built when an offset is known, otherwise nil is
;; passed for the at column
(when (:tz-minutes-offset (first pd))
(java.sql.Timestamp.
(.toEpochMilli
(Instant/from
(.parse (DateTimeFormatter/ofPattern "MM/dd/yyyy h:m a XXXX")
(-> (format "%s %s %+03d%02d"
(:date accum)
(:time accum)
(quot (-> pd first :tz-minutes-offset) 60)
(Math/abs (rem (-> pd first :tz-minutes-offset) 60))
)
;; rewrite the page's P.M./A.M. markers to PM/AM before parsing
(.replaceAll "P.M." "PM")
(.replaceAll "A.M." "AM")))))))
status])
;; any failure while building or running the insert is remembered and
;; logged, then the walk continues
(catch Throwable t
(reset! most-recent-error t)
(severe "whoops" t)))
(-> accum
(assoc-in [:places place] pd)
(assoc :time nil))))))
:else
accum))
{}
nodes))))
;; (re-find #"\w+ \d+, \d+," "November 20, 2021,\n\t\t\t\t\t\t\t\t\t8:26 pm")
;; (re-find #"\w+,? \w\w" "INDIANAPOLIS IN LOGISTICS CENTER")
;; (re-find #"\w+,? \w\w (\d+)?" "INDIANAPOLIS IN LOGISTICS CENTER")
;; USPS: reduce over the tracking-history nodes and insert one package_event
;; row per event. Spans are consumed in sequence: a span whose <strong> child
;; holds more than one line of text supplies date and time, the next span
;; supplies the status, the one after that the place. An <hr> empties the
;; accumulator. The accumulator holds :date, :time, :status and :places
;; (cache of geocoding results keyed by place text).
(defmethod observe-carrier "usps" [id carrier tracking]
(apply-to-carrier-events
carrier
tracking
(fn [nodes]
(reduce
(fn [accum elem]
(cond (and (= "span" (.getNodeName elem))
(some #{"strong"} (->> elem .getChildren (map #(.getNodeName %))))
(> (-> elem
.getChildren
(->> (filter #(= "strong" (.getNodeName %))))
first
.getTextContent
.trim
(.split "\n")
(count))
1))
;; date/time span: first line is the date, second line (trimmed) the time
(let [[date raw-time] (-> elem
.getChildren
(->> (filter #(= "strong" (.getNodeName %))))
first
.getTextContent
.trim
(.split "\n"))
time (.trim raw-time)]
(assoc accum :date date :time time))
;; status span: the first span after a date/time span
(and (= "span" (.getNodeName elem))
(:time accum)
(not (:status accum)))
(assoc accum :status (.trim (.getTextContent elem)))
;; place span: date, time and status are all known, so the event is stored
(and (= "span" (.getNodeName elem))
(:time accum)
(:status accum))
(let [place (.trim (.getTextContent elem))
state (:status accum)]
;; lookup order: cached result, geocoding the full place text, then
;; geocoding only the part of it matched by the regex below
;; NOTE(review): these lookups run outside the try that follows
(let [pd (when (seq place)
(or (get-in accum [:places place])
(seq (for [pd (geocode place)
:let [minutes-offset (timezone-offset (-> pd (get "lat"))
(-> pd (get "lon")))]]
(assoc pd :tz-minutes-offset minutes-offset)))
(seq (for [pd (geocode (re-find #"\w+,? \w\w " place))
:let [minutes-offset (timezone-offset (-> pd (get "lat"))
(-> pd (get "lon")))]]
(assoc pd :tz-minutes-offset minutes-offset)))))]
(try
(jdbc/execute-one!
ds
["insert into package_event(package_id,lat,lon,tz_minutes_offset,place,at,status)
values (?,?,?,?,?,?,?)"
id
(Double/parseDouble (-> pd first (get "lat")))
(Double/parseDouble (-> pd first (get "lon")))
(:tz-minutes-offset (first pd))
place
;; the timestamp is only built when an offset is known, otherwise nil is
;; passed for the at column
(when (:tz-minutes-offset (first pd))
(java.sql.Timestamp.
(.toEpochMilli
(Instant/from
(.parse (DateTimeFormatter/ofPattern "MMMM d, yyyy, h:m a XXXX")
(-> (format "%s %s %+03d%02d"
(:date accum)
(:time accum)
(quot (-> pd first :tz-minutes-offset) 60)
(Math/abs (rem (-> pd first :tz-minutes-offset) 60)))
;; upper-case the am/pm marker before parsing
(.replaceAll "am" "AM")
(.replaceAll "pm" "PM")))))))
(:status accum)])
;; any failure while building or running the insert is remembered and
;; logged, then the walk continues
(catch Throwable t
(reset! most-recent-error t)
(severe "whoops"t)))
(-> accum
(dissoc :time :status)
(assoc-in [:places place] pd))))
;; a horizontal rule resets everything, including the :places cache
(= "hr" (.getNodeName elem))
(empty accum)
:else
accum))
{}
nodes))))
;; DHL: reduce over the children of the timeline list and insert one
;; package_event row per timeline event. An <li> whose class contains
;; "timeline-date" sets :date; each later <li> whose class contains
;; "timeline-event" is stored against that date. The accumulator holds :date
;; and :places (cache of geocoding results keyed by location text).
(defmethod observe-carrier "dhl" [id carrier tracking]
(apply-to-carrier-events
carrier
tracking
(fn [nodes]
(reduce
(fn [accum elem]
(cond (and (= "li" (.getNodeName elem))
(some-> elem
(.getAttributes)
(get "class")
(.getNodeValue)
(.contains "timeline-date")))
(assoc accum :date (.asText elem))
(and (:date accum)
(= "li" (.getNodeName elem))
(some-> elem
(.getAttributes)
(get "class")
(.getNodeValue)
(.contains "timeline-event")))
;; the text of the timeline-time child is split on newlines into two pieces
(let [[time-a time-b] (some-> elem
(.getChildren)
(->> (filter #(some-> %
(.getAttributes)
(.get "class")
(.getNodeValue)
(.contains "timeline-time"))))
(first)
(.asText)
(.split "\n"))
;; location: trimmed text of a timeline-location node inside a timeline-unit child
[location] (for [c (.getChildren elem)
:when (some-> c
(.getAttributes)
(.get "class")
(.getNodeValue)
(.contains "timeline-unit"))
c (.getChildren c)
:when (some-> c
(.getAttributes)
(.get "class")
(.getNodeValue)
(.contains "timeline-location"))]
(.trim (.asText c)))
;; description: trimmed text of a timeline-description node inside a timeline-unit child
[description] (for [c (.getChildren elem)
:when (some-> c
(.getAttributes)
(.get "class")
(.getNodeValue)
(.contains "timeline-unit"))
c (.getChildren c)
:when (some-> c
(.getAttributes)
(.get "class")
(.getNodeValue)
(.contains "timeline-description"))]
(.trim (.asText c)))
;; reuse the cached lookup for this location, otherwise geocode it and
;; attach a timezone offset (in minutes) to every candidate returned
pd (when (seq location)
(or (get-in accum [:places location])
(for [pd (geocode location)
:let [minutes-offset (timezone-offset (-> pd (get "lat"))
(-> pd (get "lon")))]]
(assoc pd :tz-minutes-offset minutes-offset))))]
;; only events with both time pieces, a location and a description are stored
(if (and time-a
time-b
location
description)
;; the part of the second time piece before its first space is used as the
;; AM/PM marker
(let [[am-pm _] (.split time-b " ")]
(try
(jdbc/execute-one!
ds
["insert into package_event(package_id,lat,lon,tz_minutes_offset,place,at,status)
values (?,?,?,?,?,?,?)"
id
(Double/parseDouble (-> pd first (get "lat")))
(Double/parseDouble (-> pd first (get "lon")))
(:tz-minutes-offset (first pd))
location
;; the timestamp is only built when an offset is known, otherwise nil is
;; passed for the at column
(when (:tz-minutes-offset (first pd))
(java.sql.Timestamp.
(.toEpochMilli
(Instant/from
(.parse (DateTimeFormatter/ofPattern "MMM d, yyyy h:m a XXXX")
(format "%s %s %s %+03d%02d"
(:date accum)
time-a
am-pm
(quot (-> pd first :tz-minutes-offset) 60)
(Math/abs (rem (-> pd first :tz-minutes-offset) 60))))))))
description])
;; any failure while building or running the insert is remembered and
;; logged, then the walk continues
(catch Throwable t
(reset! most-recent-error t)
(severe "whoops"t)))
(assoc-in accum [:places location] pd))
;; incomplete events still record the lookup result in the cache
(assoc-in accum [:places location] pd)))
:else
(do
#_(prn elem)
accum)))
{}
nodes))))
(defn observe
  "Runs observe-carrier for every package marked active and returns how many
  of them finished without throwing. A failing package is logged and
  remembered in most-recent-error, and the remaining packages are still
  processed."
  []
  (let [active-packages (jdbc/execute! ds ["select * from package where active = true"])
        observe-one (fn [succeeded {:package/keys [id carrier tracking]}]
                      (try
                        (observe-carrier id carrier tracking)
                        (inc succeeded)
                        (catch Throwable t
                          (info (print-str id carrier tracking))
                          (reset! most-recent-error t)
                          (severe "whoops" t)
                          succeeded)))]
    (reduce observe-one 0 active-packages)))
(def ^CompletableFuture page (CompletableFuture.))
;; dumps a lot of management bean stuff out in a format that
;; prometheus(https://prometheus.io/) can digest
(defn metrics-data
  "Walks every MBean of the platform MBean server, collects each numeric value
  it can reach and returns a ByteBuffer of `name value` lines (values printed
  as doubles), followed by the http_request_count and undertow_channel_count
  lines.

  The walk is depth first over an explicit work stack:
    s - name segments collected so far (a list, newest first)
    e - context map (:con, :info, :name, :attribute-info, visited ObjectNames)
    c - the item being examined, nil once it has been dealt with
    d - pending [s e c] triples

  The collected values live in a map held outside the context, and the stack
  is tested with seq. Previously an exhausted-but-non-nil lazy stack was
  popped once more, which replaced the context with nil and dropped every
  collected metric from the output."
  []
  (let [baos (java.io.ByteArrayOutputStream.)
        result (java.util.HashMap.)]
    (loop [s nil
           e {}
           c (java.lang.management.ManagementFactory/getPlatformMBeanServer)
           d nil]
      (cond
        ;; the server itself: queue every registered ObjectName
        (instance? javax.management.MBeanServerConnection c)
        (let [x (.queryNames (java.lang.management.ManagementFactory/getPlatformMBeanServer) nil nil)
              e (assoc e :con c)]
          (recur s e nil (concat (for [i x] [s e i]) d)))

        ;; an MBean name: queue its attributes unless it is already on this path
        (instance? javax.management.ObjectName c)
        (if (contains? e c)
          (recur s e nil d)
          (let [info (.getMBeanInfo (:con e) c)
                attributes (.getAttributes info)
                e (assoc e :info info :name c c true)
                s (conj s (-> c (.getDomain)))]
            (recur s e nil (concat (for [i attributes] [s e i]) d))))

        ;; an attribute description: read its value when it is readable
        (instance? javax.management.MBeanAttributeInfo c)
        (let [^javax.management.MBeanAttributeInfo c c]
          (if (.isReadable c)
            (let [[attr] (.getAttributes (:con e) (:name e) (into-array String [(.getName c)]))
                  e (assoc e :attribute-info c)
                  s (conj s (-> c (.getName)))]
              (recur s e attr d))
            (recur s e nil d)))

        (instance? javax.management.Attribute c)
        (let [^javax.management.Attribute c c]
          (recur s e (.getValue c) d))

        ;; containers: queue their elements
        (and c (.isArray (class c)))
        (recur s e nil (concat (for [i c] [s e i]) d))

        (instance? java.util.Map c)
        (recur s e nil (concat (for [[k v] c] [(into s k) e v]) d))

        (instance? javax.management.openmbean.CompositeData c)
        (let [^javax.management.openmbean.CompositeData c c]
          (recur s e nil (concat (for [k (seq (.keySet (.getCompositeType c)))] [(conj s k) e (.get c k)]) d)))

        ;; strings and booleans are not exported
        (string? c)
        (recur s e nil d)

        (boolean? c)
        (recur s e nil d)

        ;; a number: store it under the sanitised path joined with "_"
        (number? c)
        (do
          (.put result
                (->> s
                     (map (fn [x]
                            (-> (str x)
                                (.toLowerCase)
                                (.replaceAll " " "_")
                                (.replaceAll "-" "_")
                                (.replaceAll "\\." "_")
                                (.replaceAll "'" ""))))
                     (reverse)
                     (interpose "_")
                     (apply str))
                c)
          (recur s e nil d))

        ;; current item finished and work remains: pop the next triple
        (and (nil? c) (seq d))
        (let [[[s e c] & d] d]
          (recur s e c d))

        ;; nothing left: write out everything collected
        (nil? c)
        (doseq [[k v] result]
          (.write baos (.getBytes (str k)))
          (.write baos (.getBytes " "))
          (.write baos (.getBytes (str (double v))))
          (.write baos (.getBytes "\n")))

        ;; a value of a kind this walk does not know how to handle
        :else
        (do
          (prn c)
          (prn (supers (class c)))
          (assert nil))))
    (.write baos (.getBytes (format "http_request_count %s\n" @requests-count)))
    (.write baos (.getBytes (format "undertow_channel_count %s\n" (-> @state :channels count))))
    (-> baos
        (.toByteArray)
        (ByteBuffer/wrap))))
(def handle-request* nil)
(defmulti handle-request* (fn [state ^HttpServerExchange exchange] (.getRequestPath exchange)))
;; GET /metrics: answers with the plain-text output of metrics-data and
;; returns the exchange.
(defmethod handle-request* "/metrics" [state ^HttpServerExchange exchange]
  (.put (.getResponseHeaders exchange) Headers/CONTENT_TYPE "text/plain")
  (.send (.getResponseSender exchange) ^ByteBuffer (metrics-data))
  exchange)
;; GET /source: dispatches a task that answers with the text of this source
;; file and ends the exchange once it has been sent.
(defmethod handle-request* "/source" [state ^HttpServerExchange exchange]
  (.dispatch exchange
             (fn []
               (.put (.getResponseHeaders exchange) Headers/CONTENT_TYPE "text/plain")
               (.send (.getResponseSender exchange)
                      (slurp file)
                      io.undertow.io.IoCallback/END_EXCHANGE)
               exchange)))
;; Any other path: answers with the compiled single-page app held by the
;; `page` future. The response is written on the exchange's IO thread once
;; the future has completed.
;; If the future completed exceptionally there is no buffer to send. The
;; error is logged and the request is answered with status 500, so the
;; exchange is always ended. Previously this path threw a
;; NullPointerException and never ended the exchange.
(defmethod handle-request* :default [state ^HttpServerExchange exchange]
  (.dispatch
   exchange
   (fn []
     (.whenCompleteAsync
      page
      (reify
        java.util.function.BiConsumer
        (accept [_ t u]
          (if u
            (do
              (severe "error" u)
              (.setStatusCode exchange 500)
              (.endExchange exchange))
            (let [^ByteBuffer t t]
              ;; slice so every response reads the shared buffer from its start
              (doto exchange
                (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "text/html"))
                (-> .getResponseSender (.send (.slice t) io.undertow.io.IoCallback/END_EXCHANGE)))))))
      (.getIoThread exchange)))))
(defn handle-request
  "Entry point for every HTTP request: counts it, routes it by request path
  through handle-request*, and logs the elapsed time together with the path
  and method."
  [state ^HttpServerExchange exchange]
  (swap! requests-count inc)
  (let [path (.getRequestPath exchange)
        method (str (.getRequestMethod exchange))]
    (log-time
     {:http/path path :http/method method}
     (handle-request* state exchange))))
(def dispatch nil)
(defmulti dispatch (fn [state id channel v resp exec] (nth v 2)))
;; "log": writes the client-supplied value (4th element of the message) to
;; the server log, tagged with the connection id, then replies with nil.
(defmethod dispatch "log" [state id channel v resp exec]
  (let [message (nth v 3)]
    (info message {:id id}))
  (resp nil))
;; "observe": runs a scrape of every active package on the calling thread,
;; then replies with nil.
(defmethod dispatch "observe" [state id channel v resp exec]
  (let [_observed (observe)]
    (resp nil)))
;; "new-tracking": the message arguments are [tracking carrier]; both are
;; stringified, lower-cased and trimmed before the package row is inserted.
(defmethod dispatch "new-tracking" [state id channel v resp exec]
  (let [normalize (fn [x] (.trim (.toLowerCase (str x))))
        args (nth v 3)
        carrier (normalize (second args))
        tracking (normalize (first args))]
    (jdbc/execute! ds ["insert into package(carrier,tracking) values (?,?)"
                       carrier
                       tracking]))
  (info "new-tracking")
  (resp "Some Response"))
;; "set-active": the message arguments are [package-id active]; the second is
;; coerced to a boolean and stored on the package row.
(defmethod dispatch "set-active" [state id channel v resp exec]
  (info v)
  (let [args (nth v 3)
        active? (boolean (second args))
        package-id (first args)]
    (jdbc/execute! ds ["update package set active = ? where id = ?"
                       active?
                       package-id]))
  (resp "Some Response"))
;; "get-note": replies with the note stored for the package id given as the
;; first message argument, or "" when there is none.
(defmethod dispatch "get-note" [state id channel v resp exec]
  (let [package-id (first (nth v 3))
        row (jdbc/execute-one! ds ["select note from package_note where package_id = ?" package-id])
        note (:package_note/note row)]
    (resp (or note ""))))
;; "set-note": the message arguments are [package-id note]. An insert is
;; tried first; if it throws for any reason, an update of the existing row is
;; run instead. Replies with true.
(defmethod dispatch "set-note" [state id channel v resp exec]
  (let [args (nth v 3)
        package-id (first args)
        note (second args)]
    (try
      (jdbc/execute! ds ["insert into package_note(package_id,note) values (?,?)"
                         package-id
                         note])
      (catch Throwable _
        (jdbc/execute! ds ["update package_note set note = ? where package_id = ?"
                           note
                           package-id]))))
  (resp true))
;; "list": replies with every row of the table named by the first message
;; argument. Only the whitelisted table names are accepted, because the name
;; is concatenated into the SQL text. Any other name is rejected with an
;; exception before the database is contacted. Previously it produced the
;; malformed statement "select * from " and surfaced as a SQL syntax error.
(defmethod dispatch "list" [state id channel v resp exec]
  (let [requested (first (nth v 3))
        table (#{"package" "event" "observation" "fedex_event_data"} requested)]
    (when-not table
      (throw (ex-info "unknown table requested" {:table requested})))
    (resp
     (jdbc/execute! ds [(str "select * from " table)]))))
;; "packages": replies with the current package overview and logs how long
;; the lookup took.
(defmethod dispatch "packages" [state id channel v resp exec]
  (let [rows (log-time (package-data))]
    (resp rows)))
(defn on-next
  "Handles one published package snapshot `pd` for a subscription. On the
  channel's worker it turns the snapshot into a set of [package-id key value]
  facts, sends the client the difference from the previously sent set
  ({:assert added :retract removed}), stores the new set, and then requests
  the next item from the subscription. Any failure is logged."
  [pd state id ^WebSocketChannel channel v resp exec]
  (.execute
   (.getWorker channel)
   (fn []
     (try
       (log-time
        {:id id :what :tick}
        (let [facts-path [:data id (nth v 1) :ps]
              previous (get-in @state facts-path #{})
              current (set (for [row pd
                                 [attr value] row]
                             [(:package/id row) attr value]))
              [to-remove to-add _] (diff previous current)]
          (resp
           {:assert to-add
            :retract to-remove})
          (swap! state assoc-in facts-path current)))
       (.request ^Flow$Subscription (get-in @state [:data id (nth v 1) :sub]) 1)
       (catch Throwable t
         (severe "error" t))))))
(defn on-subscribe
  "Records the new subscription `s` in the state under this connection and
  call id, arranges for it to be cancelled when the channel closes, and
  requests the first item."
  [^Flow$Subscription s state id ^WebSocketChannel channel v resp exec]
  (swap! state assoc-in [:data id (nth v 1) :sub] s)
  (let [cancel-on-close (reify
                          org.xnio.ChannelListener
                          (handleEvent [this evt]
                            (info "cancel subscription" {:id id})
                            (.cancel s)))]
    (.addCloseTask channel cancel-on-close))
  (.request s 1))
;; "packages-sub": subscribes this connection to the package publisher and
;; immediately publishes a fresh snapshot. Items are handled by on-next,
;; subscription setup by on-subscribe; errors are logged.
(defmethod dispatch "packages-sub" [state id channel v resp exec]
  (let [subscriber (reify
                     Flow$Subscriber
                     (onComplete [_])
                     (onError [_ err]
                       (severe "error" err))
                     (onNext [_ pd]
                       (on-next pd state id channel v resp exec))
                     (onSubscribe [_ s]
                       (on-subscribe s state id channel v resp exec)))]
    (.subscribe pub subscriber)
    (.submit pub (package-data))))
;; "packages-refresh": publishes a fresh package snapshot to every
;; subscriber, then replies with nil.
(defmethod dispatch "packages-refresh" [state id channel v resp exec]
  (info "packages-refresh")
  (let [snapshot (package-data)]
    (.submit pub snapshot))
  (resp nil))
(defn send-text
  "Sends the text `s` over `channel` with the given `timeout` and returns a
  CompletableFuture that completes with true once the send has finished, or
  exceptionally with the error reported by the send."
  [^String s ^WebSocketChannel channel timeout]
  (let [done (java.util.concurrent.CompletableFuture.)
        ^io.undertow.websockets.core.WebSocketCallback callback
        (reify
          io.undertow.websockets.core.WebSocketCallback
          (complete [_ channel context]
            (.complete done true))
          (onError [_ channel context throwable]
            (.completeExceptionally done throwable)))]
    (WebSockets/sendText s channel callback (long timeout))
    done))
(defn on-full-text-message
  "Decodes one transit-JSON text message from a client and acts on it.
  Messages are vectors whose first element is the kind:
    0 - a call: handed to `dispatch` together with a reply function that
        sends [1 call-id result] back and a function that runs a task on the
        channel's worker
    1 - a reply: only logged
  Any other kind makes `case` throw.

  The reply text is decoded from the writer's bytes as UTF-8 explicitly,
  matching the UTF-8 used for reading. It used to depend on the JVM's
  default charset, which corrupts non-ASCII text on platforms where that is
  not UTF-8."
  [state id ^WebSocketChannel channel ^BufferedTextMessage msg]
  (let [in (java.io.ByteArrayInputStream. (.getBytes (.getData msg) "utf-8"))
        v (transit/read (transit/reader in :json))]
    (info v)
    (case (long (nth v 0))
      0 (dispatch state
                  id
                  channel
                  v
                  (fn [result]
                    (send-text
                     (let [bao (java.io.ByteArrayOutputStream.)]
                       (transit/write (transit/writer bao :json)
                                      [1 (nth v 1) result])
                       (.toString bao "utf-8"))
                     channel
                     1000))
                  (fn [f]
                    (.execute (.getWorker channel) f)))
      1 (info v))))
(defn websocket-connect
  "Sets up a newly connected websocket channel: gives it a random id, removes
  its entries from the state when it closes (or when a close message
  arrives), routes text messages to on-full-text-message with failures
  logged and remembered, registers the channel under :channels and starts
  receiving."
  [state exchange ^WebSocketChannel channel]
  (let [id (UUID/randomUUID)
        ;; drop everything the state holds for this connection
        forget! (fn []
                  (swap! state update-in [:channels] dissoc id)
                  (swap! state update-in [:data] dissoc id))]
    (.addCloseTask channel
                   (reify
                     org.xnio.ChannelListener
                     (handleEvent [this evt]
                       (info "closed" {:id id})
                       (forget!))))
    (.set (.getReceiveSetter channel)
          (proxy [AbstractReceiveListener] []
            (onFullCloseMessage [^WebSocketChannel channel msg]
              (forget!)
              (.close channel))
            (onFullTextMessage [channel msg]
              (try
                (on-full-text-message state id channel msg)
                (catch Throwable t
                  (reset! most-recent-error t)
                  (severe "whoops" t))))))
    (swap! state assoc-in [:channels id] channel)
    (.resumeReceives channel)))
;; The Undertow server, built and started once (defonce). A single handler
;; chain serves both protocols: websocket handshakes go to websocket-connect,
;; every other request goes to handle-request.
(defonce ^Undertow server
(doto (-> (Undertow/builder)
(.setIoThreads 1)
(.setWorkerThreads 2)
;; listens everywhere (all interfaces), watch out
(.addHttpListener port "0.0.0.0")
(.setHandler
(WebSocketProtocolHandshakeHandler.
(reify
WebSocketConnectionCallback
(onConnect [_ exchange channel]
(websocket-connect state exchange channel)))
;; handler for requests that are not websocket handshakes
(reify
HttpHandler
(handleRequest [_ exchange]
(try
(handle-request state exchange)
;; failures are remembered and logged instead of propagating to Undertow
(catch Throwable t
(reset! most-recent-error t)
(severe "whoops" t)))))))
(.build))
(.start)))
;; recompile file as cljs
;; Runs on a server worker thread each time this file is loaded: compiles
;; this same file as ClojureScript into a temporary .js file and completes
;; the `page` future with an HTML document that inlines the compiled script
;; and calls packages.main(). bound-fn carries the loading thread's dynamic
;; bindings (such as *file*) over to the worker.
(.execute (.getWorker server)
(bound-fn []
(log-time
{:time/what :cljs-compile}
(let [js (File/createTempFile "whatever" ".js")]
(try
(cljs/build (.getAbsolutePath (io/file *file*))
{:output-to (.getAbsolutePath js)
:optimizations :advanced})
(.complete
page
(ByteBuffer/wrap
(.toByteArray
(doto (java.io.ByteArrayOutputStream.)
(.write (.getBytes "<meta charset=\"utf-8\" /><html><head><script>"))
;; copy the compiled JavaScript file into the output stream
((fn [a] (clojure.java.io/copy js a)))
(.write (.getBytes "</script></head><body><script>packages.main();</script></body></html>"))))))
;; a failed compile is stored in the future rather than thrown here
(catch Throwable t
(.completeExceptionally page t))
;; the temporary file is removed whether or not the compile succeeded
(finally
(.delete js)))))))
;; reload this file
(defn reload
  "Loads this source file again (timed), then logs that the reload finished."
  []
  (let [_loaded (log-time (load-file file))]
    (info "reload complete")))
;; tell all clients to reload, close channels, shutdown server,
;; reload this file to restart
(defn new-server
  "Restarts the server: sends every connected client a \"reload\" call
  carrying 30000, closes its channel, stops the server, clears the state,
  unmaps the `server` var so that reloading this file builds a new one, and
  reloads. A failure on one channel is logged and remembered; the others are
  still processed."
  []
  (let [^String reload-message (str (json/write-str [0 1 "reload" (* 1000 30)]))]
    (doseq [^WebSocketChannel ch (vals (:channels @state))]
      (try
        (WebSockets/sendTextBlocking reload-message ch)
        (.close ch)
        (catch Throwable t
          (reset! most-recent-error t)
          (severe "whoops" t)))))
  (.stop server)
  (reset! state {})
  (ns-unmap 'packages 'server)
  (reload))
;; tell clients to reload
(defn reload-clients
  "Sends every connected client a \"reload\" call carrying 0. A failure on
  one channel is logged; the others are still processed."
  []
  (let [^String reload-message (str (json/write-str [0 1 "reload" 0]))]
    (doseq [ch (vals (:channels @state))]
      (try
        (WebSockets/sendTextBlocking reload-message ^WebSocketChannel ch)
        (catch Throwable t
          (severe "whoops" t))))))
(defn main
  "Process entry point. Blocks forever on a promise that is never delivered,
  which keeps the calling thread alive."
  [& args]
  (deref (promise)))
))
(defn elide
  "Shortens `s` for display. Values longer than 7 characters become their
  first two characters, \"...\", and their last two characters; anything
  shorter (including nil and the empty string) is returned unchanged."
  [s]
  (let [length (count s)]
    (if (<= length 7)
      s
      (let [head (subs s 0 2)
            tail (subs s (- length 2))]
        (str head "..." tail)))))
#?(:cljs
(do
;; emit a "Top" message through the m/info macro when this :cljs branch loads
(m/info "Top" {})
;; counter incremented by make-rpc to give each outgoing call a fresh numeric id
(def i (atom 0))
(defn make-rpc
  "Opens a websocket to `url` and returns the client side of a small RPC
  protocol carried as transit/json arrays.

  The returned function is called as `(rpc name eos & args)`. It sends
  `[0 id name args eos]` and returns a js/Promise. Each response
  `[1 id result]` resolves the currently pending promise for `id` with
  `[result next]`, where `next` is a fresh promise for the following response
  with the same id. When sending fails, or the socket closes while a promise
  is pending, that promise resolves with `eos`. The socket reconnects one
  second after every close.

  The returned function also carries a \"register\" property,
  `(fn [name f])`, installing `f` as the handler for requests initiated by the
  server: an incoming `[0 id name ...]` calls `(f msg reply)`, and
  `(reply r)` sends `[1 id r]` back."
  [url]
  (let [ws (atom nil)
        ;; string keys -> handlers installed through \"register\";
        ;; numeric keys -> pending calls, stored as {:complete resolve-fn :eos eos}
        registry (atom {})
        connect (fn connect []
                  (reset! ws (js/WebSocket. url))
                  (aset @ws "onopen" (fn [evt]
                                       (m/info "onopen" {})))
                  (aset @ws "onclose"
                        (fn [evt]
                          ;; End every pending call with its own eos value.
                          ;; Previously a continuation handler re-invoked the
                          ;; handler it replaced, whose promise had already been
                          ;; resolved, so the promise callers were actually
                          ;; waiting on never settled after a disconnect.
                          (doseq [[k pending] @registry
                                  :when (number? k)]
                            ((:complete pending) (:eos pending))
                            (swap! registry dissoc k))
                          (js/setTimeout connect 1000)))
                  (aset @ws "onerror" (fn [evt]
                                        (m/info "onerror" {:event evt})))
                  (aset @ws "onmessage"
                        (fn [event]
                          (let [msg (transit/read (transit/reader :json) (aget event "data"))]
                            (case (get msg 0)
                              ;; response to one of our calls
                              1 (let [s (get msg 1)]
                                  ;; a response for an id we are not waiting on is ignored
                                  (when-let [{:keys [complete eos]} (get @registry s)]
                                    (complete
                                     [(get msg 2)
                                      ;; The executor runs synchronously, so the
                                      ;; continuation is registered before the
                                      ;; current promise is resolved.
                                      (js/Promise.
                                       (fn [complete-next error]
                                         (swap! registry assoc s {:complete complete-next
                                                                  :eos eos})))])))
                              ;; request initiated by the server
                              0 ((get @registry (get msg 2))
                                 msg
                                 (fn [r]
                                   (.send @ws (transit/write (transit/writer :json) [1 (get msg 1) r])))))))))]
    (connect)
    (doto (fn [name eos & args]
            (js/Promise.
             (fn [complete error]
               (let [s (swap! i inc)
                     m (transit/write (transit/writer :json) [0 s name args eos])]
                 (try
                   (.send @ws m)
                   (swap! registry assoc s {:complete complete :eos eos})
                   ;; e.g. the socket is not open yet: finish the call with eos
                   (catch :default t
                     (complete eos)))))))
      (aset "register" (fn [name f]
                         (swap! registry assoc name f))))))
(defn new-tracking
  "Builds an event handler for adding a tracking entry.

  `t` and `c` are accessor functions: called with no arguments they return
  the current value, called with one argument they set it. The handler reads
  both values, passes them to `(rpc \"new-tracking\" nil ...)` in that order,
  and then sets both back to the empty string."
  [rpc t c]
  (fn [evt]
    (let [t-value (t)
          c-value (c)]
      (rpc "new-tracking" nil t-value c-value))
    (t "")
    (c "")))
(defn table-ui
  "Returns the render function for the package table.

  `state` is a reagent atom holding a set of [entity attribute value] triples,
  `rpc` is the function returned by `make-rpc`, and `quil` is a component that
  is rendered as `[quil package-id]` to edit a package's note.

  The page shows the \"last-data\" value of entity -1, inputs and buttons for
  adding a tracking number / observing / refreshing, and one table row per
  entity that has a :package/id triple. Rows are ordered active packages
  first, then by most recent :package_event/last_date. UI-only flags
  (\"display_note\", \"display_status\", and :package/active :unknown while an
  update is in flight) are kept as triples in the same set."
  [state rpc quil]
  (let [;; one shared formatter rather than two new ones per row on every render
        date-format (.DateTimeFormat js/Intl "en" #js {:timeStyle "short" :dateStyle "short"})
        stripe {:style {:background-color "#436318" :color "#ddd"}}]
    (fn []
      [:div
       [:h1 "Untitled 2021 Downey Project"]
       [:h2 (first (for [[a b c] @state
                         :when (= a -1)
                         :when (= b "last-data")]
                     (str c)))]
       [:div
        [:span "Carrier" [:input#carrier10]]
        [:span "Tracking" [:input#tracking10]]
        [:button {:on-click (new-tracking
                             rpc
                             (fn
                               ([]
                                (.-value (.getElementById js/document "tracking10")))
                               ([value]
                                (aset (.getElementById js/document "tracking10") "value" value)))
                             (fn
                               ([]
                                (.-value (.getElementById js/document "carrier10")))
                               ([value]
                                (aset (.getElementById js/document "carrier10") "value" value))))}
         "Add Tracking"]
        [:button {:on-click (fn [evt] (rpc "observe" nil))} "Observe"]
        [:button {:on-click (fn [evt] (rpc "packages-refresh" nil))} "Refresh"]]
       [:table
        [:tr stripe
         [:th "id"]
         [:th "carrier"]
         [:th "tracking"]
         [:th "start place"]
         [:th "start at"]
         [:th "last place"]
         [:th "last at"]
         [:th "days ago"]
         [:th "KPH"]
         [:th "active"]
         [:th "status"]]
        (let [s @state
              ;; index the triples by entity once, instead of rescanning the
              ;; whole set for every package
              by-entity (group-by first s)]
          (doall
           (map-indexed
            (fn [i {:as m :keys [package/carrier
                                 package/tracking
                                 package_event/first_place
                                 package_event/first_date
                                 package_event/last_place
                                 package_event/last_date
                                 days_ago
                                 kph
                                 package/active
                                 package_event/last_status]}]
              (let [id (:package/id m)
                    note-flag [id "display_note" true]
                    status-flag [id "display_status" true]]
                (list
                 ^{:key id}
                 [:tr
                  ;; shade every other row
                  (if (odd? i) stripe {})
                  ;; clicking the id toggles the note editor row below
                  [:td {:style {:text-align "right"}
                        :on-click (fn [_]
                                    (swap! state (fn [s]
                                                   (if (contains? s note-flag)
                                                     (disj s note-flag)
                                                     (conj s note-flag)))))}
                   id]
                  [:td carrier]
                  [:td [:a {:href (carrier-url carrier tracking) :target "_blank"} (elide tracking)]]
                  [:td first_place]
                  [:td (.format date-format first_date)]
                  [:td last_place]
                  [:td (.format date-format last_date)]
                  [:td {:style {:text-align "right"}} days_ago]
                  [:td {:style {:text-align "right"}} (long kph)]
                  [:td
                   {:style {:text-align "center"}}
                   (if-not (= active :unknown)
                     [:input
                      {:type "checkbox"
                       :checked (boolean active)
                       :on-change (fn [evt]
                                    ;; show the spinner until the server has
                                    ;; been updated and the data refreshed
                                    (swap! state #(-> %
                                                      (disj [id :package/active active])
                                                      (conj [id :package/active :unknown])))
                                    (-> (rpc "set-active" nil id (not active))
                                        ;; pass eos explicitly, like every other rpc call
                                        (.then (fn [_] (rpc "packages-refresh" nil)))
                                        (.then (fn [_]
                                                 (swap! state #(-> %
                                                                   (disj [id :package/active :unknown])))))))}]
                     [:div {:class "loadingspinner" :style {:width "0.5em" :height "0.5em"}}])]
                  ;; status: the elided and the full text are both rendered,
                  ;; clicking swaps which one is visible
                  [:td
                   [:div {:style (if (contains? s status-flag) {:display "none"} {})
                          :on-click (fn [_] (swap! state conj status-flag))}
                    (elide last_status)]
                   [:div {:style (if (contains? s status-flag) {} {:display "none"})
                          :on-click (fn [_] (swap! state disj status-flag))}
                    last_status]]]
                 (when (contains? s note-flag)
                   ^{:key (str "note-" id)}
                   [:tr
                    ;; :col-span becomes React's colSpan; 11 matches the
                    ;; number of header columns above
                    [:td {:col-span 11}
                     [quil id]]]))))
            (reverse
             (sort-by
              (juxt #(boolean (get % :package/active))
                    #(get % :package_event/last_date))
              ;; one attribute->value map per entity that has a :package/id
              (for [[a b c] s
                    :when (= b :package/id)]
                (into {}
                      (map (fn [[_ attribute value]] [attribute value]))
                      (get by-entity a))))))))]])))
;; make a component for the quil editor
(defn make-quil
  "Creates a reagent class that edits the note of package `id` with Quill.

  After mounting it requests the note with `(rpc \"get-note\" nil id)`; when a
  note comes back, it becomes the node's HTML and a Quill editor with the
  \"snow\" theme is attached. On unmount the editor's current HTML is sent
  with `(rpc \"set-note\" nil id html)`."
  [rpc id]
  (r/create-class
   {:display-name "Quil"
    :reagent-render (fn [] [:div])
    :component-did-mount (fn [this]
                           (.then
                            (rpc "get-note" nil id)
                            (fn [[note continue]]
                              (when note
                                (set! (.-innerHTML (rdom/dom-node this)) note)
                                (new js/Quill (rdom/dom-node this) #js {"theme" "snow"})))))
    :component-will-unmount (fn [this]
                              ;; No editor exists if the note never arrived
                              ;; (missing note, failed call, or unmounted before
                              ;; the response). In that case there is nothing to
                              ;; save, and reading \"root\" off a missing editor
                              ;; would throw.
                              (when-let [editor (.find js/Quill (rdom/dom-node this))]
                                (rpc
                                 "set-note"
                                 nil
                                 id
                                 (-> editor
                                     (aget "root")
                                     (aget "innerHTML")))))}))
;; Browser entry point, exported so the generated page can call packages.main().
;; Creates the client state, opens the RPC websocket, injects stylesheets, the
;; Quill script and the spinner CSS into <head>, subscribes to package updates,
;; registers the "log" and "reload" handlers the server can invoke, and renders
;; the table UI into document.body.
(defn ^:export main []
(let [;; I should just give up and pull in datascript
;; client state: a reactive set of [entity attribute value] triples
state (r/atom #{})
loc (.-location js/window)
;; websocket URL for the host and path of the current page:
;; ws for http pages, wss for https pages
url (str (case (.-protocol loc)
"http:" "ws"
"https:" "wss")
"://"
(.-host loc)
(.-pathname loc))
rpc (make-rpc url)
;; seed the [-1 "last-data" ...] triple with the current time
_ (swap! state conj [-1 "last-data" (js/Date.)])
;; root component: calling it returns whatever table-ui returns
f (fn [] (table-ui state rpc (partial make-quil rpc)))]
(set! (.-title js/document) "Untitled 2021 Downey Project")
;; append two stylesheet links, the Quill script and an inline <style> to <head>
(doto (aget js/document "head")
(.appendChild
(doto (.createElement js/document "link")
(.setAttribute "href" "https://downey.family/~kevin/tufte.min.css")
(.setAttribute "rel" "stylesheet")))
(.appendChild
(doto (.createElement js/document "link")
(.setAttribute "href" "https://cdn.quilljs.com/1.3.6/quill.snow.css")
(.setAttribute "rel" "stylesheet")))
(.appendChild
(doto (.createElement js/document "script")
(.setAttribute "src" "https://cdn.quilljs.com/1.3.6/quill.js")))
(.appendChild
(doto (.createElement js/document "style")
(.appendChild
(.createTextNode js/document
"
.loadingspinner {
pointer-events: none;
width: 2.5em;
height: 2.5em;
border: 0.4em solid transparent;
border-color: #eee;
border-top-color: #3E67EC;
border-radius: 50%;
animation: loadingspin 1s linear infinite;
}
@keyframes loadingspin {
100% {
transform: rotate(360deg)
}
}
")))))
(rpc "log" nil "Hi, I am a client")
;; Subscription loop. f issues the "packages-sub" call; g receives each
;; [x continue] pair. While `continue` is truthy, x is applied to the state and
;; g waits on `continue`; otherwise f is retried after one second.
((fn f []
(.then (rpc "packages-sub" nil)
(fn g [[x continue :as y]]
(if continue
(do
(m/info "here" {})
(swap! state (fn [s]
(transduce
;; x is walked as [k ops] entries; every item of ops is paired with its k
(mapcat (fn [[k ops]]
(eduction
(map (fn [item] [k item]))
(seq ops))))
(fn
;; completion step: replace the [-1 "last-data" ...] triple with the current time
([s]
(into #{[-1 "last-data" (js/Date.)]}
(remove #(and (= -1 (nth % 0))
(= "last-data" (nth % 1))))
s))
;; reduction step: :assert adds the triple, :retract removes it
([s [op triple]]
(case op
:assert (conj s triple)
:retract (disj s triple))))
s
(seq x))))
(.then continue g))
(js/setTimeout f 1000))))))
;; handlers for requests initiated by the server
((aget rpc "register") "log" (fn [msg reply]
(m/info msg {})
(reply nil)))
;; "reload": reload the page after the delay found at index 3 of the message
((aget rpc "register") "reload" (fn [msg reply]
(js/setTimeout (fn [] (.reload (.-location js/window) true)) (nth msg 3))
(reply nil)))
;; the trailing parens on the next line also close the enclosing (do and #?(:cljs forms
(rdom/render [f] (.-body js/document))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment