Kafka 最佳实践

Kafka 最佳实践

下面为PPT的节选和个人的补充:

原文

Kafka 基本配置及性能优化

硬件要求

集群规模 内存 CPU 存储
Kafka Brokers 3+ 24GB+(for small)64GB+(for large) 多核(12cpu+core +),并允许超线程 6+ x1TB 的专属磁盘(RAID 或 JBOD
Zookeeper 3(for small)5(for large) 8GB+(for small)24GB+(for large) 2 core+ SSD 用于中间的日志传输

OS 调优

  • OS page cache

    • 通过调整/proc/sys/vm/dirty_background_ratio /proc/sys/vm/dirty_ratio来调优性能
    • 尽量分配与所有日志的激活日志段大小相同的页缓存大小
    • wiki
  • fd 限制:100k+;

  • 禁用 swapping

  • TCP 调优;

  • JVM 配置

    • JDK 8 并且使用 G1 垃圾收集器;
    • 至少要分配 6-8 GB 的堆内存

磁盘存储

  • 使用多块磁盘,并配置为 Kafka 专用的磁盘;

  • JBOD vs RAID10

    • RAID10 成本相对高
  • JBOD(Just a Bunch of Disks) 简单说就是普通磁盘

  • JBOD限制

    • 比如磁盘失败将导致Kafka异常关闭

    • 跨磁盘数据不保证一致性

    • 多级目录

    • 社区也正在解决这么问题,可以关注 KIP 112、113:

      • 必要的工具用于管理 JBOD;
      • 自动化的分区管理;
      • 磁盘损坏时,Broker 可以将 replicas 迁移到好的磁盘上;
      • 在同一个 Broker 的磁盘间 reassign replicas;
  • RAID 10 的特点:

    • 可以允许单磁盘的损坏;
    • 性能和保护;
    • 不同磁盘间的负载均衡;
    • 高命中来减少 space;
    • 单一的 mount point;
  • SSD

    • 昂贵的选项
  • 文件系统:

    • 使用 EXT 或 XFS;
    • Issue on NFS
    • SNA NAS

基本监控

  • CPU 负载
  • 网络带宽
  • 文件句柄数
  • 磁盘空间
  • 磁盘 IO 性能
  • JVM GC 信息
  • ZooKeeper 监控

Kafka Replication

Replication介绍

  • Partition 有两种副本:Leader,Follower;

  • Leader 负责维护 in-sync-replicas(ISR)

    • replica.lag.time.max.ms:默认为10000,如果 follower 落后于 leader 的消息数超过这个数值时,leader 就将 follower 从 isr 列表中移除;
    • min.insync.replica:Producer 端使用来用于保证持久性

Under Replicated Partitions

  • JMX指标:kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions

  • 可能原因

    • broker挂了
    • controller问题
    • zk问题
    • 网络问题
  • 解决办法

    • 调整ISR参数,比如:

      • min.insync.replica
      • replica.lag.time.max.ms
      • num.replica.fetchers 默认为1,用于从 leader 同步数据的 fetcher 线程数
    • 增加broker

Controller

  • 负责管理 partition 生命周期;

  • 避免 Controller’s ZK会话超时:

  • ISR 抖动;

    • ZK Server 性能问题;
    • Broker 长时间的 GC;
    • 网络 IO 问题;
  • 监控:

    • kafka.controller:type=KafkaController,name=ActiveControllerCount- jmx
    • LeaderElectionRate

Unclean leader 选举

  • 允许不在 isr 中 replica 被选举为 leader。

    • 这是 Availability 和 Correctness 之间选择,Kafka 默认选择了可用性;
    • unclean.leader.election.enable:默认为 true,即允许不在 isr 中 replica 选为 leader,这个配置可以全局配置,也可以在 topic 级别配置;
    • 监控:kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec - JMX
    • default下一版本将改变

Broker 配置

基本配置

  • log.retention.{ms, minutes, hours} , log.retention.bytes

  • message.max.bytes, replica.fetch.max.bytes

  • delete.topic.enable:默认为 false,是否允许通过 admin tool 来删除 topic;

  • unclean.leader.election.enable = false

  • min.insync.replicas = 2

    • 当 Producer 的 acks 设置为 all 或 -1 时,min.insync.replicas 代表了必须进行确认的最小 replica 数,如果不够的话 Producer 将会报 NotEnoughReplicas 或 NotEnoughReplicasAfterAppend 异常
  • replica.lag.time.max.ms 超过这个时间没有发送请求的话,follower 将从 isr 中移除)

  • num.replica.fetchers = 1 用于从 leader 同步数据的 fetcher 线程数

  • replica.fetch.response.max.bytes

  • zookeeper.session.timeout.ms = 30s

  • num.io.threads:默认为8,KafkaRequestHandlerPool 的大小

Cluster评估

  • Broker 评估

    • 每个 Broker 的 Partition 数不应该超过2k
    • 控制partition <=25GB;
  • 集群评估(Broker 的数量根据以下条件配置)

    • 数据保留时间
    • 集群的流量大小
  • 集群扩容:

    • 磁盘使用率<60%
    • 网络使用率<75%
  • 集群监控

    • 确保topic分区分布尽量均匀
    • 确保broker节点不会出现磁盘、带宽耗尽

Broker 监控

  • Partition 数:kafka.server:type=ReplicaManager,name=PartitionCount - JMX
  • Leader 副本数:kafka.server:type=ReplicaManager,name=LeaderCount - JMX
  • ISR 扩容/缩容率:kafka.server:type=ReplicaManager,name=IsrExpandsPerSec - JMX
  • 读写速率:Message in rate/Byte in rate/Byte out rate
  • 网络请求的平均空闲率:NetworkProcessorAvgIdlePercent
  • 请求处理平均空闲率:RequestHandlerAvgIdlePercent

Topic 评估

  • partition 数

    • Partition 数应该至少与最大 consumer group 中 consumer 线程数一致
    • 对于使用频繁的 topic,应该设置更多的 partition
    • 控制 partition<=25GB 左右
    • 考虑应用未来的增长(可以使用一种机制进行自动扩容)
  • 使用带 key 的 topic

    • 相同的key 进入相同的partition
  • partition 扩容:当 partition 的数据量超过一个阈值时应该自动扩容(实际上还应该考虑网络流量)

分区选择

  • 根据TPS的要求设置 partition 数:

    • 假设 Producer 单 partition 的吞吐量为 P
    • consumer 消费一个 partition 的吞吐量为 C
    • 而要求的吞吐量为 T
    • 那么 partition 数至少应该大于 T/P、T/C 的最大值
  • 更多分区意味着更多的文件句柄、消息处理延时和更多的内存使用

    • 一条消息只有其被同步到 isr 的所有 broker 上后,才能被消费,partition 越多,不同节点之间同步就越多,这可能会带来毫秒级甚至数十毫秒级的延迟
    • 对于每一个 segment,在 broker 都会有一个对应的 index 和实际数据文件,而对于 Kafka Broker,它将会对于每个 segment 每个 index 和数据文件都会打开相应的 file handle(可以理解为 fd),因此,partition 越多,将会带来更多的 fd
    • Producer 和 Consumer 都会按照 partition 去缓存数据,每个 partition 都会带来数十 KB 的消耗,partition 越多, Client 将会占用更多的内存
  • 参考

Quotas

  • 避免恶意客户端并维护SLA - 安全认证
  • 设定字节率阈值限制
  • 监控throttle-rate,byte-rate
  • replica.fecth.response.max.bytes: 设置follower副本fetch请求response大小
  • 限制带宽: kafka-reassign-partitions.sh --throttle options

Kafka Producer

  • 使用Java版本producer

  • 使用kafka-producer-perf-test.sh测试

  • 设置好内存、cpu、batch、压缩等参数

    • batch.size: 越大,TPS越大,延时也越大
    • linger.ms: 越大,TPS越大,延时也越大
    • max.in.flight.requests.per.connection: 增加TPS,影响消息接收顺序
    • compression.type: 设置压缩类型,提升TPS
    • acks: 设置消息持久性级别
  • 避免发送大消息(会使用更多内存,降低broker处理)

性能调优

  • 如果吞吐量小于网络带宽

    • 增加线程;
    • 提高 batch.size;
    • 增加更多 producer 实例;
    • 增加 partition 数;
  • 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解

  • 跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置

监控指标

  • batch-size-avg
  • compression-rate-avg
  • waiting-threads
  • buffer-available-bytes
  • record-queue-time-max
  • record-send-rate
  • records-per-request-avg

Kafka Consumer

  • 使用 kafka-consumer-perf-test.sh 测试环境;

  • 吞吐量问题:

    • partition 数不够
    • OS缓存命中太低,分配更多页缓存
    • 应用的处理逻辑
  • offset topic

    • offsets.topic.replication.factor:默认为3;
    • offsets.retention.minutes:默认为1440,即 1day;
    • MonitorISR,topicsize;
  • offset commit慢的问题:异步 commit 或 手动 commit

基本配置

  • fetch.min.bytes 、fetch.max.wait.ms fetch最小bytes和最大等待时间

  • max.poll.interval.ms:调用 poll() 之后延迟的最大时间,超过这个时间没有调用 poll() 的话,就会认为这个 consumer 挂掉了,将会进行 rebalance

  • max.poll.records:当调用 poll() 之后返回最大的 record 数,默认为500;

  • session.timeout.ms;

  • Consumer Rebalance

    • check timeouts
    • check processing times/logic
    • GC Issues
  • 网络配置

监控

监控消费速度

  • Consumer Lag:consumer offset 与 the end of log(partition 可以消费的最大 offset) 的差值

  • 监控

    • metric 监控:records-lag-max;
    • 通过 bin/kafka-consumer-groups.sh 查看
    • 用于 consumer 监控的 LinkedIn’s Burrow
  • 减少 Lag

    • 分析 consumer:是 GC 问题还是 Consumer hang 住了
    • 增加 Consumer 的线程
    • 增加分区数和 consumer 线程
    • 提高业务处理速度

如何保证数据不丢?

  1. producer
  • retries = MAX
  • acks=all
  • max.in.flight.requests.per.connection = 1
  1. broker
  • replication factor >= 3
  • min.insync.replicas = 2
  • 关闭unclean leader选举
  1. consumer
  • auto.offset.commit = false
  • 消息被处理后提交offset