/** Enqueue an element
@param[in] data Element to insert, it will be copied
@return true on success *//* 元素入列. */boolenqueue(Tconst&data)MY_ATTRIBUTE((warn_unused_result)){/* m_enqueue_pos only wraps at MAX(m_enqueue_pos), instead
we use the capacity to convert the sequence to an array
index. This is why the ring buffer must be a size which
is a power of 2. This also allows the sequence to double
as a ticket/lock. *//* 获取当前的 m_enqueue_pos. */size_tpos=m_enqueue_pos.load(std::memory_order_relaxed);Cell*cell;for(;;){/* 以 m_capacity 取模求对应位置的元素. */cell=&m_ring[pos&m_capacity];size_tseq;/* 获取元素 cell 的 m_pos. */seq=cell->m_pos.load(std::memory_order_acquire);/* 计算 cell->m_pos 和 m_enqueue_pos 的差值. */intptr_tdiff=(intptr_t)seq-(intptr_t)pos;/* If they are the same then it means this cell is empty */if(diff==0){/* Claim our spot by moving head. If head isn't the same as we last
checked then that means someone beat us to the punch. Weak compare is
faster, but can return spurious results which in this instance is OK,
because it's in the loop *//* cell->m_pos 和 m_enqueue_pos 相等代表 cell 为空闲状态, m_enqueue_pos 自增 1,
* 假如自增失败即代表当前位置的 cell 已被占用, 需要重新获取 m_enqueue_pos. */if(m_enqueue_pos.compare_exchange_weak(pos,pos+1,std::memory_order_relaxed)){break;}}elseif(diff<0){/* The queue is full *//* cell->m_pos 和 m_enqueue_pos 在入列成功的状态下都是自增 1.
* diff < 0 即代表 m_enqueue_pos 已经回环,但当前的 cell 仍未出列. */return(false);}else{/* 重新获取 m_enqueue_pos. */pos=m_enqueue_pos.load(std::memory_order_relaxed);}}cell->m_data=data;/* Increment the sequence so that the tail knows it's accessible *//* cell->m_pos 自增 1. */cell->m_pos.store(pos+1,std::memory_order_release);return(true);}
dequeue 实现
/** Dequeue an element
@param[out] data Element read from the queue
@return true on success *//* 元素出列. */booldequeue(T&data)MY_ATTRIBUTE((warn_unused_result)){Cell*cell;/* 获取当前的 m_dequeue_pos. */size_tpos=m_dequeue_pos.load(std::memory_order_relaxed);for(;;){/* 以 m_capacity 取模求对应位置的元素. */cell=&m_ring[pos&m_capacity];/* 获取元素 cell 的 m_pos. */size_tseq=cell->m_pos.load(std::memory_order_acquire);/* 计算 cell->m_pos 和 m_dequeue_pos + 1 的差值. */autodiff=(intptr_t)seq-(intptr_t)(pos+1);if(diff==0){/* Claim our spot by moving the head. If head isn't the same as we last
checked then that means someone beat us to the punch. Weak compare is
faster, but can return spurious results. Which in this instance is
OK, because it's in the loop. *//* cell->m_pos 和 m_dequeue_pos + 1 相等代表 cell 已经是成功入列的元素,
* 因为在每次成功入列后, cell->m_pos 会自增 1.
* 尝试 m_dequeue_pos 自增 1,
* 假如自增失败即代表当前位置的 cell 已被出列, 需要重新获取 m_dequeue_pos. */if(m_dequeue_pos.compare_exchange_weak(pos,pos+1,std::memory_order_relaxed)){break;}}elseif(diff<0){/* The queue is empty *//* cell 入列成功会将 m_pos 自增 1, 所以假如 m_pos 小于 m_dequeue_pos + 1,
* 即代表 cell 元素暂未入列. */return(false);}else{/* Under normal circumstances this branch should never be taken. *//* 重新获取 m_dequeue_pos. */pos=m_dequeue_pos.load(std::memory_order_relaxed);}}/* 获取对应元素的数据成员. */data=cell->m_data;/* Set the sequence to what the head sequence should be next
time around *//* 更新 cell->m_pos 为 m_dequeue_pos + m_capacity + 1. */cell->m_pos.store(pos+m_capacity+1,std::memory_order_release);return(true);}
总结
以上基本上就是 InnoDB 实现的无锁队列的技术细节,这些技术都可以用在其它的无锁数据结构上, 重点是利用 CAS 这种原子操作来将之前需要 mutex 保护的并发操作开销减小.