Skip to content

Instantly share code, notes, and snippets.

@susieyy
Last active November 17, 2020 14:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing.
import Foundation
import Combine
// see: not working https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f
// Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing.
// $ swift --version
// Apple Swift version 5.3.1 (swiftlang-1200.0.41 clang-1200.0.32.8)
// Target: x86_64-apple-darwin20.1.0
// $ xcodebuild -version
// Xcode 12.2
// Build version 12B5044c
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.flatMap { value in
Future<String, Never>() { promise in
DispatchQueue.global().async {
print("Working: isMainThread [\(Thread.isMainThread)] [\(value)]")
promise(.success(String(value)))
}
}
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
var cancellables: Set<AnyCancellable> = .init()
print("Workers initial count: \(workers.count)")
Publishers
.MergeMany(workers)
.collect()
.receive(on: DispatchQueue.main)
.sink(receiveValue: {
print("Workers result count: \($0.count)")
dump($0)
})
.store(in: &cancellables)
// - OUTPUT --------------------------------------------------------
// Workers initial count: 20
// Working: isMainThread [false] [0]
// Working: isMainThread [false] [1]
// Working: isMainThread [false] [2]
// Working: isMainThread [false] [3]
// Working: isMainThread [false] [4]
// Working: isMainThread [false] [5]
// Working: isMainThread [false] [6]
// Working: isMainThread [false] [7]
// Working: isMainThread [false] [8]
// Working: isMainThread [false] [9]
// Working: isMainThread [false] [11]
// Working: isMainThread [false] [12]
// Working: isMainThread [false] [10]
// Working: isMainThread [false] [13]
// Working: isMainThread [false] [14]
// Working: isMainThread [false] [15]
// Working: isMainThread [false] [16]
// Working: isMainThread [false] [17]
// Working: isMainThread [false] [18]
// Working: isMainThread [false] [19]
// Workers result count: 20
// ▿ 20 elements
// - "0"
// - "1"
// - "2"
// - "3"
// - "4"
// - "5"
// - "7"
// - "6"
// - "8"
// - "9"
// - "11"
// - "12"
// - "10"
// - "13"
// - "14"
// - "15"
// - "16"
// - "17"
// - "18"
// - "19"
// - OUTPUT --------------------------------------------------------
@yimajo
Copy link

yimajo commented Nov 15, 2020

これはStackOverflowかAppleのフォーラムに投稿するのが良さそうな興味深い問題ですね

私の環境でもXcode 12.2で再現しました(結果が20 countsになるときもあるし19, 18になるようで結果が実行時に変わることがあり不安定でした)。

receive(on:)よりsubscribe(on:)使ったら解決

調べてみた感じ.receive(on: DispatchQueue.global())をコメントアウトすると期待値どおりになりますね。
そのため、mapのキューを指定したいのであれば.subscribe(on: DispatchQueue.global())で試すとこれは安定して期待通りになりました。

let workers: [AnyPublisher<String, Never>] = (0..<20).map {
    Just<Int>($0)
//        .receive(on: DispatchQueue.global())
        .map {
            print("Working: [\($0)]")
            return String($0)
        }
        .subscribe(on: DispatchQueue.global()) // ここでmapのキューを変える
        .eraseToAnyPublisher()
}

(余談ですが、receive(on:)よりsubscribe(on:)のほうが副作用の実行キューを指定したい意図が明確になると考えています)

予想

こうなるそもそもの原因は全然わからないのですが予想してみます。

  • いわゆるColdなcollectが非同期実行されると基本はすべてのPublisherのイベントを待たない
  • FutureなんかのいわゆるHotなcollectはPublisherのイベントを待ってくれる
    • なぜ?
      • Futureのほうにはcount引数ありのcollectメソッドはない
        • Futureを使う際には非同期実行されることは前提なのでそもそも待つ?

@susieyy
Copy link
Author

susieyy commented Nov 16, 2020

@yimajo san 検証くださってありがとうございます 🙏

subscribe(on:) を利用すると確かに全件処理されるのですが、mapの処理がバックグラウンドスレッドではなくて
メインスレッドで実行されており、意図する処理になっていないようです。

Specifies the scheduler on which to perform subscribe, cancel, and request operations.
A publisher which performs upstream operations on the specified scheduler.
c,f, https://developer.apple.com/documentation/combine/publishers/handleevents/subscribe(on:options:)

上記を見てもmapはバックグラウンドスレッドになりそうなのですが。

subscribeの位置の違いによる挙動は https://qiita.com/shiz/items/9dc8e9a96f399b6c7246#subscribe でも言及されていて、違和感があるようですね。

let workers: [AnyPublisher<String, Never>] = (0..<20).map {
    Just<Int>($0)
        .map {
            print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
            return String($0)
        }
        .subscribe(on: DispatchQueue.global())
        .eraseToAnyPublisher()
}

// Working: isMainThread [true] [0]
// Working: isMainThread [true] [1]
// Working: isMainThread [true] [2]
// Working: isMainThread [true] [3]
// Working: isMainThread [true] [4]
// Working: isMainThread [true] [5]
// Working: isMainThread [true] [6]
// Working: isMainThread [true] [7]
// Working: isMainThread [true] [8]
// Working: isMainThread [true] [9]
// Working: isMainThread [true] [10]
// Working: isMainThread [true] [11]
// Working: isMainThread [true] [12]
// Working: isMainThread [true] [13]
// Working: isMainThread [true] [14]
// Working: isMainThread [true] [15]
// Working: isMainThread [true] [16]
// Working: isMainThread [true] [17]
// Working: isMainThread [true] [18]
// Working: isMainThread [true] [19]
// Sink: isMainThread [true]
// Workers result count: 20

以下のようにsubscribeの位置を変更するとmapはバックグラウンドスレッドになり、しかも全件処理されるようになりました。
ありがとうございます 🙏🙏🙏

これはJustがバックグラウンドスレッドで実施されて、続くmapもバックグラウンドスレッドで実施されるという解釈なのかな。

let workers: [AnyPublisher<String, Never>] = (0..<20).map {
    Just<Int>($0)
        .subscribe(on: DispatchQueue.global())
        .map {
            print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
            return String($0)
        }
        .eraseToAnyPublisher()
}

// Working: isMainThread [false] [0]
// Working: isMainThread [false] [1]
// Working: isMainThread [false] [2]
// Working: isMainThread [false] [3]
// Working: isMainThread [false] [5]
// Working: isMainThread [false] [4]
// Working: isMainThread [false] [6]
// Working: isMainThread [false] [8]
// Working: isMainThread [false] [11]
// Working: isMainThread [false] [9]
// Working: isMainThread [false] [7]
// Working: isMainThread [false] [10]
// Working: isMainThread [false] [12]
// Working: isMainThread [false] [19]
// Working: isMainThread [false] [14]
// Working: isMainThread [false] [15]
// Working: isMainThread [false] [16]
// Working: isMainThread [false] [18]
// Working: isMainThread [false] [17]
// Working: isMainThread [false] [13]
// Sink: isMainThread [true]
// Workers result count: 20

@susieyy
Copy link
Author

susieyy commented Nov 16, 2020

原因は難しいですね。

@yimajo がおっしゃるとおり、collectが最後の完了まで待たないのかなと思いきや、ログを出して解析してみると

let workers: [AnyPublisher<String, Never>] = (0..<20).map {
    Just<Int>($0)
        .receive(on: DispatchQueue.global())
        .map {
            print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
            return String($0)
        }
        .print()
        .eraseToAnyPublisher()
}

以下のログは20回出力されていて、20回の全件がfinishedしたから正しくcollectしているようです。

  • receive subscription: (ReceiveOn)
  • receive finished

ただ、map処理の以下ログは19回しかなくて、mapが処理されてないけど、正しい数finishedされてるようですね。謎です。

  • Working: isMainThread [false] [XX]
Workers initial count: 20
receive subscription: (ReceiveOn)
request unlimited
receive subscription: (ReceiveOn)
request unlimited
receive subscription: (ReceiveOn)
request unlimited
Working: isMainThread [false] [0]
Working: isMainThread [false] [1]
receive subscription: (ReceiveOn)
request unlimited
receive value: (0)
Working: isMainThread [false] [2]
receive value: (1)
Working: isMainThread [false] [3]
receive finished
receive value: (3)
receive value: (2)
receive finished
receive finished
receive subscription: (ReceiveOn)
receive finished
request unlimited
Working: isMainThread [false] [4]
receive subscription: (ReceiveOn)
request unlimited
receive subscription: (ReceiveOn)
request unlimited
Working: isMainThread [false] [5]
receive subscription: (ReceiveOn)
request unlimited
receive value: (4)
receive subscription: (ReceiveOn)
request unlimited
Working: isMainThread [false] [6]
receive subscription: (ReceiveOn)
Working: isMainThread [false] [7]
receive finished
request unlimited
receive value: (5)
receive value: (6)
Working: isMainThread [false] [9]
receive value: (7)
receive subscription: (ReceiveOn)
receive finished
receive finished
receive value: (9)
receive finished
receive finished
request unlimited
receive finished
Working: isMainThread [false] [10]
receive subscription: (ReceiveOn)
request unlimited
receive value: (10)
receive subscription: (ReceiveOn)
Working: isMainThread [false] [11]
receive finished
request unlimited
receive value: (11)
Working: isMainThread [false] [12]
receive finished
receive subscription: (ReceiveOn)
request unlimited
receive value: (12)
receive subscription: (ReceiveOn)
request unlimited
Working: isMainThread [false] [13]
receive subscription: (ReceiveOn)
request unlimited
receive finished
Working: isMainThread [false] [14]
receive value: (13)
Working: isMainThread [false] [15]
receive subscription: (ReceiveOn)
request unlimited
receive finished
receive value: (14)
receive value: (15)
receive finished
Working: isMainThread [false] [16]
receive subscription: (ReceiveOn)
receive finished
request unlimited
receive value: (16)
Working: isMainThread [false] [17]
receive subscription: (ReceiveOn)
request unlimited
receive finished
receive value: (17)
receive subscription: (ReceiveOn)
request unlimited
receive finished
Working: isMainThread [false] [18]
Working: isMainThread [false] [19]
receive value: (18)
receive value: (19)
receive finished
receive finished
Sink: isMainThread [true]
Workers result count: 19
▿ 19 elements
  - "0"
  - "1"
  - "3"
  - "2"
  - "4"
  - "5"
  - "6"
  - "7"
  - "9"
  - "10"
  - "11"
  - "12"
  - "13"
  - "14"
  - "15"
  - "16"
  - "17"
  - "18"
  - "19"

@susieyy
Copy link
Author

susieyy commented Nov 16, 2020

FYI
CombineのMergeMany, receive(on, collectで複数のバックグラウンド処理を待ち合わせると、一部のバックグラウンド処理が実行されないコードのgist
https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f

@yimajo
Copy link

yimajo commented Nov 16, 2020

なるほど!スレッドチェックしてませんでした。

これはJustがバックグラウンドスレッドで実施されて、続くmapもバックグラウンドスレッドで実施されるという解釈なのかな。

はい。その通りだと思います。

さらに言うと、Justとmapの間にあることでJust<Map< ... >>にならないようにしているとも思います。

解決案2: Justとmapを連続しない

それに関連する話で、実は最初に出した .subscribe(on: DispatchQueue.global())にこだわった解決案2があり、次のようにJustとmapを連続しないことでmapはバックグラウンドスレッドで動作するようになりますね。

let workers: [AnyPublisher<String, Never>] = (0..<20).map {
    Just<Int>($0)
        .eraseToAnyPublisher() // ここでJust<Map< ...> >になるのを防ぎAnyPublisherにしておく
        .map {
            print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
            return String($0)
        }
        .subscribe(on: DispatchQueue.global())
        .eraseToAnyPublisher()
}

Justとmapを連続すると型がJust<Map< ...>>になり何が起こっているのか

この理由はあくまで予測なのですが、Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしているんだと思います。Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況なんだと予想しています(CombineではHotという概念が表面的には示されていないのであくまで「Hot的な動作」という意味です)。

なぜそう思うかというと、複数のSubscriberがsubscribeした際に結果が共有されるようになるためです。検証のためにコピペでsinkを複数でやってみます。次のような感じです。

let workers: [AnyPublisher<String, Never>] = (0..<20).map {
    Just<Int>($0)
        .eraseToAnyPublisher() // この部分があるかないかでmapの動作回数も変わります
        .map {
            print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
            return String($0)
        }
        .subscribe(on: DispatchQueue.global())
        .eraseToAnyPublisher()
}

var cancellables: Set<AnyCancellable> = .init()

Publishers
    .MergeMany(workers)
    .collect()
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: {
        print("Workers result count: \($0.count)")
        dump($0)
    })
    .store(in: &cancellables)

Publishers
    .MergeMany(workers)
    .collect()
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: {
        print("Workers result count: \($0.count)")
        dump($0)
    })
    .store(in: &cancellables)
  • .eraseToAnyPublisher()があると
    • map処理が40回になる
      • subscribeされる数を2倍にしたので
    • subscribe(on:)で指定したバックグラウンドキューで実行される
  • .eraseToAnyPublisher()がないと
    • map処理は20回以下になる
      • subscribeされた数を気にしなくなる
    • subscribe(on:)で指定したバックグラウンドキューで実行されない

