博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
48. 源代码解读-RocketMQ-client接收消息流程
阅读量:6926 次
发布时间:2019-06-27

本文共 12217 字,大约阅读时间需要 40 分钟。

1. 简介

消费消息可以分成pull和push方式,push消息使用比较简单,因为RocketMQ已经帮助我们封装了大部分流程,我们只要重写回调函数即可。

下面我们就以push消费方式为例,分析下这部分源代码流程。

2. 消费者启动流程图

48. 源代码解读-RocketMQ-client接收消息流程

3.消费者类图

48. 源代码解读-RocketMQ-client接收消息流程

4. 消费者源代码流程

4.1 消费客户端启动

根据官方(:

//初始化DefaultMQPushConsumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");//设置命名服务,参考namesrv的启动consumer.setNamesrvAddr("localhost:9876");//设置消费起始位置consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅消费的主题和过滤符consumer.subscribe("TopicTest", "*");//设置消息回调函数consumer.registerMessageListener(new MessageListenerConcurrently() {      @Override      public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});//启动消费者consumer.start();

4.2 消息者启动

我们接着看consumer.start()方法

@Overridepublic void start() throws MQClientException {     this.defaultMQPushConsumerImpl.start();}

DefaultMQPushConsumerImpl.java

public synchronized void start() throws MQClientException {        switch (this.serviceState) {            case CREATE_JUST:                ...                this.checkConfig();//检查参数                ...                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);                ...                this.pullAPIWrapper = new PullAPIWrapper(                    mQClientFactory,                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);                ...                this.offsetStore.load();                ...                this.consumeMessageService.start();                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);                ...                mQClientFactory.start();                this.serviceState = ServiceState.RUNNING;            ...        }        ...    }

在初始化一堆参数之后,然后调用mQClientFactory.start();

private MQClientInstance mQClientFactory;

其实这个命名有点奇怪啊(阿里程序员手抖了?),为什么MQClientInstance类型的变量名称叫mQClientFactory ...

那继续看MQClientInstance的start

4.3 MQClientInstance

public void start() throws MQClientException {        synchronized (this) {            switch (this.serviceState) {                case CREATE_JUST:                    ...                    // Start request-response channel                    this.mQClientAPIImpl.start();                    // Start various schedule tasks                    this.startScheduledTask();                    // Start pull service                    this.pullMessageService.start();                    // Start rebalance service                    this.rebalanceService.start();                    // Start push service                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);                    log.info("the client factory [{}] start OK", this.clientId);                    this.serviceState = ServiceState.RUNNING;                    break;               ...            }        }    }

各行代码的作用就像源代码里面的注释一样,重点看下pullMessageService.start和rebalanceService.start

pullMessageService.start作用是不断从一个阻塞队列里面获取pullRequest请求,然后去RocketMQ broker里面获取消息。
如果没有pullRequest的话,那么它将阻塞。
那么,pullRequest请求是怎么放进去的呢?这个就要看rebalanceService了。

4.4 pullMessageService.start

private final LinkedBlockingQueue
pullRequestQueue = new LinkedBlockingQueue
();@Overridepublic void run() { while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); if (pullRequest != null) { this.pullMessage(pullRequest); } } catch (InterruptedException e) { } catch (Exception e) { .. } }}

顺便说一句,pullMessageService和rebalanceService都是继承自ServiceThread

public class PullMessageService extends ServiceThread {}

ServiceThread简单封装了线程的启动,调用start方法,就会调用它的run方法。

public ServiceThread() {        this.thread = new Thread(this, this.getServiceName()); //把当前对象作为runnable传入线程构造函数    }    public void start() {        this.thread.start();    }

这样启动线程就要方便一点,看起来舒服一点。

嗯,继续分析之前的分析。

从pullMessageService的run方法可以看出它是从阻塞队列pullRequestQueue里面获取pullRequest,如果没有那么将阻塞。(如果不清楚java阻塞的使用,清百度)

执行完一次pullReqeust之后,再继续下一次获取阻塞队列,因为它是个while循环。

所以,我们需要分析下pullRequest放进队列的流程,也就是rebalanceService.

4.5 rebalanceService

public class RebalanceService extends ServiceThread {    @Override    public void run() {        while (!this.isStopped()) {            this.waitForRunning(waitInterval);            this.mqClientFactory.doRebalance();        }    }}

MQClientInstance.java

public void doRebalance() {        for (Map.Entry
entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }

DefaultMQPushConsumerImpl.java

@Override    public void doRebalance() {        if (!this.pause) {            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());        }    }

RebalanceImpl.java

public void doRebalance(final boolean isOrder) {        Map
subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry
entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
private void rebalanceByTopic(final String topic, final boolean isOrder) {        switch (messageModel) {            case BROADCASTING: {                ....            case CLUSTERING: {                Set
mqSet = this.topicSubscribeInfoTable.get(topic); List
cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (mqSet != null && cidAll != null) { List
mqAll = new ArrayList
(); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List
allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set
allocateResultSet = new HashSet
(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }

一路跟下来,来到了RebalanceImpl.java的rebalanceByTopic方法,这个方法里面有两个case(Broadcasting和Clustering)也就是消息消费的两个模式,广播和集群消息。

广播的话,所有的监听者都会收到消息,集群的话,只有一个消费者可以收到,我们以集群消息为例。
先大概解释下在rebalanceByTopic里面要做什么。

  1. 从namesrv获取broker里面这个topic的消费者数量
  2. 从namesrv获取broker这个topic的消息队列数量
  3. 根据前两部获取的数据进行负载均衡计算,计算出当前消费者客户端分配到的消息队列。
  4. 按照分配到的消息队列,去broker请求这个消息队列里面的消息。

上面代码厘米mqset就是这个topic的消费队列,一般是4个,但是这个值是可以修改的,存储的位置在~/store/config/topics.json里面,比如:

"TopicTest":{        "order":false,        "perm":6,        "readQueueNums":4,        "topicFilterType":"SINGLE_TAG",        "topicName":"TopicTest",        "topicSysFlag":0,        "writeQueueNums":4}

可以修改readQueueNums和writeQueueNums为其他值

try {     allocateResult = strategy.allocate(                 this.consumerGroup,         this.mQClientFactory.getClientId(),         mqAll,         cidAll);  } catch (Throwable e) {         return;  }

这段代码就是客户端根据获取到的这个topic消费者数量和消息队列数量,使用负载均衡策略计算出当前客户端能够使用的消息队列。

负载均衡策略代码在这个位置。

48. 源代码解读-RocketMQ-client接收消息流程

那我们继续4.4 pullMessageService.start分析,因为rebalanceService已经把pullRequest放到了阻塞队列。

4.6 PullMessageService.run

@Override    public void run() {        while (!this.isStopped()) {            try {                PullRequest pullRequest = this.pullRequestQueue.take();                if (pullRequest != null) {                    this.pullMessage(pullRequest);                }            } catch (InterruptedException e) {            } catch (Exception e) {            }        }    }
private void pullMessage(final PullRequest pullRequest) {        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());        if (consumer != null) {            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;            impl.pullMessage(pullRequest);        } else {        }    }

调用到DefaultMQPushConsumerImpl.pullMessage(pullRequest)这个方法里面。

4.6.1

public void pullMessage(final PullRequest pullRequest) {        ...        final long beginTimestamp = System.currentTimeMillis();        PullCallback pullCallback = new PullCallback() {            @Override            public void onSuccess(PullResult pullResult) {                System.out.printf("pullcallback onsuccess: " + pullResult + " %n");                if (pullResult != null) {                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,                        subscriptionData);                    switch (pullResult.getPullStatus()) {                        case FOUND:                            long firstMsgOffset = Long.MAX_VALUE;                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                            } else {                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(                                    pullResult.getMsgFoundList(),                                    processQueue,                                    pullRequest.getMessageQueue(),                                    dispathToConsume);                            }                            break;                    }                }            }            @Override            public void onException(Throwable e) {                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);            }        };        try {            this.pullAPIWrapper.pullKernelImpl(                pullRequest.getMessageQueue(),                subExpression,                subscriptionData.getExpressionType(),                subscriptionData.getSubVersion(),                pullRequest.getNextOffset(),                this.defaultMQPushConsumer.getPullBatchSize(),                sysFlag,                commitOffsetValue,                BROKER_SUSPEND_MAX_TIME_MILLIS,                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,                CommunicationMode.ASYNC,                pullCallback            );        } catch (Exception e) {            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);        }    }

上面这段代码主要就是设置消息获取后的回调函数PullCallback pullCallback,然后调用pullAPIWrapper.pullKernelImpl去Broker里面获取消息。

获取成功后,就会回调pullCallback的onSuccess方法的FOUND case分支。

在pullCallback的onSucess方法的FOUND case分支,会根据回调是同步还是异步,分为两种情况,如下:

48. 源代码解读-RocketMQ-client接收消息流程

同步消息和异步消息区别的源代码实现以后再讲。

     本文转自rongwei84n 51CTO博客,原文链接:http://blog.51cto.com/483181/2056301,如需转载请自行联系原作者

你可能感兴趣的文章
最详细的Log4j使用教程
查看>>
EventBus的使用以及学习心得
查看>>
Android Activity的Launch Mode
查看>>
Java内嵌Groovy脚本引擎进行业务规则剥离(四)--Groovy规则化脚本整合至JAVA
查看>>
AWS 使用 CodeBuild 进行云端构建
查看>>
0. SpringBoot --引言
查看>>
《大话数据结构》读书笔记系列(二)---- 算法
查看>>
JSTL 核心标签库 使用
查看>>
解决Rockmongo不显示中文的问题
查看>>
asp.net core 自定义使用MemoryCache
查看>>
fedora下/etc/sysconfig/network-scripts/ifcfg-eth0配置
查看>>
spring boot 的application.yml配置
查看>>
正则表达式与sed工具
查看>>
Twemproxy和Redis性能压力测试
查看>>
EXCEL汇总文件信息
查看>>
Vmware vSphere 5.0系列教程之一 Vmware vSphere 5.0简介
查看>>
使用 OpenSSL API 进行安全编程
查看>>
StringBuilder与StringBuffer的比较
查看>>
Puppet-2:Puppet 配置第一个Agent
查看>>
Android 数字签名
查看>>