Java 9 - Reactive Streams

in Tech Java

简介

Reactive Streams 本质上就是通过异步非阻塞的观察者模式来处理数据。其实现也就是 java.util.concurrent.Flow 中包含三个重要的函数式接口 Publisher、Subscriber、Subscription。还有一个函数式接口 Processor 直接继承 Publisher 和 Subscriber,既能充当发布者推送数据,也能充当订阅者消费数据。其中还包含一个 back pressure (背压 or 回压)的概念,也就是,当发布者推送数据超过了订阅者消费数据能力时产生的压力。这时就需要一定的机制来保障订阅者不被压垮。

Publisher(发布者)

用于发布数据到订阅者

 @FunctionalInterface
    public static interface Publisher<T> {
        // 关联订阅者
        public void subscribe(Subscriber<? super T> subscriber);
    }

Subscriber(订阅者)

用于消费发布者推送的数据

    public static interface Subscriber<T> {
        // 发布者接收完订阅信息后会被回调。然后通过 Subscription 向发布者请求数据
        public void onSubscribe(Subscription subscription);
        // 接收并消费发布者推送的数据
        public void onNext(T item);
        // 抛异常时会被调用
        public void onError(Throwable throwable);
        // 处理完成时会被调用
        public void onComplete();
    }

Subscription(订阅关系)

用于发布者和订阅者的关联管理

    public static interface Subscription {
        // 可指定每次请求的数量
        public void request(long n);
        // 可取消订阅
        public void cancel();
    }

示例

发布者使用J.U.C下内置的实现,也就是 java.util.concurrent.SubmissionPublisher

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveStreams {
    private final static Logger log = LoggerFactory.getLogger(ReactiveStreams.class);
    private final static int DATA_COUNT = 300;
    private final static CountDownLatch latch = new CountDownLatch(DATA_COUNT);

    @Test
    public void test() throws InterruptedException {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // 订阅者 需自行依据业务实现
        MySubscriber subscriber = new MySubscriber();
        // 建立订阅关系。会回调到 Subscriber.onSubscribe 方法
        publisher.subscribe(subscriber);

        for (int i = 0; i < DATA_COUNT; i++) {
            String data = "今晚打老虎" + i;
            log.info("生产数据: {}", data);
            // 内部回调到 Subscriber.onNext 方法
            // 且缓存最大值为256。也就是当订阅者消费能力不足时,这里会被阻塞
            publisher.submit(data);
        }
        // 内部在订阅者接收处理完数据后,回调到 Subscriber.onComplete 方法
        publisher.close();
        latch.await();
    }

    class MySubscriber implements Flow.Subscriber<String> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            log.info("【订阅者】 向发布者请求一条数据");
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            Random random = new Random();
            int time = random.nextInt(100);
            log.info("【订阅者】 正在消费数据: {},预计{}ms", item, time);
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            // 请求下一条数据
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            log.error("【订阅者】 出错了哦: {}", throwable.getMessage());
            subscription.cancel();// 出错了,取消订阅
        }

        @Override
        public void onComplete() {
            log.info("【订阅者】 消费完所有数据了");
        }
    }
}