これ不思議なんですが、実は.mapだけじゃなく.filterでも同様だったと思います。このことから憶測ですがAppleのCombineは意図的にオペレータの組み合わせでHot化させて無駄を省こうとしてくれていて、それによってsubscribe(on:)でキューを指定してもそれに応じないんじゃないかと思っています。FutureもいわゆるHotなので、キューを指定してもそれに従わないのと同じなんだと思います。

整理してみます

  • Cold動作で指定したバックグラウンドキューで動作させるなら
    • Justとmapの間に.receive(on:)を置く
    • Justとmapの間に.eraseToAnyPublisher()を置いて、.subscribe(on:)を置く
  • Hot動作 が望ましいなら
    • Futureにする(結局そうなるとDispatchQueue.global().async)

という感じでしょうか。私の予測が入り混じっているので全然自信はないです。

そして実際はアプリの開発にJustmapをやりたいわけじゃなく、本当はMergeManycollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。

printしたらmapreceive finishedが20回あるがreceive value:が19回の件

あくまで憶測ですがcollect()の動作が2種類ある気がします。

動作的には .subscribe(on)されてないColdなPublisherたちを下流にあるcollect()が扱う場合は、何かしらのタイミングで終了したという判断をしないといけないために上流がイベントを流さずにその上流ストリームも終了させられてしまう。一方、HotなPublisherもしくは.subscribe(on:)されたColdなPublisherたちのcollect()は、非同期実行を待つ前提であるため上流のストリームの終了を待つという感じなのかな、と思いました。思い込みかもしれませんが...。

