RM新时代网站-首页

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何實現(xiàn)一個多讀多寫的線程安全的無鎖隊列

科技綠洲 ? 來源:Linux開發(fā)架構(gòu)之路 ? 作者:Linux開發(fā)架構(gòu)之路 ? 2023-11-08 15:25 ? 次閱讀

在ZMQ無鎖隊列的原理與實現(xiàn)一文中,我們已經(jīng)知道了ypipe可以實現(xiàn)一線程寫一線程讀的無鎖隊列,那么其劣勢就很明顯了,無法適應(yīng)多寫多讀的場景,因為其在讀的時候沒有對r指針加鎖,在寫的時候沒有對w指針加鎖。那么如何實現(xiàn)一個多讀多寫的線程安全的無鎖隊列呢?

  1. 互斥鎖:mutexqueue(太簡單不介紹了)
  2. 互斥鎖+條件變量:blockqueue(太簡單不介紹了)
  3. 內(nèi)存屏障:lockfreequeue(SimpleLockFreeQueue.h 暫時未寫文章介紹)
  4. 雙重CAS原子操作:arraylockfreequeue(本文)

圖片

2. ArrayLockFreeQueue的類接?和變量

該程序使用 gcc 內(nèi)置的__sync_bool_compare_and_swap,但重新做了宏定義封裝。

#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)

所謂循環(huán)數(shù)組,就是RingBuffer

圖片

#define QUEUE_INT unsigned long
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16

template
class ArrayLockFreeQueue {
public:

ArrayLockFreeQueue();

virtual ~ArrayLockFreeQueue();

QUEUE_INT size();

bool enqueue(const ELEM_T &a_data);// ?隊列

bool dequeue(ELEM_T &a_data);// 出隊列

private:

ELEM_T m_thequeue[Q_SIZE];

volatile QUEUE_INT m_count;// 隊列內(nèi)有多少元素
volatile QUEUE_INT m_writeIndex;//新元素?列時存放位置在數(shù)組中的下標

volatile QUEUE_INT m_readIndex;//下?個出列元素在數(shù)組中的下標

volatile QUEUE_INT m_maximumReadIndex;最后?個已經(jīng)完成?列操作的元素在數(shù)組中的下標

//取余
inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};

2.1 變量介紹

  • m_count:隊列的元素個數(shù)
  • m_writeIndex:新元素?列時存放位置在數(shù)組中的下標
  • m_readIndex:下?個出列元素在數(shù)組中的下標
  • m_maximumReadIndex:最后?個已經(jīng)完成?列操作的元素在數(shù)組中的下標。如果它的值跟m_writeIndex不?致,表明有寫請求尚未完成。這意味著,有寫請求成功申請了空間但數(shù)據(jù)還沒完全寫進隊列。所以如果有線程要讀取,必須要等到寫線程將數(shù)據(jù)完全寫?到隊列之后。

必須指明的是使?3種不同的下標都是必須的,因為隊列允許任意數(shù)量的?產(chǎn)者和消費者圍繞著它?作。已經(jīng)存在?種基于循環(huán)數(shù)組的?鎖隊列,使得唯?的?產(chǎn)者和唯?的消費者可以良好的?作。它的實現(xiàn)相當簡潔?常值得閱讀。

2.2 函數(shù)介紹

2.2.1 取余函數(shù)QUEUE_INT countToIndex(QUEUE_INT a_count);

這個函數(shù)非常有用,因為我們實現(xiàn)的是循環(huán)隊列,所以一定要對數(shù)組長度取余。

template
inline QUEUE_INT ArrayLockFreeQueue::countToIndex(QUEUE_INT a_count) {
return (a_count % Q_SIZE); // 取余的時候
},>

隊列已滿判斷如下

// (m_writeIndex + 1) %/Q_SIZE == m_readIndex
countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)

隊列為空判斷如下

//m_readIndex == m_maximumReadIndex
countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)

相關(guān)視頻推薦

C++無鎖隊列的設(shè)計與實現(xiàn)

高并發(fā)場景下,三種鎖方案:互斥鎖,自旋鎖,原子操作的優(yōu)缺點

