Observable的单播与Subject多播

Observable的单播与Subject多播

Scroll Down

通过RxJS库中的(of)方法来创建一个Observable

function f1() {
    let observable = of(1, 2, 3);
    /* 定义一个观察者 */
    let observer = { next: num => { console.log(num); } };
    observable.subscribe(
        //使用 subscribe(observer?: PartialObserver<T>): Subscription; 方法
        observer
    );
}
// f1();

通过构造函数来创建Observable

function f2() {
    const observable = Observable.create(
        observer => {
            try {
                console.log("生产者发送了1");
                observer.next(1);
                console.log("生产者发送了2");
                observer.next(2);
                console.log("生产者发送了3");
                observer.next(3);
                setTimeout(function () {
                    observer.next(4);
                }, 3000);
                console.log("生产者使用异步函数发送给了4");
            } catch (e) {
                observer.error(e);
            }
            //observer.complete();
        }
    );
    const observer = {
        next: num => console.log(num),
        error: e => console.log(e),
        complete: () => console.log("完成")
    };
    observable.subscribe(observer);
}
f2();

(模拟)自定义subscribe()

function f4() {
    function subscribe(observer) {
        var intervalID = setInterval(() => {
            observer.next('launch.....');
        }, 1000);
        return () => clearInterval(intervalID);
    }

    var subscription = subscribe({ next: (x) => console.log(x) });
    setTimeout(() => subscription(), 5000);
}
f4();

使用RxJS中的操作符

我们可以使用管道(pipe)将多个操作符链接起来,并将操作符返回的结果组合成一个,这样,代码结构更清晰

function f5() {
    const observable = of(1, 2, 3);
    observable.pipe(
        tap(x => console.log(x)),
        map(x => x * x),
        tap(x => console.log(x))
    ).subscribe(data => console.log(data));
}
f5();

Observable 的单播

function f6() {
    //Observable 创建一个可观测对象 
    const observable = of(1, 2, 3);
    //创建两个观察者对象
    const observer1 = { next: x => console.log('O1 ' + x) };
    const observer2 = { next: x => console.log('O2 ' + x) };
    //两个观察者分别订阅可观察对象
    observable.subscribe(observer1);
    observable.subscribe(observer2);
}
f6();

输出:
O1 1
O1 2
O1 3
O2 1
O2 2
O2 3
两个观察者之间互不干扰,即生产者先将值发送给observer1,再将值发送给observer2

Subject的多播

作为可观察对象(生产者)的Subject

function f7() {
    //创建一个可观测对象 Subject
    const subject = new Subject();
    //创建两个观察者对象
    const observer1 = { next: x => console.log('O1 ' + x) };
    const observer2 = { next: x => console.log('O2 ' + x) };
    // 注册两个观察者!注意,这里是注册,
    // 而不是像Observable.subscribe()那样执行订阅者函数
    subject.subscribe(observer1);
    subject.subscribe(observer2);
    // subject 广播
    subject.next(1);
    subject.next(2);
    subject.next(3);
}
f7();

输出:
O1 1
O2 1
O1 2
O2 2
O1 3
O2 3

作为观察者的Subject

function f8() {
    //定义一个可观察对象
    const observable = of(1, 2, 3);
    //作为观察者的Subject
    const subject = new Subject();
    //创建两个观察者对象 Observer
    const observer1 = { next: x => console.log('O1 ' + x) };
    const observer2 = { next: x => console.log('O2 ' + x) };

    //将Observer注册为Subject的观察者
    subject.subscribe(observer1);
    subject.subscribe(observer2);

    //将subject注册为observable的观察者并执行
    observable.subscribe(subject);
}
f8();

输出:
O1 1
O2 1
O1 2
O2 2
O1 3
O2 3
运行结果和上面的是一样的

Subject(observer)订阅Subject(Observable)

 function f9(){
    // 作为Observable
    const subjectObservable = new Subject();
    // 作为 observer
    const subjectObserver = new Subject();
 
    // 新建两个观察者对象,并将其注册到subjectObserver中
    const observer1 = {
        next: num => console.log("O1 " + num)
    }
    const observer2 = {
        next: num => console.log("O2 " + num)
    }
    subjectObserver.subscribe(observer1);
    subjectObserver.subscribe(observer2);
 
    // subjectObserver订阅subjectObservable
    subjectObservable.subscribe(subjectObserver);
 
    // subjectObservable向在subjectObserver中注册的观察者发送值
    subjectObservable.next(1);
    subjectObservable.next(2);
    subjectObservable.next(3);
}
f9();

输出:
O1 1
O2 1
O1 2
O2 2
O1 3
O2 3
运行结果和上面的是一样的