Newer
Older
DevelopRx / app / src / main / java / org / ntlab / developrx / JavaRxProcess.java
package org.ntlab.developrx;

import android.util.Pair;

import org.ntlab.developrx.utils.ThreadUtil;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;

/**
 * Created by matsumoto_k on 2017/11/11.
 */

public class JavaRxProcess {

    /**
     * Flowable (Reactive Streams対応)
     */
    public void flowableReactiveStreams() {
        Flowable<String> flowable = Flowable.create(emitter -> {
            String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"};

            for (String data : datas) {
                //購読解除されている場合は処理を止める
                if (emitter.isCancelled()) {
                    return;
                }

                // データを通知する
                emitter.onNext(data);
            }

            // 完了したことを通知
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);

        flowable
                // Subscriberの処理を別のスレッドで行うようにする
                .observeOn(Schedulers.computation())
                .subscribe(new Subscriber<String>() {

                    // データ数のリクエストおよび購読の解除を行うオブジェクト
                    Subscription subscription;

                    // 購読が開始された際の処理
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        // 受け取るデータ数をリクエストする
                        // Long.MAX_VALUEをrequestした場合、データ数を制限すること無くデータを通知する
                        this.subscription.request(1L);
                    }

                    // データを受け取った際の処理
                    @Override
                    public void onNext(String data) {
                        System.out.println(ThreadUtil.getCurrentThreadName() + ": " + data);
                        // 次に受け取るデータ数をリクエストする
                        this.subscription.request(1L);
                    }

                    // 完了を通知された際の処理
                    @Override
                    public void onComplete() {
                        System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました");
                    }

                    // エラーを通知された際の処理
                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }
                });
    }

    /**
     * Observable (バックプレッシャー機能なし)
     */
    public void observable() {
        Observable<String> observable = Observable.create(emitter -> {
            String[] datas = {"データ1", "データ2", "データ3", "データ4", "データ5"};

            for (String data : datas) {
                if (emitter.isDisposed()) {
                    return;
                }
                emitter.onNext(data);
            }

            emitter.onComplete();
        });

        observable.observeOn(Schedulers.computation())
                .subscribe(
                        new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable disposable) {
                                // 何もしない
                            }

                            @Override
                            public void onNext(String item) {
                                System.out.println(ThreadUtil.getCurrentThreadName() + ": " + item);
                            }

                            @Override
                            public void onError(Throwable e) {
                                e.printStackTrace();
                            }

                            @Override
                            public void onComplete() {
                                System.out.println(ThreadUtil.getCurrentThreadName() + ": 完了しました");
                            }
                        }
                );
    }

    /**
     * 引数のデータを通知するFLowableを作成
     * 引数は10個まで
     */
    public void just() {
        Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
        flowable.subscribe(new DebugSubscriber<>());
    }

    /**
     * 引数のデータを通知するFlowableを作成
     */
    public void fromArray() {
        Flowable<String> flowable = Flowable.fromArray("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K");
        flowable.subscribe(new DebugSubscriber<>());
    }

    /**
     * startから順にcount数のデータを通知する
     */
    public void range() {
        Flowable<Integer> flowable = Flowable.range(10, 3);
        flowable.subscribe(new DebugSubscriber<>());
    }

    /**
     * 指定時間ごとに通知を送る
     */
    public void interval() {
        Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(5);
        flowable.subscribe(data -> System.out.println(ThreadUtil.getCurrentThreadName() + ": " + System.currentTimeMillis() + ": " + "data = " + data));
    }

    /**
     * 指定時間後に0を通知する
     */
    public void timer() {
        Flowable<Long> flowable = Flowable.timer(1000, TimeUnit.MILLISECONDS);
        flowable.subscribe(
                data -> System.out.println(ThreadUtil.getCurrentThreadName() + ": data = " + data),
                error -> System.out.println("エラー = " + error),
                () -> System.out.println("完了")
        );
    }

    /**
     * エラーのみを通知する
     */
    public void error() {
        Flowable.error(new Exception("例外発生")).subscribe(new DebugSubscriber<>());
    }

    /**
     * データを変換して通知
     */
    public void map() {
        Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E")
                .map(data -> data.toLowerCase());
        flowable.subscribe(new DebugSubscriber<>());
    }

    /**
     * からのFlowable/Observableを生成する
     */
    public void empy() {
        Flowable.empty().subscribe(new DebugSubscriber<>());
    }

    /**
     * 受け取ったデータをFlowable/Observableに変換し、そのFlowable/Obsevableが持つデータを通知する
     */
    public void flatmap1() {
        Flowable<String> flowable = Flowable.just("A", "", "B", "", "C")
                .flatMap(data -> {
                    if (data.isEmpty()) {
                        return Flowable.empty();
                    } else {
                        return Flowable.just(data.toUpperCase());
                    }
                });
        flowable.subscribe(new DebugSubscriber<>());
    }

    public void flatmap2() {
        Flowable<String> flowable = Flowable.range(1, 3)
                .flatMap(
                        data -> Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3),
                        (sourceData, newData) -> "[" + sourceData + "] " + newData
                );
        flowable.subscribe(new DebugSubscriber<>());
    }

    public void meetingTest() {
        PublishProcessor<Long> processor1 = PublishProcessor.create();
        PublishProcessor<Long> processor2 = PublishProcessor.create();
        Flowable<Pair<Long, Long>> flowable = PublishProcessor.combineLatest(processor1, processor2, (time1, time2) -> new Pair<Long, Long>(time1, time2));
        flowable.subscribe(pair -> System.out.println(new Date(pair.first) + " : " + new Date(pair.second)));

        new Thread(() -> {
            try {
                Thread.sleep(1000);
                processor1.onNext(System.currentTimeMillis());
                Thread.sleep(3000);
                processor1.onNext(System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                Thread.sleep(2000);
                processor2.onNext(System.currentTimeMillis());
                Thread.sleep(1000);
                processor2.onNext(System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}