我有x boost线程同时工作.一个生产者线程使用计算任务填充同步队列.消费者线程弹出任务并计算它们.
图片来源:https://www.quantnet.com/threads/c-multithreading-in-boost.10028/
用户可以在此过程中完成程序,所以我需要正确关闭线程.由于异常被抛出,我目前的方法似乎不起作用.它的意图是在系统关闭时,所有进程都应该被杀死,并停止其当前的任务,无论他们做什么.你能告诉我,你会如何杀死线程?
线程初始化:
for (int i = 0; i < numberOfThreads; i++) { std::thread* thread = new std::thread(&MyManager::worker,this); mThreads.push_back(thread); }
线程破坏:
void MyManager::shutdown() { for (int i = 0; i < numberOfThreads; i++) { mThreads.at(i)->join(); delete mThreads.at(i); } mThreads.clear(); }
工人:
void MyManager::worker() { while (true) { int current = waitingList.pop(); Object * p = objects.at(current); p->calculateMesh(); //this task is internally locked by a mutex try { boost::this_thread::interruption_point(); } catch (const boost::thread_interrupted&) { // Thread interruption request received,break the loop std::cout << "- Thread interrupted. Exiting thread." << std::endl; break; } } }
同步队列:
#include <queue> #include <thread> #include <mutex> #include <condition_variable> template <typename T> class ThreadSafeQueue { public: T pop() { std::unique_lock<std::mutex> mlock(mutex_); while (queue_.empty()) { cond_.wait(mlock); } auto item = queue_.front(); queue_.pop(); return item; } void push(const T& item) { std::unique_lock<std::mutex> mlock(mutex_); queue_.push(item); mlock.unlock(); cond_.notify_one(); } int sizeIndicator() { std::unique_lock<std::mutex> mlock(mutex_); return queue_.size(); } private: bool isEmpty() { std::unique_lock<std::mutex> mlock(mutex_); return queue_.empty(); } std::queue<T> queue_; std::mutex mutex_; std::condition_variable cond_; };
... std::_Mtx_lockX(_Mtx_internal_imp_t * * _Mtx) Line 68 C++ ... std::_Mutex_base::lock() Line 42 C++ ... std::unique_lock<std::mutex>::unique_lock<std::mutex>(std::mutex & _Mtx) Line 220 C++ ... ThreadSafeQueue<int>::pop() Line 13 C++ ... MyManager::worker() Zeile 178 C++
解决方法
根据我在Boost和Java中处理线程的经验,尝试关闭外部线程总是凌乱.我从来没有能够真正做到这一点干净地工作.
我所得到的最好的一个布尔值可用于所有设置为true的消费者线程.当您将其设置为false时,线程将自动返回.在你的情况下,这很容易被放入你拥有的while循环中.
最重要的是,您将需要一些同步,以便您可以在删除线程之前等待线程返回,否则您可以很难定义行为.
来自我以前的一个项目的例子:
线程创建
barrier = new boost :: barrier(numOfThreads 1);
threads = new detail :: updater_thread * [numOfThreads];
for (unsigned int t = 0; t < numOfThreads; t++) { //This object is just a wrapper class for the boost thread. threads[t] = new detail::updater_thread(barrier,this); }
线程破坏
for(unsigned int i = 0; i< numOfThreads; i){
threads [i] – > requestStop(); //通知所有线程停止.
}
barrier->wait();//The update request will allow the threads to get the message to shutdown. for (unsigned int i = 0; i < numOfThreads; i++) { threads[i]->waitForStop();//Wait for all threads to stop. delete threads[i];//Now we are safe to clean up. }
线程包装器可能引起关注的一些方法.
//Constructor updater_thread::updater_thread(boost::barrier * barrier) { this->barrier = barrier; running = true; thread = boost::thread(&updater_thread::run,this); } void updater_thread::run() { while (running) { barrier->wait(); if (!running) break; //Do stuff barrier->wait(); } } void updater_thread::requestStop() { running = false; } void updater_thread::waitForStop() { thread.join(); }