HomePostsWorks

vscode 源码解析 - 进程间调用


vscode 的架构中主要有四类进程:主进程、渲染进程、shared 进程和 host 进程,这四个进程之间会发生进程间调用(Inter Process Calling, IPC)。vscode 中有专门的 IPC 模块来实现 IPC 机制,这篇文章将会深入介绍 vscode IPC 模块的设计和原理。

IPC 原理

在我们开始学习 vscode 的 IPC 机制之前,不妨根据我们已经掌握的关于计算机网络的基本知识,来推演一下 IPC 有何要点:

  1. 客户端服务端,客户端向服务端发起请求,请求即是要求调用服务端的某个方法,服务端返回响应,响应即是该方法的返回值
  2. 客户端和服务端需要建立连接
  3. 服务端需要以某种机制分派请求,以找到被调用的方法所在的模块
  4. 请求需要通过某种协议来让双方知道如何解析和生成请求或响应

可以看到 IPC 理念上是比较简单的,而 vscode IPC 模块的优点在于,它清楚地定义了 IPC 模块的各个层次,将客户端的调用过程封装得就像是在调用本地的一个异步方法一样,还让不同的跨进程环境——例如本地进程、基于网络的跨进程、web worker——都能够很容易地实现。

vscode IPC 机制概述

vscode IPC 分为基于 Channel 的基于 RpcProtocol 的两种。

基于 Channel 的机制

我们通过一个例子开始对 Channel 机制的介绍。

在渲染进程初始化的时候,会创建一个 ElectronIPCMainProcessService,然后以此创建一个 LoggerChannelClient,并以 ILoggerService 为 key 添加到依赖注入系统当中:

// Main Process const mainProcessService = this._register(new ElectronIPCMainProcessService(this.configuration.windowId)); serviceCollection.set(IMainProcessService, mainProcessService); // Logger const loggerService = new LoggerChannelClient(mainProcessService.getChannel('logger')); serviceCollection.set(ILoggerService, loggerService);

我们进一步看 LoggerChannelClient 的实现的话,就会发现它会调用 channelcall 方法,这里就就发起了一个 IPC:

export class LoggerChannelClient implements ILoggerService { constructor(private readonly channel: IChannel) { } createConsoleMainLogger(): ILogger { return new AdapterLogger({ log: (level: LogLevel, args: any[]) => { this.channel.call('consoleLog', [level, args]); } }); } }

就是 channel IPC

channel IPC 主要支持两种类型的调用,通过下面这个枚举类型可以看出:

export const enum RequestType { Promise = 100, PromiseCancel = 101, EventListen = 102, EventDispose = 103 }

我们这篇文章将会以基于 Promise 的调用为例,基于事件的调用大家可以自行了解。

channel IPC 主要有以下这些参与者,它们之间的关系如下图所示:

1r4qaFQIf7Vw7xVCw6kCxOl3A26cIaY-So42x1xb7Yw

服务端

客户端

下面我们会具体讲解每个模块的机制。

IChannel 和 IServerChannel

阅读过本系列之前两篇关于依赖注入和服务化的读者,应该已经知道 vscode 中各种功能都是封装在服务当中的,所以 IPC 的执行过程中必须要找到某个能响应特定调用的服务,IServerChannel 则是负责和服务一一对应,帮助它们接入 IPC 系统的,我们将称为实体

而在客户端一侧,业务代码不知道 IPC 机制的接口,因此不能直接发起请求,而是将 IChannel 作为一个能够帮助它发起请求的代理

IserverChannelIChannel 分别就是实体和代理的接口:

