HomePostsWorks

vscode 源码解析 - 事件模块


在进一步深入学习 vscode 的各种机制之前,我们先对 vscode 当中的一些基础工具做一些探索,因为核心机制大量地用到了这些基础模块,这篇文章将会介绍事件(event)模块,相关代码在 vs/base/common/event.ts 文件中。

Event 模块实现

Event 接口

Event 接口规定了一个函数,当调用了这个函数,就表示监听了这个函数所对应的事件流。

export interface Event<T> { ( listener: (e: T) => any, thisArgs?: any, disposables?: IDisposable[] | DisposableStore ): IDisposable }

返回的 IDisposable 对象用于解除这个监听的(通过调用它的 dispose 方法)。

另外一种解除监听的方式就是 disposable 了,Event 函数在执行的过程中会将 IDisposable 插入 disposables,方便调用方决定在什么时候解除监听。

Emitter

有了 Event 接口,我们可以规定事件如何被消费,那么事件是如何产生的呢?一种方法就是通过 Emitter

Emitter 类型暴露了两个重要方法:

fire,从这个方法的函数签名就能看出它就是用来派发一个事件的,该方法的主要逻辑就是将 this._listeners 当中的保存的 listener 全部调用一遍(省略了部分分支逻辑和性能监控相关代码)

fire(event: T): void { if (this._listeners) { for (let listener of this._listeners) { this._deliveryQueue.push([listener, event]); } while (this._deliveryQueue.size > 0) { const [listener, event] = this._deliveryQueue.shift()!; try { if (typeof listener === 'function') { listener.call(undefined, event); } else { listener[0].call(listener[1], event); } } catch (e) { onUnexpectedError(e); } } } }

get event(),这个方法会在 Emitter 中创建一个 Event,其主要逻辑就是将 listener 添加到 this._listeners 当中

const remove = this._listeners.push(!thisArgs ? listener : [listener, thisArgs])

Emitter 类型还提供了一些特殊的回调接口:

export interface EmitterOptions { onFirstListenerAdd?: Function onFirstListenerDidAdd?: Function onListenerDidAdd?: Function onLastListenerRemove?: Function }

这使得 Emitter 在注册消费者的时候执行一些额外的逻辑。我们将会在下文中看到其中一些回调所扮演的重要角色。

Event 辅助方法

vscode 还提供了一系列工具方法用于组合 Event ,得到更加丰富的事件处理能力。下面我们一一进行说明。

once