需要C/C++ Linux服務(wù)器架構(gòu)師學(xué)習(xí)資料加qun812855908獲?。?a href="http://m.hljzzgx.com/soft/special/" target="_blank">資料包括C/C++,Linux,golang技術(shù),Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協(xié)程,DPDK,ffmpeg等),免費分享

圖片

2.2.2 文字舉例

2.2.2.1 入隊函數(shù)bool enqueue(const ELEM_T &a_data);

下面以文字舉例說明函數(shù),后續(xù)再圖示舉例。

假設(shè)現(xiàn)在有兩個線程都進入了enqueue這個函數(shù),m_writeIndex,m_readIndex和m_maximumReadIndex都為0。

  • 線程1在第一個while和CAS處:
currentWriteIndex = 0
currentReadIndex = 0
CAS(0,0,1)----> m_writeIndex = 1
m_maximumReadIndex = 0
  • 線程2在第一個while和CAS處:
currentWriteIndex = 0
currentReadIndex = 0
CAS(1,0,1)----> m_writeIndex = m_writeIndex false

while再次循環(huán)

currentWriteIndex = 1
currentReadIndex = 0
CAS(1,1,2)----> m_writeIndex = 2
m_maximumReadIndex = 0
  • 線程2在第二個while和CAS處:
CAS(0,1,2) -----> m_maximumReadIndex = m_maximumReadIndex false
yidld讓出CPU
此時線程1執(zhí)行↓
  • 線程1在第二個while和CAS處:
CAS(0,0,1) ------>m_maximumReadIndex = 1
執(zhí)行結(jié)束,此時線程2恢復(fù)執(zhí)行
  • 線程2在第二個while和CAS處:
CAS(1,1,2) ------>m_maximumReadIndex = 2
執(zhí)行結(jié)束
template
bool ArrayLockFreeQueue::enqueue(const ELEM_T &a_data) {
QUEUE_INT currentWriteIndex; // 獲取寫指針的位置
QUEUE_INT currentReadIndex;
// 1. 獲取可寫入的位置
do {
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if (countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex)) {
return false; // 隊列已經(jīng)滿了
}
// 目的是為了獲取一個能寫入的位置
} while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
// 獲取寫入位置后 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數(shù)據(jù)更新到對應(yīng)的位置

// 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 當線程超過cpu核數(shù)的時候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
}
//printf("m_writeIndex:%d currentWriteIndex:%d m_maximumReadIndex:%dn",m_writeIndex,currentWriteIndex,m_maximumReadIndex);
AtomicAdd(&m_count, 1);

return true;
},>

2.2.2.2 出隊函數(shù)bool dequeue(ELEM_T &a_data);

下面以文字舉例說明函數(shù),后續(xù)再圖示舉例。

假設(shè)現(xiàn)在有兩個線程都進入了dequeue這個函數(shù),currentReadIndex為0,currentMaximumReadIndex為2。

  • 線程1執(zhí)行
currentReadIndex = 0
currentMaximumReadIndex = 2
data = m_thequeue[0]
CAS(0,0,1) ----> m_readIndex = 1
  • 線程2執(zhí)行
currentReadIndex = 0
currentMaximumReadIndex = 2
data = m_thequeue[0]
CAS(1,0,1) ----> m_readIndex = m_readIndex false

while再循環(huán)

currentReadIndex = 1
currentMaximumReadIndex = 2
data = m_thequeue[1]
CAS(1,1,2) ----> m_readIndex = 2

如果沒有新數(shù)據(jù)寫入,再次讀取數(shù)據(jù),則currentReadIndex(2)==currentMaximumReadIndex(2)相等,return false,沒有數(shù)據(jù)可讀。

template
bool ArrayLockFreeQueue::dequeue(ELEM_T &a_data) {
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;

do {
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;

if (countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的

// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
//printf("m_readIndex:%d currentReadIndex:%d m_maximumReadIndex:%dn",m_readIndex,currentReadIndex,m_maximumReadIndex);
AtomicSub(&m_count, 1); // 真正讀取到了數(shù)據(jù),元素-1
return true;
}
} while (true);

assert(0);
// Add this return statement to avoid compiler warnings
return false;
},>

2.2.3 圖示舉例

2.2.3.1 入隊函數(shù)bool enqueue(const ELEM_T &a_data);