以上、長々と憶測が多くなり失礼しましたー🙇

@susieyy
Copy link
Author

susieyy commented Nov 17, 2020

なるほど、@yimajo さん解説は、仮説ですが論理的に整合性が通っており、とても腑に落ちました。

Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況

確かに、UpstreamがHotだと、Downstreamでsubscribe(on:)によるUpstreamのスケジュラーを変更はできなそうですね。

Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしている

&

eraseToAnyPublisherでColdのまま動作

このあたりの挙動はドキュメントに明記もないですし、宣言的な記述にもかかわらず、書き手&読み手が挙動を直感的に類推しにくいですね。(悩ましい)

そして実際はアプリの開発にJustとmapをやりたいわけじゃなく、本当はMergeManyとcollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。

手元のアプリはあまり他のアプリではないですが、200個ぐらいMergeManyで並列処理しています。
各並列処理をFutureで記述すると、200個のFutureを逐次インスタンス化しいるところからHotのため随時実行開始されます。
そうすると、読み手がFutureはHotであることを知っていても、コードを読んで上記の挙動を直感的にイメージしにくいなぁと思っています。
なので、MergeManyで並列処理を開始して、collectで全件待ち合わせる挙動の方が、記述から意図(挙動)を読み取りやすいかなと感じています。

Futureでお茶を濁そうかと思っていたので、@yimajo さんの subscribe(on:) の活用はとても助かりました。( JustはHotになりますが、mapがColdのままのでありがたい )
Deferredも意図する挙動に活用できそうですね。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment