Có vẻ giống với Semaphore trong multiple thread của win api.
Thường dùng cho tài nguyên có nhiều hơn một đơn vị.
Có vẻ giống với Semaphore trong multiple thread của win api.
Thường dùng cho tài nguyên có nhiều hơn một đơn vị.
có thể hiểu s1 s2 … s5 là 5 phần nhỏ của 1 task lớn? Mỗi phần nhỏ được nhiều thread xử lý, xử lý xong hết rồi mới chạy tiếp phần tiếp theo phải ko? :V
C++20 có std::barrier
:V
The class template
std::barrier
provides a thread-coordination mechanism that allows at most an expected number of threads to block until the expected number of threads arrive at the barrier.
còn múa tạm với condition_variable
thì thế này :V
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <cctype>
#include <conio.h>
std::mutex m;
std::condition_variable dataReady;
std::condition_variable workDone;
int mainIteration = 0;
int nWorkDone = 0;
bool ended = false;
void workerThread(int workerId, std::string& data) {
int iteration = 1;
while (true) {
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
dataReady.wait(lk, [&] { return ended || iteration == mainIteration; });
if (ended) {
std::cout << "Worker" << workerId << " ending...\n";
// Go straight to completion
if (++nWorkDone == 2) {
lk.unlock();
workDone.notify_one();
}
return;
}
// std::cout is shared, this line needs to stay inside the lock
std::cout << "Worker" << workerId << " thread is processing data #" << ++iteration << "\n";
lk.unlock();
// Do work
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
data += '_' + std::to_string(iteration + workerId);
// Send data back to main()
lk.lock();
std::cout << "Worker thread signals data processing completed\n";
if (++nWorkDone == 2) {
lk.unlock();
workDone.notify_one();
}
}
}
int main() {
std::string data1 = "Example data1";
std::string data2 = "Example data2";
std::thread worker1(workerThread, 0, std::ref(data1));
std::thread worker2(workerThread, 1, std::ref(data2));
bool mainQuit = false;
while (true) {
// send data1 to the worker thread
{
std::lock_guard<std::mutex> lk(m);
nWorkDone = 0;
if (!mainQuit) {
++mainIteration; // next iteration
std::cout << "main() signals data1&2 ready for processing\n";
} else {
ended = true;
std::cout << "main() ending...\n";
}
}
dataReady.notify_all();
{
// wait for the worker
std::unique_lock<std::mutex> lk(m);
workDone.wait(lk, [] { return nWorkDone == 2; });
}
if (!mainQuit) {
std::cout << "Back in main(), data1 = " << data1 << '\n';
std::cout << "Back in main(), data2 = " << data2 << '\n';
// Ability to stop
std::cout << ">>>>> PRESS ANY KEY TO STOP <<<<<\n";
if (_kbhit()) mainQuit = true;
} else {
break;
}
}
std::cout << "main() ended\n";
worker1.join();
worker2.join();
}
worker thread sẽ lock mutex bằng unique_lock
để condition_variable
có thể notify nó được :V Sau khi lock mutex thì phải chờ điều kiện gì để condition_variable
nó unlock, ở đây vì main()
có thể ngừng các thread nên sẽ có 1 biến bool ended
để ngắt, và vì các worker thread cần chờ thực hiện đúng iteration của mình, vd iteration 1 hết rồi mới tới iteration 2, nên phải có thêm biến int iteration
để biết nó đang ở iteration nào :V Điều kiện để unlock là ended
hoặc threadIteration == currentIteration
:V
// shared giữa các thread
std::mutex m;
std::condition_variable dataReady;
...
void workerThread(...) {
int iteration = 1;
while (true) {
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
dataReady.wait(lk, [&] { return ended || iteration == mainIteration; });
...
}
nếu main()
notify ended
, thì worker thread ko làm gì cả, báo cáo đã xong nhiệm vụ và ngủm (return):
if (ended) {
std::cout << "Worker" << workerId << " ending...\n";
// Go straight to completion
if (++nWorkDone == 2) {
lk.unlock();
workDone.notify_one();
}
return;
}
//
lk.unlock();
cách thông báo tôi đã làm việc xong là ++nWorkDone
. Và thread cuối cùng phải notitfy cho main()
biết tất cả các worker khác đã xong việc. Ở đây có 2 thread :V do lười quá nên viết tạm số 2 luôn :V Dòng ktra thread cuối cùng báo cáo xong việc là if (++nWorkDone == 2)
, nếu là thread cuối cùng thì phải thông báo cho main()
qua 1 condition_variable
(cv) khác với cv mà main()
báo cho các worker threads: workDone.notify_one();
. Dòng thông báo này phải nằm ngoài cái lock kia ko thì sẽ gây lỗi, nên phải gọi lk.unlock()
trước khi gọi notify
. Và sau khi xử lý xong ba cái logic về tín hiệu công việc kia thì phải unlock lk
cho mấy worker khác xử lý logic tín hiệu.
sau đó là phần do work, thích làm gì thì làm :V Nếu access shared variables với các worker khác ở đây thì phải xài mutex khác với mutex m
. Code vd này thì ko có :V
// chỉ đơn giản là append vào chuỗi data, chờ 1 giây cho nó chậm :V
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
data += '_' + std::to_string(iteration + workerId);
làm xong việc thì phải thông báo làm xong việc :V Cách thông báo tương tự như khi xử lý ended
, có điều ở đây ko có ngủm luôn mà tiếp tục vòng lặp while (true)
obtain unique_lock chờ iteration mới (hoặc bị ngắt do ended
)
lk.lock();
std::cout << "Worker thread signals data processing completed\n";
if (++nWorkDone == 2) {
lk.unlock();
workDone.notify_one();
}
do lk
được unlock trước đó, mà ở đây ++nWorkDone
shared giữa các worker khác nên ta phải lock nó lại lk.lock()
trước khi thực hiện thông báo xong việc.
Sau khi khởi tạo dữ liệu, khởi tạo các thread thì ta lặp 1 vòng lặp các iteration :V Trong vòng lặp điều đầu tiên làm là giao việc cho mấy thằng worker:
// send data1 to the worker thread
{
std::lock_guard<std::mutex> lk(m);
nWorkDone = 0;
if (!mainQuit) {
++mainIteration; // next iteration
std::cout << "main() signals data1&2 ready for processing\n";
} else {
ended = true;
std::cout << "main() ending...\n";
}
}
ở đây ko cần xài unique_lock
, lock mutex m
bằng lock_guard
là đủ, lock_guard
là simple lock khi nó được tạo ra thì nó lock mutex m
, bị hủy thì nó gọi unlock mutex m
.
bước 1 của giao việc là set số việc hoàn thành là 0 :V nWorkDone = 0
, bước 2 là ktra đã bị ngắt hay chưa, nếu bị ngắt thì gán ended = true
. Ở đây xài biến khác để ktra ngắt là vì ended
được shared giữa các worker và main, mainQuit
chỉ thuộc về main()
. Nếu chưa ngắt thì ++currentIteration
(lúc viết mình đặt tên là mainIteration lười sửa lại quá :V). Cuối cùng là notify all thông qua cv dataReady.notify_all()
để thông báo cho tất cả các worker biết đã có iteration mới/đã ngắt. Ko gọi notify_one()
nhiều lần vì nó có vấn đề gì đó :V :V, ko chính xác như notify_all()
. Và tương tự như trên phải unlock mutex rồi mới gọi notify. Ở đây wrap lock_guard
trong {}
là để nó tự hủy tự unlock :V
notify all xong thì ngồi rung đùi chờ mấy thằng worker làm việc =] Chờ làm xong bằng cách obtain 1 unique_lock
và wait
tới khi nWorkDone == 2
là được:
{
// wait for the worker
std::unique_lock<std::mutex> lk(m);
workDone.wait(lk, [] { return nWorkDone == 2; });
}
chờ xong thì unlock cái lock kia, ở đây nó được wrap {}
nên nó tự hủy tự unlock ko cần gọi. Quay lại main()
thì ở đây giả lập ngắt bằng cách check _kbhit()
của Windows <conio.h>
:V Vậy là xong :V
nguồn từ SO :V https://stackoverflow.com/questions/53271862/using-a-single-condition-variable-to-pause-multiple-threads