EMQX服务器搭建好之后,我们需要将MQTT消息持久化保存到mysql等数据库里了,这个时候就需要借助kafka这个消息中间件了,下面飘易来介绍下Kafka插件编译安装。
Kafka插件github上有好几个:
https://github.com/bob403/emqx_kafka_bridge (我们选择这个,支持emqx-v3.0版本)
https://github.com/iotblue/emqx-kafka-bridge/tree/emqx30
https://github.com/shsjdcbg/emq-kafka
修改 emqx-rel目录下的Makefile:
DEPS += $(foreachdep,$(MAIN_APPS),$(call app_name,$(dep))) DEPS += emqx_kafka_bridge 【增加这行】 ... $(foreach dep,$(MAIN_APPS),$(evaldep_$(call app_name,$(dep)) = $(CLONE_METHOD) https://github.com/emqx/$(dep)$(call app_vsn,$(dep)))) dep_emqx_kafka_bridge = git https://github.com/bob403/emqx_kafka_bridge.git master 【增加这行】
在relx.config里增加:
{emqx_kafka_bridge, load}
重新编译emqx:
cd emqx-relx rm -rf _rel //删除之前编译的结果 make clean make
修改kafka插件配置:
/emq/emqx-rel/_rel/emqx/etc/plugins/emqx_kafka_bridge.conf
【注意】这个插件的配置文件在好几个地方都存在,我们需要找到编译后的【_rel】目录下的配置,否则不生效。
根据实际情况调整配置:
##-------------------------------------------------------------------- ## kafka Bridge ##-------------------------------------------------------------------- ## The Kafka loadbalancer node hostthat bridge is listening on. ## ## Value: 127.0.0.1, localhost kafka.host = localhost ## The kafka loadbalancer node portthat bridge is listening on. ## ## Value: Port kafka.port = 9092 ## The kafka loadbalancer nodepartition strategy. ## ## Value: random, sticky_round_robin,strict_round_robin, custom kafka.partitionstrategy =strict_round_robin ## Each worker represents aconnection to a broker + topic + partition combination. ## You can decide how many workers tostart for each partition. ## ## Value: kafka.partitionworkers = 8 ## payload topic. ## kafka上要提前建立一个名为Processing的Topic ## Value: string kafka.payloadtopic = Processing
启用插件
列出所有插件:
./emqx_ctl plugins list
看到:
Plugin(emqx_kafka_bridge,version=3.0, description=EMQ X Kafka Bridge, active=false)
启动kafka插件:
./emqx_ctl plugins load emqx_kafka_bridge
返回:
Start apps: [emqx_kafka_bridge] Plugin emqx_kafka_bridge loadedsuccessfully.
测试步骤MQTT消息是否转发到kafka:
1,kafka上要提前建立一个名为Processing的Topic(分区为3,方便多个客户端消费)
./bin/kafka-topics.sh --create--zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic Processing
2,打开MQTT客户端,我使用的是chrome浏览器应用MQTTBox:
随便发布消息到一个MQTT主题,比如test,这里的MQTT 的topic和kafka的topic毫无关系,不要搞混了。
3,Kafka消费终端,消费主题Processing:
./bin/kafka-console-consumer.sh--bootstrap-server localhost:9092 --topic Processing
收到反馈如下:
{"action":"connected","device_id":"3fbf56e4-3396-47b0-a99f-5ecf0964e23b1561539149561","username":"undefined","ts":1561539149} {"action":"message_publish","device_id":"3fbf56e4-3396-47b0-a99f-5ecf0964e23b1561539149561","username":"undefined","topic":"test","payload":"a1","ts":1561539155} {"action":"message_publish","device_id":"3fbf56e4-3396-47b0-a99f-5ecf0964e23b1561539149561","username":"undefined","topic":"test","payload":"a2","ts":1561539157} {"action":"disconnected","device_id":"3fbf56e4-3396-47b0-a99f-5ecf0964e23b1561537211825","username":"undefined","ts":1561539143}
可以收到MQTT转发的事件,比如connected、disconnected、message_publish等。
剩下的事我们就可以利用PHP、java等客户端去消费Processing主题,将需要的信息存入MYSQL等数据库了。