前言 上一节实现了一个单生产者单消费者模式功能,现在再如何实现一个多生产者多消费者的模式。
设计思路 多生产者多消费者跟单个消费者生产者比较起来,只是多个几个线程去生产数据,去获取数据,
那我只需要保证多个线程“存入数据”是线程安全的,多个线程“读取数据”是线程安全的就可以了。
下面就需要去写测试代码验证下师傅满足这些要求。
代码分析 首先查看下获取数据函数是否可以保证一次只有一个线程去获取数据
- QString Repository::TakeData()
{
std::unique_lock<std::mutex> lock(m_Queue_Mutex);
//判断是否队列满
while (m_queue.isEmpty())
{
//等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁,
//其他线程可以获取m_Queue_Mutex
std::cout << "queue is empty" << " ThreadID:" << std::this_thread::get_id() << std::endl;
m_Queue_Not_Empty.wait(lock);
}
QString str = m_queue.dequeue(); //获取数据
std::cout << QString("Take [%1]").arg(str).toStdString()
<< " ThreadID:" << std::this_thread::get_id()<< std::endl;
m_Queue_Not_Full.notify_all();
lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放
return str;
}
进入函数时会获取m_Queue_Mutex锁,这可以保证在队列不满的情况下,一次只有一个线程进入此函数,通过 m_queue.dequeue();取得数据。这满足多消费者的要求。
再考虑队列满的时候,当前线程调用 m_Queue_Not_Empty.wait(lock);时会释放m_Queue_Mutex锁,此时其他线程进行此函数也会阻塞到m_Queue_Not_Empty.wait(lock)并释放m_Queue_Mutex锁
这样所有的消费者线程会被阻塞,且m_Queue_Mutex锁处于释放状态,这时其写入数据的线程就可以获得m_Queue_Mutex锁,进行数据写入。我们需要验证数据写入完毕后m_Queue_Not_Empty.notify_all();
被调用时,多个消费线程是否是同时从m_Queue_Not_Empty.wait(lock)解除阻塞同时获取数据(此时说明TakeData函数不上线程安全的),还是m_Queue_Not_Empty.wait(lock)一次解除阻塞依次获取数据(满足多消费者要求)。
考虑下std::condition_variable 的wait()的函数说明,调用时会释放传入的std::mutex锁且进行阻塞状态。当其他线程调用notify_all()时,wait()会再次获取被它释放的std::mutex锁,如获取失败就会阻塞等待获取到。
这样多个线程同时在wait(std::mutex) 同一个mutex锁时,一旦notify_all()调用,也只能有一个线程能在wait(std::mutex)处获取锁的所有权,其他线程还要等待获取锁。
代码验证下上面的考虑
- std::mutex g_outputMutex;
std::condition_variable condition;
int i=0;
//线程1 对i自加100次
void test1()
{
std::unique_lock<std::mutex> lock(g_outputMutex);
condition.wait(lock);
for (int j=0; j<100; j++)
{
++i;
std::cout << "test1:" << i <<std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
//线程2 对i自加100次
void test2()
{
std::unique_lock<std::mutex> lock(g_outputMutex);
condition.wait(lock);
for (int j=0; j<100; j++)
{
++i;
qDebug()<<"test2:" << i;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
//此线程在两秒后 调用notify_all(), 10秒后结束
void makeCondition()
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::unique_lock<std::mutex> lock(g_outputMutex);
qDebug()<<"notify all";
condition.notify_all();
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
qDebug()<<"notify all wait 10s";
}
int main(int argc, char *argv[])
{
#if 1 //测试 condition_variable 的notify函数
std::thread produce(makeCondition); // 2秒瘦调用notify_all() 查看两个线程是并发执行,还是依次执行
std::thread consume(test1);
std::thread consume2(test2);
produce.join();
consume.join();
consume2.join();
#endif
}
执行后结果
从结果中看到,makeCondition()线程调用lock.unlock();释放锁后,线程test2获取了锁开始执行(和makeCondition()并发执行),线程test1依然被阻塞,
只能线程test2执行完毕释放锁后,线程test1才开始执行。
由此可以说明,多个消费者在TakeData()的m_Queue_Not_Empty.wait(lock);阻塞时,如m_Queue_Not_Empty.notify_all()被调用,只能有一个消费者从wait()处获取锁
继续执行,此时再次判断queue中是否有数据,因数据刚被写入,此消费者就可以获取刚写入的数据,把数据返回后再释放锁。另一个消费者获取刚被释放的锁,再次去
判断queue中是否有数据,因数据刚被上一个消费取走,此消费者会再次因没有数据在调用m_Queue_Not_Empty.wait(lock), 同理剩下的消费者也会重复这一情况。
注意:由此也可以看出来只能用while (m_queue.isEmpty())来判断而不能用if
同理也可以肯定AddData()也是线程安全的,可以实现多生产者功能。
以下是测试代码
- #include <mutex>
#include <condition_variable>
#include <QQueue>
#include <atomic>
class Repository
{
public:
Repository();
/*!
* brief 存入数据
* param str
*/
void AddData(QString str);
/*!
* brief 取数据
* return
*/
QString TakeData();
void Stop(){m_bStop = true;}
bool IsStop(){return m_bStop;}
int Count(){return m_queue.size();}
private:
std::condition_variable m_Queue_Not_Empty; //队列不满信号
std::condition_variable m_Queue_Not_Full; //队列不空信号
std::mutex m_Queue_Mutex; //队列锁
int m_nQueue_Max_Size; //队列最大长度
QQueue<QString> m_queue; //队列,也可以用其他容器、或者数组
std::atomic<bool> m_bStop; //停止存入是True
};
- #include "Repository.h"
#include <QDebug>
#include <iostream>
#include <thread>
Repository::Repository()
{
m_nQueue_Max_Size = 10; //默认最大长度为10,可根据需求修改
m_bStop = false;
}
void Repository::AddData(QString str)
{
std::unique_lock<std::mutex> lock(m_Queue_Mutex);
//判断是否队列满
while (m_queue.count() > m_nQueue_Max_Size)
{
//等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁,
//其他线程可以获取m_Queue_Mutex
std::cout << "queue is full" << " ThreadID:" << std::this_thread::get_id() << std::endl;
m_Queue_Not_Full.wait(lock);
}
m_queue.enqueue(str);
std::cout << QString("Add [%1]").arg(str).toStdString()
<< " ThreadID:" << std::this_thread::get_id()<< std::endl;
m_Queue_Not_Empty.notify_all();
lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放
}
QString Repository::TakeData()
{
std::unique_lock<std::mutex> lock(m_Queue_Mutex);
//判断是否队列满
while (m_queue.isEmpty())
{
//等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁,
//其他线程可以获取m_Queue_Mutex
std::cout << "queue is empty" << " ThreadID:" << std::this_thread::get_id() << std::endl;
m_Queue_Not_Empty.wait(lock);
}
QString str = m_queue.dequeue(); //获取数据
std::cout << QString("Take [%1]").arg(str).toStdString()
<< " ThreadID:" << std::this_thread::get_id()<< std::endl;
m_Queue_Not_Full.notify_all();
lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放
return str;
}
- #include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include "Repository.h"
int g_nProduceCount = 100;
void produceTask(Repository* qR)
{
//模拟写入100行信息
for (int i=0; i<g_nProduceCount; i++)
{
//模拟读取文件,耗时100毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(10));
QString str = QString("Produce %1").arg(i);
qR->AddData(str);
}
qR->Stop(); //停止
}
void produceTask2(Repository* qR)
{
//模拟写入100行信息
for (int i=0; i<g_nProduceCount; i++)
{
//模拟读取文件,耗时100毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(10));
QString str = QString("Produce %1").arg(i+100);
qR->AddData(str);
}
qR->Stop(); //停止
}
void conusemeTask(Repository* qR)
{
while (1)
{
QString str = qR->TakeData();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
//std::cout << "Take :" << str.toStdString() << " ThreadID:" << std::this_thread::get_id() << std::endl;
if(qR->IsStop() && qR->Count() < 1)
break;
}
}
int main(int argc, char *argv[])
{
Repository re;
std::thread produce(produceTask, &re);
std::thread produce1(produceTask2, &re);
std::thread* threads[5];
for (int i=0; i<5; i++)
{
threads[i] = new std::thread(conusemeTask, &re);
}
produce.join();
produce1.join();
for (int i=0; i<5; i++)
{
threads[i]->join();
}
}
结果疑问和遗留问题:1.如何通知多个消费者线程停止。
上述代码中通过添加一个bool变量,来通知消费者线程停止。但这个存在问题,如果生产者生产数据速度比消费者处理慢,当一个生产者停止生产后,设置bool为true.
此时消费者线程正好取完数据,另个生产者速度慢,没有写入数据,此消费者就会通过 if(qR->IsStop() && qR->Count() < 1)判断而停止。另一个生产者再写入数据时就没有
消费者处理了。 目前还没想好怎么处理,目前只能做单生产者多消费模式使用。
- while (1)
{
QString str = qR->TakeData();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if(qR->IsStop() && qR->Count() < 1)
break;
}
2.存在一个bug。如果生产者产生的数据很少,没有填满队列,则有几个消费者线程会一直等待,结束不了。