package org.ntlab.developrx; import android.util.Pair; import org.ntlab.developrx.utils.ThreadUtil; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import java.util.Date; import java.util.concurrent.TimeUnit; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; /** * Created by matsumoto_k on 2017/11/11. */ public class JavaRxProcess { /** * Flowable (Reactive Streams対応) */ public void flowableReactiveStreams() { Flowable<String> flowable = Flowable.create(emitter -> { String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"}; for (String data : datas) { //購読解除されている場合は処理を止める if (emitter.isCancelled()) { return; } // データを通知する emitter.onNext(data); } // 完了したことを通知 emitter.onComplete(); }, BackpressureStrategy.BUFFER); flowable // Subscriberの処理を別のスレッドで行うようにする .observeOn(Schedulers.computation()) .subscribe(new Subscriber<String>() { // データ数のリクエストおよび購読の解除を行うオブジェクト Subscription subscription; // 購読が開始された際の処理 @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; // 受け取るデータ数をリクエストする // Long.MAX_VALUEをrequestした場合、データ数を制限すること無くデータを通知する this.subscription.request(1L); } // データを受け取った際の処理 @Override public void onNext(String data) { System.out.println(ThreadUtil.getCurrentThreadName() + ": " + data); // 次に受け取るデータ数をリクエストする this.subscription.request(1L); } // 完了を通知された際の処理 @Override public void onComplete() { System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました"); } // エラーを通知された際の処理 @Override public void onError(Throwable t) { t.printStackTrace(); } }); } /** * Observable (バックプレッシャー機能なし) */ public void observable() { Observable<String> observable = Observable.create(emitter -> { String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"}; for (String data : datas) { if (emitter.isDisposed()) { return; } emitter.onNext(data); } emitter.onComplete(); }); observable.observeOn(Schedulers.computation()) .subscribe( new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { // 何もしない } @Override public void onNext(String item) { System.out.println(ThreadUtil.getCurrentThreadName() + ": " + item); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました"); } } ); } /** * 引数のデータを通知するFLowableを作成 * 引数は10個まで */ public void just() { Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J"); flowable.subscribe(new DebugSubscriber<>()); } /** * 引数のデータを通知するFlowableを作成 */ public void fromArray() { Flowable<String> flowable = Flowable.fromArray("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"); flowable.subscribe(new DebugSubscriber<>()); } /** * startから順にcount数のデータを通知する */ public void range() { Flowable<Integer> flowable = Flowable.range(10, 3); flowable.subscribe(new DebugSubscriber<>()); } /** * 指定時間ごとに通知を送る */ public void interval() { Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(5); flowable.subscribe(data -> System.out.println(ThreadUtil.getCurrentThreadName() + ": " + System.currentTimeMillis() + ": " + "data = " + data)); } /** * 指定時間後に0を通知する */ public void timer() { Flowable<Long> flowable = Flowable.timer(1000, TimeUnit.MILLISECONDS); flowable.subscribe( data -> System.out.println(ThreadUtil.getCurrentThreadName() + ": data = " + data), error -> System.out.println("エラー = " + error), () -> System.out.println("完了") ); } /** * エラーのみを通知する */ public void error() { Flowable.error(new Exception("例外発生")).subscribe(new DebugSubscriber<>()); } /** * データを変換して通知 */ public void map() { Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E") .map(data -> data.toLowerCase()); flowable.subscribe(new DebugSubscriber<>()); } /** * からのFlowable/Observableを生成する */ public void empy() { Flowable.empty().subscribe(new DebugSubscriber<>()); } /** * 受け取ったデータをFlowable/Observableに変換し、そのFlowable/Obsevableが持つデータを通知する */ public void flatmap1() { Flowable<String> flowable = Flowable.just("A", "", "B", "", "C") .flatMap(data -> { if (data.isEmpty()) { return Flowable.empty(); } else { return Flowable.just(data.toUpperCase()); } }); flowable.subscribe(new DebugSubscriber<>()); } public void flatmap2() { Flowable<String> flowable = Flowable.range(1, 3) .flatMap( data -> Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3), (sourceData, newData) -> "[" + sourceData + "] " + newData ); flowable.subscribe(new DebugSubscriber<>()); } public void meetingTest() { PublishProcessor<Long> processor1 = PublishProcessor.create(); PublishProcessor<Long> processor2 = PublishProcessor.create(); Flowable<Pair<Long, Long>> flowable = PublishProcessor.combineLatest(processor1, processor2, (time1, time2) -> new Pair<Long, Long>(time1, time2)); flowable.subscribe(pair -> System.out.println(new Date(pair.first) + " : " + new Date(pair.second))); new Thread(() -> { try { Thread.sleep(1000); processor1.onNext(System.currentTimeMillis()); Thread.sleep(3000); processor1.onNext(System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Thread.sleep(2000); processor2.onNext(System.currentTimeMillis()); Thread.sleep(1000); processor2.onNext(System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }