函数
string rd_kafka_err2str ( integer $err ) 将rdkafka错误代码转换为字符串

integer rd_kafka_errno2err ( integer $errnox ) 将系统errno转换为kafka错误代码

integer rd_kafka_errno ( void ) 返回系统errno

integer rd_kafka_offset_tail ( integer $cnt ) 返回一个特殊的偏移量值,该值可用于在主题尾部之前开始使用cnt消息

rdkafka\kafkaconsume类
这是高水平消费者,支持自动分区/撤销(pecl rdkafka>=1.0.0,librdkafka>=0.9)

1)public void rdkafka\kafkaconsumer::assign ([ array $topic_partitions = null ] )
更新分配集到$topic_partitions,可以通过调用rdkafka\conf::setdefaulttopicconf()来更改主题的默认配置
$kafkaconsumer->assign([
new rdkafka\topicpartition(“logs”, 0),
new rdkafka\topicpartition(“logs”, 1),
]);

2)public void rdkafka\kafkaconsumer::commit ([ mixed $message_or_offsets = null ] )
同步提交偏移,直到提交偏移或提交失败为止。
如果注册了commit_cb回调,那么它将被调用,并包含未来要使用的调用的提交详细信息。
参数
message_or_offsets
when null, commit offsets for the current assignment.
when a rdkafka\message, commit offset for a single topic+partition based on the message.
when an array of rdkafka\topicpartition, commit offsets for the provided list of partitions.
异常
errors/exceptions
throws rdkafka\exception on errors.
例子:
// commit offsets for the current assignment
$kafkaconsumer->commit();

// commit offsets based on the message’s topic, partition, and offset
$kafkaconsumer->commit($message);

// commit offsets by providing a list of topicpartition
$kafkaconsumer->commit([
new rdkafka\topicpartition($topic, $partition, $offset),
]);

3)public void rdkafka\kafkaconsumer::commitasync ([ string $message_or_offsets = null ] )
异步提交偏移
4)public rdkafka\kafkaconsumer::__construct ( rdkafka\conf $conf )
参数
conf (rdkafka\conf)
the conf object must have group.id set to the consumer group to join.
conf对象必须将group.id设置为要加入的消费者组。
示例:
$conf = new rdkafka\conf();
$conf->set(“group.id”, “mygroupid”);

$kafkaconsumer = new rdkafka\kafkaconsumer($conf);
5)public rdkafka\message rdkafka\kafkaconsumer::consume ( string $timeout_ms )
使用消息或获取错误事件,触发回调
将自动调用任何此类排队事件的已注册回调,包括rebalance_cb, event_cb, commit_cb, etc.
参数
timeout_ms (int) 超时时间(milliseconds)
返回值
returns a rdkafka\message. on error or timeout, rdkafka\message::$err is != rd_kafka_err_no_error, and other properties should be ignored.
注意:
应用程序应确保定期调用consume (),即使没有预期的消息,为等待调用的排队回调提供服务,当rebalnce_cb已经注册时,这一点尤其重要,因为需要正确地调用和处理它,以同步内部使用者状态。

while (true) {
$message = $kafkaconsumer->consume(3600e3);
switch ($message->err) {
case rd_kafka_resp_err_no_error:
handle($message);
break;
case rd_kafka_resp_err__timed_out:
echo “timedout\n”;
break;
default:
throw new \exception($message->errstr());
break;
}
}

6)public array rdkafka\kafkaconsumer::getassignment ( void )
返回由assign设置 或 再平衡的 当前分区分配集
returns the current partition assignment as set by rdkafka\kafkaconsumer::assign() or by rebalancing.
返回值
returns an array of rdkafka\topicpartition 返回rdkafka\topic分区的数组
errors/exceptions
throws rdkafka\exception on errors.

6)public rdkafka\metadata rdkafka\kafkaconsumer::getmetadata ( bool $all_topics , rdkafka\kafkaconsumertopic $only_topic = null , int $timeout_ms)
向代理请求元数据

参数
all_topics (bool)
when true, request info about all topics in cluster. else, only request info about locally known topics.如果为真,请求有关集群中所有主题的信息。否则,只请求有关本地已知主题的信息
only_topic (rdkafka\kafkaconsumertopic)
when non-null, only request info about this topic当非空时,只请求有关此主题的信息。
timeout_ms (int)
timeout (milliseconds) 超时

返回值
returns a rdkafka\metadata instance
示例
$all = $kafkaconsumer->metadata(true, null, 60e3);

$local = $kafkaconsumer->metadata(false, null, 60e3);

$topic = $kafkaconsumer->newtopic(“mytopic”);
$one = $kafkaconsumer->metadata(true, $topic, 60e3);

7)public array rdkafka\kafkaconsumer::getsubscription ( void )
返回rdkafka\kafkaconsumer:subscribe()设置的当前订阅
return the current subscription as set by rdkafka\kafkaconsumer::subscribe()
返回值
returns an array of topic names 返回主题名称数组

8)public void rdkafka\kafkaconsumer::subscribe ( array $topics )
将订阅集更新为主题。
这将覆盖当前任务。任何先前的订阅都将首先被取消分配和取消订阅。
订阅集表示要消费的所需主题…….
可以通过调用rdkafka\conf::setdefaulttopicconf()更改订阅主题的默认配置。

$kafkaconsumer->assign([
“logs”,
“^mypfx[0-9]_.*”,
]);

9)public returntype rdkafka\kafkaconsumer::unsubscribe ( void )
从当前订阅集取消订阅

rdkafka\kafkaconsumertopic类
(pecl rdkafka >= 1.0.0, librdkafka >= 0.9)
this class represents a topic when using the rdkafka\kafkaconsumer. it can not be instantiated directly, rdkafka\kafkaconsumer::newtopic() should be used instead.
当想使用rdkafka\kafkaconsumer去表示一个主题的时候,不能直接实例化,应该使用rdkafka\kafkaconsumer::newtopic()替代

1)public void rdkafka\kafkaconsumertopic::offsetstore ( integer $partition , integer $offset )
store offset offset for topic partition partition. the offset will be commited (written) to the offset store according to auto.commit.interval.ms.
auto.commit.interval.ms消费者offset提交到zookeeper的频率(以毫秒为单位)(0.9之后就默认存储再broke中了)
auto.commit.enable must be set to false when using this api.使用此api时 auto.commit.enable必须设置为false,如果enable.auto.commit设置为true,则消费者偏移量自动提交给kafka的频率(以毫秒为单位)。
auto.offset.reset largest 如果zookeeper中没有初始偏移量,或偏移值超出范围,
该怎么办?
最小:自动将偏移重置为最小偏移
最大:自动将偏移重置为最大偏移
* 其他任何事情:抛出异常消费者

参数
partition (integer)
partition id
offset (integer)
offset

2)/* inherited methods */
public string rdkafka\topic::getname ( void )

rdkafka类
(pecl rdkafka >= 0.9.1)
this is the base class for low-level clients: rdkafka\consumer, rdkafka\producer. this class can not be instanciated directly, use one of the sub classes instead.
这是低级消费者客户端的基类:rdkafka\consumer,rdkafka\producer。不能直接实例化这个类,而是使用其中一个子类。

1)public integer rdkafka::addbrokers ( string $broker_list )
将一个或多个代理添加到kafka句柄的初始引导代理列表中。
当rdkafka通过查询代理元数据连接到代理时,将自动发现其他代理。
如果代理名称解析为多个地址(可能是地址家族),则所有代理名称都将以循环方式用于连接尝试。
返回值
returns the number of brokers successfully added.成功添加的代理个数

代理还可以使用metadata.broker.list或bootstrap.server配置属性(首选方法)进行定义。

$kafka->addbrokers(“broker1:10000,broker2”);

$kafka->addbrokers(“ssl://broker3:9000,ssl://broker2”);

2)public rdkafka\metadata rdkafka::getmetadata ( bool $all_topics , rdkafka\topic $only_topic = null , int $timeout_ms )
request metadata from broker 向代理请求元数据
上面有一个

3)public integer rdkafka::getoutqlen ( void )
返回当前的输出队列长度。out队列包含等待发送给代理的消息,或代理知道的消息。

3)public rdkafka\queue rdkafka::newqueue ( void )
创建一个新的消息队列实例
return values
returns a rdkafka\queue.

4)public rdkafka\topic rdkafka::newtopic ( string $topic_name [, rdkafka\topicconf $topic_conf = null ] )
creates a new topic instance for topic_name.为topic_name创建一个新的主题实例。
returns a rdkafka\topic (more specifically, either a rdkafka\consumertopic or a rdkafka\producertopic).

为具有不同配置的同一主题名称创建两个主题实例没有任何效果。每个主题实例都将使用第一个实例的配置。
$conf = new rdkafka\topicconf();
$conf->set(“…”, “…”);

