1.kafka安全简介
kafka社区在0.9.0.0版本正式添加了安全特性,并且在0.10.0.0版本中进一步完善。
Kafka的安全特性有如下几个方面:
1.连接认证。服务端与客户端、服务端之间、等等。支持两种认证机制SSL(等价于我们经常使用的TLS)和SASL
2.Zookeeper 与服务端的认证。
3.基于SSL的连接通道数据传输加密
4.客户端读写权限
5.支持可插拔的授权服务和与外部授权服务的集成。
认证(authentication)和授权(authorization)
-
认证:证明自己的过程,对于kafka的认证你必须显示地提供者自己的身份信息来证明自己的身份时合法的。
-
授权:验证认证过的用户可以访问那些资源。
现实的场景中认证和授权我们都是结合使用的。
认证机制分类:
-认证机制- | -引入版本- | -适用场景- |
---|---|---|
SSL | 0.9.0 | SSL做信道加密比较多,SSL认证不如SASL所以一般都会使用SSL来做通信加密 |
SASL/GSSAPI | 0.9.9 | 主要是给 Kerberos 使用的。如果你的公司已经做了 Kerberos 认证(比如使用 Active Directory),那么使用 GSSAPI 是最方便的了。因为你不需要额外地搭建 Kerberos,只要让你们的 Kerberos 管理员给每个 Broker 和要访问 Kafka 集群的操作系统用户申请 principal 就好了。 |
SASL/PLAIN | 0.10.2 | 简单的用户名密码认证,通常与SSL结合使用,对于小公司来说,没必要搭建公司级别的Kerberos,使用它就比较合适 |
SASL/SCRAM | 0.10.2 | PLAIN的加强版本,支持动态的用户增减 |
SASL/OAUTHBEARER | 2.0 | OAuth 2框架的集成 |
Deleation Token | 1.1 | Delegation Token 是在 1.1 版本引入的,它是一种轻量级的认证机制,主要目的是补充现有的 SASL 或 SSL 认证。如果要使用 Delegation Token,你需要先配置好 SASL 认证,然后再利用 Kafka 提供的 API 去获取对应的 Delegation Token。这样,Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证)或传输 Keystore 文件(SSL 认证)。 |
ACL规则(kafka授权机制)
Principal P is[Allow/Denied] Operation O Fom Host H On Recourse R
Principal Kafka user
Operation 操作类型 比如 WRITE READ DESCRIBE
Host:kafka ip地址
resourse :资源 TOPIC CLUSTER GROUP 等等
权限列表:
2.使用SASL/PLAIN + ACL搭建内网kafak集群安全机制
由于SASL/PLAIN 创建用户时静态创建的,所以如果新增用户需要修改配置,然后重启kakfa集群。
2.1.创建KafkaServer (jass.config)
KafkaServer{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
};
1.需要注意两个“;”的位置,一定不能少,也不能有多余的空格之类的符号。
2.上边的配置标识创建了三个用户 开启认证
2.2添加server.properties
#开启ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#设置超级用户
super.users=User:admin
#使用SASL_PLIANTEXT 协议
listeners=SASL_PLAINTEXT://10.102.21.92:9092
#Broker之间不启用 SSL
security.inter.broker.protocol=SASL_PLAINTEXT
#PLAIN认证
sasl.enabled.mechanisms=PLAIN
#broker 之间也开启 PLAIN认证
sasl.mechanism.inter.broker.protocol=PLAIN
2.3.启动broker
cp bin/kafka-server-start.sh bin/auth-kafka-server.start.sh
#在脚本中加上一下参数
-Djava.security.auth.login.config=<yourPath>/jass.conf
#启动1
bin/auth-kafka-server.start.sh -daemon config/myserver.properties
#或者直接使用当下启动脚本 启动2
KAFKA_OPTS=-Djava.security.auth.login.config=<YOURPATH>/jass.config bin/kafka-server-start.sh -daemon config/myserver.properties
注意:需要在所有的broker上做一下操作,启动集群。集群启动完成后,当前kafka集群已经开启了认证授权的功能。
2.4发送消费消息实例
1.首先执行一下脚本:
bin/kafka-console.proudcer --broker-list address --topic yourTopic
直接输入此sh进行发送消息会出现如下异常,因为没有使用用户去认证操作。
WARN [Producer clientId=console-producer] Bootstrap broker 10.102.21.92:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
2.创建KafkaClient writer-jaas.conf 指定认证方式以及使用的用户
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="writer";
};
3.然后我们copy一个脚本将脚本加上 指定客户端启动需要的配置
#最终脚本如下
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/home/kafka/kafka/writer_jaas.conf kafka.tools.ConsoleProducer "$@"
#执行脚本
bin/write.sh --broker-list brokerAddress --topic test --producer.config producer.conf
此时启动客户端还会有问题,因为我们还未对用户进行授权操作,所以会报没有权限的WARN
对writer用户进行授权
赋予write用户写TOPIC权限
bin/kafka-acls.sh --authorizer kafka.security.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=youAddress --add --allow-principal User:writer --operation Write --topic yourTopic
赋予用户权限后,再去启动producer客户端可以对授权的主题发送消息了。
发送消息客户端创建完毕。
2.5消费消息客户端实例
1.创建 消费者 Client
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="reader"
password="reader";
};
2.创建consumer.conf(consumer 配置信息)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
#需要执行消费者组
group.id=test-group
3.消费者授权
需要授予两个权限
READ权限和 GROUP权限(以为消费者有对应的消费者组)
#Read权限
bin/kafka-acls.sh --authorizer kafka.security.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=youAddress --add --allow-principal User:reader --operation Read --topic yourTopic
#Group权限
bin/kafka-acls.sh --authorizer kafka.security.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=youAddress --add --allow-principal User:reader --operation Read --group yourGroupId
3.java 客户端实现
producer
1.创建配置文件
producer,.properties
sasl.jaas.config= org.apache.kafka.common.security.plain.PlainLoginModule required username="writer" password="writer";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
2.配置kafka客户端信息 其他的操作都不收影响之后要在配置文件中 添加 认证授权相关的配置
//如果kafka集群开启 认证授权功能 加载配置好的 配置文件即可
if(enableAuthentication) {
//Authentication 使用 SASL/SCRAM 认证方式 开启认证配置
InputStream in = ProducerConfig.class.getClassLoader().getResourceAsStream("producer.properties");
try {
properties.load(in);
} catch (IOException e) {
//todo handler produce.properties not exist
e.printStackTrace();
}
}
//一下三个参数都是没有默认值的,必须指定
properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
3.其他操作不需要改变直接发送消息即可。
consumer
consumer端和producer端一样:
1.创建配置
sasl.jaas.config= org.apache.kafka.common.security.plain.PlainLoginModule required username="reader" password="reader";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
2.加载配置
Properties props = new Properties();
if (enableAuthentication) {
//Authentication 使用 SASL/SCRAM 认证方式 开启认证配置
InputStream in = ConsumerConfig.class.getClassLoader().getResourceAsStream("consumer.properties");
try {
props.load(in);
} catch (IOException e) {
//todo handler produce.properties not exist
e.printStackTrace();
}
}
//必须填写
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
3.需要配置赋予权限的topic 和消费者组id才能正确的去消费消息,消费者端完毕。
本次内容主要实现了一个SASL/PLAIN + ACL的内网kafka集群安全机制,如果需要将SASL/PLAIN升级为SASL/SCRAM过程差不多,可以自己去尝试一下。如果是云上的kafka集群环境就需要使用SSL来实现通信的加密操作。