RocketMQ源码解析-Broker部分之Broker启动过程

文章目录[隐藏]

目录

    • broker启动流程
    • broker启动可配置参数
    • 启动入口`BrokerStartup`
      • 1.创建brokerController
      • 2.`BrokerController`构造函数
      • 3.BrokerController初始化`initialize()`
        • 3.1注册消息处理器`registerProcessor`
        • 3.2初始化事务消息相关的服务`initialTransaction()`
        • 3.3`initialize`总结
      • 4.BrokerControler的`start`

broker启动流程

借用一下【秃头爱健身】博主的的图,我觉得画的很好。
在这里插入图片描述

broker启动可配置参数

 -n : 指定broker 的 namesrvAddr 地址;
-h :打印命令;
-c :指定配置文件的路径;
-p :启动时候日志打印配置信息;
-m :启动时候日志打印导入的配置信息。

启动入口BrokerStartup

broker启动的入口是在brokerStartup,方法是main

    public static void main(String[] args) {
//创建BrokerController
start(createBrokerController(args));
}

1.创建brokerController

public static BrokerController createBrokerController(String[] args) {
//设置RocketMq的版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//设置broker的netty客户端的发送缓冲大小,默认是128 kb
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 131072;
}
//设置broker的netty客户端的接受缓冲大小,默认是128 kb
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 131072;
}
try {
//PackageConflictDetect.detectFastjson();
//命令行选项解析
Options options = ServerUtil.buildCommandlineOptions(new Options());
//解析命令行为 ‘mqbroker’的参数
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
//如果为空,直接退出
if (null == commandLine) {
System.exit(-1);
}
//创建broker,netty的相关配置对象
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
//是否使用TLS (TLS是SSL的升级版本,TLS是SSL的标准化后的产物,有1.0 1.1 1.2三个版本)
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
//设置netty的服务端监听的端口 10911,对外提供消息读写服务的端口
nettyServerConfig.setListenPort(10911);
//创建消息存储配置
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
//如果broker是slave节点
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
//比默认的40% 还要小 10
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
//设置消息存储配置所能使用的最大内存比例,超过该内存,消息将被置换出内存,
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
//解析命令行参数'-c':指定broker的配置文件路径
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//检查broker配置中的nameServer地址
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
//检查broker的角色
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
//如果是master节点,则设置该节点brokerId=0
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
//是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 true
if (messageStoreConfig.isEnableDLegerCommitLog()) {
brokerConfig.setBrokerId(-1);
}
//设置消息存储配置的高可用端口,10912
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
//解析命令行参数'-p':启动时候日志打印配置信息
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
}
//解析命令行参数'-m':启动时候日志打印导入的配置信息
else if (commandLine.hasOption('m')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
//创建BrokerController
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// 记住所有的配置以防止丢弃
controller.getConfiguration().registerConfig(properties);
//初始化BrokerController
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//注册关闭的钩子方法
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
//BrokerController的销毁方法
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}

broker启动会默认启动3个端口

端口 说明
10911 接收消息推送的端口
10912 消息存储配置的高可用端口
10909 推送消息的VIP端口

2.BrokerController构造函数

构造入参说明

参数 类型 说明
brokerConfig BrokerConfig 封装Broker的基本配置信息
nettyServerConfig NettyServerConfig 封装了broker作为对外提供消息读写操作的MQ服务器信息
nettyClientConfig NettyClientConfig 封装了broker作为NameServer的客户端的信息
messageStoreConfig MessageStoreConfig 封装消息存储Store的配置信息
public  BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// BrokerStartup中准备的配置信息
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
// Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件
this.consumerOffsetManager = new ConsumerOffsetManager(this);
//消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系,  会读取store/config/topics.json
this.topicConfigManager = new TopicConfigManager(this);
//Consumer端使用pull的方式向Broker拉取消息请求的处理类
this.pullMessageProcessor = new PullMessageProcessor(this);
//Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类
this.pullRequestHoldService = new PullRequestHoldService(this);
//有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
//消费者id变化监听器
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
//消费者管理类,并对消费者id变化进行监听
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
//消费者过滤类,按照Topic进行分类,  会读取store/config/consumerFilter.json
this.consumerFilterManager = new ConsumerFilterManager(this);
//生产者管理 按照group进行分类
this.producerManager = new ProducerManager();
//客户端心跳连接处理类
this.clientHousekeepingService = new ClientHousekeepingService(this);
//Console控制台获取Broker信息使用
this.broker2Client = new Broker2Client(this);
//订阅关系管理类
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
//Broker对外访问的API
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
//FilterServer管理类
this.filterServerManager = new FilterServerManager(this);
//Broker主从同步进度管理类
this.slaveSynchronize = new SlaveSynchronize(this);
// 各种线程池的阻塞队列
// 发送消息线程池队列
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
this.brokerFastFailure = new BrokerFastFailure(this);
this.configuration = new Configuration(
log,
BrokerPathConfigHelper.getBrokerConfigPath(),
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}