$topic = $kafka->newtopic(“mytopic”, $conf);

4)public void rdkafka::poll ( integer $timeout_ms )
对于事件的轮询,导致调用应用程序提供的回调
使用rdkafka子类的应用程序应该确保定期调用poll(),以便为等待调用的任何排队回调服务。

events:
delivery report callbacks rdkafka\conf::setdrmsgcb() [producer]
error callbacks (rdkafka\conf::seterrorcb())
stats callbacks (rdkafka\conf::setstatscb())
throttle callbacks (rdkafka\conf::setthrottlecb())

parameters
timeout_ms (integer)
specifies the maximum amount of time (in milliseconds) that the call will block waiting for events. for non-blocking calls, provide 0 as timeout_ms. to wait indefinately for an event, provide -1.
指定调用将阻止等待事件的最大时间(以毫秒为单位)。对于非阻塞调用,提供0作为timeout_ms。若要不确定地等待某个事件,请提供-1。

return values
returns the number of events served.返回服务的事件数

5)public void rdkafka::setloglevel ( integer $level )
指定内部kafka日志记录和调试产生的最大日志记录级别。如果设置了“debug”配置属性,该级别将自动调整为log_debug。

parameters
level (integer)
log level. can take any log_* constant (see the syslog function).日志级别。可以接受任何log_*常量(请参阅syslog函数)。

 

rdkafka\consumer 类
this is the low-level kafka consumer. it can be used with kafka >= 0.8.
低级消费者
1)public rdkafka\consumer::__construct ([ rdkafka\conf $conf = null ] )
parameters
conf (rdkafka\conf)
an optional rdkafka\conf instance.

此类只有继承(低级消费者基类rdkafka)的以下几个方法
rdkafka\consumer extends rdkafka {
/* methods */
/* inherited methods */
public integer rdkafka::addbrokers ( string $broker_list )
public rdkafka\metadata rdkafka::getmetadata ( bool $all_topics , rdkafka\topic $only_topic = null , int $timeout_ms )
public integer rdkafka::getoutqlen ( void )
public rdkafka\queue rdkafka::newqueue ( void )
public rdkafka\topic rdkafka::newtopic ( string $topic_name [, rdkafka\topicconf $topic_conf = null ] )
public void rdkafka::poll ( integer $timeout_ms )
public void rdkafka::setloglevel ( integer $level )
}
rdkafka\producer类
(pecl rdkafka >= 0.9.1)

1)public rdkafka\producer::__construct ([ rdkafka\conf $conf = null ] )
parameters
conf (rdkafka\conf)
an optional rdkafka\conf instance.

rdkafka\producer extends rdkafka {
/* methods */
/* inherited methods */
public integer rdkafka::addbrokers ( string $broker_list )
public rdkafka\metadata rdkafka::getmetadata ( bool $all_topics , rdkafka\topic $only_topic = null , int $timeout_ms )
public integer rdkafka::getoutqlen ( void )
public rdkafka\queue rdkafka::newqueue ( void )
public rdkafka\topic rdkafka::newtopic ( string $topic_name [, rdkafka\topicconf $topic_conf = null ] )
public void rdkafka::poll ( integer $timeout_ms )
public void rdkafka::setloglevel ( integer $level )
}

rdkafka\topic类
(pecl rdkafka >= 0.9.1)

1)public string rdkafka\topic::getname ( void )
returns the topic name.返回主题名称

rdkafka\consumertopic 类
(pecl rdkafka >= 0.9.1)

当使用rdkafka\consumer时,该类表示一个主题。不能直接实例化它,应该使用rdkafka\consumer:newtopic()。

1)public rdkafka\message rdkafka\consumertopic::consume ( integer $partition , integer $timeout_ms )
消费-使用来自分区的单个消息
消费者之前必须使用 rdkafka\consumertopic::consumestart().
必须检查返回消息的err属性是否存在错误。
err属性等于rd_kafka_resp_err_pary_eof,表示已到达分区的结束,通常不应将其视为错误。应用程序应该处理这种情况(例如,忽略)。
parameters
partition (integer)
the partition to consume
timeout_ms
the maximum amount of time to wait for a message to be received.

returns a rdkafka\message or null on timeout. 正常返回rdkafka\message,超时返回null。

