#pragma once
#include <thread>
#include <jdbc/mysql_driver.h>
#include <jdbc/mysql_connection.h>
#include <jdbc/cppconn/prepared_statement.h>
#include <jdbc/cppconn/resultset.h>
#include <jdbc/cppconn/statement.h>
#include <jdbc/cppconn/exception.h>
//mysql连接要求在一定时间内要有操作,否则要断开连接
class SqlConnection {
public:
SqlConnection(sql::Connection* con, int64_t lasttime) :_con(con), _last_oper_time(lasttime){}
std::unique_ptr<sql::Connection> _con;
int64_t _last_oper_time;
};
class MySqlPool {
public:
MySqlPool(const std::string&url,const std::string& user ,const std::string& pass,const std::string& schema,int poolSize)
: url_(url), user_(user), pass_(pass), schema_(schema), poolSize_(poolSize), b_stop_(false) {
try {
for (int i = 0; i < poolSize_; i++) {
// 获取Mysql的驱动
sql::mysql::MySQL_Driver* driver = sql::mysql::get_mysql_driver_instance();
// 用驱动获取MySQL连接
auto* con = driver->connect(url_, user_, pass_);
// 使用到对应数据库
con->setSchema(schema_);
// 获取当前时间戳
auto currentTime = std::chrono::system_clock::now().time_since_epoch();
// 将时间戳转换成秒
long long timestamp = std::chrono::duration_cast<std::chrono::seconds>(currentTime).count();
pool_.push(std::make_unique<SqlConnection>(con, timestamp));
}
_check_thread = std::thread([this]() {
while (!b_stop_) {
checkConnection();
std::this_thread::sleep_for(std::chrono::seconds(60));
}
});
_check_thread.detach();
}
catch (sql::SQLException& e) {
std::cout << "mysql pool init failed" << std::endl;
}
}
void checkConnection() {
std::lock_guard<std::mutex> guard(mutex_);
int poolsize = pool_.size();
// 获取当前时间戳
auto currentTime = std::chrono::system_clock::now().time_since_epoch();
// 将时间戳转换成秒
long long timestamp = std::chrono::duration_cast<std::chrono::seconds>(currentTime).count();
for (int i = 0; i < poolsize; i++) {
auto con = std::move(pool_.front());
pool_.pop();
// RAII的思想
Defer defer([this, &con]() {
pool_.push(std::move(con));
});
if (timestamp - con->_last_oper_time < 5) {
continue;
}
try {
// 用该连接创建一个声明,让声明去执行
std::unique_ptr<sql::Statement> stmt(con->_con->createStatement());
stmt->executeQuery("SELECT 1");
con->_last_oper_time = timestamp;
std::cout << "excute timer alive query , cur is " << timestamp << std::endl;
}
catch (sql::SQLException& e) {
std::cout << "Error keeping connection alive : " << e.what() << std::endl;
// 重新创建连接并替换旧的连接
sql::mysql::MySQL_Driver* driver = sql::mysql::get_mysql_driver_instance();
auto* newcon = driver->connect(url_, user_, pass_);
newcon->setSchema(schema_);
con->_con.reset(newcon);
con->_last_oper_time = timestamp;
}
}
}
std::unique_ptr<SqlConnection> getConnection() {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] {
if (b_stop_) {
return true;
}
return !pool_.empty();});
if (b_stop_) {
return nullptr;
}
std::unique_ptr<SqlConnection> con(std::move(pool_.front()));
pool_.pop();
return con;
}
void returnConnection(std::unique_ptr<SqlConnection> con) {
std::unique_lock<std::mutex> lock(mutex_);
if (b_stop_) {
return;
}
pool_.push(std::move(con));
cond_.notify_one();
}
void Close() {
b_stop_ = true;
cond_.notify_all();
}
~MySqlPool() {
std::unique_lock<std::mutex> lock(mutex_);
while (!pool_.empty()) {
pool_.pop();
}
}
private:
std::string url_;// Mysql所在的IP地址
std::string user_;// Mysql连接的用户名
std::string pass_;// Mysql连接需要的密码
std::string schema_;//Mysql所需要使用的数据库
int poolSize_;
std::queue<std::unique_ptr<SqlConnection>> pool_;
std::mutex mutex_;
std::condition_variable cond_;
std::atomic<bool> b_stop_;
std::thread _check_thread;// 心跳机制,告诉Mysql还需要使用连接
};
class MysqlDao
{
public:
MysqlDao();
~MysqlDao();
int RegUser(const std::string& name, const std::string& email, const std::string& pwd);
private:
std::unique_ptr<MySqlPool> pool_;
};
#include "MysqlDao.h"
#include "ConfigMgr.h"
using namespace std;
MysqlDao::MysqlDao()
{
auto& cfg = ConfigMgr::Inst();
const auto& host = cfg["Mysql"]["Host"];
const auto& port = cfg["Mysql"]["Port"];
const auto& pwd = cfg["Mysql"]["Passwd"];
const auto& schema = cfg["Mysql"]["Schema"];
const auto& user = cfg["Mysql"]["User"];
pool_.reset(new MySqlPool(host + ":" + port, user, pwd, schema, 5));
}
MysqlDao::~MysqlDao() {
pool_->Close();
}
int MysqlDao::RegUser(const std::string& name, const std::string& email, const std::string& pwd)
{
auto con = pool_->getConnection();
try {
if (con == nullptr) {
return false;
}
// 准备调用存储过程
unique_ptr < sql::PreparedStatement > stmt(con->_con->prepareStatement("CALL reg_user(?,?,?,@result)"));
// 设置输入参数
stmt->setString(1, name);
stmt->setString(2, email);
stmt->setString(3, pwd);
// 由于PreparedStatement不直接支持注册输出参数,我们需要使用会话变量或其他方法来获取输出参数的值
// 执行存储过程
stmt->execute();
// 如果存储过程设置了会话变量或有其他方式获取输出参数的值,你可以在这里执行SELECT查询来获取它们
// 例如,如果存储过程设置了一个会话变量@result来存储输出结果,可以这样获取:
unique_ptr<sql::Statement> stmtResult(con->_con->createStatement());
unique_ptr<sql::ResultSet> res(stmtResult->executeQuery("SELECT @result AS result"));
if (res->next()) {
int result = res->getInt("result");
cout << "Result: " << result << endl;
pool_->returnConnection(std::move(con));
return result;
}
pool_->returnConnection(std::move(con));
return -1;
}
catch (sql::SQLException& e) {
pool_->returnConnection(std::move(con));
std::cerr << "SQLException: " << e.what();
std::cerr << " (MySQL error code: " << e.getErrorCode();
std::cerr << ", SQLState: " << e.getSQLState() << " )" << std::endl;
return -1;
}
}
#include "const.h"
#include "MysqlDao.h"
class MysqlMgr : public Singleton<MysqlMgr>
{
friend class Singleton<MysqlMgr>;
public:
~MysqlMgr();
int RegUser(const std::string& name, const std::string& email, const std::string& pwd);
private:
MysqlMgr();
MysqlDao _dao;
};
#include "MysqlMgr.h"
MysqlMgr::~MysqlMgr() {
}
int MysqlMgr::RegUser(const std::string& name, const std::string& email, const std::string& pwd)
{
return _dao.RegUser(name, email, pwd);
}
MysqlMgr::MysqlMgr() {
}
使用json传递数据
nlohmann::json executeQuery(const nlohmann::json& queryJson) {
nlohmann::json resultJson;
try {
std::string query = queryJson.at("query");
auto params = queryJson.value("params", nlohmann::json::array());
std::unique_ptr<sql::PreparedStatement> pstmt(connection->prepareStatement(query));
// 设置参数
for (size_t i = 0; i < params.size(); ++i) {
pstmt->setString(i + 1, params[i].get<std::string>());
}
if (query.substr(0, 6) == "SELECT") {
std::unique_ptr<sql::ResultSet> res(pstmt->executeQuery());
while (res->next()) {
nlohmann::json row;
for (int i = 1; i <= res->getMetaData()->getColumnCount(); ++i) {
row[res->getMetaData()->getColumnName(i)] = res->getString(i);
}
resultJson.push_back(row);
}
} else {
int affectedRows = pstmt->executeUpdate();
resultJson["affected_rows"] = affectedRows;
}
} catch (sql::SQLException& e) {
std::cerr << "执行查询失败: " << e.what() << std::endl;
}
return resultJson;
}
nlohmann::json queryJson = {
{"query", "SELECT * FROM users WHERE age > ?"},
{"params", {20}}
};
nlohmann::json results = adapter.executeQuery(queryJson);
std::cout << "查询结果: " << results.dump(4) << std::endl;
// 示例插入
nlohmann::json insertJson = {
{"query", "INSERT INTO users (name, age) VALUES (?, ?)"},
{"params", {"Alice", 25}}
};
nlohmann::json insertResult = adapter.executeQuery(insertJson);
std::cout << "插入的行数: " << insertResult["affected_rows"] << std::endl;