Skip to content

Instantly share code, notes, and snippets.

@micha
Created July 19, 2019 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save micha/0c0fadd65d63afbf602d72634b4a395e to your computer and use it in GitHub Desktop.
Save micha/0c0fadd65d63afbf602d72634b4a395e to your computer and use it in GitHub Desktop.
(ns adzerk.instant-counts-updater.api
(:require
[clj-statsd :as s]
[cheshire.core :as json]
[clojure.java.io :as io]
[clojure.string :as string]
[clojure.tools.logging :as log]
[amazonica.aws.s3 :as s3]
[amazonica.aws.sqs :as sqs]
[amazonica.aws.dynamodbv2 :as ddb]
[adzerk.instant-counts-updater.tx :as tx
:refer [deftx abort!! retry!!]]
[adzerk.instant-counts-updater.config :as config]
[adzerk.instant-counts-updater.db :as db
:refer [<< with-tx QUERY INSERT-MULTI! EXECUTE!]]
[adzerk.instant-counts-updater.util :as util
:refer [csharp->yyyy-mm-dd timed with-timing guard tagvec]])
(:import
[java.sql SQLException]
[clojure.lang ExceptionInfo]
[java.util.zip GZIPInputStream]
[com.amazonaws AbortedException]
[com.amazonaws.services.dynamodbv2.model
InternalServerErrorException
TransactionCanceledException
TransactionInProgressException
ConditionalCheckFailedException
IdempotentParameterMismatchException
ProvisionedThroughputExceededException]
[com.amazonaws.services.s3.model AmazonS3Exception]))
;; setup ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/setup "127.0.0.1" 8125)
(declare ^:dynamic *tags*)
(declare ^:dynamic *on-success*)
(declare records-in-queue?)
(def ingesters (atom 0))
(def lock-table config/IC_FILE_LOCK_TABLE)
(def ingest-queue config/IC_INGEST_QUEUE_URL)
(def max-backlog (read-string config/IC_MAX_RECORD_BACKLOG))
;; datadog helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmacro with-api-context
[metric paused? & body]
`(binding [*tags* (atom {:ic.entity "none"}), *on-success* []]
(with-let [s# (some->> (try ~@body
(catch AbortedException ex#
(when-not (~paused?) (log/error ex#) :aborted))
(catch Throwable ex# (log/error ex#) :error)))]
(s/increment ~metric 1 1 (tagvec (assoc @*tags* :ic.status s#)))
(when (= :success s#) (doseq [f# *on-success*] (f))))))
(defn on-success
[f]
(set! *on-success* (conj *on-success* f)))
;; misc ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn post-switchover?
"Responsibility for writing records to dynamodb is determined by the switch-
over date and the timestamp in the filename. If the filename's timestamp is
not before the agreed-upon switchover date we handle it, otherwise it's the
responsibility of the old sindri service."
[filename]
(some-> (config/parse-filename filename)
:time
(.compareTo config/IC_SWITCHOVER_TIME)
pos?))
;; S3 helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn s3-input-stream
"Given an s3 get result returns the content input stream, gunzipped."
[{:keys [input-stream] {:keys [content-encoding]} :object-metadata}]
(if (not= content-encoding "gzip") input-stream (GZIPInputStream. input-stream)))
(defn s3-reader
"..."
[bucket key]
(try (io/reader (s3-input-stream (s3/get-object :bucket-name bucket :key key)))
(catch AmazonS3Exception ex2
(if (= "NoSuchKey" (.getErrorCode ex2)) (abort!! :notfound) (retry!! :s3error ex2)))))
;; dynamo helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn dynamo-lock-complete?
[table-name s3path]
(try (= -1 (:ic_lease (:item (ddb/get-item :consistent-read true
:table-name table-name
:key {:s3path s3path}))))
(catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled))))
(defn dynamo-lock-commit!
[table-name s3path tx-uid tx-expire]
(try (ddb/update-item
:table-name table-name
:key {:s3path s3path}
:condition-expression "ic_claimed = :v1 AND ic_lease = :v2"
:update-expression "SET ic_lease = :v3"
:expression-attribute-values {":v1" tx-uid ":v2" tx-expire ":v3" -1})
(catch ConditionalCheckFailedException _ (retry!! :txfailed))
(catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled))))
(defn dynamo-lock-begin!
[table-name s3path & {:keys [ttl] :or {ttl (* 1000 60 5)}}]
(if (dynamo-lock-complete? table-name s3path)
(abort!! :txcomplete)
(let [now (System/currentTimeMillis)
uid (util/thread-uuid)
expire (+ now ttl)]
(try (ddb/update-item
:table-name table-name
:key {:s3path s3path}
:condition-expression "attribute_not_exists(ic_lease) OR ic_lease BETWEEN :v0 AND :v2"
:update-expression "SET ic_claimed = :v1, ic_lease = :v3"
:expression-attribute-values {":v0" 0 ":v1" uid ":v2" now ":v3" expire})
#(dynamo-lock-commit! table-name s3path uid expire)
(catch ConditionalCheckFailedException _ (retry!! :txfailed))
(catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled))))))
(defn verify-not-duplicate!
[impression-id]
(let [nowtime (int (/ (System/currentTimeMillis) 1000))
expires (+ nowtime (* 24 60 60))]
(try (ddb/put-item
:table-name config/IC_CLICK_DEDUP
:condition-expression "attribute_not_exists(Expires) OR Expires < :v1"
:expression-attribute-values {":v1" nowtime}
:item {:UniqueId impression-id :Expires expires})
(catch ConditionalCheckFailedException _ (s/increment :ic.clickdeduped 1))
(catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled)))))
(defn update-dynamo-counts!
[{:keys [token entitytype entityid createdon impressions clicks conversions passbacks revenue] :as job}]
(let [tx-expr "ADD Impressions :v1, Clicks :v2, Conversions :v3, Passbacks :v4, Revenue :v5"
tx-vals {":v1" impressions ":v2" clicks ":v3" conversions ":v4" passbacks ":v5" revenue}]
(try ((timed ddb/transact-write-items :ic.ddbtransaction.time)
:client-request-token token
:transact-items (case entitytype
"ad"
[{:update {:table-name config/IC_AD_TOTALS
:key {:PassCreativeId entityid}
:update-expression tx-expr
:expression-attribute-values tx-vals}}
{:update {:table-name config/IC_AD_TOTALS_DAILY
:key {:PassCreativeId entityid :Date createdon}
:update-expression tx-expr
:expression-attribute-values tx-vals}}]
"adpart"
(let [[adid part] (string/split entityid #":" 2)
now-ms (System/currentTimeMillis)]
[{:update {:table-name config/IC_AD_PARTITION_TOTALS
:key {:Ad adid :Partition part}
:update-expression (str tx-expr " SET Precedence = :w1")
:expression-attribute-values (merge tx-vals {":w1" now-ms})}}
{:update {:table-name config/IC_AD_PARTITION_TOTALS_DAILY
:key {:AdPartition entityid :Date createdon}
:update-expression tx-expr
:expression-attribute-values tx-vals}}])
"flight"
[{:update {:table-name config/IC_FLIGHT_TOTALS
:key {:PassId entityid}
:update-expression tx-expr
:expression-attribute-values tx-vals}}
{:update {:table-name config/IC_FLIGHT_TOTALS_DAILY
:key {:PassId entityid :Date createdon}
:update-expression tx-expr
:expression-attribute-values tx-vals}}]
"campaign"
[{:update {:table-name config/IC_CAMPAIGN_TOTALS
:key {:CampaignId entityid}
:update-expression tx-expr
:expression-attribute-values tx-vals}}
{:update {:table-name config/IC_CAMPAIGN_TOTALS_DAILY
:key {:CampaignId entityid :Date createdon}
:update-expression tx-expr
:expression-attribute-values tx-vals}}]
"advertiser"
[{:update {:table-name config/IC_ADVERTISER_TOTALS
:key {:AdvertiserId entityid}
:update-expression tx-expr
:expression-attribute-values tx-vals}}
{:update {:table-name config/IC_ADVERTISER_TOTALS_DAILY
:key {:AdvertiserId entityid :Date createdon}
:update-expression tx-expr
:expression-attribute-values tx-vals}}]
"network"
[{:update {:table-name config/IC_NETWORK_TOTALS_DAILY
:key {:NetworkId entityid :Date createdon}
:update-expression tx-expr
:expression-attribute-values tx-vals}}]))
(catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled))
(catch TransactionCanceledException ex
(let [reasons (set (map #(.getCode %) (.getCancellationReasons ex)))]
(cond (reasons "ProvisionedThroughputExceeded") (retry!! :ddbthrottled)
(reasons "ThrottlingError") (retry!! :ddbthrottled)
(reasons "TransactionConflict") (retry!! :ddbtxconflict)
:else (retry!! :ddberror ex)))))))
;; postgres helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn backlog-exceeded?
[]
(not (try (QUERY ["SELECT * FROM check_backlog(?::int)" max-backlog])
(catch SQLException ex
(when (not= "BCKLG" (.getSQLState ex)) (throw ex))))))
(defn still-working?
[]
(try (not (QUERY ["SELECT * FROM check_working()"]))
(catch SQLException ex
(or (= "WORKN" (.getSQLState ex)) (throw ex)))))
(defn start-sql-job!
[]
(try (first (QUERY ["SELECT * FROM get_job()"]))
(catch SQLException ex (retry!! :sqlerror ex))))
(defn complete-sql-job!
[{:keys [id]}]
(try (first (QUERY ["SELECT * FROM complete_job(?::bigint)" id]))
(catch SQLException ex (retry!! :sqlerror ex))))
(defn insert-and-aggregate!
[rows]
(try (let [tmp-table (str "tmp" (.getId (Thread/currentThread)))]
(QUERY ["SELECT * FROM create_temp_table(?::text)" tmp-table])
(INSERT-MULTI! tmp-table rows)
(QUERY ["SELECT * FROM update_initial_aggregated_counts(?::text)" tmp-table]))
(catch SQLException ex (retry!! :sqlerror ex))))
(defn valid-event?
[d]
(let [event? (= (:Meta:schema d) "event")
not-click? (not (contains? d :ClickCount))
correction-id (:IsCorrection d)
valid-ua? (:IsValidUA d false)
id (:ImpressionId d)
unique-id (if correction-id (str id ":" correction-id) id)
unique-click? (delay (verify-not-duplicate! unique-id))]
(and event? (or not-click? (and valid-ua? @unique-click?)))))
(defn make-rows
[events counts text]
(if-let [d (guard (json/parse-string text true))]
(when (valid-event? d)
(let [part (when-let [p (:EcpmPartition d)]
(str (:CreativePassId d) ":" p))
base (merge {:impressions (:ImpressionCount d 0)
:clicks (:ClickCount d 0)
:conversions (:ConversionCount d 0)
:revenue (:Revenue d 0)
:passbacks 0
:createdon (csharp->yyyy-mm-dd (:CreatedOn d))}
(when (= (:EventId d) 500) {:impressions -1 :passbacks 1}))
make-row #(merge base {:entitytype %1 :entityid %2})]
(swap! events inc)
(swap! counts #(+ % (if part 6 5)))
[(when part (make-row "adpart" part))
(make-row "ad" (:CreativePassId d))
(make-row "network" (:NetworkId d))
(make-row "flight" (:PassId d))
(make-row "campaign" (:CampaignId d))
(make-row "advertiser" (:BrandId d))]))
(do (s/increment :ic.json.error)
(log/errorf "could not parse json: %s" text))))
(defn ingest-records!
[lines]
(with-timing :ic.ingestrecords.time
(let [events (atom 0), counts (atom 0)]
(some->> (seq lines)
(mapcat (partial make-rows events counts))
(filter identity)
(seq)
(#(do (insert-and-aggregate! %) {:events @events :counts @counts}))))))
;; SQS helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn get-sqs-job!
[queue-url]
(try (when-let
[{:keys [attributes body] :as job}
(first (:messages (sqs/receive-message
:queue-url queue-url
:wait-time-seconds 20
:max-number-of-messages 1)))]
(assoc job :body (guard (json/parse-string body true)) :queue-url queue-url))
(catch Throwable ex (retry!! :sqserror ex))))
(defn complete-sqs-job!
[job]
(try (sqs/delete-message job)
(catch Throwable ex (retry!! :sqserror ex))))
;; try-with-resources macros ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmacro with-sql-tx
[& body]
`(try (with-tx :read-committed ~@body)
(catch SQLException ex# (log/error ex#) :sqlerror)))
(deftx with-dynamo-lock
(begin [bucket key] (dynamo-lock-begin! lock-table (str bucket "/" key)))
(commit [dynamo-lock-commit! _] (dynamo-lock-commit!)))
(deftx with-update-job
(begin [] (start-sql-job!))
(commit [job _] (complete-sql-job! job)))
(deftx with-ingest-job
(begin [] (get-sqs-job! ingest-queue))
(commit [job _] (complete-sqs-job! job)))
(deftx with-ingesters
(begin [] (swap! ingesters inc))
(commit [_ _] (swap! ingesters dec))
(rollback [_ _] (swap! ingesters dec)))
;; API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn records-in-queue? [] (with-tx :read-committed (still-working?)))
(defn files-ingesting? [] (< 0 @ingesters))
(defn status-report [] (with-tx :read-committed (QUERY ["SELECT * FROM status_report"])))
(defn reset-stats! [] (with-tx :read-committed (QUERY ["SELECT * FROM reset_stats()"])))
(defn update-dynamo!
[paused?]
(with-api-context :ic.update paused?
(when-not (paused?)
(with-sql-tx
(with-update-job :as {:keys [entitytype] :as job}
(swap! *tags* assoc :ic.entity entitytype)
(update-dynamo-counts! job))))))
(defn s3-download-and-ingest!
[paused?]
(with-api-context :ic.ingest paused?
(if (paused?)
(when (zero? @ingesters) (reset-stats!))
(with-sql-tx
(when-not (backlog-exceeded?)
(with-ingest-job :as {{:keys [bucket key]} :body}
(on-success #(log/info :ingested bucket key))
(with-ingesters :as _
(when-not (and bucket key) (abort!! :malformed))
(when-not (post-switchover? key) (abort!! :switchover))
(with-dynamo-lock bucket key :as _
(with-open [rdr (s3-reader bucket key)]
(let [counts (ingest-records! (line-seq rdr))]
(s/increment :ic.ingestcounts (:counts counts))
(s/increment :ic.ingestevents (:events counts))))))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment