Qt 实现生产者-消费者模型
ssk-wh Lv4

生产者-消费者模型

使用信号量实现-QSemaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <QCoreApplication>
#include <QDebug>
#include <QMutex>
#include <QMutexLocker>
#include <QQueue>
#include <QRandomGenerator>
#include <QSemaphore>
#include <QThread>

// Capacity of the shared buffer: the producer may run at most this many
// resources ahead of the consumer.
constexpr int bufferSize = 100;

// Counts free slots in the buffer. It starts at bufferSize because nothing has
// been queued yet; the producer acquires one slot per resource it appends.
QSemaphore writeSemaphore{bufferSize};

// Counts resources that are queued and ready to be taken. It starts at zero;
// the producer releases it once per appended resource.
QSemaphore readSemaphore{0};

// FIFO buffer shared by the producer and consumer threads.
QQueue<int> queue;

class Producer : public QThread
{
public:
void run() override
{
while (1) {
static int resources_index = 0;
// 0~100ms内随机产生一个资源
QThread::msleep(QRandomGenerator::global()->bounded(100));
writeSemaphore.acquire();
queue.append(resources_index++);
qDebug() << "+++" << resources_index << "produced";
readSemaphore.release();
}
}
};

class Consumer : public QThread
{
public:
void run() override
{
while (1) {
readSemaphore.acquire();
int resources_index = queue.takeFirst();
writeSemaphore.release();

// 0~500ms内随机消费一个资源
QThread::msleep(QRandomGenerator::global()->bounded(500));
qDebug() << "---" << resources_index << "consumed";
}
}
};
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);

Producer producer;
Consumer consumer;
producer.start();
consumer.start();
producer.wait();
consumer.wait();

return a.exec();
}

使用条件变量实现-QWaitCondition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#include <QtCore>
#include <QRandomGenerator>

// Maximum number of resources that may sit in the shared queue at once.
constexpr int bufferSize = 100;

class Producer : public QThread
{
public:
Producer(QWaitCondition *wc1, QWaitCondition *wc2, QMutex *mutex, QQueue<int> &queue, QObject *parent = nullptr)
: QThread(parent)
, m_consumeWaitCondition(wc1)
, m_produceWaitCondition(wc2)
, m_mutex(mutex)
, m_queue(queue)
{
}

void run() override
{
while(1) {
// 0~100ms内随机产生一个资源
QThread::msleep(QRandomGenerator::global()->bounded(100));

m_mutex->lock();
static int resource_index = 0;
if (m_queue.size() < bufferSize) {
m_queue.append(resource_index++);
qDebug() << "+++" << resource_index << "produced";
} else {
m_produceWaitCondition->wait(m_mutex);
}
m_mutex->unlock();

m_mutex->lock();
m_consumeWaitCondition->wakeOne();
m_mutex->unlock();
}
}

private:
QWaitCondition *m_consumeWaitCondition;
QWaitCondition *m_produceWaitCondition;
QMutex *m_mutex;
QQueue<int> &m_queue;
};

class Consumer : public QThread
{
public:
Consumer(QWaitCondition *wc1, QWaitCondition *wc2, QMutex *mutex, QQueue<int> &queue, QObject *parent = nullptr)
: QThread(parent)
, m_consumeWaitCondition(wc1)
, m_produceWaitCondition(wc2)
, m_mutex(mutex)
, m_queue(queue)
{
}

void run() override
{
while (1) {
int resources_index = -1;
m_mutex->lock();
if (m_queue.size() > 0) {
resources_index = m_queue.takeFirst();
m_mutex->unlock();
} else {
m_consumeWaitCondition->wait(m_mutex);
m_mutex->unlock();
}

m_mutex->lock();
m_produceWaitCondition->wakeAll();
m_mutex->unlock();

// 0~500ms内随机消费一个资源
QThread::msleep(QRandomGenerator::global()->bounded(500));
qDebug() << "---" << resources_index << "consumed";
}
}

private:
QWaitCondition *m_consumeWaitCondition;
QWaitCondition *m_produceWaitCondition;
QMutex *m_mutex;
QQueue<int> &m_queue;
};

int main(int argc, char *argv[])
{
QCoreApplication app(argc, argv);

QWaitCondition queueNotEmpty;
QWaitCondition queueNotFull;
QMutex mutex;
QQueue<int> taskQueue;

Producer producer(&queueNotEmpty, &queueNotFull, &mutex, taskQueue);
Consumer consumer(&queueNotEmpty, &queueNotFull, &mutex, taskQueue);
producer.start();
consumer.start();
producer.wait();
consumer.wait();
return 0;
}
 Comments