进程:数据集合的动态执行过程
线程:顺控,cpu调度和分派的基本单位
协程:用户态线程,速度快
Go 用的是MPG 线程模型,倡导用 CSP 并发模型来控制线程之间的任务协作。
Go 是通过 goroutine 和 channel 来实现 CSP 并发模型的:
goroutine:即协程,Go 中的并发实体,是一种轻量级的用户线程,是消息的发送和接收方;
channel: 即通道, goroutine 使用通道发送和接收消息。
注:本篇示例代码来自网络。
1.goroutine和 channel语法
2.消费者和生产者模型
3.多路复用
4.context上下文
1.goroutine和 channel语法
var name chan T // 双向 channel
var name chan <- T // 只能发送消息的 channel
var name T <- chan // 只能接收消息的 channel
channel <- val // 发送消息
val := <- channel // 接收消息
val, ok := <- channel // 非阻塞接收消息
2.消费者和生产者模型
package main
import (
"fmt"
"time"
)
//生产者
func Producer(begin, end int, queue chan<- int) {
for i:= begin ; i < end ; i++ {
fmt.Println("produce:", i)
queue <- i
}
}
//消费者
func Consumer(queue <-chan int) {
for val := range queue { //当前的消费者循环消费
fmt.Println("consume:", val)
}
}
func main() {
queue := make(chan int)
defer close(queue)
for i := 0; i < 3; i++ {
go Producer(i * 5, (i+1) * 5, queue) //多个生产者
}
go Consumer(queue) //单个消费者
time.Sleep(time.Second) // 避免主 goroutine 结束程序
}
3.多路复用 select
package main
import (
"fmt"
"time"
)
func send(ch chan int, begin int ) {
// 循环向 channel 发送消息
for i :=begin ; i< begin + 10 ;i++{
ch <- i
}
}
func receive(ch <-chan int) {
val := <- ch
fmt.Println("receive:", val)
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go send(ch1, 0)
go receive(ch2)
// 主 goroutine 休眠 1s,保证调度成功
time.Sleep(time.Second)
for {
select {
case val := <- ch1: // 从 ch1 读取数据
fmt.Printf("get value %d from ch1\n", val)
case ch2 <- 2 : // 使用 ch2 发送消息
fmt.Println("send value by ch2")
case <-time.After(2 * time.Second): // 超时设置
fmt.Println("Time out")
return
}
}
}
4.context上下文
package main
import (
"context"
"fmt"
"time"
)
const DB_ADDRESS = "db_address"
const CALCULATE_VALUE = "calculate_value"
func readDB(ctx context.Context, cost time.Duration) {
fmt.Println("db address is", ctx.Value(DB_ADDRESS))
select {
case <- time.After(cost): // 模拟数据库读取
fmt.Println("read data from db")
case <-ctx.Done():
fmt.Println(ctx.Err()) // 任务取消的原因
// 一些清理工作
}
}
func calculate(ctx context.Context, cost time.Duration) {
fmt.Println("calculate value is", ctx.Value(CALCULATE_VALUE))
select {
case <- time.After(cost): // 模拟数据计算
fmt.Println("calculate finish")
case <-ctx.Done():
fmt.Println(ctx.Err()) // 任务取消的原因
// 一些清理工作
}
}
func main() {
ctx := context.Background(); // 创建一个空的上下文
// 添加上下文信息
ctx = context.WithValue(ctx, DB_ADDRESS, "localhost:10086")
ctx = context.WithValue(ctx, CALCULATE_VALUE, 1234)
// 设定子 Context 2s 后执行超时返回
ctx, cancel := context.WithTimeout(ctx, time.Second * 2)
defer cancel()
// 设定执行时间为 4 s
go readDB(ctx, time.Second * 4)
go calculate(ctx, time.Second * 4)
// 充分执行
time.Sleep(time.Second * 5)
}