本篇文章給大家介紹PHP實現生產者與消費者,希望對需要的朋友有所幫助!
前言
PHP中使用Kafka需要RdKafka擴展,而RdKafka依賴于librdkafka,所以這兩個我們都需要安裝,具體安裝方法自行百度,本篇不做說明了。
生產者(測試)
創建消費者需要步驟:
- 生產者配置參數
- 創建生產者實例
- 創建主題實例(依賴生產者)
- 生產主題消息
- 推送消息
具體代碼如下:
$conf = new RdKafkaConf(); // 綁定服務節點 $conf->set('metadata.broker.list', '127.0.0.1:32772'); // 創建生產者 $kafka = new RdKafkaProducer($conf); // 創建主題實例 $topic = $kafka->newTopic('p1r1'); // 生產主題數據,此時消息在緩沖區中,并沒有真正被推送 $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message'); // 阻塞時間(毫秒), 0為非阻塞 $kafka->poll(0); // 推送消息,如果不調用此函數,消息不會被發送且會丟失 $result = $kafka->flush(5000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new RuntimeException('Was unable to flush, messages might be lost!'); }
消費者
創建一個消費者需要幾個步驟:
- 消費者配置參數
- 應用配置參數創建消費者實例
- 訂閱對應主題
- 拉取數據
- 提交位移
具體代碼如下:
$conf = new RdKafkaConf(); // 綁定消費者組 $conf->set('group.id', 'ceshi'); // 綁定服務節點,多個用,分隔 $conf->set('metadata.broker.list', '127.0.0.1:32787'); // 設置自動提交為false $conf->set('enable.auto.commit', 'false'); // 設置當前消費者拉取數據時的偏移量, 可選參數: // earliest: 如果消費者組是新創建的,從頭開始消費,否則從消費者組當前消費位移開始。 // latest:如果消費者組是新創建的,從最新偏移量開始,否則從消費者組當前消費位移開始。 $conf->set('auto.offset.reset', 'earliest'); // 創建消費者實例 $consumer = new RdKafkaKafkaConsumer($conf); // 消費者訂閱主題,數組形式 $consumer->subscribe(['topic1','topic2']); while (true) { // 消費數據,阻塞5秒(5秒內有數據就消費,沒有數據等待5秒進入下一輪循環) $message = $consumer->consume(5000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: // 業務邏輯 var_dump($message); // 提交位移 $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for moren"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed outn"; break; default: throw new Exception($message->errstr(), $message->err); break; } } // 關閉消費者(一般用在腳本中,不需要關閉) $conumser->close();
只消費指定分區中的數據:
// 對消費者指定分區,注意此方式不能與subscribe一同使用 $consumer->assign([ new RdKafkaTopicPartition("topic", 0), new RdKafkaTopicPartition("topic", 1), ]);
站長資訊網