jinlong的个人主页

http://www.qtcn.org/bbs/u/127542  [收藏] [复制]

jinlong

  • 1

    关注

  • 0

    粉丝

  • 2

    访客

  • 等级:新手上路
  • 总积分:1
  • 保密,2011-02-15

最后登录:2017-07-17

更多资料

日志

Qt  单生产者消单费者模式

2017-07-10 22:46
     前言   
     最近做公司项目遇到一个问题,加载20万的日志文件,解析后写入数据库中。做完后发现读文件、解析、写入数据依次搞下来,速度实在是太慢。
所以学习用多线程的来完成这个工作,考虑用生产者消费者模式来完成。
设计思路:
      写一个数据存储类,提供“存入数据”、“取走数据”两个接口,两个接口保证一次只能有一个线程去操作数据。数据满时阻塞“存入数据”接口,等待数据被取走,同理数据空时,阻塞“取走数据”接口,等待存入数据。
具体实现:
      存储数据使用QQueue<T>类,线程同步使用C++11 标准库 std::mutex、std::condition_variable。
      std::mutex 保证只能有一个线程去调用QQueue<T>执行数据操作。std::condition_variable 来控制容器满、容器空的时候的阻塞。
代码实现
.h文件
  1. class Repository
    {
    public:
        Repository();
        /*!
         * brief 存入数据
         * param str
         */
        void AddData(QString str);
        /*!
         * brief 取数据
         * return
         */
        QString TakeData();
    private:
        std::condition_variable m_Queue_Not_Empty;  //队列不满信号
        std::condition_variable m_Queue_Not_Full;       //队列不空信号
        std::mutex m_Queue_Mutex;                              //队列锁
        int m_nQueue_Max_Size;                                   //队列最大长度
        QQueue<QString> m_queue;                             //队列,也可以用其他容器、或者数组
    };

cpp文件
  1. #include "produce.h"
    Repository::Repository()
    {
        m_nQueue_Max_Size = 10;  //默认最大长度为10,可根据需求修改
    }
    void Repository::AddData(QString str)
    {
        std::unique_lock<std::mutex> lock(m_Queue_Mutex);
        //判断是否队列满
        while (m_queue.count() > m_nQueue_Max_Size)
        {
            //等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁,
            //其他线程可以获取m_Queue_Mutex
            m_Queue_Not_Full.wait(lock);
        }
        m_queue.enqueue(str);
        m_Queue_Not_Empty.notify_all();
        lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放
    }
    QString Repository::TakeData()
    {
        std::unique_lock<std::mutex> lock(m_Queue_Mutex);
        //判断是否队列满
        while (m_queue.isEmpty())
        {
            //等待信号触发,阻塞在此处。此时会释放m_Queue_Mutex锁,
            //其他线程可以获取m_Queue_Mutex
            m_Queue_Not_Empty.wait(lock);
        }
        QString str = m_queue.dequeue(); //获取数据
        m_Queue_Not_Full.notify_all();
        lock.unlock(); //释放锁,也可以不调用,最后函数返回时也会释放
        return str;
    }

mian 文件
  1. #include "produce.h"
    int g_nProduceCount = 100;
    void produceTask(Repository* qR)
    {
        //模拟写入100行信息
        for (int i=0; i<g_nProduceCount; i++)
        {
            //模拟读取文件,耗时100毫秒
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            QString str = QString("Produce %1").arg(i);
            qR->AddData(str);
        }
    }
    void conusemeTask(Repository* qR)
    {
        //模拟读取100行信息
        for (int i=0; i<g_nProduceCount; i++)
        {
            QString str = qR->TakeData();
            //模拟处理文件,耗时100毫秒
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
    int main(int argc, char *argv[])
    {
        Repository re;
        std::thread Produce(produceTask, &re);
        std::thread Consume(conusemeTask, &re);
        Produce.join();
        Consume.join();
    }

代码下载地址http://download.csdn.net/download/jin396932711/9895703
写数据和读数据耗时都为100毫秒,结果如下图

设置conusemeTask()函数中处理耗时为200毫秒,即生产速度大于处理速度,如下图

疑问和遗留问题:
1. 如何停止conusemeTask线程
      例子中是我知道有100行数据,读到100行我就停止了线程。实际中是不知道要处理多少行数据才停止的。
      我想到一种处理方法是,在读取文件完成后,写入一个空QString,或者自己定义的一个字符串,在GetData这条信息后,停止线程。(只适合单生产者单消费者模式)
  1. void produceTask(Repository* qR)
    {
        //模拟写入100行信息
        for (int i=0; i<g_nProduceCount; i++)
        {
            //模拟读取文件,耗时100毫秒
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            QString str = QString("Produce %1").arg(i);
            qR->AddData(str);
        }
        qR->AddData("File Read End");
    }
    void conusemeTask(Repository* qR)
    {
        while (1)
        {
            QString str = qR->GetData();
            if (str == "File Read End")
                break;
            //模拟处理文件,耗时100毫秒
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
    }

        第二种处理办法:在GetData(超时时间, QString& outData) 传递超时时间,返回值改给返回bool,,修改m_Queue_Not_Empty.wait()为如下
if (m_Queue_Not_Empty.wait_for(lock, std::chrono::milliseconds(5000)) == std::cv_status::timeout)
        return false;
conusemeTask()中如连续 qR->GetData(超时时间,outData);返回false即可停止线程
        在一种办法:在类中添加一个bool变量, 数据读取完毕后,设置变量为true 。 数据处理线程中 处理完数据后,判断此变量为true且获取容器数据为空,就结束此线程

2. 消费者线程处理速度慢,肯定要多个消费者去处理数据,上面的代码是否满足要求

分类:C++|回复:0|浏览:57|全站可见|转载
 

下一篇: Qt  多生产者多消费模式

上一篇:

Powered by phpwind v8.7 Certificate Copyright Time now is:07-21 12:43
©2005-2016 QTCN开发网 版权所有 Gzip disabled