前面我们使用 SASL/PLAIN
已经控制了 Kafka
整体访问需要通过认证,但是对于每个 Topic 还是没有进行管控,例如我们只允许第三方系统进行读取操作,如果没有进行管控那么他也可以对任意 Topic 进行其他高危操作,例如 写入别的队列,删除队列.所以我们在此启用 ACL 来控制权限.
项目准备
Zookeeper
Kafka
步骤概述
配置 Kafka Broker
配置 JAAS
修改启动文件
创建 SCRAM 账户
启动 Kafka
验证 SASK/SCRAM
配置 Kafka Broker
进入 Config
配置 server.properties
增加以下内容:
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# sasl.enabled.mechanisms 使用什么加密方案
sasl.enabled.mechanisms=SCRAM-SHA-256
# 上面是 SASL/PLAIN 认证相关 下面是 ACL 认证相关内容
# authorizer.class.name 使用哪个 Class 进行认证处理
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# allow.everyone.if.no.acl.found 假如开启则没有 ACL 配置的用户可以访问全部内容默认关闭
allow.everyone.if.no.acl.found=false
# super.users 设置用户
super.users=User:admin
配置 JAAS
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
#Client{
# org.apache.kafka.common.security.plain.PlainLoginModule required
# username="zkclient"
# password="zkclient";
#};
KafkaServer
用于Kafka
集群Broker
节点之间通信的账户密码.其中的
Username
将采用SCRAM
中的账户密码 这个在稍后的配置中创建Client
是Broker
与Zookeeper
之间的认证,本次内容不采用该内容所以暂时注解掉
修改启动文件
进入 bin 目录下 修改 Kafka-server-start.sh 文件.
在 KAFKA_HEAP_OPTS 添加 JAAS 的配置:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf"
创建 SCRAM 账户
在启动 Kafka
之前我们需要创建好账户 账户分别为 admin
和 alice
$ cd /usr/local/kafka/bin # 进入你自己的安装目录
# admin 账户创建
$ ./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret]' --entity-type users --entity-name admin
# alice 账户创建
$ ./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=alice-secret]' --entity-type users --entity-name alice
SASL/SCRAM
认证的用户信息是动态创建存储在 ZooKeeper 中, 由于上面的配置kafka_server_jaas.conf
中 Broker 之间的通信是通过admin
用户的,如果该用户不存在会 「启动报错」。
启动 Kafka
# 后台执行
$ ./kafka-server-start.sh -daemon ../config/server.properties
# 前台执行 我建议在未完成部署测试之前先不要开启后台启动,这样可以更方便的看到Kafka的异常
$ ./kafka-server-start.sh ../config/server.properties
验证 SASL/SCRAM 鉴权
客户端认证配置
如果你有例如 Franz
的工具可以跳过该内容.
配置管理员 连接属性
进入 config
目录配置 cmd.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
配置认证的类型以及登录逻辑的处理类和用户,使用超级管理员 admin
配置生产者 连接属性
进入 config
目录配置 producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
为了方便就不再创建新用户
配置消费者 连接属性
进入 config
目录配置 comsumer.properties
# consumer group id
group.id=deault-user-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";
消费者需要配置
group.id
否则在认证时无法通过
创建Topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1 --replication-factor 1 --command-config ../config/cmd.properties
--bootstrap-server
Kafka 服务器地址--topic
Topic 名称command-config
认证配置
消费队列
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config ../config/consumer.properties
会提示异常
ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256
Processed a total of 0 messages
因为 Alice 还未配置 ACL 权限
配置 ACL 权限
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation Read --topic test --group deault-user-group
给 Alice deault-user-group 在 Topic test 一个只读权限
启动消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config ../config/consumer.properties
执行完成后Shell 窗口将会一直阻塞等待消息
创建生产者
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config ../config/producer.properties
执行完成后 显示
>
后 输入内容如果在Consumer
Shell 中也看到了对应的内容那么你就完成整体流程.
References
基于 SASL/SCRAM 让 Kafka 实现动态授权认证 - https://mp.weixin.qq.com/s/I49qiLrKSHS7WEoVkkpPeQ
评论