Newer
Older
DevelopRx / app / src / main / java / org / ntlab / developrx / JavaRxProcess.java
KeijuMatsumoto on 12 Nov 2017 6 KB [add] map
package org.ntlab.developrx;

import org.ntlab.developrx.utils.ThreadUtil;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

/**
 * Created by matsumoto_k on 2017/11/11.
 */

public class JavaRxProcess {

    /**
     * Flowable (Reactive Streams対応)
     */
    public void flowableReactiveStreams() {
        Flowable<String> flowable = Flowable.create(emitter -> {
            String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"};

            for (String data : datas) {
                //購読解除されている場合は処理を止める
                if (emitter.isCancelled()) {
                    return;
                }

                // データを通知する
                emitter.onNext(data);
            }

            // 完了したことを通知
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);

        flowable
                // Subscriberの処理を別のスレッドで行うようにする
                .observeOn(Schedulers.computation())
                .subscribe(new Subscriber<String>() {

                    // データ数のリクエストおよび購読の解除を行うオブジェクト
                    Subscription subscription;

                    // 購読が開始された際の処理
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        // 受け取るデータ数をリクエストする
                        // Long.MAX_VALUEをrequestした場合、データ数を制限すること無くデータを通知する
                        this.subscription.request(1L);
                    }

                    // データを受け取った際の処理
                    @Override
                    public void onNext(String data) {
                        System.out.println(ThreadUtil.getCurrentThreadName() + ": " + data);
                        // 次に受け取るデータ数をリクエストする
                        this.subscription.request(1L);
                    }

                    // 完了を通知された際の処理
                    @Override
                    public void onComplete() {
                        System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました");
                    }

                    // エラーを通知された際の処理
                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }
                });
    }

    /**
     * Observable (バックプレッシャー機能なし)
     */
    public void observable() {
        Observable<String> observable = Observable.create(emitter -> {
            String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"};

            for (String data : datas) {
                if (emitter.isDisposed()) {
                    return;
                }
                emitter.onNext(data);
            }

            emitter.onComplete();
        });

        observable.observeOn(Schedulers.computation())
                .subscribe(
                        new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable disposable) {
                                // 何もしない
                            }

                            @Override
                            public void onNext(String item) {
                                System.out.println(ThreadUtil.getCurrentThreadName() + ": " + item);
                            }

                            @Override
                            public void onError(Throwable e) {
                                e.printStackTrace();
                            }

                            @Override
                            public void onComplete() {
                                System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました");
                            }
                        }
                );
    }

    /**
     * 引数のデータを通知するFLowableを作成
     * 引数は10個まで
     */
    public void just() {
        Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
        flowable.subscribe(new DebugSubscriber<>());
    }

    /**
     * 引数のデータを通知するFlowableを作成
     */
    public void fromArray() {
        Flowable<String> flowable = Flowable.fromArray("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K");
        flowable.subscribe(new DebugSubscriber<>());
    }

    /**
     * startから順にcount数のデータを通知する
     */
    public void range() {
        Flowable<Integer> flowable = Flowable.range(10, 3);
        flowable.subscribe(new DebugSubscriber<>());
    }

    /**
     * 指定時間ごとに通知を送る
     */
    public void interval() {
        Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(5);
        flowable.subscribe(data -> System.out.println(ThreadUtil.getCurrentThreadName() + ": " + System.currentTimeMillis() + ": " + "data = " + data));
    }

    /**
     * 指定時間後に0を通知する
     */
    public void timer() {
        Flowable<Long> flowable = Flowable.timer(1000, TimeUnit.MILLISECONDS);
        flowable.subscribe(
                data -> System.out.println(ThreadUtil.getCurrentThreadName() + ": data = " + data),
                error -> System.out.println("エラー = " + error),
                () -> System.out.println("完了")
        );
    }

    /**
     * エラーのみを通知する
     */
    public void error() {
        Flowable.error(new Exception("例外発生")).subscribe(new DebugSubscriber<>());
    }

    /**
     * データを変換して通知
     */
    public void map() {
        Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E")
                .map(data -> data.toLowerCase());
        flowable.subscribe(new DebugSubscriber<>());
    }
}