Processing Filters
These filters are available for use with Kafka Connect File Pulse:
| Filter | Description | Since | 
|---|---|---|
| AppendFilter | Appends one or more values to an existing or non-existing array field | |
| ConvertFilter | Converts a message field’s value to a specific type | |
| DateFilter | Converts a field’s value containing a date to a unix epoch time | |
| DelimitedRowFilter | Parses a message field’s value containing columns delimited by a separator into a struct | |
| DropFilter | Drops messages satisfying a specific condition without throwing exception. | |
| ExcludeFilter | Excludes one or more fields from the input record. | v1.4.0 | 
| ExplodeFilter | Explodes an array or list field into separate records. | v1.4.0 | 
| FailFilter | Throws an exception when a message satisfy a specific condition | |
| GrokFilter | Parses an unstructured message field’s value to a struct by combining Grok patterns | |
| GroupRowFilter | Regroups multiple following messages into a single message by composing a grouping key | |
| JoinFilter | Joins values of an array field with a specified separator | |
| JSONFilter | Unmarshallings a JSON message field’s value to a complex struct | |
| MultiRowFilter | Combines following message lines into single one by combining patterns | |
| MoveFilter | Moves an existing record field’s value to a specific target path | v1.5.0 | 
| RenameFilter | Renames a message field | |
| SplitFilter | Splits a message field’s value to array | 
AppendFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
The AppendFilter is probably one of the most important processing filters to know.
It allows you to manipulate a source record by easily adding or replacing a field with a constant
value or a value extracted from another existing field using ScEL.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| field | The name of the field to be added | string (ScEL supported) | - | high | 
| value | The value of the field to be added | string (ScEL supported) | - | high | 
| overwrite | Is existing field should be overwrite | boolean | false | high | 
Examples
The following examples shows how to use the AppendFilter to concat two values from the array field named values
using a substitution expression.
The concat value is then added to the a field named result.
Configuration
filters.SubstituteFilter.field="$.result"
filters.SubstituteFilter.value="{{ extract_array($.values,0) }}-{{ extract_array($.values,1) }}"
Input
{
    "record" :{
      "value": ["Hello", "World"]
    }
} 
Output
{
    "record" :{
      "value": ["Hello", "World"],
      "result" : "Hello-World"
    }
}
In the previous example, we used the simple property expression result to indicate the target field to which our value is added.
We have actually omitted the expression scope $value.
By default, if no scope is defined in an expression, the scope $value is implicitly applied.
Hence, we could have used the fully expression $value.result which is similar to the simplified expression result.
But, you can perfectly used another expression scope. For example, you can leverage the AppendFilter to dynamically
resolve the record-key or the output topic based on the record data.
The following configuration show how to use the $topic scope :
filters.SubstituteFilter.field="$topic"
filters.SubstituteFilter.value="my-topic-{{ lowercase(extract_array($.values,0)) }}"
Input
{
  "record" : {
    "value": ["Hello", "World"]
  }
} 
Output
{
  "context": { "topic" : "my-topic-hello" },
  "record" :{
    "value": ["Hello", "World"]
  }
} 
Finally the AppendFilter can also accept an substitution expression for the property field.
This allows to dynamically determine the name of the field to be added.
The following examples show how to use a property expression to get the named of the field from a
filters.SubstituteFilter.field="$.target"
filters.SubstituteFilter.value="{{ extract_array($.values, 0) }}-{{ extract_array($.values,1) }}"
Input
{
  "record" : {
    "target": "result",
    "value": ["Hello", "World"]
  }
}
Output
{
  "record" : {
    "target": "result",
    "value": ["Hello", "World"],
    "result" : "Hello-World"
  }
}
ConvertFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter
The ConvertFilter can be used to convert a field’s value into a specific type.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| field | The field to convert (dot notation is supported) | string | - | high | 
| to | The type to which the field must be converted | string | , | high | 
| default | The default value to apply if the field cannot be converted | string | , | medium | 
| ignoreMissing | If true and field does not exist the filter will be apply successfully without modifying the data. If field is null the schema will be modified. | boolean | true | high | 
Supported types are :
- SHORT
- INTEGER
- LONG
- FLOAT
- DOUBLE
- BOOLEAN
- STRING
- ARRAY
- BYTES
Examples
The following example shows how to convert a a field’s value containing the string yes into a boolean.
Configuration
filters.BooleanConverter.field="target"
filters.BooleanConverter.to="BOOLEAN"
Input
{ "record" : { "target": "yes" } }
Output
{ "record" : { "target": true } }
DateFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DateFilter
The DateFilter converts a field’s value containing a date to a unix epoch time.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| field | The field to get the date from . | string(ScEL supported) | - | high | 
| target | The target field. | string(ScEL supported) | - | high | 
| timezone | The timezone to use for parsing date. | string | UTC | high | 
| locale | The locale to use for parsing date. | string | en_EN | high | 
| format | List of the expected date formats. | list | - | high | 
Examples
filters.MyDateFilter.field="$.date"
filters.MyDateFilter.target="$.timestamp"
filters.MyDateFilter.format="yyyy-MM-dd'T'HH:mm:ss"
Input
{
  "record" : {
    "date": "2001-07-04T12:08:56"
  }
}
Output
{
  "record" : {
    "date": "2001-07-04T12:08:56",
    "timestamp": 994248536000
  }
}
DelimitedRowFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter.
The DelimitedRowFilter can be used to parse and stream delimited row files (i.e CSV) into Kafka.
Each row is parsed and published into a configured topic as a single Kafka data.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| separator | The character used as a delimiter/separator between each value | string | ; | high | 
| trimColumn | Remove the leading and trailing whitespaces from all columns. | boolean | false | low | 
| extractColumnName | Define the field from which the schema should be detected (all columns will be of type ‘string’) | string | high | |
| autoGenerateColumnNames | Define whether column names should autogenerated or not (column names will of the form ‘column1, column2’) | true | boolean | high | 
| columns | The list of comma-separated column names in order they appear in each row. columns must be in the form of NAME:TYPE | string | high | 
Examples
The following example shows the use of the DelimitedRowFilter to split the message field using | as a separator character.
The name of each column is extracted from the fields headers.
filters.ParseDelimitedRow.extractColumnNam="headers"
filters.ParseDelimitedRow.separator="\\|"
filters.ParseDelimitedRow.trimColumn="true"
filters.ParseDelimitedRow.type="io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
Important
Under the hood, theDelimitedRowFilter will use the String#split method to parse the input line. This
method accepts a regex as argument then any special character must be escaped.DropFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DropFilter.
The DropFilter can be used to prevent some messages (i.e records) to be written into Kafka.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| if | Condition to apply the filter on the current record. | string ScEL supported) | - | high | 
| invert | Invert the boolean value return from the filter condition. | boolean | false | medium | 
For more information about if property, see : Conditional execution.
Examples
The following example shows the usage of DropFilter to only keep records with a field level containing to ERROR.
filters=Drop
filters.Drop.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.Drop.if={{ equals($.level, 'ERROR') }}
filters.Drop.invert=true
ExcludeFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter.
The ExcludeFilter can be used to exclude one or more fields from the input record.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| fields | The comma-separated list of field names to exclude | list | ** | high | 
Examples
The following example shows the usage of ExplodeFilter.
filters=Exclude
filters.Exclude.type=io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter
filters.Exclude.fields=message
Input
{ "record" : { "message": "{\"name\":\"pulse\"}", "name": "pulse" } }
Output
{ "record" : {"name": "pulse" } }
ExplodeFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter.
The ExplodeFilter can be used to explode an array or list field into separate records.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| source | The input field on which to apply the filter | string | message | medium | 
Examples
The following example shows the usage of ExplodeFilter.
filters=Explode
filters.Explode.type=io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter
filters.Explode.source=measurements
Input (single record)
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": [38, 40, 42, 37] } }
Output (multiple records)
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 38 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 40 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 42 } }
{ "record" : { "id": "captor-0001", "date": "2020-08-06T17:00:00", "measurements": 37 } }
FailFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.FailFilter.
The fail filter can be used to throw an exception with a provided error message. For example, this can be useful to stop processing a file when a non-conform record is read.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| if | Condition to apply the filter on the current record. | string | - | high | 
| invert | Invert the boolean value return from the filter condition. | boolean | false | medium | 
| message | The error message thrown by the filter. (ScEL supported) | string | - | high | 
Examples
The following example shows the usage of FailFilter to stop processing a file when a field is equals to null.
filters=Fail
filters.Fail.type=io.streamthoughts.kafka.connect.filepulse.filter.FailFilter
filters.Fail.if={{ is_null($.user_id) }}
filters.Fail.message=Invalid row, user_id is missing : {{ $value }}
GrokFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter.
The GrokFilter allows you to parse unstructured data like applications logs to extract structured and meaningful data fields.
Regular Expressions Grok are built on top of on regular expressions, so you can use any regular expressions as well to define your own patterns.
Internally, FilePulse uses the regular expression library is Joni , a Java port of Oniguruma regexp library.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| namedCapturesOnly | If true, only store named captures from grok. | boolean | true | high | 
| matches | The Grok pattern to match. | string | - | high | 
| overwrite | The fields to overwrite. | list | medium | |
| patternDefinitions | Custom pattern definitions. | list | - | low | 
| patternsDir | List of user-defined pattern directories | string | - | low | 
| source | The input field on which to apply the filter | string | message | medium | 
Examples
The following example shows the usage of GrokFilter to parse and extract fields from application log message.
filters.ParseLog4jLog.match="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
filters.ParseLog4jLog.overwrite="message"
filters.ParseLog4jLog.source="message"
filters.ParseLog4jLog.type="io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter"
filters.ParseLog4jLog.ignoreFailure="true"
GroupRowFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| fields | List of fields used to regroup records | list | high | |
| max.buffered.records | The maximum number of records to group (default : -1). | integer | -1 | high | 
| target | The target array field to put the grouped field | integer | records | high | 
Examples
JoinFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.JoinFilter.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| field | The field to get the date from | string(ScEL supported) | - | high | 
| target | The target field | string(ScEL supported) | - | high | 
| separator | The separator used for joining array values. | string | , | high | 
Examples
JSONFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter.
The JSONFilter parses an input json field.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| overwrite | The fields to overwrite | list | - | medium | 
| source | The input field on which to apply the filter | string | message | medium | 
| target | The target field to put the parsed JSON data | string | - | high | 
| charset | The charset to be used for reading the source field (if source if of type BYTES | string | UTF-8 | medium | 
| explode.array | A boolean that specifies whether to explode arrays into separate records | boolean | false | medium | 
| merge | boolean that specifies whether to merge the JSON object into the top level of the input record | boolean | false | medium | 
Examples
filters=MyJsonFilter
filters.MyJsonFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter
filters.MyJsonFilter.source=message
filters.MyJsonFilter.target=payload
MultiRowFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter.
The MultiRowFilter joins multiple lines into a single Struct using a regex pattern.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| negate | Negate the regexp pattern (if not matched)." | boolean | - | medium | 
| pattern | The pattern to match multiline | string | - | high | 
| patternDefinitions | Custom pattern definitions. | list | - | low | 
| patternsDir | List of user-defined pattern directories | string | - | low | 
| separator | The character to be used to concat multi lines | string | “\n” | high | 
MoveFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter.
The MoveFilter moves an existing record field’s value to a specific target path.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| source | The path of the field to move" | string | - | high | 
| target | The path to move the field | string | - | high | 
Examples
The following example shows the usage of the MoveFilter.
filters=MyMoveFilter
filters.MyMoveFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter
filters.MyMoveFilter.source=field.child
filters.MyMoveFilter.target=moved
Input
{ "record" : { "field": { "child" : "foo" } } }
Output
{ "record" : { "moved": "foo" } }
RenameFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter.
The RenameFilter is used to rename a specified field.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| field | The field to rename | string | - | high | 
| target | The target name | string | - | high | 
| ignoreMissing | If true and field does not exist the filter will be apply successfully without modifying the data. If field is null the schema will be modified. | boolean | true | high | 
Examples
filters.MyRenameFilter.field=input
filters.MyRenameFilter.target=renamed
Input
{ "record" : { "input": "foo" } }
Output
{ "record" : { "renamed": "foo" } }
SplitFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter.
The SplitFilter splits a field’s value of type string into an array by using a specific separator.
Configuration
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| split | The comma-separated list of fields to split | string | - | high | 
| separator | The separator used for splitting a message field’s value to array | string | , | high | 
| target | The target field to put the parsed JSON data | string | - | high | 
Example
Configuration
filters.MySplitterFilter.split=input
filters.MySplitterFilter.separator=,
Input
{
  "record" : {
    "input": "val0,val1,val2"
  }
}
Output
{
  "record" : {
    "input": "val0,val1,val2",
    "output": [ "val0", "val1", "val2"]
  }
}