Kafka相关

Kafka - 随笔分类 - 郭俊 Jason - 博客园

www.jasongj.com/2015/01/02/Kafka 深度解析/

查看 kafka 消息队列的积压情况-12463494-51CTO 博客

1
2
kafka-consumer-groups.sh --bootstrap-server 172.20.154.101:9092 --describe --group  marketing_group | grep task-prize-gift

Kafka 的 Lag 计算误区及正确实现_朱小厮的博客-CSDN 博客_kafka lag

查看正在运行的消费组
kafka_2020-07-30-12-51-21

1
2
kafka-consumer-groups --bootstrap-server master:9092 --list --new-consumer
kafka-consumer-groups.sh --bootstrap-server ce-kafka:9092 --list

计算消息的消息堆积情况
kafka_2020-07-30-12-51-47

1
2
kafka-consumer-groups --bootstrap-server master:9092 --describe --group  test_kafka_game_x_g1
kafka-consumer-groups.sh --bootstrap-server ce-kafka:9092 --describe --group default-group

说明:

LogEndOffset 下一条将要被加入到日志的消息的位移
CurrentOffset 当前消费的位移
LAG 消息堆积量

消息堆积量:消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量也称之为消费滞后量

kafka 清理数据日志 - Adrian·Ding - 博客园

1
2
3
4
5
log.dirs=/data/kafka-logslog.cleaner.enable=true
log.cleanup.policy = delete    // delete|compact
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
1
2
kafka-configs.sh --zookeeper ce-zookeeper:2181 --entity-type topics --entity-name __consumer_offsets --describe
kafka-configs.sh --zookeeper ce-zookeeper:2181 --entity-type topics --entity-name __consumer_offsets --alter --delete-config cleanup.policy

命令行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 创建名称为 topic_1 的主题
kafka-topics.sh --zookeeper nodek.com:2181/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1

# 显示消费组
kafka-consumer-groups.sh --bootstrap-server nodek.com:9092 --list
kafka-consumer-groups.sh --new-consumer --bootstrap-server nodek.com:9092 --list

# 描述消费组(可以查看 offset, client_id, partition等)
kafka-consumer-groups.sh --bootstrap-server nodek.com:9092 --describe --group consumer_demo1

# 生产消息
kafka-console-producer.sh --topic topic_1 --broker-list nodek.com:9092

# 消费消息(指定消费组为 console-consumer-83559)
kafka-console-consumer.sh --bootstrap-server nodek.com:9092 --topic topic_1 --group console-consumer-83559
kafka-console-consumer.sh --bootstrap-server 172.20.154.101:9092 --topic task-prize-gift-dev --from-beginning

kafka的concurrency 配置过大导致CPU占用率过大

问题:kafka的concurrency 配置过大导致CPU占用率过大。
说明和解决方案:3台机器,分区数量为10,concurrency修改前为30,修改后为3(参考值:机器数量*concurrency<=分区数)

修改前后趋势图
image.png

修改前:5dYCAwIUJ0.jpg
修改后:image.png

注意:concurrent 的数量和 @KafkaListener 是相关的(也可以通过注解参数配置覆盖factory的concurrency @KafkaListener(concurrency = 3)),互相直接的线程是不干扰的。
image.png

image.png

消耗资源分析
每一个consumer都会添加到线程池中
image.png

每个consumer都会再 while 循环中执行 isRunning()和 pollAndInvoke() 方法(空轮询消耗CPU)。
image.png

空轮询消耗CPU示例
image.png

参考资料:【spring-kafka】属性 concurrency 的作用及如何配置(RoundRobinAssignor 、RangeAssignor) - 云+社区 - 腾讯云

其他参考资料

kafka 系列七、kafka 核心配置 - 小人物的奋斗 - 博客园

Apache Kafka-通过 concurrency 实现并发消费_小小工匠的技术博客_51CTO 博客
kafka 动态添加 topic,动态添加消费者_小小传奇的博客-CSDN 博客_kafka topic 动态
Kafka 中@KafkaListener 如何动态指定多个 topicJava 知音的博客-CSDN 博客
Kafka auto.offset.reset 值详解_lishuangzhe7047 的博客-CSDN 博客_auto.offset.reset
Kafka 获得 topicPartition 的最早,最新 offset 的时间,以及存储量_卞卞要运动的博客-CSDN 博客_topicpartitions
Kafka 消费者重复消费问题解决 | 贫贫贫贫僧

“高深莫测”的Kafka时间轮原理,原来也就这么回事-CFANZ编程社区