fluent-plugin-kafka, a plugin for Fluentd
A fluentd plugin to both consume and produce data for Apache Kafka.
Installation
Add this line to your application’s Gemfile:
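The standard Gemfile entry for this plugin:

```ruby
gem 'fluent-plugin-kafka'
```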
And then execute:
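Assuming Bundler is installed:

```
$ bundle
```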
Or install it yourself as:
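Using RubyGems directly:

```
$ gem install fluent-plugin-kafka
```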
If you want to use zookeeper-related parameters, you also need to install the zookeeper gem. The zookeeper gem includes a native extension, so development tools are needed, e.g. ruby-devel, gcc, and make.
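For example, on a RedHat-family system (package names are illustrative; adjust for your distribution):

```
$ sudo yum install ruby-devel gcc make
$ gem install zookeeper
```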
Requirements
Usage
Common parameters
SSL authentication
Set path to SSL related files. See Encryption and Authentication using SSL for more detail.
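A minimal sketch of the SSL-related parameters; the parameter names below are the plugin's SSL options as we understand them, and the paths are placeholders:

```
ssl_ca_cert /path/to/ca_cert.pem
ssl_client_cert /path/to/client_cert.pem
ssl_client_cert_key /path/to/client_cert_key.pem
```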
SASL authentication
with GSSAPI
Set principal and path to keytab for SASL/GSSAPI authentication. See Authentication using SASL for more details.
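A sketch assuming `principal` and `keytab` parameter names; the principal and path values are placeholders:

```
principal kafka/broker.example.com@EXAMPLE.COM
keytab /path/to/kafka.keytab
```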
with Plain/SCRAM
Set username, password, scram_mechanism and sasl_over_ssl for SASL/Plain or Scram authentication. See Authentication using SASL for more details.
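A sketch using the parameter names mentioned above; the values are placeholders, and the `scram_mechanism` value shown is an assumption:

```
username alice
password secret
scram_mechanism sha256
sasl_over_ssl false
```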
Input plugin (@type ‘kafka’)
Consumes events with a single consumer.

Supports starting processing from an assigned offset for specific topics.
See also ruby-kafka README for more detailed documentation about ruby-kafka.
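A minimal `<source>` sketch; the broker addresses are placeholders, and `app_event` is the example topic used below:

```
<source>
  @type kafka
  brokers broker1:9092,broker2:9092
  topics app_event
  format json
</source>
```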
The consumed topic name is used as the event tag. So when the target topic name is `app_event`, the tag is `app_event`. If you want to modify the tag, use the `add_prefix` or `add_suffix` parameter. With `add_prefix kafka`, the tag is `kafka.app_event`.

Input plugin (@type ‘kafka_group’, supports kafka group)
Consumes events using Kafka consumer group features.
See also ruby-kafka README for more detailed documentation about ruby-kafka options.
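A minimal sketch; the broker address and group name are placeholders, and `consumer_group` is the parameter name as we understand it:

```
<source>
  @type kafka_group
  brokers broker1:9092
  consumer_group my_consumer_group
  topics app_event
  format json
</source>
```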
`topics` supports regex patterns since v0.13.1. If you want to use a regex pattern, use the `/pattern/` form, like `/foo.*/`.

The consumed topic name is used as the event tag. So when the target topic name is `app_event`, the tag is `app_event`. If you want to modify the tag, use the `add_prefix` or `add_suffix` parameter. With `add_prefix kafka`, the tag is `kafka.app_event`.

Input plugin (@type ‘rdkafka_group’, supports kafka consumer groups, uses rdkafka-ruby)
With the introduction of the rdkafka-ruby based input plugin, we hope to support Kafka brokers above version 2.1, where we saw compatibility issues when using the ruby-kafka based @kafka_group input type. The rdkafka-ruby library wraps the highly performant, production-ready librdkafka C library.
See also rdkafka-ruby and librdkafka for more detailed documentation about Kafka consumer options.
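A hypothetical minimal sketch. The `kafka_configs` section passing raw librdkafka options (`bootstrap.servers`, `group.id`) is our assumption about this plugin's interface; consult the plugin documentation for the exact parameter names:

```
<source>
  @type rdkafka_group
  topics app_event
  format json
  kafka_configs {
    "bootstrap.servers": "broker1:9092",
    "group.id": "my_consumer_group"
  }
</source>
```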
The consumed topic name is used as the event tag. So when the target topic name is `app_event`, the tag is `app_event`. If you want to modify the tag, use the `add_prefix` or `add_suffix` parameter. With `add_prefix kafka`, the tag is `kafka.app_event`.

Output plugin
This `kafka2` plugin is for fluentd v1 or later. This plugin uses the `ruby-kafka` producer for writing data. If `ruby-kafka` doesn't fit your kafka environment, check the `rdkafka2` plugin instead. This will be the `out_kafka` plugin in the future.

The `<formatter name>` in `<format>` uses fluentd's formatter plugins. See the formatter article.

Note: The Java-based Kafka client uses `murmur2` as its partitioner function by default. If you want the same partitioning behavior with fluent-plugin-kafka, change the partitioner to `murmur2` instead of `crc32`. Note that to use the `murmur2` hash partitioner function, you must install the `digest-murmurhash` gem.

ruby-kafka sometimes returns a `Kafka::DeliveryFailed` error without useful information. In this case, `get_kafka_client_log` is useful for identifying the error cause. ruby-kafka's log is routed to the fluentd log, so you can see ruby-kafka's log in the fluentd logs.

Supports the following ruby-kafka producer options.
For details on monitoring, see also https://github.com/zendesk/ruby-kafka#monitoring
See also Kafka::Client for more detailed documentation about ruby-kafka.
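A minimal `kafka2` configuration sketch; the match pattern, broker address, and topic are placeholders, and `default_topic` is the parameter name as we understand it:

```
<match app.**>
  @type kafka2
  brokers broker1:9092
  default_topic app_event
  <format>
    @type json
  </format>
</match>
```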
This plugin also supports the “snappy” compression codec. Install the snappy gem before using snappy compression.

The snappy gem uses a native extension, so you need to install several packages first. On Ubuntu, the development packages and the snappy library are needed. On CentOS 7, installation of these packages is also necessary.
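For example (package names are illustrative; adjust for your distribution and version):

```
# Ubuntu
$ sudo apt-get install build-essential libsnappy-dev
# CentOS 7
$ sudo yum install gcc-c++ snappy-devel
# then install the gem
$ gem install snappy
```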
This plugin also supports the “lz4” compression codec. Install the extlz4 gem before using lz4 compression.
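The extlz4 gem can be installed with:

```
$ gem install extlz4
```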
This plugin also supports the “zstd” compression codec. Install the zstd-ruby gem before using zstd compression.
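The zstd-ruby gem can be installed with:

```
$ gem install zstd-ruby
```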
Load balancing
By default, ruby-kafka assigns each message a partition at random, but messages with the same partition key are always assigned to the same partition. A common partition key can be set with `default_partition_key` in the config file. If the key named by `partition_key_key` exists in a message, this plugin uses its value as the partition key.

If the key named by `message_key_key` exists in a message, this plugin publishes its value to Kafka as the message key, where it can be read by consumers. The same message key can be assigned to all messages by setting `default_message_key` in the config file. If `message_key_key` exists and `partition_key_key` is not set explicitly, `message_key_key` will be used for partitioning.

Headers
It is possible to set headers on Kafka messages. This only works for the kafka2 and rdkafka2 output plugins.
The format is `key1:value1,key2:value2`. For example:
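A sketch using the `headers` parameter with the format above; the match pattern, broker, and header names/values are placeholders:

```
<match app.**>
  @type kafka2
  brokers broker1:9092
  headers key1:value1,key2:value2
  <format>
    @type json
  </format>
</match>
```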
You may set header values based on a value of a fluentd record field. For example, imagine a fluentd record like:
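For instance, a nested record like:

```
{
  "source": {
    "ip": "127.0.0.1"
  }
}
```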
And the following fluentd config:
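A sketch using the `headers_from_record` parameter; the match pattern and broker are placeholders:

```
<match app.**>
  @type kafka2
  brokers broker1:9092
  headers_from_record source_ip:$.source.ip
  <format>
    @type json
  </format>
</match>
```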
The Kafka message will have a header of source_ip=127.0.0.1.
The configuration format is jsonpath. It is described in https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor
Excluding fields
Fields can be excluded from output data. This only works for the kafka2 and rdkafka2 output plugins.
Fields must be specified using an array of dot-notation paths (`$.`), for example:

This config can be used to remove fields used in other configs.
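A sketch, assuming the parameter is named `exclude_fields`; the match pattern and broker are placeholders:

```
<match app.**>
  @type kafka2
  brokers broker1:9092
  exclude_fields ["$.source.ip"]
  <format>
    @type json
  </format>
</match>
```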
For example, `$.source.ip` can be extracted with the `headers_from_record` config and excluded from the message payload.

Send only a sub field as a message payload
If `record_key` is provided, the plugin sends only the sub field given by that key. The configuration format is jsonpath.

For example, when the following configuration and incoming record are given:
configuration:
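A sketch, assuming `record_key` takes a jsonpath value; the match pattern and broker are placeholders:

```
<match app.**>
  @type kafka2
  brokers broker1:9092
  record_key $.data
  <format>
    @type json
  </format>
</match>
```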
record:
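An illustrative incoming record; the field names other than `data` are hypothetical:

```
{
  "data": {
    "message": "hello",
    "count": 1
  },
  "id": "abc123"
}
```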
only the `data` field will be serialized by the formatter and sent to Kafka. The top-level `data` key will be removed.

Buffered output plugin
This plugin uses the ruby-kafka producer for writing data. This plugin is for fluentd v0.12. If you use v1, see `kafka2`. Support for fluentd v0.12 has ended; `kafka_buffered` will become an alias of `kafka2` and will be removed in the future.

`kafka_buffered` supports the following ruby-kafka parameters:

`kafka_buffered` has two additional parameters:

Note: The Java-based Kafka client uses `murmur2` as its partitioner function by default. If you want the same partitioning behavior with fluent-plugin-kafka, change the partitioner to `murmur2` instead of `crc32`. Note that to use the `murmur2` hash partitioner function, you must install the `digest-murmurhash` gem.

Non-buffered output plugin
This plugin uses the ruby-kafka producer for writing data. For performance and reliability reasons, use the `kafka_buffered` output instead. This plugin is mainly for testing.

This plugin also supports ruby-kafka related parameters. See the Buffered output plugin section.

Note: The Java-based Kafka client uses `murmur2` as its partitioner function by default. If you want the same partitioning behavior with fluent-plugin-kafka, change the partitioner to `murmur2` instead of `crc32`. Note that to use the `murmur2` hash partitioner function, you must install the `digest-murmurhash` gem.

rdkafka based output plugin
This plugin uses `rdkafka` instead of `ruby-kafka` as the Kafka client. You need to install the rdkafka gem. `rdkafka2` is for fluentd v1.0 or later.

`rdkafka2` supports the `discard_kafka_delivery_failed_regex` parameter:

- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the `Kafka::DeliveryFailed` error occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`.

If you use v0.12, use `rdkafka` instead.

FAQ
Why can't fluent-plugin-kafka send data to our Kafka cluster?
We get lots of similar questions. In almost all cases, this problem is caused by a version mismatch between ruby-kafka and the Kafka cluster. See the ruby-kafka README for more details: https://github.com/zendesk/ruby-kafka#compatibility
To avoid the problem, there are two approaches:
Contributing
1. Create your feature branch (`git checkout -b my-new-feature`)
2. Commit your changes (`git commit -am 'Added some feature'`)
3. Push to the branch (`git push origin my-new-feature`)