diff --git a/app/src/main/java/org/ntlab/developrx/JavaMainActivity.java b/app/src/main/java/org/ntlab/developrx/JavaMainActivity.java index fd67971..efd03d6 100644 --- a/app/src/main/java/org/ntlab/developrx/JavaMainActivity.java +++ b/app/src/main/java/org/ntlab/developrx/JavaMainActivity.java @@ -7,10 +7,16 @@ * Created by matsumoto_k on 2017/11/11. */ public class JavaMainActivity extends AppCompatActivity { + JavaRxProcess rxProcess = new JavaRxProcess(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main_java); + + findViewById(R.id.flowable_btn).setOnClickListener(view -> { + rxProcess.flowableReactiveStreams(); + } + ); } } diff --git a/app/src/main/java/org/ntlab/developrx/JavaRxProcess.java b/app/src/main/java/org/ntlab/developrx/JavaRxProcess.java new file mode 100644 index 0000000..ebbadd8 --- /dev/null +++ b/app/src/main/java/org/ntlab/developrx/JavaRxProcess.java @@ -0,0 +1,77 @@ +package org.ntlab.developrx; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.reactivex.BackpressureStrategy; +import io.reactivex.Flowable; +import io.reactivex.schedulers.Schedulers; + +/** + * Created by matsumoto_k on 2017/11/11. + */ + +public class JavaRxProcess { + + /** + * Flowable (Reactive Streams対応) + */ + public void flowableReactiveStreams() { + Flowable flowable = Flowable.create(emitter -> { + String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"}; + + for (String data : datas) { + //購読解除されている場合は処理を止める + if (emitter.isCancelled()) { + return; + } + + // データを通知する + emitter.onNext(data); + } + + // 完了したことを通知 + emitter.onComplete(); + }, BackpressureStrategy.BUFFER); + + flowable + // Subscriberの処理を別のスレッドで行うようにする + .subscribeOn(Schedulers.computation()) + .subscribe(new Subscriber() { + + // データ数のリクエストおよび購読の解除を行うオブジェクト + Subscription subscription; + + // 購読が開始された際の処理 + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + // 受け取るデータ数をリクエストする + // Long.MAX_VALUEをrequestした場合、データ数を制限すること無くデータを通知する + this.subscription.request(1L); + } + + // データを受け取った際の処理 + @Override + public void onNext(String data) { + String threadName = Thread.currentThread().getName(); + System.out.println(threadName + ": " + data); + // 次に受け取るデータ数をリクエストする + this.subscription.request(1L); + } + + // 完了を通知された際の処理 + @Override + public void onComplete() { + String threadName = Thread.currentThread().getName(); + System.out.println(threadName + ": 完了しました"); + } + + // エラーを通知された際の処理 + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }); + } +} diff --git a/app/src/main/res/layout/activity_main_java.xml b/app/src/main/res/layout/activity_main_java.xml index bbce02c..dd53f60 100644 --- a/app/src/main/res/layout/activity_main_java.xml +++ b/app/src/main/res/layout/activity_main_java.xml @@ -1,18 +1,22 @@ - - + - +