下面的圖我就不分目錄了,直接一口氣說明完。

以下插圖展示了對隊列執(zhí)?操作時各下標是如何變化的。如果?個位置被標記為X,標識這個位置?存放了數(shù)據(jù)???表示位置是空的。對于下圖的情況,隊列中存放了兩個元素。WriteIndex指示的位置是新元素將會被插?的位置。ReadIndex指向的位置中的元素將會在下?次pop操作中被彈出。

圖片

當?產(chǎn)者準備將數(shù)據(jù)插?到隊列中,它?先通過增加WriteIndex的值來申請空間。MaximumReadIndex指向最后?個存放有效數(shù)據(jù)的位置(也就是實際的隊列尾)。

圖片

?旦空間的申請完成,?產(chǎn)者就可以將數(shù)據(jù)拷?到剛剛申請到的位置中。完成之后增加MaximumReadIndex使得它與WriteIndex的?致。

圖片

現(xiàn)在隊列中有3個元素,接著?有?個?產(chǎn)者嘗試向隊列中插?元素。

圖片

在第?個?產(chǎn)者完成數(shù)據(jù)拷?之前,?有另外?個?產(chǎn)者申請了?個新的空間準備拷?數(shù)據(jù)?,F(xiàn)在有兩個?產(chǎn)者同時向隊列插?數(shù)據(jù)。

圖片

現(xiàn)在?產(chǎn)者開始拷?數(shù)據(jù),在完成拷?之后,對MaximumReadIndex的遞增操作必須嚴格遵循?個順序:第?個?產(chǎn)者線程?先遞增MaximumReadIndex,接著才輪到第?個?產(chǎn)者。這個順序必須被嚴格遵守的原因是,我們必須保證數(shù)據(jù)被完全拷?到隊列之后才允許消費者線程將其出列。讓出cpu的?的也是為了讓排在最前?的?產(chǎn)者完成m_maximumReadIndex的更新

while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
sched_yield(); // 當線程超過cpu核數(shù)的時候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
}

圖片

第?個?產(chǎn)者完成了數(shù)據(jù)拷?,并對MaximumReadIndex完成了遞增,現(xiàn)在第?個?產(chǎn)者可以遞增MaximumReadIndex了。

圖片

第?個?產(chǎn)者完成了對MaximumReadIndex的遞增,現(xiàn)在隊列中有5個元素。

template
bool ArrayLockFreeQueue::enqueue(const ELEM_T &a_data) {
QUEUE_INT currentWriteIndex; // 獲取寫指針的位置
QUEUE_INT currentReadIndex;
// 1. 獲取可寫入的位置
do {
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if (countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex)) {
return false; // 隊列已經(jīng)滿了
}
// 目的是為了獲取一個能寫入的位置
} while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
// 獲取寫入位置后 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數(shù)據(jù)更新到對應(yīng)的位置

// 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 當線程超過cpu核數(shù)的時候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
}
// printf("m_writeIndex:%d currentWriteIndex:%d m_maximumReadIndex:%dn",m_writeIndex,currentWriteIndex,m_maximumReadIndex);
AtomicAdd(&m_count, 1);

return true;
},>

2.2.3.2 出隊函數(shù)bool dequeue(ELEM_T &a_data);

以下插圖展示了元素出列的時候各種下標是如何變化的,隊列中初始有2個元素。WriteIndex指示的位置是新元素將會被插?的位置。ReadIndex指向的位置中的元素將會在下?次pop操作中被彈出。

圖片

消費者線程拷?數(shù)組ReadIndex位置的元素,然后嘗試?CAS操作將ReadIndex加1。如果操作成功消費者成功的將數(shù)據(jù)出列。因為CAS操作是原?的,所以只有唯?的線程可以在同?時刻更新ReadIndex的值。如果操作失敗,讀取新的ReadIndex值,以重復(fù)以上操作(copy數(shù)據(jù),CAS)。

圖片

現(xiàn)在?有?個消費者將元素出列,隊列變成空。

圖片

