起源

看Go Channel源码的时候,都有提到 Ring Buffer,同时力扣上也刷到了类似的题目 622. 设计循环队列,这里记录下 Ring Buffer 的调查结果, Golang Channel 也用到了循环队列。

实现

下面分为普通不加锁的循环队列和无锁的循环队列,代码实现和TestCase都在仓库 ring_buffer

不加锁的循环队列

循环队列类似 Producer-Consumer 模式,Tail指针的移动产生数据,Head指针的移动消费数据。 循环队列可以使用取余来获取新的索引,但CPU做位运算性能更高,所以改成Capacity为2的幂次,这样通过和Mask做与操作实现Turn Arround。

package no_lock

// no_lock指不用锁

type RingBuffer struct {
	Queue      []interface{}
	Head, Tail uint64
	Cap, Mask  uint64
}

func findPowerOfTwo(givenMum uint64) uint64 {
	givenMum--
	givenMum |= givenMum >> 1
	givenMum |= givenMum >> 2
	givenMum |= givenMum >> 4
	givenMum |= givenMum >> 8
	givenMum |= givenMum >> 16
	givenMum |= givenMum >> 32
	givenMum++
	return givenMum
}

func Constructor(k int) RingBuffer {
	capacity := findPowerOfTwo(uint64(k))
	return RingBuffer{
		Queue: make([]interface{}, capacity),
		Head:  uint64(0),
		Tail:  uint64(0),
		Cap:   capacity,
		Mask:  capacity - 1,
	}
}

func (c *RingBuffer) EnQueue(value interface{}) bool {
	if c.IsFull() {
		return false
	}
	newTail := (c.Tail + 1) & c.Mask
	c.Tail = newTail
	c.Queue[newTail] = value
	return true
}

func (c *RingBuffer) DeQueue() (value interface{}, success bool) {
	if c.IsEmpty() {
		return nil, false
	}
	newHead := (c.Head + 1) & c.Mask
	c.Head = newHead
	return c.Queue[newHead], true
}

func (c *RingBuffer) IsEmpty() bool {
	return c.Head == c.Tail
}

func (c *RingBuffer) IsFull() bool {
	return c.Tail-c.Head == c.Cap-1 || c.Head-c.Tail == 1
}

无锁的循环队列

lock-free,是一种底层通过CPU提供的硬件同步原语CAS指令(compare and swap / compare and set)来实现的乐观并发控制算法,因为原子指令本身就是带锁的操作,只是锁的粒度比较小,所以虽然没有显式的调用锁,但是实现了锁的功能。

Golang里可以通过atomic包来进行CAS原子操作,循环队列里设计到并发读写的就是head、tail和queue这三个参数的操作,这里全部通过atomic包进行读写。这里涉及的unsafe包的使用,可以参考 Golang Unsafe Pointer

package lock_free_1

import (
	"sync/atomic"
	"unsafe"
)

type RingBuffer struct {
	queue      []interface{}
	head, tail uint64
	cap, mask  uint64
}

func findPowerOfTwo(givenMum uint64) uint64 {
	givenMum--
	givenMum |= givenMum >> 1
	givenMum |= givenMum >> 2
	givenMum |= givenMum >> 4
	givenMum |= givenMum >> 8
	givenMum |= givenMum >> 16
	givenMum |= givenMum >> 32
	givenMum++
	return givenMum
}

func Constructor(k int) RingBuffer {
	capacity := findPowerOfTwo(uint64(k))
	return RingBuffer{
		queue: make([]interface{}, capacity),
		head:  uint64(0),
		tail:  uint64(0),
		cap:   capacity,
		mask:  capacity - 1,
	}
}

func (c *RingBuffer) EnQueue(value interface{}) bool {
	// EnQueue only 非nil的值
	if value == nil {
		return false
	}

	oldHead := atomic.LoadUint64(&c.head)
	oldTail := atomic.LoadUint64(&c.tail)
	if IsFull(oldHead, oldTail, c.cap) {
		return false
	}

	newTail := (oldTail + 1) & c.mask
	// 判断newTailData是否为nil
	if newTailData := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&c.queue[newTail]))); newTailData != nil {
		return false
	}

	if !atomic.CompareAndSwapUint64(&c.tail, oldTail, newTail) {
		return false
	}

	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&c.queue[newTail])), unsafe.Pointer(&value))
	return true
}

func (c *RingBuffer) DeQueue() (value interface{}, success bool) {
	oldHead := atomic.LoadUint64(&c.head)
	oldTail := atomic.LoadUint64(&c.tail)
	if IsEmpty(oldHead, oldTail) {
		return nil, false
	}

	newHead := (oldHead + 1) & c.mask
	headData := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&c.queue[newHead])))
	if headData == nil {
		return nil, false
	}

	if !atomic.CompareAndSwapUint64(&c.head, oldHead, newHead) {
		return nil, false
	}

	// 原数据置为nil
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&c.queue[newHead])), nil)
	return *(*interface{})(headData), true
}

func IsEmpty(head, tail uint64) bool {
	return head == tail
}

func IsFull(head, tail, cap uint64) bool {
	return tail-head == cap-1 || head-tail == 1
}

CAS的 ABA问题,以下面的代码举例,Producer0在CAS判断之前已经拿到了Tail值,这时候被让出了调度权,Producer非0 开始不断的 EnQueue,直到环形队列Turn Arround,碰巧Producer0重新开始调度时Tail值又回到了之前的值,Producer0进行CAS操作成功,以为什么都没发生,然后set了值,这样Producer非0设置的值就被覆盖了。

维基百科的例子如下:

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

这里解决办法是修改 Tail和Head的赋值逻辑,改成自增,而不是和mask进行与操作,因为字段类型是 uint64,所以前面的ABA问题,Producer0 让出调度权后,队列要 EnQueue 1«64 - 1 次,可能性极低。具体代码参考 仓库

Reference