Kafka output

The Kafka output sends events to Apache Kafka.
Compatibility: This output can connect to Kafka version 0.8.2.0 and later. Older versions might work as well, but are not supported.
This example configures a Kafka output called kafka-output in the Elastic Agent elastic-agent.yml file, with settings described in the sections that follow:

```yaml
outputs:
  kafka-output:
    type: kafka
    hosts:
      - 'kafka1:9092'
      - 'kafka2:9092'
      - 'kafka3:9092'
    client_id: Elastic
    version: 1.0.0
    compression: gzip
    compression_level: 4
    username: <my-kafka-username>
    password: <my-kafka-password>
    sasl:
      mechanism: SCRAM-SHA-256
    partition:
      round_robin:
        group_events: 1
    topics:
      - topic: '%{[fields.log_topic]}'
    headers: []
    timeout: 30
    broker_timeout: 30
    required_acks: 1
    ssl:
      verification_mode: full
```
Kafka output configuration settings

The kafka output supports the following settings, grouped by category. Many of these settings have sensible defaults that allow you to run Elastic Agent with minimal configuration.
| Setting | Description |
|---|---|
| `enabled` | (boolean) Enables or disables the output. If set to `false`, the output is disabled. |
| `hosts` | The addresses your Elastic Agents will use to connect to one or more Kafka brokers. For example: `hosts: ['localhost:9092', 'mykafkahost01:9092', 'mykafkahost02:9092']` |
| `version` | Kafka protocol version that Elastic Agent will request when connecting. Defaults to `1.0.0`. The protocol version controls the Kafka client features available to Elastic Agent; it does not prevent Elastic Agent from connecting to Kafka versions newer than the protocol version. |
Authentication settings

| Setting | Description |
|---|---|
| `username` | The username for connecting to Kafka. If username is configured, the password must be configured as well. |
| `password` | The password for connecting to Kafka. |
| `sasl.mechanism` | The SASL mechanism to use when connecting to Kafka. It can be one of: `PLAIN` for SASL/PLAIN, `SCRAM-SHA-256` for SCRAM-SHA-256, or `SCRAM-SHA-512` for SCRAM-SHA-512. |
| `ssl` | When sending data to a secured cluster through the `kafka` output, Elastic Agent can use SSL/TLS. For a list of available settings, refer to the SSL/TLS configuration documentation. |
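For example, the authentication settings combine as shown in this minimal sketch; the host name and credential values are placeholders, and your broker's SASL mechanism may differ:

```yaml
# Minimal sketch of SASL/SCRAM authentication over TLS.
# Host and credential values are placeholders.
outputs:
  kafka-output:
    type: kafka
    hosts:
      - 'kafka1:9093'
    username: <your-kafka-username>
    password: <your-kafka-password>
    sasl:
      mechanism: SCRAM-SHA-256
    ssl:
      verification_mode: full
```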
Memory queue settings

The memory queue keeps all events in memory.

The memory queue waits for the output to acknowledge or drop events. If the queue is full, no new events can be inserted into it. The queue frees up space to accept more events only after it receives that signal from the output.
The memory queue is controlled by the parameters queue.mem.flush.min_events and queue.mem.flush.timeout. If queue.mem.flush.timeout is 0s or queue.mem.flush.min_events is 0 or 1, events can be sent by the output as soon as they are available. If the output supports a bulk_max_size parameter, it controls the maximum batch size that can be sent.
If queue.mem.flush.min_events is greater than 1 and queue.mem.flush.timeout is greater than 0s, events will only be sent to the output when the queue contains at least queue.mem.flush.min_events events or the queue.mem.flush.timeout period has expired. In this mode the maximum batch size that can be sent by the output is queue.mem.flush.min_events. If the output supports a bulk_max_size parameter, values of bulk_max_size greater than queue.mem.flush.min_events have no effect. The value of queue.mem.flush.min_events should be evenly divisible by bulk_max_size to avoid sending partial batches to the output.
This sample configuration forwards events to the output if 512 events are available or the oldest available event has been waiting for 5s in the queue:
```yaml
queue.mem.events: 4096
queue.mem.flush.min_events: 512
queue.mem.flush.timeout: 5s
```
| Setting | Description |
|---|---|
| `queue.mem.events` | The number of events the queue can store. This value should be evenly divisible by `queue.mem.flush.min_events` to avoid sending partial batches to the output. Default: `3200 events` |
| `queue.mem.flush.min_events` | The minimum number of events required for publishing. If this value is set to 0 or 1, events are available to the output immediately. If this value is greater than 1, the output must wait for the queue to accumulate this minimum number of events or for `queue.mem.flush.timeout` to expire before publishing. Default: `1600 events` |
| `queue.mem.flush.timeout` | (int) The maximum wait time for `queue.mem.flush.min_events` to be fulfilled. When set to 0s, events are available to the output immediately. Default: `10s` |
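Conversely, if you want events handed to the output as soon as they arrive rather than batched, you can disable the flush wait entirely. This sketch uses illustrative values, not recommendations:

```yaml
# Illustrative low-latency sketch: with flush.timeout set to 0s,
# events are available to the output immediately.
queue.mem.events: 4096
queue.mem.flush.min_events: 0
queue.mem.flush.timeout: 0s
```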
Topics settings

Use these options to dynamically set the Kafka topic for each Elastic Agent event.

| Setting | Description |
|---|---|
| `topic` | The default Kafka topic used for produced events. You can set the topic dynamically by using a format string to access any event field. For example, this configuration uses a custom field, `fields.log_topic`, to set the topic for each event: `topic: '%{[fields.log_topic]}'` |
| `topics` | One or more topic processors including a condition, the event value to check against, and the resulting Kafka topic. Events that don't match against any defined processor are set to the default topic. Rule settings: `topic` sets the Kafka topic to use for matching events, and `when` sets the condition that events must satisfy to be routed to that topic. See the example that follows this table. |

As an example for setting up your processors, you might want to route log events based on severity. To do so, you can specify a default topic for all events not matched by other processors:

```yaml
outputs:
  kafka-output:
    type: kafka
    hosts:
      - 'kafka1:9092'
      - 'kafka2:9092'
      - 'kafka3:9092'
    topics:
      - topic: 'critical-%{[agent.version]}'
        when:
          contains:
            message: 'CRITICAL'
      - topic: 'error-%{[agent.version]}'
        when:
          contains:
            message: 'ERR'
      - topic: '%{[fields.log_topic]}'
```

All non-critical and non-error events will then route to the default topic.
Partition settings

The number of partitions created is set automatically by the Kafka broker based on the list of topics. Records are then published to partitions either randomly, in round-robin order, or according to a calculated hash.

In the following example, after each event is published to a partition, the partitioner selects the next partition in round-robin fashion.

```yaml
partition:
  round_robin:
    group_events: 1
```
| Setting | Description |
|---|---|
| `random.group_events` | Sets the number of events to be published to the same partition before the partitioner selects a new partition at random. The default value is 1, meaning a new partition is picked randomly after each event. |
| `round_robin.group_events` | Sets the number of events to be published to the same partition before the partitioner selects the next partition. The default value is 1, meaning the next partition is selected after each event. |
| `hash.hash` | List of fields used to compute the partitioning hash value from. If no field is configured, the event's `key` value is used. |
| `hash.random` | Randomly distribute events if no hash or key value can be computed. |
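For instance, to keep all events from the same host on the same partition, you could hash on the host name. This is a sketch, and `host.name` is just one field you might choose:

```yaml
# Sketch of hash-based partitioning: events with the same host.name
# value land on the same partition; if the hash cannot be computed,
# events are distributed randomly.
partition:
  hash:
    hash: ['host.name']
    random: true
```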
Header settings

A header is a key-value pair, and multiple headers can be included with the same key. Only string values are supported. These headers will be included in each produced Kafka message.

| Setting | Description |
|---|---|
| `key` | The key to set in the Kafka header. |
| `value` | The value to set in the Kafka header. |
| `client_id` | The configurable ClientID used for logging, debugging, and auditing purposes. The default is `Elastic`. |
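As an illustration, headers are configured as a list of key-value pairs under the output; the keys and values below are placeholders:

```yaml
# Sketch of static Kafka message headers; the keys and values
# shown are placeholder strings.
headers:
  - key: 'environment'
    value: 'production'
  - key: 'data-source'
    value: 'elastic-agent'
client_id: Elastic
```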
Other configuration settings

You can specify these other options in the kafka-output section of the agent configuration file.
| Setting | Description |
|---|---|
| `backoff.init` | (string) The number of seconds to wait before trying to reconnect to Kafka after a network error. After waiting `backoff.init` seconds, Elastic Agent tries to reconnect. If the attempt fails, the backoff timer is increased exponentially up to `backoff.max`. After a successful connection, the backoff timer is reset. Default: `1s` |
| `backoff.max` | (string) The maximum number of seconds to wait before attempting to connect to Kafka after a network error. Default: `60s` |
| `broker_timeout` | The maximum length of time a Kafka broker waits for the required number of ACKs before timing out (see the `required_acks` setting further in). Default: `10s` |
| `bulk_flush_frequency` | (int) Duration to wait before sending bulk Kafka request. `0` is no delay. Default: `0` |
| `bulk_max_size` | (int) The maximum number of events to bulk in a single Kafka request. Default: `2048` |
| `channel_buffer_size` | (int) Per Kafka broker number of messages buffered in output pipeline. Default: `256` |
| `codec` | Output codec configuration. You can specify either the `json` or `format` codec. By default the `json` codec is used.<br><br>Example configuration that uses the `json` codec:<br>`output.console:`<br>`  codec.json:`<br>`    pretty: true`<br>`    escape_html: false`<br><br>Example configuration that uses the `format` codec:<br>`output.console:`<br>`  codec.format:`<br>`    string: '%{[@timestamp]} %{[message]}'` |
| `compression` | Select a compression codec to use. Supported codecs are `snappy`, `lz4`, and `gzip`. |
| `compression_level` | For the `gzip` codec, you can choose a compression level. The level must be in the range of 1 (best speed) to 9 (best compression). Increasing the compression level reduces the network usage but increases the CPU usage. Default: `4` |
| `keep_alive` | (string) The keep-alive period for an active network connection. If `0s`, keep-alives are disabled. Default: `0s` |
| `max_message_bytes` | (int) The maximum permitted size of JSON-encoded messages. Bigger messages will be dropped. This value should be equal to or less than the broker's `message.max.bytes`. Default: `1000000` (bytes) |
| `metadata` | Kafka metadata update settings. The metadata contains information about brokers, topics, partitions, and active leaders to use for publishing. Sub-settings: `refresh_frequency` is the metadata refresh interval (defaults to 10 minutes); `full` controls whether metadata is fetched for all topics rather than only the configured ones (defaults to `false`); `retry.max` is the total number of metadata update retries (defaults to `3`); `retry.backoff` is the wait time between retries (defaults to `250ms`). |
| `required_acks` | The ACK reliability level required from broker. `0` = no response, `1` = wait for local commit, `-1` = wait for all replicas to commit. Note: If set to `0`, no ACKs are returned by Kafka. Messages might be lost silently on error. Default: `1` |
| `timeout` | The number of seconds to wait for responses from the Kafka brokers before timing out. Default: `30` (seconds) |