当前位置 : 首页 » 文章分类 :  开发  »  Apache-kafka

Apache-kafka

kafka笔记

https://kafka.apache.org/


spring-kafka

spring-kafka 的 AckMode

RECORD:每处理一条commit一次
BATCH:(默认)每次poll的时候批量提交一次,频率取决于每次poll的调用频率
TIME:每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
COUNT:累积达到ackCount次的ack去commit
COUNT_TIME:ackTime或ackCount哪个条件先满足,就commit
MANUAL:listener负责ack,但是背后也是批量上去
MANUAL_IMMEDIATE:listner负责ack,每调用一次,就立即commit

聊聊spring for kafka的AckMode
https://juejin.im/post/59e0528df265da43133c2ab5

spring-kafka 的自动commit offset机制

enable.auto.commit 设为false时,会使用spring-kafka的自动提交offset机制。
enable.auto.commit 设为true时采用kafka的默认提交模式。
spring-kafka 会检查 enable.auto.commit 变量是否为false,当为false时,
spring-kafka会启动一个invoker,这个invoker的目的就是启动一个线程去消费数据,他消费的数据不是直接从kafka里面直接取的,那么他消费的数据从哪里来呢?他是从一个spring-kafka自己创建的阻塞队列里面取的。
然后会进入一个循环,从源代码中可以看到如果auto.commit被关掉的话, 他会先把之前处理过的数据先进行提交offset,然后再去从kafka里面取数据。
然后把取到的数据丢给上面提到的阻塞列队,由上面创建的线程去消费,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka会调用kafka的pause方法,则consumer会停止从kafka里面继续再拿数据。

总结kafka的consumer消费能力很低的情况下的处理方案
https://www.jianshu.com/p/4e00dff97f39


Java消费kafka消息手动commit代码(带sasl配置)

package com.masikkk.kafka;

import com.google.common.collect.Lists;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestKafka {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092");
        //设置不自动提交,自己手动更新offset
        properties.put("enable.auto.commit", "false");
                // 从最新offset开始消费j
        properties.put("auto.offset.reset", "latest");
        //properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "com-masikkk-kafka-group-test");
        properties.put("auto.commit.interval.ms", "100000");

                // 认证
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Lists.newArrayList("com-masikkk-kafka-topic-test"));

                while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(100);
                        if (records.isEmpty()) {
                                System.out.print("null ");
                                continue;
                        }
                        for (ConsumerRecord<String, String> record: records) {
                                System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset() + ", value = " + record.value());
                                consumer.commitSync();
                        }
                }
    }
}

java自己手动控制kafka的offset
https://blog.csdn.net/qq_20641565/article/details/64440425

Kafka启用SASL_PLAINTEXt动态配置JAAS文件的几种方式
https://blog.csdn.net/russle/article/details/81041135


kafka配置

Broker配置

3.1 Broker Configs
https://kafka.apache.org/documentation/#brokerconfigs

生产者配置

3.3 Producer Configs
https://kafka.apache.org/documentation/#producerconfigs

消费者配置

3.4 Consumer Configs
https://kafka.apache.org/documentation/#consumerconfigs

bootstrap.servers:””

kafka集群地址
格式:host1:port1,host2:port2,...
例如:192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092

group.id:””

消费组id

security.protocol:PLAINTEXT

和brokers之间通信的安全协议,可取以下值:
PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL

sasl.mechanism:GSSAPI

客户端连接的sasl机制

sasl.jaas.config:null

SASL JAAS 配置文件
例如kafka.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};

enable.auto.commit:true

是否启用自动提交

auto.commit.interval.ms:5000

自动提交offset的时间间隔,当 enable.auto.commit 为true时才起作用

session.timeout.ms:10000

会话超时时间,就是说如果发送心跳时间超过这个时间,broker就会认为消费者死亡了,默认值是10000ms,也就是10s(这个值一般默认没问题)

max.poll.interval.ms:300000

两次poll消息之间的最大间隔
假如消息处理太慢,两次poll之间时间差超过 max.poll.interval.ms 毫秒值,broker就认为这个消费者挂了,就会把它从组内删除,并且重新平衡,把partition分配给组内的其他消费者。

kafka 0.10.0.0 或之前版本中,尚未提供 max.poll.interval.ms 参数,因此 session.timeout.ms 既用于失败检测,也用于控制消息处理时间,同时还承担着rebalance过程的超时控制。在 0.10.1.0 版本时社区对该参数的含义进行了解耦,推出了max.poll.interval.ms参数。实际上,在0.10.1.0.0或之后的版本中,作者推荐用户将session.timeout.ms设置一个很小的值,比如5s,但需要把max.poll.interval.ms设置成平均的消息处理时间。举个例子,假设你一次poll调用返回的消息数是N,你处理每条消息的平均时间是t0,那么你需要设置max.poll.interval.ms稍稍大于N * t0以保证poll调用间隔不会超过该阈值。

max.poll.records:500

一次从kafka中poll出来的数据最大值

heartbeat.interval.ms:3000

这个值是心跳时间,表示消费组多长时间向broker报告一次,这个默认值3000ms,这个值官方推荐不要高于session.timeout.ms 的1/3(这个值默认没问题)

auto.offset.reset:latest

只能是以下三个值之一:

  • smallest : 自动重置到最小的offset, 这个最小的offset不一定是0, 因为msg可能会被过期删除掉;
  • largest : 自动重置到最大的offset;
  • none: 无法获取有效offset时抛出异常

这个配置只有在当前无法获取到有效的offset时才生效,有两种情况:
一、全新的group; 二、已存在的group, 但很久没有提交过offset, 其保存在__consumer_offsets里的信息将被compact并最终清除掉;

Kafka重置消费的Offset
https://www.jianshu.com/p/2945a90b48af

kafka 0.10.1一些使用经验
https://www.jianshu.com/p/32f1f16af937


kafka命令行工具

Mac homebrew 安装 kafka

brew install kafka

kafka使用zookeeper管理,安装过程会自动安装zookeeper

安装目录:
/usr/local/Cellar/kafka/2.1.0

配置文件目录:
/usr/local/etc/kafka

工具脚本目录
/usr/local/Cellar/kafka/2.1.0/libexec/bin

mac环境下使用brew安装kafka
https://www.cnblogs.com/lusecond/p/7672532.html

mac 安装kafka
https://www.jianshu.com/p/1f6387d18989

查看kafka版本

进入目录 /usr/local/Cellar/kafka/2.1.0/libexec/libs
看kafka相关jar包版本

准备kafka认证文件

准备kafka认证文件,后面的脚本都需要用
/Users/lll/kafka.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};

kafka-consumer-groups.sh

查看消费组信息,重置offset
这是0.11.0.0版本提供的新功能且只适用于新版本consumer
在新版本之前,如果要为已有的consumer group调整位移必须要手动编写Java程序调用 KafkaConsumer.seek() 方法,费时费力不说还容易出错。0.11.0.0版本丰富了kafka-consumer-groups脚本的功能,用户可以直接使用该脚本很方便地为已有的consumer group重新设置位移。

KafkaConsumer.seek(TopicPartition partition, long offset)
KafkaConsumer.seekToBeginning(Collection<TopicPartition> partitions)
KafkaConsumer.seekToEnd(Collection<TopicPartition> partitions)

查看消费组信息:
export KAFKA_OPTS=”-Djava.security.auth.login.config=/Users/lll/kafka.conf”
./kafka-consumer-groups.sh –bootstrap-server 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 –group consumer-group-name –describe

重设消费组offset到最新:
export KAFKA_OPTS=”-Djava.security.auth.login.config=/Users/lll/kafka.conf”
./kafka-consumer-groups-stg.sh –bootstrap-server 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 –topic kafka-topic-name –group consumer-group-name –reset-offsets –to-latest –execute

Kafka consumer group位移重设
https://www.cnblogs.com/huxi2b/p/7284767.html