現(xiàn)在有?個?產(chǎn)者正在向隊列中添加元素。它已經(jīng)成功的申請了空間,但尚未完成數(shù)據(jù)拷?。任何其它企圖從隊列中移除元素的消費者都會發(fā)現(xiàn)隊列?空(因為writeIndex不等于readIndex)。但它不能讀取readIndex所指向位置中的數(shù)據(jù),因為readIndex與MaximumReadIndex相等(相等break)。直到?產(chǎn)者完成數(shù)據(jù)拷?增加MaximumReadIndex的值才能讀取這個數(shù)據(jù)。

圖片

當?產(chǎn)者完成數(shù)據(jù)拷?,隊列的??是1,消費者線程可以讀取這個數(shù)據(jù)了。

template
bool ArrayLockFreeQueue::dequeue(ELEM_T &a_data) {
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;

do {
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;

if (countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的

// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
//printf("m_readIndex:%d currentReadIndex:%d m_maximumReadIndex:%dn",m_readIndex,currentReadIndex,m_maximumReadIndex);
AtomicSub(&m_count, 1); // 真正讀取到了數(shù)據(jù),元素-1
return true;
}
} while (true);

assert(0);
// Add this return statement to avoid compiler warnings
return false;
},>

2.2.4 計算隊列的大小size函數(shù)的ABA問題與解決方案

template
QUEUE_INT ArrayLockFreeQueue::size() {
QUEUE_INT currentWriteIndex = m_writeIndex;
QUEUE_INT currentReadIndex = m_readIndex;

if (currentWriteIndex >= currentReadIndex)
return currentWriteIndex - currentReadIndex;
else
return Q_SIZE + currentWriteIndex - currentReadIndex;
},>

下面的場景描述了 size 為何會返回一個不正確的值:

1. 當 currentWriteIndex = m_writeIndex 執(zhí)行之后,m_writeIndex=3,m_readIndex = 2 那么實際 size 是 1;
2. 之后操作線程被搶占,且在它停止運行的這段時間內(nèi),有 2 個元素被插入和從隊列中移除。所以 m_writeIndex=5,m_readIndex = 4,而 size 還是 1;
3. 現(xiàn)在被搶占的線程恢復(fù)執(zhí)行,讀取 m_readIndex 值,這個時候 currentReadIndex=4,currentWriteIndex=3;
4. currentReadIndex > currentWriteIndex'所以 m_totalSize + currentWriteIndex - currentReadIndex`被返回,這個值意味著隊列幾乎是滿的,而實際上隊列幾乎是空的。

解決方案:添加一個用于保存隊列中元素數(shù)量的成員 count.這個成員可以通過 AtomicAdd/AtomicSub 來實現(xiàn)原子的遞增和遞減。

但需要注意的是這增加了一定開銷,因為原子遞增,遞減操作比較昂貴也很難被編譯器優(yōu)化。如果可以容忍size函數(shù)的ABA問題,則可以不用count與AtomicAdd/AtomicSub。使用者可以根據(jù)自己的使用場合選擇是否承受額外的運行時開銷。

2.2.5 與智能指針一起使用,內(nèi)存無法得到釋放

如果你打算用這個隊列來存放智能指針對象.需要注意,將一個智能指針存入隊列之后,如果它所占用的位置沒有被另一個智能指針覆蓋,那么它所指向的內(nèi)存是無法被釋放的(因為它的引用計數(shù)器無法下降為 0).這對于一個操作頻繁的隊列來說沒有什么問題,但是程序員需要注意的是,一旦隊列被填滿過一次那么應(yīng)用程序所占用的內(nèi)存就不會下降,即使隊列被清空.除非自己做改動,每次 pop 手動 delete。

3. 多個?產(chǎn)者線程的情況下yielding處理器的必要性

讀者可能注意到了enqueue函數(shù)中使?了sched_yield()來主動的讓出處理器,對于?個聲稱?鎖的算法??,這個調(diào)?看起來有點奇怪。正如?章開始的部分解釋過的,多線程環(huán)境下影響性能的其中?個因素就是Cache損壞。?產(chǎn)?Cache損壞的?種情況就是?個線程被搶占,操作系統(tǒng)需要保存被搶占線程的上下?,然后將被選中作為下?個調(diào)度線程的上下?載?。此時Cache中緩存的數(shù)據(jù)都會失效,因為它是被搶占線程的數(shù)據(jù)?不是新線程的數(shù)據(jù)。

