欢迎,来自IP地址为:18.97.14.80 的朋友
并发性 是程序同时执行多个任务的能力,它是构建可扩展、高性能系统的关键要素。
Go 的并发模型基于协程(goroutine)的概念,它是可以同时运行多个函数的轻量级线程;以及通道(Channel),用于在 goroutine 之间安全高效地交换数据的内置通信机制。
Go 的并发特性使开发人员能够编写具有以下功能的程序:
- 同时处理多个请求,提高响应能力和吞吐量
- 高效利用多核处理器,最大限度地利用系统资源
- 编写安全、高效且易于维护的并发代码
Go 的并发模型旨在最大程度地减少开销、减少延迟并防止常见的并发错误(如竞争条件和死锁)。借助 Go 语言的并发特性,开发人员可以轻松构建高性能、可扩展且并发的系统,使其成为构建现代分布式系统、网络和云基础设施的理想选择。
案例假设
首先让我们考虑一个场景来说明并发性: 银行柜员
假如有一家繁忙的银行,有两个出纳员,柜员甲和柜员乙。客户来到银行进行各种交易,如存款、取款和转账。银行的目标当然是快速高效地为客户提供服务。
顺序处理(无并发)
柜员甲和柜员乙按顺序工作,一次一个。当客户到达时,柜员甲会服务该客户,而柜员乙会等到柜员甲完成后再为下一位客户服务。这显然会导致客户等待时间过长。
并发处理
合理的情形是,柜员甲和柜员乙同时工作,同时为客户服务。当客户到达时,柜员甲为客户A进行交易,柜员乙同时帮助另一位客户B进行另一笔交易。他们一起工作,共享银行数据库和现金等资源,同时为多个客户提供服务。
在这种情况下,并发使柜员甲和柜员乙能够高效地一起工作,同时为多个客户提供服务,并改善整体客户体验。同样的概念也适用于计算机编程,并发可以使多个任务能够同时运行,从而提高响应能力、效率和性能。
Goroutine 和 Channel
Goroutine 是由 Go 运行时(runtime)管理的轻量级线程。它是在 Go 运行时上运行的函数,它有助于解决并发和异步流要求
Goroutine 允许在程序中启动和并发运行其他执行线程。
Channel 用于在 goroutine 之间进行通信。它是一个类型化的管道,使用通道运算符”<-“发送和接收值。
如何实现 Goroutine
要实现 goroutine,只需要在调用函数时,前面加上 go 关键字:
package main import ( "fmt" "math/rand" "time" ) func pause() { time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } func sendMsg(msg string) { pause() fmt.Println(msg) } func main() { sendMsg("hello") // sync go sendMsg("test1") // async go sendMsg("test2") // async go sendMsg("test3") // async sendMsg("main") // sync time.Sleep(2 * time.Second) }
从上面的例子可以看出:
- sendMsg 函数有同步调用和异步调用两种方式
- 当 sendMsg 函数调用时,如果不带 go 关键字,则 sendMsg 函数会被同步调用
- 当 sendMsg 函数调用时,如果带 go 关键字,则 sendMsg 函数会被异步调用
Goroutine 的工作原理
当使用 go 关键字调用 sendMsg 函数时,main 函数不会等待 sendMsg 函数执行完毕就继续执行下一行代码,并在调用 sendMsg 函数后立即返回。
否则,该函数是同步调用的,main 函数将等待 sendMsg 函数执行完毕才继续执行下一行代码。
运行上述示例时输出的顺序将与代码的顺序不同,并且多次执行程序,其运行结果也不尽相同,因为三个 goroutine 都是并发运行的,并且由于函数暂停了一段时间,它们唤醒并输出的顺序将不同。
time.Sleep(2 * time.Second) 是一种快速简便的方法,用于保持 main 函数运行 2 秒,以允许 goroutine 在 main 函数退出之前完成执行。否则,在调用 goroutine 后,main 函数将立即退出,goroutine 将没有足够的时间完成执行,从而不会显示 goroutine 的执行结果。
使用 WaitGroups
与上例中使用的 time.Sleep(2 * time.Second) 不同,WaitGroups 更标准,它被用于等待一组 goroutine 完成执行。这是同步多个 goroutines 的简单方法。
示例程序如下:
package main import ( "fmt" "math/rand" "sync" "time" ) func pause() { time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } func sendMsg(msg string, wg *sync.WaitGroup) { defer wg.Done() pause() fmt.Println(msg) } func main() { var wg sync.WaitGroup wg.Add(3) go func(msg string) { defer wg.Done() pause() fmt.Println(msg) }("test1") go sendMsg("test2", &wg) go sendMsg("test3", &wg) wg.Wait() }
示例中还使用了匿名函数声明,并通过 goroutine 来调用它。
上面的例子中,sync.WaitGroup 用于等待三个 goroutine 执行完毕后退出主函数。通过它,就可以同步三个 goroutine 和主函数:
- sync.WaitGroup (wg) 管理 goroutine 并跟踪正在运行的 goroutine 数量
- sync.WaitGroup.Add (wg.Add) 方法用于添加正在运行的 goroutine 数量作为参数
- sync.WaitGroup.Done (wg.Done) 方法用于减少正在运行的 goroutine 数量
- sync.WaitGroup.Wait (wg.Wait) 方法用于等待所有 goroutine 执行完毕后退出主函数
通道的概念
通道用于在 goroutine 之间进行通信。它是一种类型化的管道,可以通过它使用通道运算符”<-“发送和接收消息。
最简单的方式是,一个 goroutine 将消息写入通道,另一个 goroutine 从通道中读取这个写入的消息。
使用 make 方法和 chan 关键字及其类型来创建通道,通道用于传输声明时设置的类型的消息。
示例如下:
package main func main(){ msgChan := make(chan string) }
上面的示例创建了一个字符串消息类型的通道 msgChan。
将数据写入通道
要将数据写入通道,首先指定通道的名称(msgChan),然后指定”<-“运算符和消息,这被视为发送方:
msgChan <- "hello world"
从通道读取数据
要从通道读取数据,只需将运算符”<-“移到通道名称(msgChan)前面,即可将其分配给变量,这被视为接收器:
msg := <- msgChan
使用 Goroutine 实现通道
package main import ( "fmt" "math/rand" "time" ) func main() { msgChan := make(chan string) go func() { time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) msgChan <- "hello" // Write data to the channel msgChan <- "world" // Write data to the channel }() msg1 := <- msgChan msg2 := <- msgChan fmt.Println(msg1, msg2) }
以上的示例展示了如何从通道写入和读取数据。创建 msgChan 通道并使用 go 关键字创建一个将数据写入通道的 goroutine。msg1 和 msg2 变量用于从通道读取数据。
通道的行为类似于先进先出队列。因此,当一个 goroutine (匿名函数)将数据写入通道时,另一个 goroutine (main)会按照写入的顺序从通道读取数据。
通道缓冲区
通道可以是缓冲的,也可以是无缓冲的。前面的示例使用即为无缓冲通道。
无缓冲通道会导致发送方在将消息发送到通道后立即阻塞,直到接收方收到消息。
缓冲通道允许发送方在缓冲区已满之前不阻塞地将消息发送到通道。因此,发送方仅在缓冲区已满后才阻塞,并等待另一个 goroutine 从通道读取,确保空间大小可用后再解除阻塞。
创建缓冲通道,只需要在使用 make 函数创建时使用第二个参数来指示缓冲区大小即可:
msgBufChan := make(chan string, 2)
上面的示例代码创建了一个字符串类型的缓冲通道 msgBufChan,其缓冲区大小为 2。这意味着该通道在阻塞之前最多可以容纳两条消息。
package main import ( "time" ) func main() { size := 3 msgBufChan := make(chan int, size) // reader (receiver) go func() { for { _ = <-msgBufChan time.Sleep(time.Second) } }() //writer (sender) writer := func() { for i := 0; i <= 10; i++ { msgBufChan <- i println(i) } } writer() }
上面的示例创建了一个 int 类型的缓冲通道 msgBufChan,其缓冲区大小为 3。
writer 函数将数据写入通道,而匿名函数(reader)从通道中读取数据。
当程序运行时,会看到立即打印出 0 到 3 的数字,其余的数字 4 到 10 则以每秒一个的速度缓慢打印出来(time.Sleep(time.Second))。
这显示了缓冲通道在阻塞之前指定其可以容纳的大小的效果。
通道方向
当使用通道作为函数参数时,默认情况下,可以在函数内发送和接收消息。为了在编译时提供额外的安全性,通道函数参数可以定义为方向。也就是说,它们可以定义为只读或只写。
例如:
package main import ( "fmt" "time" ) func writer(channel chan<- string, msg string) { channel <- msg } func reader(channel <-chan string) { msg := <-channel fmt.Println(msg) } func main() { msgChan := make(chan string, 1) for i := 0; i < 10; i++ { go writer(msgChan, fmt.Sprintf("msg %d", i)) go reader(msgChan) } time.Sleep(time.Second * 2) }
上面示例显示了如何定义具有方向的通道。
- 写入器(writer)函数使用只写通道
- 读取器(reader)函数使用只读通道
msgChan 通道的创建缓冲区大小为 1。写入器函数将数据写入通道,读取器函数从通道读取数据。
使用通道选择处理多个通信操作
select 语句允许 goroutine 等待多个通信操作。select 会阻塞,直到其中一个 case 可以运行,然后执行对应代码。如果多个 case 已准备就绪,它会随机选择一个。
select 和 case 语句用于简化跨多个通道等待的可管理性和可读性。
示例代码:
package main import ( "fmt" "math/rand" "time" ) func pause() { time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } func test1(c chan<- string) { for { pause() c <- "hello" } } func test2(c chan<- string) { for { pause() c <- "world" } } func main() { rand.Seed(time.Now().Unix()) c1 := make(chan string) c2 := make(chan string) go test1(c1) go test2(c2) for { select { case msg1 := <-c1: fmt.Println(msg1) case msg2 := <-c2: fmt.Println(msg2) } } }
示例展示了如何使用 select 语句在多个通道上等待。test1 和 test2 函数分别将数据写入 c1 和 c2 通道。main 函数使用 select 语句从 c1 和 c2 通道读取数据。
select 语句将阻塞,直到其中一个通道准备好发送或接收数据。 如果两个通道都已准备就绪,则 select 语句将随机选择一个。
通道中长时间运行的进程超时停止
time.After 函数用于创建一个在指定持续时间后发送消息的通道,这可用于实现通道的超时。
可以在 select 语句中指定它,以帮助管理从任何受监控的通道接收消息的时间过长的情况。
在使用外部资源时也需要考虑超时问题,因为有些情况无法保证响应时间,因此可能需要在预定时间过后主动采取措施。
使用 select 语句实现超时非常简单,示例如下:
package main import ( "fmt" "time" ) func main() { c1 := make(chan string) go func(channel chan string) { time.Sleep(1 * time.Second) channel <- "hello world" }(c1) select { case msg2 := <-c1: fmt.Println(msg2) case <-time.After(2 * time.Second): //Timeout after 2 second fmt.Println("timeout") } }
- 示例展示了如何使用 time.After 函数创建一个在指定时间后发送消息的通道
- 主函数使用 select 语句从 c1 通道读取数据
- select 语句将阻塞,直到其中一个通道准备好发送或接收数据
- 如果 c1 通道已准备就绪,则主函数将打印该消息
- 如果 c1 通道在 2 秒后仍未准备就绪,则主函数将打印超时消息
关闭通道
关闭通道用于指示通道上不会再发送任何值。它用于向接收方发出信号,表示通道已关闭,不会再发送任何值。
可以明确关闭 Go 通道以帮助解决同步问题,默认情况是在发送完所有值后关闭通道。
通过调用内置的 close 函数来关闭通道:
package main import ( "bytes" "fmt" ) func process(work <-chan string, fin chan<- string) { var b bytes.Buffer for { if msg, notClosed := <-work; notClosed { fmt.Printf("%s received...\n", msg) } else { fmt.Println("Channel closed") fin <- b.String() return } } } func main() { work := make(chan string, 3) fin := make(chan string) go process(work, fin) word := "hello world" for i := 0; i < len(word); i++ { letter := string(word[i]) work <- letter fmt.Printf("%s sent ...\n", letter) } close(work) fmt.Printf("result: %s\n", <-fin) }
上面的示例展示了如何关闭一个通道。工作通道创建时缓冲区大小为 3。process 函数从工作通道读取数据并将数据写入 fin 通道。main 函数将数据写入工作通道并关闭工作通道。如果工作通道未关闭,process 函数将打印消息。如果工作通道已关闭,process 函数将打印消息并将数据写入 fin 通道。
迭代通道消息
可以使用 range 关键字迭代通道,类似于数组、切片和映射中的使用方式一样。这允许快速轻松地迭代通道内的消息。
示例代码如下:
package main import ( "fmt" ) func main() { c := make(chan string, 3) go func() { c <- "hello" c <- "world" c <- "goroutine" close(c) // Closing the channel is very important before proceeding to the iteration hence deadlock error }() for msg := range c { fmt.Println(msg) } }
示例展示了如何使用 range 关键字迭代通道。c 通道的创建缓冲区大小为 3。go 关键字用于创建一个将数据写入 c 通道的 goroutine。main 函数使用 range 关键字迭代 c 通道并打印消息。需要注意的是,如果迭代通道,首先需要关闭它,否则会产生死锁。
通过本文,我们学习了如何使用 Go 中的 goroutine 和通道处理并发。并且学习了如何创建 goroutine,以及如何使用 WaitGroups 和通道在 goroutine 之间进行通信。
同时,还学习了如何使用通道缓冲区、通道方向、通道选择、通道超时、通道关闭和通道迭代。
总之,Goroutine 和通道是 Go 中的强大功能,有助于满足并发和异步流要求。