RocketMQ源码解析-Broker部分之Broker启动过程
文章目录[隐藏]
目录
-
- broker启动流程
- broker启动可配置参数
- 启动入口`BrokerStartup`
-
- 1.创建brokerController
- 2.`BrokerController`构造函数
- 3.BrokerController初始化`initialize()`
-
- 3.1注册消息处理器`registerProcessor`
- 3.2初始化事务消息相关的服务`initialTransaction()`
- 3.3`initialize`总结
- 4.BrokerControler的`start`
broker启动流程
借用一下【秃头爱健身】博主的的图,我觉得画的很好。
broker启动可配置参数
-n : 指定broker 的 namesrvAddr 地址;
-h :打印命令;
-c :指定配置文件的路径;
-p :启动时候日志打印配置信息;
-m :启动时候日志打印导入的配置信息。
启动入口BrokerStartup
broker启动的入口是在brokerStartup
,方法是main
public static void main(String[] args) {
//创建BrokerController
start(createBrokerController(args));
}
1.创建brokerController
public static BrokerController createBrokerController(String[] args) {
//设置RocketMq的版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//设置broker的netty客户端的发送缓冲大小,默认是128 kb
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 131072;
}
//设置broker的netty客户端的接受缓冲大小,默认是128 kb
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 131072;
}
try {
//PackageConflictDetect.detectFastjson();
//命令行选项解析
Options options = ServerUtil.buildCommandlineOptions(new Options());
//解析命令行为 ‘mqbroker’的参数
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
//如果为空,直接退出
if (null == commandLine) {
System.exit(-1);
}
//创建broker,netty的相关配置对象
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
//是否使用TLS (TLS是SSL的升级版本,TLS是SSL的标准化后的产物,有1.0 1.1 1.2三个版本)
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
//设置netty的服务端监听的端口 10911,对外提供消息读写服务的端口
nettyServerConfig.setListenPort(10911);
//创建消息存储配置
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
//如果broker是slave节点
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
//比默认的40% 还要小 10
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
//设置消息存储配置所能使用的最大内存比例,超过该内存,消息将被置换出内存,
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
//解析命令行参数'-c':指定broker的配置文件路径
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//检查broker配置中的nameServer地址
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
//检查broker的角色
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
//如果是master节点,则设置该节点brokerId=0
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
//是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 true
if (messageStoreConfig.isEnableDLegerCommitLog()) {
brokerConfig.setBrokerId(-1);
}
//设置消息存储配置的高可用端口,10912
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
//解析命令行参数'-p':启动时候日志打印配置信息
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
}
//解析命令行参数'-m':启动时候日志打印导入的配置信息
else if (commandLine.hasOption('m')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
//创建BrokerController
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// 记住所有的配置以防止丢弃
controller.getConfiguration().registerConfig(properties);
//初始化BrokerController
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//注册关闭的钩子方法
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
//BrokerController的销毁方法
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
broker启动会默认启动3个端口
端口 | 说明 |
---|---|
10911 | 接收消息推送的端口 |
10912 | 消息存储配置的高可用端口 |
10909 | 推送消息的VIP端口 |
2.BrokerController
构造函数
构造入参说明
参数 | 类型 | 说明 |
---|---|---|
brokerConfig | BrokerConfig | 封装Broker的基本配置信息 |
nettyServerConfig | NettyServerConfig | 封装了broker作为对外提供消息读写操作的MQ服务器信息 |
nettyClientConfig | NettyClientConfig | 封装了broker作为NameServer的客户端的信息 |
messageStoreConfig | MessageStoreConfig | 封装消息存储Store的配置信息 |
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// BrokerStartup中准备的配置信息
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
// Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件
this.consumerOffsetManager = new ConsumerOffsetManager(this);
//消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.json
this.topicConfigManager = new TopicConfigManager(this);
//Consumer端使用pull的方式向Broker拉取消息请求的处理类
this.pullMessageProcessor = new PullMessageProcessor(this);
//Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类
this.pullRequestHoldService = new PullRequestHoldService(this);
//有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
//消费者id变化监听器
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
//消费者管理类,并对消费者id变化进行监听
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
//消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.json
this.consumerFilterManager = new ConsumerFilterManager(this);
//生产者管理 按照group进行分类
this.producerManager = new ProducerManager();
//客户端心跳连接处理类
this.clientHousekeepingService = new ClientHousekeepingService(this);
//Console控制台获取Broker信息使用
this.broker2Client = new Broker2Client(this);
//订阅关系管理类
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
//Broker对外访问的API
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
//FilterServer管理类
this.filterServerManager = new FilterServerManager(this);
//Broker主从同步进度管理类
this.slaveSynchronize = new SlaveSynchronize(this);
// 各种线程池的阻塞队列
// 发送消息线程池队列
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
this.brokerFastFailure = new BrokerFastFailure(this);
this.configuration = new Configuration(
log,
BrokerPathConfigHelper.getBrokerConfigPath(),
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
重点对一些核心类进行说明
参数 | 说明 |
---|---|
ConsumerOffsetManager |
Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件 |
topicConfigManager |
消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.json |
pullMessageProcessor |
Consumer端使用pull的方式向Broker拉取消息请求的处理类 |
pullRequestHoldService |
Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类 |
messageArrivingListener |
有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法 |
consumerIdsChangeListener |
消费者id变化监听器 |
consumerManager |
消费者管理类,并对消费者id变化进行监听 |
consumerFilterManager |
消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.json |
producerManager |
生产者管理 按照group进行分类 |
clientHousekeepingService |
客户端心跳连接处理类 |
broker2Client |
Console控制台获取Broker信息使用 |
subscriptionGroupManager |
订阅关系管理类 |
brokerOuterAPI |
Broker对外访问的API |
filterServerManager |
FilterServer管理类 |
slaveSynchronize |
Broker主从同步进度管理类 |
3.BrokerController初始化initialize()
public boolean initialize() throws CloneNotSupportedException {
//加载 topic 相关配置,文件地址为 {user.home}/store/config/topics.json
boolean result = this.topicConfigManager.load();
//加载 不同的Consumer消费的进度情况 文件地址为 {user.home}/store/config/consumerOffset.json
result = result && this.consumerOffsetManager.load();
//加载 订阅关系 文件地址 {user.home}/store/config/subscriptionGroup.json
result = result && this.subscriptionGroupManager.load();
//加载 Consumer的过滤信息配置 文件地址 {user.home}/store/config/consumerFilter.json
result = result && this.consumerFilterManager.load();
//如果加载成功
if (result) {
try {
//创建消息存储类messageStore
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
//使用的是DLegerCommitLog,则创建DLedgerRoleChangeHandler
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
//broker消息统计类
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
//加载消息的日志文件,包含CommitLog,ConsumeQueue等
result = result && this.messageStore.load();
//如果加载成功
if (result) {
//开启服务端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
//设置10909的服务端口
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
//开启10909的服务端口,这个端口只给生产者使用
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
//处理消息生产者发送的生成消息请求相关的线程池
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
//处理消费者发出的消费消息请求相关的线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
//处理回复消息api的线程池
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
//查询线程
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
//省略一些线程池
//为客户端注册需要处理API指令事件,以及消息发送和消费的回调方法
this.registerProcessor();
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
//每天执行一次,统计昨天put的message和get的message
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
// 默认5s执行一次,会把消费这的偏移量存到文件中 ${user.home}/store/config/consumerOffset.json.json
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 默认10s执行一次,会把消费者的消息过滤的信息持久化到文件 ${user.home}/store/config/consumerFilter.json
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
// 每3分钟,当消费者消费太慢,会禁用到消费者组
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
//打印当前的Send Queue Size,Pull Queue Size,Query Queue Size,Transaction Queue Size
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
//每隔一分钟打印一次,dispath的消息偏移量和总的消息偏移量的差值
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
if (this.brokerConfig.getNamesrvAddr() != null) {
//更新nameServer地址
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
//没有明确指定name-server的地址,且配置了允许从地址服务器获取name-server地址
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 每隔2分钟从name-server地址服务器拉取最新的配置
// 这个是实现name-server动态增减的唯一方法
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//定时打印master与slave的差距
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
//初始化事务消息相关的服务
initialTransaction();
//初始化权限管理器
initialAcl();
//初始化RPC调用的钩子
initialRpcHooks();
}
return result;
}
3.1注册消息处理器registerProcessor
rocketmq中有许多线程执行器,包括sendMessageExecutor(发送消息),pullMessageExecutor(拉取消息),queryMessageExecutor(查询消息),adminBrokerExecutor(默认处理)。这些线程执行器会通过registerProcessor注册到NettyRemotingServer ,每一个RequestCode会有一个对应的执行器,最终会以RequestCode为键放到一个HashMap中,当请求到达nettyServer时会根据RequestCode把请求分发到不同的执行器去处理请求
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
//省略其他的处理器
最终放到processorTable的map中
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
相关的RequestCode说明一下:
事件名 | code | 说明 |
---|---|---|
SEND_MESSAGE | 10 | 生产者发送信息 |
SEND_MESSAGE_V2 | 310 | 生产者发送信息 |
SEND_BATCH_MESSAGE | 320 | 批量发送消息 |
CONSUMER_SEND_MSG_BACK | 36 | 消费端消费失败的时候返回的消息 |
PULL_MESSAGE | 11 | 消费者拉取消息 |
SEND_REPLY_MESSAGE | 324 | 消费者回包消息,可以用类似RPC调用 |
3.2初始化事务消息相关的服务initialTransaction()
服务加载方式是Java的SPI方式。
private void initialTransaction() {
//加载TransactionalMessageService服务,实现类为TransactionalMessageServiceImpl
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
//AbstractTransactionalMessageCheckListener对应的服务类为LogTransactionalMessageCheckListener ,其中实现为空实现
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
//设置对应的brokerController到AbstractTransactionalMessageCheckListener中
this.transactionalMessageCheckListener.setBrokerController(this);
//创建TransactionalMessageCheckService,服务是周期检查事务的服务,
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
}
3.3initialize
总结
initialize
方法相应的逻辑相对来说比较多,稍微总结为已下几步:
1.服务器内的相关日志文件的加载,{user.home}/store/config/ 文件目录下的json配置文件(包含topics,consumerOffset,subscriptionGroup,consumerFilter)。
2.如果上述文件加载成功,会启动对应的Broker客户端,然后创建一些线程池,在后面注册 API 指令事件后会监听到API的时候会进行处理
3.注册事件到对应的Broker客户端上,然后会记录对应的API事件和对应线程池封装到一个对象中
4.启动一些 定时任务,这些任务比如记录Broker状态,消费进度持久化等任务
5.初始化一些服务,比如事务相关(周期检查事务),消息权限校验初始化和Rpc调用钩子相关服务。对应的服务加载方式是Java的SPI方式。
4.BrokerControler的start
controller.start();
public void start() throws Exception {
if (this.messageStore != null) {
// 启动消息存储服务DefaultMessageStore,其会对/store/lock文件加锁,
// 以确保在broker运行期间只有一个broker实例操作/store目录
this.messageStore.start();
}
if (this.remotingServer != null) {
// 启动Netty服务监听10911端口,对外提供服务(消息生产、消费)
this.remotingServer.start();
}
if (this.fastRemotingServer != null) {
// 监听10909端口
this.fastRemotingServer.start();
}
if (this.fileWatchService != null) {
// fileWatchService与TLS有关,todo tls解析
this.fileWatchService.start();
}
if (this.brokerOuterAPI != null) {
// 启动Netty客户端netty,broker使用其向外发送数据,比如:向NameServer上报心跳、topic信息。
this.brokerOuterAPI.start();
}
if (this.pullRequestHoldService != null) {
// 长轮询机制hold住拉取消息请求的服务
this.pullRequestHoldService.start();
}
if (this.clientHousekeepingService != null) {
// 每10s检查一遍非活动的连接服务
this.clientHousekeepingService.start();
}
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 处理HA
startProcessorByHa(messageStoreConfig.getBrokerRole());
// 启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
// 向所有的nameserver发送本机所有的主题数据;
// 包括主题名、读队列个数、写队列个数、队列权限、是否有序等
this.registerBrokerAll(true, false, true);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 定时向NameServer注册Broker,最小每10s。
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
// Broker信息统计,这个没有具体的实现;所以暂时不用管
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
// Broker对请求队列中的请求进行快速失败,返回`Broker繁忙、请稍后重试`信息
this.brokerFastFailure.start();
}
}
对这几个服务进行说明一下:
服务名 | 类型 | 说明 |
---|---|---|
messageStore | DefaultMessageStore | 处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等 |
remotingServer | RemotingServer | Broker的服务端,处理消费者和生产者的请求 |
fastRemotingServer | RemotingServer | 只给消息生产者的服务端 |
fileWatchService | FileWatchService | 启动监控服务连接时用到的SSL连接文件的服务 |
brokerOuterAPI | BrokerOuterAPI | RocketMQ控制台跟Broker交互时候的客户端 |
pullRequestHoldService | PullRequestHoldService | 处理push模式消费,或者延迟消费的服务 |
clientHousekeepingService | ClientHousekeepingService | 心跳连接用的服务 |
filterServerManager | FilterServerManager | 消息过滤的服务 |
transactionalMessageCheckService | TransactionalMessageCheckService | 定期检查和处理事务消息的服务 |
slaveSynchronize | SlaveSynchronize | 主从之间topic,消费偏移等信息同步用的 |
微信扫描下方的二维码阅读更多精彩内容
每日分享到群里,或者推荐给朋友会得大量积分,机会可以兑换微信零钱红包,具体请点击这里,得到了微信红包可以用来支持大飞哥
大飞哥能不能加鸡腿就看各位了!
开发者微信
开发者微信反馈BUG或者VIP可以添加,其他情况反馈可能不及时,见谅
版权声明
初衷是提供高清手机电脑壁纸等图片素材免费分享下载,禁止商用。图片素材来源网络,版权归原作者所有,若有侵权问题敬请告知我们!
【友情提醒】:
因平台原因不易展示大尺度写真,有的写真展示越少代表此套写真越性感,特别是xiuren等写真每一套写真完整套图50-100张不等。更多内容的欣赏请移步 点击这里
【更多图集移步】:
每日更新-点击这里
漂亮小姐姐-点击这里
性感美女-点击这里
清纯女孩-点击这里
xiuren专栏-点击这里
整站资源下载-点击这里