Kafka相关
www.jasongj.com/2015/01/02/Kafka 深度解析/
查看 kafka 消息队列的积压情况-12463494-51CTO 博客
1 | kafka-consumer-groups.sh --bootstrap-server 172.20.154.101:9092 --describe --group marketing_group | grep task-prize-gift |
Kafka 的 Lag 计算误区及正确实现_朱小厮的博客-CSDN 博客_kafka lag
查看正在运行的消费组
1 | kafka-consumer-groups --bootstrap-server master:9092 --list --new-consumer |
计算消息的消息堆积情况
1 | kafka-consumer-groups --bootstrap-server master:9092 --describe --group test_kafka_game_x_g1 |
说明:
LogEndOffset 下一条将要被加入到日志的消息的位移
CurrentOffset 当前消费的位移
LAG 消息堆积量消息堆积量:消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量也称之为消费滞后量
kafka 清理数据日志 - Adrian·Ding - 博客园
1 | log.dirs=/data/kafka-logslog.cleaner.enable=true |
1 | kafka-configs.sh --zookeeper ce-zookeeper:2181 --entity-type topics --entity-name __consumer_offsets --describe |
命令行操作
1 | # 创建名称为 topic_1 的主题 |
kafka的concurrency 配置过大导致CPU占用率过大
问题:kafka的concurrency 配置过大导致CPU占用率过大。
说明和解决方案:3台机器,分区数量为10,concurrency修改前为30,修改后为3(参考值:机器数量*concurrency<=分区数
)
修改前后趋势图
修改前:
修改后:
注意:concurrent
的数量和 @KafkaListener
是相关的(也可以通过注解参数配置覆盖factory的concurrency @KafkaListener(concurrency = 3)
),互相直接的线程是不干扰的。
消耗资源分析
每一个consumer都会添加到线程池中
每个consumer都会再 while 循环中执行 isRunning()和 pollAndInvoke() 方法(空轮询消耗CPU)。
空轮询消耗CPU示例
参考资料:【spring-kafka】属性 concurrency 的作用及如何配置(RoundRobinAssignor 、RangeAssignor) - 云+社区 - 腾讯云
其他参考资料
kafka 系列七、kafka 核心配置 - 小人物的奋斗 - 博客园
Apache Kafka-通过 concurrency 实现并发消费_小小工匠的技术博客_51CTO 博客
kafka 动态添加 topic,动态添加消费者_小小传奇的博客-CSDN 博客_kafka topic 动态
Kafka 中@KafkaListener 如何动态指定多个 topicJava 知音的博客-CSDN 博客
Kafka auto.offset.reset 值详解_lishuangzhe7047 的博客-CSDN 博客_auto.offset.reset
Kafka 获得 topicPartition 的最早,最新 offset 的时间,以及存储量_卞卞要运动的博客-CSDN 博客_topicpartitions
Kafka 消费者重复消费问题解决 | 贫贫贫贫僧