线程池+生产者消费者缓冲队列

生产者消费者缓冲队列

#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class BlockingQueue {
public:
    void push(T value) {
        {
            std::unique_lock<std::mutex> lock(mutex);
            queue.push(std::move(value));
        }
        condition.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this] { return !queue.empty(); });
        T value = std::move(queue.front());
        queue.pop();
        return value;
    }

    bool empty() const {
        std::unique_lock<std::mutex> lock(mutex);
        return queue.empty();
    }

private:
    std::queue<T> queue;
    mutable std::mutex mutex;
    std::condition_variable condition;
};

线程池

class ThreadPool {
public:
    ThreadPool(size_t size) : stop(false)
    {
        for (size_t i = 0; i < size; i++)
        {
            workers.emplace_back(&ThreadPool::workerThread,this);
        }
    }
    ~ThreadPool() {
        stop = true;
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }

    void enqueue(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queuemutex);
            tasks.push(std::move(task));
        }
        condition.notify_one();
    }
private:
    void workerThread() {
        while (true)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queuemutex);
                condition.wait(lock, [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty()) return;
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
        }
    }
private:
    mutable std::mutex queuemutex;
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::condition_variable condition;
    std::atomic<bool> stop;
};

IOCP管理类


class IOCPManager {
public:
    IOCPManager(size_t threadPoolSize)
        : threadPool(threadPoolSize), iocpHandle(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) {
        if (iocpHandle == NULL) {
            throw std::runtime_error("Failed to create IOCP handle");
        }
    }

    ~IOCPManager() {
        CloseHandle(iocpHandle);
    }

    void associateHandle(HANDLE handle) {
        CreateIoCompletionPort(handle, iocpHandle, (ULONG_PTR)handle, 0);
    }

    void waitForCompletion() {
        DWORD bytesTransferred;
        ULONG_PTR completionKey;
        LPOVERLAPPED overlapped;

        while (GetQueuedCompletionStatus(iocpHandle, &bytesTransferred, &completionKey, &overlapped, INFINITE)) {
            // 将任务放入缓冲队列
            taskQueue.push([completionKey, bytesTransferred]() {
                // 这里可以处理具体的任务
                std::cout << "Task completed for key: " << completionKey << ", bytes: " << bytesTransferred << std::endl;
                });
        }
    }

    void processTasks() {
        while (true) {
            auto task = taskQueue.pop();
            threadPool.enqueue(task);
        }
    }

private:
    HANDLE iocpHandle;
    BlockingQueue<std::function<void()>> taskQueue;
    ThreadPool threadPool;
};

组件总的调用


int main() {
    IOCPManager iocpManager(4);  // 初始化 IOCP 管理类和线程池

    // 假设我们有多个设备
    // 这里可以添加设备任务的句柄
    // HANDLE deviceHandle = ...;
    // iocpManager.associateHandle(deviceHandle);

    // 启动 IOCP 等待完成
    std::thread ioThread([&iocpManager]() {
        iocpManager.waitForCompletion();
        });

    // 启动任务处理线程
    std::thread taskProcessor([&iocpManager]() {
        iocpManager.processTasks();
        });

    // 模拟一些设备的IO操作
    // 这里可以添加代码来模拟设备的I/O操作

    ioThread.join(); // 等待 IOCP 线程结束
    taskProcessor.join(); // 等待任务处理线程结束
    return 0;
}