平时在开发过程中,经常会用到设计模式(Design pattern), 它是前人总结出的宝贵经验和经典实践。常用的设计模式有三类:创建型模式(工厂模式、单例模式、建造者模式)、结构型模式(装饰器模式、适配器模式、代理模式)、行为型模式(监听者模式、观察者模式)。本文介绍一种观察者模式的实现库:Observable-fns, 使用者可以快速在NodeJS环境下使用该模式进行消息分发和订阅。
前言
是一个实现比较巧妙的Observer模式库。该库的作者同样为Andy Wermke, 1
Observable-fns
不仅提供了传统的发布订阅接口,而且还为使用者提供了常用的工具函数,方便开发者进行消息进行过滤、扫描、变换、导入导出、合并订阅等,是非常使用的工具库。1
Observable-fns
观察模式整体设计
在观察者模式中,数据通过观察者作为输入端,想订阅者传递。实现了1
Observable-fns
类,其中包括subscriber的订阅回调方法,数据管理方法等。具体架构可参看下图:1
Observable

从图中的Step 1中,我们可以看出,创建类时,需要传入订阅回调函数,这个函数是订阅者调用1
Observable
方法时触发调用的。1
subscribe
当订阅者调用时,就会创建一个1
subscribe
类,它拥有自己的消息队列、订阅状态、以及会生成一个订阅管理类1
Subscription
,它是对订阅者1
SubscriptionObserver
类的管理类,作用是操作1
Subscription
类进行消息分发,并通过内部状态机逻辑,维护1
Subscription
的状态,这是Step 2所做的事情.1
Subscription
接着,类会执行Step 3调用订阅回调函数,参数就是上一步创建的1
Subscription
类, 回到函数内部来定义用于自定义的消息对接和发布代码。1
SubscriptionObserver
广播消息
在整体设计中,我们实现了观察者和订阅者的一一映射,并建立了信息的发布、订阅通道,并可以管理消息发布过程中的订阅状态。
但是如何实现单一观察者发布消息,多方订阅者接收消息的场景呢?也给出了自己的方案:multicast.1
Observable-fns

是一个函数,输入参数是我们上节中创建的1
multicast
类, 函数内部会创建一个1
Observable
类,并返回一个1
MulticastSubject
类, 1
ObservableProxy
会对1
ObservableProxy
进行管理,我们可以参考代码实现。1
MulticastSubject
function multicast<T>(coldObservable: ObservableLike<T>): Observable<T> {
const subject = new Subject<T>()
let sourceSubscription: ReturnType<ObservableLike<T>["subscribe"]> | undefined
let subscriberCount = 0
return new Observable<T>(observer => {
// Init source subscription lazily
if (!sourceSubscription) {
sourceSubscription = coldObservable.subscribe(subject)
}
// Pipe all events from `subject` into this observable
const subscription = subject.subscribe(observer)
subscriberCount++
return () => {
subscriberCount--
subscription.unsubscribe()
// Close source subscription once last subscriber has unsubscribed
if (subscriberCount === 0) {
unsubscribe(sourceSubscription)
sourceSubscription = undefined
}
}
})
}
这里的类我们定义为cold observer, 因为这时只是定义好订阅回调方法,单并没有触发。通过1
Observable
方法的调用,会触发一次1
multicast
的subscribe调用,从而初始化订阅通道,此时1
Observable
类就变成了hot observer。1
Observable
当订阅者使用的subscribe方法时,会触发1
ObservableProxy
的subscribe方法,1
MulticastSubject
继承了1
MulticastSubject
类,并维护了一个Observer set, 每当调用一次subscribe方法时,即可生成一个1
Observable
并缓存起来。1
SubscriptionObserver
当通过next方法获取到消息输入时,会传递给1
Observable
的next方法,最终1
MulticastSubject
将该消息分发给Observer set中的所有Observer, 没有Observer都拥有自己的消息队列以及subscriber. 因此最终行程了一个原始Observer,多个subscriber的分发模式。1
MulticastSubject
其他消息管理
除了常用的multicast外,还提供了merge、scan、map、filter、flatMap、interval等方法。基本思路都是新建一个1
Observable-fns
,在这个新的1
ObservableProxy
中进行消息的过滤、组合、转换、分发。具体的代码细节可以参考源码。1
Observer
这里方法中大多用到了一个异步调度器(), 它的功能可以异步接收消息,并串行执行消息处理,从而提高性能,实现代码如下:1
AsyncSerialScheduler
class AsyncSerialScheduler<T> {
private _baseObserver: SubscriptionObserver<T>
private _pendingPromises: Set<Promise<any>>
constructor(observer: SubscriptionObserver<T>) {
this._baseObserver = observer
this._pendingPromises = new Set()
}
complete() {
Promise.all(this._pendingPromises)
.then(() => this._baseObserver.complete())
.catch(error => this._baseObserver.error(error))
}
error(error: any) {
this._baseObserver.error(error)
}
schedule(task: (next: (value: T) => void) => Promise<void>) {
const prevPromisesCompletion = Promise.all(this._pendingPromises)
const values: T[] = []
const next = (value: T) => values.push(value)
const promise = Promise.resolve()
.then(async () => {
await prevPromisesCompletion
await task(next)
this._pendingPromises.delete(promise)
for (const value of values) {
this._baseObserver.next(value)
}
})
.catch(error => {
this._pendingPromises.delete(promise)
this._baseObserver.error(error)
})
this._pendingPromises.add(promise)
}
}
从代码中我们可以看出, 拥有三个方法1
AsyncSerialScheduler
、1
schedule
、1
error
.1
complete
: 该方法接收task输入,task是一个函数,用于具体功能逻辑的实现,如果达到了该功能的目的,则此消息放入1
schedule
队列中,然后发送给目标1
values
。上述操作是异步执行,所以当有I/O操作时,不会影响性能。1
Subjecter
: 是向目标1
error
传递error信息。1
Subjecter
: 是一个同步方法,它的作用是等待直到所有1
complete
都被执行完毕。方便使用者可以获得任务结束点的通知。1
task
总结
以上就是的核心内容,主要介绍了1
Observable-fns
-> 1
Observer
的核心代码实现,并分析了在1vN场景下,multicast的实现方法。1
Subscriber
的设计和代码对NodeJS开发者的技能提高帮助很大,有很多值得学习的地方,可以在平时的学习和工作中借鉴使用。希望本文可以对广大NodeJS读者的提升有所帮助。1
Observable-fns