Processing Filters

The list of available transformation filters.

These filters are available for use with Kafka Connect File Pulse:

Filter | Description | Since
AppendFilter | Appends one or more values to an existing or non-existing array field |
ConvertFilter | Converts a message field’s value to a specific type |
DateFilter | Converts a field’s value containing a date to a unix epoch time |
DelimitedRowFilter | Parses a message field’s value containing columns delimited by a separator into a struct |
DropFilter | Drops messages satisfying a specific condition without throwing an exception |
ExcludeFilter | Excludes one or more fields from the input record | v1.4.0
ExplodeFilter | Explodes an array or list field into separate records | v1.4.0
FailFilter | Throws an exception when a message satisfies a specific condition |
GrokFilter | Parses an unstructured message field’s value to a struct by combining Grok patterns |
GroupRowFilter | Regroups multiple consecutive messages into a single message by composing a grouping key |
JoinFilter | Joins values of an array field with a specified separator |
JSONFilter | Unmarshals a JSON message field’s value to a complex struct |
MultiRowFilter | Combines consecutive message lines into a single one by combining patterns |
MoveFilter | Moves an existing record field’s value to a specific target path | v1.5.0
RenameFilter | Renames a message field |
SplitFilter | Splits a message field’s value into an array |

AppendFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter

The AppendFilter is probably one of the most important processing filters to know. It allows you to manipulate a source record by easily adding or replacing a field with a constant value or a value extracted from another existing field using ScEL.

Configuration

Configuration | Description | Type | Default | Importance
field | The name of the field to be added | string (ScEL supported) | - | high
value | The value of the field to be added | string (ScEL supported) | - | high
overwrite | Whether an existing field should be overwritten | boolean | false | high

Examples

The following example shows how to use the AppendFilter to concatenate two values from the array field named values using a substitution expression. The concatenated value is then added to a field named result.

Configuration

filters.SubstituteFilter.field="$.result"
filters.SubstituteFilter.value="{{ extract_array($.values,0) }}-{{ extract_array($.values,1) }}"

Input

{
    "record" :{
      "values": ["Hello", "World"]
    }
}

Output

{
    "record" :{
      "values": ["Hello", "World"],
      "result" : "Hello-World"
    }
}
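
To make the behavior above concrete, here is a minimal Python sketch of what the example does to a record. It is not FilePulse code: the function name append_filter and its parameters are hypothetical, and the branch that appends to an existing field when overwrite is false is an assumption based on the filter's one-line description.

```python
def append_filter(record, field, compute_value, overwrite=False):
    """Return a copy of `record` with `field` set from `compute_value(record)`."""
    out = dict(record)
    value = compute_value(record)
    if field not in out or overwrite:
        # New field, or explicit overwrite: store the computed value as-is.
        out[field] = value
    else:
        # Assumption: without overwrite, the value is appended to the existing one.
        existing = out[field]
        out[field] = (existing if isinstance(existing, list) else [existing]) + [value]
    return out


record = {"values": ["Hello", "World"]}
# Plays the role of "{{ extract_array($.values,0) }}-{{ extract_array($.values,1) }}".
result = append_filter(
    record, "result", lambda r: f"{r['values'][0]}-{r['values'][1]}"
)
print(result)
```

The lambda stands in for the ScEL substitution expression; the input record is left untouched and a new dictionary is returned.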

In the previous example, we used the simple property expression $.result to indicate the target field to which our value is added. We actually omitted the expression scope $value. By default, if no scope is defined in an expression, the scope $value is implicitly applied. Hence, we could have used the fully qualified expression $value.result, which is equivalent to the simplified expression $.result.

You can also use another expression scope. For example, you can leverage the AppendFilter to dynamically resolve the record key or the output topic based on the record data.

The following configuration shows how to use the $topic scope:

filters.SubstituteFilter.field="$topic"
filters.SubstituteFilter.value="my-topic-{{ lowercase(extract_array($.values,0)) }}"

Input

{
  "record" : {
    "values": ["Hello", "World"]
  }
}

Output

{
  "context": { "topic" : "my-topic-hello" },
  "record" :{
    "values": ["Hello", "World"]
  }
}

Finally, the AppendFilter can also accept a substitution expression for the property field. This allows you to dynamically determine the name of the field to be added.

The following example shows how to use a property expression to get the name of the field from another field of the record (here, target):

filters.SubstituteFilter.field="$.target"
filters.SubstituteFilter.value="{{ extract_array($.values, 0) }}-{{ extract_array($.values,1) }}"

Input

{
  "record" : {
    "target": "result",
    "values": ["Hello", "World"]
  }
}

Output

{
  "record" : {
    "target": "result",
    "values": ["Hello", "World"],
    "result" : "Hello-World"
  }
}

ConvertFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter

The ConvertFilter can be used to convert a field’s value into a specific type.

Configuration

Configuration | Description | Type | Default | Importance
field | The field to convert (dot notation is supported) | string | - | high
to | The type to which the field must be converted | string | , | high
default | The default value to apply if the field cannot be converted | string | , | medium
ignoreMissing | If true and the field does not exist, the filter is applied successfully without modifying the data. If the field is null, the schema is modified. | boolean | true | high

Supported types are :

  • SHORT
  • INTEGER
  • LONG
  • FLOAT
  • DOUBLE
  • BOOLEAN
  • STRING
  • ARRAY
  • BYTES

Examples

The following example shows how to convert a field’s value containing the string yes into a boolean.

Configuration

filters.BooleanConverter.field="target"
filters.BooleanConverter.to="BOOLEAN"

Input

{ "record" : { "target": "yes" } }

Output

{ "record" : { "target": true } }
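
The conversion above can be sketched in Python as follows. This is only an illustration of the idea: the sets of strings treated as true or false are assumptions (the source only shows that yes becomes true), and the names to_boolean and convert_field are hypothetical.

```python
# Assumption: which strings count as true/false is not stated in the source.
TRUE_STRINGS = {"true", "yes", "y", "1"}
FALSE_STRINGS = {"false", "no", "n", "0"}


def to_boolean(raw, default=None):
    """Convert a string-like value to a bool, falling back to `default`."""
    text = str(raw).strip().lower()
    if text in TRUE_STRINGS:
        return True
    if text in FALSE_STRINGS:
        return False
    if default is not None:
        return default
    raise ValueError(f"cannot convert {raw!r} to BOOLEAN")


def convert_field(record, field, converter, ignore_missing=True):
    """Return a copy of `record` with `field` converted by `converter`."""
    if field not in record:
        if ignore_missing:
            # Mirrors ignoreMissing=true: succeed without modifying the data.
            return dict(record)
        raise KeyError(field)
    out = dict(record)
    out[field] = converter(record[field])
    return out


converted = convert_field({"target": "yes"}, "target", to_boolean)
print(converted)
```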

DateFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DateFilter

The DateFilter converts a field’s value containing a date to a unix epoch time.

Configuration

Configuration | Description | Type | Default | Importance
field | The field to get the date from. | string (ScEL supported) | - | high
target | The target field. | string (ScEL supported) | - | high
timezone | The timezone to use for parsing date. | string | UTC | high
locale | The locale to use for parsing date. | string | en_EN | high
format | List of the expected date formats. | list | - | high

Examples

filters.MyDateFilter.field="$.date"
filters.MyDateFilter.target="$.timestamp"
filters.MyDateFilter.format="yyyy-MM-dd'T'HH:mm:ss"

Input

{
  "record" : {
    "date": "2001-07-04T12:08:56"
  }
}

Output

{
  "record" : {
    "date": "2001-07-04T12:08:56",
    "timestamp": 994248536000
  }
}
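
The timestamp in the output above is the number of milliseconds since the unix epoch, with the date interpreted in UTC. The following Python sketch reproduces that arithmetic. The function name to_epoch_millis is hypothetical, and Python's strptime directives are used here in place of the Java-style pattern from the configuration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(text, formats, tz=timezone.utc):
    """Parse `text` with the first matching format and return epoch milliseconds."""
    for fmt in formats:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue  # try the next expected format
        # Attach the configured timezone, then measure the distance to the epoch.
        return (parsed.replace(tzinfo=tz) - EPOCH) // timedelta(milliseconds=1)
    raise ValueError(f"{text!r} matches none of the expected formats")


# "%Y-%m-%dT%H:%M:%S" is the strptime counterpart of yyyy-MM-dd'T'HH:mm:ss.
millis = to_epoch_millis("2001-07-04T12:08:56", ["%Y-%m-%dT%H:%M:%S"])
print(millis)  # 994248536000
```

Accepting a list of formats mirrors the format property, which is documented as a list of expected date formats.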

DelimitedRowFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter.

The DelimitedRowFilter can be used to parse and stream delimited row files (e.g., CSV) into Kafka. Each row is parsed and published into a configured topic as a single Kafka record.

Configuration

Configuration | Description | Type | Default | Importance
separator | The character used as a delimiter/separator between each value | string | ; | high
trimColumn | Remove the leading and trailing whitespaces from all columns. | boolean | false | low
extractColumnName | Define the field from which the schema should be detected (all columns will be of type ‘string’) | string | | high
autoGenerateColumnNames | Define whether column names should be autogenerated or not (column names will be of the form ‘column1, column2’) | boolean | true | high
columns | The list of comma-separated column names in the order they appear in each row. Columns must be in the form of NAME:TYPE | string | | high

Examples

The following example shows the use of the DelimitedRowFilter to split the message field using | as a separator character. The name of each column is extracted from the field named headers.

filters.ParseDelimitedRow.extractColumnName="headers"
filters.ParseDelimitedRow.separator="\\|"
filters.ParseDelimitedRow.trimColumn="true"
filters.ParseDelimitedRow.type="io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
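
As a rough illustration of the configuration above, the Python sketch below splits a message on | and names the columns from a headers field. The sample record, the function name parse_delimited_row, and the choice to keep the original fields in the result are assumptions made for the example. The sketch uses a literal separator, whereas the configuration escapes the pipe.

```python
def parse_delimited_row(record, separator="|", column_names_field="headers",
                        source="message", trim=True):
    """Split `source` into columns named after the cells of `column_names_field`."""

    def split(line):
        cells = line.split(separator)
        return [cell.strip() for cell in cells] if trim else cells

    names = split(record[column_names_field])
    values = split(record[source])
    out = dict(record)
    # All columns are kept as strings, as with extractColumnName.
    out.update(zip(names, values))
    return out


# Hypothetical input record, for illustration only.
row = {"headers": "id | name | city", "message": "1 | Alice | Paris"}
parsed = parse_delimited_row(row)
print(parsed)
```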

DropFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DropFilter.

The DropFilter can be used to prevent some messages (i.e., records) from being written into Kafka.

Configuration

Configuration | Description | Type | Default | Importance
if | Condition to apply the filter on the current record. | string (ScEL supported) | - | high
invert | Invert the boolean value returned from the filter condition. | boolean | false | medium

For more information about the if property, see: Conditional execution.

Examples

The following example shows the usage of DropFilter to only keep records with a field level equal to ERROR.

filters=Drop
filters.Drop.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.Drop.if={{ equals($.level, 'ERROR') }}
filters.Drop.invert=true
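
Combining a condition with invert=true can be confusing, so here is a small Python sketch of the decision logic. The names should_drop and is_error and the sample records are hypothetical. The point is that the condition selects ERROR records and the inversion causes everything else to be dropped.

```python
def should_drop(record, condition, invert=False):
    """Return True when the record must be dropped."""
    matched = condition(record)
    # invert flips the result of the condition before it is applied.
    return (not matched) if invert else matched


def is_error(record):
    # Plays the role of {{ equals($.level, 'ERROR') }}.
    return record.get("level") == "ERROR"


# Hypothetical records, for illustration only.
records = [
    {"level": "ERROR", "message": "disk full"},
    {"level": "INFO", "message": "started"},
    {"level": "ERROR", "message": "timeout"},
]
kept = [r for r in records if not should_drop(r, is_error, invert=True)]
print(kept)
```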

ExcludeFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter.

The ExcludeFilter can be used to exclude one or more fields from the input record.

Configuration

Configuration | Description | Type | Default | Importance
fields | The comma-separated list of field names to exclude | list | - | high

Examples

The following example shows the usage of ExcludeFilter.

filters=Exclude
filters.Exclude.type=io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter
filters.Exclude.fields=message

Input

{ "record" : { "message": "{\"name\":\"pulse\"}", "name": "pulse" } }

Output

{ "record" : {"name": "pulse" } }

ExplodeFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter.

The ExplodeFilter can be used to explode an array or list field into separate records.

Configuration

Configuration | Description | Type | Default | Importance
source | The input field on which to apply the filter | string | message | medium

Examples

The following example shows the usage of ExplodeFilter.

filters=Explode
filters.Explode.type=io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter
filters.Explode.source=measurements

Input (single record)

{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": [38, 40, 42, 37] } }

Output (multiple records)

{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 38 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 40 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 42 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 37 } }
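
The transformation shown above can be sketched in a few lines of Python. The function name explode is hypothetical. Each element of the source array yields one copy of the record in which the array is replaced by that element.

```python
def explode(record, source):
    """Return one record per element of the array stored in `source`."""
    return [{**record, source: item} for item in record[source]]


single = {
    "id": "captor-0001",
    "date": "2020-08-06T17:00:00",
    "measurements": [38, 40, 42, 37],
}
exploded = explode(single, "measurements")
for rec in exploded:
    print(rec)
```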

FailFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.FailFilter.

The FailFilter can be used to throw an exception with a provided error message. For example, this can be useful to stop processing a file when a non-conforming record is read.

Configuration

Configuration | Description | Type | Default | Importance
if | Condition to apply the filter on the current record. | string | - | high
invert | Invert the boolean value returned from the filter condition. | boolean | false | medium
message | The error message thrown by the filter. | string (ScEL supported) | - | high

Examples

The following example shows the usage of FailFilter to stop processing a file when a field is null.

filters=Fail
filters.Fail.type=io.streamthoughts.kafka.connect.filepulse.filter.FailFilter
filters.Fail.if={{ is_null($.user_id) }}
filters.Fail.message=Invalid row, user_id is missing : {{ $value }}

GrokFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter.

The GrokFilter allows you to parse unstructured data, such as application logs, to extract structured and meaningful data fields.

Regular Expressions: Grok patterns are built on top of regular expressions, so you can also use any regular expression to define your own patterns.

Internally, FilePulse uses Joni as its regular expression library, a Java port of the Oniguruma regexp library.

Configuration

Configuration | Description | Type | Default | Importance
namedCapturesOnly | If true, only store named captures from grok. | boolean | true | high
matches | The Grok pattern to match. | string | - | high
overwrite | The fields to overwrite. | list | | medium
patternDefinitions | Custom pattern definitions. | list | - | low
patternsDir | List of user-defined pattern directories | string | - | low
source | The input field on which to apply the filter | string | message | medium

Examples

The following example shows the usage of GrokFilter to parse and extract fields from an application log message.

filters.ParseLog4jLog.match="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
filters.ParseLog4jLog.overwrite="message"
filters.ParseLog4jLog.source="message"
filters.ParseLog4jLog.type="io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter"
filters.ParseLog4jLog.ignoreFailure="true"
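
Since Grok patterns expand to regular expressions with named captures, the configuration above can be approximated with Python's re module. The sketch below is only an approximation: the three sub-patterns are simplified stand-ins for TIMESTAMP_ISO8601, LOGLEVEL and GREEDYDATA, the log line is invented, and returning unmatched records unchanged is a choice made for the sketch.

```python
import re

# Simplified stand-ins for the Grok patterns used in the configuration.
LOG_LINE = re.compile(
    r"(?P<logdate>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?) "
    r"(?P<loglevel>TRACE|DEBUG|INFO|WARN|ERROR|FATAL) "
    r"(?P<message>.*)"
)


def parse_log(record, source="message"):
    """Add the named captures to a copy of `record`, overwriting `message`."""
    match = LOG_LINE.match(record[source])
    if match is None:
        return dict(record)  # sketch choice: leave unmatched records untouched
    return {**record, **match.groupdict()}


# Hypothetical log line, for illustration only.
parsed_log = parse_log({"message": "2020-08-06 17:00:00,123 INFO Starting connector"})
print(parsed_log)
```

Because message is both the source and one of the named captures, the captured text replaces the original line, which is what the overwrite property allows in the configuration.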

GroupRowFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter.

Configuration

Configuration | Description | Type | Default | Importance
fields | List of fields used to regroup records | list | | high
max.buffered.records | The maximum number of records to group (default: -1). | integer | -1 | high
target | The target array field to put the grouped field | integer | records | high

Examples
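
The source gives no example for this filter, so the following Python sketch only illustrates the general idea of regrouping consecutive records that share the same values for the grouping fields. The shape of the output record, the treatment of max_buffered as a chunk size, the function name group_rows, and the sample records are all assumptions.

```python
from itertools import groupby


def group_rows(records, fields, target="records", max_buffered=-1):
    """Group consecutive records sharing the same values for `fields`."""

    def grouping_key(record):
        return tuple(record.get(name) for name in fields)

    grouped = []
    for key, rows in groupby(records, key=grouping_key):
        rows = list(rows)
        # Assumption: a non-positive limit means "no limit".
        chunk = len(rows) if max_buffered <= 0 else max_buffered
        for start in range(0, len(rows), chunk):
            merged = dict(zip(fields, key))
            merged[target] = rows[start:start + chunk]
            grouped.append(merged)
    return grouped


# Hypothetical records, for illustration only.
rows = [
    {"id": 1, "v": "a"},
    {"id": 1, "v": "b"},
    {"id": 2, "v": "c"},
]
print(group_rows(rows, ["id"]))
```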

JoinFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.JoinFilter.

Configuration

Configuration | Description | Type | Default | Importance
field | The field to get the array from | string (ScEL supported) | - | high
target | The target field | string (ScEL supported) | - | high
separator | The separator used for joining array values. | string | , | high

Examples
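
No example is given in the source, so here is a minimal Python sketch of the joining behavior described in the table. The function name join_field and the sample record are hypothetical.

```python
def join_field(record, field, target, separator=","):
    """Join the values of the array in `field` and store the result in `target`."""
    joined = separator.join(str(value) for value in record[field])
    return {**record, target: joined}


# Hypothetical record, for illustration only.
joined_record = join_field({"values": ["Hello", "World"]}, "values", "joined", "-")
print(joined_record)
```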

JSONFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter.

The JSONFilter parses an input JSON field.

Configuration

Configuration | Description | Type | Default | Importance
overwrite | The fields to overwrite | list | - | medium
source | The input field on which to apply the filter | string | message | medium
target | The target field to put the parsed JSON data | string | - | high
charset | The charset to be used for reading the source field (if source is of type BYTES) | string | UTF-8 | medium
explode.array | A boolean that specifies whether to explode arrays into separate records | boolean | false | medium
merge | A boolean that specifies whether to merge the JSON object into the top level of the input record | boolean | false | medium

Examples

filters=MyJsonFilter
filters.MyJsonFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter
filters.MyJsonFilter.source=message
filters.MyJsonFilter.target=payload
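
To illustrate the difference between writing to a target field and merging into the top level, here is a small Python sketch. It is not the filter's implementation: the function name parse_json_field and the sample record are hypothetical, and the merge branch assumes the parsed value is a JSON object.

```python
import json


def parse_json_field(record, source="message", target="payload", merge=False):
    """Parse the JSON string in `source` into `target`, or merge it at the top level."""
    parsed = json.loads(record[source])
    out = dict(record)
    if merge:
        # Assumes the parsed value is a JSON object (a dict).
        out.update(parsed)
    else:
        out[target] = parsed
    return out


# Hypothetical record, for illustration only.
raw = {"message": '{"name":"pulse"}'}
print(parse_json_field(raw))
print(parse_json_field(raw, merge=True))
```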

MultiRowFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter.

The MultiRowFilter joins multiple lines into a single Struct using a regex pattern.

Configuration

Configuration | Description | Type | Default | Importance
negate | Negate the regexp pattern (if not matched). | boolean | - | medium
pattern | The pattern to match multiline | string | - | high
patternDefinitions | Custom pattern definitions. | list | - | low
patternsDir | List of user-defined pattern directories | string | - | low
separator | The character to be used to concatenate multiple lines | string | “\n” | high
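
The source does not spell out how pattern and negate interact, so the Python sketch below rests on an assumption: a line that matches the pattern (or fails to match it when negate is true) is treated as a continuation of the previous line. The function name combine_lines and the sample lines are hypothetical.

```python
import re


def combine_lines(lines, pattern, negate=False, separator="\n"):
    """Merge continuation lines into the line that precedes them."""
    regex = re.compile(pattern)
    combined = []
    for line in lines:
        # Assumption: a match (flipped by negate) marks a continuation line.
        is_continuation = bool(regex.search(line)) != negate
        if is_continuation and combined:
            combined[-1] = combined[-1] + separator + line
        else:
            combined.append(line)
    return combined


# Hypothetical stack trace where continuation lines start with a tab.
lines = ["ERROR boom", "\tat Foo.bar", "\tat Baz.qux", "INFO ok"]
merged_lines = combine_lines(lines, r"^\t")
print(merged_lines)
```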

MoveFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter.

The MoveFilter moves an existing record field’s value to a specific target path.

Configuration

Configuration | Description | Type | Default | Importance
source | The path of the field to move | string | - | high
target | The path to move the field | string | - | high

Examples

The following example shows the usage of the MoveFilter.

filters=MyMoveFilter
filters.MyMoveFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter
filters.MyMoveFilter.source=field.child
filters.MyMoveFilter.target=moved

Input

{ "record" : { "field": { "child" : "foo" } } }

Output

{ "record" : { "moved": "foo" } }
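
The move shown above, including the removal of the emptied parent field, can be sketched in Python as follows. The function name move_field is hypothetical, and for simplicity the sketch only supports a dotted path for the source, not for the target.

```python
import copy


def move_field(record, source, target):
    """Move the value at dotted path `source` to top-level field `target`."""
    out = copy.deepcopy(record)
    *parents, leaf = source.split(".")
    node, chain = out, []
    for name in parents:
        chain.append((node, name))
        node = node[name]
    value = node.pop(leaf)
    # Remove parent structs left empty by the move, innermost first.
    for parent, name in reversed(chain):
        if parent[name]:
            break
        del parent[name]
    out[target] = value
    return out


moved = move_field({"field": {"child": "foo"}}, "field.child", "moved")
print(moved)  # {'moved': 'foo'}
```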

RenameFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter.

The RenameFilter is used to rename a specified field.

Configuration

Configuration | Description | Type | Default | Importance
field | The field to rename | string | - | high
target | The target name | string | - | high
ignoreMissing | If true and the field does not exist, the filter is applied successfully without modifying the data. If the field is null, the schema is modified. | boolean | true | high

Examples

filters.MyRenameFilter.field=input
filters.MyRenameFilter.target=renamed

Input

{ "record" : { "input": "foo" } }

Output

{ "record" : { "renamed": "foo" } }

SplitFilter

The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter.

The SplitFilter splits a field’s value of type string into an array by using a specific separator.

Configuration

Configuration | Description | Type | Default | Importance
split | The comma-separated list of fields to split | string | - | high
separator | The separator used for splitting a message field’s value into an array | string | , | high
target | The target field to put the split values | string | - | high

Example

Configuration

filters.MySplitterFilter.split=input
filters.MySplitterFilter.separator=,

Input

{
  "record" : {
    "input": "val0,val1,val2"
  }
}

Output

{
  "record" : {
    "input": "val0,val1,val2",
    "output": [ "val0", "val1", "val2"]
  }
}
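
The splitting step can be sketched in Python as follows. The function name split_field is hypothetical, and the sketch assumes the target field is named output, as in the output shown above.

```python
def split_field(record, field, separator=",", target=None):
    """Split the string in `field` and store the resulting list in `target`."""
    parts = record[field].split(separator)
    # Without a target, the sketch replaces the source field in place.
    return {**record, (target or field): parts}


split_record = split_field({"input": "val0,val1,val2"}, "input", ",", target="output")
print(split_record)
```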