export function once<T>(event: Event<T>): Event<T> { return (listener, thisArgs = null, disposables?) => { // we need this, in case the event fires during the listener call let didFire = false let result: IDisposable result = event( /* A */ (e) => { if (didFire) { return } else if (result) { result.dispose() } else { didFire = true } return listener.call(thisArgs, e) }, null, disposables ) if (didFire) { result.dispose() } return result } }

这个方法用于将一个 Event 变为只能派发一次的,事件类型相同的 Event

每一个事件到达时,会从 A 处开始执行,可以看到这段代码通过 didFire 作为锁,保证 listener.call(thisArgs, e) 只会被执行一次。

很明显, once 的执行过程中有两个 Event ,那么消息如何在 Event 之间传递的呢?我们注意到 A 处的匿名函数调用了一个 Eventlistener,而 A 本身又是另一个 Event 的 listener,所以答案是很明显的:消息沿着 Event 链传递的过程,就是 Eventlistener 们递归调用的过程。

snapshot

export function snapshot<T>(event: Event<T> /* B */): Event<T> { let listener: IDisposable const emitter = new Emitter<T>({ onFirstListenerAdd() { listener = event(emitter.fire, emitter) }, onLastListenerRemove() { listener.dispose() } }) /* C */ return emitter.event }

这个工具方法用于生成 map 等操作,我们把它和 map 一起分析。

map

将一种类型的事件转换成另一种类型的事件,看起来和 Arraymap 非常相似。

export function map<I, O>(event: Event<I> /* A */, map: (i: I) => O): Event<O> { return snapshot( /* B */ (listener, thisArgs = null, disposables?) => event((i) => listener.call(thisArgs, map(i)), null, disposables) ) }

从代码可以看出:这里的 Event 链的顺序是

  1. map 装饰的 Event A
  2. snapshot 的参数,匿名的 Event B
  3. Emitter 暴露出的 Event C

当用户调用这个 map 转换出的 Event 的时候,实际上订阅的是 C,然后 C 在第一次被订阅时,会调用 B,而 B 又去订阅了 A。这里我们看到了 Emitter 的参数钩子起到了什么作用:B 是一个很特殊的 Event 它在 onFirstListenerAdd 中被订阅了 ,并且之后它并不会参与到 listener 的调用链中来,而是帮助 A 和 C 的 listener 之间创建了调用链,同时调用 map 对事件做了处理。

当有事件传递过来的时候,则是调用 A 的 listener i => listener.call(thisArgs, map(i)) ,而这里的 listener 很明显可以看出是 C 的 listener,也就是 Emitterfire 方法,通过上文中对 Emitter 的学习,我们知道,fire 方法会触发用户调用 C 时所传递来的 listener,这样整个传递链条就完整了。

forEach / filter / reduce

了解了 map 的工作原理之后,这三个函数就容易理解了,大家可以自行阅读代码。

signal

这个函数仅仅是做了一下类型转换,让订阅者忽略事件所携带的数据,比较简单。

any

export function any<T>(...events: Event<T>[]): Event<T> { return (listener, thisArgs = null, disposables?) => combinedDisposable( ...events.map((event) => event((e) => listener.call(thisArgs, e), null, disposables) ) ) }

这个方法会在 events 中任意一个 Event 派发事件的时候派发一个事件。

debounce

对 Event 链条上的事件做防抖处理。

export function debounce<T>( event: Event<T>, merge: (last: T | undefined, event: T) => T, delay?: number, leading?: boolean, leakWarningThreshold?: number ): Event<T> export function debounce<I, O>( event: Event<I>, merge: (last: O | undefined, event: I) => O, delay?: number, leading?: boolean, leakWarningThreshold?: number ): Event<O> export function debounce<I, O>( event: Event<I>, merge: (last: O | undefined, event: I) => O, delay: number = 100, leading = false, leakWarningThreshold?: number ): Event<O> { let subscription: IDisposable let output: O | undefined = undefined let handle: any = undefined let numDebouncedCalls = 0 const emitter = new Emitter<O>({ leakWarningThreshold, onFirstListenerAdd() { subscription = event( /* A */ (cur) => { numDebouncedCalls++ output = merge(output, cur) if (leading && !handle) { emitter.fire(output) output = undefined } clearTimeout(handle) handle = setTimeout(() => { const _output = output output = undefined handle = undefined if (!leading || numDebouncedCalls > 1) { emitter.fire(_output!) } numDebouncedCalls = 0 }, delay) } ) }, onLastListenerRemove() { subscription.dispose() } }) return emitter.event }

不难看出这段代码的核心逻辑就是 A 处的 listener,它会对 debounce 时间内对数据做归并处理,并设置定时器,当收到新事件时就取消定时器,而定时器到期时就调用 emitter.fire  向下游继续发送事件。

stopWatch

这是一个记录耗时的 Event,当它收到第一个事件时,会把这个事件转换为它从创建到收到该事件的耗时。

latch

这个 Event 仅有当事件确实发生变化时,才会向下游发送事件。原理也很简单,就是在 filter 的基础上,利用闭包来缓存上一次事件的数据,然后用新数据和它做比较,新老数据不同或者是第一次接收数据才放通。

buffer

这个 Event 在没有人订阅它时,会缓存所有收到的事件,并在收到订阅时将已经缓存的事件全部发送出去。

export function buffer<T>( event: Event<T>, nextTick = false, _buffer: T[] = [] ): Event<T> { let buffer: T[] | null = _buffer.slice() let listener: IDisposable | null = event((e) => { if (buffer) { buffer.push(e) } else { emitter.fire(e) } }) const flush = () => { if (buffer) { buffer.forEach((e) => emitter.fire(e)) } buffer = null } const emitter = new Emitter<T>({ onFirstListenerAdd() { if (!listener) { listener = event((e) => emitter.fire(e)) } }, onFirstListenerDidAdd() { if (buffer) { if (nextTick) { setTimeout(flush) } else { flush() } } }, onLastListenerRemove() { if (listener) { listener.dispose() } listener = null } }) return emitter.event }

如果调用 buffer 时传入了 nextTick = True ,则发送缓存事件的操作会易步进行,所以如果你第一次订阅时同步添加了很多 listener,则它们都会收到这些缓存的事件。

ChainableEvent

如果要多次使用 map, filter 等函数,一个比较优雅的写法是链式调用,例如 Event.map.filter.xxxChainableEvent 就是为此准备的,通过调用 chain 方法,一个 Event 会转换成 ChainableEvent,然后就可以进行链式调用:

export function chain<T>(event: Event<T>): IChainableEvent<T> { return new ChainableEvent(event) }

ChainableEvent 的实现很简单,就是对上面的方法进行了一次包裹,这里就不再赘述了。

除了上面提到的对 Event 的转换方法之外,还有一些生成 Event 的方法。

fromNodeEventEmitter

export function fromNodeEventEmitter<T>( emitter: NodeEventEmitter, eventName: string, map: (...args: any[]) => T = (id) => id ): Event<T> { const fn = (...args: any[]) => result.fire(map(...args)) const onFirstListenerAdd = () => emitter.on(eventName, fn) const onLastListenerRemove = () => emitter.removeListener(eventName, fn) const result = new Emitter<T>({ onFirstListenerAdd, onLastListenerRemove }) return result.event }

该方法是对 node.js 原生事件的包裹,在原生事件的回调中调用 Emitter.fire

fromDOMEventEmitter

对 DOM 事件进行包装,和上面的非常相似,这里就不赘述了。

fromPromise

export function fromPromise<T = any>(promise: Promise<T>): Event<undefined> { const emitter = new Emitter<undefined>() let shouldEmit = false promise .then(undefined, () => null) .then(() => { if (!shouldEmit) { setTimeout(() => emitter.fire(undefined), 0) } else { emitter.fire(undefined) } }) shouldEmit = true return emitter.event }

将 Promise 转换为事件。通过 shouldEmit 确保 Promise 不会因为已经 resolve 而在订阅发生之前就开始派发事件(这样会导致错过事件)。

奇怪的是这里丢失了 Promise 返回的结果,不知道为什么这么设计,可能是 vscode 自己用不着吧。

toPromise

将事件转换为 Event,这个也比较简单。

另外,还有一些工具类提供更多的事件管理能力。

PauseableEmitter

类似于 Emitter,但是能通过 pauseresume 方法暂停一条 Event 链上事件的传播,比较简单。

EventMultiplexer

这个类可以订阅多个事件,并在任意一个事件派发的时候,将该事件转发给自己所有的订阅者,它的核心就是它的 hook 方法:

private hook(e: { event: Event<T>; listener: IDisposable | null; }): void { e.listener = e.event(r => this.emitter.fire(r)); }

也较为简单,这里不再赘述。

EventBufferer

这是一个非常有趣的类,它提供了一个 wrapEvent 方法包裹一个 Event,并提供了一个 bufferEvents 方法,在这个方法的回调内所有经过它 wrapEvent 包裹的 Event,都先不会被传播给订阅者。

/** * The EventBufferer is useful in situations in which you want * to delay firing your events during some code. * You can wrap that code and be sure that the event will not * be fired during that wrap. * * ``` * const emitter: Emitter; * const delayer = new EventDelayer(); * const delayedEvent = delayer.wrapEvent(emitter.event); * * delayedEvent(console.log); * * delayer.bufferEvents(() => { * emitter.fire(); // event will not be fired yet * }); * * // event will only be fired at this point * ``` */ export class EventBufferer { private buffers: Function[][] = [] wrapEvent<T>(event: Event<T>): Event<T> { return (listener, thisArgs?, disposables?) => { return event( (i) => { const buffer = this.buffers[this.buffers.length - 1] if (buffer) { buffer.push(() => listener.call(thisArgs, i)) } else { listener.call(thisArgs, i) } }, undefined, disposables ) } } bufferEvents<R = void>(fn: () => R): R { const buffer: Array<() => R> = [] this.buffers.push(buffer) const r = fn() this.buffers.pop() buffer.forEach((flush) => flush()) return r } }

bufferEvents 被调用的时候,会往 this.buffers 中压入一个新 buffer,在 fn 执行过程中派发的事件,就会因为 if (buffer) 判断为 true 而被缓存, fn 执行完毕之后, buffer 被弹出,其中包含的事件全部被派发。

Relay

这个类提供了切换上游 Event 的方法。当设置 Relayinput 属性时,就会切换监听的 Event,而下游的 Event 监听的是 RelayEmitter,因此无需重新设置监听。

export class Relay<T> implements IDisposable { // ... set input(event: Event<T>) { this.inputEvent = event if (this.listening) { this.nputEventListener.dispose() this.inputEventListener = event(this.emitter.fire, this.emitter) } } // ... }

总结

至此我们已经学习了 vscode 事件模块的主要内容(其他的性能分析和泄漏监测等这篇文章就不分析了,感兴趣的读者可以自行阅读)。

vscode 事件模块是所谓响应式编程的一种实现,如果想要继续学习响应式编程,非常推荐以下两个项目:

CC BY-NC 4.0 2025 © Evan HuRSS