-
-
Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
import Foundation | |
import Combine | |
// see: not working https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f | |
// Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing. | |
// $ swift --version | |
// Apple Swift version 5.3.1 (swiftlang-1200.0.41 clang-1200.0.32.8) | |
// Target: x86_64-apple-darwin20.1.0 | |
// $ xcodebuild -version | |
// Xcode 12.2 | |
// Build version 12B5044c | |
let workers: [AnyPublisher<String, Never>] = (0..<20).map { | |
Just<Int>($0) | |
.flatMap { value in | |
Future<String, Never>() { promise in | |
DispatchQueue.global().async { | |
print("Working: isMainThread [\(Thread.isMainThread)] [\(value)]") | |
promise(.success(String(value))) | |
} | |
} | |
.eraseToAnyPublisher() | |
} | |
.eraseToAnyPublisher() | |
} | |
var cancellables: Set<AnyCancellable> = .init() | |
print("Workers initial count: \(workers.count)") | |
Publishers | |
.MergeMany(workers) | |
.collect() | |
.receive(on: DispatchQueue.main) | |
.sink(receiveValue: { | |
print("Workers result count: \($0.count)") | |
dump($0) | |
}) | |
.store(in: &cancellables) | |
// - OUTPUT -------------------------------------------------------- | |
// Workers initial count: 20 | |
// Working: isMainThread [false] [0] | |
// Working: isMainThread [false] [1] | |
// Working: isMainThread [false] [2] | |
// Working: isMainThread [false] [3] | |
// Working: isMainThread [false] [4] | |
// Working: isMainThread [false] [5] | |
// Working: isMainThread [false] [6] | |
// Working: isMainThread [false] [7] | |
// Working: isMainThread [false] [8] | |
// Working: isMainThread [false] [9] | |
// Working: isMainThread [false] [11] | |
// Working: isMainThread [false] [12] | |
// Working: isMainThread [false] [10] | |
// Working: isMainThread [false] [13] | |
// Working: isMainThread [false] [14] | |
// Working: isMainThread [false] [15] | |
// Working: isMainThread [false] [16] | |
// Working: isMainThread [false] [17] | |
// Working: isMainThread [false] [18] | |
// Working: isMainThread [false] [19] | |
// Workers result count: 20 | |
// ▿ 20 elements | |
// - "0" | |
// - "1" | |
// - "2" | |
// - "3" | |
// - "4" | |
// - "5" | |
// - "7" | |
// - "6" | |
// - "8" | |
// - "9" | |
// - "11" | |
// - "12" | |
// - "10" | |
// - "13" | |
// - "14" | |
// - "15" | |
// - "16" | |
// - "17" | |
// - "18" | |
// - "19" | |
// - OUTPUT -------------------------------------------------------- |
@yimajo san 検証くださってありがとうございます 🙏
subscribe(on:)
を利用すると確かに全件処理されるのですが、mapの処理がバックグラウンドスレッドではなくて
メインスレッドで実行されており、意図する処理になっていないようです。
Specifies the scheduler on which to perform subscribe, cancel, and request operations.
A publisher which performs upstream operations on the specified scheduler.
c,f, https://developer.apple.com/documentation/combine/publishers/handleevents/subscribe(on:options:)
上記を見てもmapはバックグラウンドスレッドになりそうなのですが。
subscribe
の位置の違いによる挙動は https://qiita.com/shiz/items/9dc8e9a96f399b6c7246#subscribe でも言及されていて、違和感があるようですね。
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.map {
print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
return String($0)
}
.subscribe(on: DispatchQueue.global())
.eraseToAnyPublisher()
}
// Working: isMainThread [true] [0]
// Working: isMainThread [true] [1]
// Working: isMainThread [true] [2]
// Working: isMainThread [true] [3]
// Working: isMainThread [true] [4]
// Working: isMainThread [true] [5]
// Working: isMainThread [true] [6]
// Working: isMainThread [true] [7]
// Working: isMainThread [true] [8]
// Working: isMainThread [true] [9]
// Working: isMainThread [true] [10]
// Working: isMainThread [true] [11]
// Working: isMainThread [true] [12]
// Working: isMainThread [true] [13]
// Working: isMainThread [true] [14]
// Working: isMainThread [true] [15]
// Working: isMainThread [true] [16]
// Working: isMainThread [true] [17]
// Working: isMainThread [true] [18]
// Working: isMainThread [true] [19]
// Sink: isMainThread [true]
// Workers result count: 20
以下のようにsubscribe
の位置を変更するとmapはバックグラウンドスレッドになり、しかも全件処理されるようになりました。
ありがとうございます 🙏🙏🙏
これはJustがバックグラウンドスレッドで実施されて、続くmapもバックグラウンドスレッドで実施されるという解釈なのかな。
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.subscribe(on: DispatchQueue.global())
.map {
print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
return String($0)
}
.eraseToAnyPublisher()
}
// Working: isMainThread [false] [0]
// Working: isMainThread [false] [1]
// Working: isMainThread [false] [2]
// Working: isMainThread [false] [3]
// Working: isMainThread [false] [5]
// Working: isMainThread [false] [4]
// Working: isMainThread [false] [6]
// Working: isMainThread [false] [8]
// Working: isMainThread [false] [11]
// Working: isMainThread [false] [9]
// Working: isMainThread [false] [7]
// Working: isMainThread [false] [10]
// Working: isMainThread [false] [12]
// Working: isMainThread [false] [19]
// Working: isMainThread [false] [14]
// Working: isMainThread [false] [15]
// Working: isMainThread [false] [16]
// Working: isMainThread [false] [18]
// Working: isMainThread [false] [17]
// Working: isMainThread [false] [13]
// Sink: isMainThread [true]
// Workers result count: 20
原因は難しいですね。
@yimajo がおっしゃるとおり、collectが最後の完了まで待たないのかなと思いきや、ログを出して解析してみると
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.receive(on: DispatchQueue.global())
.map {
print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
return String($0)
}
.print()
.eraseToAnyPublisher()
}
以下のログは20回出力されていて、20回の全件がfinishedしたから正しくcollectしているようです。
- receive subscription: (ReceiveOn)
- receive finished
ただ、map処理の以下ログは19回しかなくて、mapが処理されてないけど、正しい数finishedされてるようですね。謎です。
- Working: isMainThread [false] [XX]
Workers initial count: 20
receive subscription: (ReceiveOn)
request unlimited
receive subscription: (ReceiveOn)
request unlimited
receive subscription: (ReceiveOn)
request unlimited
Working: isMainThread [false] [0]
Working: isMainThread [false] [1]
receive subscription: (ReceiveOn)
request unlimited
receive value: (0)
Working: isMainThread [false] [2]
receive value: (1)
Working: isMainThread [false] [3]
receive finished
receive value: (3)
receive value: (2)
receive finished
receive finished
receive subscription: (ReceiveOn)
receive finished
request unlimited
Working: isMainThread [false] [4]
receive subscription: (ReceiveOn)
request unlimited
receive subscription: (ReceiveOn)
request unlimited
Working: isMainThread [false] [5]
receive subscription: (ReceiveOn)
request unlimited
receive value: (4)
receive subscription: (ReceiveOn)
request unlimited
Working: isMainThread [false] [6]
receive subscription: (ReceiveOn)
Working: isMainThread [false] [7]
receive finished
request unlimited
receive value: (5)
receive value: (6)
Working: isMainThread [false] [9]
receive value: (7)
receive subscription: (ReceiveOn)
receive finished
receive finished
receive value: (9)
receive finished
receive finished
request unlimited
receive finished
Working: isMainThread [false] [10]
receive subscription: (ReceiveOn)
request unlimited
receive value: (10)
receive subscription: (ReceiveOn)
Working: isMainThread [false] [11]
receive finished
request unlimited
receive value: (11)
Working: isMainThread [false] [12]
receive finished
receive subscription: (ReceiveOn)
request unlimited
receive value: (12)
receive subscription: (ReceiveOn)
request unlimited
Working: isMainThread [false] [13]
receive subscription: (ReceiveOn)
request unlimited
receive finished
Working: isMainThread [false] [14]
receive value: (13)
Working: isMainThread [false] [15]
receive subscription: (ReceiveOn)
request unlimited
receive finished
receive value: (14)
receive value: (15)
receive finished
Working: isMainThread [false] [16]
receive subscription: (ReceiveOn)
receive finished
request unlimited
receive value: (16)
Working: isMainThread [false] [17]
receive subscription: (ReceiveOn)
request unlimited
receive finished
receive value: (17)
receive subscription: (ReceiveOn)
request unlimited
receive finished
Working: isMainThread [false] [18]
Working: isMainThread [false] [19]
receive value: (18)
receive value: (19)
receive finished
receive finished
Sink: isMainThread [true]
Workers result count: 19
▿ 19 elements
- "0"
- "1"
- "3"
- "2"
- "4"
- "5"
- "6"
- "7"
- "9"
- "10"
- "11"
- "12"
- "13"
- "14"
- "15"
- "16"
- "17"
- "18"
- "19"
FYI
CombineのMergeMany, receive(on, collectで複数のバックグラウンド処理を待ち合わせると、一部のバックグラウンド処理が実行されないコードのgist
→ https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f
なるほど!スレッドチェックしてませんでした。
これはJustがバックグラウンドスレッドで実施されて、続くmapもバックグラウンドスレッドで実施されるという解釈なのかな。
はい。その通りだと思います。
さらに言うと、Justとmapの間にあることでJust<Map< ... >>
にならないようにしているとも思います。
解決案2: Justとmapを連続しない
それに関連する話で、実は最初に出した .subscribe(on: DispatchQueue.global())
にこだわった解決案2があり、次のようにJustとmapを連続しないことでmapはバックグラウンドスレッドで動作するようになりますね。
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.eraseToAnyPublisher() // ここでJust<Map< ...> >になるのを防ぎAnyPublisherにしておく
.map {
print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
return String($0)
}
.subscribe(on: DispatchQueue.global())
.eraseToAnyPublisher()
}
Justとmapを連続すると型がJust<Map< ...>>になり何が起こっているのか
この理由はあくまで予測なのですが、Just<Map< ...>>
型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしているんだと思います。Hot化されてしまうからsubscribe(on:)
でバックグラウンドキューによる指定ができなくなるという状況なんだと予想しています(CombineではHotという概念が表面的には示されていないのであくまで「Hot的な動作」という意味です)。
なぜそう思うかというと、複数のSubscriberがsubscribeした際に結果が共有されるようになるためです。検証のためにコピペでsink
を複数でやってみます。次のような感じです。
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.eraseToAnyPublisher() // この部分があるかないかでmapの動作回数も変わります
.map {
print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
return String($0)
}
.subscribe(on: DispatchQueue.global())
.eraseToAnyPublisher()
}
var cancellables: Set<AnyCancellable> = .init()
Publishers
.MergeMany(workers)
.collect()
.receive(on: DispatchQueue.main)
.sink(receiveValue: {
print("Workers result count: \($0.count)")
dump($0)
})
.store(in: &cancellables)
Publishers
.MergeMany(workers)
.collect()
.receive(on: DispatchQueue.main)
.sink(receiveValue: {
print("Workers result count: \($0.count)")
dump($0)
})
.store(in: &cancellables)
.eraseToAnyPublisher()
があると- map処理が40回になる
- subscribeされる数を2倍にしたので
subscribe(on:)
で指定したバックグラウンドキューで実行される
- map処理が40回になる
.eraseToAnyPublisher()
がないと- map処理は20回以下になる
- subscribeされた数を気にしなくなる
subscribe(on:)
で指定したバックグラウンドキューで実行されない
- map処理は20回以下になる
これ不思議なんですが、実は.map
だけじゃなく.filter
でも同様だったと思います。このことから憶測ですがAppleのCombineは意図的にオペレータの組み合わせでHot化させて無駄を省こうとしてくれていて、それによってsubscribe(on:)
でキューを指定してもそれに応じないんじゃないかと思っています。FutureもいわゆるHotなので、キューを指定してもそれに従わないのと同じなんだと思います。
整理してみます
- Cold動作で指定したバックグラウンドキューで動作させるなら
- Justとmapの間に
.receive(on:)
を置く - Justとmapの間に
.eraseToAnyPublisher()
を置いて、.subscribe(on:)
を置く
- Justとmapの間に
- Hot動作 が望ましいなら
- Futureにする(結局そうなるとDispatchQueue.global().async)
という感じでしょうか。私の予測が入り混じっているので全然自信はないです。
そして実際はアプリの開発にJust
とmap
をやりたいわけじゃなく、本当はMergeMany
とcollect
の検証のために今回のようにJustとmap書いただけではないかと思いますので、結局Future
にしてDeferred
使うことに落ち着くかなという気もします。
print
したらmap
のreceive finished
が20回あるがreceive value:
が19回の件
あくまで憶測ですがcollect()
の動作が2種類ある気がします。
動作的には .subscribe(on)
されてないColdなPublisherたちを下流にあるcollect()
が扱う場合は、何かしらのタイミングで終了したという判断をしないといけないために上流がイベントを流さずにその上流ストリームも終了させられてしまう。一方、HotなPublisherもしくは.subscribe(on:)
されたColdなPublisherたちのcollect()
は、非同期実行を待つ前提であるため上流のストリームの終了を待つという感じなのかな、と思いました。思い込みかもしれませんが...。
以上、長々と憶測が多くなり失礼しましたー🙇
なるほど、@yimajo さん解説は、仮説ですが論理的に整合性が通っており、とても腑に落ちました。
Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況
確かに、UpstreamがHotだと、Downstreamでsubscribe(on:)によるUpstreamのスケジュラーを変更はできなそうですね。
Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしている
&
eraseToAnyPublisherでColdのまま動作
このあたりの挙動はドキュメントに明記もないですし、宣言的な記述にもかかわらず、書き手&読み手が挙動を直感的に類推しにくいですね。(悩ましい)
そして実際はアプリの開発にJustとmapをやりたいわけじゃなく、本当はMergeManyとcollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。
手元のアプリはあまり他のアプリではないですが、200個ぐらいMergeManyで並列処理しています。
各並列処理をFutureで記述すると、200個のFutureを逐次インスタンス化しいるところからHotのため随時実行開始されます。
そうすると、読み手がFutureはHotであることを知っていても、コードを読んで上記の挙動を直感的にイメージしにくいなぁと思っています。
なので、MergeManyで並列処理を開始して、collectで全件待ち合わせる挙動の方が、記述から意図(挙動)を読み取りやすいかなと感じています。
Futureでお茶を濁そうかと思っていたので、@yimajo さんの subscribe(on:) の活用はとても助かりました。( JustはHotになりますが、mapがColdのままのでありがたい )
Deferredも意図する挙動に活用できそうですね。
これはStackOverflowかAppleのフォーラムに投稿するのが良さそうな興味深い問題ですね
私の環境でもXcode 12.2で再現しました(結果が20 countsになるときもあるし19, 18になるようで結果が実行時に変わることがあり不安定でした)。
receive(on:)よりsubscribe(on:)使ったら解決
調べてみた感じ
.receive(on: DispatchQueue.global())
をコメントアウトすると期待値どおりになりますね。そのため、
map
のキューを指定したいのであれば.subscribe(on: DispatchQueue.global())
で試すとこれは安定して期待通りになりました。(余談ですが、
receive(on:)
よりsubscribe(on:)
のほうが副作用の実行キューを指定したい意図が明確になると考えています)予想
こうなるそもそもの原因は全然わからないのですが予想してみます。
collect
メソッドに引数countあるメソッドではあきらかに待たない様子が説明されてるsubscribe(on:)
でキュー指定されると内部でそれを考慮してくれて待ってくれる...?collect
メソッドはない