Do not communicate by sharing memory; instead, share memory by communicating.
简单使用
channel的创建、发送、接收、关闭的简单使用如下:
通过dlv debug进行disass可以知道主要调用下面几个方法,本次也主要分析这些方法的调用。
- runtime.makechan,创建一个channel
- runtime.chansend1,发送数据
- runtime.chanrecv1,接收数据
- runtime.closechan,关闭channel
数据结构
channel的数据结构 hchan如下:
type hchan struct {
qcount uint // chan队列长度
dataqsiz uint // chan队列容量
buf unsafe.Pointer // 队列指针
sendx uint // 发送的当前索引
recvx uint // 接收的当前索引
elemsize uint16 // elem结构的大小,用于计算缓冲区所需的大小
elemtype *_type // chan的数据类型
recvq waitq // 接收协程队列
sendq waitq // 发送协程队列
closed uint32 // 是否关闭
lock mutex // 互斥锁
}
hchan里一共有三个队列,channel数据的缓冲区是循环队列,sendq和recvq是链表队列,都是FIFO。
创建
在makechan的 代码中,分为检查和创建。
检查包括:
- 检查Element的大小,需要小于 1«16
- 检查hchan的内存对齐
- 计算Element的大小 * bufferSize 是否会溢出,并且获取需要分配的mem内存大小
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
创建包括:
- 如果是无缓存chan,mem为0,则直接分配一个hchanSize的大小
- 如果是有缓存且非指针类型的chan,则分配hchanSize+mem大小的内存,这里为连续内存(contiguous memory)
- 如果是有缓存且指针类型的chan,则先分配hchanSize自身内存,再单独分配mem的缓冲区内存
然后设置了elemsize、elemtype和dataqsiz。
这里有个问题,为什么有缓存的指针类型不能分配连续内存呢?
关闭
在closechan的 代码中,分为检查和关闭。
检查包括:
- chan是否为nil
- chan是否已经关闭
关闭包括:
- 释放接收协程队列
- 释放发送协程队列
- 唤醒所有等待的协程
发送数据
chansend 被调用有两种情况:
- 情况一:
c <- x
,block=true,对应函数chansend1
如果同时chan是nil,往nil chan写东西,会调用gopark
方法将当前协程休眠,一直阻塞下去。下面示例代码会阻塞3秒后,因为没有其他alive的协程了而报错退出(fatal error: all goroutines are asleep - deadlock!)。
package main
import (
"time"
)
func main() {
var ch chan<- struct{} = nil
go func() {
time.Sleep(time.Second * 3)
}()
ch <- struct{}{}
}
- 情况二:
select { case c <- v: ... }
,block=false,对应函数selectnbsend
差不多的示例代码,在select case下就不会阻塞,因为block=false会直接返回。
package main
import (
"time"
)
func main() {
var ch chan<- struct{} = nil
go func() {
time.Sleep(time.Second * 3)
}()
select {
case ch <- struct{}{}:
default:
}
}
直接发送
如果等待接收的队列里有协程在等待,直接出队,将要发送的数据拷贝到等待协程的elem指针指向的地址上,然后将elem指针置空,唤醒该等待的协程。
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
缓冲区
如果等待队列为空,缓冲区不为空,通过 chanbuf
函数计算当前sendx基于buf的具体内存地址,将发送指针指向的数据拷贝到 chanbuf
计算出来的内存地址上,其他的就是
RingBuffer 的内容了。
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
阻塞发送
前面两种都没走进去,就只能暂存了,通过 getg()
获取当前协程指针,生成一个状态对象 sudog
,记录 要发送的数据ep、协程指针gp、ch等,然后将 sudog
入队到sendq中,并且告诉调度器,当前协程要进入阻塞状态了。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
接收数据
chanrecv 被调用是三种情况:
x <- c
,对应函数chanrecv1
,仅返回数据x, ok := <- c
,对应函数chanrecv2
,返回数据和状态select { case v, ok = <-c:
,对应函数selectnbrecv
,返回数据和状态
和发送数据类似,chanrecv1
和 chanrecv2
对于nil channel会强阻塞,调用gopark
休眠当前协程,select case则直接返回。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
下面具体分析 chanrecv
的内容。
直接接收
如果发送队列里有sudog
,调用recv进行传输,这里分为两种情况:
- 情况一:当channel是无缓存队列,直接将
sudog
中的element挪到待接收的地址ep - 情况二:当channel是有缓存队列,
前情提要:接收数据时能从sendq中dequeue到待发送sudog说明buf队列已经满了,即qcount=dataqsiz
,因为要保证队列的整体的FIFO,所以首先拿recvx计算当前接收的buf缓存队列地址qp,将qp地址的数据挪到待接收地址ep,再将sudog
里的element数据挪到qp地址,因为recvx内存块的已经发送给ep了,所以recvx自增1,同时判断是否euql dataqsiz,实现Turn arround。
情况二里,至于为什么 c.sendx = c.recvx
?发挥一下想象力,当前buf缓存是满的,如果不进行上面的赋值,当sendq为空了,从buf中再取出一次,recvx++,buf缓存不是满的了,再接收一次,sendx会把有效数据给覆盖了,如果赋值了,后续接收覆盖的都是已经发送过的数据内存地址。
然后将sudog
的elem置为nil,唤醒该sudog
关联的协程。
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
缓冲区
和发送类似,调用chanbuf
计算当前接收索引qp(读),将当前接收索引qp的数据拷贝到ep指针,并且清空当前qp指针的数据,其他的也类似
RingBuffer。
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
阻塞接收
和发送类似,获取当前协程指针和sudog
,给sudog
赋值当前接收的ep地址、当前协程地址以及chan,然后将sudog
入队到recv q中,调用gopark休眠当前协程。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
总结
待解决前面的问题: