加餐 | JMQ的Broker是如何异步处理消息的?
加餐 | JMQ的Broker是如何异步处理消息的?
讲述:李玥
时长16:34大小11.37M
JMQ 的 Broker 是如何异步处理消息的?
俩大爷的思考题
思考题
赞 21
提建议
精选留言(59)
- 包明2019-08-26Requests Queue 内存级的 怎么做到不丢请求?
作者回复: 你要明白一件事儿,客户端发来一条数据,直到服务端返回给客户端发送成功,这段时间内,数据是允许丢失的。 数据丢失后,客户端收不到发送成功响应,自然会重试。
共 4 条评论63 - 青舟2019-08-24https://github.com/qingzhou413/geektime-mq-rpc.git 使用netty作为网络库,server和client的io线程数都是1,笔记本4核标压2.3G时间2.3秒。
作者回复: 👍👍👍
共 2 条评论26 - 付永强2019-08-28这么看来JMQ基本上是把所有涉及io等待地方步骤进行异步设计。学到了!感谢分享!🤔18
- ub82019-12-14老师,文中提到的 “我们把回复响应这个需要等待资源的操作,也异步放到其他的线程中去执行。”;这个是怎么实现的呢? ResponseThread ,和 RequestThread 是如何对应上的?
作者回复: 一般简单常用的做法都是在处理request的线程中执行业务逻辑,然后把response返回。 其实从网络层面来看,Request和Response只是客户端和服务端互相发送的两段数据,和服务端处理用什么线程完全没有关系。 Request和Response如何对应,这个取决于网络传输协议是怎么设计的。这个我们在课程中讲过。 在服务端实现中,可以接受到Request之后,进行异步处理,异步处理完成之后,无论在什么线程中,只要能拿到处理结果构建出Response,通过网络发给客户端就可以了。
共 4 条评论14 - 谢清2019-10-21刷了下评论,青舟写的最好,其次Martin 青舟 https://github.com/qingzhou413/geektime-mq-rpc.git Martin https://github.com/MartinDai/mq-in-action/blob/master/src/main/java/com/doodl6/mq/MeetInRpc.java展开10
- linqw2019-08-25老师有个疑问,帮忙解答下哦,在上面的那个流程图中,WriteThread是单线程从请求队列中获取到消息然后把消息放到journal Cache,开启ReplicationThread、FlushThread进行处理,能否把WriteThread做成分配器了,mq只要保证topic下的队列有序就可以,同一个队列的消息由WriteThread分配给同一个线程进行处理,线程池的形式,线程池中的每个工作线程内部都有个集合保存消息,如果前面没有同一个队列的消息,分配给最空闲的线程进行执行,那这样的话,WriteThread只要分配消息,比如可以对发送过来的消息中要保存的队列属性值进行hash,然后根据hash值判断线程池中的所有线程的消息集合是否有相同队列的消息,有的话分配给同一个线程执行,没有的话最空闲的线程执行,mysql的binlog同步也是有几种这种策略,并发的同步sql,刚开始是基于库,同一个库的sql分配到一个线程执行同步,不同库进行并发,后来是基于redo log的组提交形式,能组提交的sql可以并发的同步,再后来是WRITESET,根据库名、表名、索引(主键和唯一索引)计算hash进行分发策略。展开
作者回复: 这个流程中处理的数据已经是被分配过的,单个队列的数据。
8 - 吾皇万岁万岁万万岁2019-08-24请问老师,JMQ在follower节点响应后,就给生产者发送确认消息,此时如果leader节点故障,数据还在JournalCache里面,拿是不是可以认为这部分数据丢失?
作者回复: 不会丢,因为数据已经复制到了从节点上,leader宕机后,会重新选举出新的leader(也就是之前的某个follower),这个新leader上是有原leader上的全部数据的。
共 5 条评论7 - 李冲2019-11-26借用老师的的go源码经过读,写,去掉锁3次演进,分别在金山云ECS(2核4G)上跑到3.1/2.4/1.4秒的样子。 最终的文件:https://github.com/lichongsw/algorithm/blob/master/duplex_communication_optimization_3_no_write_lock.go
作者回复: 👍👍👍
共 2 条评论5 - Martin2019-09-20https://github.com/MartinDai/mq-in-action/blob/master/src/main/java/com/doodl6/mq/MeetInRpc.java 基于netty4实现的,Macbook pro 2015款 13寸 测试差不多4.2秒左右,老师帮忙看看哪里还可以有优化空间吗
作者回复: 👍👍👍正确写出这个程序差不多就是这个耗时。
共 2 条评论5 - 许童童2019-08-24老师说得很好,想成为一个真成的高手,理解并实践这些基本原理,是必不可少的,不要只停留在知识的表面,要领会技术背后的思想,才能活学活用,实践肯定是少不了的,加油。5
- 李心宇🦉2019-10-23老师好,我对JMQ的broker接收生产者请求并写入消息的流程有个疑问。 在处理完数据落盘和多节点数据复制之后,要给生产者回复响应了,这时候broker如何能找到生产者呢?我理解是第一步生产者发送请求建立的TCP连接句柄没有释放,最后再通过这个连接句柄来write响应。这样的话,还是每个连接在得到响应之前不能释放需要占用一个线程啊。请问是怎么做到在第一步接收响应阶段只需要很少的线程的?是不是利用异步非阻塞,在线程里设置大量的协程来处理请求?展开
作者回复: 你的大部分理解都没问题,有一个小问题是,维持一个TCP连接并不一定需要占用一个线程。只有在这个连接上执行收发数据的时候,才需要占用线程。收到请求处理完,把请求交给其它线程处理后,当前线程就可以释放了。直到响应生成后,这段时间只维持TCP连接是不需要占用任何线程的。
共 3 条评论5 - 凡2020-02-23借鉴了 MySql 的思想,跟新数据先写入 Buffer pool,然后定期将内存的数据刷到磁盘,这样减少了磁盘 IO 的操作频率,提高了性能2
- 刘天鹏2019-08-26老师的writeTo函数没必要加锁吧,用一个局部缓冲把lengthByte serialBytes payloadBytes拼装好再一起发送应该就可以了 我把我的大爷改成双工的,多线程的,可以打包发送的,现在大爷还只有一张嘴一个耳朵(一个socket连接) 可能是没有逻辑处理的负担,多线程没啥改变 打包发送有一定的改进,1次3条数据 14s 现在100万次胡同碰面,8.66s(公司的i7) https://gist.github.com/liutianpeng/d9330f85d47525a8e32dcd24f5738e55展开
作者回复: 如果是多个线程并发向同一个socket中写数据,这个锁是必须加的。 比如,A线程发送“123456”,B线程发送“abcd”,如果不加锁,对端收到的可能就成了“12abc345d6”,这种数据是对端是没法解析的。
共 2 条评论3 - nfx2020-03-28有个疑问, request queue, journal cache都是生产者, 消费者模型的队列, 里面应该会有锁吧? 只是这种队列锁需要的资源很少. 不知道怎么理解对不对
作者回复: 这个地方理论上是不需要锁的,因为是Append Only的数据模型,也就是说数据在尾部追加写入,写入后不可改变,那读写就不会有冲突,所以不需要加锁。 但实际上,因为内存是有限的,总是要发生换页操作,相当于把数据从内存中删除,这个地方还是需要用锁来控制的。
共 3 条评论2 - 南辕北辙2019-08-26把老师的代码看了n遍,再用java的原生nio实现了一遍,理解了老师的用心良苦。前二章序列化与网络协议那块的知识,对应代码对于struct的构造,四字节的总长度,四字节的序号,以及变长内容,然后二个大爷在接受到数据以后也是根据这个协议进行取数据。也就对应了序列化以及协议都是自定义的!!二个大爷都懂该怎么从二进制的数据中,提取出对话。
作者回复: 恭喜你学到了😄
共 4 条评论3 - ykkk882019-08-25如果要保证前一句话要对方确认回复后再发送下一句话 就不能用这种方式了吧?这样吞吐量就会很低了
作者回复: 是的。 一般的协议都是这种“请求-响应”的方式,不需要同步的请求之间有序。
2 - TWT_Marq2019-08-25JMQ中,接受请求的iothreads将请求丢给request queue,WriteThread从queue中取出来开始干活。这个queue是concurrentBlockingQueue吗?
作者回复: 是的
2 - linqw2019-08-25我用java实现了一版,老师帮忙看下哦,评论只能发2000字,其余在评论区补上 import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; /** * @author linqw */ public class SocketExample { private static final Logger LOGGER = LoggerFactory.getLogger(SocketExample.class); private static final ReentrantLock LI_WRITE_REENTRANT_LOCK = new ReentrantLock(); private static final ReentrantLock ZHANG_WRITE_REENTRANT_LOCK = new ReentrantLock(); private static final int NCPU = Runtime.getRuntime().availableProcessors(); private final ThreadPoolExecutor serverThreadPoolExecutor = new ThreadPoolExecutor(NCPU, NCPU, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200000), new ThreadFactoryImpl("server-"), new ThreadPoolExecutor.CallerRunsPolicy()); private final ThreadPoolExecutor clientThreadPoolExecutor = new ThreadPoolExecutor(NCPU, NCPU, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200000), new ThreadFactoryImpl("client-"), new ThreadPoolExecutor.CallerRunsPolicy()); private volatile boolean started = false; /** * 俩大爷已经遇见了多少次 */ private volatile AtomicInteger count = new AtomicInteger(0); /** * 总共需要遇见多少次 */ private static final int TOTAL = 1; private static final String Z0 = " 吃了没,您吶?"; private static final String Z3 = " 嗨!吃饱了溜溜弯儿。"; private static final String Z5 = " 回头去给老太太请安!"; private static final String L1 = " 刚吃。"; private static final String L2 = " 您这,嘛去?"; private static final String L4 = " 有空家里坐坐啊。";展开
作者回复: 还是贴一下GitHub的链接把,也方便其他同学学习。
共 10 条评论2 - 豆沙包2019-08-24老师,我在运行你的老大爷代码的时候发现了一个问题,经常报读长度故障。我调试了一下,应该是因为张大爷阻塞在readfrom的时候,李大爷count++了。然后count大于total,client释放了tcp。这导致readfrom无法读出准确数据。我在返回读数据故障前判断了一下count和total的关系,如果count大于等于total正常返回,就没有再报过异常了。您看我分析的是否有问题呢
作者回复: 这个地方是因为,完成了所有会话之后,关闭socket连接的时候,没有先等待2个大爷读写socket的4个线程都结束导致的。 虽然不影响结果,但还是可以改进一下,做到优雅的退出。
2 - solar2019-08-24一些分配内存地方也可以优化,可以预先分配一块复用,减少向内核申请分配内存次数.
作者回复: 这些想法,最好是落到代码上,通过真实的优化效果,来印证自己的想法。
共 2 条评论2