RocketMQ源码解析-Namesrv启动过程

文章目录[隐藏]

目录

    • namesrv官方介绍
    • 启动流程
    • 启动入口`NamesrvStartup`
      • 1.创建NamesrvController
      • 2.`NamesrvController`构造函数
      • 3.NamesrvController初始化`initialize()`
        • 3.1注册消息处理器`registerProcessor()`
      • 4.NamesrvController的start()

namesrv官方介绍

NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。

主要包括两个功能:

  1. Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  2. 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

启动流程

在这里插入图片描述

启动入口NamesrvStartup

namesrv启动的入口是在NamesrvStartup,方法是main

    public static NamesrvController main0(String[] args) {
try {
//创建NamesrvController
NamesrvController controller = createNamesrvController(args);
//启动namesrc
start(controller);
}

1.创建NamesrvController

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
//设置Rokcetmq的版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
//命令行选项解析
Options options = ServerUtil.buildCommandlineOptions(new Options());
//解析命令行为 ‘mqnamesrv’的参数
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
//如果为空,直接退出
if (null == commandLine) {
System.exit(-1);
return null;
}
//创建namesrv,netty的相关配置对象
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//设置netty的服务端监听的端口 9876
nettyServerConfig.setListenPort(9876);
//解析命令行参数'-c':指定namesrv的配置文件路径
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//解析命令行参数'-p':启动时候日志打印配置信息
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//初始化logback配置
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
//初始化NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
// 记住所有的配置以防止丢弃
controller.getConfiguration().registerConfig(properties);
return controller;
}

2.NamesrvController构造函数

构造入参说明

参数 类型 说明
namesrvConfig NamesrvConfig 封装namesrv的基本配置信息, 主要包括kvConfigPath等
nettyServerConfig NettyServerConfig netty相关服务端配置
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
//kv的配置管理类
this.kvConfigManager = new KVConfigManager(this);
//路由信息管理类
this.routeInfoManager = new RouteInfoManager();
//心跳连接处理类
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

重点看一下这个RouteInfoManager构造方法

    public RouteInfoManager() {
//topic跟queue的映射关系,存储每个topic对应的broker名称、queue数量等信息
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
//同一个BrokerName可能对应多台机器,一个Master和多个Slave。这个结构储存着一个BrokerName对应的属性信息,包括所属Cluster名称,一个Master Broker地址和多个SlaveBroker的地址等
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
//储存集群中Cluster的信息,一个cluster下的多个BrokerName
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
//存储这台Broker机器的实时状态,包括上次更新状态的时间戳,NameServer会定期检查这个时间戳,超时没有更新就认为这个Broker无效了,将其从Broker列表里清除。
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
//过滤服务器,是RocketMQ的一种服务端过滤方式,一个Broker可以有一个或多个FilterServer。这个结构的Key是Broker的地址,Value是和这个Broker关联的多个Filter Server的地址
this.filterServerTable = new HashMap<String, List<String>>(256);
}

3.NamesrvController初始化initialize()

 public boolean initialize() {
//加载namesrvController指定的kvConfig配置文件(常为xxx/kvConfig.json)到内存
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册默认的处理器
this.registerProcessor();
//定时扫描不活跃的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//定时打印configTable信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}

3.1注册消息处理器registerProcessor()

该方法主要是给remotingServer注册默认的DefaultRequestProcessor处理器,当有请求到达nettyServer时会根据RequestCode处理不同的逻辑

    @Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
//省略......
//重点是这个,broker注册请求
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
}

请求最后会转发到RouteInforManager类的registerBroker()方法中

public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}

createAndUpdateQueueData方法

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}
if (addNewOne) {
queueDataList.add(queueData);
}
}
}

我们以RequestCode.REGISTER_BROKER大致以图的形式总结一下broker注册请求的流程
在这里插入图片描述

4.NamesrvController的start()

start方法相对来说比较简单

    public void start() throws Exception {
//开启服务端
this.remotingServer.start();
}


微信扫描下方的二维码阅读更多精彩内容

RocketMQ源码解析-Namesrv启动过程

每日分享到群里,或者推荐给朋友会得大量积分,机会可以兑换微信零钱红包,具体请点击这里,得到了微信红包可以用来支持大飞哥

大飞哥能不能加鸡腿就看各位了!

赞赏请扫

开发者微信

大飞哥微信

开发者微信反馈BUG或者VIP可以添加,其他情况反馈可能不及时,见谅

版权声明

初衷是提供高清手机电脑壁纸等图片素材免费分享下载,禁止商用。图片素材来源网络,版权归原作者所有,若有侵权问题敬请告知我们!

【友情提醒】:

因平台原因不易展示大尺度写真,有的写真展示越少代表此套写真越性感,特别是xiuren等写真每一套写真完整套图50-100张不等。更多内容的欣赏请移步 点击这里

【更多图集移步】: 每日更新-点击这里
漂亮小姐姐-点击这里
性感美女-点击这里
清纯女孩-点击这里
xiuren专栏-点击这里
整站资源下载-点击这里

相关新闻