NodeJS观察者模式库(observable-fns)

平时在开发过程中,经常会用到设计模式(Design pattern), 它是前人总结出的宝贵经验和经典实践。常用的设计模式有三类:创建型模式(工厂模式、单例模式、建造者模式)、结构型模式(装饰器模式、适配器模式、代理模式)、行为型模式(监听者模式、观察者模式)。本文介绍一种观察者模式的实现库:Observable-fns, 使用者可以快速在NodeJS环境下使用该模式进行消息分发和订阅。

前言

1
Observable-fns
是一个实现比较巧妙的Observer模式库。该库的作者同样为Andy Wermke,
1
Observable-fns
不仅提供了传统的发布订阅接口,而且还为使用者提供了常用的工具函数,方便开发者进行消息进行过滤、扫描、变换、导入导出、合并订阅等,是非常使用的工具库。

观察模式整体设计

在观察者模式中,数据通过观察者作为输入端,想订阅者传递。

1
Observable-fns
实现了
1
Observable
类,其中包括subscriber的订阅回调方法,数据管理方法等。具体架构可参看下图:

Observable-fns设计

从图中的Step 1中,我们可以看出,创建

1
Observable
类时,需要传入订阅回调函数,这个函数是订阅者调用
1
subscribe
方法时触发调用的。

当订阅者调用

1
subscribe
时,就会创建一个
1
Subscription
类,它拥有自己的消息队列、订阅状态、以及会生成一个订阅管理类
1
SubscriptionObserver
,它是对订阅者
1
Subscription
类的管理类,作用是操作
1
Subscription
类进行消息分发,并通过内部状态机逻辑,维护
1
Subscription
的状态,这是Step 2所做的事情.

接着,

1
Subscription
类会执行Step 3调用订阅回调函数,参数就是上一步创建的
1
SubscriptionObserver
类, 回到函数内部来定义用于自定义的消息对接和发布代码。

广播消息

在整体设计中,我们实现了观察者和订阅者的一一映射,并建立了信息的发布、订阅通道,并可以管理消息发布过程中的订阅状态。

但是如何实现单一观察者发布消息,多方订阅者接收消息的场景呢?

1
Observable-fns
也给出了自己的方案:multicast.

Observable multicast

1
multicast
是一个函数,输入参数是我们上节中创建的
1
Observable
类, 函数内部会创建一个
1
MulticastSubject
类,并返回一个
1
ObservableProxy
类,
1
ObservableProxy
会对
1
MulticastSubject
进行管理,我们可以参考代码实现。

function multicast<T>(coldObservable: ObservableLike<T>): Observable<T> {
  const subject = new Subject<T>()

  let sourceSubscription: ReturnType<ObservableLike<T>["subscribe"]> | undefined
  let subscriberCount = 0

  return new Observable<T>(observer => {
    // Init source subscription lazily
    if (!sourceSubscription) {
      sourceSubscription = coldObservable.subscribe(subject)
    }

    // Pipe all events from `subject` into this observable
    const subscription = subject.subscribe(observer)
    subscriberCount++

    return () => {
      subscriberCount--
      subscription.unsubscribe()

      // Close source subscription once last subscriber has unsubscribed
      if (subscriberCount === 0) {
        unsubscribe(sourceSubscription)
        sourceSubscription = undefined
      }
    }
  })
}

这里的

1
Observable
类我们定义为cold observer, 因为这时只是定义好订阅回调方法,单并没有触发。通过
1
multicast
方法的调用,会触发一次
1
Observable
的subscribe调用,从而初始化订阅通道,此时
1
Observable
类就变成了hot observer

当订阅者使用

1
ObservableProxy
的subscribe方法时,会触发
1
MulticastSubject
的subscribe方法,
1
MulticastSubject
继承了
1
Observable
类,并维护了一个Observer set, 每当调用一次subscribe方法时,即可生成一个
1
SubscriptionObserver
并缓存起来。

1
Observable
通过next方法获取到消息输入时,会传递给
1
MulticastSubject
的next方法,最终
1
MulticastSubject
将该消息分发给Observer set中的所有Observer, 没有Observer都拥有自己的消息队列以及subscriber. 因此最终行程了一个原始Observer,多个subscriber的分发模式。

