Experiments Never Fail

RxJava で Observable の並列処理を直列化する

rx.Observable<T> のオペレータは、通常は非同期で、並列に処理されます。

例えば以下のような場合:

public void start() {
Observable.range(1, 5)
.flatMap(x -> fatTask(x))
.subscribe(x -> Log.d(TAG, "onNext - " + x));
}

private final Random rand = new Random();
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);

// ランダムにスリープした後 x を onNext する
private Observable<Integer> fatTask(final int x) {
return Observable.create(subscriber -> {
long sleep = (long) (rand.nextDouble() * 10000L);
Log.d(TAG, "fatTask(" + x + ") - start.");

executor.schedule(() -> {
subscriber.onNext(x);
subscriber.onCompleted();
}, sleep, TimeUnit.MILLISECONDS);
});
}

このプログラムの出力はこうなります。

出力:
fatTask(1) - start.
fatTask(2) - start.
fatTask(3) - start.
fatTask(4) - start.
fatTask(5) - start.
onNext - 3
onNext - 5
onNext - 4
onNext - 2
onNext - 1

fatTask は 1,2,3,4,5 の順で 完了を待たずに 呼びだされます。
が、それぞれ処理にかかる時間が異なるので、 onNext が呼ばれる順は 1〜 とは限りません。

ソースとなる Stream の順番を崩したくない場合は、 fatTask(1) が完了してから fatTask(2) を開始する、というように直列化しなければなりません。

Observable.Concat(concatWith) #

これを行うのが Observable.Concat です(RxJava では Observable.concatWith のようですね)。
複数の Observable を順に(完了してから次へ)処理していきます。

使い方 #

toList で一旦ただの List にしてから、concatWith で数珠つなぎにします。

public void start() {
Observable.range(1, 5)
.toList()
.flatMap(list -> {
// fatTask(1).contat(fatTask(2)).contat(fatTask(3))...
// にする(fold 使えれば…)
Observable<Integer> task = null;
for (int x : list) {
if (task == null) {
task = fatTask(x);
} else {
task = task.concatWith(fatTask(x));
}
}
return task;
})
.subscribe(x -> Log.d(TAG, "onNext - " + x));
}

このプログラムの出力はこうなります。

出力
fatTask(1) - start.
onNext - 1
fatTask(2) - start.
onNext - 2
fatTask(3) - start.
onNext - 3
fatTask(4) - start.
onNext - 4
fatTask(5) - start.
onNext - 5

fatTask(1) の完了を待ってから、次の fatTask(2) が実行されています。


Rx.NET では、

static IObservable<T> Concat<T>(IEnumerable<IObservable<T>> sources)

で、複数の IObservable を一括で渡せるのですが、 RxJava にはないようで、、、。

static <T> Observable<T> concatEager(Iterable<? extends Observable<? extends T>> sources)

というのがあったんですが、期待通りうごいてくれず、 Eager? なんでしょう?

ソースが無限リストだったら? #

toList で一旦ただの List にしているのが非常に気に入らないですね。
range(1, 5)interval(1, TimeUnit.SECONDS) のように無限の Stream だったら使えません。

そこで、 concat には、こんな overload もあります。

static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables)

Observable を通知する Observable? ややこしいですがこう使います。

public void start() {
// 2. を concat する
Observable.concat(
// 1. Observable<Long>
Observable.interval(1, TimeUnit.SECONDS)
// 2. Long を Observable<Integer> に変換
// → Observable<Observable<Integer>> になる
.map(x -> fatTask(x.intValue())))
.subscribe(x -> Log.d(TAG, "onNext - " + x));
}

このプログラムの出力はこうなります。

出力
fatTask(0) - start.
onNext - 0
fatTask(1) - start.
onNext - 1
fatTask(2) - start.
onNext - 2
…つづく

無限リストながら、並列処理せずに順序通り動いてくれます。

interval の値を単純に mapObservable<Integer> に変換してやります。するとこれは Observable<Observable<Integer>> になり、concat 可能になります。 flatMap だと平坦化されちゃうのでただの map です。

まとめ #

Observable は普通は非同期で並列処理。
非同期ながら直列化したい場合は Observable.concat でできます。

  1. GPS から緯度経度を取得
  2. なんか重い計算を行う
  3. 結果をテキストファイルに書き出す

みたいな処理をするとき 3. を 1. の順序と同じにしたいのでこれを使います。

はじめ自分は flatMap で繋いでいくだけですべて直列化されているのかなーと勘違いしていたので、これを知った時は目からウロコでした。

参考 #

published at tags: RxJava ReactiveX Java