export interface IChannel { call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>; listen<T>(event: string, arg?: any): Event<T>; } /** * An `IServerChannel` is the counter part to `IChannel`, * on the server-side. You should implement this interface * if you'd like to handle remote promises or events. */ export interface IServerChannel<TContext = string> { call<T>(ctx: TContext, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>; listen<T>(ctx: TContext, event: string, arg?: any): Event<T>; }

一个 IChannel 像就这样(即 return 返回的对象):

getChannel<T extends IChannel>(channelName: string): T { const that = this; return { call(command: string, arg?: any, cancellationToken?: CancellationToken) { if (that.isDisposed) { return Promise.reject(errors.canceled()); } return that.requestPromise(channelName, command, arg, cancellationToken); }, listen(event: string, arg: any) { if (that.isDisposed) { return Promise.reject(errors.canceled()); } return that.requestEvent(channelName, event, arg); } } as T; }

而一个 IServerChannel 会像是这样:

export class TestChannel implements IServerChannel { constructor(private testService: ITestService) { } listen(_: unknown, event: string): Event<any> { switch (event) { case 'marco': return this.testService.onMarco; } throw new Error('Event not found'); } call(_: unknown, command: string, ...args: any[]): Promise<any> { switch (command) { case 'pong': return this.testService.pong(args[0]); case 'cancelMe': return this.testService.cancelMe(); case 'marco': return this.testService.marco(); default: return Promise.reject(new Error(`command not found: ${command}`)); } } }

在这个例子中 TestChannel 就是对 ITestService 的一层封装。

创建 IServerChannel 的方式有很多种,除了上面这样的直接实现,还可以借助 ProxyChannel namespace 提供的方法。

ProxyChannel

如果不需要为 service 做一些特殊处理,可以直接使用 ProxyChannel namespace 下的 fromService 方法将一个 service 包装成一个 IServerChannel

export function fromService(service: unknown, options?: ICreateServiceChannelOptions): IServerChannel { const handler = service as { [key: string]: unknown }; const disableMarshalling = options && options.disableMarshalling; // Buffer any event that should be supported by // iterating over all property keys and finding them const mapEventNameToEvent = new Map<string, Event<unknown>>(); for (const key in handler) { if (propertyIsEvent(key)) { mapEventNameToEvent.set(key, Event.buffer(handler[key] as Event<unknown>, true)); } } return new class implements IServerChannel { listen<T>(_: unknown, event: string): Event<T> { const eventImpl = mapEventNameToEvent.get(event); if (eventImpl) { return eventImpl as Event<T>; } throw new Error(`Event not found: ${event}`); } call(_: unknown, command: string, args?: any[]): Promise<any> { const target = handler[command]; if (typeof target === 'function') { // Revive unless marshalling disabled if (!disableMarshalling && Array.isArray(args)) { for (let i = 0; i < args.length; i++) { args[i] = revive(args[i]); } } return target.apply(handler, args); } throw new Error(`Method not found: ${command}`); } }; }

同样的,也可以通过 toServiceIChannel 封装成服务供业务代码调用,这样业务代码就不用自己去调用 IChannelcall 或者 listen 方法。

export function toService<T>(channel: IChannel, options?: ICreateProxyServiceOptions): T { const disableMarshalling = options && options.disableMarshalling; return new Proxy({}, { get(_target: T, propKey: PropertyKey) { if (typeof propKey === 'string') { // Check for predefined values if (options?.properties?.has(propKey)) { return options.properties.get(propKey); } // Event if (propertyIsEvent(propKey)) { return channel.listen(propKey); } // Function return async function (...args: any[]) { // Add context if any let methodArgs: any[]; if (options && !isUndefinedOrNull(options.context)) { methodArgs = [options.context, ...args]; } else { methodArgs = args; } const result = await channel.call(propKey, methodArgs); // Revive unless marshalling disabled if (!disableMarshalling) { return revive(result); } return result; }; } throw new Error(`Property not found: ${String(propKey)}`); } }) as T; }

本质上是创建了一个 Proxy,将对 Proxy 属性的访问转换成对 channel 的 call listen 方法的调用。

IChannelServer

IChannelServer 的主要职责包括:

  1. protocol 接收消息
  2. 根据消息的类型进行处理
  3. 调用合适的 IServerChannel 来处理请求
  4. 将响应发送给客户端
  5. 注册 IServerChannel

IChannelServer 直接监听 protocol 的消息,然后调用自己的 onRawMessage 方法处理请求。onRawMessge 会根据请求的类型来调用其他方法。以基于 Promise 的调用为例,可以看到它的核心逻辑就是调用 IServerChannelcall 方法。

private onRawMessage(message: VSBuffer): void { const reader = new BufferReader(message); const header = deserialize(reader); const body = deserialize(reader); const type = header[0] as RequestType; switch (type) { case RequestType.Promise: if (this.logger) { this.logger.logIncoming(message.byteLength, header[1], RequestInitiator.OtherSide, `${requestTypeToStr(type)}: ${header[2]}.${header[3]}`, body); } return this.onPromise({ type, id: header[1], channelName: header[2], name: header[3], arg: body }); // ... } } private onPromise(request: IRawPromiseRequest): void { const channel = this.channels.get(request.channelName); let promise: Promise<any>; try { promise = channel.call(this.ctx, request.name, request.arg, cancellationTokenSource.token); } catch (err) { // ... } const id = request.id; promise.then(data => { this.sendResponse(<IRawResponse>{ id, data, type: ResponseType.PromiseSuccess }); this.activeRequests.delete(request.id); }, err => { // ... }); }

可以看到,这里通过 requestchannelName 获取到一个 IServerChannel,然后调用了它的 call 方法,并将结果通过 this.sendResponse 发送给客户端。显然,这里 this.channels 需要注册 IServerChannel,而 IChannelServer 提供了这样的方法:

registerChannel(channelName: string, channel: IServerChannel<TContext>): void { this.channels.set(channelName, channel); setTimeout(() => this.flushPendingRequests(channelName), 0); }

IChannelClient

IChannelClient 的逻辑比较简单,它只提供了一个接口,即 getChannel ,它返回了一个 IChannel,实际上就是通过闭包保存了 channelName,然后在业务方调用的时候调用 requestPromise 等发起请求。

getChannel<T extends IChannel>(channelName: string): T { const that = this; return { call(command: string, arg?: any, cancellationToken?: CancellationToken) { if (that.isDisposed) { return Promise.reject(errors.canceled()); } return that.requestPromise(channelName, command, arg, cancellationToken); }, // ... } as T; } private requestPromise(channelName: string, name: string, arg?: any, cancellationToken = CancellationToken.None): Promise<any> { const id = this.lastRequestId++; const type = RequestType.Promise; const request: IRawRequest = { id, type, channelName, name, arg }; if (cancellationToken.isCancellationRequested) { return Promise.reject(errors.canceled()); } let disposable: IDisposable; const result = new Promise((c, e) => { if (cancellationToken.isCancellationRequested) { return e(errors.canceled()); } const doRequest = () => { const handler: IHandler = response => { switch (response.type) { case ResponseType.PromiseSuccess: this.handlers.delete(id); c(response.data); break; case ResponseType.PromiseError: this.handlers.delete(id); const error = new Error(response.data.message); (<any>error).stack = response.data.stack; error.name = response.data.name; e(error); break; case ResponseType.PromiseErrorObj: this.handlers.delete(id); e(response.data); break; } }; this.handlers.set(id, handler); this.sendRequest(request); }; let uninitializedPromise: CancelablePromise<void> | null = null; if (this.state === State.Idle) { doRequest(); } else { // ... } const cancellationTokenListener = cancellationToken.onCancellationRequested(cancel); disposable = combinedDisposable(toDisposable(cancel), cancellationTokenListener); this.activeRequests.add(disposable); }); return result.finally(() => { this.activeRequests.delete(disposable); }); }

消息传输

我们已经看到了 IChannelServerIChannelClient 之间会互发数据,这里简单讲解一下消息传输的机制。

首先消息传输需要约定好请求和响应的结构。

IPC 请求的字段如下:

type IRawPromiseRequest = { type: RequestType.Promise; id: number; channelName: string; name: string; arg: any; }; type IRawPromiseCancelRequest = { type: RequestType.PromiseCancel, id: number }; type IRawEventListenRequest = { type: RequestType.EventListen; id: number; channelName: string; name: string; arg: any; }; type IRawEventDisposeRequest = { type: RequestType.EventDispose, id: number }; type IRawRequest = IRawPromiseRequest | IRawPromiseCancelRequest | IRawEventListenRequest | IRawEventDisposeRequest;

IPC 响应的字段如下:

type IRawInitializeResponse = { type: ResponseType.Initialize }; type IRawPromiseSuccessResponse = { type: ResponseType.PromiseSuccess; id: number; data: any }; type IRawPromiseErrorResponse = { type: ResponseType.PromiseError; id: number; data: { message: string, name: string, stack: string[] | undefined } }; type IRawPromiseErrorObjResponse = { type: ResponseType.PromiseErrorObj; id: number; data: any }; type IRawEventFireResponse = { type: ResponseType.EventFire; id: number; data: any }; type IRawResponse = IRawInitializeResponse | IRawPromiseSuccessResponse | IRawPromiseErrorResponse | IRawPromiseErrorObjResponse | IRawEventFireResponse;

请求和响应在被发送之前,都会通过 VSBuffer 进行序列化,在接收之后则会进行反序列化。

需要一定的机制来将请求和响应对应起来。这在服务端比较容易,因为服务端的处理在顺序上处于 IPC 的中间环节,可以很自然的通过作用域来对应请求和响应。而在客户端,则需要一些机制来匹配请求和响应。

IChannelClientsendRequest 之前,会通过 id 来在自身的 handlers Map 上绑定一个 handler

this.handlers.set(id, handler);

而在收到消息的时候,就会通过这里 id 调用相应的 handler,从而 resolve 客户端 IChannel 的调用。

private onResponse(response: IRawResponse): void { if (response.type === ResponseType.Initialize) { this.state = State.Idle; this._onDidInitialize.fire(); return; } const handler = this.handlers.get(response.id); if (handler) { handler(response); } }

IMessagePassingProtocol

IChannelServerIChannelClient 之间会通过 protocol 传输数据。对于上层,它提供二进制数据流传输服务(用 VSBuffer 进行了封装),并能够在有新消息到达的时候通知上层。

其接口非常简单:

export interface IMessagePassingProtocol { send(buffer: VSBuffer): void; onMessage: Event<VSBuffer>; /** * Wait for the write buffer (if applicable) to become empty. */ drain?(): Promise<void>; }

不同的通讯端有不同的信道,因此 IMessagePassingProtocol 也有多种实现,大致有以下几种:

IPCClient

它用于在客户端管理 IChannel ,它同时实现了 IChannelClientIChannelServer,所以它实际上可以发起也可以响应 IPC:

export interface IChannelClient { getChannel<T extends IChannel>(channelName: string): T; } export interface IChannelServer<TContext = string> { registerChannel(channelName: string, channel: IServerChannel<TContext>): void; }
export class IPCClient<TContext = string> implements IChannelClient, IChannelServer<TContext>, IDisposable { private channelClient: ChannelClient; private channelServer: ChannelServer<TContext>; constructor(protocol: IMessagePassingProtocol, ctx: TContext, ipcLogger: IIPCLogger | null = null) { const writer = new BufferWriter(); serialize(writer, ctx); protocol.send(writer.buffer); this.channelClient = new ChannelClient(protocol, ipcLogger); this.channelServer = new ChannelServer(protocol, ctx, ipcLogger); } getChannel<T extends IChannel>(channelName: string): T { return this.channelClient.getChannel(channelName) as T; } registerChannel(channelName: string, channel: IServerChannel<TContext>): void { this.channelServer.registerChannel(channelName, channel); } dispose(): void { this.channelClient.dispose(); this.channelServer.dispose(); } }

可以看到它仅有一个 IMessagePassingProtocol,换句话说,就是只能跟一方进行通讯,这也是它跟 IPCServer 最大的区别。

IPCServer

它一共实现了三个接口:

export interface IChannelServer<TContext = string> { registerChannel(channelName: string, channel: IServerChannel<TContext>): void; } export interface IRoutingChannelClient<TContext = string> { getChannel<T extends IChannel>(channelName: string, router?: IClientRouter<TContext>): T; } export interface IConnectionHub<TContext> { readonly connections: Connection<TContext>[]; readonly onDidAddConnection: Event<Connection<TContext>>; readonly onDidRemoveConnection: Event<Connection<TContext>>; }

我们来看 IPCServer 的构造方法:

export class IPCServer<TContext = string> implements IChannelServer<TContext>, IRoutingChannelClient<TContext>, IConnectionHub<TContext>, IDisposable { constructor(onDidClientConnect: Event<ClientConnectionEvent>) { onDidClientConnect(({ protocol, onDidClientDisconnect }) => { const onFirstMessage = Event.once(protocol.onMessage); onFirstMessage(msg => { const reader = new BufferReader(msg); const ctx = deserialize(reader) as TContext; const channelServer = new ChannelServer(protocol, ctx); const channelClient = new ChannelClient(protocol); this.channels.forEach((channel, name) => channelServer.registerChannel(name, channel)); const connection: Connection<TContext> = { channelServer, channelClient, ctx }; this._connections.add(connection); this._onDidAddConnection.fire(connection); onDidClientDisconnect(() => { channelServer.dispose(); channelClient.dispose(); this._connections.delete(connection); this._onDidRemoveConnection.fire(connection); }); }); }); } }

可以看到,在对方发来第一条消息时,IPCServer 会创建:

interface Connection<TContext> extends Client<TContext> { readonly channelServer: ChannelServer<TContext>; readonly channelClient: ChannelClient; } export interface Client<TContext> { readonly ctx: TContext; }

注意这里的 ctx 属性,它是客户端的标识符,将用在请求路由的过程中。它的 getChannelChannelClientgetChannel 有很大不同:

getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T; getChannel<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean): T; getChannel<T extends IChannel>(channelName: string, routerOrClientFilter: IClientRouter<TContext> | ((client: Client<TContext>) => boolean)): T { const that = this; return { call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> { let connectionPromise: Promise<Client<TContext>>; if (isFunction(routerOrClientFilter)) { // when no router is provided, we go random client picking let connection = getRandomElement(that.connections.filter(routerOrClientFilter)); connectionPromise = connection // if we found a client, let's call on it ? Promise.resolve(connection) // else, let's wait for a client to come along : Event.toPromise(Event.filter(that.onDidAddConnection, routerOrClientFilter)); } else { connectionPromise = routerOrClientFilter.routeCall(that, command, arg); } const channelPromise = connectionPromise .then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName)); return getDelayedChannel(channelPromise) .call(command, arg, cancellationToken); }, listen(event: string, arg: any): Event<T> { // ... } } as T; }

可以看到,在调用 getChannel 的时候如果传入了 routerOrClientFilter,则会在 connections 中选择一个。

Routing

选择  Connection 的方法,可以是一个简单的 filter 函数,也可以是通过 IClientRouter 提供的 routeCall 或者 routeEvent 方法。我们以 StaticRouter 为例:

export class StaticRouter<TContext = string> implements IClientRouter<TContext> { constructor(private fn: (ctx: TContext) => boolean | Promise<boolean>) { } routeCall(hub: IConnectionHub<TContext>): Promise<Client<TContext>> { return this.route(hub); } routeEvent(hub: IConnectionHub<TContext>): Promise<Client<TContext>> { return this.route(hub); } private async route(hub: IConnectionHub<TContext>): Promise<Client<TContext>> { for (const connection of hub.connections) { if (await Promise.resolve(this.fn(connection.ctx))) { return Promise.resolve(connection); } } await Event.toPromise(hub.onDidAddConnection); return await this.route(hub); } }

实际上在 getChannel 调用他的时候,会通过 fn 来选择一个 IConnectionHub 中的 Connection

到这里,整个基于 channel 的 IPC 机制我们就介绍完毕了。

基于 RpcProtocol 的机制

vscode IPC 的第二种机制基于 RpcProtocol,用于渲染进程和 extension host 进程通讯(如果 vscode 的运行环境是浏览器,那么就是主线程和 extension host web worker 之间进行通讯)。

举个例子,在 host 进程初始化时如果发生了错误,它会告知渲染进程,代码如下:

const mainThreadExtensions = rpcProtocol.getProxy(MainContext.MainThreadExtensionService); const mainThreadErrors = rpcProtocol.getProxy(MainContext.MainThreadErrors); errors.setUnexpectedErrorHandler(err => { const data = errors.transformErrorForSerialization(err); const extension = extensionErrors.get(err); if (extension) { mainThreadExtensions.$onExtensionRuntimeError(extension.identifier, data); } else { mainThreadErrors.$onUnexpectedError(data); } });

在调用 mainThreadExtensionsmainThreadError 的方法的时候,即发生了 IPC。

该机制如下图所示:

下面介绍其原理。

shape

客户端怎么知道 mainThreadExtensions 上有一个 $onExtensionRuntimeError 方法可以调用呢?

显然,这里需要定义一个接口,这个接口就是 MainThreadExtensionServiceShape ,定义在 extHost.protocol.ts 文件中。vscode 对于每一个可以调用的实体,都定义了一个以 Shape 为后缀的接口,服务端的实体必须要实现该接口,这样客户端在编写代码的时候就知道有哪些方法可以调用了。

identifier

客户端如何获取到服务端的实体在本地的代理,也就是 mainThreadExtensions 呢?换个问法,mainThreadExtensions 是如何跟 mainThreadErrors 相区别的呢?

代码中我们可以看到 mainThreadExtensions 是通过 rpcProtocol.getProxy(MainContext.MainThreadExtensionService) 获得的,MainContext.MainThreadExtensionService 在这里就起到了一个标识符的作用,它将每一个实体-代理的对子区别开。

MainContext.MainThreadExtensionService 定义在 extHost.protocol.ts 当中:

export const MainContext = { MainThreadExtensionService: createMainId<MainThreadExtensionServiceShape>('MainThreadExtensionService') }

createMainId 就是用于创建标识符的方法,本质上是创建了一个 ProxyIdentifier 对象并存在到一个数组当中:

export class ProxyIdentifier<T> { public static count = 0; _proxyIdentifierBrand: void; public readonly isMain: boolean; public readonly sid: string; public readonly nid: number; constructor(isMain: boolean, sid: string) { this.isMain = isMain; this.sid = sid; this.nid = (++ProxyIdentifier.count); } } const identifiers: ProxyIdentifier<any>[] = []; export function createMainContextProxyIdentifier<T>(identifier: string): ProxyIdentifier<T> { const result = new ProxyIdentifier<T>(true, identifier); identifiers[result.nid] = result; return result; } export function createExtHostContextProxyIdentifier<T>(identifier: string): ProxyIdentifier<T> { const result = new ProxyIdentifier<T>(false, identifier); identifiers[result.nid] = result; return result; }

每个标识符有三个字段:

context

我们如何知道另外一个进程中,有哪些实体可以被调用?

extHost.protocol.ts 文件中定义了 MainContext 和 ExtHostContext 两个文件。前者定义了渲染进程中可被调用的实体,后者定义了 host 进程中可被调用的实体。这里也可以看出,在 RpcProtocol 机制下,渲染进程和 host 进程是可以互相调用的。

customer

可被调用的实体是如何注册的?

host 进程调用 mainThreadExtensions 方法的时候,渲染进程必须要有类提供这个方法,而且它还需要注册到这个 RpcProtocol 的机制上。通过查找实现了 MainThreadExtensionServiceShape 的类,不难发现 mainThreadExtensionService.ts 中存在这样一段代码:

@extHostNamedCustomer(MainContext.MainThreadExtensionService) export class MainThreadExtensionService implements MainThreadExtensionServiceShape { // ... }

注意这里装饰器的调用,我们探究其实现:

export function extHostNamedCustomer<T extends IDisposable>(id: ProxyIdentifier<T>) { return function <Services extends BrandedService[]>(ctor: { new(context: IExtHostContext, ...services: Services): T }): void { ExtHostCustomersRegistryImpl.INSTANCE.registerNamedCustomer(id, ctor as IExtHostCustomerCtor<T>); }; }

可以发现它是将 id,也就是 MainContext.MainThreadExtensionServiceMainThreadExtensionService 绑定起来,而在 extension host 初始化的时候实例化它:

const namedCustomers = ExtHostCustomersRegistry.getNamedCustomers(); for (let i = 0, len = namedCustomers.length; i < len; i++) { const [id, ctor] = namedCustomers[i]; const instance = this._instantiationService.createInstance(ctor, extHostContext); this._customers.push(instance); this._rpcProtocol.set(id, instance); }

注册的最后一步就是调用 RpcProtocol.set 方法注册可被调用的实体。

RpcProtocol 的通讯原理

到这里我们基本了解了 RpcProtocol 的接口了,下面来了解一下它的内部逻辑。

首先来看 getProxy,我们知道客户端要通过这个方法获取可调用的代理:

public getProxy<T>(identifier: ProxyIdentifier<T>): T { const { nid: rpcId, sid } = identifier; if (!this._proxies[rpcId]) { this._proxies[rpcId] = this._createProxy(rpcId, sid); } return this._proxies[rpcId]; } private _createProxy<T>(rpcId: number, debugName: string): T { let handler = { get: (target: any, name: PropertyKey) => { if (typeof name === 'string' && !target[name] && name.charCodeAt(0) === CharCode.DollarSign) { target[name] = (...myArgs: any[]) => { return this._remoteCall(rpcId, name, myArgs); }; } if (name === _RPCProxySymbol) { return debugName; } return target[name]; } }; return new Proxy(Object.create(null), handler); }

可以看到它的核心逻辑就是创建一个 Proxy 对象,当对象上的属性被访问时,所有以 $ 开头的属性都会被包装为一个对 this._remoteCall 进行调用的方法。

_remoteCall 的核心逻辑则主要是下面几行(这里主要省略了取消请求相关的逻辑):

private _remoteCall(rpcId: number, methodName: string, args: any[]): Promise<any> { const serializedRequestArguments = MessageIO.serializeRequestArguments(args, this._uriReplacer); const req = ++this._lastMessageId; const callId = String(req); const result = new LazyPromise(); this._pendingRPCReplies[callId] = result; this._onWillSendRequest(req); const msg = MessageIO.serializeRequest(req, rpcId, methodName, serializedRequestArguments, !!cancellationToken); this._protocol.send(msg); return result; } // MessageIO public static serializeRequest(req: number, rpcId: number, method: string, serializedArgs: SerializedRequestArguments, usesCancellationToken: boolean): VSBuffer { if (serializedArgs.type === 'mixed') { return this._requestMixedArgs(req, rpcId, method, serializedArgs.args, serializedArgs.argsType, usesCancellationToken); } return this._requestJSONArgs(req, rpcId, method, serializedArgs.args, usesCancellationToken); } private static _requestJSONArgs(req: number, rpcId: number, method: string, args: string, usesCancellationToken: boolean): VSBuffer { const methodBuff = VSBuffer.fromString(method); const argsBuff = VSBuffer.fromString(args); let len = 0; len += MessageBuffer.sizeUInt8(); len += MessageBuffer.sizeShortString(methodBuff); len += MessageBuffer.sizeLongString(argsBuff); let result = MessageBuffer.alloc(usesCancellationToken ? MessageType.RequestJSONArgsWithCancellation : MessageType.RequestJSONArgs, req, len); result.writeUInt8(rpcId); result.writeShortString(methodBuff); result.writeLongString(argsBuff); return result.buffer; } // MessageBuffer public static alloc(type: MessageType, req: number, messageSize: number): MessageBuffer { let result = new MessageBuffer(VSBuffer.alloc(messageSize + 1 /* type */ + 4 /* req */), 0); result.writeUInt8(type); result.writeUInt32(req); return result; }

可以看到一个请求主要有以下这些信息:

  1. type,请求的类型,由一个枚举 MessageType 所定义
  2. req,请求的序号,是一个自增的数字
  3. rpcId,identifier 的字符串 id,表明是哪个实体-代理之间的请求
  4. method,指定要调用实体的哪个方法
  5. argsBuff,序列化的参数

最终这些参数都会被封装为一个 VSBuffer 并通过 protocol 发送,而这些而这里的 protocol,这是我们的老朋友 IMessagePassingProtocol 。 所以我们可以看到 RpcProtocol 机制也是分层的设计,可以在不同的环境中使用。

当服务端接收到一个请求时,会回调 _receiveOneMessage 方法进行处理:

private _receiveOneMessage(rawmsg: VSBuffer): void { if (this._isDisposed) { return; } const msgLength = rawmsg.byteLength; const buff = MessageBuffer.read(rawmsg, 0); const messageType = <MessageType>buff.readUInt8(); const req = buff.readUInt32(); switch (messageType) { case MessageType.RequestJSONArgs: case MessageType.RequestJSONArgsWithCancellation: { let { rpcId, method, args } = MessageIO.deserializeRequestJSONArgs(buff); if (this._uriTransformer) { args = transformIncomingURIs(args, this._uriTransformer); } this._receiveRequest(msgLength, req, rpcId, method, args, (messageType === MessageType.RequestJSONArgsWithCancellation)); break; } // ... }

即根据 type 来调用不同的方法对请求进行处理,这里来看 _receiveRequest 方法:

private _receiveRequest(msgLength: number, req: number, rpcId: number, method: string, args: any[], usesCancellationToken: boolean): void { const callId = String(req); let promise: Promise<any>; let cancel: () => void; if (usesCancellationToken) { // ... } else { // cannot be cancelled promise = this._invokeHandler(rpcId, method, args); cancel = noop; } // Acknowledge the request const msg = MessageIO.serializeAcknowledged(req); this._protocol.send(msg); promise.then((r) => { delete this._cancelInvokedHandlers[callId]; const msg = MessageIO.serializeReplyOK(req, r, this._uriReplacer); this._protocol.send(msg); }, (err) => { // ... }); } private _invokeHandler(rpcId: number, methodName: string, args: any[]): Promise<any> { try { return Promise.resolve(this._doInvokeHandler(rpcId, methodName, args)); } catch (err) { return Promise.reject(err); } } private _doInvokeHandler(rpcId: number, methodName: string, args: any[]): any { const actor = this._locals[rpcId]; if (!actor) { throw new Error('Unknown actor ' + getStringIdentifierForProxy(rpcId)); } let method = actor[methodName]; if (typeof method !== 'function') { throw new Error('Unknown method ' + methodName + ' on actor ' + getStringIdentifierForProxy(rpcId)); } return method.apply(actor, args); }

核心就是调用 _invokeHandler 然后将结果发送回去。

注意到这一行 const actor = this._locals[rpcId]; 获取了可被调用的实体,记得之前注册实体时调用的 set 方法吗:

public set<T, R extends T>(identifier: ProxyIdentifier<T>, value: R): R { this._locals[identifier.nid] = value; return value; }

到这里,我们就了解了 RpcProtocol 的原理了。

CC BY-NC 4.0 2025 © Evan HuRSS