1、在安装 php-rdkafka 扩展之前,需要先安装 librdkafka
# Download, build and install librdkafka (must be present before building php-rdkafka).
wget https://github.com/edenhill/librdkafka/archive/v1.0.1.tar.gz
tar xvzf v1.0.1.tar.gz
cd librdkafka-1.0.1
./configure
make && make install
2、安装rdkafka
# Download, build and install the php-rdkafka extension.
wget https://github.com/arnaud-lb/php-rdkafka/archive/3.1.0.tar.gz
# Extract the archive that was just downloaded (3.1.0.tar.gz).
tar xvzf 3.1.0.tar.gz
cd php-rdkafka-3.1.0
phpize
# Adjust --with-php-config to the php-config path of your own PHP installation.
./configure --with-php-config=/usr/local/php/bin/php-config
make && make install
3、在php.ini中添加行
# 编辑 /usr/local/php/etc/php.ini,添加下面这一行
extension=rdkafka.so
接下来就可以利用PHP编写消费者了,有2种消费者类型:High-level consumer、Low-level consumer,区别是kafka服务器不会记录低级别消费者,但不影响它们的消费。
使用Low Level Consumer的主要原因是,用户希望比Consumer Group更好地控制数据的消费,如:
*同一条消息读多次,方便Replay
*只消费某个Topic的部分Partition
*管理事务,从而确保每条消息被处理一次(Exactly once)
与High Level Consumer相对,Low Level Consumer要求用户做大量的额外工作
*在应用程序中跟踪处理offset,并决定下一条消费哪条消息
*获知每个Partition的Leader
*处理Leader的变化
*处理多Consumer的协作
kafka生产者
<?php

declare(strict_types=1);

/**
 * Kafka producer example built on the php-rdkafka extension.
 *
 * Publishes five messages to the "test" topic via the broker at localhost:9092,
 * then keeps polling until the producer's outbound queue is empty.
 *
 * Reference: https://github.com/arnaud-lb/php-rdkafka
 */

header('Content-Type: text/html; charset=UTF-8');

// Kafka configuration.
$conf = new RdKafka\Conf();

// Delivery-report callback for messages produced with RdKafka\ProducerTopic::produce();
// each report is dumped to ./kafka_dr.log.
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents('./kafka_dr.log', var_export($message, true) . PHP_EOL, FILE_APPEND);
});

// Error callback: appends a human-readable description of each error to ./kafka_err.log.
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents(
        './kafka_err.log',
        sprintf('Kafka error: %s (reason: %s)', rd_kafka_err2str($err), $reason) . PHP_EOL,
        FILE_APPEND
    );
});

// Producer.
$producer = new RdKafka\Producer($conf);
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers('localhost:9092');

// Topic configuration.
$cf = new RdKafka\TopicConf();

// request.required.acks:
//   -1: wait until all brokers have synchronised the message
//    1: wait for the acknowledgement of the current server only
//    0: no acknowledgement; in this mode the delivery callback receives no offset,
//       whereas with 1 and -1 the offset is reported.
// This can be used to confirm message production, but it is not a 100% guarantee
// because the Kafka server may go down midway.
// Configuration values are passed as strings.
$cf->set('request.required.acks', '1');

// Create the topic handle.
$topic = $producer->newTopic('test', $cf);

// Produce the messages.
// produce(int $partition, int $msgflags, string $payload [, string $key = NULL]): void
//   1st argument: RD_KAFKA_PARTITION_UA lets the partition be chosen automatically
//   2nd argument: message flags, always 0
//   3rd argument: payload
//   4th argument: optional message key
for ($i = 1; $i <= 5; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "key . $i", 'key_1');
}

// Poll for events until the outbound queue has been drained.
while (($len = $producer->getOutQLen()) > 0) {
    echo "producer getOutQLen: $len <br>";
    $producer->poll(50); // blocks for up to this many milliseconds
}
kafka低级别消费者
<?php

declare(strict_types=1);

/**
 * Low-level Kafka consumer example built on the php-rdkafka extension.
 *
 * Consumes partition 0 of the "test" topic from the broker at localhost:9092,
 * starting at the stored offset, and dumps every message it receives.
 * Original author's note: the Kafka server does not record low-level consumers.
 *
 * Reference: https://github.com/arnaud-lb/php-rdkafka
 */

header('Content-Type: text/html; charset=UTF-8');

