-
php教程之PHP 消息队列 Kafka 使用(2)
创建一个消费类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
<?php class KafkaConsumer { public static $brokerList = '127.0.0.1:9092' ; public static function consumer() { $conf = new \RdKafka\Conf(); $conf-> set ( 'group.id' , 'test' ); $rk = new \RdKafka\Consumer($conf); $rk->addBrokers( "127.0.0.1" ); $topicConf = new \RdKafka\TopicConf(); $topicConf-> set ( 'auto.commit.interval.ms' , 100); $topicConf-> set ( 'offset.store.method' , 'broker' ); $topicConf-> set ( 'auto.offset.reset' , 'smallest' ); $topic = $rk->newTopic( 'test' , $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while ( true ) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break ; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n" ; break ; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n" ; break ; default : throw new \Exception($message->errstr(), $message->err); break ; } } } } |