其他消息管理

除了常用的multicast外,

1
Observable-fns
还提供了merge、scan、map、filter、flatMap、interval等方法。基本思路都是新建一个
1
ObservableProxy
,在这个新的
1
Observer
中进行消息的过滤、组合、转换、分发。具体的代码细节可以参考源码

这里方法中大多用到了一个异步调度器(

1
AsyncSerialScheduler
), 它的功能可以异步接收消息,并串行执行消息处理,从而提高性能,实现代码如下:

class AsyncSerialScheduler<T> {
  private _baseObserver: SubscriptionObserver<T>
  private _pendingPromises: Set<Promise<any>>

  constructor(observer: SubscriptionObserver<T>) {
    this._baseObserver = observer
    this._pendingPromises = new Set()
  }

  complete() {
    Promise.all(this._pendingPromises)
      .then(() => this._baseObserver.complete())
      .catch(error => this._baseObserver.error(error))
  }

  error(error: any) {
    this._baseObserver.error(error)
  }

  schedule(task: (next: (value: T) => void) => Promise<void>) {
    const prevPromisesCompletion = Promise.all(this._pendingPromises)
    const values: T[] = []

    const next = (value: T) => values.push(value)

    const promise = Promise.resolve()
      .then(async () => {
        await prevPromisesCompletion
        await task(next)
        this._pendingPromises.delete(promise)

        for (const value of values) {
          this._baseObserver.next(value)
        }
      })
      .catch(error => {
        this._pendingPromises.delete(promise)
        this._baseObserver.error(error)
      })

    this._pendingPromises.add(promise)
  }
}

从代码中我们可以看出,

1
AsyncSerialScheduler
拥有三个方法
1
schedule
1
error
1
complete
.

  • 1
    
    schedule
    
    : 该方法接收task输入,task是一个函数,用于具体功能逻辑的实现,如果达到了该功能的目的,则此消息放入
    1
    
    values
    
    队列中,然后发送给目标
    1
    
    Subjecter
    
    。上述操作是异步执行,所以当有I/O操作时,不会影响性能。
  • 1
    
    error
    
    : 是向目标
    1
    
    Subjecter
    
    传递error信息。
  • 1
    
    complete
    
    : 是一个同步方法,它的作用是等待直到所有
    1
    
    task
    
    都被执行完毕。方便使用者可以获得任务结束点的通知。

总结

以上就是

1
Observable-fns
的核心内容,主要介绍了
1
Observer
->
1
Subscriber
的核心代码实现,并分析了在1vN场景下,multicast的实现方法。
1
Observable-fns
的设计和代码对NodeJS开发者的技能提高帮助很大,有很多值得学习的地方,可以在平时的学习和工作中借鉴使用。希望本文可以对广大NodeJS读者的提升有所帮助。

最近的文章

自动驾驶-数据平台简介

在自动驾驶领域中, 数据平台是一个很重要的核心平台, 无论是算法的改进,还是 bug 的解决,场景的重现,以及程序的调试都需要数据平台提供的多维度数据来驱动。本文分析了两个比较完整的开源项目:Apollo Dreamview 和 Uber Streetscape,他们的设计思想并不完全相同,各有优缺点,我会通过三篇文章来介绍他们的这些不通点。数据平台工作流程自动驾驶汽车每天产生的数据量在 PB 级规模的,这对数据的处理和展示退出了更高的要求。所以数据平台一般来讲有:数据转换,数据上传,...…

继续阅读
更早的文章

NodeJS多线程库Threads

NodeJS作为server端的运行环境,在低资源占用的情况下,处理高IO有很大的优势。然而对于密集型计算的任务却有些力不从心,虽然早已引入worker线程,但依然依然在使用上有诸多不便。本文介绍一种NodeJS三方库: ThreadsJS, 使用者可以快速使NodeJS具有复杂场景下密集计算的能力。 前言 Threads.js 整体设计 线程封装 工作线程封装函数: expose 主线程封装函数: spawn 线程间信息传递 ...…

继续阅读