本文共 12217 字,大约阅读时间需要 40 分钟。
消费消息可以分成pull和push方式,push消息使用比较简单,因为RocketMQ已经帮助我们封装了大部分流程,我们只要重写回调函数即可。
下面我们就以push消费方式为例,分析下这部分源代码流程。
根据官方(:
//初始化DefaultMQPushConsumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");//设置命名服务,参考namesrv的启动consumer.setNamesrvAddr("localhost:9876");//设置消费起始位置consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅消费的主题和过滤符consumer.subscribe("TopicTest", "*");//设置消息回调函数consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});//启动消费者consumer.start();
我们接着看consumer.start()方法
@Overridepublic void start() throws MQClientException { this.defaultMQPushConsumerImpl.start();}
DefaultMQPushConsumerImpl.java
public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: ... this.checkConfig();//检查参数 ... this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); ... this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); ... this.offsetStore.load(); ... this.consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); ... mQClientFactory.start(); this.serviceState = ServiceState.RUNNING; ... } ... }
在初始化一堆参数之后,然后调用mQClientFactory.start();
private MQClientInstance mQClientFactory;
其实这个命名有点奇怪啊(阿里程序员手抖了?),为什么MQClientInstance类型的变量名称叫mQClientFactory ...
那继续看MQClientInstance的start
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ... // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; ... } } }
各行代码的作用就像源代码里面的注释一样,重点看下pullMessageService.start和rebalanceService.start
pullMessageService.start作用是不断从一个阻塞队列里面获取pullRequest请求,然后去RocketMQ broker里面获取消息。 如果没有pullRequest的话,那么它将阻塞。 那么,pullRequest请求是怎么放进去的呢?这个就要看rebalanceService了。private final LinkedBlockingQueuepullRequestQueue = new LinkedBlockingQueue ();@Overridepublic void run() { while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); if (pullRequest != null) { this.pullMessage(pullRequest); } } catch (InterruptedException e) { } catch (Exception e) { .. } }}
顺便说一句,pullMessageService和rebalanceService都是继承自ServiceThread
public class PullMessageService extends ServiceThread {}
ServiceThread简单封装了线程的启动,调用start方法,就会调用它的run方法。
public ServiceThread() { this.thread = new Thread(this, this.getServiceName()); //把当前对象作为runnable传入线程构造函数 } public void start() { this.thread.start(); }
这样启动线程就要方便一点,看起来舒服一点。
嗯,继续分析之前的分析。
从pullMessageService的run方法可以看出它是从阻塞队列pullRequestQueue里面获取pullRequest,如果没有那么将阻塞。(如果不清楚java阻塞的使用,清百度)
执行完一次pullReqeust之后,再继续下一次获取阻塞队列,因为它是个while循环。
所以,我们需要分析下pullRequest放进队列的流程,也就是rebalanceService.
public class RebalanceService extends ServiceThread { @Override public void run() { while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } }}
MQClientInstance.java
public void doRebalance() { for (Map.Entryentry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }
DefaultMQPushConsumerImpl.java
@Override public void doRebalance() { if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }
RebalanceImpl.java
public void doRebalance(final boolean isOrder) { MapsubTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { .... case CLUSTERING: { SetmqSet = this.topicSubscribeInfoTable.get(topic); List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (mqSet != null && cidAll != null) { List mqAll = new ArrayList (); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set allocateResultSet = new HashSet (); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }
一路跟下来,来到了RebalanceImpl.java的rebalanceByTopic方法,这个方法里面有两个case(Broadcasting和Clustering)也就是消息消费的两个模式,广播和集群消息。
广播的话,所有的监听者都会收到消息,集群的话,只有一个消费者可以收到,我们以集群消息为例。 先大概解释下在rebalanceByTopic里面要做什么。上面代码厘米mqset就是这个topic的消费队列,一般是4个,但是这个值是可以修改的,存储的位置在~/store/config/topics.json里面,比如:
"TopicTest":{ "order":false, "perm":6, "readQueueNums":4, "topicFilterType":"SINGLE_TAG", "topicName":"TopicTest", "topicSysFlag":0, "writeQueueNums":4}
可以修改readQueueNums和writeQueueNums为其他值
try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { return; }
这段代码就是客户端根据获取到的这个topic消费者数量和消息队列数量,使用负载均衡策略计算出当前客户端能够使用的消息队列。
负载均衡策略代码在这个位置。那我们继续4.4 pullMessageService.start分析,因为rebalanceService已经把pullRequest放到了阻塞队列。
@Override public void run() { while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); if (pullRequest != null) { this.pullMessage(pullRequest); } } catch (InterruptedException e) { } catch (Exception e) { } } }
private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { } }
调用到DefaultMQPushConsumerImpl.pullMessage(pullRequest)这个方法里面。
public void pullMessage(final PullRequest pullRequest) { ... final long beginTimestamp = System.currentTimeMillis(); PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { System.out.printf("pullcallback onsuccess: " + pullResult + " %n"); if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispathToConsume); } break; } } } @Override public void onException(Throwable e) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); } }; try { this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); } }
上面这段代码主要就是设置消息获取后的回调函数PullCallback pullCallback,然后调用pullAPIWrapper.pullKernelImpl去Broker里面获取消息。
获取成功后,就会回调pullCallback的onSuccess方法的FOUND case分支。
在pullCallback的onSucess方法的FOUND case分支,会根据回调是同步还是异步,分为两种情况,如下:
同步消息和异步消息区别的源代码实现以后再讲。
本文转自rongwei84n 51CTO博客,原文链接:http://blog.51cto.com/483181/2056301,如需转载请自行联系原作者