20 | RocketMQ Producer源码分析:消息生产的实现过程
20 | RocketMQ Producer源码分析:消息生产的实现过程
讲述:李玥
时长19:16大小13.23M
从单元测试看 Producer API 的使用
启动过程
消息发送过程
小结
思考题
赞 10
提建议
精选留言(31)
- . 。o O2019-10-28请教老师一个问题,如果异步发送的话,就是把发送逻辑封装成任务放到线程池里去处理,那么是不是就没法保证消息的顺序性了呢?哪怕是通过key哈希到一个同一个队列,但是发送消息的任务执行先后顺序没法保证吧?
作者回复: 异步发送仍然可以保证严格顺序,但需要注意几点: 1. 需要单线程异步发送; 2. 需要记录一个递增流水号,保证每个发出的消息都有一个流水号,如果某个流水号的消息发送出错,需要重发这个流水号之后的所有消息。比如,连续异步发送12345这5条消息,假如已经异步发送了12345,然后异步检查发送结果的时候发现3发送失败了,需要从3开始重发。也就是按顺序重发345。 3. 消费逻辑需要幂等,能接受2中的这种情况,也就是说,收到的消息有可能是:12(3丢了)45 345。
共 2 条评论33 - 微微一笑2019-09-11老师好,先祝您节日快乐!!!您辛苦了~ 有几个疑问需要老师解答一下: ①今天在看rocketMq源码过程中,发现DefaultMQProducer有个属性defaultTopicQueueNums,它是用来设置topic的ConsumeQueue的数量的吗?我之前的理解是,consumeQueue的数量是创建topic的时候指定的,跟producer没有关系,那这个参数又有什么作用呢? ②在RocketMq的控制台上可以创建topic,需要指定writeQueueNums,readQueueNums,perm,这三个参数是有什么用呢?这里为什么要区分写队列跟读队列呢?不应该只有一个consumeQueue吗? ③用户请求-->异步处理--->用户收到响应结果。异步处理的作用是:用更少的线程来接收更多的用户请求,然后异步处理业务逻辑。老师,异步处理完后,如何将结果通知给原先的用户呢?即使有回调接口,我理解也是给用户发个短信之类的处理,那结果怎么返回到定位到用户,并返回之前请求的页面上呢?需要让之前的请求线程阻塞吗?那也无法达到【用更少的线程来接收更多的用户请求】的目的丫。 望老师能指点迷津~~~展开
作者回复: A1:这个参数是控制客户端在生产消费的时候会访问同一个主题的队列数量,假设一个主题有100个队列,对于每一个客户端来说,它没必要100个队列都访问,只需要使用其中的几个队列就行了。 A2:writeQueueNums和readQueueNums是在服务端来控制每个客户端在生产和消费的时候,分别访问多少个队列。这两个参数是服务端参数,优先级是高于客户端控制的参数defaultTopicQueueNums的。perm是设置Topic读写等权限的参数,具体如何设置你需要去看一下文档。 A3:如果局限于:“APP/浏览器 --[http协议]-->web 服务”这样的场景,受限于http协议,前端和web服务的交互一定是单向和同步的。一定要等待结果然后返回响应,但是,这种情况仍然可以使用异步的方法,这个我在“08答疑”中解释秒杀的时候其实已经给出了答案。很多同学不理解的原因是思维被web框架给限制住了。像spring web这种框架,它把处理web请求都给你封装好了,你只要写一个handler就行了,很方便。但是,这个handler只能是一个同步方法,它必须在返回值中给出响应结果,所以导致很多同学的思维转不过来这个弯儿。 你可以结合我们讲的异步网络IO内容想一下,http协议发一个请求到服务端,就是发了一些数据过来,服务端回响应也就是在这个连接上给它返回一些数据回去就可以了。至于什么时候往回发响应数据,哪个线程来发,有要求吗?并没有。只要在超时之前发响应就可以了。我们讲得如何来实现异步网络IO的方法处理的不就是这种情况吗? 这个过程不是说一定要做成和web框架一样的同步处理。
共 3 条评论22 - lmtoo2019-09-10这种异步方式几乎没有意义,底层的netty已经实现了异步,这里只是在选择消息队列等判断的过程加了异步,最终callback还是由netty线程来调用的共 8 条评论20
- z.l2019-11-18老师,异步发送为什么是弃用,还是没有看懂,感觉超时时间的计算没有错啊…
作者回复: 我理解主要的原因除了超时时间的计算不准确以外,更重要的原因是这个异步方法还有改进的空间,其实可以直接结合Netty做到不需要Executor。
13 - Peter2019-11-04课后作业,请老师指正: 从方法的注释看,说是因为异常处理和超时时间的语义不对。 异常处理这块我觉得应该是采用统一的异常处理,而不应该是有的异常抛出,而有的异常通过回调方法返回客户端。 再说超时时间的错误语义,严格来说应该是不准确的超时时间,因为在run方法里进行时间判断(if (timeout > costTime))实际上已经是开始执行当前线程的时间,而之前的排队时间没有算,因此我改进的方法应该是这样: CompletableFuture.runAsync(() -> { long costTime = System.currentTimeMillis() - beginStartTime; if (timeout > costTime) { try { sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); } catch (Exception e) { sendCallback.onException(e); } } else { sendCallback.onException( new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); } }, executor);展开
作者回复: 给认真思考完成作业的同学点赞👍!
共 7 条评论7 - 每天晒白牙2019-09-10我总结的kafka生产消息的源码分析 https://mp.weixin.qq.com/s/-s34_y16HU6HR5HDsSD4bg6
- leslie2019-09-10编程语言的话Python或Go可以么?极客时间里都有购买,就是忙着其它课程的学习,一直没顾的上编程语言的学习。 从开始一路跟到现在:算是少数一直在完全没有缺的课;前期一直遍边学习边针对开篇时的学习目标针对当下工作环境的Nosql DB和MQ使用率的低下的问题找解决思路和方案,课后笔记主要同样集中在思路以及针对思路的困惑查疑上,代码这块完全没顾上。虽然代码的思路看的懂,发现动手能力确实非常欠缺。一路学到现在梳理到现在整体方案大致定下来:以及早期的部分课程的结束;课程的主要方案自己估计在掌握思路的基础上去补强Coding能力。虽然DBA的Coding能力都比较烂,不过还是得边学边啃下来;逼自己一下总能勉强写出来,估计就是效率问题、、、MQ这块PY或GO哪种更合适,或者说都可以? 感谢老师一路的辛勤授业:授课之余尽力去帮助学生们解惑,让我们能一路走来一路成长;愿老师教师节快乐,谢谢老师的分享。展开
作者回复: 个人建议学习Java或者Go,这两种语言都有不错的生态系统,都可以用来构建大规模集群。 相对来说,Java的生态系统更强大,Go比较年轻,有很多Java不具备的语言特性。 Python本来只是一门脚本语言,特别适合开发机器学习程序而火起来了,如果你不是从事机器学习相关的研发,不太建议作为第一语言来学习。
6 - 墙角儿的花2019-09-11老师 对于im服务器集群,客户端的socket均布在各个服务器,目标socket不在同一个服务器上时,服务器间需要转发消息,这个场景需要低延迟无需持久化,服务器间用redis的发布订阅,因其走内存较快,即使断电还可以走库。im服务器和入库服务间用其他mq解耦,因为这个环节需要持久化,所以选rocketmq或kafka,但kafka会延迟批量发布消息 所以选rocketmq,这两个环节的mq选型可行吗。
作者回复: 有一个问题你需要考虑,你是不是需要为每一个会话(比如,张三和李四之间开始聊天,成为一个会话)在MQ中凑创建一个Topic呢?这样会导致MQ集群中的Topic数量非常多。假设你的系统注册用户数是n,理论上最多会需要 n x n 个Topic,这还没有计算用户拉的群。 对于海量的Topic数量,RocketMQ和Kafka都不是太好的选择。
共 3 条评论5 - 明日2019-09-10李老师节日快乐! 关于思考题看到了源码的注释说异常处理和超时时间有问题。 自己看的话一是异常这里抛未知的原因,不够明确。 二是这里用的线程池默认使用了虚拟机可用的线程,可能会对其他服务造成影响。 三是超时时间这把线程阻塞可能等待的时间也包括进去了不太合适。 感觉代码层次使用老师说过的completablefuture处理更优雅。另外底层使用了netty,应该直接用异步io就行了吧。展开4
- 侧面2020-01-14有这篇这课就买的值了2
- 二少2021-04-13DefaultMQProducer是DefaultMQProducerImpl的门面,但二者的类名起得有点怪怪的感觉。类名有Impl后缀,一般都表示这个类是某个接口的实现类,但实际上却是门面和被包装类的关系。而且把门面类给个facade后缀不是更适当一些吗?大家怎么看。1
- Heaven2021-02-08因为Netty本身就支持异步的写入消息,并注入Listener,这一步的发送,则是利用Nio的WorkGroup,这种情况下,显式的使用线程池异步的发送显得有点多余1
- 编程界的小学生2020-07-01RocketMQ同等的策略模式还有消费端的时候选择消费者与queue的对应策略: AllocateMessageQueueStrategy接口下有如下几个实现类 AllocateMessageQueueAveragely AllocateMachineRoomNearby AllocateMessageQueueAveragelyByCircle AllocateMessageQueueByConfig:这个策略真不知道有啥鸟用 AllocateMessageQueueByMachineRoom AllocateMessageQueueConsistentHash 而且看这名字就知道是策略模式。直接以Strategy结尾。展开1
- 我丢了一只小凳子2020-02-28跟踪源码发现,异步回调,最后还是在NettyRemotingAbstract中启动线程池做了 /** * Execute callback in callback executor. If callback executor is null, run directly in current thread */ private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); if (executor != null) { try { executor.submit(new Runnable() { @Override public void run() { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("execute callback in executor exception, and callback throw", e); } finally { responseFuture.release(); } } }); } catch (Exception e) { runInThisThread = true; log.warn("execute callback in executor exception, maybe executor busy", e); } } else { runInThisThread = true; } if (runInThisThread) { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("executeInvokeCallback Exception", e); } finally { responseFuture.release(); } } }展开1
- Peter2019-11-04老师继续请教问题: 1.DefaultMQPullConsumer和DefaultMQPushConsumer有什么区别 2.为什么pullConsumer的启动和producer的启动在同一个start方法里(最终都在MQClientInstance#start里) 3.rebalanceService服务是干嘛的
作者回复: PullConsumer:业务代码在需要的时候调用consumer.pullxxxx方法从consumer拉消息;PushConsumer:当有消息的时候,consumer会自动调用messageListener(业务处理消息的代码)。 这两种方式主要是为了方便使用者进行线程控制,没有什么本质区别。
1 - 姑射仙人2019-09-281. 异常处理问题:线程内部抛出的异常,比如MQBrokerException,客户端无法感知到,以为发送成功,会继续执行。 2. 超时时间的概念问题:这里似乎去掉了线程调度的时间,将剩下的时间给了netty,个人感觉也应该包含进去。对客户端而言,调度是自己的事,不应包含在网络超时时间里。 请老师指正。展开1
- 谁都会变2022-09-29 来自上海消息生产者启动拉取消息这个感觉没什么用啊,它不是推送消息得吗?
- fomy2020-02-161、为什么ServiceState变量不设置成volatile呢? 2、消费者MessageQueue(readQueueNums)怎么和生产者MessageQueue(writeQueueNums)关联起来的呢?比如readQueueNums=19个,writeQueueNums=23个,它们是怎么关联的呢?
作者回复: A1: 因为它服务的类没有设计成线程安全的,所以也没必要用volatile关键字。 A2:writeQueueNums和readQueueNums是在服务端来控制每个客户端在生产和消费的时候,分别访问多少个队列。因为对主题来说,生产者的实例数和消费者的实例数是没有关系的,所以这两个参数是不关联的。
- 七楼2020-01-02mvc框架的 controller也算是门面模式的门面把?他也是提供一个可访问系统的借口 隐藏了系统内部的复杂性 对吗共 1 条评论
- z.l2019-11-14DefaultMQProducerImpl的start和shutdown方法没有加同步,serviceState也只是一个普通成员变量没加volatile,不会有线程安全问题吗?
作者回复: 这两个方法没有做到线程安全,但是这两个方法的实现内部,调用的方法都是线程安全的方法。