Skip to content

Instantly share code, notes, and snippets.

@jacobbubu
Last active March 7, 2023 14:31
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jacobbubu/8138663f17b350aa2aff952fe432d2b6 to your computer and use it in GitHub Desktop.
Save jacobbubu/8138663f17b350aa2aff952fe432d2b6 to your computer and use it in GitHub Desktop.
如何实现 pull-stream 的 Source

Source(read) 概述

Sink 对 Source 有两种请求:

  1. read: 向 Source 请求数据,通过 read(null, currCb) 来请求。
  2. abort: 通知 Source 终止当前的 stream,通过 read(endOrError, currCb) 来请求。endOrError 的类型是 true | Error,表示是一次正常的终止还是异常的终止。

对于 read 请求,Sink 应该基于 上一次的 read 请求回调了,再发起新的 read 请求这样的规则。 对于 abort 请求,由于有时 Source 也不知道何时需要 abort(出现了异常或者外部其他条件触发的),因此可能在上一次 read 没有回调就发生。

  • 对于 Sink 的 read 请求,如果”重入“(上一次 read 没有结束,新的 read 来了),Source 完成实际资源的读取后,仅仅调用最后一个传入的 currCb 来传递数据。
    • 其”法理“在于,仅当 Source 真的有了数据,才可能返回给 Sink。这中间无论 Sink "催促"(read)多少次,Source 都不可能有数据返回,唯一能做的就是把 currCb 更新到 prevCb。毕竟是 Sink 没有遵守 “新的读取要在获得了上一次数据之后发起“ 这条规则。Source 为了避免相同的数据多次回调 Sink,因此必然要在回调 prevCb还是回调 currCb 之间选择一个,显然回调 currCb 更合理。
    • 如果我们把 read 理解成是 Sink 向 Source 传递了一个装数据的”杯子“,那么在”杯子“被装上新的数据返回给 Sink 之前,Sink 重复发 read 就是不断地给 Source ”新杯子“,Source 只能丢弃老”杯子“来确保数据只能返回一次。
  • 当 Sink 请求 Source "终止(abort)"时 (read(true)),此时 Source 的处理逻辑是不一样的。首先 Source 会马上终止上一次未完成的 prevCb(true),然后进行必要的清理工作,清理完成后 currCb(true),针对当前的请求也进行了回调。
    • 这样处理的”法理“在于,Sink 发起的终止 Source 要尽快执行,而且要尽快回调 Sink 其完成了”终止“任务。并且,如果有 prevCb 存在,这个 prevCb 不能再正常返回数据了,一定要返回终止(prevCb (true)),因为此时 Sink 已经处于”结束“状态,回调正常的数据反而“不合理”。与此同时,上一次的 read 相关联的异步调用在调用完成后,也不应该再回调 Sink了(pervCb(null, data)),因为这个 prevCb 已经可能随着最近的 abort 请求被提前回调了(prevCb(true))。
    • read 的实现不一样(prevCb总是被currCb覆盖),abort 在实现的时候对于 prevCbcurrCb 都会进行回调(prevCb (true)currCb (true))。之所以逻辑不一样,我”猜想“逻辑在于: abort 在 Sink 这边可能是一个突发需求,因为异常或者什么外部条件触发的,发生的时机和上一次 read 是否回调完成无关。因此 Source 不能把 abort 理解成是一次”杯子“的替代。Source 需要终止往最后一次的”杯子“里放数据的行为(prevCb (true)),也需要响应最近的这次 abort 的回调。

需要记录那些状态

如上文,Source 的实现至少需要跟踪以下几个状态:

  • prevCb: 记录 Sink 最近的 read 请求的回调函数。
    • 每次 Sink 发起 read(null, currCb) 请求,Source 会设置 prevCb = currCb;,换个新”杯子“。
    • 当 Source 准备好数据,就会执行 const tempCb = prevCb; prevCb = null; tempCb(null, data); 来返回数据并清除 prevCb
  • ended: Source 用来记录当前 stream 是否已经终止。
    • 当 Sink 发起 abort 请求时,Source 往往立刻设置 ended 为 Sink 传入的终止请求,例如:
let prevCb = null;
let ended = null;

function read(endOrError, currCb) {
	if (!currCb) throw new Error("read must have cb");

	// 下游是否需要停止
	if (endOrError) {
  	// 如需要,记录下游需求
   	ended = endOrError;
		if (prevCb) {
			callback(endOrError)
		}
		// 完成清理工作,清理后完成 abort 请求的回调
		cleanUp(function(){
			currCb(endOrError)
		})
	}
	prevCb = currCb
	readNext()
}

function readNext() {
	if (!prevCb) return;

	// 有之前的终止请求
	if (ended) {
		callback(ended);
		return;
	}

	fs.read(FILENAME, (err, data) {
		if (ended) {
			callback(ended);
			return;
		}

		if (err) {
			ended = err;
			callback(ended);
		} else {
			callback(null, data);
		}
	})
}

function callback(endOrError, data) {
	const tempCb = prevCb
  // 还债且清除欠债
	prevCb = null
	tempCb(endOrError, data)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment