c++的Mysql连接

#pragma once
#include <thread>
#include <jdbc/mysql_driver.h>
#include <jdbc/mysql_connection.h>
#include <jdbc/cppconn/prepared_statement.h>
#include <jdbc/cppconn/resultset.h>
#include <jdbc/cppconn/statement.h>
#include <jdbc/cppconn/exception.h>

//mysql连接要求在一定时间内要有操作,否则要断开连接
class SqlConnection {
public:
    SqlConnection(sql::Connection* con, int64_t lasttime) :_con(con), _last_oper_time(lasttime){}
    std::unique_ptr<sql::Connection> _con;
    int64_t _last_oper_time;
};

class MySqlPool {
public:
    MySqlPool(const std::string&url,const std::string& user ,const std::string& pass,const std::string& schema,int poolSize)
        : url_(url), user_(user), pass_(pass), schema_(schema), poolSize_(poolSize), b_stop_(false) {
        try {
            for (int i = 0; i < poolSize_; i++) {
                // 获取Mysql的驱动
                sql::mysql::MySQL_Driver* driver = sql::mysql::get_mysql_driver_instance();
                // 用驱动获取MySQL连接
                auto* con = driver->connect(url_, user_, pass_);
                // 使用到对应数据库
                con->setSchema(schema_);
                // 获取当前时间戳
                auto currentTime = std::chrono::system_clock::now().time_since_epoch();
                // 将时间戳转换成秒
                long long timestamp = std::chrono::duration_cast<std::chrono::seconds>(currentTime).count();
                pool_.push(std::make_unique<SqlConnection>(con, timestamp));
            }
            _check_thread = std::thread([this]() {
                while (!b_stop_) {
                    checkConnection();
                    std::this_thread::sleep_for(std::chrono::seconds(60));
                }
                });
            _check_thread.detach();
        }
        catch (sql::SQLException& e) {
            std::cout << "mysql pool init failed" << std::endl;
        }

    }

    void checkConnection() {
        std::lock_guard<std::mutex> guard(mutex_);
        int poolsize = pool_.size();
        // 获取当前时间戳
        auto currentTime = std::chrono::system_clock::now().time_since_epoch();
        // 将时间戳转换成秒
        long long timestamp = std::chrono::duration_cast<std::chrono::seconds>(currentTime).count();
        for (int i = 0; i < poolsize; i++) {
            auto con = std::move(pool_.front());
            pool_.pop();
            // RAII的思想
            Defer defer([this, &con]() {
                pool_.push(std::move(con));
                });
            if (timestamp - con->_last_oper_time < 5) {
                continue;
            }
            try {
                // 用该连接创建一个声明,让声明去执行
                std::unique_ptr<sql::Statement> stmt(con->_con->createStatement());
                stmt->executeQuery("SELECT 1");
                con->_last_oper_time = timestamp;
                std::cout << "excute timer alive query , cur is " << timestamp << std::endl;
            }
            catch (sql::SQLException& e) {
                std::cout << "Error keeping connection alive : " << e.what() << std::endl;
                // 重新创建连接并替换旧的连接
                sql::mysql::MySQL_Driver* driver = sql::mysql::get_mysql_driver_instance();
                auto* newcon = driver->connect(url_, user_, pass_);
                newcon->setSchema(schema_);
                con->_con.reset(newcon);
                con->_last_oper_time = timestamp;
            }
        }
    }

    std::unique_ptr<SqlConnection> getConnection() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] {
            if (b_stop_) {
                return true;
                }
            return !pool_.empty();});
        if (b_stop_) {
            return nullptr;
        }
        std::unique_ptr<SqlConnection> con(std::move(pool_.front()));
        pool_.pop();
        return con;
    }

    void returnConnection(std::unique_ptr<SqlConnection> con) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (b_stop_) {
            return;
        }
        pool_.push(std::move(con));
        cond_.notify_one();
    }
    void Close() {
        b_stop_ = true;
        cond_.notify_all();
    }
    ~MySqlPool() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (!pool_.empty()) {
            pool_.pop();
        }
    }

private:
    std::string url_;// Mysql所在的IP地址
    std::string user_;// Mysql连接的用户名
    std::string pass_;// Mysql连接需要的密码
    std::string schema_;//Mysql所需要使用的数据库
    int poolSize_;
    std::queue<std::unique_ptr<SqlConnection>> pool_;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::atomic<bool> b_stop_;
    std::thread _check_thread;// 心跳机制,告诉Mysql还需要使用连接
};

