欢迎,来自IP地址为:18.97.14.80 的朋友


并发性 是程序同时执行多个任务的能力,它是构建可扩展、高性能系统的关键要素。

Go 的并发模型基于协程(goroutine)的概念,它是可以同时运行多个函数的轻量级线程;以及通道(Channel),用于在 goroutine 之间安全高效地交换数据的内置通信机制。

Go 的并发特性使开发人员能够编写具有以下功能的程序:

  • 同时处理多个请求,提高响应能力和吞吐量
  • 高效利用多核处理器,最大限度地利用系统资源
  • 编写安全、高效且易于维护的并发代码

Go 的并发模型旨在最大程度地减少开销、减少延迟并防止常见的并发错误(如竞争条件和死锁)。借助 Go 语言的并发特性,开发人员可以轻松构建高性能、可扩展且并发的系统,使其成为构建现代分布式系统、网络和云基础设施的理想选择。

案例假设

首先让我们考虑一个场景来说明并发性: 银行柜员

假如有一家繁忙的银行,有两个出纳员,柜员甲和柜员乙。客户来到银行进行各种交易,如存款、取款和转账。银行的目标当然是快速高效地为客户提供服务。

顺序处理(无并发)

柜员甲和柜员乙按顺序工作,一次一个。当客户到达时,柜员甲会服务该客户,而柜员乙会等到柜员甲完成后再为下一位客户服务。这显然会导致客户等待时间过长。

并发处理

合理的情形是,柜员甲和柜员乙同时工作,同时为客户服务。当客户到达时,柜员甲为客户A进行交易,柜员乙同时帮助另一位客户B进行另一笔交易。他们一起工作,共享银行数据库和现金等资源,同时为多个客户提供服务。

在这种情况下,并发使柜员甲柜员乙能够高效地一起工作,同时为多个客户提供服务,并改善整体客户体验。同样的概念也适用于计算机编程,并发可以使多个任务能够同时运行,从而提高响应能力、效率和性能。

Goroutine 和 Channel

Goroutine 是由 Go 运行时(runtime)管理的轻量级线程。它是在 Go 运行时上运行的函数,它有助于解决并发和异步流要求

Goroutine 允许在程序中启动和并发运行其他执行线程。

Channel 用于在 goroutine 之间进行通信。它是一个类型化的管道,使用通道运算符”<-“发送和接收值。

如何实现 Goroutine

