0%

20210304学习记录

2021-03-04 学习记录

今天学习的内容:

  1. 数据结构:队列
  2. Go语言unsafe.Pointer的用法
  3. CAS实现无锁队列

1. 数据结构:队列

队列的实现很简单,这次实现了一个简单的链式队列。但是考虑到并发问题,入队和出队操作需要考虑并发同步问题。这次依然是先使用锁来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package main

import (
"fmt"
"reflect"
"sync/atomic"
"sync"
"unsafe"
)

// Queue the queue structure
type Queue struct {
head *node
tail *node
lock sync.RWLock
}

type node struct {
val interface{}
next *node
}

// CreateAnyTypeSlice converse interface{} to slice
func CreateAnyTypeSlice(slice interface{}) ([]interface{}, bool) {
val, ok := isSlice(slice)
if !ok {
return nil, false
}

sliceLen := val.Len()

out := make([]interface{}, sliceLen)

for i := 0; i < sliceLen; i++ {
out[i] = val.Index(i).Interface()
}

return out, true
}

func isSlice(slice interface{}) (val reflect.Value, ok bool) {
val = reflect.ValueOf(slice)

if val.Kind() == reflect.Slice {
ok = true
}

return
}

// Create create a queue.
func (queue *Queue) Create(arr interface{}) error {
slice, ok := CreateAnyTypeSlice(arr)
if !ok {
return fmt.Errorf("not a slice")
}

queue.head = new(node)
pNode := queue.head

for i, val := range slice {
pNode.val = val

if i < len(slice)-1 {
pNode.next = new(node)
pNode = pNode.next
}
}

queue.tail = pNode
return nil
}

// Enqueue enqueue a node into the queue
func (queue *Queue) Enqueue(val interface{}) error {
queue.lock.Lock()
defer queue.lock.UnLock()

pNode := new(node)
pNode.val = val
pNode.next = nil
queue.tail.next = pNode
pNode = queue.tail

return nil
}

// Dequeue dequeue the first node of the queue
func (queue *Queue) Dequeue() error {

queue.head = queue.head.next
return nil
}

但是这样操作,锁的颗粒度太大,导致了性能的降低。能不能降低锁的颗粒度,甚至是无锁实现并发呢?

事实上是可以的,可以通过CAS(Compare and Swap)实现无锁队列。后面主要会记录一下今天学到的关于CAS的相关内容。

2. Go语言unsafe.Pointer的用法

首先,golang的unsafe包的官方解释是这样的:

  • 围绕 Go 程序内存安全及类型的操作
  • 很可能会是不可移植的
  • 不受 Go 1 兼容性指南的保护

说白了就是不是那么安全,不推荐使用,但是在特殊情况下使用它有时候会有特殊效果。Go 的指针是不支持指针运算和转换,但是使用unsafe.Pointer可以打破这个限制。unsafe.Pointer是实现定位和读写的内存的基础,Go runtime大量使用它。

1
2
3
4
(1)任何类型的指针都可以被转化为unsafe.Pointer
(2)unsafe.Pointer可以被转化为任何类型的指针
(3)uintptr可以被转化为unsafe.Pointer
(4)unsafe.Pointer可以被转化为uintptr

简单来讲,unsafe.Pointer可以类比为C语言中的void*类型,它可以包含任意类型变量的地址。

一个普通的T类型(基础类型,结构体类型)指针可以被转化为unsafe.Pointer类型指针,并且一个unsafe.Pointer类型指针也可以被转回普通的指针,被转回普通的指针类型并不需要和原始的T类型相同。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"unsafe"
"fmt"
)

func main(){
var a int = 1
p := &a
fp := (*float32)(unsafe.Pointer(p))
fmt.Println(fp) // 0xc000014078

return
}

总的来说,unsafe.Pointer 可以让你的变量在不同的指针类型转来转去,也就是表示为任意可寻址的指针类型。

有了这个点,就可以继续下面的无锁队列的设计了。

3. CAS实现无锁队列

首先我们需要知道CAS是什么。

CAS操作,即Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。

可以用C语言简单描述:

1
2
3
4
5
6
7
8
int compare_and_swap (int* reg, int oldval, int newval)
{ // 看一看内存*reg里的值是不是oldval,如果是的话,则对其赋值newval
int old_reg_val = *reg;
if (old_reg_val == oldval) {
*reg = newval;
}
return old_reg_val;
}

可以看到,old_reg_val 总是返回,于是,我们可以在 compare_and_swap 操作之后对其进行测试,以查看它是否与 oldval相匹配,因为它可能有所不同,不同的原因是另一个并发线程已成功竞争并成功将 reg 值从 oldval 更改为别的值了。

