2021-03-04 学习记录 今天学习的内容:
数据结构:队列
Go语言unsafe.Pointer的用法
CAS实现无锁队列
1. 数据结构:队列 队列的实现很简单,这次实现了一个简单的链式队列。但是考虑到并发问题,入队和出队操作需要考虑并发同步问题。这次依然是先使用锁来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 package mainimport ( "fmt" "reflect" "sync/atomic" "sync" "unsafe" ) type Queue struct { head *node tail *node lock sync.RWLock } type node struct { val interface {} next *node } func CreateAnyTypeSlice (slice interface {}) ([]interface {}, bool ) { val, ok := isSlice(slice) if !ok { return nil , false } sliceLen := val.Len() out := make ([]interface {}, sliceLen) for i := 0 ; i < sliceLen; i++ { out[i] = val.Index(i).Interface() } return out, true } func isSlice (slice interface {}) (val reflect.Value, ok bool ) { val = reflect.ValueOf(slice) if val.Kind() == reflect.Slice { ok = true } return } func (queue *Queue) Create (arr interface {}) error { slice, ok := CreateAnyTypeSlice(arr) if !ok { return fmt.Errorf("not a slice" ) } queue.head = new (node) pNode := queue.head for i, val := range slice { pNode.val = val if i < len (slice)-1 { pNode.next = new (node) pNode = pNode.next } } queue.tail = pNode return nil } func (queue *Queue) Enqueue (val interface {}) error { queue.lock.Lock() defer queue.lock.UnLock() pNode := new (node) pNode.val = val pNode.next = nil queue.tail.next = pNode pNode = queue.tail return nil } func (queue *Queue) Dequeue () error { queue.head = queue.head.next return nil }
但是这样操作,锁的颗粒度太大,导致了性能的降低。能不能降低锁的颗粒度,甚至是无锁实现并发呢?
事实上是可以的,可以通过CAS(Compare and Swap)实现无锁队列。后面主要会记录一下今天学到的关于CAS的相关内容。
2. Go语言unsafe.Pointer的用法 首先,golang的unsafe包的官方解释是这样的:
围绕 Go 程序内存安全及类型的操作
很可能会是不可移植的
不受 Go 1 兼容性指南的保护
说白了就是不是那么安全,不推荐使用,但是在特殊情况下使用它有时候会有特殊效果。Go 的指针是不支持指针运算和转换,但是使用unsafe.Pointer
可以打破这个限制。unsafe.Pointer
是实现定位和读写的内存的基础,Go runtime大量使用它。
1 2 3 4 (1)任何类型的指针都可以被转化为unsafe.Pointer (2)unsafe.Pointer可以被转化为任何类型的指针 (3)uintptr可以被转化为unsafe.Pointer (4)unsafe.Pointer可以被转化为uintptr
简单来讲,unsafe.Pointer
可以类比为C语言中的void*
类型,它可以包含任意类型变量的地址。
一个普通的T类型(基础类型,结构体类型)指针可以被转化为unsafe.Pointer
类型指针,并且一个unsafe.Pointer
类型指针也可以被转回普通的指针,被转回普通的指针类型并不需要和原始的T类型相同。
例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package mainimport ( "unsafe" "fmt" ) func main () { var a int = 1 p := &a fp := (*float32 )(unsafe.Pointer(p)) fmt.Println(fp) return }
总的来说,unsafe.Pointer
可以让你的变量在不同的指针类型转来转去,也就是表示为任意可寻址的指针类型。
有了这个点,就可以继续下面的无锁队列的设计了。
3. CAS实现无锁队列 首先我们需要知道CAS是什么。
CAS操作,即Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。
可以用C语言简单描述:
1 2 3 4 5 6 7 8 int compare_and_swap (int * reg, int oldval, int newval) { int old_reg_val = *reg; if (old_reg_val == oldval) { *reg = newval; } return old_reg_val; }
可以看到,old_reg_val
总是返回,于是,我们可以在 compare_and_swap
操作之后对其进行测试,以查看它是否与 oldval
相匹配,因为它可能有所不同,不同的原因是另一个并发线程已成功竞争并成功将 reg
值从 oldval
更改为别的值了。
改成更方便操作和目前更多实现的形式如下:
1 2 3 4 5 6 7 8 bool compare_and_swap (int *addr, int oldval, int newval) { if ( *addr != oldval ) { return false ; } *addr = newval; return true ; }
即通过检查返回值的true或者false来确定是否成功修改。
随后我们可以考虑实现一个基础的无锁入队操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (queue *Queue) Enqueue (val interface {}) error { pNode := new (node) pNode.val = val pNode.next = nil p := new (node) for { p = queue.tail if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&p.next)), unsafe.Pointer(nil ), unsafe.Pointer(pNode)) { break } } atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.tail)), unsafe.Pointer(p), unsafe.Pointer(pNode)) return nil }
这里有几个点需要注意:
执行流程。可以发现,整个程序的for循环中的CAS操作是在尝试修改p.next
的值,如果值为nil(即未被其他线程修改过),则将其设置成pNode
,否则设置失败,返回false
,继续循环,再次尝试修改。
语言细节。这里atomic.CompareAndSwapPointer
函数中第一个参数要传入*unsafe.Pointer
类型,如果直接写成&(unsafe.Pointer(&p.next))
会提示错误,提示这个unsafe.Pointer
类型不能读取地址。这里需要巧妙运用一下前面说的unsafe.Pointer
类型的任意类型转换,我们把unsafe.Pointer
类型强制转换为*unsafe.Pointer
类型即可。
程序存在死锁风险。首先考虑for循环,当循环跳出的时候,说明我们的p.next
已经被成功修改为pNode
,但是tail
指针还没有修改,那么此时其他线程的这个CAS操作会一直失败,永远不能出循环。这个时候最后一步queue.tail
将被置为pNode
,如果这一步还没有操作,这个线程就突然挂掉了,那么其他的线程将陷入死循环,整个程序出现死锁,golang的runtime会直接panic,程序运行失败。
我们考虑让每个线程都先直接将尾指针丢到队尾。因为所有的线程都共享着queue.tail
,所以,一旦有人动了它后,相当于其它的线程也跟着动了。
改进算法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (queue *Queue) Enqueue (val interface {}) error { pNode := new (node) pNode.val = val pNode.next = nil tail := new (node) next := new (node) for { tail = queue.tail next = tail.next if tail != queue.tail { continue } if next != nil { atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.tail)), unsafe.Pointer(tail), unsafe.Pointer(next)) continue } if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)), unsafe.Pointer(next), unsafe.Pointer(pNode)) { break } } atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.tail)), unsafe.Pointer(tail), unsafe.Pointer(pNode)) return nil }
Dequeue算法同理,对head
指针的移动使用CAS算法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (queue *Queue) Dequeue () error { p := new (node) for { p = queue.head if p.next == nil { return fmt.Errorf("Dequeue an empty queue" ) } if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.head)), unsafe.Pointer(p), unsafe.Pointer(p.next)) { break } } return nil }
不过这里同样存在一个问题,如果head
和tail
指针都指向同一个节点,这个时候入队将返回错误,但是在判断p.next == nil
这个条件的时候,另一个Enqueue()
函数执行到了一半,此时的p.next
不是nil了,但是tail
还没有更新,导致Enqueue()
未完成时就被Dequeue,此时队列为空,但是head
和tail
没有指向同一个节点。
改进算法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (queue *Queue) Dequeue () error { head := new (node) tail := new (node) next := new (node) for { head = queue.head tail = queue.tail next = head.next if head != queue.head { continue } if head == tail && next == nil { return fmt.Errorf("dequeue an empty queue" ) } if head == tail && next == nil { atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.tail)), unsafe.Pointer(tail), unsafe.Pointer(next)) continue } if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&queue.head)), unsafe.Pointer(head), unsafe.Pointer(next)) { break } } return nil }
CAS算法还存在ABA问题,这里暂时不考虑了,后面学习过程中再总结。