package org.ntlab.developrx;
import org.ntlab.developrx.utils.ThreadUtil;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.TimeUnit;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
/**
* Created by matsumoto_k on 2017/11/11.
*/
public class JavaRxProcess {
/**
* Flowable (Reactive Streams対応)
*/
public void flowableReactiveStreams() {
Flowable<String> flowable = Flowable.create(emitter -> {
String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"};
for (String data : datas) {
//購読解除されている場合は処理を止める
if (emitter.isCancelled()) {
return;
}
// データを通知する
emitter.onNext(data);
}
// 完了したことを通知
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable
// Subscriberの処理を別のスレッドで行うようにする
.observeOn(Schedulers.computation())
.subscribe(new Subscriber<String>() {
// データ数のリクエストおよび購読の解除を行うオブジェクト
Subscription subscription;
// 購読が開始された際の処理
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
// 受け取るデータ数をリクエストする
// Long.MAX_VALUEをrequestした場合、データ数を制限すること無くデータを通知する
this.subscription.request(1L);
}
// データを受け取った際の処理
@Override
public void onNext(String data) {
System.out.println(ThreadUtil.getCurrentThreadName() + ": " + data);
// 次に受け取るデータ数をリクエストする
this.subscription.request(1L);
}
// 完了を通知された際の処理
@Override
public void onComplete() {
System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました");
}
// エラーを通知された際の処理
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
});
}
/**
* Observable (バックプレッシャー機能なし)
*/
public void observable() {
Observable<String> observable = Observable.create(emitter -> {
String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"};
for (String data : datas) {
if (emitter.isDisposed()) {
return;
}
emitter.onNext(data);
}
emitter.onComplete();
});
observable.observeOn(Schedulers.computation())
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
// 何もしない
}
@Override
public void onNext(String item) {
System.out.println(ThreadUtil.getCurrentThreadName() + ": " + item);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました");
}
}
);
}
/**
* 引数のデータを通知するFLowableを作成
* 引数は10個まで
*/
public void just() {
Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
flowable.subscribe(new DebugSubscriber<>());
}
/**
* 引数のデータを通知するFlowableを作成
*/
public void fromArray() {
Flowable<String> flowable = Flowable.fromArray("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K");
flowable.subscribe(new DebugSubscriber<>());
}
/**
* startから順にcount数のデータを通知する
*/
public void range() {
Flowable<Integer> flowable = Flowable.range(10, 3);
flowable.subscribe(new DebugSubscriber<>());
}
/**
* 指定時間ごとに通知を送る
*/
public void interval() {
Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(5);
flowable.subscribe(data -> System.out.println(ThreadUtil.getCurrentThreadName() + ": " + System.currentTimeMillis() + ": " + "data = " + data));
}
/**
* 指定時間後に0を通知する
*/
public void timer() {
Flowable<Long> flowable = Flowable.timer(1000, TimeUnit.MILLISECONDS);
flowable.subscribe(
data -> System.out.println(ThreadUtil.getCurrentThreadName() + ": data = " + data),
error -> System.out.println("エラー = " + error),
() -> System.out.println("完了")
);
}
/**
* エラーのみを通知する
*/
public void error() {
Flowable.error(new Exception("例外発生")).subscribe(new DebugSubscriber<>());
}
/**
* データを変換して通知
*/
public void map() {
Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E")
.map(data -> data.toLowerCase());
flowable.subscribe(new DebugSubscriber<>());
}
/**
* からのFlowable/Observableを生成する
*/
public void empy() {
Flowable.empty().subscribe(new DebugSubscriber<>());
}
/**
* 受け取ったデータをFlowable/Observableに変換し、そのFlowable/Obsevableが持つデータを通知する
*/
public void flatmap1() {
Flowable<String> flowable = Flowable.just("A", "", "B", "", "C")
.flatMap(data -> {
if (data.isEmpty()) {
return Flowable.empty();
} else {
return Flowable.just(data.toUpperCase());
}
});
flowable.subscribe(new DebugSubscriber<>());
}
public void flatmap2() {
Flowable<String> flowable = Flowable.range(1, 3)
.flatMap(
data -> Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3),
(sourceData, newData) -> "[" + sourceData + "] " + newData
);
flowable.subscribe(new DebugSubscriber<>());
}
}