所以,當此算法調(diào)?sched_yield()意味著告訴操作系統(tǒng):“我要把處理器時間讓給其它線程,因為我要等待某件事情的發(fā)?”。?鎖算法和通過阻塞機制同步的算法的?個主要區(qū)別在于?鎖算法不會阻塞在線程同步上,那么為什么在這?我們要主動請求操作系統(tǒng)搶占??呢?這個問題的答案沒那么簡單。它與有多少個?產(chǎn)者線程在并發(fā)的往隊列中存放數(shù)據(jù)有關(guān):每個?產(chǎn)者線程所執(zhí)?的CAS操作都必須嚴格遵循FIFO次序,?個?于申請空間,另?個?于通知消費者數(shù)據(jù)已經(jīng)寫?完成可以被讀取了。

如果我們的應(yīng)?程序只有唯?的?產(chǎn)者操作這個隊列,sche_yield()將永遠沒有機會被調(diào)?,第2個CAS操作永遠不會失敗。因為在?個?產(chǎn)者的情況下沒有?能破壞?產(chǎn)者執(zhí)?這兩個CAS操作的FIFO順序。

?當多于?個?產(chǎn)者線程往隊列中存放數(shù)據(jù)的時候,問題就出現(xiàn)了。概括來說,?個?產(chǎn)者通過第1個CAS操作申請空間,然后將數(shù)據(jù)寫?到申請到的空間中,然后執(zhí)?第2個CAS操作通知消費者數(shù)據(jù)準備完畢可供讀取了。這第2個CAS操作必須遵循FIFO順序,也就是說,如果A線程第?先執(zhí)?完第?個CAS操作,那么它也要第1個執(zhí)?完第2個CAS操作,如果A線程在執(zhí)?完第?個CAS操作之后停?,然后B線程執(zhí)?完第1個CAS操作,那么B線程將?法完成第2個CAS操作,因為它要等待A先完成第2個CAS操作。?這就是問題產(chǎn)?的根源。讓我們考慮如下場景,3個消費者線程和1個消費者線程:

  • 線程1,2,3按順序調(diào)?第1個CAS操作申請了空間。那么它們完成第2個CAS操作的順序也應(yīng)該與這個順序?致,1,2,3。
  • 線程2?先嘗試執(zhí)?第2個CAS,但它會失敗,因為線程1還沒完成它的第2此CAS操作呢。同樣對于線程3也是?樣的。
  • 線程2和3將會不斷的調(diào)?它們的第2個CAS操作,直到線程1完成它的第2個CAS操作為?。
  • 線程1最終完成了它的第2個CAS,現(xiàn)在線程3必須等線程2先完成它的第2個CAS。
  • 線程2也完成了,最終線程3也完成。

在上?的場景中,?產(chǎn)者可能會在第2個CAS操作上?旋?段時間,?于等待先于它執(zhí)?第1個CAS操作的線程完成它的第2次CAS操作。在?個物理處理器數(shù)量?于操作隊列線程數(shù)量的系統(tǒng)上,這不會有太嚴重的問題:因為每個線程都可以分配在??的處理器上執(zhí)?,它們最終都會很快完成各?的第2次CAS操作。雖然算法導(dǎo)致線程處理忙等狀態(tài),但這正是我們所期望的,因為這使得操作更快的完成。也就是說在這種情況下我們是不需要sche_yield()的,它完全可以從代碼中刪除。

但是,在?個物理處理器數(shù)量少于線程數(shù)量的系統(tǒng)上,sche_yield()就變得?關(guān)重要了。讓我們再次考查上?3個線程的場景,當線程3準備向隊列中插?數(shù)據(jù):如果線程1在執(zhí)?完第1個CAS操作,在執(zhí)?第2個CAS操作之前被搶占,那么線程2,3就會?直在它們的第2個CAS操作上忙等(它們忙等,不讓出處理器,線程1也就沒機會執(zhí)?,它們就只能繼續(xù)忙等,也就是說,如果不適用 sched_yield,一直自旋,那么可能多個線程同時阻塞在第二個 CAS 那兒),直到線程1重新被喚醒,完成它的第2個CAS操作。這就是需要sche_yield()的場合了,操作系統(tǒng)應(yīng)該避免讓線程2,3處于忙等狀態(tài)。它們應(yīng)該盡快的讓出處理器讓線程1執(zhí)?,使得線程1可以把它的第2個CAS操作完成。這樣線程2和3才能繼續(xù)完成它們的操作。

