Apache Kafka允许客户端使用SSL加密流量和身份验证。默认情况下,SSL被禁用,但可以在需要时打开。
为每个Kafka代理生成SSL密钥和证书
部署一个或多个具有SSL支持的代理的第一步是为每个服务器生成公共/私有密钥对。由于Kafka希望所有密钥和证书都存储在密钥库中,因此我们将使用Java的keytool命令执行此任务。
keytool -keystore server.keystore.jks -alias localhost -validity 365 -storetype PKCS12 -genkey -dname "CN=localhost, OU=IT, O=MyCompany, L=beijing, ST=beijing, C=CN"
注意:CN属性必须写成localhost不然会导致后期生成的证书出错!!!
Enter keystore password:test1234
Re-enter new password:test1234
keystore: 密钥仓库存储证书文件。密钥仓库文件包含证书的私钥(保证私钥的安全)。
validity: 证书的有效时间,天。
证书生成后可以使用以下命令验证是否生成成功:
keytool -list -v -keystore server.keystore.jks
server.keystore.jks 这个文件包含了一对公私钥,和一个证书来识别机器。但是,证书是未签名的,这意味着攻击者可以创建一个这样的证书来伪装成任何机器。
2.创建自己的CA
生成的CA是一个简单的公私钥对和证书,用于签名其他的证书。
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
[root@node01 opt]# openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
Generating a 2048 bit RSA private key
......................................................+++
...................................................+++
writing new private key to 'ca-key'
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:CN
State or Province Name (full name) []:beijing
Locality Name (eg, city) [Default City]:beijing
Organization Name (eg, company) [Default Company Ltd]:myCompany
Organizational Unit Name (eg, section) []:IT
Common Name (eg, your name or your server's hostname) []:kafkaSSL
Email Address []:
3.创建信任库
将生成的CA添加到clients.truststore(客户的信任库),以便client可以信任这个CA:
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
示例:
[root@node01 opt]# keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
Enter keystore password:
Re-enter new password:
Owner: CN=kafkaSSL, OU=IT, O=myCompany, L=beijing, ST=beijing, C=CN
Issuer: CN=kafkaSSL, OU=IT, O=myCompany, L=beijing, ST=beijing, C=CN
Serial number: db4a52d0cfa8dabd
Valid from: Tue Feb 28 18:19:12 CST 2023 until: Wed Feb 28 18:19:12 CST 2024
Certificate fingerprints:
MD5: D7:A8:77:A8:86:B2:27:CE:40:F4:8C:EF:F3:7D:94:15
SHA1: 60:14:AA:54:87:2F:75:6C:1A:28:E9:79:9C:6F:F8:CB:1D:B6:FE:D8
SHA256: 55:65:05:66:69:0E:41:AC:D8:05:33:3B:13:E6:A2:4F:93:FD:F8:0D:DA:05:E3:57:0D:BB:31:35:84:09:2E:5C
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3
Extensions:
#1: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: A0 BF F9 B5 D4 02 30 6C 7C E4 56 8C 84 2F 35 EA ......0l..V../5.
0010: 0A 3B EF CE .;..
]
]
#2: ObjectId: 2.5.29.19 Criticality=false
BasicConstraints:[
CA:true
PathLen:2147483647
]
#3: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: A0 BF F9 B5 D4 02 30 6C 7C E4 56 8C 84 2F 35 EA ......0l..V../5.
0010: 0A 3B EF CE .;..
]
]
Trust this certificate? [no]: yes
Certificate was added to keystore
客户端的信任库存储所有客户端信任的证书,将证书导入到一个信任仓库也意味着信任由该证书签名的所有证书,正如上面的比喻,信任政府(CA)也意味着信任它颁发的所有护照(证书),此特性称为信任链,在大型的kafka集群上部署SSL时特别有用的。可以用单个CA签名集群中的所有证书,并且所有的机器共享相同的信任仓库,这样所有的机器也可以验证其他的机器了。
4.签名证书
用步骤2生成的CA签名步骤1生成的证书。首先需要导出请求文件。
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
cert-file: 出口,服务器的未签名证书
然后用CA签名
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
最后,你需要导入CA的证书和已签名的证书到密钥仓库:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
5.配置Kafka Broker
listeners=SSL://localhost:9093
#server.keystore.jks的地址
ssl.keystore.location=/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
#server.truststore.jks的地址
ssl.truststore.location=/ssl/server.truststore.jks
ssl.truststore.password=test1234
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
6.配置Kafka 客户端
Producer和Consumer的SSL的配置是相同的。
client-ssl.properties:
security.protocol=SSL
ssl.truststore.location=/ssl/client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.endpoint.identification.algorithm=
7.启动Kafka测试
启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka:
bin/kafka-server-start.sh config/server.properties
创建topic:
bin/kafka-topics.sh --create --topic testSSL --bootstrap-server localhost:9093 --command-config client-ssl.properties
生产者:
bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic testSSL --producer.config client-ssl.properties
消费者:文章来源:https://uudwc.com/A/LmVJ2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic testSSL --consumer.config client-ssl.properties
注意:client-ssl.properties里需要配置ssl.endpoint.identification.algorithm=来取消验证主机名。文章来源地址https://uudwc.com/A/LmVJ2