2)public void rdkafka\consumertopic::consumequeuestart ( integer $partition , integer $offset , rdkafka\queue $queue )
与rdkafka\consumertopic::consumertopic()相同,但将传入消息重新路由到提供的队列。应用程序必须使用一个rdkafka\queue::consumer*()函数来接收获取的消息。
参数
partition (integer)
partition id
offset (integer)
offset
queue (rdkafka\queue)
a rdkafka\queue instance

3)public void rdkafka\consumertopic::consumestart ( integer $partition , integer $offset )
开始在偏移量处使用分区的消息(请参阅参数中允许的值)。

librdkafka将尝试通过反复从代理获取批消息,直到达到阈值,从而将queued.min.messages (config属性)消息保留在本地队列中。
应用程序应该使用rdkafka\consumertopic::consumestart()方法来使用本地队列中的消息,每个kafka消息都表示为rdkafka\message对象。
对于同一个主题和分区,不能多次调用rdkafka\consumertopic::consumestart()。在没有停止消费的情况下,先使用rdkafka\consumertopic::consumestop()停止消费后再开始消费。

parameters
partition (integer)
partition id
offset (integer)
can be either a proper offset (0..n), or one the the special offset:
可以是正常的偏移量(0.n),也可以是特殊的偏移量:
rd_kafka_offset_beginning
rd_kafka_offset_end
rd_kafka_offset_stored
the return value of rd_kafka_offset_tail()
示例:
$partition = 123;

// consume from the end
$topic->consumestart($partition, rd_kafka_offset_end);

// consume from the stored offset
$topic->consumestart($partition, rd_kafka_offset_stored);

// consume 200 messages from the end
$topic->consumestart($partition, rd_kafka_offset_tail(200));

4)public void rdkafka\consumertopic::consumestop ( integer $partition )
stop consuming messages from partition停止使用来自分区的消息
停止使用分区消息,清除当前本地队列中的所有消息。

5)public void rdkafka\consumertopic::offsetstore ( integer $partition , integer $offset )
store offset存储offset
parameters
partition (integer)
partition id
offset (integer)
offset

rdkafka\producertopic类
(pecl rdkafka >= 0.9.1)
当使用rdkafka\producer时,该类表示一个主题。不能直接实例化它,应该使用rdkafka\producer::newtopic().

rdkafka\producertopic extends rdkafka\topic {
/* methods */
public void produce ( integer $partition , integer $msgflags , string $payload [, string $key = null ] )
/* inherited methods */
public string rdkafka\topic::getname ( void )
}

1)public void rdkafka\producertopic::produce ( integer $partition , integer $msgflags , string $payload [, string $key = null ] )
生成并向代理发送一条消息。这是一个异步和非阻塞的。
parameters
partition (integer)
can be either rd_kafka_partition_ua (unassigned) for automatic partitioning using the topic’s partitioner function (see rdkafka\topicconf::setpartitioner(), or a fixed partition (0..n).
msgflags (integer)
可以是rd_kafka_parid_ua(未分配的),用于使用主题的分区函数(请参见rdkafka\topicconf::setpartitioner(),也可以是固定的分区(0.n)。
msgflags (integer)
must be 0
payload (string)
payload string
key (string)
optional message key, if non-null it will be passed to the topic partitioner as well as be sent with the message to the broker and passed on to the consumer.
可选消息键,如果非空,则将其传递给主题分区程序,并与消息一起发送给代理并传递给使用者。

$message = [
‘type’ => ‘account-created’,
‘id’ => $accountid,
‘date’ => date(date_w3c),
];
$payload = json_encode($message);
$topic->produce(rd_kafka_partition_ua, 0, $payload, $accountid);with the message to the broker and passed on to the consumer.

rdkafka\queuel类
(pecl rdkafka >= 0.9.1)
1)public rdkafka\message rdkafka\queue::consume ( string $timeout_ms )
使用一条消息
parameters
timeout_ms
the maximum amount of time to wait for a message to be received.
return values
returns a rdkafka\message or null on timeout.

rdkafka\message 类
(pecl rdkafka >= 0.9.1)
此对象表示单个已消费或生产的消息或事件(设置了$err)。
this object represents either a single consumed or produced message, or an event ($err is set).
an application must check rdkafka\message::err to see if the object is a proper message (error is rd_kafka_resp_err_no_error) or an error event.

rdkafka\message {
/* properties */
public $err ; //error code
public $topic_name ;
public $partition ;
public $payload ;
public $key ;
public $offset ;
/* methods */
public string errstr ( void )
}

1)public string rdkafka\message::errstr ( void )
这是一种方便的方法,将错误作为字符串返回
return values
the error as a string
if ($message->err) {
echo $message->errstr(), “\n”;
}