class MysqlDao
{
public:
    MysqlDao();
    ~MysqlDao();
    int RegUser(const std::string& name, const std::string& email, const std::string& pwd);
private:
    std::unique_ptr<MySqlPool> pool_;
};
#include "MysqlDao.h"
#include "ConfigMgr.h"
using namespace std;
MysqlDao::MysqlDao()
{
    auto& cfg = ConfigMgr::Inst();
    const auto& host = cfg["Mysql"]["Host"];
    const auto& port = cfg["Mysql"]["Port"];
    const auto& pwd = cfg["Mysql"]["Passwd"];
    const auto& schema = cfg["Mysql"]["Schema"];
    const auto& user = cfg["Mysql"]["User"];
    pool_.reset(new MySqlPool(host + ":" + port, user, pwd, schema, 5));
}
MysqlDao::~MysqlDao() {
    pool_->Close();
}
int MysqlDao::RegUser(const std::string& name, const std::string& email, const std::string& pwd)
{
    auto con = pool_->getConnection();
    try {
        if (con == nullptr) {
            return false;
        }
        // 准备调用存储过程
        unique_ptr < sql::PreparedStatement > stmt(con->_con->prepareStatement("CALL reg_user(?,?,?,@result)"));
        // 设置输入参数
        stmt->setString(1, name);
        stmt->setString(2, email);
        stmt->setString(3, pwd);

        // 由于PreparedStatement不直接支持注册输出参数,我们需要使用会话变量或其他方法来获取输出参数的值

          // 执行存储过程
        stmt->execute();
        // 如果存储过程设置了会话变量或有其他方式获取输出参数的值,你可以在这里执行SELECT查询来获取它们
       // 例如,如果存储过程设置了一个会话变量@result来存储输出结果,可以这样获取:
        unique_ptr<sql::Statement> stmtResult(con->_con->createStatement());
        unique_ptr<sql::ResultSet> res(stmtResult->executeQuery("SELECT @result AS result"));
        if (res->next()) {
            int result = res->getInt("result");
            cout << "Result: " << result << endl;
            pool_->returnConnection(std::move(con));
            return result;
        }
        pool_->returnConnection(std::move(con));
        return -1;
    }
    catch (sql::SQLException& e) {
        pool_->returnConnection(std::move(con));
        std::cerr << "SQLException: " << e.what();
        std::cerr << " (MySQL error code: " << e.getErrorCode();
        std::cerr << ", SQLState: " << e.getSQLState() << " )" << std::endl;
        return -1;
    }
}
#include "const.h"
#include "MysqlDao.h"
class MysqlMgr : public Singleton<MysqlMgr>
{
    friend class Singleton<MysqlMgr>;
public:
    ~MysqlMgr();
    int RegUser(const std::string& name, const std::string& email, const std::string& pwd);
private:
    MysqlMgr();
    MysqlDao  _dao;
};

#include "MysqlMgr.h"
MysqlMgr::~MysqlMgr() {
}
int MysqlMgr::RegUser(const std::string& name, const std::string& email, const std::string& pwd)
{
    return _dao.RegUser(name, email, pwd);
}
MysqlMgr::MysqlMgr() {
}
使用json传递数据
 nlohmann::json executeQuery(const nlohmann::json& queryJson) {
        nlohmann::json resultJson;
        try {
            std::string query = queryJson.at("query");
            auto params = queryJson.value("params", nlohmann::json::array());
            std::unique_ptr<sql::PreparedStatement> pstmt(connection->prepareStatement(query));

            // 设置参数
            for (size_t i = 0; i < params.size(); ++i) {
                pstmt->setString(i + 1, params[i].get<std::string>());
            }

            if (query.substr(0, 6) == "SELECT") {
                std::unique_ptr<sql::ResultSet> res(pstmt->executeQuery());
                while (res->next()) {
                    nlohmann::json row;
                    for (int i = 1; i <= res->getMetaData()->getColumnCount(); ++i) {
                        row[res->getMetaData()->getColumnName(i)] = res->getString(i);
                    }
                    resultJson.push_back(row);
                }
            } else {
                int affectedRows = pstmt->executeUpdate();
                resultJson["affected_rows"] = affectedRows;
            }
        } catch (sql::SQLException& e) {
            std::cerr << "执行查询失败: " << e.what() << std::endl;
        }

        return resultJson;
    }

     nlohmann::json queryJson = {
        {"query", "SELECT * FROM users WHERE age > ?"},
        {"params", {20}}
    };

    nlohmann::json results = adapter.executeQuery(queryJson);
    std::cout << "查询结果: " << results.dump(4) << std::endl;

    // 示例插入
    nlohmann::json insertJson = {
        {"query", "INSERT INTO users (name, age) VALUES (?, ?)"},
        {"params", {"Alice", 25}}
    };

    nlohmann::json insertResult = adapter.executeQuery(insertJson);
    std::cout << "插入的行数: " << insertResult["affected_rows"] << std::endl;