简单示例
Golang 只需使用 go
关键字就可以让函数在新的并发执行单元中运行。
package main
import (
"fmt"
)
func main() {
messages := make(chan string)
go func () { messages <- "ping" }()
msg := <-messages
fmt.Printf("%s\n", msg)
}
上例是 Goroutine
与 Channel
结合使用的示例,优雅的等待不同并发单元的函数执行完成后,主程序再退出。
Goroutine 由 Go 的 Runtime 管理,称为 Go 程,可以类比其他语言中的协程,但与协程不完全相同,Goroutine 来自协程的概念,让一组可复用的函数运行在一组线程之上;Channel 是 Goroutine 间的通信机制;Golang 通过 Goroutine 与 Channel 实现并发。
创建 Channels
ch = make(chan int) // 无缓存 channel
ch = make(chan int, 0) // 无缓存 channel
ch = make(chan int, 3) // 容量为3的 channel
Channels 发送与接收
ch <- x // 发送数据到管道
x = <-ch // 从管道接收数据
<-ch // 从管道接收数据(不关心回值)
不带缓存的 Channels
一个基于无缓存 Channels 的发送操作将导致发送者 goroutine 阻塞,直到另一个 goroutine 在相同的 Channels 上执行接收操作,当发送的值通过 Channels 成功传输之后,两个 goroutine 可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者 goroutine 也将阻塞,直到有另一个 goroutine 在相同的 Channels 上执行发送操作。
基于无缓存 Channels 的发送和接收操作将导致两个 goroutine 做一次同步操作。因为这个原因,无缓存 Channels 有时候也被称为同步 Channels。当通过一个无缓存 Channels 发送数据时,接收者收到数据发生在唤醒发送者 goroutine 之前。
“接收者收到数据发生在唤醒发送者 goroutine 之前” ,我没看懂,翻了英文原文:
the receipt of the value happens before the reawakening of the sending goroutine.
表达的意思应该是:“接收者接收数据发生在再次唤醒 goroutine 发送者之前” (因为无缓存 Channels 当发送者发送完数据就被阻塞了,接受者接收完发送者才能再次发送,所以接受数据发生在再次发送数据之前)
一个使用无缓存 Channel 的例子
package main
import (
"fmt"
)
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Printf("==> %d\n", v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
带缓存的 Channels 创建
// 创建可以缓存三个元素的 Channel 队列
ch := make(chan int, 3)
Goroutines 可以同时向缓存发送三个元素而不发生阻塞。当 Channel 元素满后发送会阻塞,当有 Channel 获取元素后,发送放可以继续发送,当队列为空,接受者将阻塞。
缓存为单向的先进先出队列。一个使用带缓存 Channel 的例子:启动多个 Goroutines 同时向多个CDN请求资源,回值发送到 Channel,接受到第一个回值后就返回数据。
如果使用无缓存 Channel,那么将会导致后返回的 Goroutines 处于卡住状态,这种情况称为“goroutines 泄露”,属于 BUG,不会被垃圾回收机制回收。
循环中的并发
通过带缓冲的 Channel 的接收来等待所有并发任务完成后退出
package main
import (
"fmt"
)
func main() {
done := make(chan int, 10)
for i := 0; i < cap(done); i++ {
go func(i int) {
fmt.Printf("Hello World! %d\n", i)
done <- 1
}(i)
}
for i := 0; i < cap(done); i++ {
<-done
}
}
通过 sync.WaitGroup
可以更加简便的实现上述逻辑。
package main
import (
"sync"
"fmt"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
fmt.Printf("Hello World! %d\n", i)
wg.Done()
}(i)
}
wg.Wait()
}
其中
wg.Add(1)
用于增加等待事件的个数,必须确保在后台线程启动之前执行(如果放到后台线程之中执行则不能保证被正常执行到)。当后台线程完成打印工作之后,调用wg.Done()
表示完成一个事件。main
函数的wg.Wait()
是等待全部的事件完成。
参考