NodeJS观察者模式库(observable-fns)

平时在开发过程中,经常会用到设计模式(Design pattern), 它是前人总结出的宝贵经验和经典实践。常用的设计模式有三类:创建型模式(工厂模式、单例模式、建造者模式)、结构型模式(装饰器模式、适配器模式、代理模式)、行为型模式(监听者模式、观察者模式)。本文介绍一种观察者模式的实现库:Observable-fns, 使用者可以快速在NodeJS环境下使用该模式进行消息分发和订阅。

前言

Observable-fns是一个实现比较巧妙的Observer模式库。该库的作者同样为Andy Wermke, Observable-fns不仅提供了传统的发布订阅接口,而且还为使用者提供了常用的工具函数,方便开发者进行消息进行过滤、扫描、变换、导入导出、合并订阅等,是非常使用的工具库。

观察模式整体设计

在观察者模式中,数据通过观察者作为输入端,想订阅者传递。Observable-fns实现了Observable类,其中包括subscriber的订阅回调方法,数据管理方法等。具体架构可参看下图:

Observable-fns设计

从图中的Step 1中,我们可以看出,创建Observable类时,需要传入订阅回调函数,这个函数是订阅者调用subscribe方法时触发调用的。

当订阅者调用subscribe时,就会创建一个Subscription类,它拥有自己的消息队列、订阅状态、以及会生成一个订阅管理类SubscriptionObserver,它是对订阅者Subscription类的管理类,作用是操作Subscription类进行消息分发,并通过内部状态机逻辑,维护Subscription的状态,这是Step 2所做的事情.

接着,Subscription类会执行Step 3调用订阅回调函数,参数就是上一步创建的SubscriptionObserver类, 回到函数内部来定义用于自定义的消息对接和发布代码。

广播消息

在整体设计中,我们实现了观察者和订阅者的一一映射,并建立了信息的发布、订阅通道,并可以管理消息发布过程中的订阅状态。

但是如何实现单一观察者发布消息,多方订阅者接收消息的场景呢?Observable-fns也给出了自己的方案:multicast.

Observable multicast

multicast是一个函数,输入参数是我们上节中创建的Observable类, 函数内部会创建一个MulticastSubject类,并返回一个ObservableProxy类, ObservableProxy会对MulticastSubject进行管理,我们可以参考代码实现。

function multicast<T>(coldObservable: ObservableLike<T>): Observable<T> {
  const subject = new Subject<T>()

  let sourceSubscription: ReturnType<ObservableLike<T>["subscribe"]> | undefined
  let subscriberCount = 0

  return new Observable<T>(observer => {
    // Init source subscription lazily
    if (!sourceSubscription) {
      sourceSubscription = coldObservable.subscribe(subject)
    }

    // Pipe all events from `subject` into this observable
    const subscription = subject.subscribe(observer)
    subscriberCount++

    return () => {
      subscriberCount--
      subscription.unsubscribe()

      // Close source subscription once last subscriber has unsubscribed
      if (subscriberCount === 0) {
        unsubscribe(sourceSubscription)
        sourceSubscription = undefined
      }
    }
  })
}

这里的Observable类我们定义为cold observer, 因为这时只是定义好订阅回调方法,单并没有触发。通过multicast方法的调用,会触发一次Observable的subscribe调用,从而初始化订阅通道,此时Observable类就变成了hot observer

当订阅者使用ObservableProxy的subscribe方法时,会触发MulticastSubject的subscribe方法,MulticastSubject继承了Observable类,并维护了一个Observer set, 每当调用一次subscribe方法时,即可生成一个SubscriptionObserver并缓存起来。

Observable通过next方法获取到消息输入时,会传递给MulticastSubject的next方法,最终MulticastSubject将该消息分发给Observer set中的所有Observer, 没有Observer都拥有自己的消息队列以及subscriber. 因此最终行程了一个原始Observer,多个subscriber的分发模式。

其他消息管理

除了常用的multicast外,Observable-fns还提供了merge、scan、map、filter、flatMap、interval等方法。基本思路都是新建一个ObservableProxy,在这个新的Observer中进行消息的过滤、组合、转换、分发。具体的代码细节可以参考源码

这里方法中大多用到了一个异步调度器(AsyncSerialScheduler), 它的功能可以异步接收消息,并串行执行消息处理,从而提高性能,实现代码如下:

class AsyncSerialScheduler<T> {
  private _baseObserver: SubscriptionObserver<T>
  private _pendingPromises: Set<Promise<any>>

  constructor(observer: SubscriptionObserver<T>) {
    this._baseObserver = observer
    this._pendingPromises = new Set()
  }

  complete() {
    Promise.all(this._pendingPromises)
      .then(() => this._baseObserver.complete())
      .catch(error => this._baseObserver.error(error))
  }

  error(error: any) {
    this._baseObserver.error(error)
  }

  schedule(task: (next: (value: T) => void) => Promise<void>) {
    const prevPromisesCompletion = Promise.all(this._pendingPromises)
    const values: T[] = []

    const next = (value: T) => values.push(value)

    const promise = Promise.resolve()
      .then(async () => {
        await prevPromisesCompletion
        await task(next)
        this._pendingPromises.delete(promise)

        for (const value of values) {
          this._baseObserver.next(value)
        }
      })
      .catch(error => {
        this._pendingPromises.delete(promise)
        this._baseObserver.error(error)
      })

    this._pendingPromises.add(promise)
  }
}

从代码中我们可以看出, AsyncSerialScheduler拥有三个方法scheduleerrorcomplete.

  • schedule: 该方法接收task输入,task是一个函数,用于具体功能逻辑的实现,如果达到了该功能的目的,则此消息放入values队列中,然后发送给目标Subjecter。上述操作是异步执行,所以当有I/O操作时,不会影响性能。
  • error: 是向目标Subjecter传递error信息。
  • complete: 是一个同步方法,它的作用是等待直到所有task都被执行完毕。方便使用者可以获得任务结束点的通知。

总结

以上就是Observable-fns的核心内容,主要介绍了Observer -> Subscriber的核心代码实现,并分析了在1vN场景下,multicast的实现方法。Observable-fns的设计和代码对NodeJS开发者的技能提高帮助很大,有很多值得学习的地方,可以在平时的学习和工作中借鉴使用。希望本文可以对广大NodeJS读者的提升有所帮助。

最近的文章

自动驾驶-数据平台简介

在自动驾驶领域中, 数据平台是一个很重要的核心平台, 无论是算法的改进,还是 bug 的解决,场景的重现,以及程序的调试都需要数据平台提供的多维度数据来驱动。本文分析了两个比较完整的开源项目:Apollo Dreamview 和 Uber Streetscape,他们的设计思想并不完全相同,各有优缺点,我会通过三篇文章来介绍他们的这些不通点。数据平台工作流程自动驾驶汽车每天产生的数据量在 PB 级规模的,这对数据的处理和展示退出了更高的要求。所以数据平台一般来讲有:数据转换,数据上传,...…

继续阅读
更早的文章

NodeJS多线程库Threads

NodeJS作为server端的运行环境,在低资源占用的情况下,处理高IO有很大的优势。然而对于密集型计算的任务却有些力不从心,虽然早已引入worker线程,但依然依然在使用上有诸多不便。本文介绍一种NodeJS三方库: ThreadsJS, 使用者可以快速使NodeJS具有复杂场景下密集计算的能力。 前言 Threads.js 整体设计 线程封装 工作线程封装函数: expose 主线程封装函数: spawn 线程间信息传递 ...…

继续阅读