重点对一些核心类进行说明

参数 说明
ConsumerOffsetManager Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件
topicConfigManager 消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.json
pullMessageProcessor Consumer端使用pull的方式向Broker拉取消息请求的处理类
pullRequestHoldService Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类
messageArrivingListener 有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法
consumerIdsChangeListener 消费者id变化监听器
consumerManager 消费者管理类,并对消费者id变化进行监听
consumerFilterManager 消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.json
producerManager 生产者管理 按照group进行分类
clientHousekeepingService 客户端心跳连接处理类
broker2Client Console控制台获取Broker信息使用
subscriptionGroupManager 订阅关系管理类
brokerOuterAPI Broker对外访问的API
filterServerManager FilterServer管理类
slaveSynchronize Broker主从同步进度管理类

3.BrokerController初始化initialize()

public boolean initialize() throws CloneNotSupportedException {
//加载 topic 相关配置,文件地址为 {user.home}/store/config/topics.json
boolean result = this.topicConfigManager.load();
//加载 不同的Consumer消费的进度情况  文件地址为 {user.home}/store/config/consumerOffset.json
result = result && this.consumerOffsetManager.load();
//加载 订阅关系  文件地址  {user.home}/store/config/subscriptionGroup.json
result = result && this.subscriptionGroupManager.load();
//加载 Consumer的过滤信息配置  文件地址  {user.home}/store/config/consumerFilter.json
result = result && this.consumerFilterManager.load();
//如果加载成功
if (result) {
try {
//创建消息存储类messageStore
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
//使用的是DLegerCommitLog,则创建DLedgerRoleChangeHandler
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
//broker消息统计类
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
//加载消息的日志文件,包含CommitLog,ConsumeQueue等
result = result && this.messageStore.load();
//如果加载成功
if (result) {
//开启服务端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
//设置10909的服务端口
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
//开启10909的服务端口,这个端口只给生产者使用
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
//处理消息生产者发送的生成消息请求相关的线程池
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
//处理消费者发出的消费消息请求相关的线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
//处理回复消息api的线程池
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
//查询线程
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
//省略一些线程池
//为客户端注册需要处理API指令事件,以及消息发送和消费的回调方法
this.registerProcessor();
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
//每天执行一次,统计昨天put的message和get的message
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
// 默认5s执行一次,会把消费这的偏移量存到文件中  ${user.home}/store/config/consumerOffset.json.json
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 默认10s执行一次,会把消费者的消息过滤的信息持久化到文件 ${user.home}/store/config/consumerFilter.json
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
// 每3分钟,当消费者消费太慢,会禁用到消费者组
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
//打印当前的Send Queue Size,Pull Queue Size,Query Queue Size,Transaction Queue Size
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
//每隔一分钟打印一次,dispath的消息偏移量和总的消息偏移量的差值
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
if (this.brokerConfig.getNamesrvAddr() != null) {
//更新nameServer地址
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
//没有明确指定name-server的地址,且配置了允许从地址服务器获取name-server地址
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 每隔2分钟从name-server地址服务器拉取最新的配置
// 这个是实现name-server动态增减的唯一方法   
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//定时打印master与slave的差距
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
//初始化事务消息相关的服务
initialTransaction();
//初始化权限管理器
initialAcl();
//初始化RPC调用的钩子
initialRpcHooks();
}
return result;
}

3.1注册消息处理器registerProcessor

rocketmq中有许多线程执行器,包括sendMessageExecutor(发送消息),pullMessageExecutor(拉取消息),queryMessageExecutor(查询消息),adminBrokerExecutor(默认处理)。这些线程执行器会通过registerProcessor注册到NettyRemotingServer ,每一个RequestCode会有一个对应的执行器,最终会以RequestCode为键放到一个HashMap中,当请求到达nettyServer时会根据RequestCode把请求分发到不同的执行器去处理请求

 public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
//省略其他的处理器      

最终放到processorTable的map中

    @Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}

相关的RequestCode说明一下:

事件名 code 说明
SEND_MESSAGE 10 生产者发送信息
SEND_MESSAGE_V2 310 生产者发送信息
SEND_BATCH_MESSAGE 320 批量发送消息
CONSUMER_SEND_MSG_BACK 36 消费端消费失败的时候返回的消息
PULL_MESSAGE 11 消费者拉取消息
SEND_REPLY_MESSAGE 324 消费者回包消息,可以用类似RPC调用

3.2初始化事务消息相关的服务initialTransaction()

服务加载方式是Java的SPI方式。

 private void initialTransaction() {
//加载TransactionalMessageService服务,实现类为TransactionalMessageServiceImpl
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
//AbstractTransactionalMessageCheckListener对应的服务类为LogTransactionalMessageCheckListener ,其中实现为空实现
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
//设置对应的brokerController到AbstractTransactionalMessageCheckListener中
this.transactionalMessageCheckListener.setBrokerController(this);
//创建TransactionalMessageCheckService,服务是周期检查事务的服务,
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
}

3.3initialize总结

initialize方法相应的逻辑相对来说比较多,稍微总结为已下几步:

1.服务器内的相关日志文件的加载,{user.home}/store/config/ 文件目录下的json配置文件(包含topics,consumerOffset,subscriptionGroup,consumerFilter)。
在这里插入图片描述

2.如果上述文件加载成功,会启动对应的Broker客户端,然后创建一些线程池,在后面注册 API 指令事件后会监听到API的时候会进行处理
3.注册事件到对应的Broker客户端上,然后会记录对应的API事件和对应线程池封装到一个对象中
4.启动一些 定时任务,这些任务比如记录Broker状态,消费进度持久化等任务
5.初始化一些服务,比如事务相关(周期检查事务),消息权限校验初始化和Rpc调用钩子相关服务。对应的服务加载方式是Java的SPI方式。

4.BrokerControler的start

 controller.start();
public void start() throws Exception {
if (this.messageStore != null) {
// 启动消息存储服务DefaultMessageStore,其会对/store/lock文件加锁,
// 以确保在broker运行期间只有一个broker实例操作/store目录
this.messageStore.start();
}
if (this.remotingServer != null) {
// 启动Netty服务监听10911端口,对外提供服务(消息生产、消费)
this.remotingServer.start();
}
if (this.fastRemotingServer != null) {
// 监听10909端口
this.fastRemotingServer.start();
}
if (this.fileWatchService != null) {
// fileWatchService与TLS有关,todo tls解析
this.fileWatchService.start();
}
if (this.brokerOuterAPI != null) {
// 启动Netty客户端netty,broker使用其向外发送数据,比如:向NameServer上报心跳、topic信息。
this.brokerOuterAPI.start();
}
if (this.pullRequestHoldService != null) {
// 长轮询机制hold住拉取消息请求的服务
this.pullRequestHoldService.start();
}
if (this.clientHousekeepingService != null) {
// 每10s检查一遍非活动的连接服务
this.clientHousekeepingService.start();
}
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 处理HA
startProcessorByHa(messageStoreConfig.getBrokerRole());
// 启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
// 向所有的nameserver发送本机所有的主题数据;
// 包括主题名、读队列个数、写队列个数、队列权限、是否有序等
this.registerBrokerAll(true, false, true);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 定时向NameServer注册Broker,最小每10s。
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
// Broker信息统计,这个没有具体的实现;所以暂时不用管
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
// Broker对请求队列中的请求进行快速失败,返回`Broker繁忙、请稍后重试`信息
this.brokerFastFailure.start();
}
}

对这几个服务进行说明一下:

服务名 类型 说明
messageStore DefaultMessageStore 处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等
remotingServer RemotingServer Broker的服务端,处理消费者和生产者的请求
fastRemotingServer RemotingServer 只给消息生产者的服务端
fileWatchService FileWatchService 启动监控服务连接时用到的SSL连接文件的服务
brokerOuterAPI BrokerOuterAPI RocketMQ控制台跟Broker交互时候的客户端
pullRequestHoldService PullRequestHoldService 处理push模式消费,或者延迟消费的服务
clientHousekeepingService ClientHousekeepingService 心跳连接用的服务
filterServerManager FilterServerManager 消息过滤的服务
transactionalMessageCheckService TransactionalMessageCheckService 定期检查和处理事务消息的服务
slaveSynchronize SlaveSynchronize 主从之间topic,消费偏移等信息同步用的


微信扫描下方的二维码阅读更多精彩内容

RocketMQ源码解析-Broker部分之Broker启动过程

每日分享到群里,或者推荐给朋友会得大量积分,机会可以兑换微信零钱红包,具体请点击这里,得到了微信红包可以用来支持大飞哥

大飞哥能不能加鸡腿就看各位了!

赞赏请扫

开发者微信

大飞哥微信

开发者微信反馈BUG或者VIP可以添加,其他情况反馈可能不及时,见谅

版权声明

初衷是提供高清手机电脑壁纸等图片素材免费分享下载,禁止商用。图片素材来源网络,版权归原作者所有,若有侵权问题敬请告知我们!

【友情提醒】:

因平台原因不易展示大尺度写真,有的写真展示越少代表此套写真越性感,特别是xiuren等写真每一套写真完整套图50-100张不等。更多内容的欣赏请移步 点击这里

【更多图集移步】: 每日更新-点击这里
漂亮小姐姐-点击这里
性感美女-点击这里
清纯女孩-点击这里
xiuren专栏-点击这里
整站资源下载-点击这里

相关新闻