Skip to content

Instantly share code, notes, and snippets.

@ruseel
Last active January 16, 2023 06:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ruseel/934546fe376a1f88bcb8c969e2229304 to your computer and use it in GitHub Desktop.
Save ruseel/934546fe376a1f88bcb8c969e2229304 to your computer and use it in GitHub Desktop.
lacinia - resolve-promise - go,future,promesa/future,promesa/thread - {invoke,invoke-async}

lacinia 의 resolver 구성에 대해서 좀 더 알아야 (async 혹은 ThreadPoolExecutor) 의 동작을 이해할 수 있을 것 같습니다.

lacinia 에서 resolver 를 구현할 때 value 를 바로 리턴하도록 할 수도 있지만 ResolveResultPromise 를 리턴하게 할 수도 있습니다. [1] lacinia 에서 이것을 "Asynchronous Field Resolvers" 라고 부릅니다.

resolver 에서 ResolveResultPromise 를 리턴하게 만들 때는 아래와 같은 pattern 을 따릅니다.

(defn resolve-xyz
  [_ _ _]
  (let [result (resolve/resolve-promise)] ;; create "lacinia promise"
    (some-task-dispatching-fn ;; 여기서 다른 parallelism 이 적용되도록 바꿉니다. A)
       (fn [] 
          ,,,   ;; do-actual-work B)
          (resolve/deliver! result ,,,) ;; "lacinia promise" deliver! 
    result))

이 때 A) 에 쓸 수 있는 함수로 거론 할 수 있는 것이

  • java.lang.Thread
  • clojure.core/thread
  • clojure.core/future
  • promesa/future
  • promesa/thread
  • clojure.core.async/go

B) 에 나타나는 함수가 지금은 cognitect/aws-api 를 사용하고 있어서

  • aws/invoke
  • aws/invoke-async

가 됩니다.

A) 에서 쓸 함수를 고를 때 어떤 ThreadPoolExecutor 를 쓸지가 꽤 헷갈리고 고민되는 주제인데 각각을 무엇을 쓰는지 찾아보면

  • java.lang.Thread, clojure.core/thread 가 본질적으로 같다고 치고 -- 이 둘은 아예 ThreadPoolExecutor 를 안 쓰고
  • clojure.core/future 는 clojure.core/soloExecutor 를 쓰는데 soloExecutor 는 Executor.newCachedExecutor() 로 생성한 executor 이고
  • promesa/future 는 promesa.exec/default-executor 를 사용하게 되어 있고 promesa 5.1.0 에서도 promesa 10.0.594 에서도 ForkJoinPool/commonPool 을 사용합니다. 그리고 ForkJoinPool/commonPool 의 thread 숫자는 (dec Runtime.availableProcessors) 를 따르고 (여러분의 맥북에서는 아마 7 이 될 겁니다)
  • promesa/thread 는 promesa.exec/default-thread-executor 를 쓰고 promesa 10.0.594 에서 이 executor 는 Executor.newCachedExecutor 를 사용합니다.
  • clojure.core.async/go 는 clojure.core.async.impl.dispatch/executor 를 사용합니다. dispatch/executor 는 Executor.newFixedExecutor 로 만들고 default 로 숫자는 8 이 들어갑니다. 한데 go block 안에서는 대체로 I/O 작업이 있을 때면 parking 을 해서 thead wait 을 I/O 작업을 하지 않게 하니 어떤 executor 를 쓰는지, 그 executor 의 pool 사이즈가 어떤지 크게 상관이 없게 됩니다.

B) 에 출연하는 함수는

aws/invoke, aws/invoke-async 인데 그 대신 invoke 와 invoke-async 라는 가짜 함수를 만들고 무조건 2000ms 씩 기다리게 만들었습니다.

그리고 A 와 B 의 조합을 구현하는 함수를 이렇게 만들어 두고

  {:go-and-invoke-async delayed-result--with-go-and-invoke-async
   :future-and-invoke delayed-result--with-future-and-invoke
   :promesa-future-and-invoke delayed-result--with-promesa-future-and-invoke
   :promesa-thread-and-invoke delayed-result--with-promesa-thread-and-invoke
   :promesa-promise-and-invoke-async delayed-result--with-promesa-promise-and-invoke-async-as-promise})

