美文网首页kafka
kafka架构师4-图解kafka源码2

kafka架构师4-图解kafka源码2

作者: fat32jin | 来源:发表于2020-08-26 23:17 被阅读0次

1.1. 如何处理响应消息? 0;11:00 ~ 0;30:00

上节内容NetworkClient.poll 方法
response.request().callback().onComplete(response);

Sender#completeBatch方法里面:

         //TODO 核心代码 把异常的信息也给带过去了
         //我们刚刚看的就是这儿的代码
        //里面调用了用户传进来的回调函数
        //回调函数调用了以后
        //说明我们的一个完整的消息的发送流程就结束了。
        batch.done(baseOffset, timestamp, exception);
        ——》RecordBatch#done

1. 2. 消息发送完了以后内存如何处理?

        this.accumulator.deallocate(batch); // 回收
       ——》free.deallocate

1.3. 消息异常如何处理? 到 ~ 0;34:00

1.4. 如何处理超时的批次? 0;34:00 ~ 0;45:00

Sender#run 开始
Sender#completeBatch方法
——》RecordBatch#done
——》 DemoCallBack#onCompletion方法
RecordAccumulator #abortExpiredBatches方法

batch#maybeExpire

1. 5. 如何处理长时间没有收到响应的消息 0;49:00~ 1:02:00

NetworkClient.poll 方法里面
——》
//TODO 处理长时间没有接受到响应
handleTimedOutRequests(responses, updatedNow);
——》processDisconnection
//对这些请求进行处理
//大家会看到一个比较有意思的事
//自己封装了一个响应。这个响应里面没有服务端响应消息(服务端没给响应)
//失去连接的状态表标识为true
responses.add(new ClientResponse(request, now, true, null));

image.png

1. 6. 客户端源码精华总结 1:02:00 ~ 1:26:00

关键方法代码:
KafkaProducer#doSend
this.sender.wakeup(); //唤醒sender线程
↓↓
Sender#run
1、核心流程封装在一个方法里面 doSend
2、自定义异常,提示清晰
3、底层无需处理异常,直接往上抛,核心逻辑处理
4、面向对象的思想
5、自己设计高性能的数据结构,线程安全,读多写少,
6、高并发情况下,为了性能,线程安全,缩小锁的粒度,分段加锁
7、为了减少gc,使用内存池设计
8、客户端发送消息,支持异步,异步化设计(回调函数)
9、kafka的网络设计,一个客户端管多个网络
10、批处理设计(吞吐量上升)
11、多个同一broker请求合并一起
12、响应处理,考虑全面
13、支持序列化(自定义序列化格式),支持配置压缩
14、粘包、拆包的处理(思路奇妙,代码经典)

2 . 观察Kafka源码的包(服务端代码) 1:43:00

超高并发的网络架构.png

core包
Kafka.scala 类 main 方法
——》 kafkaServerStartable.startup
——》KafkaServerStartable#starup 方法
——》KafkaServer类 startup()
——》SocketServer#startup()
//核心的线程
//在Acceptor类的主构造函数里面,启动了3个Processor线程
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize,
brokerId, processors.slice(processorBeginIndex, processorEndIndex),
connectionQuotas)

2.1 Acceptor线程是如何启动的? 1:53:00 ~
SocketServer#startup()
——》Acceptor类 run()

  1. Processor线程是如何启动的? 2:00:00 ~ 2:30:00 ~
    Acceptor类 run()
    Acceptor类 accept()

Processor 类 run 方法

  1. Processor线程是如何接收请求的?
    2:26:00 ~
    Processor 类 run 方法
    ->Processor#poll
    -> Selector#poll

while (isRunning) {
try {
// setup any new connections that have been queued up
//读取每个SocketChannel,把每个SocketChannel
//都往Selector上面注册OP_READ事件。
configureNewConnections()
// register any new responses for writing
//TODO 看起来像是处理响应的。绑定 OP_WRITE
processNewResponses()
//我们大胆的猜测,根据我们之前的了解
//读取和发送请求的代码应该都是在这个方法里面完成的。
//TODO 再次进去
poll()
//TODO 用来处理接收到当的请求
processCompletedReceives()
//todo 处理我们已经发送出去的响应
processCompletedSends()
processDisconnected()
} catch {

  1. Processor线程是如何处理StateReceiver的请求的?
    2:37:00 ~
    <kafka精讲40讲> 服务器调优
    <图解kafka70讲>

相关文章

  • kafka架构师4-图解kafka源码2

    1.1. 如何处理响应消息? 0;11:00 ~ 0;30:00 上节内容NetworkClient.poll 方...

  • Kafka源码分析-Content Table

    Kafka源码分析-网络层-1 Kafka源码分析-网络层-2 Kafka源码分析-网络层-3 Kafka源码分析...

  • kafka架构师3-图解kafka源码2

    1. 掌握内存池设计 0:20:00 ~ 0:47:44 目的: 减少fullgc 概率 原理图: Sen...

  • KafkaProducer

    Kafka源码阅读(一):Kafka Producer整体架构概述及源码分析 zqhxuyuan Kafka源码分...

  • 消息队列之一: kafka

    原则: kafka版本!kafka broker及spring-boot配置看官网,看kafka源码, 源码, 源...

  • Kafka

    参考 Kafka中文 图解 Kafka 水印备份机制 Kafka ISR 副本同步机制 关于 Kafka 的一些面...

  • Kafka源码阅读准备

    目标是希望能通过idea工具阅读Kafka源码并能成功的编译Kafka源码。 源码下载 Kafka的源码地址在 h...

  • Kafka视频集

    kafka企业级入门实战完整版 Kafka系列教程 Kafka入门 分布式消息通信Kafka原理剖析 阿里架构师直...

  • Kafka源码环境搭建

    1 git上找到kafka地址 地址:Kafka源码git地址 2 下载源码 这里我们切换到1.1,主要针对1.1...

  • Kafka源码剖析

    Kafka源码剖析 Kafka源码剖析之源码阅读环境搭建 首先下载源码:http://archive.apache...

网友评论

    本文标题:kafka架构师4-图解kafka源码2

    本文链接:https://www.haomeiwen.com/subject/cqhprktx.html