前言

想象一下,在一个繁忙的交通枢纽(业务系统),车辆(Go协程)川流不息,它们需要高效、有序地交换信息以完成各自的使命。

Channel就像是这些车辆之间的专用通道,既保证了信息的准确传递,又避免了交通拥堵和混乱。通过Channel,Go协程之间可以安全地进行数据交换,无需担心数据竞争和同步问题,让开发者能够专注于业务逻辑的实现,而非繁琐的并发控制。

Channel的使用和设计理念

Channel的声明

  • 无缓冲Channel的申请:make(chan int)make(chan bool,0)
  • 有缓冲Channel的申请:make(chan string,2)

Channel的基本使用

  • 向Channel发送数据x:ch <- x
  • 从Channel接收数据:x = <- ch
  • 从Channel接收数据并丢弃:<- ch

Channel常见错误用法:无缓冲Channel阻塞

func main(){
    ch := make(chan string)
    ch <- "hello" // 阻塞在这里
    fmt.Println(<-ch)
}

Channel的设计理念

Channel的理念有一句非常经典的话:“不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。”

所谓共享内存,典型就是传入一个变量的指针并修改之。通信的方式,则是直接从Channel中拿数据。

这么做很多优势:

  • 避免协程竞争和数据冲突的问题
  • 是一种更高级的抽象,可以降低开发难度,增加程序的可读性
  • 这种编程方式解耦了模块,增强了程序的扩展性和可维护性。

Channel的底层数据结构

在查看Channel的底层数据结构之前,我们从Channel的使用过程已经可以大致绘制出其结构的示意图了:

image.png

按照之前的使用,Channel应该有一个读和写协程的等待队列,一个缓存区。

我们查看runtime/chan.go/hchan结构体:

type hchan struct {  
    qcount   uint           // total data in the queue  
    dataqsiz uint           // size of the circular queue  
    buf      unsafe.Pointer // points to an array of dataqsiz elements    elemsize uint16  
    closed   uint32  
    elemtype *_type // element type  
    sendx    uint   // send index    
    recvx    uint   // receive index  
    recvq    waitq  // list of recv waiters    
    sendq    waitq  // list of send waiters  
    lock mutex  
}

里面的重要字段如下:

  • 环形缓冲区:之所以设计为环形,是因为可以大幅度降低GC的开销。
    • qcount:缓冲区数据数量。
    • dataqsiz:缓冲区的大小。
    • buf:指针,指向Buf的第一个数据。
    • elemsize:类型大小
    • elemtype:数据类型
  • 发送队列和接收队列sendxsendqrecvqrecvx
    • waitq为链表,里面记录链表头和链表尾
  • 互斥锁lock,用于保护hchan结构体本身。
    • Channel并不是无锁的。在塞入数据和取出数据的时候需要加锁,开销不大。
  • 状态值close,0为开启,1为关闭。

Channel的工作原理与算法

发送数据

c<-是一个Go语言的语法糖,在编译阶段,c<-会转化为runtime.chansend1()

channel的发送数据情景可以分为三种:

  • 直接发送
  • 放入缓存
  • 休眠等待

直接发送

状态:数据发送前,已经有协程G在休眠等待(Receive Queue)。此时缓存必然是空的,不用考虑。

算法:数据直接拷贝给G的接收变量,唤醒G。

算法实现

  • 从队列取出一个等待接收的G
  • 将数据直接拷贝到接收变量的G里面
  • 唤醒G

相关源码

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	lock(&c.lock)
	// 从队列取出等待接收的协程
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)  
		return true
	}
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// 将数据拷贝到sudog的elem字段
	if sg.elem != nil {  
	    sendDirect(c.elemtype, sg, ep)  
	    sg.elem = nil  
	}
	// ...省略一些逻辑
	// 唤醒G协程
	goready(gp, skip+1)
}

// 将数据拷贝到sg的elem
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	dst := sg.elem  
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

放入缓存

状态:没有G在休眠,并且缓冲区有空间。

算法:直接将数据放到缓冲区中。

算法实现

  • 获取可存入的缓冲的地址。
  • 数据拷贝到缓冲区地址
  • 维护索引

相关源码:

// chansend方法

if c.qcount < c.dataqsiz {  
    // 缓冲区还有空间
    qp := chanbuf(c, c.sendx)  
    // 数据移动到缓冲区
    typedmemmove(c.elemtype, qp, ep)  
    c.sendx++  
    if c.sendx == c.dataqsiz {  
       c.sendx = 0  
    }  
    // 维护索引
    c.qcount++  
    unlock(&c.lock)  
    return true  
}

休眠等待

状态:没有G在休眠等待,而且没有缓冲区或者缓冲区满了。

算法:协程进入发送队列,休眠等待。

算法实现

  • 协程包装为sudog
  • 将sudog放入sendq队列
  • 休眠并解锁
  • 被唤醒后,数据已经被取走了,维护其它数据

相关源码

// chansend方法

// 拿到sudog结构体
gp := getg()  
mysg := acquireSudog()  
mysg.releasetime = 0  
if t0 != 0 {  
    mysg.releasetime = -1  
}
mysg.elem = ep  
mysg.waitlink = nil  
mysg.g = gp  
mysg.isSelect = false  
mysg.c = c  
gp.waiting = mysg  
gp.param = nil  
// 放入发送队列
c.sendq.enqueue(mysg)
// 休眠
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

