Merge pull request #15 from ntt360/master update libkafka version to v2.3.0
Merge pull request #15 from ntt360/master
update libkafka version to v2.3.0
Qbusbridge 是 pub-sub 消息系统的客户端 SDK,目前它支持:
用户可以通过修改配置切换到任意一个 pub-sub 消息系统。默认配置是访问 Kafka,如果想要切换到 Pulsar,需要修改配置为:
mq.type=pulsar # Other configs for Pulsar...
更多细节见 config。
Qbusbridge-Kafka 底层基于 librdkafka, 与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的Kafka系统细节,只需调用极少量的接口,就可完成消息的生产和消费;
针对使用者比较关心的消息生产的可靠性,作了近一步的提升
确保你的系统上安装了 g++ (>= 4.8.5), boost (>= 1.41),cmake (>= 3.1),swig (>= 3.0.12)。
git clone --recursive https://github.com/Qihoo360/qbusbridge.git
此外,qbus SDK 静态链接到 libstdc++,因此必须确保 libstdc++.a 存在。对于 CentOS 用户,运行:
libstdc++.a
sudo yum install -y glibc-static libstdc++-static
如果你希望 librdkafa 支持 kafka sacl 鉴权相关功能,那么还需要安装:
librdkafa
kafka
sacl
sudo yum install -y cyrus-sasl-devel # 如果你还用到 GSSAPI 认证,那么还需要编译该插件 sudo yum install -y cyrus-sasl-gssapi
运行./build_dependencies.sh。
./build_dependencies.sh
它会自动下载子模块,并将其安装到cxx/thirdparts/local,即CMakeLists.txt查找头文件和库文件的目录。
cxx/thirdparts/local
CMakeLists.txt
见 ./cxx/thirdparts/local:
./cxx/thirdparts/local
include/ librdkafka/ rdkafka.h log4cplus/ logger.h lib/ librdkafka.a liblog4cplus.a
如果你希望支持 sacl 功能,可以在编译后进入 cxx/thirdparts/librdkafka/examples/ 目录,执行如下命名测试 sacl 组件是否编译成功:
cxx/thirdparts/librdkafka/examples/
cd cxx/thirdparts/librdkafka/examples/ ./rdkafka_example -X builtin.features # builtin.features = gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer,http,oidc
确保 builtin.features 输出值编译了 sasl 相关鉴权模块!
builtin.features
sasl
进入cxx目录,执行./build.sh,会生成以下文件:
cxx
./build.sh
include/ qbus_consumer.h qbus_producer.h lib/ debug/libQBus.so release/libQBus.so
虽然构建 C++ SDK 需要 C++11 支持,但是 SDK 也可以被旧版本 g++ 使用。比如,使用 g++ 4.8.5 编译 qbus SDK,使用 g++ 4.4.7 使用 qbus SDK。
进入golang目录,执行./build.sh,会生成以下文件:
golang
gopath/ src/ qbus/ qbus.go libQBus_go.so
可以运行 USE_GO_MOD=1 ./build.sh 来启用 go module,此时会生成以下文件:
USE_GO_MOD=1 ./build.sh
examples/ go.mod qbus/ qbus.go go.mod libQBus_go.so
进入python目录,执行./build.sh,会生成以下文件:
python
examples/ qbus.py _qbus.so
进入php目录,执行./build.sh,会生成以下文件:
php
examples/ qbus.php qbus.so
进入examples子目录,运行 ./build.sh [debug|release] 生成可执行文件,其中 debug 是使用 lib/debug 目录下的 libQBus.so,release 是使用 lib/release 目录下的 libQBus.so。运行make clean删除它们。
examples
./build.sh [debug|release]
debug
lib/debug
libQBus.so
release
lib/release
make clean
如果要运行自己的程序,可以参考Makefile文件。
Makefile
进入examples子目录,运行./build.sh生成可执行文件,运行./clean.sh删除它们。
./clean.sh
运行可执行文件时把libQBus_go.so路径加入LD_LIBRARY_PATH环境变量:
libQBus_go.so
LD_LIBRARY_PATH
export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH
如果要运行自己的程序,将生成的gopath目录加入GOPATH环境变量,或者将gopath/src/qbus目录移动到$GOPATH/src下。
gopath
GOPATH
gopath/src/qbus
$GOPATH/src
将生成的qbus.py和_qbus.so拷贝至要运行的Python脚本同一路径即可。
qbus.py
_qbus.so
编辑php.ini文件,添加extension=<module-path>,<module-path>为qbus.so路径。
php.ini
extension=<module-path>
<module-path>
qbus.so
bool QbusProducer::init(const string& broker_list, const string& log_path, const string& config_path, const string& topic_name); bool QbusProducer::produce(const char* data, size_t data_len, const std::string& key); void QbusProducer::uninit();
#include <string> #include <iostream> #include "qbus_producer.h" int main(int argc, const char* argv[]) { qbus::QbusProducer qbus_producer; if (!qbus_producer.init("127.0.0.1:9092", "./log", "./config", "topic_test")) { std::cout << "Failed to init" << std::endl; return 0; } std::string msg("test\n"); if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) { std::cout << "Failed to produce" << std::endl; } qbus_producer.uninit(); return 0; }
消费只需调用subscribeOne订阅topic(也支持同时订阅多个topic),然后执行start就开始消费,当前进程非阻塞,每条消息通过callback接口回调给使用者;
sdk还支持用户手动提交offset方式,用户可以通过callback中返回的消息体,在代码其他逻辑中进行提交。
下面是消费接口,以c++为例:
bool QbusConsumer::init(const std::string& broker_list, const std::string& log_path, const std::string& config_path, const QbusConsumerCallback& callback); bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic); bool QbusConsumer::subscribe(const std::string& group, const std::vector<std::string>& topics); bool QbusConsumer::start(); void QbusConsumer::stop(); bool QbusConsumer::pause(const std::vector<std::string>& topics); bool QbusConsumer::resume(const std::vector<std::string>& topics);
c++ sdk的使用范例:
#include <iostream> #include "qbus_consumer.h" qbus::QbusConsumer qbus_consumer; class MyCallback: public qbus::QbusConsumerCallback { public: virtual void deliveryMsg(const std::string& topic, const char* msg, const size_t msg_len) const { std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl; } }; int main(int argc, char* argv[]) { MyCallback my_callback; if (qbus_consumer.init("127.0.0.1:9092", "log", "config", my_callback)) { if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) { if (!qbus_consumer.start()) { std::cout << "Failed to start" << std::endl; return NULL; } while (1) sleep(1); //可以执行其他业务逻辑 qbus_consumer.stop(); } else { std::cout << "Failed subscribe" << std::endl; } } else { std::cout << "Failed init" << std::endl; } return 0; }
可以用pause()和resume()方法来暂停或恢复某些主题的消费,具体示例见qbus_pause_resume_example.cc。
pause()
resume()
更多API使用方法参考C examples,C++ examples,Go examples,Python examples,PHP examples目录下的示例代码。
配置文件是INI格式:
[global] [topic] [sdk]
global和topic配置见rdkafka 1.0.x configuration,sdk配置见sdk configuration。
通常情况下kafkabridge使用空配置文件即可工作,但是如果broker版本低于0.10.0.0,必须添加api.version相关的配置,见broker version compatibility.
例如,对0.9.0.1版本的broker,必须添加以下配置:
[global] api.version.request=false broker.version.fallback=0.9.0.1
当前配置和 broker 0.9.0.1 兼容。因此,如果使用了高版本的 broker,api.version.request 应该配置为 true。否则消息协议会使用旧版本,比如,没有时间戳字段。
api.version.request
QQ 群:876834263
版权所有:中国计算机学会技术支持:开源发展技术委员会 京ICP备13000930号-9 京公网安备 11010802032778号
简介 English
Qbusbridge 是 pub-sub 消息系统的客户端 SDK,目前它支持:
用户可以通过修改配置切换到任意一个 pub-sub 消息系统。默认配置是访问 Kafka,如果想要切换到 Pulsar,需要修改配置为:
更多细节见 config。
Qbusbridge-Kafka 底层基于 librdkafka, 与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的Kafka系统细节,只需调用极少量的接口,就可完成消息的生产和消费;
针对使用者比较关心的消息生产的可靠性,作了近一步的提升
特点
编译
确保你的系统上安装了 g++ (>= 4.8.5), boost (>= 1.41),cmake (>= 3.1),swig (>= 3.0.12)。
git clone
此外,qbus SDK 静态链接到 libstdc++,因此必须确保
libstdc++.a存在。对于 CentOS 用户,运行:支持 sacl
如果你希望
librdkafa支持kafkasacl鉴权相关功能,那么还需要安装:1. 安装子模块
运行
./build_dependencies.sh。它会自动下载子模块,并将其安装到
cxx/thirdparts/local,即CMakeLists.txt查找头文件和库文件的目录。见
./cxx/thirdparts/local:如果你希望支持
sacl功能,可以在编译后进入cxx/thirdparts/librdkafka/examples/目录,执行如下命名测试sacl组件是否编译成功:确保
builtin.features输出值编译了sasl相关鉴权模块!2. 编译SDK
C/C++
进入
cxx目录,执行./build.sh,会生成以下文件:Go
进入
golang目录,执行./build.sh,会生成以下文件:可以运行
USE_GO_MOD=1 ./build.sh来启用 go module,此时会生成以下文件:Python
进入
python目录,执行./build.sh,会生成以下文件:PHP
进入
php目录,执行./build.sh,会生成以下文件:3. 编译示例程序
C/C++
进入
examples子目录,运行./build.sh [debug|release]生成可执行文件,其中debug是使用lib/debug目录下的libQBus.so,release是使用lib/release目录下的libQBus.so。运行make clean删除它们。如果要运行自己的程序,可以参考
Makefile文件。Go
进入
examples子目录,运行./build.sh生成可执行文件,运行./clean.sh删除它们。运行可执行文件时把
libQBus_go.so路径加入LD_LIBRARY_PATH环境变量:如果要运行自己的程序,将生成的
gopath目录加入GOPATH环境变量,或者将gopath/src/qbus目录移动到$GOPATH/src下。Python
将生成的
qbus.py和_qbus.so拷贝至要运行的Python脚本同一路径即可。PHP
编辑
php.ini文件,添加extension=<module-path>,<module-path>为qbus.so路径。使用
数据生产
数据消费
消费只需调用subscribeOne订阅topic(也支持同时订阅多个topic),然后执行start就开始消费,当前进程非阻塞,每条消息通过callback接口回调给使用者;
sdk还支持用户手动提交offset方式,用户可以通过callback中返回的消息体,在代码其他逻辑中进行提交。
下面是消费接口,以c++为例:
c++ sdk的使用范例:
可以用
pause()和resume()方法来暂停或恢复某些主题的消费,具体示例见qbus_pause_resume_example.cc。更多API使用方法参考C examples,C++ examples,Go examples,Python examples,PHP examples目录下的示例代码。
配置
配置文件是INI格式:
global和topic配置见rdkafka 1.0.x configuration,sdk配置见sdk configuration。
通常情况下kafkabridge使用空配置文件即可工作,但是如果broker版本低于0.10.0.0,必须添加api.version相关的配置,见broker version compatibility.
例如,对0.9.0.1版本的broker,必须添加以下配置:
当前配置和 broker 0.9.0.1 兼容。因此,如果使用了高版本的 broker,
api.version.request应该配置为 true。否则消息协议会使用旧版本,比如,没有时间戳字段。联系我们
QQ 群:876834263