什么是Kafka?
kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。
kafka的总体数据流是这样的:
Centos 7安装java
Kafka运行在JVM上,需要先安装java环境。
首先检索包含java的列表
yum list java*
安装java1.8 的64位版本
yuminstall java-1.8.0-openjdk-devel.x86_64
使用命令检查是否安装成功:
java -version
返回:
openjdk version "1.8.0_212" OpenJDK Runtime Environment (build1.8.0_212-b04) OpenJDK 64-Bit Server VM (build25.212-b04, mixed mode)
Zookeeper安装
kafka依赖zookeeper。
阿里云镜像:https://mirrors.aliyun.com/apache/zookeeper/
创建文件夹
mkdir /zookeeper mkdir /zookeeper/zkdata mkdir /zookeeper/zkdatalog
下载zookeeper-3.4.14
wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz tar -zxvf zookeeper-3.4.14.tar.gz cd zookeeper-3.4.14 cp conf/zoo_sample.cfg conf/zoo.cfg 复制配置文件
编辑配置文件vim zoo.cfg
#心跳间隔 tickTime=2000 #其他服务器连接到Leader时,最长能忍受的心跳间隔数:10*2000 = 20秒 initLimit=10 #发送消息时,多长忍受心跳间隔数:5*2000= 10秒 syncLimit=5 #快照日志 dataDir= /zookeeper/zkdata #事务日志 dataLogDir= /zookeeper/zkdatalog #客户端连接zookeeper服务器的端口(默认端口号:2181) clientPort=2181 #集群ip(通过ifconfig查看本机内网IP,单机可忽略) #server.1=172.16.29.141:2888:3888 #server.2=IP:2888:3888 #server.3=IP:2888:3888
创建myid文件,对应zoo.cfg里的server.1、server.2、server.3
#server1 echo "1" >/zookeeper/zkdata/myid #server2 echo "2" >/zookeeper/zkdata/myid #server3 echo "3" >/zookeeper/zkdata/myid
启动
cd /zookeeper/zookeeper-3.4.14 启动:./bin/zkServer.sh start 停止:./bin/zkServer.sh stop 状态:./bin/zkServer.sh status
如果状态异常,可以查看当前目录下的zookeeper.out日志文件。
安装Kafka
Kafka 消息中间件
阿里云镜像:https://mirrors.aliyun.com/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz
创建文件夹
mkdir /kafka
下载
wget https://mirrors.aliyun.com/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz tar -zxvf kafka_2.12-2.2.1.tgz
修改配置文件
vim kafka_2.12-2.2.1/config/server.properties
主要修改如下:
#分别是1/2/3!!集群中保证唯一 broker.id=1 #对应ip(默认端口:9092) listeners=PLAINTEXT://:9092 #对应ip advertised.listeners=PLAINTEXT://:9092 #zookeeper集群地址(若是单机,只配一个即可) zookeeper.connect=localhost:2181
控制
启动 ./bin/kafka-server-start.sh -daemon ./config/server.properties 停止 ./bin/kafka-server-stop.sh
Kafka Topic分区
我们在创建Topic时,需要指定分区数
./bin/kafka-topics.sh --create --zookeeper zookeeper_ip1:2181,zookeeper_ip2:2181,zookeeper_ip3:2181 --replication-factor 3 --partitions 3 --topic your_topic
建议分区数为server个数的整数倍
修改分区
./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic kafka-new-topic --partitions 3
查看指定分区的日志,例如分区:1
./bin/kafka-console-consumer.sh --bootstrap-server 172.16.29.141:9092 --topic kafka-topic --partition 1
测试:
创建主题
现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition分区,并且备份因子也设置为1:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看所有topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
删除topic
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic mysql_topic
发送消息
kafka自带了一个producer生产者命令客户端,可以从本地文件中读取内容,也可以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行会被当做成一个独立的消息。
首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >this is a msg >this is a another msg
消费消息
对于consumer消费者,kafka同样也携带了一个命令行客户端,会将获取到的内容在命令中进行输出:
#消费消息 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test #从头开始 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test --from-beginning #从头开始只消费10条 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test --from-beginning --max-messages 10
如果你是通过不同的终端窗口来运行以上的命令,将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。
查看消费者分组
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
查看消费的进度(按消费者分组)
#查看消费的进度
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
返回:
其中,LAG是还没有被消费的消息个数。
查看某个主题下一共有多少条消息(获取最大位移数):
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
--time -1 表示要获取指定topic所有分区当前的最大位移,--time -2 表示获取当前最早位移。
kafka一般命令:
启动zk bin/zkServer.sh start 启动kafka bin/kafka-server-start.sh config/server.properties & 停止kafka 如果不管用 就是用kill -9 bin/kafka-server-stop.sh 1.创建主题 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 2.列出主题 bin/kafka-topics.sh --list --zookeeper localhost:2181 3.生产消息 bin/kafka-console-producer.sh --broker-list localhost:9092 -topic test 4.消费消息 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test 5.删除主题 1. 删除 kafka 主题 bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic sceniccenter-base-ticket 2. 在kafka 数据目录删除主题文件夹 3. 删除 zookeeper 上的 记录 1)登录zookeeper客户端:命令:./zkCli.sh 2)找到topic所在的目录:ls /brokers/topics 3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。 另外被标记为 marked for deletion 的topic你可以在zookeeper客户端中通过命令获得: ls /admin/delete_topics/【topic name】 总结 彻底删除topic: 1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录 2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过 zookeeper-client 删除掉broker下的topic即可。 6.查看toplic 的分区等情况 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
本文完。