数据接收

接收数据

接收数据的源码位置:

  • 编译阶段,i<-c会转化为runtime.chanrecv1()i,ok<-c转化为chanrecv2()
  • 最后会调用chanrecv()

Channel接收数据的情形:

  • 有等待的协程,从协程接收。
  • 有等待的协程,从缓存接收。
  • 接收缓存
  • 阻塞接收

从等待的协程接收

状态:已经有协程处于发送队列之中;Channel没有缓存。

算法:将数据直接从发送队列的协程中拷贝过来。

算法实现

  • 判断是否有协程在发送队列等待,进入recv方法
  • 判断这个Channel是不是无缓存
  • 直接从发送队列中的协程中取走数据,顺便唤醒这个协程。

源码分析

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// block被写死,就是true
	lock(&c.lock)
	// 如果发送队列里面有协程在等待
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)  
		return true, true
	}
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {  
		// 缓冲没有数据,直接从等待队列中的协程拷贝
	    if ep != nil {  
	       recvDirect(c.elemtype, sg, ep)  
	    }
	}
	// ...
	// 唤醒等待队列的协程
	goready(gp, skip+1)
	
}
  
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {  
    src := sg.elem  
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)  
    memmove(dst, src, t.size)  
}

有等待的协程,从缓存接收

状态:有协程在发送队列里面,但是Channel的缓冲里面有数据。

算法:从缓冲取走一个数据,将休眠的协程从等待队列放进缓存,唤醒协程。

算法实现

  • 判断如果有协程在发送队列等待,进入recv(和前面一样)
  • 判断channel是否有缓存
  • 如果有,从缓存取走一个数据
  • 将发送队列里面的协程数据放入缓存,唤醒这个协程

源码分析

// 第一步的逻辑是一样的
// 我们直接看recv方法里面的代码
if c.dataqsiz == 0 {  
	// 缓存为空的逻辑
    if ep != nil {  
       // copy data from sender  
       recvDirect(c.elemtype, sg, ep)  
    }  
} else {  
	//缓存不为空,从缓存里面拿数据
	
    qp := chanbuf(c, c.recvx)  
    // 数据从缓存拷贝到接收者
    if ep != nil {  
       typedmemmove(c.elemtype, ep, qp)  
    }  
    // 将发送者队列中的协程数据拷贝到缓存
    typedmemmove(c.elemtype, qp, sg.elem)  
    c.recvx++  
    if c.recvx == c.dataqsiz {  
       c.recvx = 0  
    }  
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz  
}
// ...
// 唤醒协程
goready(gp, skip+1)

接收缓存

状态:没有协程在发送队列里面,但是缓存有数据。

算法:直接从缓存里面取走数据。

算法实现

  • 判断队列没有发送协程
  • 从缓存里面拷贝数据到接收者

源码分析

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if c.qcount > 0 {  
	    // 直接从缓存里面拿到数据
	    qp := chanbuf(c, c.recvx)  
	    if ep != nil {  
	       typedmemmove(c.elemtype, ep, qp)  
	    }  
	    typedmemclr(c.elemtype, qp)  
	    c.recvx++  
	    if c.recvx == c.dataqsiz {  
	       c.recvx = 0  
	    }  
	    c.qcount--  
	    unlock(&c.lock)  
	    return true, true  
	}

}

阻塞接收

状态:没有协程在发送队列,没有缓冲空闲区。

算法:将协程自身进入接受队列,休眠等待。

算法实现

  • 判断是否发送队列没有协程在等待
  • 判断Channel是否无缓冲
  • 将接受协程包装为sudog,放入接受等待队列,gopark休眠
  • 唤醒时,发送的协程已经把数据拷贝到位。

源码分析

// no sender available: block on this channel.  
gp := getg()  
mysg := acquireSudog()  
mysg.releasetime = 0  
if t0 != 0 {  
    mysg.releasetime = -1  
}
// ...
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 唤醒后
gp.waiting = nil  
gp.activeStackChans = false  
if mysg.releasetime > 0 {  
    blockevent(mysg.releasetime-t0, 2)  
}  
success := mysg.success  
gp.param = nil  
mysg.c = nil  
releaseSudog(mysg)  
return true, success

非阻塞channel的用法

select

案例:在下方代码中,我们从两个管道里面读数据和写数据。

在没有select之前,我们的程序会阻塞到这些读写channel的语句上。如果我们用了select,要是无法从case语句中读写channel,就会走default,不会阻塞。

func main() {  
    c1 := make(chan int, 5)  
    c2 := make(chan int)  
    select {  
    case <-c1: // 如果没有select,这里会阻塞  
       fmt.Println("从C1接收到数据")  
    case c2 <- 1:  
       fmt.Println("将数据发送到C2")  
    default: // select保证了这两个走不了直接走default  
       fmt.Println("None")  
    }  
}

select的原理

编译后的代码会判断,是否同时存在接受、发送、默认路径。

  • 首先查看是否有可以立即执行的case
  • 没有的话,有default就走default
  • 没有default的话,会将自己注册到每个case语句的队列里面

timer

timer会在倒计时结束的时候,向t.C放入数据。

func main() {
	t := time.NewTimer(time.Second)
	t.C
}

timer适合做定时相关的任务。