要实现 goroutine,只需要在调用函数时,前面加上 go 关键字:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func pause() {
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

func sendMsg(msg string) {
    pause()
    fmt.Println(msg)
}

func main() {
    sendMsg("hello") // sync

    go sendMsg("test1") // async
    go sendMsg("test2") // async
    go sendMsg("test3") // async

    sendMsg("main") // sync

    time.Sleep(2 * time.Second)
}

从上面的例子可以看出:

  • sendMsg 函数有同步调用和异步调用两种方式
  • 当 sendMsg 函数调用时,如果不带 go 关键字,则 sendMsg 函数会被同步调用
  • 当 sendMsg 函数调用时,如果带 go 关键字,则 sendMsg 函数会被异步调用

Goroutine 的工作原理

当使用 go 关键字调用 sendMsg 函数时,main 函数不会等待 sendMsg 函数执行完毕就继续执行下一行代码,并在调用 sendMsg 函数后立即返回。

否则,该函数是同步调用的,main 函数将等待 sendMsg 函数执行完毕才继续执行下一行代码。

运行上述示例时输出的顺序将与代码的顺序不同,并且多次执行程序,其运行结果也不尽相同,因为三个 goroutine 都是并发运行的,并且由于函数暂停了一段时间,它们唤醒并输出的顺序将不同。

time.Sleep(2 * time.Second) 是一种快速简便的方法,用于保持 main 函数运行 2 秒,以允许 goroutine 在 main 函数退出之前完成执行。否则,在调用 goroutine 后,main 函数将立即退出,goroutine 将没有足够的时间完成执行,从而不会显示 goroutine 的执行结果。

使用 WaitGroups

与上例中使用的 time.Sleep(2 * time.Second) 不同,WaitGroups 更标准,它被用于等待一组 goroutine 完成执行。这是同步多个 goroutines 的简单方法。

示例程序如下:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func pause() {
	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

func sendMsg(msg string, wg *sync.WaitGroup) {
	defer wg.Done()
	pause()
	fmt.Println(msg)
}

func main() {
	var wg sync.WaitGroup

	wg.Add(3)

	go func(msg string) {
		defer wg.Done()
		pause()
		fmt.Println(msg)
	}("test1")

	go sendMsg("test2", &wg)
	go sendMsg("test3", &wg)

	wg.Wait()
}

示例中还使用了匿名函数声明,并通过 goroutine 来调用它。

上面的例子中,sync.WaitGroup 用于等待三个 goroutine 执行完毕后退出主函数。通过它,就可以同步三个 goroutine 和主函数:

  • sync.WaitGroup (wg) 管理 goroutine 并跟踪正在运行的 goroutine 数量
  • sync.WaitGroup.Add (wg.Add) 方法用于添加正在运行的 goroutine 数量作为参数
  • sync.WaitGroup.Done (wg.Done) 方法用于减少正在运行的 goroutine 数量
  • sync.WaitGroup.Wait (wg.Wait) 方法用于等待所有 goroutine 执行完毕后退出主函数

通道的概念

通道用于在 goroutine 之间进行通信。它是一种类型化的管道,可以通过它使用通道运算符”<-“发送和接收消息。

最简单的方式是,一个 goroutine 将消息写入通道,另一个 goroutine 从通道中读取这个写入的消息。

使用 make 方法和 chan 关键字及其类型来创建通道,通道用于传输声明时设置的类型的消息。

示例如下:

package main 

func main(){ 
	msgChan := make(chan string) 
}

上面的示例创建了一个字符串消息类型的通道 msgChan。

将数据写入通道

要将数据写入通道,首先指定通道的名称(msgChan),然后指定”<-“运算符和消息,这被视为发送方:

msgChan <- "hello world"

从通道读取数据

要从通道读取数据,只需将运算符”<-“移到通道名称(msgChan)前面,即可将其分配给变量,这被视为接收器:

msg := <- msgChan

使用 Goroutine 实现通道

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {

	msgChan := make(chan string)

	go func() {
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
		msgChan <- "hello" // Write data to the channel
		msgChan <- "world" // Write data to the channel
	}()

	msg1 := <- msgChan
	msg2 := <- msgChan

	fmt.Println(msg1, msg2)
}

以上的示例展示了如何从通道写入和读取数据。创建 msgChan 通道并使用 go 关键字创建一个将数据写入通道的 goroutine。msg1 和 msg2 变量用于从通道读取数据。

通道的行为类似于先进先出队列。因此,当一个 goroutine (匿名函数)将数据写入通道时,另一个 goroutine (main)会按照写入的顺序从通道读取数据。

通道缓冲区

通道可以是缓冲的,也可以是无缓冲的。前面的示例使用即为无缓冲通道。

无缓冲通道会导致发送方在将消息发送到通道后立即阻塞,直到接收方收到消息。

缓冲通道允许发送方在缓冲区已满之前不阻塞地将消息发送到通道。因此,发送方仅在缓冲区已满后才阻塞,并等待另一个 goroutine 从通道读取,确保空间大小可用后再解除阻塞。

创建缓冲通道,只需要在使用 make 函数创建时使用第二个参数来指示缓冲区大小即可:

msgBufChan := make(chan string, 2)

上面的示例代码创建了一个字符串类型的缓冲通道 msgBufChan,其缓冲区大小为 2。这意味着该通道在阻塞之前最多可以容纳两条消息。

package main

import (
	"time"
)

func main() {
	size := 3
	msgBufChan := make(chan int, size)

	// reader (receiver)
	go func() {
		for {
			_ = <-msgBufChan
			time.Sleep(time.Second)
		}
	}()

	//writer (sender)
	writer := func() {
		for i := 0; i <= 10; i++ {
			msgBufChan <- i
			println(i)
		}
	}

	writer()
}

上面的示例创建了一个 int 类型的缓冲通道 msgBufChan,其缓冲区大小为 3。

writer 函数将数据写入通道,而匿名函数(reader)从通道中读取数据。

当程序运行时,会看到立即打印出 0 到 3 的数字,其余的数字 4 到 10 则以每秒一个的速度缓慢打印出来(time.Sleep(time.Second))。

这显示了缓冲通道在阻塞之前指定其可以容纳的大小的效果。

通道方向

当使用通道作为函数参数时,默认情况下,可以在函数内发送和接收消息。为了在编译时提供额外的安全性,通道函数参数可以定义为方向。也就是说,它们可以定义为只读或只写。

例如:

package main

import (
	"fmt"
	"time"
)

func writer(channel chan<- string, msg string) {
	channel <- msg
}

func reader(channel <-chan string) {
	msg := <-channel
	fmt.Println(msg)
}

func main() {
	msgChan := make(chan string, 1)

	for i := 0; i < 10; i++ {
		go writer(msgChan, fmt.Sprintf("msg %d", i))
		go reader(msgChan)
	}

	time.Sleep(time.Second * 2)
}

上面示例显示了如何定义具有方向的通道。

  • 写入器(writer)函数使用只写通道
  • 读取器(reader)函数使用只读通道

msgChan 通道的创建缓冲区大小为 1。写入器函数将数据写入通道,读取器函数从通道读取数据。

使用通道选择处理多个通信操作

select 语句允许 goroutine 等待多个通信操作。select 会阻塞,直到其中一个 case 可以运行,然后执行对应代码。如果多个 case 已准备就绪,它会随机选择一个。

select 和 case 语句用于简化跨多个通道等待的可管理性和可读性。

示例代码:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func pause() {
	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

func test1(c chan<- string) {
	for {
		pause()
		c <- "hello"
	}
}

func test2(c chan<- string) {
	for {
		pause()
		c <- "world"
	}
}

func main() {
	rand.Seed(time.Now().Unix())

	c1 := make(chan string)
	c2 := make(chan string)

	go test1(c1)
	go test2(c2)

	for {
		select {
		case msg1 := <-c1:
			fmt.Println(msg1)
		case msg2 := <-c2:
			fmt.Println(msg2)
		}
	}
}

示例展示了如何使用 select 语句在多个通道上等待。test1 和 test2 函数分别将数据写入 c1 和 c2 通道。main 函数使用 select 语句从 c1 和 c2 通道读取数据。

select 语句将阻塞,直到其中一个通道准备好发送或接收数据。 如果两个通道都已准备就绪,则 select 语句将随机选择一个。

通道中长时间运行的进程超时停止

time.After 函数用于创建一个在指定持续时间后发送消息的通道,这可用于实现通道的超时。

可以在 select 语句中指定它,以帮助管理从任何受监控的通道接收消息的时间过长的情况。

在使用外部资源时也需要考虑超时问题,因为有些情况无法保证响应时间,因此可能需要在预定时间过后主动采取措施。

使用 select 语句实现超时非常简单,示例如下:

package main

import (
	"fmt"
	"time"
)

func main() {
	c1 := make(chan string)

	go func(channel chan string) {
		time.Sleep(1 * time.Second)
		channel <- "hello world"
	}(c1)

	select {
	case msg2 := <-c1:
		fmt.Println(msg2)
	case <-time.After(2 * time.Second): //Timeout after 2 second
		fmt.Println("timeout")
	}
}
  • 示例展示了如何使用 time.After 函数创建一个在指定时间后发送消息的通道
  • 主函数使用 select 语句从 c1 通道读取数据
  • select 语句将阻塞,直到其中一个通道准备好发送或接收数据
  • 如果 c1 通道已准备就绪,则主函数将打印该消息
  • 如果 c1 通道在 2 秒后仍未准备就绪,则主函数将打印超时消息

关闭通道

关闭通道用于指示通道上不会再发送任何值。它用于向接收方发出信号,表示通道已关闭,不会再发送任何值。

可以明确关闭 Go 通道以帮助解决同步问题,默认情况是在发送完所有值后关闭通道。

通过调用内置的 close 函数来关闭通道:

package main

import (
	"bytes"
	"fmt"
)

func process(work <-chan string, fin chan<- string) {
	var b bytes.Buffer
	for {
		if msg, notClosed := <-work; notClosed {
			fmt.Printf("%s received...\n", msg)
		} else {
			fmt.Println("Channel closed")
			fin <- b.String()
			return
		}
	}
}

func main() {
	work := make(chan string, 3)
	fin := make(chan string)

	go process(work, fin)

	word := "hello world"

	for i := 0; i < len(word); i++ {
		letter := string(word[i])
		work <- letter
		fmt.Printf("%s sent ...\n", letter)
	}

	close(work)

	fmt.Printf("result: %s\n", <-fin)
}

上面的示例展示了如何关闭一个通道。工作通道创建时缓冲区大小为 3。process 函数从工作通道读取数据并将数据写入 fin 通道。main 函数将数据写入工作通道并关闭工作通道。如果工作通道未关闭,process 函数将打印消息。如果工作通道已关闭,process 函数将打印消息并将数据写入 fin 通道。

迭代通道消息

可以使用 range 关键字迭代通道,类似于数组、切片和映射中的使用方式一样。这允许快速轻松地迭代通道内的消息。

示例代码如下:

package main

import (
	"fmt"
)

func main() {
	c := make(chan string, 3)

	go func() {
		c <- "hello"
		c <- "world"
		c <- "goroutine"
		close(c) // Closing the channel is very important before proceeding to the iteration hence deadlock error
	}()

	for msg := range c {
		fmt.Println(msg)
	}
}

示例展示了如何使用 range 关键字迭代通道。c 通道的创建缓冲区大小为 3。go 关键字用于创建一个将数据写入 c 通道的 goroutine。main 函数使用 range 关键字迭代 c 通道并打印消息。需要注意的是,如果迭代通道,首先需要关闭它,否则会产生死锁。

通过本文,我们学习了如何使用 Go 中的 goroutine 和通道处理并发。并且学习了如何创建 goroutine,以及如何使用 WaitGroups 和通道在 goroutine 之间进行通信。

同时,还学习了如何使用通道缓冲区、通道方向、通道选择、通道超时、通道关闭和通道迭代。

总之,Goroutine 和通道是 Go 中的强大功能,有助于满足并发和异步流要求。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注