Kinesis input plugin

  • Plugin version: v2.1.1
  • Released on: 2019-02-16
  • Changelog

For other versions, see the Versioned plugin docs.

Installation

This plugin is not bundled with Logstash by default. To install it, run bin/logstash-plugin install logstash-input-kinesis. See Working with plugins for more details.

Getting Help

For questions about the plugin, open a topic in the Discuss forums. For bugs or feature requests, open an issue on GitHub. For the list of Elastic supported plugins, consult the Elastic Support Matrix.

Description

You can use this plugin to receive events through AWS Kinesis. This plugin uses the Java Kinesis Client Library. The documentation at https://github.com/awslabs/amazon-kinesis-client will be useful.

AWS credentials can be specified either through environment variables, or an IAM instance role. The library uses a DynamoDB table for worker coordination, so you’ll need to grant access to that as well as to the Kinesis stream. The DynamoDB table has the same name as the application_name configuration option, which defaults to "logstash".

The library can optionally also send worker statistics to CloudWatch.

Usage

input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    codec => json { }
  }
}

Using with CloudWatch Logs

If you want to read a CloudWatch Logs subscription stream, you’ll also need to install and configure the CloudWatch Logs Codec.
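
A minimal sketch of such a pipeline is shown here. It assumes the codec plugin has already been installed and is registered under the name cloudwatch_logs; the stream name is a placeholder.

```
input {
  kinesis {
    # Placeholder: the stream that your CloudWatch Logs subscription writes to.
    kinesis_stream_name => "my-logging-stream"
    # Assumption: the CloudWatch Logs codec is installed under this name.
    codec => cloudwatch_logs
  }
}
```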

Authentication

This plugin uses the default AWS SDK auth chain, DefaultAWSCredentialsProviderChain, to determine which credentials the client will use, unless profile is set, in which case ProfileCredentialsProvider is used.

The default chain reads the credentials in this order:

  • AWS_ACCESS_KEY_ID / AWS_SECRET_KEY environment variables
  • ~/.aws/credentials credentials file
  • EC2 instance profile

The credentials need access to the following services:

  • AWS Kinesis
  • AWS DynamoDB. The client library stores worker coordination information in DynamoDB (offsets and the active worker per partition).
  • AWS CloudWatch. If metrics are enabled, the credentials also need CloudWatch update permissions.

See the AWS documentation for more information on the default chain.
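
As an illustration, the following sketch bypasses the default chain by selecting a named profile from ~/.aws/credentials. The profile name "logstash-kinesis" and the stream name are hypothetical.

```
input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    # Hypothetical profile name; it must exist in ~/.aws/credentials.
    # Setting profile makes the plugin use ProfileCredentialsProvider.
    profile => "logstash-kinesis"
  }
}
```

Leaving profile unset keeps the default chain, which suits deployments that rely on environment variables or an EC2 instance profile.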

Kinesis Input Configuration Options

This plugin supports the following configuration options, plus the Common Options described later, which are supported by all input plugins.

application_name

  • Value type is string
  • Default value is "logstash"

The application name, which is also used as the name of the DynamoDB coordination table. It must be unique for this Kinesis stream.

checkpoint_interval_seconds

  • Value type is number
  • Default value is 60

The number of seconds between worker checkpoints to DynamoDB.
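
For example, a pipeline that uses its own coordination table and checkpoints more often than the default might look like the sketch below. The application name and the interval are illustrative values, not recommendations.

```
input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    # Illustrative name; the DynamoDB coordination table takes the same name.
    application_name => "logstash-orders"
    # Checkpoint every 30 seconds instead of the default 60.
    checkpoint_interval_seconds => 30
  }
}
```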

initial_position_in_stream

  • Value type is string
  • Default value is "TRIM_HORIZON"

The value for initialPositionInStream. Accepts "TRIM_HORIZON" or "LATEST".
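
A short sketch that overrides the default and sets the initial position to "LATEST"; the stream name is a placeholder.

```
input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    # Override the default of "TRIM_HORIZON".
    initial_position_in_stream => "LATEST"
  }
}
```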

kinesis_stream_name

  • This is a required setting.
  • Value type is string
  • There is no default value for this setting.

The name of the Kinesis stream to read from.

metrics

  • Value can be any of: nil (unset), "cloudwatch"
  • Default value is nil

Worker metric tracking. This is disabled by default. Set it to "cloudwatch" to enable the CloudWatch integration in the Kinesis Client Library.
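
A minimal sketch with CloudWatch metrics switched on; the stream name is a placeholder, and the credentials in use are assumed to have the CloudWatch permissions described under Authentication.

```
input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    # Send Kinesis Client Library worker statistics to CloudWatch.
    metrics => "cloudwatch"
  }
}
```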

profile

  • Value type is string
  • There is no default value for this setting.

The AWS profile name for authentication. This ensures that the ~/.aws/credentials AWS auth provider is used. By default this is empty and the default chain will be used.

region

  • Value type is string
  • Default value is "us-east-1"

The AWS region for Kinesis, DynamoDB, and CloudWatch (if enabled).

role_arn

  • Value type is string
  • There is no default value for this setting.

The AWS role to assume. This can be used, for example, to access a Kinesis stream in a different AWS account. This role will be assumed after the default credentials or profile credentials are created. By default this is empty and a role will not be assumed.

role_session_name

  • Value type is string
  • Default value is "logstash"

The session name to use when assuming an IAM role. This name is recorded, for example, in CloudTrail logs.
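
The role options can be combined as in the following sketch of cross-account access. The account ID, role name, session name, and region are all hypothetical placeholders.

```
input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    region => "eu-west-1"
    # Hypothetical role in another AWS account; it is assumed after the
    # default or profile credentials have been created.
    role_arn => "arn:aws:iam::123456789012:role/kinesis-reader"
    # Hypothetical session name; it appears in CloudTrail entries for the role.
    role_session_name => "logstash-cross-account"
  }
}
```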

additional_settings

  • Value type is hash
  • There is no default value for this setting.

The KCL provides several configuration options that can be set in KinesisClientLibConfiguration. These options are configured through function calls whose names all begin with "with". Some of these functions take complex types, which are not supported. However, you can invoke any of the withX() functions that take a primitive by providing key-value pairs whose keys are the function name in snake_case, without the "with" prefix.

Example:

To set the DynamoDB read and write capacity values, use these functions: withInitialLeaseTableReadCapacity and withInitialLeaseTableWriteCapacity.

additional_settings => {
  "initial_lease_table_read_capacity" => 25
  "initial_lease_table_write_capacity" => 100
}
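
The same naming rule applies to other primitive-valued setters. The sketch below assumes that the KCL version bundled with the plugin exposes withMaxRecords and withIdleTimeBetweenReadsInMillis; the values are illustrative only.

```
input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    additional_settings => {
      # Assumed to map to withMaxRecords(int).
      "max_records" => 500
      # Assumed to map to withIdleTimeBetweenReadsInMillis(long).
      "idle_time_between_reads_in_millis" => 1000
    }
  }
}
```

Entries in a Logstash hash are separated by whitespace rather than commas.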

Common Options

The following configuration options are supported by all input plugins:

Setting         Input type   Required
add_field       hash         No
codec           codec        No
enable_metric   boolean      No
id              string       No
tags            array        No
type            string       No

Details

add_field

  • Value type is hash
  • Default value is {}

Add a field to an event.
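
For instance, the following sketch adds two static fields to every event read from the stream. The field names and values are hypothetical.

```
input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    # Hypothetical fields added to each event produced by this input.
    add_field => {
      "source_system" => "kinesis"
      "environment" => "staging"
    }
  }
}
```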

codec

  • Value type is codec
  • Default value is "plain"

The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.

enable_metric

  • Value type is boolean
  • Default value is true

Enable or disable metric logging for this specific plugin instance. By default, all available metrics are recorded, but you can disable metrics collection for a specific plugin.

id

  • Value type is string
  • There is no default value for this setting.

Add a unique ID to the plugin configuration. If no ID is specified, Logstash will generate one. It is strongly recommended to set this ID in your configuration. This is particularly useful when you have two or more plugins of the same type, for example, two Kinesis inputs. Adding a named ID in this case helps when monitoring Logstash with the monitoring APIs.

input {
  kinesis {
    id => "my_plugin_id"
  }
}

tags

  • Value type is array
  • There is no default value for this setting.

Add any number of arbitrary tags to your event.

This can help with processing later.

type

  • Value type is string
  • There is no default value for this setting.

Add a type field to all events handled by this input.

Types are used mainly for filter activation.

The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.

If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.
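
To illustrate filter activation, the sketch below sets a type on the input and uses it in a conditional. The type value "kinesis_app_logs" and the tag name are hypothetical.

```
input {
  kinesis {
    kinesis_stream_name => "my-logging-stream"
    # Hypothetical type value; it is stored in the event's type field.
    type => "kinesis_app_logs"
  }
}

filter {
  # Only events carrying this type pass through the mutate filter.
  if [type] == "kinesis_app_logs" {
    mutate {
      add_tag => ["from_kinesis"]
    }
  }
}
```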