kafka-console-producer.sh

export KAFKA_OPTS=”-Djava.security.auth.login.config=/Users/lll/kafka.conf”
./kafka-console-producer.sh –broker-list xxx:9092,yyy:9092 –topic sparktest –security-protocol SASL_PLAINTEXT

kafka-console-consumer.sh

同目录下准备消费者配置文件 consumer.properties
指定消费组和认证方式

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
#group.id=consumer-group-name

不指定消费组也可以,好像不指定时会以一个新的消费组去消费消息,比如服务器上有消费者在消费某个topic的消息,我们本地也想看看,这时应该以一个新的消费组去消费,不能和服务器上的消费组争抢消息,以免造成数据不一致。

从头开始消费:
export KAFKA_OPTS=”-Djava.security.auth.login.config=/Users/lll/kafka.conf”
./kafka-console-consumer.sh –bootstrap-server 192.168.1.101:9092,192.168.1.102:9092 –topic kafka-topic-name -from-beginning —offset earliest –consumer.config ./consumer.properties

从最后开始消费:
export KAFKA_OPTS=”-Djava.security.auth.login.config=/Users/lll/kafka.conf”
./kafka-console-consumer.sh –bootstrap-server 192.168.1.101:9092,192.168.1.102:9092 –topic kafka-topic-name –consumer.config ./consumer.properties

【Kafka零基础学习】如何用命令行生产或消费kerberos kafka集群
https://www.jianshu.com/p/150ed14ec161


kafka安全

自0.9.0.0.版本引入Security之后,Kafka一直在完善security的功能。当前Kafka security主要包含3大功能:认证(authentication)、信道加密(encryption)和授权(authorization)。信道加密就是为client到broker、broker到broker以及工具脚本与broker之间的数据传输配置SSL;认证机制主要是指配置SASL,而授权是通过ACL接口命令来完成的。

生产环境中,用户若要使用SASL则必须配置Kerberos,但对于一些小公司而言,他们的用户系统并不复杂(特别是专门为Kafka集群服务的用户可能不是很多),显然使用Kerberos有些大材小用,而且由于运行在内网环境,SSL加密也不是很必要。因此一个SASL+PLAINTEXT的集群环境足以应付一般的使用场景。

Kafka ACL使用实战
https://www.cnblogs.com/huxi2b/p/7382144.html

加载jaas的几种方式

设置系统属性java.security.auth.login.config

比如我们使用kafka命令行时手动设置此属性:
export KAFKA_OPTS=”-Djava.security.auth.login.config=/Users/lll/kafka.conf”
或者在消费者服务的启动脚本中设置 java.security.auth.login.config 属性

直接设置Producer或者Consumer的sasl.jaas.config属性

例如:

Properties properties = new Properties();
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Lists.newArrayList("com-masikkk-kafka-topic-test"));

Kafka启用SASL_PLAINTEXt动态配置JAAS文件的几种方式
https://blog.csdn.net/russle/article/details/81041135


一次Kafka消息循环消费问题排查

现象:
有个微服务的接口被持续大量调用,产生的日志迅速堆满磁盘,产生告警。经查问题发现调用来自kafka消息的消费服务,有一个topic里有100多条消息被不断的重复消费,而每条消息的消费都要调100多次微服务接口,最终导致微服务日志堆积。

问题:
查到问题来自kafka消息循环消费后,发现日志中有自动 commit offset 失败:

Auto-commit of offsets {swc-uds-relation-stg-inviter-relations-2=OffsetAndMetadata{offset=146, metadata=''}, swc-uds-relation-stg-inviter-relations-1=OffsetAndMetadata{offset=3, metadata=''}, swc-uds-relation-stg-inviter-relations-0=OffsetAndMetadata{offset=3, metadata=''}} failed for group uds-receiver-relation-inviter-relationship-stg: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