rdkafka\conf 类
(pecl rdkafka >= 0.9.1)
this class holds configuration for consumers and producers.
a list of available properties can be found on the »librdkafka repository. note that available configuration properties and default values may change depending on the librdkafka version.
该类包含使用者和生产者的配置
请注意,可用的配置属性和默认值可能会根据librdkafka 版本而改变。

rdkafka\conf {
/* methods */
public void dump ( void )
public void set ( string $name , string $value )
public void setdefaulttopicconf ( rdkafka\topicconf $topic_conf )
public void setdrmsgcb ( callable $callback )
public void seterrorcb ( callable $callback )
public void setrebalancecb ( callable $callback )
}

1)public void rdkafka\conf::dump ( void )
dumps the configuration properties and values to an array.
转储配置属性和值到数组
return values
returns an array with configuration properties as keys, and configuration values as values.

2)public void rdkafka\conf::set ( string $name , string $value )
set configuration property name to value.
设置配置属性 属性名=>属性值

3)public void rdkafka\conf::setdefaulttopicconf ( rdkafka\topicconf $topic_conf )
设置用于自动订阅主题的默认主题配置。可以与rdkafka\kafkaconsumer::subscribe()或者rdkafka\kafkaconsumer::assign()一起使用
sets the default topic configuration to use for for automatically subscribed topics. this can be used along with rdkafka\kafkaconsumer::subscribe() or rdkafka\kafkaconsumer::assign().

4)public void rdkafka\conf::setdrmsgcb ( callable $callback )
设置传递报告回调,对于rdkafka\producertopic::producer()接受的每条消息,将调用一次传递报告回调,并将err设置为指示生产请求的结果。

当消息成功地生成时,或者如果librdkafka 遇到永久故障,或者临时错误的重试计数器已经耗尽,就会调用回调。

应用程序必须定期调用rdkafka::poll(),以便为排队的传递报告回调服务。

parameters
callback (callable)
a callable with the following signature:
<?php
/**
* @param rdkafka\kafka $kafka
* @param rdkafka\message $message
*/
function (rdkafka\kafka $kafka, rdkafka\message $message);

$conf->setdrmsgcb(function ($kafka, $message) {
if ($message->err) {
// message permanently failed to be delivered
} else {
// message successfully delivered
}
});

5)public void rdkafka\conf::seterrorcb ( callable $callback )

设置错误回调。librdkafka 使用错误回调将ciritcal错误信号发送回应用程序。

parameters
callback (callable)
a callable with the following signature:
<?php
/**
* @param object $kafka
* @param int $err
* @param string $reason
*/
function ($kafka, $err, $reason);

<?php
$conf->seterrorcb(function ($kafka, $err, $reason) {
printf(“kafka error: %s (reason: %s)\n”, rd_kafka_err2str($err), $reason);
});
?>

6)public void rdkafka\conf::setrebalancecb ( callable $callback )

set rebalance callback for use with coordinated consumer group balancing.
设置“再平衡回调”,以便与协调的消费者组 平衡一起使用。

registering a rebalance_cb turns off librdkafka’s automatic partition assignment/revocation and instead delegates that responsibility to the application’s rebalance_cb.
注册一个rebalance_cb会关闭librdkafka的自动分区分配/撤销,而是将这一责任委托给应用程序的rebalance_cb。

the rebalance callback is responsible for updating librdkafka’s assignment set based on the two events rd_kafka_resp_err__assign_partitionsand rd_kafka_resp_err__revoke_partitions but should also be able to handle arbitrary rebalancing failures where err is neither of those.
重新平衡回调负责根据这两个事件rd_kafka_resp_err__assign_partitions和rd_kafka_resp_err__revoke_partitions更新librdkafka的分配集,但也应该能够再平衡处理任意的不止这些的失败。

in this latter case (arbitrary error), the application must $kafka->assign(null) to synchronize (同步)state.
后一种情况 必须使用assign去同步状态

在没有重新平衡回调的情况下,这是由librdkafka自动完成的,但是注册一个重新平衡回调会使应用程序在执行其他操作时具有灵活性,同时还可以执行排序/撤销操作(assinging/revocation)。例如从另一个位置获取偏移量(在赋值时)或手动提交偏移量(在revoke上)。

