这篇文章是有关我如何用JavaScript编写等效于Go(lang)频道的系列文章的第二篇。
我还没有,我强烈建议您在阅读这篇文章之前先阅读第一篇文章:
在上一篇文章中,我们在JS中构建了Go通道的基本功能。
我们能够创建通道,向其发送值以及从中接收值。
这次,我们将向我们的JS通道添加一个新功能:缓冲。
因此,让我们开始快速介绍Go中的缓冲通道。
缓冲通道
上次我们看到通道的发送和接收操作正在阻止操作。
发送操作将阻塞,直到在同一通道上准备好接收操作为止,反之亦然。
至少对于未缓冲的通道来说是这样,但是通道可能有缓冲!
让我们send123()
从上一次带缓冲通道的示例中回顾一下:
func main() {
ch := make(chan int, 3) // Create a buffered integer channel
go send123(ch) // Start send123() in a new goroutine
// Receive an integer from ch and print it to stdout 3 times
fmt.Println(<-ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
}
func send123(ch chan int) {
// Send 3 integers to ch
ch <- 1
ch <- 2
ch <- 3
}
如您所见,make()
接受第二个参数,它是通道缓冲区的大小。
我们的通道ch
现在有一个大小为3的缓冲区,这意味着它可以存储3个值。
因此,send123()
不必等待main()
准备好从接收ch
。
当然,执行的顺序不是确定的,但是有可能立即send123()
将三个整数发送到通道,该通道会将它们存储到其缓冲区中。
发送操作变为非阻塞操作。
相反,只要ch
缓冲区中有值,从接收值ch
就不会成为的阻塞操作main()
。
但是,如果缓冲区“太小”(例如,ch
缓冲区大小为1)会发生什么?
好吧,send123()
它将只能执行一个非阻塞发送操作,那么它将不得不等待接收操作以释放ch
缓冲区中的一些空间。
把它们加起来:
- 如果通道的缓冲区已满,则发送操作将阻塞
- 如果通道的缓冲区为空,则接收操作将阻塞
缓冲通道通常用于平滑发送/接收密集处理的执行。
使用正确的缓冲区大小,它可使不同的goroutine面对极少的阻塞时间。
让我们将示例转换为JS:
function* main() {
const ch = yield chan(3) // Create a buffered channel
yield fork(send123, ch) // Start send123()
// Receive a value from ch and log it to console 3 times
console.log(`main() received ${yield recv(ch)}`)
console.log(`main() received ${yield recv(ch)}`)
console.log(`main() received ${yield recv(ch)}`)
}
function* send123(ch) {
// Send 3 integers to ch
yield send(ch, 1); console.log('send123() sent 1')
yield send(ch, 2); console.log('send123() sent 2')
yield send(ch, 3); console.log('send123() sent 3')
}
与上次相比唯一改变的是chan()
操作工厂,该工厂现在接受可选的缓冲区大小。
我们还添加了一些日志以查看执行顺序。
现在让我们将此缓冲功能添加到我们的JS通道中!
实施缓冲通道
让我们从缓冲通道创建开始。
缓冲通道创建
首先,我们必须更改我们的chan()
运营工厂以接受一个bufferSize
论点:
export const chan = (bufferSize = 0) => ({
[CHAN]: true,
bufferSize,
})
bufferSize
默认为0
,因此默认情况下,我们将创建一个无缓冲通道。
上一次我们决定使用String
构造函数创建通道键,这可以确保我们拥有唯一的引用并为我们提供了toString()
一种开箱即用的方法。
我们不会对此进行更改,但是我们可以在字符串中添加缓冲区大小以用于调试:
let nextChanId = 1
const chanKey = bufferSize => new String(
`chan #${nextChanId++} { bufferSize: ${bufferSize} }`
)
现在,我们必须更改channelMiddleware
,以便它将管理缓冲通道的创建。
现在,我们的通道状态仅包含一个接收队列和一个发送队列。
让我们添加必要的内容以使缓冲的通道正常工作:
export const channelMiddleware = () => (next, ctx) => async operation => {
if (operation[CHAN]) {
const key = chanKey(operation.bufferSize)
ctx[CHANS].set(key, {
sendQ: [],
recvQ: [],
buffer: Array(operation.bufferSize),
bufferLength: 0,
})
return key
}
// ...
}
该buffer
数组有两个目的:
- 它将存储缓冲的值
- 它的长度将告诉我们缓冲区的大小(或您愿意的容量)
而bufferLength
整数将告诉我们有多少价值其实有在缓冲区中。
这应该给我们足够的信息:
- 我们的缓冲区是否具有值:
bufferLength !== 0
- 是我们满负荷的缓冲区:
bufferLength === buffer.length
现在有趣的部分!我们必须修改发送和接收操作以管理缓冲的通道。
发送到缓冲通道
到目前为止,当我们向通道发送值时,我们仅做两件事:检查接收队列中是否有等待的接收者并发送给它,或者将发送者推送到发送队列中。
现在我们还必须检查缓冲区中是否还有剩余空间,然后再将发送方推送到发送队列中:
if (operation[SEND]) {
const chanState = ctx[CHANS].get(operation.chanKey)
const recver = chanState.recvQ.shift()
if (recver) {
recver(operation.value)
return
}
if (chanState.bufferLength != chanState.buffer.length) {
// Store value in the buffer
}
return new Promise(resolve => {
chanState.sendQ.push(() => {
resolve()
return operation.value
})
})
}
应该按照发送顺序接收值,因此缓冲区必须是FIFO队列,这意味着我们总是要在缓冲区的末尾存储值。
我们不能推送值,因为这会改变buffer.length
,从而告诉我们缓冲区的容量,但是我们可以bufferLength
用来知道将值存储在哪里的索引:
if (chanState.bufferLength != chanState.buffer.length) {
chanState.buffer[chanState.bufferLength++] = operation.value
return
}
chanState.bufferLength++
允许用于存储operation.value
当前chanState.bufferLength
索引并在其后递增。
这个!现在,只要剩下一些空间,我们的通道就会将值存储在缓冲区中,并且仅在缓冲区已满时才将发送方推送到发送队列中。
从缓冲通道接收
直到现在,当我们从某个通道接收消息时,我们所做的只是检查发送队列中是否有等待的发送者并从中接收消息,或者将接收者推送到接收队列中。
现在我们必须预先检查缓冲区是否包含任何值:
if (operation[RECV]) {
const chanState = ctx[CHANS].get(operation.chanKey)
if (chanState.bufferLength !== 0) {
// Receive from buffer
}
const sender = chanState.sendQ.shift()
if (sender) return sender()
return new Promise(resolve => {
chanState.recvQ.push(resolve)
})
}
缓冲区是FIFO队列,我们必须从缓冲区的开头获取值。
就像接收时一样,我们无法使用,buffer.shift()
否则会意外更改缓冲区的容量。
我们应该做的是读取索引0
,然后将所有缓冲区的值向左移动一个索引,而不更改其长度。
数组对此有一个称为copyWithin的方法:
if (chanState.bufferLength !== 0) {
const value = chanState.buffer[0]
chanState.buffer.copyWithin(0, 1)
chanState.bufferLength--
return value
}
我们还递减ch.bufferLength
以反映新缓冲区的内容。
但是仍然存在一个问题,当我们释放缓冲区中的一些空间时,我们应该检查发送队列中是否有发送者。
在发送队列中有一个不完整的缓冲区和发送方将是该通道的无效状态。
因此,当我们从缓冲区中获取值时,让我们检查发送队列:
if (chanState.bufferLength !== 0) {
const value = chanState.buffer[0]
chanState.buffer.copyWithin(0, 1)
const sender = chanState.sendQ.shift()
if (sender) {
chanState.buffer[chanState.bufferLength - 1] = sender()
} else {
chanState.bufferLength--
}
return value
}
如果发送队列中有一个发送者,我们会从中接收并将值放在缓冲区的末尾。
因此,chanState.bufferLength
仅当发送队列中没有发送者时,我们才递减。
就是这样!现在,我们有完全可用的缓冲通道。
让我们尝试了这一点与我们的repl.it send123()
例子(它使用ESM从模块的好处):
接下来是什么
下次,我们将看到如何关闭通道,也许不是最有趣的部分,但绝对是通道的关键功能。
因此,我还有另外三篇文章:
- 转到JS(3/5)中的频道:关闭
- 转到JS(4/5)中的频道:测距
- 转到JS(5/5)中的频道:选择
希望您喜欢第二篇文章,给他们❤️,💬发表评论,或与他人分享,然后关注我,以使我的下一篇文章得到通知。