1、安装kafka的扩展之前,在安装php-rdkafka之前,需要先安装librdkafka
wget https://github.com/edenhill/librdkafka/archive/v1.0.1.tar.gz tar xvzf v1.0.1.tar.gz cd librdkafka-1.0.1 ./configure make && make install
2、安装rdkafka
wget https://github.com/arnaud-lb/php-rdkafka/archive/3.1.0.tar.gz tar xvzf v1.0.1.tar.gz cd php-rdkafka-3.1.0 phpize ./configure --with-php-config=/usr/local/php/bin/php-config ###你的php-config路径 make && make install
3、在php.ini中添加行
# /usr/local/php/etc/php.ini 编辑 extension=rdkafka.so
接下来就可以利用PHP编写消费者了,有2种消费者类型:High-level consumer、Low-level consumer,区别是kafka服务器不会记录低级别消费者,但不影响它们的消费。
使用Low Level Consumer的主要原因是,用户希望比Consumer Group更好的控制数据的消费, 如
*同一条消息读多次,方便Replay
*只消费某个Topic的部分Partition
*管理事务,从而确保每条消息被处理一次(Exactly once)
与High Level Consumer相对,Low Level Consumer要求用户做大量的额外工作
*在应用程序中跟踪处理offset,并决定下一条消费哪条消息
*获知每个Partition的Leader
*处理Leader的变化
*处理多Consumer的协作
kafka生产者
<?php header( 'Content-Type: text/html; charset=UTF-8' );
// 文档参考 https://github.com/arnaud-lb/php-rdkafka
// kafka配置
$conf = new RdKafka\Conf();
// 当 RdKafka\ProducerTopic::produce() 生产消息时触发回调
$conf->setDrMsgCb(function ($kafka, $message) {
file_put_contents("./kafka_dr.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
file_put_contents("./kafka_err.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});
// 生产者
$producer = new RdKafka\Producer($conf);
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers("localhost:9092");
// 主题配置
$cf = new RdKafka\TopicConf();
// -1:必须等所有brokers同步完成的确认, 1:当前服务器确认, 0:不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$cf->set('request.required.acks', 1);
// 创建主题
$topic = $producer->newTopic("test", $cf);
// 生产消息
for ($i = 1; $i <= 5; $i++) {
//第1个参数:RD_KAFKA_PARTITION_UA 自动选择分区;第2个参数:消息flag,固定为0;第3个参数:payload;第4个参数:可选
//public RdKafka\ProducerTopic::produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] ) : void
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "key . $i", "key_1");
}
while ($len = $producer->getOutQLen()) {
echo "producer getOutQLen: $len <br>";
$producer->poll(50);// 轮询事件,阻塞多少毫秒
}kafka低级别消费者
<?php header( 'Content-Type: text/html; charset=UTF-8' );
// Low Level消费者 - kafka服务器不记录Low Level消费者
// 文档参考 https://github.com/arnaud-lb/php-rdkafka
// kafka配置
$conf = new RdKafka\Conf();
$conf->setErrorCb(function ($kafka, $err, $reason) {
file_put_contents("./kafka_err.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});
// 设置消费组 Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'testGroup');
//$conf->set('client.id', 'consumer'.time());//kafka服务器不记录Low Level消费者
// 消费者
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers("localhost:9092");
// 主题配置
$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
// 自动提交时间间隔,默认5000ms
$topicConf->set('auto.commit.interval.ms', 1000);
// 设置offset的存储为file
//$topicConf->set('offset.store.method', 'file');
//$topicConf->set('offset.store.path', __DIR__);
// 设置offset的存储为broker
$topicConf->set('offset.store.method', 'broker');
//smallest:简单理解为从头开始消费
//largest:简单理解为从最新的开始消费
$topicConf->set('auto.offset.reset', 'smallest');
// 创建主题
$topic = $consumer->newTopic("test", $topicConf);
// 第1个参数:消费分区0
// 第2个参数:
// RD_KAFKA_OFFSET_BEGINNING 重头开始消费
// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
// RD_KAFKA_OFFSET_END 最后一条消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
//参数1:表示消费分区,这里是分区0
//参数2:表示同步阻塞多久
$message = $topic->consume(0, 120 * 1000);
if(is_null($message)){
continue;
}
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}kafka高级别消费者
<?php header( 'Content-Type: text/html; charset=UTF-8' );
// High LEVEL消费者 - kafka服务器会记录High Level消费者
// 文档参考 https://github.com/arnaud-lb/php-rdkafka
// kafka配置
$conf = new RdKafka\Conf();
$conf->setErrorCb(function ($kafka, $err, $reason) {
file_put_contents("./kafka_err.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});
// Set a rebalance callback to log partition assignments (optional)
// 当有新的消费者加入或者退出消费组时,kafka 会自动重新分配分区给消费者,这里注册了一个回调函数,当分区被重新分配时触发
$conf->setRebalanceCb(function(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// 设置消费组 Configure the group.id. All consumer with the same group.id will consume different partitions.
$conf->set('group.id', 'testGroup');
$conf->set('client.id', 'consumer'.time());
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', 'localhost:9092');
// 主题配置
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
// 高级别消费者
$consumer = new \RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test'
$consumer->subscribe(['test']);
echo "Waiting for partition assignment... (may take some time when quickly re-joining the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(120*1000);
if(is_null($message)){
continue;
}
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}PHP消费者收到的消息对象如下:
object(RdKafka\Message)#7 (9) {
["err"]=>
int(0)
["topic_name"]=>
string(4) "test"
["timestamp"]=>
int(1560928893402)
["partition"]=>
int(0)
["payload"]=>
string(25)"{"type":1,"data":"hello"}"
["len"]=>
int(25)
["key"]=>
NULL
["offset"]=>
int(242)
["headers"]=>
NULL
}我们只需要拿到 payload 信息进行处理入库(MYSQL)即可。