您好,登錄后才能下訂單哦!
1、首先安裝kafka擴展
#安裝librdkafka: 版本: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2 $ git clone https://github.com/edenhill/librdkafka.git $ ./configure $ make $ sudo make install #安裝 rdkafka.so 版本:https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1 $ git clone https://github.com/arnaud-lb/php-rdkafka.git $ cd php-rdkafka $ phpize $ ./configure $ make all -j 5 $ sudo make install
2、生產者代碼示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test'); //topicname
$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smallest');
$rk = new RdKafka\Producer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname
for($i = 0; $i < 10; $i++) {
$topic->produce(0,0,'test' . $i);
}
3、消費者代碼示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
$rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion
$cf = new RdKafka\TopicConf();
$cf->set('auto.offset.reset', 'smallest');
$cf->set('auto.commit.enable', true);
$rk = new RdKafka\Consumer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname,topicobject
$topic->consumeStart(0,10); //partition,offset
$msg = $topic->consume(0, 1000); //partition,timeout
var_dump($msg);
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。