diff --git a/app/src/main/java/org/ntlab/developrx/JavaMainActivity.java b/app/src/main/java/org/ntlab/developrx/JavaMainActivity.java index efd03d6..a184b71 100644 --- a/app/src/main/java/org/ntlab/developrx/JavaMainActivity.java +++ b/app/src/main/java/org/ntlab/developrx/JavaMainActivity.java @@ -15,8 +15,11 @@ setContentView(R.layout.activity_main_java); findViewById(R.id.flowable_btn).setOnClickListener(view -> { - rxProcess.flowableReactiveStreams(); - } - ); + rxProcess.flowableReactiveStreams(); + }); + + findViewById(R.id.observable_btn).setOnClickListener(view -> { + rxProcess.observable(); + }); } } diff --git a/app/src/main/java/org/ntlab/developrx/JavaRxProcess.java b/app/src/main/java/org/ntlab/developrx/JavaRxProcess.java index e2f3b3e..59ef72c 100644 --- a/app/src/main/java/org/ntlab/developrx/JavaRxProcess.java +++ b/app/src/main/java/org/ntlab/developrx/JavaRxProcess.java @@ -1,10 +1,14 @@ package org.ntlab.developrx; +import org.ntlab.developrx.utils.ThreadUtil; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; /** @@ -54,8 +58,7 @@ // データを受け取った際の処理 @Override public void onNext(String data) { - String threadName = Thread.currentThread().getName(); - System.out.println(threadName + ": " + data); + System.out.println(ThreadUtil.getCurrentThreadName() + ": " + data); // 次に受け取るデータ数をリクエストする this.subscription.request(1L); } @@ -63,8 +66,7 @@ // 完了を通知された際の処理 @Override public void onComplete() { - String threadName = Thread.currentThread().getName(); - System.out.println(threadName + ": 完了しました"); + System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました"); } // エラーを通知された際の処理 @@ -74,4 +76,47 @@ } }); } + + /** + * Observable (バックプレッシャー機能なし) + */ + public void observable() { + Observable observable = Observable.create(emitter -> { + String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"}; + + for (String data : datas) { + if (emitter.isDisposed()) { + return; + } + emitter.onNext(data); + } + + emitter.onComplete(); + }); + + observable.observeOn(Schedulers.computation()) + .subscribe( + new Observer() { + @Override + public void onSubscribe(Disposable disposable) { + // 何もしない + } + + @Override + public void onNext(String item) { + System.out.println(ThreadUtil.getCurrentThreadName() + ": " + item); + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onComplete() { + System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました"); + } + } + ); + } } diff --git a/app/src/main/res/layout/activity_main_java.xml b/app/src/main/res/layout/activity_main_java.xml index dd53f60..e9845d8 100644 --- a/app/src/main/res/layout/activity_main_java.xml +++ b/app/src/main/res/layout/activity_main_java.xml @@ -17,6 +17,14 @@ android:text="flowable (Reactive Streams)" app:layout_constraintStart_toStartOf="parent" app:layout_constraintTop_toTopOf="parent" /> + +