// Kafka configuration.
$conf = new RdKafka\Conf();

// Error callback: appends a human-readable description of each error to ./kafka_err.log.
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents(
        './kafka_err.log',
        sprintf('Kafka error: %s (reason: %s)', rd_kafka_err2str($err), $reason) . PHP_EOL,
        FILE_APPEND
    );
});

// Set the group id. This is required when storing offsets on the broker.
$conf->set('group.id', 'testGroup');
// Optional client id (the Kafka server does not record low-level consumers):
// $conf->set('client.id', 'consumer' . time());

// Consumer.
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers('localhost:9092');

// Topic configuration. Configuration values are passed as strings.
$topicConf = new RdKafka\TopicConf();

// Offset auto-commit interval, in milliseconds.
$topicConf->set('auto.commit.interval.ms', '1000');

// Alternative: store offsets in a local file instead of on the broker.
// $topicConf->set('offset.store.method', 'file');
// $topicConf->set('offset.store.path', __DIR__);

// Store offsets on the broker.
$topicConf->set('offset.store.method', 'broker');

// smallest: roughly, start consuming from the beginning
// largest:  roughly, start consuming from the newest message
$topicConf->set('auto.offset.reset', 'smallest');

// Create the topic handle.
$topic = $consumer->newTopic('test', $topicConf);

// 1st argument: the partition to consume (0).
// 2nd argument: where to start:
//   RD_KAFKA_OFFSET_BEGINNING  start from the beginning
//   RD_KAFKA_OFFSET_STORED     start from the last stored (consumed) offset
//   RD_KAFKA_OFFSET_END        start from the end
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    // 1st argument: partition to consume (0); 2nd argument: how long to block, in milliseconds.
    $message = $topic->consume(0, 120 * 1000);
    if ($message === null) {
        // Nothing was returned within the timeout; try again.
        continue;
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            // Any other error is fatal for this example.
            throw new \Exception($message->errstr(), $message->err);
    }
}
kafka高级别消费者
<?php

declare(strict_types=1);

/**
 * High-level Kafka consumer example built on the php-rdkafka extension.
 *
 * Joins the consumer group "testGroup", subscribes to the "test" topic on the broker
 * at localhost:9092 and dumps every message it receives.
 * Original author's note: the Kafka server records high-level consumers.
 *
 * Reference: https://github.com/arnaud-lb/php-rdkafka
 */

header('Content-Type: text/html; charset=UTF-8');

// Kafka configuration.
$conf = new RdKafka\Conf();

// Error callback: appends a human-readable description of each error to ./kafka_err.log.
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents(
        './kafka_err.log',
        sprintf('Kafka error: %s (reason: %s)', rd_kafka_err2str($err), $reason) . PHP_EOL,
        FILE_APPEND
    );
});

// Set a rebalance callback to log partition assignments (optional).
// When a consumer joins or leaves the group, Kafka reassigns the partitions among the
// consumers; this callback is triggered whenever that happens.
$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;
        default:
            // Report the error description as the message and keep the numeric code.
            throw new \Exception(rd_kafka_err2str($err), $err);
    }
});

// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'testGroup');
$conf->set('client.id', 'consumer' . time());

// Initial list of Kafka brokers.
$conf->set('metadata.broker.list', 'localhost:9092');

// Topic configuration.
$topicConf = new \RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in the
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics.
$conf->setDefaultTopicConf($topicConf);

// High-level consumer.
$consumer = new \RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'.
$consumer->subscribe(['test']);

echo "Waiting for partition assignment... \n(may take some time when quickly re-joining the group after leaving it.)\n";

while (true) {
    // Block for up to 120 * 1000 milliseconds waiting for a message.
    $message = $consumer->consume(120 * 1000);
    if ($message === null) {
        continue;
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            // Any other error is fatal for this example.
            throw new \Exception($message->errstr(), $message->err);
    }
}
PHP消费者收到的消息对象如下:
object(RdKafka\Message)#7 (9) { ["err"]=> int(0) ["topic_name"]=> string(4) "test" ["timestamp"]=> int(1560928893402) ["partition"]=> int(0) ["payload"]=> string(25)"{"type":1,"data":"hello"}" ["len"]=> int(25) ["key"]=> NULL ["offset"]=> int(242) ["headers"]=> NULL }
我们只需要拿到 payload 信息进行处理并入库(MySQL)即可。