parameters
callback (callable)
a callable with the following signature:
<?php
/**
* @param rdkafka\kafkaconsumer $kafka
* @param int $err
* @param array $partitions
*/
function (rdkafka\kafkaconsumer $kafka, $err, $partitions);
err参数被设置为rd_kafka_resp_err__assign_partitions或rd_kafka_resp_err__revoke_partitions(或意外错误)。
partitions参数是rdkafka\topicpartition数组,表示分配或撤销的完整分区集。

<?php
$conf->setrebalancecb(function (rdkafka\kafkaconsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case rd_kafka_resp_err__assign_partitions:
// application may load offets from arbitrary external
// storage here and update partitions
$kafka->assign($partitions);
break;

case rd_kafka_resp_err__revoke_partitions:
if ($manual_commits) {
// optional explicit manual commit
$kafka->commit($partitions);
}
$kafka->assign(null);
break;

default:
handle_unlikely_error($err);
$kafka->assign(null); // sync state同步状态
break;
}
}
?>

rdkafka\topicconf类
(pecl rdkafka >= 0.9.1)

该类保存主题topic实例的配置。
a list of available properties can be found on the » librdkafka repository. note that available configuration properties and default values may change depending on the librdkafka version.
注意配置属性依赖版本 , 可以从librdkafka仓库中查看详细配置

rdkafka\topicconf {
/* methods */
public void dump ( void )
public void set ( void )
public void setpartitioner ( integer $partitioner )
}

1)public void rdkafka\topicconf::dump ( void )
将配置属性和值转储到数组。
返回一个数组,其中配置属性作为键,配置值作为值。

2)public void rdkafka\topicconf::set ( void )
set configuration property name to value.

3)public void rdkafka\topicconf::setpartitioner ( integer $partitioner )
将分区器设置为根据keys将消息路由到分区。
parameters
partitioner (integer)
must be one of the rd_kafka_msg_partitioner_* constants.
必须是rd_kafka_msg_partiator_*常量之一。

rdkafka\exception类
(pecl rdkafka >= 0.9.1)

rdkafka exception.异常类
rdkafka\exception extends exception {
/* inherited properties */
protected string $message ;
protected int $code ;
protected string $file ;
protected int $line ;
/* methods */
/* inherited methods */
final public string exception::getmessage ( void )
final public exception exception::getprevious ( void )
final public mixed exception::getcode ( void )
final public string exception::getfile ( void )
final public int exception::getline ( void )
final public array exception::gettrace ( void )
final public string exception::gettraceasstring ( void )
public string exception::__tostring ( void )
final private void exception::__clone ( void )
}

rdkafka\topicpartition类
(pecl rdkafka >= 1.0.0, librdkafka >= 0.9)

topic+partition 主题加分区
rdkafka\topicpartition {
/* methods */
public integer getoffset ( void )
public integer getpartition ( void )
public string gettopic ( void )
public void setoffset ( string $offset )
public returntype setpartition ( string $partition )
public returntype settopic ( string $topic_name )
}

1)public rdkafka\topicpartition::__construct ( string $topic , integer $partition [, integer $offset = null ] )
parameters
topic (string)
topic name
partition (integer)
partition id
offset (integer)
offset
<?php
new rdkafka\topicpartition(“mytopic”, 1);
?>

2)public integer rdkafka\topicpartition::getoffset ( void )
获取偏移量

3)public integer rdkafka\topicpartition::getpartition ( void )
gets the partition id. 得到分区id

4)public string rdkafka\topicpartition::gettopic ( void )
gets the topic name. 得到主题

5)public void rdkafka\topicpartition::setoffset ( string $offset )
sets the offset. 设置偏移量

6)public returntype rdkafka\topicpartition::setpartition ( string $partition )
sets the partition id.

7)public returntype rdkafka\topicpartition::settopic ( string $topic_name )
sets the topic name.

rdkafka\metadata类
(pecl rdkafka >= 0.9.1)
the metadata class represents broker information. metadata instances are returned by rdkafka::getmetadata() and rdkafka\kafkaconsumer::getmetadata().
元数据类表示代理信息。元数据实例由rdkafka::getmetadata() 和rdkafka\kafkaconsumer::getmetadata()返回。

rdkafka\metadata {
/* methods */
public rdkafka\metadata\collection getbrokers ( void )
public int getorigbrokerid ( void )
public string getorigbrokername ( void )
public rdkafka\metadata\collection gettopics ( void )
}

