Processing Filters

The list of available transformation filters.

These filters are available for use with Kafka Connect File Pulse:

| Filter | Description | Since |
|---|---|---|
| AppendFilter | Appends one or more values to an existing or non-existing array field | |
| ConvertFilter | Converts a message field’s value to a specific type | |
| DateFilter | Converts a field’s value containing a date to a unix epoch time | |
| DelimitedRowFilter | Parses a message field’s value containing columns delimited by a separator into a struct | |
| DropFilter | Drops messages satisfying a specific condition without throwing an exception | |
| ExcludeFilter | Excludes one or more fields from the input record | v1.4.0 |
| ExplodeFilter | Explodes an array or list field into separate records | v1.4.0 |
| FailFilter | Throws an exception when a message satisfies a specific condition | |
| GrokFilter | Parses an unstructured message field’s value to a struct by combining Grok patterns | |
| GroupRowFilter | Regroups multiple following messages into a single message by composing a grouping key | |
| JoinFilter | Joins values of an array field with a specified separator | |
| JSONFilter | Unmarshals a JSON message field’s value to a complex struct | |
| MoveFilter | Moves an existing record field’s value to a specific target path | v1.5.0 |
| MultiRowFilter | Combines following message lines into a single one by combining patterns | |
| NullValueFilter | Sets the record value to null | v2.3.0 |
| RenameFilter | Renames a message field | |
| SplitFilter | Splits a message field’s value into an array | |
| XmlToJsonFilter | Parses an XML record-field and converts it to a JSON string | v2.4.0 |
| XmlToStructFilter | Parses an XML record-field into a STRUCT | v2.4.0 |

AppendFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter

The AppendFilter is probably one of the most important processing filters to know. It allows you to manipulate a source record by easily adding or replacing a field with a constant value or a value extracted from another existing field using ScEL.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| field | The name of the field to be added | string (ScEL supported) | - | high |
| value | The value of the field to be added | string (ScEL supported) | - | high |
| overwrite | Whether an existing field should be overwritten | boolean | false | high |

Examples

The following example shows how to use the AppendFilter to concatenate two values from the array field named values using a substitution expression. The concatenated value is then added to the field named result.

Configuration

filters=SubstituteFilter
filters.SubstituteFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.SubstituteFilter.field="$.result"
filters.SubstituteFilter.value="{{ extract_array($.values,0) }}-{{ extract_array($.values,1) }}"

Input

{
    "record" :{
      "values": ["Hello", "World"]
    }
}

Output

{
    "record" :{
      "values": ["Hello", "World"],
      "result" : "Hello-World"
    }
}

In the previous example, we used the simple property expression result to indicate the target field to which our value is added. We actually omitted the expression scope $value. By default, if no scope is defined in an expression, the scope $value is implicitly applied. Hence, we could have used the full expression $value.result, which is equivalent to the simplified expression result.

You can also use another expression scope. For example, you can leverage the AppendFilter to dynamically resolve the record key or the output topic based on the record data.

The following configuration shows how to use the $topic scope:

filters.SubstituteFilter.field="$topic"
filters.SubstituteFilter.value="my-topic-{{ lowercase(extract_array($.values,0)) }}"

Input

{
  "record" : {
    "values": ["Hello", "World"]
  }
}

Output

{
  "context": { "topic" : "my-topic-hello" },
  "record" :{
    "values": ["Hello", "World"]
  }
}

Finally, the AppendFilter also accepts a substitution expression for the field property. This lets you dynamically determine the name of the field to be added.

The following example shows how to use a property expression to get the name of the field from a record field:

filters.SubstituteFilter.field="$.target"
filters.SubstituteFilter.value="{{ extract_array($.values, 0) }}-{{ extract_array($.values,1) }}"

Input

{
  "record" : {
    "target": "result",
    "values": ["Hello", "World"]
  }
}

Output

{
  "record" : {
    "target": "result",
    "values": ["Hello", "World"],
    "result" : "Hello-World"
  }
}
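
The overwrite property from the configuration table can be combined with the same kind of settings. The fragment below is a minimal sketch, not a definitive configuration: the alias OverwriteResult is hypothetical, and it assumes that the record may already carry a result field whose value should be replaced.

```properties
# Hypothetical alias; only properties listed in the AppendFilter table are used.
filters=OverwriteResult
filters.OverwriteResult.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.OverwriteResult.field="$.result"
filters.OverwriteResult.value="{{ extract_array($.values,0) }}-{{ extract_array($.values,1) }}"
# Assumption: with overwrite enabled, an existing "result" value is replaced.
filters.OverwriteResult.overwrite=true
```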

ConvertFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter

The ConvertFilter can be used to convert a field’s value into a specific type.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| field | The field to convert (dot notation is supported) | string | - | high |
| to | The type to which the field must be converted | string | , | high |
| default | The default value to apply if the field cannot be converted | string | , | medium |
| ignoreMissing | If true and the field does not exist, the filter is applied successfully without modifying the data. If the field is null, the schema is modified. | boolean | true | high |

Supported types are:

  • SHORT
  • INTEGER
  • LONG
  • FLOAT
  • DOUBLE
  • BOOLEAN
  • STRING
  • ARRAY
  • BYTES

Examples

The following example shows how to convert a field’s value containing the string yes into a boolean.

Configuration

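filters=BooleanConverter
filters.BooleanConverter.type=io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter
filters.BooleanConverter.field="target"
filters.BooleanConverter.to="BOOLEAN"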

Input

{ "record" : { "target": "yes" } }

Output

{ "record" : { "target": true } }
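
The default property described in the table can be used to provide a fallback value. The following is a minimal sketch under stated assumptions: the alias CountToInteger and the field name count are hypothetical, and the fallback "0" is only an illustrative choice.

```properties
# Hypothetical alias and field name, for illustration only.
filters=CountToInteger
filters.CountToInteger.type=io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter
filters.CountToInteger.field="count"
filters.CountToInteger.to="INTEGER"
# Value applied when the field cannot be converted (see the default property above).
filters.CountToInteger.default="0"
```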

DateFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.DateFilter

The DateFilter converts a field’s value containing a date to a unix epoch time.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| field | The field to get the date from. | string (ScEL supported) | - | high |
| target | The target field. | string (ScEL supported) | - | high |
| timezone | The timezone to use for parsing date. | string | UTC | high |
| locale | The locale to use for parsing date. | string | en_EN | high |
| formats | List of the expected date formats. | list | - | high |

Examples

filters=ParseISODate
filters.ParseISODate.type=io.streamthoughts.kafka.connect.filepulse.filter.DateFilter
filters.ParseISODate.field="$.date"
filters.ParseISODate.target="$.timestamp"
filters.ParseISODate.formats="yyyy-MM-dd'T'HH:mm:ss"

Input

{
  "record" : {
    "date": "2001-07-04T12:08:56"
  }
}

Output

{
  "record" : {
    "date": "2001-07-04T12:08:56",
    "timestamp": 994248536000
  }
}
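
Because formats is a list and timezone is configurable, a single filter can presumably handle dates written in more than one layout. The sketch below rests on assumptions that are not confirmed by this page: that the list is comma-separated, that the formats are tried until one matches, and that a region-based zone ID such as Europe/Paris is accepted. The alias ParseLocalDate is hypothetical.

```properties
# Hypothetical alias; properties are those listed in the DateFilter table.
filters=ParseLocalDate
filters.ParseLocalDate.type=io.streamthoughts.kafka.connect.filepulse.filter.DateFilter
filters.ParseLocalDate.field="$.date"
filters.ParseLocalDate.target="$.timestamp"
# Assumption: several comma-separated patterns may be given.
filters.ParseLocalDate.formats="yyyy-MM-dd'T'HH:mm:ss,yyyy-MM-dd HH:mm:ss"
# Assumption: a region-based zone ID is accepted (the documented default is UTC).
filters.ParseLocalDate.timezone="Europe/Paris"
```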

DelimitedRowFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter.

The DelimitedRowFilter can be used to parse and stream delimited row files (e.g. CSV) into Kafka. Each row is parsed and published into a configured topic as a single Kafka record.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| separator | The character used as a delimiter/separator between each value | string | ; | high |
| trimColumn | Remove the leading and trailing whitespaces from all columns. | boolean | false | low |
| extractColumnName | Define the field from which the schema should be detected (all columns will be of type ‘string’) | string | | high |
| autoGenerateColumnNames | Define whether column names should be autogenerated or not (column names will be of the form ‘column1, column2’) | boolean | true | high |
| columns | The list of comma-separated column names in the order they appear in each row. Columns must be in the form of NAME:TYPE | string | | high |

Examples

The following example shows the use of the DelimitedRowFilter to split the message field using | as a separator character. The name of each column is extracted from the field headers.

filters=ParseDelimitedRow
filters.ParseDelimitedRow.extractColumnName="headers"
filters.ParseDelimitedRow.separator="\\|"
filters.ParseDelimitedRow.trimColumn="true"
filters.ParseDelimitedRow.type="io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"

DropFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.DropFilter.

The DropFilter can be used to prevent some messages (i.e. records) from being written into Kafka.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| if | Condition to apply the filter on the current record. | string (ScEL supported) | - | high |
| invert | Invert the boolean value returned from the filter condition. | boolean | false | medium |

For more information about the if property, see: Conditional execution.

Examples

The following example shows the usage of DropFilter to only keep records whose level field is equal to ERROR.

filters=Drop
filters.Drop.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.Drop.if={{ equals($.level, 'ERROR') }}
filters.Drop.invert=true
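
Conversely, leaving invert at its default value (false) drops the records that match the condition. As a minimal sketch, assuming the same level field and that DEBUG is one of its possible values (the alias DropDebug and the DEBUG value are illustrative):

```properties
# Hypothetical alias; the condition reuses the equals expression shown above.
filters=DropDebug
filters.DropDebug.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
# Records whose level equals DEBUG are dropped; all other records are kept.
filters.DropDebug.if={{ equals($.level, 'DEBUG') }}
```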

ExcludeFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter.

The ExcludeFilter can be used to exclude one or more fields from the input record.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| fields | The comma-separated list of field names to exclude | list | | high |

Examples

The following example shows the usage of ExcludeFilter.

filters=Exclude
filters.Exclude.type=io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter
filters.Exclude.fields=message

Input

{ "record" : { "message": "{\"name\":\"pulse\"}", "name": "pulse" } }

Output

{ "record" : {"name": "pulse" } }

ExplodeFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter.

The ExplodeFilter can be used to explode an array or list field into separate records.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| source | The input field on which to apply the filter | string | message | medium |

Examples

The following example shows the usage of ExplodeFilter.

filters=Explode
filters.Explode.type=io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter
filters.Explode.source=measurements

Input (single record)

{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": [38, 40, 42, 37] } }

Output (multiple records)

{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 38 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 40 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 42 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 37 } }

FailFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.FailFilter.

The FailFilter can be used to throw an exception with a provided error message. For example, this can be useful to stop processing a file when a non-conforming record is read.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| if | Condition to apply the filter on the current record. | string | - | high |
| invert | Invert the boolean value returned from the filter condition. | boolean | false | medium |
| message | The error message thrown by the filter. (ScEL supported) | string | - | high |

Examples

The following example shows the usage of FailFilter to stop processing a file when a field is null.

filters=Fail
filters.Fail.type=io.streamthoughts.kafka.connect.filepulse.filter.FailFilter
filters.Fail.if={{ is_null($.user_id) }}
filters.Fail.message=Invalid row, user_id is missing : {{ $value }}

GrokFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter.

The GrokFilter allows you to parse unstructured data, such as application logs, to extract structured and meaningful data fields.

The GrokFilter is based on: https://github.com/streamthoughts/kafka-connect-transform-grok

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| namedCapturesOnly | If true, only store named captures from grok. | boolean | true | high |
| pattern | The Grok pattern to match. | string | - | high |
| overwrite | The fields to overwrite. | list | | medium |
| patternDefinitions | Custom pattern definitions. | list | - | low |
| patternsDir | List of user-defined pattern directories | string | - | low |
| source | The input field on which to apply the filter | string | message | medium |

Examples

The following example shows the usage of GrokFilter to parse and extract fields from an application log message.

filters=ParseLog4jLog
filters.ParseLog4jLog.pattern="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
filters.ParseLog4jLog.overwrite="message"
filters.ParseLog4jLog.source="message"
filters.ParseLog4jLog.type="io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter"
filters.ParseLog4jLog.ignoreFailure="true"

GroupRowFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| fields | List of fields used to regroup records | list | | high |
| max.buffered.records | The maximum number of records to group | integer | -1 | high |
| target | The target array field to put the grouped records | string | records | high |

Examples
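
As a minimal sketch built only from the properties in the table above, the following configuration regroups consecutive records that share the same value for a grouping field. The alias GroupByUser and the field name user_id are hypothetical, and reading -1 as "no limit" for max.buffered.records is an assumption.

```properties
# Hypothetical alias and grouping field, for illustration only.
filters=GroupByUser
filters.GroupByUser.type=io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter
# Records are regrouped using the value of this field as the grouping key.
filters.GroupByUser.fields=user_id
# Assumption: caps the size of a group; the documented default is -1.
filters.GroupByUser.max.buffered.records=100
# Array field receiving the grouped records ("records" is the documented default).
filters.GroupByUser.target=records
```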

JoinFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.JoinFilter.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| field | The array field whose values are joined | string (ScEL supported) | - | high |
| target | The target field | string (ScEL supported) | - | high |
| separator | The separator used for joining array values. | string | , | high |

Examples
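
The following is a minimal sketch using only the properties listed in the table above. The alias JoinTags and the field names tags and tags_joined are hypothetical. Under these assumptions, an array such as ["a", "b"] in tags would be expected to produce the string a|b in tags_joined.

```properties
# Hypothetical alias and field names, for illustration only.
filters=JoinTags
filters.JoinTags.type=io.streamthoughts.kafka.connect.filepulse.filter.JoinFilter
# Array field to read the values from.
filters.JoinTags.field="$.tags"
# Field receiving the joined string.
filters.JoinTags.target="$.tags_joined"
# Separator placed between values (the documented default is a comma).
filters.JoinTags.separator=|
```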

JSONFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter.

The JSONFilter parses an input JSON field.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| overwrite | The fields to overwrite | list | - | medium |
| source | The input field on which to apply the filter | string | message | medium |
| target | The target field to put the parsed JSON data | string | - | high |
| charset | The charset to be used for reading the source field (if source is of type BYTES) | string | UTF-8 | medium |
| explode.array | A boolean that specifies whether to explode arrays into separate records | boolean | false | medium |
| merge | A boolean that specifies whether to merge the JSON object into the top level of the input record | boolean | false | medium |

Examples

filters=MyJsonFilter
filters.MyJsonFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter
filters.MyJsonFilter.source=message
filters.MyJsonFilter.target=payload

MultiRowFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter.

The MultiRowFilter joins multiple lines into a single Struct using a regex pattern.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| negate | Negate the regexp pattern (if not matched). | boolean | - | medium |
| pattern | The pattern to match multiline | string | - | high |
| patternDefinitions | Custom pattern definitions. | list | - | low |
| patternsDir | List of user-defined pattern directories | string | - | low |
| separator | The character to be used to concat multi lines | string | "\n" | high |
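
As an illustration of these properties, the sketch below targets a log file in which continuation lines (for example the frames of a Java stack trace) start with a tab character. This is an assumption about the input, the alias StackTraceMultiRowFilter is hypothetical, and the exact matching semantics of pattern and negate should be checked against the filter itself.

```properties
# Hypothetical alias; properties are those listed in the MultiRowFilter table.
filters=StackTraceMultiRowFilter
filters.StackTraceMultiRowFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter
# Assumption: lines starting with a tab belong to the previous message.
filters.StackTraceMultiRowFilter.pattern="^[\\t]"
filters.StackTraceMultiRowFilter.negate=false
```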

MoveFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter.

The MoveFilter moves an existing record field’s value to a specific target path.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| source | The path of the field to move | string | - | high |
| target | The path to move the field to | string | - | high |

Examples

The following example shows the usage of the MoveFilter.

filters=MyMoveFilter
filters.MyMoveFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter
filters.MyMoveFilter.source=field.child
filters.MyMoveFilter.target=moved

Input

{ "record" : { "field": { "child" : "foo" } } }

Output

{ "record" : { "moved": "foo" } }

NullValueFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.NullValueFilter.

The NullValueFilter is used to set a record value to null.

Example

filters=NullValueIfDeleteOp
filters.NullValueIfDeleteOp.type=io.streamthoughts.kafka.connect.filepulse.filter.NullValueFilter
filters.NullValueIfDeleteOp.if={{ equals($value.op, 'DELETE') }}

RenameFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter.

The RenameFilter is used to rename a specified field.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| field | The field to rename | string | - | high |
| target | The target name | string | - | high |
| ignoreMissing | If true and the field does not exist, the filter is applied successfully without modifying the data. If the field is null, the schema is modified. | boolean | true | high |

Examples

filters=RenameInputField
filters.RenameInputField.type=io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter
filters.RenameInputField.field=input
filters.RenameInputField.target=renamed

Input

{ "record" : { "input": "foo" } }

Output

{ "record" : { "renamed": "foo" } }

SplitFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter.

The SplitFilter splits a field’s value of type string into an array by using a specific separator.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| split | The comma-separated list of fields to split | string | - | high |
| separator | The separator used for splitting a message field’s value into an array | string | , | high |
| target | The target field to put the resulting array | string | - | high |

Example

Configuration

filters=SplitInputField
filters.SplitInputField.type=io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter
filters.SplitInputField.split=input
filters.SplitInputField.separator=,
filters.SplitInputField.target=output

Input

{ "record" : { "input": "val0,val1,val2" } }

Output

{
  "record" : {
    "input": "val0,val1,val2",
    "output": [ "val0", "val1", "val2"]
  }
}

XmlToJsonFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.XmlToJsonFilter.

The XmlToJsonFilter parses an XML record-field and converts it to a JSON string. This filter is based on the org.json.XML library.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| source | The input field on which to apply the filter. | string | "message" | high |
| source.charset | The charset to be used for reading the source. | string | "UTF-8" | high |
| xml.parser.keep.strings | When parsing the XML into JSON, specifies whether values should be kept as strings (true) or guessed as JSON values (numeric, boolean, string) | boolean | false | high |
| xml.parser.cDataTagName | The name of the key in a JSON Object that indicates a CDATA section. | string | "value" | high |

Example

Configuration

filters=XmlToJson
filters.XmlToJson.type=io.streamthoughts.kafka.connect.filepulse.filter.XmlToJsonFilter
filters.XmlToJson.xml.parser.keep.strings=false
filters.XmlToJson.xml.parser.cDataTagName=data

XmlToStructFilter

The following provides usage information for: io.streamthoughts.kafka.connect.filepulse.filter.XmlToStructFilter.

The XmlToStructFilter parses an XML record-field into a STRUCT.

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| source | The input field on which to apply the filter. | string | "message" | high |
| xml.force.array.on.fields | The comma-separated list of fields for which an array-type must be forced | list | - | high |
| xml.parser.validating.enabled | Specifies that the parser will validate documents as they are parsed. | | false | |
| xml.parser.namespace.aware.enabled | Specifies that the XML parser will provide support for XML namespaces. | | false | |
| xml.exclude.empty.elements | Specifies that the reader should exclude elements having no field. | | false | |
| xml.exclude.node.attributes | Specifies that the reader should exclude all node attributes. | | false | |
| xml.exclude.node.attributes.in.namespaces | Specifies that the reader should only exclude node attributes in the defined list of namespaces. | | | |
| xml.data.type.inference.enabled | Specifies that the reader should try to infer the type of data nodes. | | false | |
| xml.attribute.prefix | If set, the name of attributes will be prepended with the specified prefix when they are added to a record. | | "" | |

Example

Configuration

filters=ParseXmlDocument
filters.ParseXmlDocument.type=io.streamthoughts.kafka.connect.filepulse.filter.XmlToStructFilter
filters.ParseXmlDocument.source=message
filters.ParseXmlDocument.xml.parser.validating.enabled=true
filters.ParseXmlDocument.xml.parser.namespace.aware.enabled=true
filters.ParseXmlDocument.xml.exclude.empty.elements=true
filters.ParseXmlDocument.xml.data.type.inference.enabled=true