Go语言最大的一个特点就是对并发的原生支持
Go的并发指的是能让某个函数独立于其他函数运行的能力.
当一个函数创建为goroutine时, Go会将其视为一个独立的工作单元. 这个单元会被调度到任何可用的逻辑处理上执行.
Go的运行时调度器能管理被创建的goroutine并为他们分配执行时间.
Go基于叫做CSP ( Communicating Sequential Processes ) 的并发同步模型来保证数据的线程安全, CSP是一种消息传递模型, 通过在goroutine之间传递数据来传递消息,而不是对数据进行加锁来来实现同步访问, 用于传递数据的关键数据类型叫做channel
package main
import (
"fmt"
"runtime"
"sync"
)
func main(){
runtime.GOMAXPROCS(1)
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("start")
go func(){
defer wg.Done()
for i:=0;i<100;i++{
fmt.Println("go1:",i)
}
}()
go func(){
defer wg.Done()
for i:=0;i<100;i++{
fmt.Println("go2:",i)
}
}()
fmt.Println("waiting to finish")
wg.Wait()
fmt.Println("Done")
}
上面说到Go语言的运行时会在逻辑处理器上调度goroutine来运行, 每个逻辑处理器都分别绑定到单个操作系统线程
1.5 之前的版本只创建一个逻辑处理器, 1.5及其之后的版本会为每个可用的物理处理器各创建一个
如果创建一个goroutine并准备运行, 这个goroutine就会被放到调度器的全局运行队列中.
之后调度器就将这些队列中的goroutine分配给一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中, 本地运行队列中的goroutine会一直等待直到自己被分配的逻辑处理器执行

如果正在运行的goroutine需要执行一个阻塞的系统调用,比如打开一个文件.
线程和goroutine会从逻辑处理器上分离, 该线程会继续阻塞,等待系统调用返回, 与此同时, 这个逻辑处理器就失去了原来的线程, 所以调度器会创建一个新线程并将其绑定到该逻辑处理器上
一旦阻塞结束, 对应的goroutine会放回到本地队列, 之前的线程会保存好,以便之后继续使用
如果是一个网络IO阻塞, 情况会有一些不同, goroutine会从逻辑处理器分离,并移动到集成了网络轮询器的运行时, 阻塞结束时goroutine会被重新分配到逻辑处理器上

