生产者消费者缓冲队列
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class BlockingQueue {
public:
void push(T value) {
{
std::unique_lock<std::mutex> lock(mutex);
queue.push(std::move(value));
}
condition.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [this] { return !queue.empty(); });
T value = std::move(queue.front());
queue.pop();
return value;
}
bool empty() const {
std::unique_lock<std::mutex> lock(mutex);
return queue.empty();
}
private:
std::queue<T> queue;
mutable std::mutex mutex;
std::condition_variable condition;
};
线程池
class ThreadPool {
public:
ThreadPool(size_t size) : stop(false)
{
for (size_t i = 0; i < size; i++)
{
workers.emplace_back(&ThreadPool::workerThread,this);
}
}
~ThreadPool() {
stop = true;
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
void enqueue(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queuemutex);
tasks.push(std::move(task));
}
condition.notify_one();
}
private:
void workerThread() {
while (true)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queuemutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
}
private:
mutable std::mutex queuemutex;
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::condition_variable condition;
std::atomic<bool> stop;
};
IOCP管理类
class IOCPManager {
public:
IOCPManager(size_t threadPoolSize)
: threadPool(threadPoolSize), iocpHandle(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) {
if (iocpHandle == NULL) {
throw std::runtime_error("Failed to create IOCP handle");
}
}
~IOCPManager() {
CloseHandle(iocpHandle);
}
void associateHandle(HANDLE handle) {
CreateIoCompletionPort(handle, iocpHandle, (ULONG_PTR)handle, 0);
}
void waitForCompletion() {
DWORD bytesTransferred;
ULONG_PTR completionKey;
LPOVERLAPPED overlapped;
while (GetQueuedCompletionStatus(iocpHandle, &bytesTransferred, &completionKey, &overlapped, INFINITE)) {
// 将任务放入缓冲队列
taskQueue.push([completionKey, bytesTransferred]() {
// 这里可以处理具体的任务
std::cout << "Task completed for key: " << completionKey << ", bytes: " << bytesTransferred << std::endl;
});
}
}
void processTasks() {
while (true) {
auto task = taskQueue.pop();
threadPool.enqueue(task);
}
}
private:
HANDLE iocpHandle;
BlockingQueue<std::function<void()>> taskQueue;
ThreadPool threadPool;
};
组件总的调用
int main() {
IOCPManager iocpManager(4); // 初始化 IOCP 管理类和线程池
// 假设我们有多个设备
// 这里可以添加设备任务的句柄
// HANDLE deviceHandle = ...;
// iocpManager.associateHandle(deviceHandle);
// 启动 IOCP 等待完成
std::thread ioThread([&iocpManager]() {
iocpManager.waitForCompletion();
});
// 启动任务处理线程
std::thread taskProcessor([&iocpManager]() {
iocpManager.processTasks();
});
// 模拟一些设备的IO操作
// 这里可以添加代码来模拟设备的I/O操作
ioThread.join(); // 等待 IOCP 线程结束
taskProcessor.join(); // 等待任务处理线程结束
return 0;
}