前言 最近做公司项目遇到一个问题,加载20万的日志文件,解析后写入数据库中。做完后发现读文件、解析、写入数据依次搞下来,速度实在是太慢。
所以学习用多线程的来完成这个工作,考虑用生产者消费者模式来完成。
设计思路: 写一个数据存储类,提供“存入数据”、“取走数据”两个接口,两个接口保证一次只能有一个线程去操作数据。数据满时阻塞“存入数据”接口,等待数据被取走,同理数据空时,阻塞“取走数据”接口,等待存入数据。
具体实现: 存储数据使用QQueue<T>类,线程同步使用C++11 标准库 std::mutex、std::condition_variable。
std::mutex 保证只能有一个线程去调用QQueue<T>执行数据操作。std::condition_variable 来控制容器满、容器空的时候的阻塞。
代码实现.h文件
- class Repository
{
public:
Repository();
/*!
* brief 存入数据
* param str
*/
void AddData(QString str);
/*!
* brief 取数据
* return
*/
QString TakeData();
private:
std::condition_variable m_Queue_Not_Empty; //队列不满信号
std::condition_variable m_Queue_Not_Full; //队列不空信号
std::mutex m_Queue_Mutex; //队列锁
int m_nQueue_Max_Size; //队列最大长度
QQueue<QString> m_queue; //队列,也可以用其他容器、或者数组
};
cpp文件
- #include "produce.h"
Repository::Repository()
{
m_nQueue_Max_Size = 10; //默认最大长度为10,可根据需求修改
}
void Repository::AddData(QString str)
{
std::unique_lock<std::mutex> lock(m_Queue_Mutex);
//判断是否队列满
while (m_queue.count() > m_nQueue_Max_Size)
{
//等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁,
//其他线程可以获取m_Queue_Mutex
m_Queue_Not_Full.wait(lock);
}
m_queue.enqueue(str);
m_Queue_Not_Empty.notify_all();
lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放
}
QString Repository::TakeData()
{
std::unique_lock<std::mutex> lock(m_Queue_Mutex);
//判断是否队列满
while (m_queue.isEmpty())
{
//等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁,
//其他线程可以获取m_Queue_Mutex
m_Queue_Not_Empty.wait(lock);
}
QString str = m_queue.dequeue(); //获取数据
m_Queue_Not_Full.notify_all();
lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放
return str;
}
mian 文件
- #include "produce.h"
int g_nProduceCount = 100;
void produceTask(Repository* qR)
{
//模拟写入100行信息
for (int i=0; i<g_nProduceCount; i++)
{
//模拟读取文件,耗时100毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(100));
QString str = QString("Produce %1").arg(i);
qR->AddData(str);
}
}
void conusemeTask(Repository* qR)
{
//模拟读取100行信息
for (int i=0; i<g_nProduceCount; i++)
{
QString str = qR->TakeData();
//模拟处理文件,耗时100毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
int main(int argc, char *argv[])
{
Repository re;
std::thread Produce(produceTask, &re);
std::thread Consume(conusemeTask, &re);
Produce.join();
Consume.join();
}
代码下载地址
http://download.csdn.net/download/jin396932711/9895703 写数据和读数据耗时都为100毫秒,结果如下图
设置conusemeTask()函数中处理耗时为200毫秒,即生产速度大于处理速度,如下图
疑问和遗留问题:1. 如何停止conusemeTask线程。
例子中是我知道有100行数据,读到100行我就停止了线程。实际中是不知道要处理多少行数据才停止的。
我想到一种处理方法是,在读取文件完成后,写入一个空QString,或者自己定义的一个字符串,在GetData这条信息后,停止线程。(只适合单生产者单消费者模式)
- void produceTask(Repository* qR)
{
//模拟写入100行信息
for (int i=0; i<g_nProduceCount; i++)
{
//模拟读取文件,耗时100毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(100));
QString str = QString("Produce %1").arg(i);
qR->AddData(str);
}
qR->AddData("File Read End");
}
void conusemeTask(Repository* qR)
{
while (1)
{
QString str = qR->GetData();
if (str == "File Read End")
break;
//模拟处理文件,耗时100毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
第二种处理办法:在GetData(超时时间, QString& outData) 传递超时时间,返回值改给返回bool,,修改m_Queue_Not_Empty.wait()为如下
if (m_Queue_Not_Empty.wait_for(lock, std::chrono::milliseconds(5000)) == std::cv_status::timeout)
return false;
conusemeTask()中如连续 qR->GetData(超时时间,outData);返回false即可停止线程
在一种办法:在类中添加一个bool变量, 数据读取完毕后,设置变量为true 。 数据处理线程中 处理完数据后,判断此变量为true且获取容器数据为空,就结束此线程
2. 消费者线程处理速度慢,肯定要多个消费者去处理数据,上面的代码是否满足要求