Fluentd output plugin to write data into Hadoop HDFS over WebHDFS/HttpFs.
The “webhdfs” output plugin formats data into plain text and stores it as files on HDFS. This plugin supports:
inject tag and time into record (and output plain text data) using <inject> section
format events into plain text by format plugins using <format> section
control flushing using <buffer> section
Paths on HDFS can be generated from event timestamp, tag or any other fields in records.
Requirements
fluent-plugin-webhdfs | fluentd    | ruby
>= 1.0.0              | >= v0.14.4 | >= 2.1
< 1.0.0               | < v0.14.0  | >= 1.9
Older versions
Versions 0.x.x of this plugin are for older versions of Fluentd (v0.12.x). Old-style configuration parameters (output_data_type, output_include_*, and others) are still supported but deprecated.
Users should use the <format> section to control how events are formatted into plain text.
Configuration
WebHDFSOutput
To store data as time, tag, and JSON (the same as ‘@type file’) over WebHDFS with SSL enabled:
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path /path/on/hdfs/access.log.%Y%m%d_%H.log
  ssl true
  ssl_ca_file /path/to/ca_file.pem # if needed
  ssl_verify_mode peer # if needed (peer or none)
</match>
Here, ssl_verify_mode peer means that the server’s certificate is verified. You can turn this off by setting ssl_verify_mode none. The default is peer.
See the net/http and openssl documentation for further details.
With kerberos authentication:
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path /path/on/hdfs/access.log.%Y%m%d_%H.log
  kerberos true
  kerberos_keytab /path/to/keytab # if needed
  renew_kerberos_delegation_token true # if needed
</match>
If you want to compress data before storing it:
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path /path/on/hdfs/access.log.%Y%m%d_%H
  compress gzip # or 'bzip2', 'snappy', 'hadoop_snappy', 'lzo_command', 'zstd'
</match>
Note that if you set compress gzip, the suffix .gz will be added to the path (or .bz2, .sz, .snappy, .lzo, or .zst for the other algorithms).
Note that you have to install an additional gem for several compression algorithms:
snappy: install snappy gem
hadoop_snappy: install snappy gem
bzip2: install bzip2-ffi gem
zstd: install zstandard gem
Note that zstd also requires the libzstd native library. See the zstandard-ruby repo for information on the required packages for your operating system.
You can also specify the compression block size (currently supported only for the Snappy codecs).
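Here is a sketch of what this might look like. It assumes the block size is given in bytes through a top-level block_size parameter placed next to compress. The parameter name, its placement, and the value 32768 are assumptions, so check them against the plugin version you run:

```
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path /path/on/hdfs/access.log.%Y%m%d_%H
  compress hadoop_snappy
  # assumed parameter: compression block size in bytes (Snappy codecs only)
  block_size 32768
</match>
```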
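You can also override the default compressor extensions by explicitly specifying the file extension in HDFS. For example, with the snappy codec and an explicit .snappy extension, paths in HDFS will look like /path/on/hdfs/access.log.20201003_12.snappy.
This may be useful when, for example, you need to use the snappy codec but .sz files are not recognized as snappy files in HDFS.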
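Here is a minimal sketch. It assumes the override is done with an extension parameter (the name is an assumption) that replaces the default .sz suffix of the snappy compressor:

```
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path /path/on/hdfs/access.log.%Y%m%d_%H
  compress snappy
  # assumed parameter: write ".snappy" instead of the default ".sz" suffix
  extension ".snappy"
</match>
```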
Namenode HA / Auto retry for WebHDFS known errors
fluent-plugin-webhdfs (v0.2.0 or later) accepts two NameNodes for NameNode HA (active/standby). Use standby_namenode to specify the standby NameNode.
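Here is a minimal sketch of an active/standby pair. The hostnames are hypothetical, and giving each NameNode as a host:port value is an assumption:

```
<match access.**>
  @type webhdfs
  # active and standby NameNodes as host:port (value format assumed)
  namenode master1.your.cluster.local:50070
  standby_namenode master2.your.cluster.local:50070
  path /path/on/hdfs/access.log.%Y%m%d_%H.log
</match>
```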
You can also have known HDFS errors (such as LeaseExpiredException) retried automatically. With retrying enabled, fluentd does not write logs for these errors if a retry succeeds.
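Retrying might be enabled as shown below. retry_known_errors is an assumed parameter name, and the NameNode hostnames are hypothetical:

```
<match access.**>
  @type webhdfs
  namenode master1.your.cluster.local:50070
  standby_namenode master2.your.cluster.local:50070
  path /path/on/hdfs/access.log.%Y%m%d_%H.log
  # assumed parameter: retry known HDFS errors such as LeaseExpiredException
  retry_known_errors yes
</match>
```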
Performance notifications
Writing data to a single HDFS file from two or more fluentd nodes produces many bad blocks in HDFS. If you want to run two or more fluentd nodes with fluent-plugin-webhdfs, configure a distinct ‘path’ for each node.
To include the hostname, #{Socket.gethostname} is available as a Ruby expression in Fluentd configuration string literals (in "..." strings). This plugin also supports the ${uuid} placeholder to include a random UUID in paths.
For hostname:
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path "/log/access/%Y%m%d/#{Socket.gethostname}.log" # double quotes needed to expand ruby expression in string
</match>
Or with a random file name (only to avoid duplicated file names):
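For example, a path built from the date and the ${uuid} placeholder could look like this (the directory layout is illustrative):

```
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  # ${uuid} is replaced with a random UUID so file names from different nodes do not collide
  path /log/access/%Y%m%d/${uuid}.log
</match>
```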
For unstable Namenodes
With the default configuration, fluent-plugin-webhdfs checks the HDFS filesystem status and raises an error for inactive NameNodes.
If you are using unstable NameNodes and want to ignore NameNode errors when fluentd starts, enable the ignore_start_check_error option.
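Here is a sketch of that option added to the hostname-based configuration shown earlier. Only ignore_start_check_error is new:

```
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path "/log/access/%Y%m%d/#{Socket.gethostname}.log"
  # do not raise an error when the NameNode status check fails at startup
  ignore_start_check_error true
</match>
```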
For unstable Datanodes
With unstable datanodes that frequently go down, appending over WebHDFS may produce broken files. In such cases, specify append no and include the ${chunk_id} placeholder in the path:
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  append no
  path "/log/access/%Y%m%d/#{Socket.gethostname}.${chunk_id}.log"
</match>
out_webhdfs then creates a new file on HDFS for each fluentd flush, named with the chunk id, so you don't need to worry about broken files from append operations.
If you want JSON objects only (without time, tag, or both at the head of each line), use the <format> section to specify the json formatter. To specify the NameNode, the namenode parameter is also available.
You can also store data as JSON that includes time and tag (using <inject>) over WebHDFS, or store data as JSON with time as unix time, using a path that includes the tag as a directory.
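The following sketch combines these options using only Fluentd's standard <format>, <inject>, and <buffer> sections. Records are written as JSON with the tag and a unix-time timestamp injected, under a per-tag directory. The field names tag and time, the directory layout, and the one-day timekey are illustrative assumptions. The ${tag} path placeholder is assumed to be expanded from the buffer chunk keys, as in other Fluentd v1 output plugins:

```
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  # ${tag} and %Y%m%d are filled from the buffer chunk keys declared below
  path /log/access/${tag}/%Y%m%d.log
  <format>
    @type json
  </format>
  <inject>
    # add the event tag and time as fields of each JSON record
    tag_key tag
    time_key time
    time_type unixtime
  </inject>
  <buffer tag, time>
    timekey 1d
  </buffer>
</match>
```

Dropping the <inject> section leaves JSON objects only, without time or tag.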
Pseudo authentication with a username is also supported.
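Here is a sketch that assumes the user name is passed through a username parameter. The parameter name is an assumption, and hdfs_user is a hypothetical account:

```
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path /path/on/hdfs/access.log.%Y%m%d_%H.log
  # assumed parameter: user name sent for pseudo (simple) authentication
  username hdfs_user
</match>
```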
You can also store data over HttpFs instead of WebHDFS.
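Here is a sketch that assumes a boolean httpfs parameter switches the protocol (the parameter name is an assumption). Port 14000 is the usual HttpFS default, and the host name is hypothetical:

```
<match access.**>
  @type webhdfs
  host httpfs.your.cluster.local
  # usual HttpFS port; adjust to your deployment
  port 14000
  path /path/on/hdfs/access.log.%Y%m%d_%H.log
  # assumed parameter: talk to HttpFs instead of WebHDFS
  httpfs true
</match>
```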
NOTE: You need to install the gssapi gem for kerberos authentication. See https://github.com/kzk/webhdfs#for-kerberos-authentication
With path configurations such as /log/access/%Y%m%d/#{Socket.gethostname}.log, you can handle all files under /log/access/20120820/* as the access logs for that time slice.
For high-load cluster nodes, you can specify timeouts for HTTP requests.
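Here is a sketch that assumes the timeouts are set in seconds through open_timeout and read_timeout. Both parameter names are assumptions, and 180 is an arbitrary example value:

```
<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path "/log/access/%Y%m%d/#{Socket.gethostname}.log"
  # assumed parameters: HTTP connection and read timeouts in seconds
  open_timeout 180
  read_timeout 180
</match>
```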