目录
第一节:代码实现
1-1.消息持久化管理思想
1-2.MessageMapper类
1-3.QueueMessage类
1-4.MessageManager
第二节:单元测试
下期预告:
消息管理模块在mqserver下实现。
第一节:代码实现
消息管理首先需要消息类,它之前在mq/mq_msg.pd.h中已经定义了,所以不用再定义,包含头文件即可。
创建一个名为mq_message.hpp的文件,并添加"老生常谈"的内容:
#ifndef __M_MESSAGE_H__
#define __M_MESSAGE_H__
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <list>
namespace zd
{};
#endif
1-1.消息持久化管理思想
消息的管理是每个队列各自私有一个文件,这样某个队列操作时就不会影响其他队列了,文件名设置为"队列名.mqd"。
当一条持久化消息被推送后,就需要删除它,但是并不是直接将它的从文件抹去,而是将其存储的有效性标志设置为"0",它就变成了无效消息。
当消息大于2000且无效消息占比超过50%时,就会启动垃圾回收函数,它会将文件中的消息全部读取,如果读取到的消息是有效的,就保存在一个临时文件(.mqd.tmp)中,如果是无效的就不做处理,继续读取下一条。
当消息读取完毕后,就删除原来的.mqd文件,将临时文件重命名成"队列名.mpd"。
消息在文件中存储时,为了解决读取太长或者太短的问题,会先存储一个长度,这个长度占用4字节的数据,然后存储消息,即:
长度|消息|长度|消息|长度|消息|长度|消息|长度|消息|长度|消息.............
这样就可以保存每次读取的消息完整了。
1-2.MessageMapper类
先在前面定义一些符号,方便使用:
#define DATAFILE_SUBFIX ".mqd" // 数据文件后缀 #define TMPFILE_SUBFIX ".mqd.tmp" // 临时文件后缀 using MessagePtr = std::shared_ptr<Message>;
一个MessageMapper实例仅仅管理一个队列的持久化消息,所以它的构造函数是路径和队列名:
// 一个消息队列的持久化管理类 class MessageMapper { public: // base_dir:数据文件路径 MessageMapper(std::string& base_dir,const std::string& qname): _qname(qname) { // 使用FileHelper创建 /base_dir/qname.mpd 文件 if(base_dir.back() != '/') base_dir.push_back('/'); _datafile = base_dir+qname+DATAFILE_SUBFIX; _tmpfile = base_dir+qname+TMPFILE_SUBFIX; assert(FileHelper::createDirectory(base_dir)); // 文件不存在才创建,否则会清空文件内容 if(FileHelper(_datafile).exists() == false) { // 创建文件 FileHelper::createDirectory(FileHelper::parentDirectory(_datafile)); FileHelper::createFile(_datafile); } } private: std::string _qname; std::string _datafile; // 含路径的数据文件名 std::string _tmpfile; // 含路径的临时文件名 };
创建数据文件的接口:
// 创建数据文件 bool createMsgFile() { bool ret = FileHelper::createFile(_datafile); if(ret == false) { LOG("创建队列文件失败,文件名%s",_datafile.c_str()); return false; } return true; }
清理队列数据的接口:
// 删除数据文件/临时文件 void removeMsgFile() { FileHelper::removeFile(_datafile); FileHelper::removeFile(_tmpfile); }
向数据文件持久化消息的接口:
// 向数据文件插入消息 bool insert(MessagePtr& msg) { return insert(_datafile,msg); }
private: bool insert(const std::string& filename,MessagePtr& msg) { // 1.序列化消息的属性和内容 std::string body = msg->payload().SerializeAsString(); // 2.获取长度 FileHelper helper(filename); size_t fsize = helper.size(); size_t len = body.size(); // 3.写入数据 // 3.1.写入4字节长度 if(helper.write((char*)&len,fsize,sizeof(size_t)) == false) { LOG("向队列文件写入失败,文件:%s",filename.c_str()); return false; } // 3.2.写入主体内容 if(helper.write(body.c_str(),fsize+sizeof(size_t),body.size()) == false) { LOG("向队列文件写入失败,文件:%s",filename.c_str()); return false; } // 4.更新msg的实际存储位置和长度 msg->set_offset(fsize+sizeof(size_t)); msg->set_length(body.size()); return true; }
从数据文件中删除消息,只需要将有效位变成"0"即可:
// 删除消息 bool remove(MessagePtr& msg) { // 1.有效标志设置为false,0 msg->mutable_payload()->set_valid("0"); // 2.序列化msg std::string body = msg->payload().SerializeAsString(); // 判断长度,防止后面的消息被覆盖 if(body.size() != msg->length()) { LOG("不能修改消息有效性,因为新旧数据长度不一致"); return false; } // 3.写入修改后的数据 FileHelper helper(_datafile); if(helper.write(body.c_str(),msg->offset(),body.size()) == false) { LOG("向队列文件写入失败,文件:%s",_datafile.c_str()); return false; } return true; }
最后是最重要的加载历史消息的函数,它也是垃圾回收函数,一定要弄懂它的工作逻辑:
// 加载历史有效消息/垃圾回收 std::list<MessagePtr> gc() { // 0.创建临时文件 FileHelper::createFile(_tmpfile); std::list<MessagePtr> result; // 1.读取出文件中所有有效数据 FileHelper helper(_datafile); size_t offset = 0; size_t fsize = helper.size(); while(offset < fsize) { // 先读取消息长度 size_t len; if(helper.read((char*)&len,offset,sizeof(size_t))==false) { LOG("读取消息长度时发生错误"); abort(); } offset+=sizeof(size_t); // 再读取实际内容 std::string body; body.resize(len); if(helper.read(&body[0],offset,len)==false) { LOG("读取消息主体时发生错误"); abort(); } offset+=len; // 反序列化 MessagePtr msgp = std::make_shared<Message>(); msgp->mutable_payload()->ParseFromString(body); // 如果是无效消息,直接读取下一个 if(msgp->payload().valid() == "0") { LOG("加载到无效消息 %s",msgp->payload().body().c_str()); continue; } // 如果是有效消息,保存起来 else { LOG("加载到有效消息 %s",msgp->payload().body().c_str()); result.push_back(msgp); } } // 2.将有效消息序列化存储到临时文件 for(auto& msg:result) { if(insert(_tmpfile,msg) == false) { LOG("向临时文件写入失败"); return result; } } // 3. 删除原来的文件 if(helper.removeFile()==false) { LOG("源文件删除失败"); return result; } // 4.将临时文件"转正" if(FileHelper::rename(_tmpfile,_datafile) == false) { LOG("临时文件名修改失败"); return result; } return result; }
这个函数不仅在垃圾回收时会调用,在服务器初始化,拉取历史消息时也会调用,它会只保存有效消息,这与垃圾回收的理念不谋而合。
1-3.QueueMessage类
该类有一个MessageMapper句柄,它不仅管理单个队列的持久化消息,也会管理非持久化消息。
构造函数和成员变量如下:
class QueueMessage { public: using ptr = std::shared_ptr<QueueMessage>; QueueMessage(std::string& base_dir,const std::string& qname): _mapper(base_dir,qname), _qname(qname), _valid_count(0), _total_count(0) {} private: std::mutex _mtx; std::string _qname; size_t _valid_count; // 持久化有效消息的数量 size_t _total_count; // 持久化消息的数量,有效+无效 std::list<MessagePtr> _waitpush_msgs; // 待推送消息 std::unordered_map<std::string,MessagePtr> _durable_msgs; // 持久化消息,不包括无效消息 std::unordered_map<std::string,MessagePtr> _waitack_msgs; // 待确认消息 MessageMapper _mapper; // 持久化管理句柄 };
(1)_waitpush_msg:记录队列中还没有推送给订阅者的消息
(2)_durable_msgs:记录队列中还未推送的有效的持久化消息
(3)_waitack_msgs:记录已经推送,等待订阅者确认收到,然后删除的消息。
第一个函数是恢复历史消息的函数,封装MessageMapper句柄的gc()接口,然后将提取到的消息(在文件中的必然是持久化消息),保存在 _durable_msgs 中:
// 恢复历史消息 bool recovery() { // 恢复历史消息 std::unique_lock<std::mutex> lock(_mtx); _waitpush_msgs = _mapper.gc(); for(auto& msg:_waitpush_msgs) { _durable_msgs.insert(std::make_pair(msg->payload().properties().id(),msg)); } _valid_count = _total_count = _durable_msgs.size(); return true; }
然后是向队列插入消息的函数,一般来说,调用者应该给我们一个消息属性 bp ,如果给了,就把消息设置成给的属性。
如果没有给,就设置成一些默认值。
消息在内存(_watipush_msg)中管理起来之后,如果需要持久化,还要在数据文件中保存起来。
// 向队列插入消息 bool insert(const BasicProperties* bp,const std::string& body,bool queue_durable) { // 1.构造消息对象 MessagePtr msg = std::make_shared<Message>(); msg->mutable_payload()->set_body(body); if(bp != nullptr) { // 如果消息设置为持久化,还需要队列也是持久化才进行持久化存储 DeliveryMode mode = queue_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE; msg->mutable_payload()->mutable_properties()->set_id(bp->id()); msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode); msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key()); } else { // 如果消息没有设置是否持久化,那么队列持久化,消息默认持久化 DeliveryMode mode = queue_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE; msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid()); msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode); msg->mutable_payload()->mutable_properties()->set_routing_key(""); } std::unique_lock<std::mutex> lock(_mtx); // 2.判断是否需要持久化 if(msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE) { msg->mutable_payload()->set_valid("1"); // 3.持久化存储 if(_mapper.insert(msg) == false) { LOG("消息持久化失败,消息内容:%s",body.c_str()); return false; } // 添加到持久化管理 _durable_msgs.insert(std::make_pair(msg->payload().properties().id(),msg)); _valid_count+=1; _total_count+=1; } // 4.添加到待推送 _waitpush_msgs.push_back(msg); return true; }
其次是获取队首消息的接口,它是队列推送消息的第一步——取出消息:
// 获取待推送队首的消息 zd::MessagePtr front() { std::unique_lock<std::mutex> lock(_mtx); if(_waitpush_msgs.empty()) return nullptr; // 取出要推送的消息 MessagePtr msg = _waitpush_msgs.front(); _waitpush_msgs.pop_front(); // 将消息放到待确认,等待确认后删除 _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(),msg)); return msg; }
然后是删除消息的接口,当服务器收到订阅者的应答后,就会调用它删除消息:
// 删除一条消息 bool remove(const std::string& msg_id) { std::unique_lock<std::mutex> lock(_mtx); // 1.从待确认中删除 auto it = _waitack_msgs.find(msg_id); if(it == _waitack_msgs.end()) { // 没有不需要删除 LOG("要删除的待确认消息不存在"); return true; } //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // _waitack_msgs.erase(msg_id); // 此处就调用是不对的,调用之后消息对象就自动析构了, // 而持久化删除时还需要消息的长度(length)和存储位置(offset) //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // 2.从持久化中删除 if(it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE) { if(_mapper.remove(it->second) == false) { LOG("消息去持久化失败"); return false; } _durable_msgs.erase(msg_id); // 文件中有效少1,无效多1,所以_total_count不变 _valid_count-=1; // 判断是否需要进行垃圾回收 gc(); } _waitack_msgs.erase(msg_id); return true; }
remove()接口的gc(),就是封装了MessageMapper中的gc(),专门用来进行垃圾回收:
private: bool GccCheck() { // 消息数大于2000 && 有效消息比例小于50% return _total_count > 2000 && _valid_count*10/_total_count < 0.5*10; } void gc() { if(GccCheck()) { auto msgs = _mapper.gc(); // 遍历加载的所有有效消息,并将新位置更新到_durable_msgs中 for(auto& msg:msgs) { auto it = _durable_msgs.find(msg->payload().properties().id()); if(it == _durable_msgs.end()) { LOG("垃圾回收后,有一条持久化消息未在内存中进行管理"); // 该消息已经在文件了,只在_waitpush_msgs和_durable_msgs中进行添加即可 _waitpush_msgs.push_back(msg); _durable_msgs.insert(std::pair<std::string,zd::MessagePtr>(msg->payload().properties().id(),msg)); } else { it->second->set_offset(msg->offset()); it->second->set_length(msg->length()); } } _valid_count = _total_count = msgs.size(); } }
最后是一些与单元测试有关的接口,实际工作中并不使用:
size_t waitpush_count() { std::unique_lock<std::mutex> lock(_mtx); return _waitpush_msgs.size(); } size_t waitack_count() { std::unique_lock<std::mutex> lock(_mtx); return _waitack_msgs.size(); } size_t durable_count() { std::unique_lock<std::mutex> lock(_mtx); return _durable_msgs.size(); } size_t total() { std::unique_lock<std::mutex> lock(_mtx); return _total_count; } void clear() { _mapper.removeMsgFile(); _waitack_msgs.clear(); _waitpush_msgs.clear(); _durable_msgs.clear(); }
1-4.MessageManager
一个队列可以管理之后,就可以实现管理多个队列的模块了。
MessageManager有一个unordered_map成员,它将队列名与该队列的管理句柄映射起来:
// 管理所有队列 class MessageManager { public: using ptr = std::shared_ptr<MessageManager>; MessageManager(const std::string& base_dir): _base_dir(base_dir) {} private: std::mutex _mtx; std::string _base_dir; std::unordered_map<std::string,QueueMessage::ptr> _queue_msgs; };
添加一个队列管理句柄的接口:
// 添加一个消息队列 void initQueueMesssge(const std::string& qname) { QueueMessage::ptr qmp; { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it != _queue_msgs.end()) { return; } qmp = std::make_shared<QueueMessage>(_base_dir,qname); _queue_msgs.insert(std::make_pair(qname,qmp)); } // 恢复历史数据,如果有的话 qmp->recovery(); }
上述代码的_mtx并不会把 qmp->recovery(); 锁起来,因为每个队列的管理是独立的,队列执行操作自己不影响 MessageManager 操作其他队列,否则线程还需要等待队列操作完毕才能释放锁,这种写法提升了效率。
移除一个队列的管理句柄:
// 删除一个消息队列 void destoryQueueMessage(const std::string& qname) { QueueMessage::ptr qmp; { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it != _queue_msgs.end()) { return; } qmp = it->second; _queue_msgs.erase(qname); } qmp->clear(); }
向指定队列新增消息:
// 向指定消息队列插入消息 bool insert(const std::string& qname,BasicProperties* bp,const std::string& body,bool queue_durable) { QueueMessage::ptr qmp; { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it == _queue_msgs.end()) { LOG("向消息队列插入消息失败,消息队列 %s 不存在",qname.c_str()); return false; } qmp = it->second; } return qmp->insert(bp,body,queue_durable); }
获取指定队列的队首消息:
// 获取指定队列队首消息 MessagePtr front(const std::string& qname) { QueueMessage::ptr qmp; { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it == _queue_msgs.end()) { LOG("向消息队列获取消息失败,消息队列 %s 不存在",qname.c_str()); return nullptr; } qmp = it->second; } return qmp->front(); }
确认/删除指定队列的消息:
// 确认指定队列中的消息 bool ack(const std::string& qname,const std::string& msg_id) { QueueMessage::ptr qmp; { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it == _queue_msgs.end()) { LOG("向消息队列确认消息失败,消息队列 %s 不存在",qname.c_str()); return false; } qmp = it->second; } return qmp->remove(msg_id); }
单元测试相关接口:
size_t waitpush_count(const std::string& qname) { { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it == _queue_msgs.end()) { LOG("获取消息队列待推送消息数量失败,消息队列 %s 不存在",qname.c_str()); return 0; } } return _queue_msgs[qname]->waitpush_count(); } size_t waitack_count(const std::string& qname) { QueueMessage::ptr qmp; { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it == _queue_msgs.end()) { LOG("获取消息队列待确认消息数量失败,消息队列 %s 不存在",qname.c_str()); return 0; } qmp = it->second; } return qmp->waitack_count(); } size_t durable_count(const std::string& qname) { { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it == _queue_msgs.end()) { LOG("获取消息队列持久化有效消息数量失败,消息队列 %s 不存在",qname.c_str()); return 0; } } return _queue_msgs[qname]->durable_count(); } size_t total(const std::string& qname) { { std::unique_lock<std::mutex> lock(_mtx); auto it = _queue_msgs.find(qname); if(it == _queue_msgs.end()) { LOG("获取消息队列总持久化消息数量失败,消息队列 %s 不存在",qname.c_str()); return 0; } } return _queue_msgs[qname]->total(); } void clear() { std::unique_lock<std::mutex> lock(_mtx); for(auto& qmsg:_queue_msgs) { qmsg.second->clear(); } }
第二节:单元测试
在mqtest目录下创建mq_message_test.cc的文件,并测试以下内容:
#include "../mqserver/mq_message.hpp" #include <gtest/gtest.h> #include <iostream> #include <unordered_map> zd::MessageManager::ptr mmp; // 全局测试套件------------------------------------------------ // 自己初始化自己的环境,使不同单元测试之间解耦 class MessageTest :public testing::Environment { public: // 全部单元测试之前调用一次 virtual void SetUp() override { // std::cout << "单元测试执行前的环境初始化" << std::endl; mmp = std::make_shared<zd::MessageManager>("./data/"); mmp->initQueueMesssge("q1"); } // 全部单元测试之后调用一次 virtual void TearDown() override { // std::cout << "单元测试执行后的环境清理" << std::endl; // mmp->clear(); } }; // 单元测试 // 测试名称与类名称相同,则会先调用SetUp TEST(MessageTest,MessageTest_test1_Test) { std::cout << "单元测试-1" << std::endl; // 插入一些消息 zd::BasicProperties bp; bp.set_delivery_mode(zd::DeliveryMode::DURABLE); bp.set_id(zd::UUIDHelper::uuid()); bp.set_routing_key(""); mmp->insert("q1",&bp,"Hello World-1",zd::DeliveryMode::DURABLE); mmp->insert("q1",nullptr,"Hello World-2",zd::DeliveryMode::DURABLE); mmp->insert("q1",nullptr,"Hello World-3",zd::DeliveryMode::DURABLE); mmp->insert("q1",nullptr,"Hello World-4",zd::DeliveryMode::DURABLE); ASSERT_EQ(mmp->durable_count("q1"),4); ASSERT_EQ(mmp->total("q1"),4); ASSERT_EQ(mmp->waitack_count("q1"),0); ASSERT_EQ(mmp->waitpush_count("q1"),4); // 删除一条消息 mmp->front("q1"); mmp->ack("q1",bp.id()); ASSERT_EQ(mmp->durable_count("q1"),3); ASSERT_EQ(mmp->total("q1"),4); ASSERT_EQ(mmp->waitack_count("q1"),0); ASSERT_EQ(mmp->waitpush_count("q1"),3); } // 单元测试全部结束后调用TearDown // ---------------------------------------------------------- int main(int argc,char** argv) { testing::InitGoogleTest(&argc,argv); testing::AddGlobalTestEnvironment(new MessageTest); // 注册Test的所有单元测试 if(RUN_ALL_TESTS() != 0) // 运行所有单元测试 { printf("单元测试失败!\n"); } return 0; }
因为使用了BasicProperties构造了一个类,所以编译时还需要加上 mq_msg.pb.cc文件:
mq_message_test:mq_message_test.cc ../mqcommon/mq_msg.pb.cc g++ -std=c++14 $^ -o $@ -lgtest -lprotobuf
执行结果:
再测试拉取历史数据的功能,将insert全部注释掉,保留第一条消息的insert(因为它被删除了,重新插入一下),将删除消息也注释掉:
TEST(MessageTest,MessageTest_test1_Test) { std::cout << "单元测试-1" << std::endl; // 插入一些消息 zd::BasicProperties bp; bp.set_delivery_mode(zd::DeliveryMode::DURABLE); bp.set_id(zd::UUIDHelper::uuid()); bp.set_routing_key(""); mmp->insert("q1",&bp,"Hello World-1",zd::DeliveryMode::DURABLE); // mmp->insert("q1",nullptr,"Hello World-2",zd::DeliveryMode::DURABLE); // mmp->insert("q1",nullptr,"Hello World-3",zd::DeliveryMode::DURABLE); // mmp->insert("q1",nullptr,"Hello World-4",zd::DeliveryMode::DURABLE); ASSERT_EQ(mmp->durable_count("q1"),4); ASSERT_EQ(mmp->total("q1"),4); ASSERT_EQ(mmp->waitack_count("q1"),0); ASSERT_EQ(mmp->waitpush_count("q1"),4); // 删除一条消息 // mmp->front("q1"); // mmp->ack("q1",bp.id()); // ASSERT_EQ(mmp->durable_count("q1"),3); // ASSERT_EQ(mmp->total("q1"),4); // ASSERT_EQ(mmp->waitack_count("q1"),0); // ASSERT_EQ(mmp->waitpush_count("q1"),3); }
执行结果:
第一条是无效消息,后三条都是有效消息,加上又插入的1条消息,共四条消息,仍然不会报错。
那么消息管理模块也完成了。
下期预告:
交换机管理模块、队列管理模块、绑定管理模块、消息管理模块都完成了,之后将这四个模块整合成虚拟机模块。
虚拟机模块实际上很简单,只需要封装四个子模块的接口即可。