iT邦幫忙

1

[C++] Trigger functions which run in the same thread

當執行緒必須不斷去執行某項工作,也就是thread裡面包含while,此用法常見於將工作平行化及背景化。
情境假設,有項工作分別有三個階段:開始(start)、執行(loop)、結束(stop)。簡單的作法可能如下。

void thread_simple() {
	start();
	while (condiction) loop();
	stop();
}

情境升級!
有項工作分別有三個階段:開始(start)、執行(loop)、結束(stop),呼叫時機點可由外部決定。或許為此設計出一個class如下。

class worker_simple {
public:
	int Start() { ... }
	int Stop() { ... }
	void StartThread() {
		condiction = true;
		thread = new std::thread(&worker_simple::Loop, this);
	}
	int Loop() {
		while (condiction) { ... }
	}
private:
	std::thread *thread;
};

情境再升級!最終版!
有項工作分別有三個階段:開始(start)、執行(loop)、結束(stop),呼叫時機點可由外部決定。所有工作都必須在同一條執行緒上執行

下列這個範例都寫在同一個cpp檔,方便複製貼上後可直接執行。此範例支援工作階段的無限擴充,(但型別受限於std::function),歡迎指教是否有更好作法。

#include <functional>
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

class Worker {
public:
	Worker() {
		is_thread_alive = true;
		thread = new std::thread(&Worker::thread_main, this);
	}
	~Worker() {
		cfit.wait_result([this]() {
			is_thread_alive = false;
			is_run_main = false;
			return 0;
		});
		thread->join();
		delete thread;
	}
	int Start() {
		return cfit.wait_result(std::bind<int>(&Worker::start, this));
	}
	int Stop() {
		return cfit.wait_result(std::bind<int>(&Worker::stop, this));
	}
private:
	std::thread *thread;
	bool is_thread_alive = false;
	bool is_run_main = false;
	void thread_main() {
		while (is_thread_alive) {
			cfit.call_function();
			if (is_run_main) loop();
		}
	}
	int start() {
		is_run_main = true;
		printf("%s\n", __func__);
		return 0;
	}
	int stop() {
		is_run_main = false;
		printf("%s\n", __func__);
		return 0;
	}
	void loop() {
		printf("%s %d\n", __func__, clock());
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	}

	template<class T>
	class call_func_in_thread {
	public:
		T wait_result(std::function<T()> func) {
			std::unique_lock<std::mutex> lock(mutex);
			one_time_lock = false;
			function = func;
			request.notify_all();
            // send a request and wait the reply
			reply.wait(lock, [this] { return function == nullptr; });
			lock.unlock();
			return result;
		}
		void call_function() {
			std::unique_lock<std::mutex> lock(mutex);
            // wait for a request and execute function if it exists
			request.wait(lock, [this] { return !one_time_lock; });
			if (function != nullptr) {
				result = function();
				function = nullptr;
				reply.notify_all();
			}
			lock.unlock();
		}
	private:
		std::function<int()> function;
		std::condition_variable request, reply;
		std::mutex mutex;
		bool one_time_lock = true;
		T result;
	};
	call_func_in_thread<int> cfit;
};


int main()
{
	Worker *worker = new Worker();
	while (true) {
		char c = getchar();
		if (c == 'e') break;
		else if (c == '1') worker->Start();
		else if (c == '2') worker->Stop();
		else if (c == '3') delete worker;
	}
	return 0;
}

尚未有邦友留言

立即登入留言