飘易博客(作者:Flymorn)
订阅《飘易博客》RSS,第一时间查看最新文章!
飘易首页 | 留言本 | 关于我 | 订阅Feed

【跟我一起搭建物联网平台】4、Kafka消息中间件

Author:飘易 Source:飘易
Categories:物联网 PostTime:2019-6-26 21:33:46
正 文:

什么是Kafka?

kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。


kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。


kafka的总体数据流是这样的:



Centos 7安装java

Kafka运行在JVM上,需要先安装java环境。

首先检索包含java的列表

yum list java*

安装java1.8 的64位版本

yuminstall java-1.8.0-openjdk-devel.x86_64

使用命令检查是否安装成功:

java -version

返回:

openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build1.8.0_212-b04)
OpenJDK 64-Bit Server VM (build25.212-b04, mixed mode)

 


Zookeeper安装

kafka依赖zookeeper。

阿里云镜像:https://mirrors.aliyun.com/apache/zookeeper/

创建文件夹

mkdir /zookeeper
mkdir /zookeeper/zkdata
mkdir /zookeeper/zkdatalog

下载zookeeper-3.4.14

wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
tar -zxvf zookeeper-3.4.14.tar.gz
cd zookeeper-3.4.14
cp conf/zoo_sample.cfg conf/zoo.cfg         复制配置文件

编辑配置文件vim zoo.cfg

#心跳间隔
tickTime=2000
#其他服务器连接到Leader时,最长能忍受的心跳间隔数:10*2000 = 20秒
initLimit=10
#发送消息时,多长忍受心跳间隔数:5*2000= 10秒
syncLimit=5
#快照日志
dataDir= /zookeeper/zkdata
#事务日志
dataLogDir= /zookeeper/zkdatalog
#客户端连接zookeeper服务器的端口(默认端口号:2181)
clientPort=2181
#集群ip(通过ifconfig查看本机内网IP,单机可忽略)
#server.1=172.16.29.141:2888:3888
#server.2=IP:2888:3888
#server.3=IP:2888:3888

创建myid文件,对应zoo.cfg里的server.1、server.2、server.3

#server1
echo "1" >/zookeeper/zkdata/myid
#server2
echo "2" >/zookeeper/zkdata/myid
#server3
echo "3" >/zookeeper/zkdata/myid

 

启动

cd /zookeeper/zookeeper-3.4.14
启动:./bin/zkServer.sh start
停止:./bin/zkServer.sh stop
状态:./bin/zkServer.sh status

 

如果状态异常,可以查看当前目录下的zookeeper.out日志文件。


 


安装Kafka

Kafka 消息中间件

阿里云镜像:https://mirrors.aliyun.com/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz

创建文件夹

mkdir /kafka

下载

wget https://mirrors.aliyun.com/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -zxvf kafka_2.12-2.2.1.tgz

修改配置文件

vim kafka_2.12-2.2.1/config/server.properties

主要修改如下:

#分别是1/2/3!!集群中保证唯一
broker.id=1
#对应ip(默认端口:9092)
listeners=PLAINTEXT://:9092
#对应ip
advertised.listeners=PLAINTEXT://:9092
#zookeeper集群地址(若是单机,只配一个即可)
zookeeper.connect=localhost:2181


控制

启动
./bin/kafka-server-start.sh -daemon ./config/server.properties
停止
./bin/kafka-server-stop.sh


Kafka Topic分区

我们在创建Topic时,需要指定分区数

./bin/kafka-topics.sh --create --zookeeper zookeeper_ip1:2181,zookeeper_ip2:2181,zookeeper_ip3:2181 --replication-factor 3 --partitions 3 --topic your_topic

建议分区数为server个数的整数倍


修改分区

./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic kafka-new-topic --partitions 3

查看指定分区的日志,例如分区:1

./bin/kafka-console-consumer.sh --bootstrap-server 172.16.29.141:9092 --topic kafka-topic --partition 1


测试:

创建主题

现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition分区,并且备份因子也设置为1:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看所有topic

./bin/kafka-topics.sh --zookeeper localhost:2181 --list

删除topic

./bin/kafka-topics.sh  --delete --zookeeper localhost:2181  --topic  mysql_topic



发送消息

kafka自带了一个producer生产者命令客户端,可以从本地文件中读取内容,也可以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行会被当做成一个独立的消息。

首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a msg
>this is a another msg


消费消息

对于consumer消费者,kafka同样也携带了一个命令行客户端,会将获取到的内容在命令中进行输出:

#消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --consumer-property client.id=consumer-1  --topic test
#从头开始
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test --from-beginning
#从头开始只消费10条
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --consumer-property client.id=consumer-1  --topic test --from-beginning --max-messages 10

如果你是通过不同的终端窗口来运行以上的命令,将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。


查看消费者分组

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list


查看消费的进度(按消费者分组)

#查看消费的进度

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

返回:

 

其中,LAG是还没有被消费的消息个数。



查看某个主题下一共有多少条消息(获取最大位移数):

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

--time -1 表示要获取指定topic所有分区当前的最大位移,--time -2 表示获取当前最早位移。


kafka一般命令

启动zk 
bin/zkServer.sh start
启动kafka
bin/kafka-server-start.sh config/server.properties &
停止kafka 如果不管用 就是用kill -9 
bin/kafka-server-stop.sh
1.创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2.列出主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
3.生产消息
bin/kafka-console-producer.sh --broker-list localhost:9092 -topic test
4.消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --consumer-property client.id=consumer-1  --topic test
5.删除主题
 1. 删除 kafka 主题 
    bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic sceniccenter-base-ticket
 2. 在kafka 数据目录删除主题文件夹
 3. 删除 zookeeper 上的 记录
    1)登录zookeeper客户端:命令:./zkCli.sh 
    2)找到topic所在的目录:ls /brokers/topics
    3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。    
    另外被标记为 marked for deletion 的topic你可以在zookeeper客户端中通过命令获得: ls /admin/delete_topics/【topic name】
  总结  彻底删除topic:
     1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
     2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过 zookeeper-client 删除掉broker下的topic即可。
6.查看toplic 的分区等情况 
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test


 本文完。

作者:飘易
来源:飘易
版权所有。转载时必须以链接形式注明作者和原始出处及本声明。
上一篇:没有了
下一篇:【跟我一起搭建物联网平台】3、使用储存库安装 EMQ X服务器(MQTT)
0条评论 “【跟我一起搭建物联网平台】4、Kafka消息中间件”
No Comment .
发表评论
名称(*必填)
邮件(选填)
网站(选填)

记住我,下次回复时不用重新输入个人信息
© 2007-2019 飘易博客 Www.Piaoyi.Org 原创文章版权由飘易所有