Skip to content

Instantly share code, notes, and snippets.

@Ivana-
Created February 17, 2024 23:51
Show Gist options
  • Save Ivana-/8f3fca29e2977de4f936c58b5e756b1f to your computer and use it in GitHub Desktop.
Save Ivana-/8f3fca29e2977de4f936c58b5e756b1f to your computer and use it in GitHub Desktop.
Тинькофф курс All to Scala 2 задание
(ns tinkoff-2
(:import [java.util Date]))
;; мутабельный thread-safe (!) атом для хранения списка неотправленных сообщений
(def data (atom []))
;; У вас есть метод `readData` для получения порции непрерывных данных.
;; Данные нужно сразу отослать всем потребителям при помощи `sendData`.
;; Технические детали
;; 1. Аргументы для `sendData` нужно брать из значения, возвращаемого `readData`
;; 2. Каждый адресат из списка `Event.recipients` должен получить данные `payload`
(defn read-data-mock []
;; мокаем чтение даты
{:recipients (-> (repeatedly (+ 3 (rand-int 10)) #(rand-int 100)) set vec)
:payload (apply str (repeatedly 20 #(rand-nth "abcdefghijklmnopqrstuvwxyz0123456789")))})
(defn add-readed-data []
;; добавляем дату в список неотправленных сообщений - по одному для каждого реципиента
;; сортируем список по очередности отправки с учетом таймаута
(let [{:keys [recipients payload]} (read-data-mock)
retry (.getTime (Date.))]
(reset! data (->> @data
(into (map (fn [r] {:recipient r
:payload payload
:retry retry})
recipients))
(sort-by :retry)
vec))))
(comment
(add-readed-data)
(deref data)
;;
)
(defn send-data-mock [dest payload]
;; мокаем отправку даты - таймаут и потом 10% успех 90% неудача
(Thread/sleep (+ 100 (rand-int 100)))
(if (< (rand-int 100) 10) :ACCEPTED :REJECTED))
;; 3. Во время отправки данные могут быть:
;; * `Result.ACCEPTED` - приняты потребителем, операция отправки данных адресату `dest` считается завершённой
;; * `Result.REJECTED` - отклонены, операцию отправки следует повторить после задержки `timeout()`
;; 4. Метод `performOperation` должен обладать высокой пропускной способностью:
;; события внутри `readData` **могут накапливаться**
(defn send-readed-data [timeout]
;; смотрим если есть первое по очередности неотправленное сообщение
(when-let [{:keys [recipient payload retry]} (->> @data first)]
;; смотрим если надо его отправлять (с учетом текущего времени и времени ретрая по таймауту)
(when (> (.getTime (Date.)) retry)
;; удаляем сообщение из списка - чтобы другие потоки не пытались его же отправить
(swap! data (comp vec rest))
(let [r (send-data-mock recipient payload)]
(prn recipient payload r)
(when-not (= :ACCEPTED r)
;; если ошибка отправки - возвращаем сообщение обратно в список
;; с новым временем ретрая с учетом таймаута
(swap! data conj {:recipient recipient
:payload payload
:retry (+ timeout (.getTime (Date.)))}))))))
;; Ваша задача написать метод `performOperation`,
;; который будет производить такую рассылку с максимальной пропускной способностью.
(defn perform-operation [timeout]
;; бесконечный цикл - отправляем самое актуальное сообщение
;; ждем таймаут и повторяем вызов
(send-readed-data timeout)
(Thread/sleep 100)
(recur timeout))
(comment
;; создаем необходимое количество потоков (в данном примере 5)
;; которые будут мониторить список данных к отправке и отправлять если там что-то есть
;; важно - потоков всегда заданное число, а список сообщений для отправки
;; может как наполняться, так и уменьшаться, в зависимости от добавления-отправки
;; да, это наш самодельный тредпул :)
(def fs (->> (range 5) (mapv (fn [_] (future (perform-operation 50))))))
;; добавляем пачку данных для отправки в список
(add-readed-data)
;; останавливаем потоки отправки
(doseq [f fs]
(when-not (future-cancelled? f) (future-cancel f)))
;; типичный результат моделировани
;; [{:recipient 62, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 24, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 85, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 95, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 48, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 13, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 61, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 64, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 66, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 26, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}
;; {:recipient 84, :payload "6tz3vfcu7cqrfwoi5xg7", :retry 1708213056529}]
;; 62 "6tz3vfcu7cqrfwoi5xg7" :REJECTED
;; 24 "6tz3vfcu7cqrfwoi5xg7" :REJECTED
;; 48 "6tz3vfcu7cqrfwoi5xg7" :REJECTED
;; 85 "6tz3vfcu7cqrfwoi5xg7" :ACCEPTED
;; .......................................
;; .......................................
;; .......................................
;; .......................................
;;
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment