1,消息引擎系统
1)Kafka是消息引擎系统,两个重要因素: 消息设计、传输协议设计。
2)Kafka消息是结构化的二进制数据。消息本身一般都是结构化数据,xml、json、二进制或者其他自定义结构化类型。
3)kafka自己设计了一套二进制的消息传输协议
。
4)消息队列(message queue)和发布/订阅(pub/sub)模式
message queue:sender发送消息到指定queue,receiver从queue接受消息,一个消息只能被一个receiver接受。
pub/sub模式:有一个topic主题
的概念,发布者(publisher)向topic发送消息,所有订阅了该topic的subscriber都能收到消息。(redis的pub/sub
)
5)kafka引入consumer group
来同时支持message queue和pub/sub。
2,Kafka概要设计
1)零拷贝:
Java 类库通过 java.nio.channels.FileChannel 中的 transferTo() 方法(底层sendfile系统调用)来在 Linux 和 UNIX 系统上支持零拷贝。
内核直接将数据从磁盘文件拷贝到套接字,而无需通过应用程序。
public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;
2)高吞吐量/低延时
micro-batch:批量发送消息。
操作系统页缓存:不直接写IO,直接写入页缓存;消费时大多命中缓存。
零拷贝:网络间数据传输使用sendfile的零拷贝。
顺序顺序磁盘:使用append追加方式,且不允许修改已有消息。
3)消息持久化
kafka broker收到数据,立即写入持久化日志文件中还是page cache内存中,并没有刷新到磁盘
log数据文件刷盘策略
每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000
每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000
4)负载均衡和故障转移
通过partition leader election分区leader选举
来实现负载均衡。
通过会话
机制将自己注册到zookeeper上,当会话失效,集群选举另一个leader。
5)伸缩性
每台kafka 服务器broker
上的状态,统一交由Zookeeper保管。
3,Kakfa基本概念
1)流式处理框架。
kafka在0.10.0.0版本,正式推出了Kafka Stream
2)Kafka核心架构架构:
生产者发送消息给broker
消费者从broker拉取消息
broker使用zookeeper集群,进行服务器的协调管理
3)消息Message
Producer将ProducerRecord,通过key.serializer、value.serializer
序列化成message
。
Consumer将message(KafkaStream)
,通过key.deserializer、value.deserializer
反序列化成KafkaMessage,进而转成所需对象
。
Message完整格式:使用byte数组比Java堆内存节省空间,大量使用页缓存而非堆内存,避免broker崩溃时从热缓存
image.png
Producer发送消息封装:
image.png
Consumer接收消息封装:
image.png
4)topic和partition
topic代表一类消息,用于区分业务
partition(文件夹)从0开始,有自己的编号;为了提升系统吞吐量,形成了topic-partition-message的三层结构
5)offset位移
topic partition下,每个消息都分配一个offset值,固定的
不同的消费者,正在消费的offset值,变动的
kafka每条消息,是一个<topic, partition, offset>的三元组
6)Segment段
每一个partition文件夹下,有多个大小相等的segment 数据文件(文件名:从0开始,后续为最后一条消息的offset)
包含.index索引文件.log数据文件
image.png
7)replica 副本
利用冗余,唯一目的就是防止数据丢失
leader:对外提供服务(针对每一个partition,都有leader)
follower:作为备份同步(fetch)leader、挂掉后充当leader
8)ISRin-sync replica,leader维护
与leader同步的replica集合,包含自己
replica落后leader replica.lag.time.max.ms | replica.lag.max.messages,则移除ISR。
3,应用场景
1)服务通信。 比如Order的kafka消息,多个服务都会关注。
2)数据传输。将订单信息,发送给数据服务。
3)异步处理。自己的服务发送并接收,做异步的处理。
网友评论