4. 循環(huán)數(shù)組無鎖隊列的性能測試

4.1 性能測試

互斥鎖隊列 vs 互斥鎖+條件變量隊列 vs 內(nèi)存屏障鏈表 vs RingBuffer CAS 實現(xiàn)。

4寫1讀:性能中等

圖片

4寫4讀:性能中上

圖片

1寫4讀:性能最好

圖片

7寫7讀:性能比互斥鎖隊列還差

圖片

4.2 分析

雖然沒有分析第三個內(nèi)存屏障鏈表的代碼,但是我們不難看出互斥鎖+條件變量 與 內(nèi)存屏障鏈表 的性能差別不大。為什么呢?鏈表的方式需要不斷的申請和釋放元素。當然,用內(nèi)存池可以適當改善這個影響,但是內(nèi)存池在分配內(nèi)存與釋放內(nèi)存的時候也會涉及到線程間的數(shù)據(jù)競爭,所以用鏈表的方式性能相對提升不多。

隨著生產(chǎn)者數(shù)量的增加,無鎖隊列的效率迅速下降。因為在多個生產(chǎn)者的情況下,第 2 個 CAS 將對性能產(chǎn)生影響。我們通過測試得出循環(huán)數(shù)組的?鎖隊列在1寫4讀的場景下性能提升是最高的,因為只有一個生產(chǎn)者,那么第二個CAS不會有yield的情況出現(xiàn)。由此我們可以得出一個結(jié)論,在一寫多讀的場景,我們可以優(yōu)先使用循環(huán)數(shù)組的?鎖隊列,比如下圖的場景。

圖片

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 接口
    +關(guān)注

    關(guān)注

    33

    文章

    8575

    瀏覽量

    151014
  • 封裝
    +關(guān)注

    關(guān)注

    126

    文章

    7873

    瀏覽量

    142893
  • 內(nèi)存
    +關(guān)注

    關(guān)注

    8

    文章

    3019

    瀏覽量

    74003
  • 線程安全
    +關(guān)注

    關(guān)注

    0

    文章

    13

    瀏覽量

    2457