query 를 호출할 때 어떤 A-B 조합을 쓸 지 고를 수 있게 만들었습니다.

(q "{
  coll(n: 70) { name(with: \"promesa-promise-and-invoke-async-as-promise\") }
}")

[1] Asynchronous Field Resolvers, https://lacinia.readthedocs.io/en/latest/resolve/async.html

(ns com.walmartlabs.lacinia.async2-test
"Tests for field resolvers that return deferred ResolverResults."
(:require
[clojure.test :refer [deftest is use-fixtures]]
[clojure.core.async :as a :refer [<! >! go]]
[promesa.core :as p]
[com.walmartlabs.lacinia :as lacinia]
[com.walmartlabs.lacinia.resolve :as resolve]
[com.walmartlabs.lacinia.util :as util]
[com.walmartlabs.lacinia.schema :as schema]))
(defonce promise-id (atom 0))
(use-fixtures :each
(fn [f]
(try
(f)
(finally
(swap! promise-id (constantly 0))))))
(defn invoke-async [c]
(go
(<! (a/timeout 2000))
(swap! promise-id inc)
(>! c @promise-id)))
(defn invoke []
(let [c (a/chan)]
(invoke-async c)
(a/<!! c)))
(defn invoke-async-as-promise []
(p/create
(fn [resolve _reject]
(a/go
(let [c (a/chan)]
(invoke-async c)
(resolve (<! c)))))))
(defn delayed-result--with-go-and-invoke-async
[]
(let [result (resolve/resolve-promise)]
(go
(let [c (a/chan)]
(invoke-async c)
(resolve/deliver! result (<! c))))
result))
(defn delayed-result--with-future-and-invoke
[]
(let [result (resolve/resolve-promise)]
(future
(resolve/deliver! result (invoke)))
result))
(defn delayed-result--with-promesa-future-and-invoke
[]
(let [result (resolve/resolve-promise)]
(p/future
(resolve/deliver! result (invoke)))
result))
(defn delayed-result--with-promesa-thread-and-invoke
[]
(let [result (resolve/resolve-promise)]
(p/thread
(resolve/deliver! result (invoke)))
result))
(defn delayed-result--with-promesa-promise-and-invoke-async-as-promise
[]
(let [result (resolve/resolve-promise)]
(p/then (invoke-async-as-promise)
(fn [value]
(resolve/deliver! result value)))
result))
(def delay-resolver-fns
{:go-and-invoke-async delayed-result--with-go-and-invoke-async
:future-and-invoke delayed-result--with-future-and-invoke
:promesa-future-and-invoke delayed-result--with-promesa-future-and-invoke
:promesa-thread-and-invoke delayed-result--with-promesa-thread-and-invoke
:promesa-promise-and-invoke-async-as-promise delayed-result--with-promesa-promise-and-invoke-async-as-promise})
(defn name-resolver
[_ args _]
((or (delay-resolver-fns (keyword (:with args)))
(fn [] (throw (ex-info "Unknown resolver" {:args args}))))))
(defn node-resolver
[_ _args _]
{})
(defn coll-resolver
[_ args _]
(repeat (:n args) {}))
(def compiled-schema
(-> '{:objects
{:node
{:fields {:name {:type String
:args {:with {:type String}}
:resolve :name}}}}
:queries
{:node {:type :node
:resolve :node}
:coll {:type (list :node)
:args {:n {:type (non-null Int)}}
:resolve :coll}}}
(util/attach-resolvers {:name name-resolver
:node node-resolver
:coll coll-resolver})
schema/compile))
(defn q [query]
(lacinia/execute compiled-schema query nil nil))
(deftest queries-execute-in-parallel
(let [result
(q "{
coll(n: 70) { name(with: \"promesa-promise-and-invoke-async-as-promise\") }
}")]
result))
(comment
(.. Runtime getRuntime availableProcessors)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment