Filter Chain Definition

Learn how to define complex pipelines to transform and structure your data before integration into Kafka.

The connector can be configured to apply complex transformations on messages before they are written to Kafka.

Configuration

A filter chain can be specified in the connector configuration.

  • filters - List of aliases for the filter, specifying the order in which the filters will be applied.
  • filters.$alias.type - Fully qualified class name for the filter.
  • filters.$alias.$filterSpecificConf - Configuration properties for the filter

For example, let’s parse a standard application logs file written with log4j using the build-in filters :

filters=GroupMultilineException, ExtractFirstLine, ParseLog4jLog

filters.GroupMultilineException.type=io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter
filters.GroupMultilineException.negate=false
filters.GroupMultilineException.pattern="^[\\t]"

filters.ExtractFirstLine.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.ExtractFirstLine.field=$.logmessage
filters.ExtractFirstLine.values={{ extract_array($.message, 0) }

filters.ParseLog4jLog.type=io.streamthoughts.kafka.connect.filepulse.filter.impl.GrokFilter
filters.ParseLog4jLog.match="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:thread} %{GREEDYDATA:logmessage}"
filters.ParseLog4jLog.source=log
filters.ParseLog4jLog.overwrite=logmessage

List of Processing Filters Available

These filters are available for use with Kafka Connect File Pulse:

FilterDescriptionSince
AppendFilterAppends one or more values to an existing or non-existing array field
ConvertFilterConverts a message field’s value to a specific type
CSVFilterParses a message field’s value containing columns delimited by a character into a structv2.7.0
DateFilterConverts a field’s value containing a date to a unix epoch time
DelimitedRowFilterParses a message field’s value containing columns delimited by a separator into a struct
DropFilterDrops messages satisfying a specific condition without throwing exception.
ExcludeFilterExcludes one or more fields from the input record.v1.4.0
ExplodeFilterExplodes an array or list field into separate records.v1.4.0
FailFilterThrows an exception when a message satisfy a specific condition
GrokFilterParses an unstructured message field’s value to a struct by combining Grok patterns
GroupRowFilterRegroups multiple following messages into a single message by composing a grouping key
JoinFilterJoins values of an array field with a specified separator
JSONFilterUnmarshallings a JSON message field’s value to a complex struct
MoveFilterMoves an existing record field’s value to a specific target pathv1.5.0
MultiRowFilterCombines following message lines into single one by combining patterns
NullValueFilterCombines following message lines into single one by combining patternsv2.3.0
RenameFilterRenames a message field
SplitFilterSplits a message field’s value to array
XmlToJsonFilterParses an XML record-field and convert it to a JSON stringv2.4.0
XmlToStructFilterParses an XML record-field into STRUCTv2.4.0

Difference between Kafka Connect Single Message Transforms (SMT) functionality

Filters can be compared to Kafka Connect built-in Transformers. However, filters allow more complex pipelines to be built for structuring file data. For example, they can be used to split one input message to multiple messages or to temporarily buffer consecutive messages in order to regroup them by fields or a pattern.

Last modified March 16, 2024: ci: release version 2.14.0 🎉 (49999b8f)