收藏 人收藏

    評論

    相關(guān)推薦

    Linux下線程間通訊---讀寫和條件變量

    讀寫,它把對共享資源的訪問者劃分成讀者和者,讀者只對共享資源進行訪問,者則需要對共享資源進行操作。件變量是
    的頭像 發(fā)表于 08-26 20:44 ?1468次閱讀
    Linux下<b class='flag-5'>線程</b>間通訊---讀寫<b class='flag-5'>鎖</b>和條件變量

    探索字節(jié)隊列的魔法:類型支持、函數(shù)重載與線程安全

    探索字節(jié)隊列的魔法:類型支持、函數(shù)重載與線程安全代碼難度指數(shù):文章學(xué)習(xí)重點:參數(shù)宏的使用技巧、引言在嵌入式系統(tǒng)和實時應(yīng)用中,數(shù)據(jù)的傳輸和
    的頭像 發(fā)表于 11-15 01:08 ?785次閱讀
    探索字節(jié)<b class='flag-5'>隊列</b>的魔法:<b class='flag-5'>多</b>類型支持、函數(shù)重載與<b class='flag-5'>線程</b><b class='flag-5'>安全</b>

    隊列WHILE循環(huán)程序框架

    在需要多個隊列WHILE循環(huán)的程序框架里,將隊列捆綁提高的程序的可讀性
    發(fā)表于 09-27 22:13

    MCU上的原子操作

    變量的操作(必須單核),大家可以仔細想想。隊列是MCU上經(jīng)常用的,對中斷通信接口的緩沖非常方便可靠。以此為基礎(chǔ),可跨平臺
    發(fā)表于 03-06 09:39

    Linux下的線程安全是什么

    數(shù)據(jù)二義性。同步與互斥:同步:通過條件判斷,實現(xiàn)對靈界資源訪問的時序合理性?;コ猓和ㄟ^唯訪問,實現(xiàn)對臨界資源的安全性。、互斥
    發(fā)表于 07-01 13:34

    在MCU開發(fā)中使用多線程操作一寫一讀是否需要保護?

    在MCU(以常見的stm32為例)開發(fā)中使用多線程操作,我們經(jīng)常遇到的問題是關(guān)于多線程訪問數(shù)據(jù)的問題,多線程訪問數(shù)據(jù)基本上可以分為幾大類:
    發(fā)表于 02-01 15:42

    緩存如何實現(xiàn)

    少的業(yè)務(wù)場景:大部分請求是對數(shù)據(jù)進行修改,少部分請求對數(shù)據(jù)進行讀取。
    發(fā)表于 08-18 10:55 ?849次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b>緩存如何<b class='flag-5'>實現(xiàn)</b>

    利用CAS技術(shù)實現(xiàn)隊列

    【 導(dǎo)讀 】:本文 主要講解利用CAS技術(shù)實現(xiàn)隊列。 關(guān)于
    的頭像 發(fā)表于 01-11 10:52 ?2281次閱讀
    利用CAS技術(shù)<b class='flag-5'>實現(xiàn)</b><b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊列</b>

    cubeMX+STM32+Freertos 隊列時阻塞

    隊列時阻塞本例內(nèi)容是創(chuàng)建隊列,由多個任務(wù)往隊列
    發(fā)表于 12-09 15:21 ?10次下載
    cubeMX+STM32+Freertos <b class='flag-5'>讀</b><b class='flag-5'>隊列</b>時阻塞

    關(guān)于CAS等原子操作介紹 隊列的鏈表實現(xiàn)方法

    在開始說隊列之前,我們需要知道很重要的技術(shù)就是CAS操作——Compare & Set,或是 Compare & Swap,現(xiàn)在幾乎
    的頭像 發(fā)表于 05-18 09:12 ?3405次閱讀
    關(guān)于CAS等原子操作介紹 <b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊列</b>的鏈表<b class='flag-5'>實現(xiàn)</b>方法

    發(fā)燒友實測 | i.MX8MP 編譯DPDK源碼實現(xiàn)rte_ring環(huán)隊列進程間通信

    作者|donatello1996來源|電子發(fā)燒友題圖|飛凌嵌入式rte_ring是用CAS實現(xiàn)FIFO環(huán)形
    的頭像 發(fā)表于 01-10 16:29 ?2009次閱讀
    發(fā)燒友實測 | i.MX8MP 編譯DPDK源碼<b class='flag-5'>實現(xiàn)</b>rte_ring<b class='flag-5'>無</b><b class='flag-5'>鎖</b>環(huán)<b class='flag-5'>隊列</b>進程間通信

    讀寫實現(xiàn)原理規(guī)則

    讀寫 互斥或自旋要么是加鎖狀態(tài)、要么是不加鎖狀態(tài),而且次只有
    的頭像 發(fā)表于 07-21 11:21 ?905次閱讀
    讀寫<b class='flag-5'>鎖</b>的<b class='flag-5'>實現(xiàn)</b>原理規(guī)則

    隊列的潛在優(yōu)勢

    隊列 先大致介紹隊列
    的頭像 發(fā)表于 11-09 09:23 ?586次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊列</b>的潛在優(yōu)勢

    c++線程的基本類型和用法

    。 互斥(Mutex) 互斥用于控制多個線程對他們之間共享資源互斥訪問的信號量。也就是說是為了避免多個
    的頭像 發(fā)表于 11-09 15:02 ?2490次閱讀
    c++<b class='flag-5'>線程</b>中<b class='flag-5'>鎖</b>的基本類型和用法

    隊列解決的問題

    訪問;CPU訪問cache的速度要比訪問內(nèi)存要快的;由于線程頻繁切換,會造成cache失效,將導(dǎo)致應(yīng)用程序性能下降。 阻塞引起的CPU浪費 mutex是阻塞的,在負載較重的應(yīng)用程
    的頭像 發(fā)表于 11-10 15:33 ?930次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊列</b>解決的問題
    RM新时代网站-首页