改成更方便操作和目前更多实现的形式如下:

1
2
3
4
5
6
7
8
bool compare_and_swap (int *addr, int oldval, int newval)
{
if ( *addr != oldval ) {
return false;
}
*addr = newval;
return true;
}

即通过检查返回值的true或者false来确定是否成功修改。

随后我们可以考虑实现一个基础的无锁入队操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (queue *Queue) Enqueue(val interface{}) error {
pNode := new(node)
pNode.val = val
pNode.next = nil

p := new(node)

for {
p = queue.tail
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&p.next)), unsafe.Pointer(nil), unsafe.Pointer(pNode)) { // go语言中使用atomic包中的CompareAndSwapXXXX方法来实现CAS操作,其中XXXX对应着不同类型的操作变量
break
}
}

atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.tail)), unsafe.Pointer(p), unsafe.Pointer(pNode))

return nil
}

这里有几个点需要注意:

  1. 执行流程。可以发现,整个程序的for循环中的CAS操作是在尝试修改p.next的值,如果值为nil(即未被其他线程修改过),则将其设置成pNode,否则设置失败,返回false,继续循环,再次尝试修改。

  2. 语言细节。这里atomic.CompareAndSwapPointer函数中第一个参数要传入*unsafe.Pointer类型,如果直接写成&(unsafe.Pointer(&p.next))会提示错误,提示这个unsafe.Pointer类型不能读取地址。这里需要巧妙运用一下前面说的unsafe.Pointer类型的任意类型转换,我们把unsafe.Pointer类型强制转换为*unsafe.Pointer类型即可。

  3. 程序存在死锁风险。首先考虑for循环,当循环跳出的时候,说明我们的p.next已经被成功修改为pNode,但是tail指针还没有修改,那么此时其他线程的这个CAS操作会一直失败,永远不能出循环。这个时候最后一步queue.tail将被置为pNode,如果这一步还没有操作,这个线程就突然挂掉了,那么其他的线程将陷入死循环,整个程序出现死锁,golang的runtime会直接panic,程序运行失败。

我们考虑让每个线程都先直接将尾指针丢到队尾。因为所有的线程都共享着queue.tail,所以,一旦有人动了它后,相当于其它的线程也跟着动了。

改进算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (queue *Queue) Enqueue(val interface{}) error {
pNode := new(node)
pNode.val = val
pNode.next = nil

tail := new(node)
next := new(node)

for {
tail = queue.tail
next = tail.next

// 如果尾指针被移动了,则重新取尾指针
if tail != queue.tail {
continue
}

// 如果尾指针的next不是nil,说明尾指针已经被动过,于是操作线程的尾指针挪到正确的位置,然后重新循环
if next != nil {
// CAS(queue.tail, tail, next)
atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.tail)), unsafe.Pointer(tail), unsafe.Pointer(next))
continue
}

// CAS(tail.next, next, pNode) == true 节点添加成功则退出
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)), unsafe.Pointer(next), unsafe.Pointer(pNode)) {
break
}
}

// CAS(queue.tail, tail, pNode)
atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.tail)), unsafe.Pointer(tail), unsafe.Pointer(pNode))
return nil
}

Dequeue算法同理,对head指针的移动使用CAS算法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (queue *Queue) Dequeue() error {

p := new(node)

for {
p = queue.head
if p.next == nil {
return fmt.Errorf("Dequeue an empty queue")
}

if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.head)), unsafe.Pointer(p), unsafe.Pointer(p.next)) {
break
}
}

return nil
}

不过这里同样存在一个问题,如果headtail指针都指向同一个节点,这个时候入队将返回错误,但是在判断p.next == nil这个条件的时候,另一个Enqueue()函数执行到了一半,此时的p.next不是nil了,但是tail还没有更新,导致Enqueue()未完成时就被Dequeue,此时队列为空,但是headtail没有指向同一个节点。

改进算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (queue *Queue) Dequeue() error {

head := new(node)
tail := new(node)
next := new(node)

for {
head = queue.head
tail = queue.tail
next = head.next

// 如果头指针移动了(即有出队),则重新取头指针
if head != queue.head {
continue
}

// 空队列
if head == tail && next == nil {
return fmt.Errorf("dequeue an empty queue")
}

// tail指针落后
if head == tail && next == nil {

// CAS(queue.tail, tail, next)
atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.tail)), unsafe.Pointer(tail), unsafe.Pointer(next))
continue
}

// CAS(queue.head, head, next)
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.head)), unsafe.Pointer(head), unsafe.Pointer(next)) {
break
}
}

return nil
}

CAS算法还存在ABA问题,这里暂时不考虑了,后面学习过程中再总结。