golang channle学习

Page content

学习 golang channel 源码的记录,都注释在源码中了。

文章参考:

  1. 微信公众号-Golang梦工厂
  2. 极客时间专栏-Go并发编程实战课

channel 的数据结构

type hchan struct {
	qcount   uint           // 循环队列中元素的数量,就是还没被取走的
	dataqsiz uint           // 循环队列的大小
	buf      unsafe.Pointer // 存放元素的循环队列,大小总是elemsize的整数倍
	elemsize uint16         // chan 中元素的大小
	closed   uint32         // chan 是否关闭
	elemtype *_type         // chan 中元素的类型
	sendx    uint           // 处理发送数据的指针在 buf 中的位置。一旦接收了新的数据,指针就会加上 elemsize,移向下一个位置
	recvx    uint           // 处理接收请求时的指针在 buf 中的位置。一旦取出数据,此指针会移动到下一个位置
	sendq    waitq          // 如果生产者因为 buf 满了而阻塞,会被加入到 sendq 队列中
    recvq    waitq          // chan 是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到 recvq 队列中
	lock mutex              // 保护所有字段
}

channel 的创建

使用 make 创建 channel 时,编译后对应 runtime.makechanruntime.makechan64 源码位置: runtime/chan.go

// makechan 创建channel
// chan 的类型,chan 的缓冲区大小
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	// 对发送元素类型进行检查
	// 疑问:elem.size 是什么意思?
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}

	// 对齐检查
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// 判断是否会发生内存溢出
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// 构造 hchan 对象
	var c *hchan
	switch {
	case mem == 0: // 无缓冲channel创建,chan的size或者元素的size是0,不必创建buf
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 元素类型不包含指针,只进行一次内存分配
		// 如果 hchan 结构体中不含指针,gc 就不会扫描 hchan 中的元素,所以我们
		// 只需要分配 “hchan结构体大小 + 元素大小*元素个数”的内存
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		// hchan数据结构后面紧接着就是buf
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 元素包含指针,那么单独分配buf
		// 需要进行两次内存分配
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	// 初始化 hchan 中的对象
	// 元素大小、类型、容量都记录下来
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

向 channel 发送数据

发送数据的操作,经过编译后对应的是 runtime.chansend1,最终调用的是runtime.chansend2 源码位置: runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 第一部分
	// 如果 chan 是 nil 的话,就把调用者 goroutine park(阻塞休眠),调用者就永远被阻塞住了
	// 所以会报死锁错误
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// 第二部分,如果chan没有被close,并且chan满了,直接返回
	// chansend1 调用 chansend 时时候设置了 block 参数,所以这部分代码不会执行
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	lock(&c.lock)

	// 第三部分,chan已经被close的情景,再往里面发送数据的话会 panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// 第四部分
	// 如果等待队列中有等待的 receiver,那么这段代码就把它从队列中弹出
	// 然后直接把数据交给它(通过 memmove(dst, src, t.size)),而不需要放入到 buf 中,速度可以更快一些
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 第五部分
	// 当前没有 receiver,需要把数据放入到 buf 中,放入之后,就成功返回了
	// 题外话: 如果有 receiver,又没有信号给它接受,它应该处在 recvq 队列里,这会直接进入第四部分的 case
	//		   如果有信号给 receiver 接收,那说明你前面还有信号未被接收,你先在 缓存里等等吧
	//	       如果缓存里也满里,那你得排队里,这是第六部分的情况
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	// 第六部分
	// buf 满了,发送者的 goroutine 就会加入到发送者的等待队列中,直到被唤醒
	// 这个时候,数据或者被取走了,或者 chan 被 close 了
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg) // 放入发送等待队列中
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 让当前 g 变成 wait 状态
	KeepAlive(ep)

	// 唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

从 channel 接收数据

接收数据的操作,经过编译后对应的是 runtime.chanrecv1runtime.chanrecv2,分别是一个返回和两个返回,最终调用的都是 chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	// 第一部分
	// chan为nil,和 send 一样,从 nil chan 中接收(读取、获取)数据时,调用者会被永远阻塞,出发 panic
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// 第二部分, block=false且c为空
	// 可以直接忽略,因为不是我们这次要分析的场景
	if !block && empty(c) {
		if atomic.Load(&c.closed) == 0 {
			return
		}
		if empty(c) {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	// 第三部分
	// chan 已经被close,且 chan 中没有缓存的元素
	if c.closed != 0 && c.qcount == 0 {
		unlock(&c.lock)
		// 清理 ep 中的指针数据
		// 为什么清理ep指针呢?ep指针是什么?
		// 这个ep就是我们要接收的值存放的地址(val := <-ch val就是ep  ),即使channel关闭了,我们也可以接收零值
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	// 第四部分
	// 处理 buf 满的情况,当然也可能是没有 buf,从 send 队列取一个等待者试试
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// 第五部分
	// 没有等待的sender, buf中有数据
	// 这个是和 chansend 共用一把大锁,所以不会有并发的问题。如果 buf 有元素,就取出一个元素给 receiver
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 第六部分 
	// 处理 buf 中没有元素的情况
	// 如果没有元素,那么当前的 receiver 就会被阻塞,直到它从 sender 中接收了数据,或者是 chan 被 close,才返回
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// 被唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

关闭 channle

关闭 channle 的操作,编译后对应的是 runtime.closechan

func closechan(c *hchan) {
	// 关闭一个 nil chan,panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	// 关闭一个已经关闭的 chan,panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	// channel 关闭标志
	c.closed = 1
	// goroutine 集合
	var glist gList
	// 释放所有的reader
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// 释放所有的writer (它们会panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// 将所有待清除的 goroutine 状态从 _Gwaiting 设置为 _Grunnable,等待调度器调度
	// 因为所有待清除的 goroutine 都是挂起状态,需要唤醒他们,继续走后面的流程
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}