Kafka Connector for Amazon EventBridge

This Kafka sink connector for Amazon EventBridge allows you to send events (records) from one or multiple Kafka
topics to the specified event bus, including useful features such as:

- offloading large events to S3 (✨ new in v1.3.0)
- configurable topic to event detail-type name mapping, with the option to provide a custom class to customize event detail-type naming (✨ new in v1.3.0)
- JsonPathDetailTypeMapper, a custom implementation to set the EventBridge detail-type using JsonPath expressions from a Kafka sink record (✨ new in v1.4.0)
- custom IAM profiles per connector
- IAM role-based authentication
- providing a custom credentials provider class (✨ new in v1.3.3)

See Configuration below for details.
Amazon EventBridge Event Bus is a serverless event router that enables you to create scalable event-driven applications
by routing events between your own applications, third-party SaaS applications, and other AWS services. You can set up
routing rules to determine where to send your events, allowing for application architectures to react to changes in your
systems as they occur. To get started with Amazon EventBridge, visit our
documentation.
Getting Help
The best way to interact with our team is through GitHub. You can open an
issue and choose from one of our templates for
bug reports, feature requests, security questions, or guidance.
If you have a support plan with AWS Support, you can also create a new support
case.
Installation
Amazon Managed Streaming for Apache Kafka (MSK)
Amazon MSK Connect provides a fully managed experience for using Kafka Connect with Amazon MSK. For a complete
step-by-step tutorial on setting up this connector with Amazon MSK Connect, refer to the official Amazon MSK Connect
tutorial for the EventBridge Kafka
Connector.
Confluent Connector Hub
Download the connector from Confluent Connector Hub.
Java Archive (JAR)
Two kafka-eventbridge-sink JAR files are created on each release. The JAR file *-with-dependencies.jar
contains all required dependencies of the connector, excluding Kafka Connect dependencies and (de)serializers, such
as connect-api and connect-json. To support additional (de)serializers, such as Avro and Protobuf using the AWS
Glue Schema Registry, install these dependencies in your Kafka Connect environment before deploying this connector.
From Source
The following steps describe how to clone the repo and perform a clean packaging of the connector. Requires Maven and
Java Development Kit (JDK 17 or later).
Clone the repo:
git clone https://github.com/aws/eventbridge-kafka-connector.git
cd eventbridge-kafka-connector
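Create JAR artifacts (the standard Maven build; additional flags may apply):

mvn clean package

From Source (Docker)
The following steps describe how to clone the repo and perform a clean packaging of the connector using Docker.
Clone the repo:

git clone https://github.com/aws/eventbridge-kafka-connector.git
cd eventbridge-kafka-connector

Create JAR artifacts (a hedged sketch, assuming a Maven container image with a matching JDK; the image tag is illustrative):

docker run --rm -v "$PWD":/src -w /src maven:3-eclipse-temurin-17 mvn clean package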
[!TIP]
If you want to reuse your local Maven cache and/or persist the Maven dependencies pulled, add -v <local_maven_folder>:/root/.m2 to the above command.
Configuration
In addition to the common Kafka Connect sink-related
configuration options, this connector defines the following configuration properties.
| Property | Required | Default | Description |
|---|---|---|---|
| aws.eventbridge.connector.id | Yes | | The unique ID of this connector (used in the EventBridge event source field, appended as a suffix to kafka-connect., to uniquely identify a connector). |
| aws.eventbridge.region | Yes | | The AWS region of the target event bus. |
| aws.eventbridge.eventbus.arn | Yes | | The ARN of the target event bus. |
| aws.eventbridge.endpoint.uri | No | | An optional service endpoint URI used to connect to EventBridge. |
| aws.eventbridge.eventbus.global.endpoint.id | No | | An optional global endpoint ID of the target event bus specified using abcde.xyz syntax (see API documentation). |
| aws.eventbridge.eventbus.resources | No | | Optional Resources (comma-separated) to add to each EventBridge event. |
| aws.eventbridge.detail.types | No | "kafka-connect-${topic}" | The detail-type used for the EventBridge events. Can be defined per topic, e.g., "topic1:MyDetailType, topic2:MyDetailType", as a single expression with a dynamic ${topic} placeholder for all topics, e.g., "my-detail-type-${topic}", or as a static value without additional topic information for all topics, e.g., "my-detail-type". |
| aws.eventbridge.detail.types.mapper.class | No | | An optional class name implementing software.amazon.event.kafkaconnector.mapping.DetailTypeMapper to customize the EventBridge detail-type field mapping (see Topic to detail-type Mapping). If specified, the configuration property aws.eventbridge.detail.types is ignored. |
| aws.eventbridge.time.mapper.class | No | | An optional class name implementing software.amazon.event.kafkaconnector.mapping.TimeMapper to customize the EventBridge Time field mapping. If not specified, the event Time is set by EventBridge. |
| aws.eventbridge.retries.max | No | 2 | The maximum number of retry attempts when sending events to EventBridge. |
| aws.eventbridge.retries.delay | No | 200 | The retry delay in milliseconds between each retry attempt. |
| aws.eventbridge.auth.credentials_provider.class | No | software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider, or software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider if aws.eventbridge.iam.role.arn is provided | An optional class name of the credentials provider to use. It must implement software.amazon.awssdk.auth.credentials.AwsCredentialsProvider with a no-arg constructor and optionally org.apache.kafka.common.Configurable to configure the provider after instantiation. |
| aws.eventbridge.iam.profile.name | No | | An optional configuration profile name used to resolve credentials (see Using different Configuration Profiles per Connector). |
| aws.eventbridge.iam.role.arn | No | | An optional IAM role ARN which, when set, is assumed via STS to authenticate against EventBridge (see Authentication and Permissions). |
| aws.eventbridge.iam.external.id | No | | An optional external ID used when assuming the IAM role. |
| aws.eventbridge.offloading.default.s3.bucket | No | | An optional S3 bucket name; when set, offloading of large events (payloads) to S3 is enabled (see Offloading large events (payloads) to S3). |
| aws.eventbridge.offloading.default.fieldref | No | $.detail.value | The part of the event (payload) to offload to S3 (only active when aws.eventbridge.offloading.default.s3.bucket is set). |
[!NOTE]
When using the default retry configuration (or retries > 0), the connector provides at-least-once delivery semantics
for valid Kafka records, i.e., records which can be correctly (de)serialized before making a delivery attempt to
EventBridge.
Examples
JSON Encoding
The following minimal configuration configures the connector with default values, consuming Kafka records from the topic
"json-values-topic" with record keys as String and JSON values (without schema), and sending events to the custom
EventBridge event bus "kafkabus" in region "us-east-1".
{
"name": "EventBridgeSink-Json",
"config": {
// consume from earliest record or last checkpointed offset if available
"auto.offset.reset": "earliest",
"connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
"topics": "json-values-topic",
"aws.eventbridge.connector.id": "my-json-values-connector",
"aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:1234567890:event-bus/kafkabus",
"aws.eventbridge.region": "us-east-1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
// see note below on JSON schemas
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
[!NOTE]
Currently, when using JsonConverter for keys or values, the connector uses a fixed configuration
schemas.enable=false, i.e., JSON schemas are not included in the outgoing EventBridge event.
JSON Encoding with Dead-Letter Queue
Continuing the example above, the following configuration defines a dead-letter queue (DLQ) topic, "json-dlq",
which will be created with a replication factor of 1 if it does not exist. Records which cannot be converted or
delivered to EventBridge will be sent to this DLQ.
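A hedged configuration sketch, extending the JSON example above with the standard Kafka Connect dead-letter queue settings (values are illustrative):

{
  "name": "EventBridgeSink-Json-DLQ",
  "config": {
    // same settings as the JSON example above, plus:
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "json-dlq",
    "errors.deadletterqueue.topic.replication.factor": 1,
    // optionally capture failure context in record headers
    "errors.deadletterqueue.context.headers.enable": true
  }
}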
Avro Encoding with multiple Topics, IAM Role and custom Retries
The following configuration shows some advanced options, such as multiple topics with customized detail-type mapping,
customized retry behavior, and IAM-based authentication, as well as how to deserialize Avro-encoded record values (with
JSON-encoded keys) using the AWS Glue Schema Registry
(GSR).
{
"name": "EventBridgeSink-Avro",
"config": {
"auto.offset.reset": "earliest",
"connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
"topics": "avro-topic-1,avro-topic-2",
"aws.eventbridge.connector.id": "avro-test-connector",
"aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:1234567890:event-bus/kafkabus",
"aws.eventbridge.region": "us-east-1",
// customized retries
"aws.eventbridge.retries.max": 1,
"aws.eventbridge.retries.delay": 1000,
// custom detail-type mapping with topic suffix
"aws.eventbridge.detail.types": "avro-test-${topic}",
// IAM-based authentication
"aws.eventbridge.iam.role.arn":"arn:aws:iam::1234567890:role/EventBridgePutEventsRole",
"tasks.max": 1,
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
// dependencies (Classes) must be in the connector $CLASSPATH
"value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
// GSR region
"value.converter.region": "us-east-1",
// GSR registry to use (expects schemas to exist and IAM role to have permission to read)
"value.converter.registry.name": "avro-kafka-eventbridge",
"value.converter.avroRecordType": "GENERIC_RECORD"
}
}
[!IMPORTANT]
This connector does not include custom (de)serializers, such as AWSKafkaAvroConverter as shown above. Refer to the
Kafka Connect, schema registry (e.g.
GSR),
or (de)serializer (e.g. GSR SerDes) documentation on how to
provide them to Kafka connectors.
Topic to detail-type Mapping
The main task of this connector is to convert Kafka records to EventBridge events. Since this connector can be used to
consume from multiple Kafka topics, which an EventBridge user might want to filter later on, the mapping of topic names
to the EventBridge detail-type, i.e. event type, is customizable.
The default, i.e., when the configuration option aws.eventbridge.detail.types is not set, uses kafka-connect- as a
prefix, followed by the topic name of each individual record. Alternatively, a custom detail-type can be defined per
topic, provided as a comma-separated list with the syntax "<topic_name>:<detail_type>,<topic_name>:<detail_type>,..."
e.g., "orders:com.example.org.orders.event.v0,customers:com.example.org.customers.event.v0". Records from the orders
topic would result in EventBridge events with a detail-type: com.example.org.orders.event.v0.
If only the topic name should be used, a single expression with a dynamic ${topic} placeholder for all topics can be
used e.g., "my-detail-type-${topic}" (using a hardcoded prefix), "${topic}" (only topic name), or as a static value
without additional topic information "my-detail-type".
[!TIP]
You can implement custom detail-type mapping by specifying a custom class in the
aws.eventbridge.detail.types.mapper.class configuration property. For example, this project provides a custom
JsonPathDetailTypeMapper, supporting JsonPath expressions to extract a particular value from a Kafka record.
Using the custom JsonPathDetailTypeMapper
The JsonPathDetailTypeMapper is a built-in implementation that allows you to set the EventBridge detail-type from a
Kafka record using JsonPath expressions. This is particularly useful when you want to derive the detail-type from the
content of your Kafka messages instead of using topic names.
How it works
The JsonPathDetailTypeMapper works as follows:

1. It converts the Kafka record to JSON format to enable JsonPath processing. This means it can handle any Kafka record schema as long as it can be converted to a JSON object.
2. It extracts the detail-type value using the configured JsonPath expression.
3. The extracted value is used as the EventBridge detail-type.
For example, given the following value in a Kafka record (after internal conversion to a JSON object by
JsonPathDetailTypeMapper):
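A hedged sketch of such a record value (field names are illustrative):

{
  "orderId": "123",
  "eventType": "customer.order.created",
  "orderItems": [
    "item-1",
    "item-2"
  ]
}

The following configuration would return the string customer.order.created to be used in the EventBridge detail-type. The property name carrying the JsonPath expression and the exact path shape are illustrative; consult the connector documentation for the exact keys:

{
  // other configuration attributes are omitted for clarity
  "aws.eventbridge.detail.types.mapper.class": "software.amazon.event.kafkaconnector.mapping.JsonPathDetailTypeMapper",
  // hypothetical property name and path; depends on how the record is converted to JSON
  "aws.eventbridge.detail.types.mapper.json.path": "$.value.eventType"
}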
[!NOTE] When using JsonPathDetailTypeMapper, the setting aws.eventbridge.detail.types is ignored.
Restrictions and Fallback Behavior
- The JsonPath expression must be definite, i.e., it must resolve to a single value; otherwise an exception is thrown during configuration of the JsonPathDetailTypeMapper.
- The extracted value must be of type String.
- The record topic is returned as fallback in the following cases:
  - the Kafka record cannot be converted to a valid JSON object
  - the JsonPath expression is not found in the record
  - the extracted value is null, not a string, or an empty string
[!NOTE]
Fallback cases are logged as WARN or ERROR depending on severity. JsonPath parsing is logged at TRACE level.
Offloading large events (payloads) to S3
The current PutEvents size limit in
EventBridge requires the combined request entry size to be less than 1MB. This can be problematic in cases where Kafka topics contain records exceeding this limit. By
default, the connector logs a warning when trying to send those events to EventBridge which can be ignored (dropped) or
sent to a Kafka dead-letter topic (see Payloads exceeding PutEvents Limit).
Alternatively, the connector can be configured to offload (parts of) the event to S3 before calling the PutEvents API.
This is also known as the claim-check pattern. When enabled (see Configuration), every record
received from the associated Kafka topics in the connector which matches the
JSONPath expression defined in
aws.eventbridge.offloading.default.fieldref (default: $.detail.value) will be offloaded.
Configure Offloading
To enable offloading, specify an S3 bucket via aws.eventbridge.offloading.default.s3.bucket.
[!NOTE]
The IAM credentials/role used by the connector needs PutObject permissions on the configured bucket.
Unless overwritten by aws.eventbridge.offloading.default.fieldref, the connector will offload the value in
$.detail.value to S3, delete that key from the event and add claim-check information to the event metadata (see
examples below). The JSONPath expression applies to the converted EventBridge event before calling PutEvents to EventBridge.
The benefit of this approach over other offloading implementations is flexibility in which parts of the events are
offloaded, while retaining as much of the original event as possible to harness the powerful event filtering
capabilities in EventBridge. For example, some events in a topic might contain large blobs of binary/base64-encoded data
in which most consumers are not interested. In those cases, offloading helps to trim down the event (payload) size while
giving the consumer(s) interested in the full payload the option to fully reconstruct the event based on the offloaded
S3 object and the metadata added to the event structure.
[!NOTE]
Array and wildcard references are not allowed in the JSONPath expression defined in
aws.eventbridge.offloading.default.fieldref and the JSONPath must always begin with $.detail.value.
Examples
Assuming offloading is enabled via the setting aws.eventbridge.offloading.default.s3.bucket="my-offloading-bucket" and
the following event structure which the S3 offloading logic in the connector operates on before making the final
PutEvents API call to EventBridge:
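A hedged sketch of such an input event (the envelope fields and exact detail layout are illustrative):

{
  "detail-type": "kafka-connect-json-values-topic",
  "source": "kafka-connect.my-json-values-connector",
  "detail": {
    "topic": "json-values-topic",
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023",
      "orderPreferences": null
    }
  }
}

If aws.eventbridge.offloading.default.fieldref is $.detail.value (the default), the resulting event sent to EventBridge would have the matched value removed and claim-check information added. A hedged sketch (the exact claim-check key names are illustrative):

{
  "detail-type": "kafka-connect-json-values-topic",
  "source": "kafka-connect.my-json-values-connector",
  "detail": {
    "topic": "json-values-topic",
    "key": "order-1",
    // offloaded: $.detail.value was removed and stored in S3
    "metadata": {
      "dataref": "arn:aws:s3:::my-offloading-bucket/2d10c6f6-31e9-43b4-8706-51b4cf5534d8",
      "datarefJsonPath": "$.detail.value"
    }
  }
}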
In the S3 bucket my-offloading-bucket there would be an object 2d10c6f6-31e9-43b4-8706-51b4cf5534d8 containing:
{
"orderItems": [
"item-1",
"item-2"
],
"orderCreatedTime": "Tue May 23 13:38:46 CEST 2023",
"orderPreferences": null
}
Continuing the example, if aws.eventbridge.offloading.default.fieldref is $.detail.value.non-existing-key,
offloading would pass the event through without modification; the resulting event would be the same as the input event,
without offloading information.
If aws.eventbridge.offloading.default.fieldref is $.detail.value.orderPreferences and matches a key with a null
value, offloading is also skipped as there is nothing to offload; the resulting event would again be the same as the
input event, without offloading information.
[!NOTE]
If offloading matches a key with an empty object {} or array [], these values are considered a match and will be
offloaded just as any other matched value.
Records with null Keys and Tombstone Handling
Records with null keys are processed and included in the EventBridge event detail structure. The null key is
preserved in the event, allowing downstream processors to know the original record had no key.
Records with null values (tombstones) are processed and included in the EventBridge event detail structure. The
null value is preserved, allowing downstream EventBridge rules and targets to handle tombstone records according to
their requirements. This is particularly useful for:
- Change Data Capture (CDC) scenarios where tombstones indicate record deletions
- applications that need to track when values are explicitly set to null
Example EventBridge event structure for null keys and tombstone records:
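A hedged sketch of such an event (envelope fields omitted; the exact detail layout may vary):

{
  "detail-type": "kafka-connect-json-values-topic",
  "source": "kafka-connect.my-json-values-connector",
  "detail": {
    "topic": "json-values-topic",
    "partition": 0,
    "offset": 42,
    "key": null,
    "value": null
  }
}

Example EventBridge Rule Filtering Patterns
For example, to only match events where both key and value are null, a rule pattern can use EventBridge's null matching:

{
  "detail": {
    "key": [null],
    "value": [null]
  }
}

The inverse (ignoring tombstone events) can be built with anything-but matching; consult the EventBridge event patterns documentation for the supported syntax.
Retry Behavior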
By default, the connector is configured to retry failed PutEvents
API calls, i.e., calls where an exception was thrown, 2 times (3 total attempts) with a constant delay of 200
milliseconds between retries. These values can be configured (see Configuration). The following exceptions (incl.
their subclasses) are considered retryable: AwsServiceException, SdkClientException, ExecutionException,
InterruptedException, TimeoutException.
[!NOTE]
EventBridgeExceptions with a 413 status code (PutEventsRequestEntry limit exceeded) are not retried.
[!NOTE]
The setting aws.eventbridge.retries.max is also used on the underlying AWS SDK client, which automatically handles
certain retryable errors, such as throttling, without immediately throwing an exception. Currently, this can lead to
more than the desired retry attempts since those exceptions are also considered retryable by the connector code.
Authentication and Permissions
Authentication (IAM Credentials)
Each connector task creates an EventBridge client using the AWS
DefaultCredentialsProvider
to look up credentials. AWS credential providers use a predefined configuration and configuration
order to
retrieve credentials from the various credential sources.
For example, you can provide (temporary) credentials to the connector using AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN environment variables. For information on how to use AWS config and
credentials profiles to resolve credentials, see Using different Configuration Profiles per Connector.
When the configuration property aws.eventbridge.iam.role.arn is set, the
StsAssumeRoleCredentialsProvider
is used directly to assume the specified IAM role and periodically refresh credentials with STS. The STS client uses
the configured region of the connector and retrieves credentials using the DefaultCredentialsProvider
retrieval chain described above.
Required Connector Permissions to send events to EventBridge (IAM Policy)
The connector only requires events:PutEvents permission as shown in the IAM policy example below. For details refer to
the “Managing access permissions to your Amazon EventBridge resources”
documentation.
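A minimal policy sketch, scoped to the example event bus used throughout this document:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowPutEvents",
      "Effect": "Allow",
      "Action": "events:PutEvents",
      "Resource": "arn:aws:events:us-east-1:1234567890:event-bus/kafkabus"
    }
  ]
}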
[!IMPORTANT]
If you use the Glue Schema Registry, the IAM role needs additional permissions to retrieve schemas e.g.,
using the managed policy AWSGlueSchemaRegistryReadonlyAccess. Please refer to the Glue Schema Registry
documentation.
Using different Configuration Profiles per Connector
If you run multiple EventBridge connectors in your Kafka Connect environment, using environment variables or Java system
properties to configure your connectors means that each connector will be configured with the same IAM permissions.
If you want to configure multiple connectors with specific (different) IAM profiles from your config and credentials
files, the connector configuration option aws.eventbridge.iam.profile.name can be used.
With the connector configuration option aws.eventbridge.iam.profile.name you specify which profile the specific
connector will use.
[!IMPORTANT]
Environment variables, such as AWS_PROFILE or AWS access keys, always take precedence over the configuration
files and must not be set for this configuration option to take effect.
Steps to configure a connector with a configuration profile:
First, set "aws.eventbridge.iam.profile.name": "my-custom-profile" in the connector JSON configuration file (replace
example values with your desired profile name). Then, create (or mount) the AWS config and credentials files in your
Kafka Connect host(s). If the configuration files are not located/mounted in the default
location, set the environment variables
AWS_CONFIG_FILE and AWS_SHARED_CREDENTIALS_FILE accordingly. For example, with Docker you can mount them from your
local machine using Docker volume mounts and environment variables (see example below).
Docker Compose Example for a custom profile “my-custom-profile”
config file:
[profile my-custom-profile]
output=text # not used by the SDK, for illustration purposes
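credentials file (a sketch with illustrative placeholder values):

[my-custom-profile]
aws_access_key_id = AKIAEXAMPLEKEYID
aws_secret_access_key = example-secret-access-key

Docker Compose file (snippet):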
connect:
# (snip)
environment:
AWS_CONFIG_FILE: '/aws/config'
AWS_SHARED_CREDENTIALS_FILE: '/aws/credentials'
volumes:
- /Users/example/.aws:/aws # mount credentials from local host to /aws folder
You can also use role-based authentication with this approach by referencing a source_profile in the config file:
config file (role-based authentication):
[profile my-custom-profile]
role_arn = arn:aws:iam::0123456789:role/KafkaConnectorPutEvents
source_profile = default # assume the role using credentials from the default profile specified in the credentials file
Custom credentials provider
To use your own credentials provider, the class must implement the AwsCredentialsProvider interface with a no-arg constructor and, optionally, the Kafka Configurable interface to configure the provider after instantiation.
Example configuration to use custom credentials provider com.example.MyCustomCredentialsProvider:
{
"name": "EventBridgeSink-CustomCredentialsProvider",
"config": {
// other configuration attributes are omitted for clarity
"aws.eventbridge.auth.credentials_provider.class": "com.example.MyCustomCredentialsProvider"
}
}
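A minimal sketch of such a provider, assuming credentials are resolved from hypothetical environment variables (MY_ACCESS_KEY_ID, MY_SECRET_ACCESS_KEY):

package com.example;

import java.util.Map;
import org.apache.kafka.common.Configurable;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

// Sketch of a custom credentials provider; resolve real secrets from
// your own source (vault, file, etc.) instead of environment variables.
public class MyCustomCredentialsProvider implements AwsCredentialsProvider, Configurable {

    private volatile AwsCredentials credentials;

    // no-arg constructor required by the connector
    public MyCustomCredentialsProvider() {}

    @Override
    public void configure(Map<String, ?> configs) {
        // called by Kafka Connect after instantiation with the connector configuration
        credentials = AwsBasicCredentials.create(
            System.getenv("MY_ACCESS_KEY_ID"),
            System.getenv("MY_SECRET_ACCESS_KEY"));
    }

    @Override
    public AwsCredentials resolveCredentials() {
        return credentials;
    }
}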
[!IMPORTANT]
Since the class must be loadable from Kafka Connect, place the (uber) JAR with your custom credentials provider (and its third-party dependencies) in a directory already listed in the plugin path (plugin.path).
Deployment to Kafka Connect
The connector can be deployed like any Kafka connector e.g., using the Kafka Connect REST API:
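A hedged sketch, assuming the Kafka Connect REST endpoint listens on localhost:8083 and the connector configuration is stored in eventbridge-sink-config.json:

curl -X POST -H "Content-Type: application/json" \
  --data @eventbridge-sink-config.json \
  http://localhost:8083/connectors

Example Event
Below is an example of an event received by an EventBridge target using the minimal JSON configuration described above. The envelope follows the standard EventBridge event structure; the detail field layout shown here is a hedged sketch:

{
  // id, account, and time values are illustrative
  "version": "0",
  "id": "dbc52d32-0000-0000-0000-000000000000",
  "detail-type": "kafka-connect-json-values-topic",
  "source": "kafka-connect.my-json-values-connector",
  "account": "1234567890",
  "time": "2023-05-23T11:38:46Z",
  "region": "us-east-1",
  "resources": [],
  "detail": {
    "topic": "json-values-topic",
    "partition": 0,
    "offset": 0,
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
    }
  }
}

Example EventBridge Rule Pattern
The following rule pattern would match the above event, i.e., any event where:

- source is exactly kafka-connect.my-json-values-connector, and
- detail.key starts with order, and
- orderItems exists in the detail.value object

A hedged sketch of such a pattern:

{
  "source": ["kafka-connect.my-json-values-connector"],
  "detail": {
    "key": [{ "prefix": "order" }],
    "value": {
      "orderItems": [{ "exists": true }]
    }
  }
}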
[!TIP]
Consult the EventBridge event patterns
documentation for a complete
explanation of available patterns.
Using EventBridge Input Transformations to restructure events
EventBridge input transformations allow you to reshape events before they're delivered to targets. This is particularly useful when working with the Kafka connector to:

- flatten and restructure events
- remove unnecessary fields from the event
- add additional fields or context to the event
- transform the event format to match target system requirements
Example: Flattening Kafka Message Structure
To extract just the topic, key, and value from an incoming Kafka event (see example above), set up the Input Path and Template in your EventBridge rule and target as follows:
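A hedged sketch of the input transformer (the placeholder names are illustrative):

Input path:

{
  "topic": "$.detail.topic",
  "key": "$.detail.key",
  "value": "$.detail.value"
}

Input template:

{
  "topic": <topic>,
  "key": <key>,
  "value": <value>
}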
Troubleshooting
Common issues are around schema handling, authentication and authorization (IAM), and debugging the event flow.
Schemas
If you see the following errors, check whether your connector configuration uses the correct key and value schema
settings.
Error:
The following error is caused when the JsonConverter is used and configured to use a schema within the Kafka record.
If the Kafka record was not produced with a JSON schema, i.e., only the JSON value, deserialization will fail with:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload"
fields and may not contain additional fields. If you are trying to deserialize plain JSON data,
set schemas.enable=false in your converter configuration.
Resolution:
As the error message suggests, set the affected converter's schemas.enable option to false (e.g., "value.converter.schemas.enable": false) if your records are plain JSON without embedded schema and payload fields.
Error:
The following error is caused when an AvroConverter is used but the respective key/value is not Avro-encoded:
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Resolution:
Change the key and/or value converters from Avro to the actual schema/payload type stored in the topic.
IAM
When invalid IAM credentials are used, such as due to expired tokens or insufficient permissions, the connector will
throw an exception after a PutEvents API call attempt to EventBridge, or during key/value deserialization when an
external schema registry with authentication is used. An example error message due to insufficient PutEvents
permissions looks like:
org.apache.kafka.connect.errors.ConnectException: software.amazon.event.kafkaconnector.exceptions.EventBridgeWriterException:
java.util.concurrent.ExecutionException: software.amazon.awssdk.services.eventbridge.model.EventBridgeException:
User: arn:aws:sts::1234567890:assumed-role/some/role is not authorized to perform: events:PutEvents on resource: arn:aws:events:us-east-1:1234567890:event-bus/kafkabus because no identity-based policy allows the events:PutEvents action
(Service: EventBridge, Status Code: 400, Request ID: e5ed0fb7-535d-4417-b38b-110f8495d0cb)
Throttling (API Rate Limiting)
By default, the underlying AWS SDK client used will automatically handle throttle errors (exceptions) when the
PutEvents ingestion quota for the account/region is exceeded.
However, depending on your quota and ingestion rate, if the client keeps hitting the rate limit it might throw an
exception to the connector. When setting aws.eventbridge.retries.max greater than 0, the connector will attempt to
retry such a failed PutEvents call up to aws.eventbridge.retries.max times. If aws.eventbridge.retries.max is 0 or
the retry budget is exhausted, a terminal ConnectException is thrown and the task will be stopped.
We recommend verifying your PutEvents account quota for the specific AWS
region and adjusting the Kafka Connect sink setting
consumer.override.max.poll.records accordingly. For example, if your PutEvents quota is 500, setting
consumer.override.max.poll.records=400 leaves enough headroom.
[!NOTE]
The EventBridge PutEvents quota is an account-level soft quota, i.e., it applies to the sum of all PutEvents
requests in the same account, such as running multiple tasks of this connector. If you need to increase the quota
beyond the hard limit, reach out to the EventBridge service team so they can better understand your use case and needs.
[!NOTE]
consumer.override.max.poll.interval.ms is a related setting after which a consumer is considered failed and will
leave the consumer group. Continuing the example above, if consumer.override.max.poll.records=400 and
consumer.override.max.poll.interval.ms=300000 (the default as of Kafka 3.5), it means that processing 400 records
is allowed to take up to 5 minutes, i.e., 750 milliseconds per record/event, before considering the consumer (task)
failed.
Payloads exceeding PutEvents Limit
EventBridge has a limit where each request entry size used in PutEvents must be less than 1MB. When a Kafka record
exceeds this threshold, the connector will log a warning and ignore (skip) the record. Optionally, a dead-letter topic
can be configured to which such records are sent, or offloading to S3 can be enabled.
Debugging Event Flow (TRACE-level logging)
The connector will periodically (asynchronously), on a per-task basis, report the count of successful PutEvents API
calls, e.g.:
[2023-05-25 11:53:04,598] INFO [EventBridgeSink-Json|task-0] Total records sent=15 (software.amazon.event.kafkaconnector.util.StatusReporter:36)
[!TIP]
Depending on your Kafka Connect environment, you can enable the [EventBridgeSink-Json|task-0] logging style shown above by setting the following environment variable in Kafka Connect: CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
By enabling TRACE-level logging, the connector will emit additional log messages, such as the underlying AWS SDK
client configuration, records received from Kafka Connect, and PutEvents stats (start and end time, duration, etc.).
Depending on your Kafka Connect environment, you can enable TRACE-level logging via environment variables on Kafka
Connect using CONNECT_LOG4J_LOGGERS: "software.amazon.event.kafkaconnector=TRACE". Please consult your Kafka Connect
documentation on how to configure and change log levels for a particular connector.
[!WARNING]
Enabling TRACE-level logging can expose sensitive information because record keys and values are logged. It is strongly
recommended to audit changes to the log level to guard against leaking sensitive data, such as personally identifiable
information (PII).
Contributing and Security
See CONTRIBUTING for more information.
License
This project is licensed under the Apache-2.0 License.
Credits
A HUGE THANK YOU to @flo-mair and @maschnetwork for their initial contributions to this project.