实际上,一个goroutine并不相当于一个线程,goroutine的出现正是为了替代原来的线程概念成为最小的调度单位。一旦运行goroutine时,先去当先线程查找,如果线程阻塞了,则被分配到空闲的线程,如果没有空闲的线程,那么就会新建一个线程。注意的是,当goroutine执行完毕后,线程不会回收推出,而是成为了空闲的线程。
这里的并发(concurrency) 和 并行(parallelism)是有区别的
并发指的是多个线程在同一处理器上被轮询执行
并行则是在多个处理器上同时执行
接下来我们看看线程安全的问题
竞争状态
如果两个或者多个goroutine在没有相互同步的情况下,访问某个共享资源,并试图同时读写这个资源, 就处于相互竞争的状态
插播几句废话
刚开始学习go的时候因为读了一些相关说明, 对go的并发模型期待很高, 认为它会比java的并发模型先进很多
其实还是挺失望的, 对于线程不安全的状态python使用GIL, java则使用同步或者CAS, go其实并没有对java的模型进行质变
还是一个套路
如果你运行下面的java代码,每次运行你都会得到不同的值例如19或者20
public class App {
static int result = 0;
public static void add() {
result++;
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = null;
Thread t2 = null;
t1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
add();
System.out.println("t1:" + result);
}
});
t1.start();
t2 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
add();
System.out.println("t2:" + result);
}
});
t2.start();
t1.join();
t2.join();
System.out.println(result);
}
}
输出错误结果19时的输出为 , 可以看到第二次和第三次结果两个线程计算出了相同的结果2
t2:1
t2:2
t1:2
t2:3
t1:4
t2:5
t2:6
t2:8
t1:8
t2:9
t1:10
t2:11
t2:12
t1:14
t2:14
t1:15
t1:16
t1:17
t1:18
t1:19
19
因为java的线程会使用变量的副本进行运算, 所以上面的代码会得到一个经典的线程不安全模型
同样的代码在go中则没有出现同样的情况, 说明go在并发时好像是不持有变量副本的
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
var result = 0;
func main(){
runtime.GOMAXPROCS(runtime.NumCPU())
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("start")
go func(){
defer wg.Done()
for i:=0;i<100;i++{
time.Sleep(1000)
fmt.Println("go1:",i)
result++
fmt.Println("result1:",result)
}
}()
go func(){
defer wg.Done()
for i:=0;i<100;i++{
time.Sleep(1000)
fmt.Println("go2:",i)
result++
fmt.Println("result2:",result)
}
}()
fmt.Println("waiting to finish")
wg.Wait()
fmt.Println("Done:",result)
}
上面的代码会一直输出结果为200
start
waiting to finish
go1: 0
result1: 1
go2: 0
result2: 2
go2: 1
result2: 3
go1: 1
result1: 4
go1: 2
result1: 5
go2: 2
result2: 6
go2: 3
result2: 7
go1: 3
result1: 8
go1: 4
result1: 9
go2: 4
result2: 10
go2: 5
result2: 11
go1: 5
result1: 12
...
...
result1: 196
go1: 98
result1: 197
go2: 98
result2: 198
go2: 99
result2: 199
go1: 99
result1: 200
Done: 200
如果要像java那样操作,需要对代码进行一些小改动
下面这个例子会和java一样, 每个goroutine都会创造一个变量的副本,然后在调用结束之前一直使用自己的副本,这样也就忽略了其他goroutine修改后的值
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
var result = 0;
func main(){
runtime.GOMAXPROCS(runtime.NumCPU())
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("start")
go func(){
defer wg.Done()
for i:=0;i<10;i++{
time.Sleep(1000)
fmt.Println("go1:",i)
temp:=result
runtime.Gosched()
temp++
result=temp
fmt.Println("result1:",result)
}
}()
go func(){
defer wg.Done()
for i:=0;i<10;i++{
time.Sleep(1000)
fmt.Println("go2:",i)
temp:=result
runtime.Gosched()
temp++
result=temp
fmt.Println("result2:",result)
}
}()
fmt.Println("waiting to finish")
wg.Wait()
fmt.Println("Done:",result)
}
输出
start
waiting to finish
go1: 0
go2: 0
result1: 1
result2: 1
go2: 1
go1: 1
result2: 2
result1: 2
go1: 2
go2: 2
result1: 3
result2: 3
go2: 3
result2: 4
go1: 3
result1: 5
go1: 4
go2: 4
result1: 6
result2: 6
go2: 5
go1: 5
result2: 7
result1: 7
go1: 6
go2: 6
result1: 8
result2: 8
go2: 7
go1: 7
result2: 9
result1: 9
go1: 8
go2: 8
result1: 10
result2: 10
go2: 9
go1: 9
result2: 11
result1: 11
Done: 11
解决办法其实也和java差不多
基于atomic实现
package main
import (
"fmt"
"runtime"
"sync"
"time"
"sync/atomic"
)
var result int64 = 0;
func main(){
runtime.GOMAXPROCS(runtime.NumCPU())
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("start")
go func(){
defer wg.Done()
for i:=0;i<10;i++{
time.Sleep(1000)
fmt.Println("go1:",i)
runtime.Gosched()
atomic.AddInt64(&result, 1);
fmt.Println("result1:",result)
}
}()
go func(){
defer wg.Done()
for i:=0;i<10;i++{
time.Sleep(1000)
fmt.Println("go2:",i)
runtime.Gosched()
atomic.AddInt64(&result, 1);
fmt.Println("result2:",result)
}
}()
fmt.Println("waiting to finish")
wg.Wait()
fmt.Println("Done:",result)
}
基于通道(channel)
当一个资源需要在goroutine之间共享时, 通道在goroutine之间架起了一个管道, 并提供了确保同步交换数据的机制
声明通道时, 需要指定将要被共享的数据类型.
可以通过通道共享 内置类型, 命名类型, 结构类型 和 引用类型的值或者指针
在go语言中需要使用内置函数 make 来创建一个通道
unbuffered := make( chan int )
buffered := make( chan string,10 )
向通道发送值
buffered <- "the mssages"
从通道接收一个字符串
value := <-buffered
一个简单的channel例子
package main
import "fmt"
func main() {
messages := make(chan string)
go func() { messages <- "ping" }()
msg := <-messages
fmt.Println(msg)
}
channel的工作方式有些类似于生产者和消费者, 无缓存的channel 无论是 读 还是 写 都会造成阻塞
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
var result int = 0
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
var wg sync.WaitGroup
wg.Add(2)
messages := make(chan int)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
result++
messages <- result
}
}()
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
time.Sleep(1 * time.Second)
result := <-messages
fmt.Println(result)
}
}()
fmt.Println("waiting to finish")
wg.Wait()
fmt.Println(result)
}
最后我们用两个channel 实现一下最开始的 双线程 计数器
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
messages := make(chan int)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for {
time.Sleep(1 * time.Second)
result, ok := <-messages
if result >= 5 {
close(messages)
return
}
if ok {
result++
fmt.Println("1:", result, ok)
messages <- result
} else {
return
}
}
}()
go func() {
defer wg.Done()
for {
time.Sleep(1 * time.Second)
result, ok := <-messages
if result >= 5 {
close(messages)
return
}
if ok {
result++
fmt.Println("2:", result, ok)
messages <- result
} else {
return
}
}
}()
messages <- 0
fmt.Println("waiting to finish")
wg.Wait()
}
输出是
waiting to finish
1: 1 true
2: 2 true
1: 3 true
2: 4 true
1: 5 true