1)public rdkafka\metadata\collection rdkafka\metadata::getbrokers ( void )
get broker list
returns a rdkafka\metadata\collection of rdkafka\metadata\broker
2)public int rdkafka\metadata::getorigbrokerid ( void )
获取源自此元数据的代理id

3)public string rdkafka\metadata::getorigbrokername ( void )
获取源自此元数据的代理名称。

4)public rdkafka\metadata\collection rdkafka\metadata::gettopics ( void )
获取主题列表。根据元数据的请求方式,这可能包含单个主题、本地已知主题列表或所有集群主题。
returns a rdkafka\metadata\collection of rdkafka\metadata\topic

rdkafka\metadata\collection类
(pecl rdkafka >= 0.9.1)

集合类用作元数据项的集合。它实现了 countable and iterable,因此它可以与count()和foreach一起使用

rdkafka\metadata\collection implements countable , iterator {
/* methods */
public int count ( void )
public mixed current ( void )
public scalar key ( void )
public void next ( void )
public void rewind ( void )
public boolean valid ( void )
}

1)public int rdkafka\metadata\collection::count ( void )
returns the number of elements as integer 返回元素数量

2)public mixed rdkafka\metadata\collection::current ( void )
gets the current value. 获取到当前的值
返回值:
the current value if it is valid or null otherwise.

3)public scalar rdkafka\metadata\collection::key ( void )
get the current key.
返回值:
the current key if it is valid or null otherwise.

4)public void rdkafka\metadata\collection::next ( void )
移到下一个元素。
5)public void rdkafka\metadata\collection::rewind ( void )
将iterator倒转到第一个元素

6)public boolean rdkafka\metadata\collection::valid ( void )
checks if current position is valid 检查当前位置是否有效
returns true on success or false on failure.