自动commit offset失败的原因也说的很清楚,消息消费太慢,下次 poll 消息时超过 max.poll.interval.ms ,然后就被再平衡了,从而自动 commit offset 失败,offset没有往前走,下次再poll消息时还会取到之前的消息,造成重复消费现象。
一共两台消费者服务器,每次rebalance时自动转给另一个消费者,但另一个消费者也必然消费超时,就这么互相转来转去,各自都commit offset失败,一直循环。
kafka版本:spring-kafka:1.3.1.RELEASE, 对应的kafka版本:kafka-clients:0.11.0.0

原因:
原因也很容易找到,每条消息的消费中要调用100多次服务接口导致超时。

解决方法:
kafka的异常提示中已经给出了解决方案:要么改大超时时间,要么减少每次poll取回的消息个数,总之就是想办法在超时时间内快速把消息消费掉。
我的解决方法是把接口调用改为多线程异步调用,每次取完消息马上就commit offset,后来继续消费过程。

处理当前问题:
由于服务器上这些消息一直循环消费浪费服务器资源,临时想了个解决方法,在本地用同一个group id把服务器上的消费消费掉,让offset往前走,解决当前的循环消费问题,后续改进代码。
试了kafka的命令行工具,可以用同一个消费组在控制台打出消息,但貌似消费完后没有自动commit,offset还是原来的值。后来用java写了段代码,手动调用 consumer.commitSync(); 才使offset前进。
但offset前进后,服务器上还是在重复消费这些消息,猜可能是已经分配的消息消费失败的话总是会重复,后来重启了一下消费者服务,没问题了,因为消费者重启后,会去 latest offset 拉取消息,肯定就取不到之前的消息了。

总结:
一般情况下,kafka重复消费都是由于未正常提交offset

Kafka的CommitFailedException异常
https://www.cnblogs.com/huxi2b/p/8405566.html


KafkaEmbedded kafka单元测试

Spring Kafka - Embedded Unit Test Example
https://www.codenotfound.com/spring-kafka-embedded-unit-test-example.html

code-not-found/spring-kafka
https://github.com/code-not-found/spring-kafka

Simple embedded Kafka test example with spring boot
https://stackoverflow.com/questions/48753051/simple-embedded-kafka-test-example-with-spring-boot


异常

CommitFailedException异常

CommitFailedException异常:位移提交失败时候抛出的异常。通常该异常被抛出时还会携带这样的一段话:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

抛出时机
从源代码方面说,CommitFailedException异常通常发生在手动提交位移时,即用户显式调用KafkaConsumer.commitSync()方法。

消息处理时间大于 max.poll.interval.ms 时: 如前所述,这是该异常最“正宗”的出现场景。复现也比较容易,用户只需写一个consumer程序,订阅topic(即使用consumer.subscribe),设置max.poll.interval.ms=N,然后在consumer.poll循环中Thread.sleep(>N),之后手动提交位移即可复现,比如:

props.put("max.poll.interval.ms", 5000);
consumer.subscribe(Arrays.asList("topic1", "topic2", ...));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    // 处理消息
    Thread.sleep(6000L);
    consumer.commitSync();
}

Kafka的CommitFailedException异常
https://www.cnblogs.com/huxi2b/p/8405566.html


System property ‘java.security.auth.login.config’ is not set

原因是找不到 kafka 账号配置文件,可能原因是配置文件是通过启动脚本的 jvm参数传入的,但本地启动spring boot时忘了指定这个参数,就可能出此问题。
解决方法是在idea中配置Run Configuration,把kafka配置文件路径加入 jvm 参数中。
VM options: -Djava.security.auth.login.config=/Users/user/IdeaProjects/uds/uds-message-consumer/conf/test/kafka.conf

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
    at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
    at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
    at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:100)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:58)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:374)

Kafka “Login module not specified in JAAS config”
https://stackoverflow.com/questions/45756543/kafka-login-module-not-specified-in-jaas-config


基本概念

Kafka是一个分布式流数据系统,使用Zookeeper进行集群的管理。与其他消息系统类似,整个系统由生产者、Broker Server和消费者三部分组成,生产者和消费者由开发人员编写,通过API连接到Broker Server进行数据操作。我们重点关注三个概念:

Topic

Topic,是Kafka下消息的类别,类似于RabbitMQ中的Exchange的概念。这是逻辑上的概念,用来区分、隔离不同的消息数据,屏蔽了底层复杂的存储方式。对于大多数人来说,在开发的时候只需要关注数据写入到了哪个topic、从哪个topic取出数据。

Partition

Partition,是Kafka下数据存储的基本单元,这个是物理上的概念。同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上,比如下图所示的topic就有4个partition,分散在两台机器上。这种方式在大多数分布式存储中都可以见到,比如MongoDB、Elasticsearch的分片技术,其优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。

Consumer Group

Consumer Group,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。

consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。理解consumer group记住下面这三个特性就好了:

1)consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
2)group.id是一个字符串,唯一标识一个consumer group
3)consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)

offset

Kafka消费后都会提交保存当前的消费位置offset, 可以选择保存在zk, 本地文件或其他存储系统;
Kafka 0.8以后提供了Coordinator的角色,Coordinator除了可以来协调消费的group作balance外, 还接受 OffsetCommit Request, 用来存储消费的offset到Kafka本身中

Kafka消费端的offset主要由consumer来控制, Kafka将每个consumer所监听的tocpic的partition的offset保存在__consumer_offsets主题中


kafka partition

kafka partition

Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名,该文件夹下存储这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展。

生产者在生产数据的时候,可以为每条消息指定Key,这样消息被发送到broker时,会根据分区规则选择被存储到哪一个分区中,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。另外,在消费者端,同一个消费组可以多线程并发的从多个分区中同时消费数据(后续将介绍这块)。

Consumer分区分配机制

分配partition的策略:

range:对于每个topic,会将topic的partition编上序号排好序,然后consumer线程以字典序排序。然而把partition的总数除以consumer线程的总数来决定分配给每个线程的partition数目。如果无法除尽,将余数再均分给排序靠前的几个线程,即这些线程都会多出额外的一个partition。
round-robin:这个策略把所有partition和所有consumer线程都列出来。然后它以循环制分配partition给线程。如果所有consumer实例的订阅是相同的,那么partition会均匀分布。这个分配策略只有当以下情况成立时才可用:a.每个topic在一个consumer实例中有同样的stream数目。b.在group中的每个consumer实例订阅的topic的集合是相同的。
如果所有consumer实例有相同的consumer group,那么这个就像传统的队列,负载均衡到所有consumer上。假如多个consumer实例都有多个线程,且属于同一个group,那么一个topic的所有partition会均匀分配给所有线程。

接收消息的顺序只能保证一个partition之内是有序的,一个consumer接收多个partition的话是无法保证消息全局有序的,即consumer接收的消息的顺序可能跟producer发送的顺序不同。

Producer分区机制

当指定partition key的时候,分配partition的策略:

hash:由消息所提供的key来进行hash,然后分发到对应的partition。这是默认使用的partition机制。
自定义:自己实现partition接口,并在配置中用参数partitioner.class指定这个实现。
当没有指定partition key的时候,分配partition的策略:

随机:把每个消息随机分发到一个partition中。在10分钟内,该partition不会切换。所以,当producer数目小于partition时,在一定时间内会有部分partition没有收到数据。

Kafka分配分区的机制
http://lsr1991.github.io/2015/07/09/kafka-partition-mechanism/

kafka-manager

kafka-manager
https://github.com/yahoo/kafka-manager

kafka集群管理工具kafka-manager部署安装
https://www.jianshu.com/p/c24ed08dfa63

Apache Kafka 入门 - Kafka-manager的基本配置和运行
https://blog.csdn.net/isea533/article/details/73727485


上一篇 Java8 Optional 笔记

下一篇 Apache-HttpComponents

阅读
5,586
阅读预计24分钟
创建日期 2018-08-27
修改日期 2019-01-19
类别
百度推荐