package org.ntlab.developrx; import org.ntlab.developrx.utils.ThreadUtil; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; /** * Created by matsumoto_k on 2017/11/11. */ public class JavaRxProcess { /** * Flowable (Reactive Streams対応) */ public void flowableReactiveStreams() { Flowable<String> flowable = Flowable.create(emitter -> { String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"}; for (String data : datas) { //購読解除されている場合は処理を止める if (emitter.isCancelled()) { return; } // データを通知する emitter.onNext(data); } // 完了したことを通知 emitter.onComplete(); }, BackpressureStrategy.BUFFER); flowable // Subscriberの処理を別のスレッドで行うようにする .observeOn(Schedulers.computation()) .subscribe(new Subscriber<String>() { // データ数のリクエストおよび購読の解除を行うオブジェクト Subscription subscription; // 購読が開始された際の処理 @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; // 受け取るデータ数をリクエストする // Long.MAX_VALUEをrequestした場合、データ数を制限すること無くデータを通知する this.subscription.request(1L); } // データを受け取った際の処理 @Override public void onNext(String data) { System.out.println(ThreadUtil.getCurrentThreadName() + ": " + data); // 次に受け取るデータ数をリクエストする this.subscription.request(1L); } // 完了を通知された際の処理 @Override public void onComplete() { System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました"); } // エラーを通知された際の処理 @Override public void onError(Throwable t) { t.printStackTrace(); } }); } /** * Observable (バックプレッシャー機能なし) */ public void observable() { Observable<String> observable = Observable.create(emitter -> { String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"}; for (String data : datas) { if (emitter.isDisposed()) { return; } emitter.onNext(data); } emitter.onComplete(); }); observable.observeOn(Schedulers.computation()) .subscribe( new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { // 何もしない } @Override public void onNext(String item) { System.out.println(ThreadUtil.getCurrentThreadName() + ": " + item); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました"); } } ); } }