predefined constants
the constants below are defined by this extension, and will only be available when the extension has either been compiled into php or dynamically loaded at runtime.
下面的常量是由这个扩展定义的,并且只有当扩展编译到php或在运行时动态加载时才可用。
rd_kafka_consumer (integer)
rd_kafka_offset_beginning (integer)
start consuming from beginning of kafka partition queue: oldest msg.
rd_kafka_offset_end (integer)
start consuming from end of kafka partition queue: next msg.
rd_kafka_offset_stored (integer)
start consuming from offset retrieved from offset store.
rd_kafka_partition_ua (integer)
the unassigned partition is used by the producer api for messages that should be partitioned using the configured or default partitioner.
rd_kafka_producer (integer)
rd_kafka_version (integer)
rd_kafka_resp_err__begin (integer)
rd_kafka_resp_err__bad_msg (integer)
local: bad message format
rd_kafka_resp_err__bad_compression (integer)
local: invalid compressed data
rd_kafka_resp_err__destroy (integer)
local: broker handle destroyed
rd_kafka_resp_err__fail (integer)
local: communication failure with broker
rd_kafka_resp_err__transport (integer)
local: broker transport failure
rd_kafka_resp_err__crit_sys_resource (integer)
local: critical system resource failure
rd_kafka_resp_err__resolve (integer)
local: host resolution failure
rd_kafka_resp_err__msg_timed_out (integer)
local: message timed out
rd_kafka_resp_err__partition_eof (integer)
broker: no more messages
rd_kafka_resp_err__unknown_partition (integer)
local: unknown partition
rd_kafka_resp_err__fs (integer)
local: file or filesystem error
rd_kafka_resp_err__unknown_topic (integer)
local: unknown topic
rd_kafka_resp_err__all_brokers_down (integer)
local: all broker connections are down
rd_kafka_resp_err__invalid_arg (integer)
local: invalid argument or configuration
rd_kafka_resp_err__timed_out (integer)
local: timed out
rd_kafka_resp_err__queue_full (integer)
local: queue full
rd_kafka_resp_err__isr_insuff (integer)
local: isr count insufficient
rd_kafka_resp_err__node_update (integer)
local: broker node update
rd_kafka_resp_err__ssl (integer)
local: ssl error
rd_kafka_resp_err__wait_coord (integer)
local: waiting for coordinator
rd_kafka_resp_err__unknown_group (integer)
local: unknown group
rd_kafka_resp_err__in_progress (integer)
local: operation in progress
rd_kafka_resp_err__prev_in_progress (integer)
local: previous operation in progress
rd_kafka_resp_err__existing_subscription (integer)
local: existing subscription
rd_kafka_resp_err__assign_partitions (integer)
local: assign partitions
rd_kafka_resp_err__revoke_partitions (integer)
local: revoke partitions
rd_kafka_resp_err__conflict (integer)
local: conflicting use
rd_kafka_resp_err__state (integer)
local: erroneous state
rd_kafka_resp_err__unknown_protocol (integer)
local: unknown protocol
rd_kafka_resp_err__not_implemented (integer)
local: not implemented
rd_kafka_resp_err__authentication (integer)
local: authentication failure
rd_kafka_resp_err__no_offset (integer)
local: no offset stored
rd_kafka_resp_err__end (integer)
rd_kafka_resp_err_unknown (integer)
unknown broker error
rd_kafka_resp_err_offset_out_of_range (integer)
broker: offset out of range
rd_kafka_resp_err_invalid_msg (integer)
broker: invalid message
rd_kafka_resp_err_unknown_topic_or_part (integer)
broker: unknown topic or partition
rd_kafka_resp_err_invalid_msg_size (integer)
broker: invalid message size
rd_kafka_resp_err_leader_not_available (integer)
broker: leader not available
rd_kafka_resp_err_not_leader_for_partition (integer)
broker: not leader for partition
rd_kafka_resp_err_request_timed_out (integer)
broker: request timed out
rd_kafka_resp_err_broker_not_available (integer)
broker: broker not available
rd_kafka_resp_err_replica_not_available (integer)
broker: replica not available
rd_kafka_resp_err_msg_size_too_large (integer)
broker: message size too large
rd_kafka_resp_err_stale_ctrl_epoch (integer)
broker: stalecontrollerepochcode
rd_kafka_resp_err_offset_metadata_too_large (integer)
broker: offset metadata string too large
rd_kafka_resp_err_network_exception (integer)
broker: broker disconnected before response received
rd_kafka_resp_err_group_load_in_progress (integer)
broker: group coordinator load in progress
rd_kafka_resp_err_group_coordinator_not_available (integer)
broker: group coordinator not available
rd_kafka_resp_err_not_coordinator_for_group (integer)
broker: not coordinator for group
rd_kafka_resp_err_topic_exception (integer)
broker: invalid topic
rd_kafka_resp_err_record_list_too_large (integer)
broker: message batch larger than configured server segment size
rd_kafka_resp_err_not_enough_replicas (integer)
broker: not enough in-sync replicas
rd_kafka_resp_err_not_enough_replicas_after_append (integer)
broker: message(s) written to insufficient number of in-sync replicas
rd_kafka_resp_err_invalid_required_acks (integer)
broker: invalid required acks value
rd_kafka_resp_err_illegal_generation (integer)
broker: specified group generation id is not valid
rd_kafka_resp_err_inconsistent_group_protocol (integer)
broker: inconsistent group protocol
rd_kafka_resp_err_invalid_group_id (integer)
broker: invalid group.id
rd_kafka_resp_err_unknown_member_id (integer)
broker: unknown member
rd_kafka_resp_err_invalid_session_timeout (integer)
broker: invalid session timeout
rd_kafka_resp_err_rebalance_in_progress (integer)
broker: group rebalance in progress
rd_kafka_resp_err_invalid_commit_offset_size (integer)
broker: commit offset data size is not valid
rd_kafka_resp_err_topic_authorization_failed (integer)
broker: topic authorization failed
rd_kafka_resp_err_group_authorization_failed (integer)
broker: group authorization failed
rd_kafka_resp_err_cluster_authorization_failed (integer)
broker: cluster authorization failed
rd_kafka_conf_unknown (integer)
rd_kafka_conf_invalid (integer)
rd_kafka_conf_ok (integer)
rd_kafka_msg_partitioner_random (integer)
the random partitioner. this was the default partitioner in librdkafka 0.8. assigns partition randomly.
rd_kafka_msg_partitioner_consistent (integer)
the consistent partitioner. uses consistent hashing to map identical keys onto identical partitions. uses crc32 as hashing function. messages with no key or empty key are always assigned to the same partition.
rd_kafka_log_print (integer)
the print logger. prints messages to stderr.
rd_kafka_log_syslog (integer)
the syslog logger. sends messages to syslog.
rd_kafka_log_syslog_print (integer)
the syslog-print partitioner. sends messages to syslog and prints them to stderr.