Processing Filters
These filters are available for use with Kafka Connect File Pulse:
Filter | Description | Since |
---|---|---|
AppendFilter | Appends one or more values to an existing or non-existing array field | |
ConvertFilter | Converts a message field’s value to a specific type | |
DateFilter | Converts a field’s value containing a date to a unix epoch time | |
DelimitedRowFilter | Parses a message field’s value containing columns delimited by a separator into a struct | |
DropFilter | Drops messages satisfying a specific condition without throwing an exception. | |
ExcludeFilter | Excludes one or more fields from the input record. | v1.4.0 |
ExplodeFilter | Explodes an array or list field into separate records. | v1.4.0 |
FailFilter | Throws an exception when a message satisfies a specific condition | |
GrokFilter | Parses an unstructured message field’s value to a struct by combining Grok patterns | |
GroupRowFilter | Regroups multiple following messages into a single message by composing a grouping key | |
JoinFilter | Joins values of an array field with a specified separator | |
JSONFilter | Unmarshals a JSON message field’s value into a complex struct | |
MoveFilter | Moves an existing record field’s value to a specific target path | v1.5.0 |
MultiRowFilter | Combines following message lines into a single one by combining patterns | |
NullValueFilter | Sets a record’s value to null | v2.3.0 |
RenameFilter | Renames a message field | |
SplitFilter | Splits a message field’s value to array | |
XmlToJsonFilter | Parses an XML record-field and converts it to a JSON string | v2.4.0 |
XmlToStructFilter | Parses an XML record-field into STRUCT | v2.4.0 |
AppendFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
The AppendFilter
is probably one of the most important processing filters to know.
It allows you to manipulate a source record by easily adding or replacing a field with a constant
value or a value extracted from another existing field
using ScEL.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
field | The name of the field to be added | string (ScEL supported) | - | high |
value | The value of the field to be added | string (ScEL supported) | - | high |
overwrite | Whether an existing field should be overwritten | boolean | false | high |
Examples
The following example shows how to use the AppendFilter to concatenate two values from the array field named values using a substitution expression. The concatenated value is then added to the field named result.
Configuration
filters=SubstituteFilter
filters.SubstituteFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.SubstituteFilter.field="$.result"
filters.SubstituteFilter.value="{{ extract_array($.values,0) }}-{{ extract_array($.values,1) }}"
Input
{
"record": {
"value": [
"Hello",
"World"
]
}
}
Output
{
"record": {
"value": [
"Hello",
"World"
],
"result": "Hello-World"
}
}
In the previous example, we used the simple property expression result to indicate the target field to which our value is added. We actually omitted the expression scope $value. By default, if no scope is defined in an expression, the scope $value is implicitly applied. Hence, we could have used the full expression $value.result, which is equivalent to the simplified expression result.
However, you can perfectly well use another expression scope. For example, you can leverage the AppendFilter to dynamically resolve the record-key or the output topic based on the record data. The following configuration shows how to use the $topic scope:
filters.SubstituteFilter.field="$topic"
filters.SubstituteFilter.value="my-topic-{{ lowercase(extract_array($.values,0)) }}"
Input
{
"record": {
"value": [
"Hello",
"World"
]
}
}
Output
{
"context": {
"topic": "my-topic-hello"
},
"record": {
"value": [
"Hello",
"World"
]
}
}
Finally, the AppendFilter can also accept a substitution expression for the field property. This allows you to dynamically determine the name of the field to be added. The following example shows how to use a property expression to get the name of the target field from another field of the record:
filters.SubstituteFilter.field="$.target"
filters.SubstituteFilter.value="{{ extract_array($.values, 0) }}-{{ extract_array($.values,1) }}"
Input
{
"record": {
"target": "result",
"value": [
"Hello",
"World"
]
}
}
Output
{
"record": {
"target": "result",
"value": [
"Hello",
"World"
],
"result": "Hello-World"
}
}
ConvertFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter
The ConvertFilter
can be used to convert a field’s value into a specific type.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
field | The field to convert (dot notation is supported) | string | - | high |
to | The type to which the field must be converted | string | - | high |
default | The default value to apply if the field cannot be converted | string | - | medium |
ignoreMissing | If true and the field does not exist, the filter is applied successfully without modifying the data. If the field is null, the schema is modified. | boolean | true | high |
Supported types are :
SHORT
INTEGER
LONG
FLOAT
DOUBLE
BOOLEAN
STRING
ARRAY
BYTES
Examples
The following example shows how to convert a field’s value containing the string yes
into a boolean.
Configuration
filters.BooleanConverter.field="target"
filters.BooleanConverter.to="BOOLEAN"
Input
{
"record": {
"target": "yes"
}
}
Output
{
"record": {
"target": true
}
}
DateFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DateFilter
The DateFilter
converts a field’s value containing a date to a unix epoch time.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
field | The field to get the date from. | string (ScEL supported) | - | high |
target | The target field. | string (ScEL supported) | - | high |
timezone | The timezone to use for parsing date. | string | UTC | high |
locale | The locale to use for parsing date. | string | en_EN | high |
formats | List of the expected date formats. | list | - | high |
Examples
filters=ParseISODate
filters.ParseISODate.type=io.streamthoughts.kafka.connect.filepulse.filter.DateFilter
filters.ParseISODate.field="$.date"
filters.ParseISODate.target="$.timestamp"
filters.ParseISODate.formats="yyyy-MM-dd'T'HH:mm:ss"
Input
{
"record": {
"date": "2001-07-04T12:08:56"
}
}
Output
{
"record": {
"date": "2001-07-04T12:08:56",
"timestamp": 994248536000
}
}
DelimitedRowFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
.
The DelimitedRowFilter can be used to parse and stream delimited row files (e.g. CSV) into Kafka. Each row is parsed and published to the configured topic as a single Kafka record.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
separator | The character used as a delimiter/separator between each value | string | ; | high |
trimColumn | Remove the leading and trailing whitespaces from all columns. | boolean | false | low |
extractColumnName | Define the field from which the schema should be detected (all columns will be of type ‘string’) | string | - | high |
columns | The list of comma-separated column names in order they appear in each row. columns must be in the form of NAME:TYPE | string | - | high |
Examples
The following example shows the use of the DelimitedRowFilter to split the message field using | as the separator character. The name of each column is extracted from the field headers.
filters=ParseDelimitedRow
filters.ParseDelimitedRow.extractColumnName="headers"
filters.ParseDelimitedRow.separator="\\|"
filters.ParseDelimitedRow.trimColumn="true"
filters.ParseDelimitedRow.type="io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
Important
Under the hood, the DelimitedRowFilter will use the String#split method to parse the input line. This method accepts a regex as argument, so any special character must be escaped.
DropFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
.
The DropFilter can be used to prevent some messages (i.e. records) from being written into Kafka.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
if | Condition to apply the filter on the current record. | string (ScEL supported) | - | high |
invert | Invert the boolean value returned from the filter condition. | boolean | false | medium |
For more information about the if property, see: Conditional execution.
Examples
The following example shows the usage of the DropFilter to only keep records with a field level equal to ERROR.
filters=Drop
filters.Drop.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.Drop.if={{ equals($.level, 'ERROR') }}
filters.Drop.invert=true
ExcludeFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter
.
The ExcludeFilter
can be used to exclude one or more fields from the input record.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
fields | The comma-separated list of field names to exclude | list | - | high |
Examples
The following example shows the usage of ExcludeFilter.
filters=Exclude
filters.Exclude.type=io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter
filters.Exclude.fields=message
Input
{
"record": {
"message": "{\"name\":\"pulse\"}",
"name": "pulse"
}
}
Output
{
"record": {
"name": "pulse"
}
}
ExplodeFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter
.
The ExplodeFilter
can be used to explode an array or list field into separate records.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
source | The input field on which to apply the filter | string | message | medium |
Examples
The following example shows the usage of ExplodeFilter.
filters=Explode
filters.Explode.type=io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter
filters.Explode.source=measurements
Input (single record)
{
"record": {
"id": "captor-0001",
"date": "2020-08-06T17:00:00",
"measurements": [
38,
40,
42,
37
]
}
}
Output (multiple records)
{
"record": {
"id": "captor-0001",
"date": "2020-08-06T17:00:00",
"measurements": 38
}
}
{
"record": {
"id": "captor-0001",
"date": "2020-08-06T17:00:00",
"measurements": 40
}
}
{
"record": {
"id": "captor-0001",
"date": "2020-08-06T17:00:00",
"measurements": 42
}
}
{
"record": {
"id": "captor-0001",
"date": "2020-08-06T17:00:00",
"measurements": 37
}
}
FailFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.FailFilter
.
The fail filter can be used to throw an exception with a provided error message. For example, this can be useful to stop processing a file when a non-conforming record is read.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
if | Condition to apply the filter on the current record. | string | - | high |
invert | Invert the boolean value returned from the filter condition. | boolean | false | medium |
message | The error message thrown by the filter. (ScEL supported) | string | - | high |
Examples
The following example shows the usage of FailFilter to stop processing a file when a field is null.
filters=Fail
filters.Fail.type=io.streamthoughts.kafka.connect.filepulse.filter.FailFilter
filters.Fail.if={{ is_null($.user_id) }}
filters.Fail.message=Invalid row, user_id is missing : {{ $value }}
GrokFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter
.
The GrokFilter allows you to parse unstructured data, such as application logs, to extract structured and meaningful data fields.
The GrokFilter
is based on: https://github.com/streamthoughts/kafka-connect-transform-grok
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
namedCapturesOnly | If true, only store named captures from grok. | boolean | true | high |
pattern | The Grok pattern to match. | string | - | high |
overwrite | The fields to overwrite. | list | - | medium |
patternDefinitions | Custom pattern definitions. | list | - | low |
patternsDir | List of user-defined pattern directories | string | - | low |
source | The input field on which to apply the filter | string | message | medium |
Examples
The following example shows the usage of GrokFilter to parse and extract fields from an application log message.
filters=ParseLog4jLog
filters.ParseLog4jLog.pattern="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
filters.ParseLog4jLog.overwrite="message"
filters.ParseLog4jLog.source="message"
filters.ParseLog4jLog.type="io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter"
filters.ParseLog4jLog.ignoreFailure="true"
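As an illustration of this configuration, a record similar to the following (the log line itself is made up) would be parsed into structured fields, with the original message field overwritten by the GREEDYDATA capture:
Input
{
"record": {
"message": "2020-08-06T17:00:27 ERROR Failed to read file"
}
}
Output
{
"record": {
"logdate": "2020-08-06T17:00:27",
"loglevel": "ERROR",
"message": "Failed to read file"
}
}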
GroupRowFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter
.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
fields | The list of fields used to regroup records | list | - | high |
max.buffered.records | The maximum number of records to group. | integer | -1 | high |
target | The target array field to put the grouped fields | string | records | high |
Examples
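No official example is provided here, so the following is only a minimal configuration sketch built from the properties documented above; the grouping field name category is an illustrative assumption.
filters=GroupRecords
filters.GroupRecords.type=io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter
filters.GroupRecords.fields=category
filters.GroupRecords.max.buffered.records=100
filters.GroupRecords.target=records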
JoinFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.JoinFilter
.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
field | The array field whose values are to be joined | string (ScEL supported) | - | high |
target | The target field | string (ScEL supported) | - | high |
separator | The separator used for joining array values. | string | , | high |
Examples
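No official example is provided here either; the sketch below simply applies the documented properties to join a hypothetical array field named tags into a single comma-separated string stored in tags_joined.
filters=JoinTags
filters.JoinTags.type=io.streamthoughts.kafka.connect.filepulse.filter.JoinFilter
filters.JoinTags.field=$.tags
filters.JoinTags.target=$.tags_joined
filters.JoinTags.separator=,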
JSONFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter
.
The JSONFilter parses an input JSON field.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
overwrite | The fields to overwrite | list | - | medium |
source | The input field on which to apply the filter | string | message | medium |
target | The target field to put the parsed JSON data | string | - | high |
charset | The charset to be used for reading the source field (if the source field is of type BYTES) | string | UTF-8 | medium |
explode.array | A boolean that specifies whether to explode arrays into separate records | boolean | false | medium |
merge | A boolean that specifies whether to merge the JSON object into the top level of the input record | boolean | false | medium |
Examples
filters=MyJsonFilter
filters.MyJsonFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter
filters.MyJsonFilter.source=message
filters.MyJsonFilter.target=payload
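With this configuration, a record would be transformed roughly as follows (the JSON payload is illustrative, and whether the original message field is retained depends on the overwrite setting):
Input
{
"record": {
"message": "{\"name\":\"pulse\"}"
}
}
Output
{
"record": {
"message": "{\"name\":\"pulse\"}",
"payload": {
"name": "pulse"
}
}
}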
MultiRowFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter
.
The MultiRowFilter
joins multiple lines into a single Struct using a regex pattern.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
negate | Negate the regexp pattern (if not matched). | boolean | - | medium |
pattern | The pattern to match multiline | string | - | high |
patternDefinitions | Custom pattern definitions. | list | - | low |
patternsDir | List of user-defined pattern directories | string | - | low |
separator | The character to be used to concatenate the multiple lines | string | "\n" | high |
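Examples
The following configuration is only an illustrative sketch, not an official example: it assumes multi-line events (e.g. stack traces) in which every continuation line starts with a whitespace character.
filters=JoinMultiLines
filters.JoinMultiLines.type=io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter
filters.JoinMultiLines.negate=false
filters.JoinMultiLines.pattern="^[\\t ]"
filters.JoinMultiLines.separator="\\n"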
MoveFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter
.
The MoveFilter
moves an existing record field’s value to a specific target path.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
source | The path of the field to move | string | - | high |
target | The path to move the field to | string | - | high |
Examples
The following example shows the usage of the MoveFilter
.
filters=MyMoveFilter
filters.MyMoveFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.MoveFilter
filters.MyMoveFilter.source=field.child
filters.MyMoveFilter.target=moved
Input
{
"record": {
"field": {
"child": "foo"
}
}
}
Output
{
"record": {
"moved": "foo"
}
}
NullValueFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.NullValueFilter
.
The NullValueFilter is used to set a record’s value to null.
Example
filters=NullValueIfDeleteOp
filters.NullValueIfDeleteOp.type=io.streamthoughts.kafka.connect.filepulse.filter.NullValueFilter
filters.NullValueIfDeleteOp.if={{ equals($value.op, 'DELETE') }}
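As a sketch of the behaviour (the op and id fields are illustrative assumptions), a record matching the condition would have its value set to null:
Input
{
"record": {
"op": "DELETE",
"id": "42"
}
}
Output
{
"record": null
}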
RenameFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter
.
The RenameFilter
is used to rename a specified field.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
field | The field to rename | string | - | high |
target | The target name | string | - | high |
ignoreMissing | If true and the field does not exist, the filter is applied successfully without modifying the data. If the field is null, the schema is modified. | boolean | true | high |
Examples
filters=RenameInputField
filters.RenameInputField.type=io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter
filters.RenameInputField.field=input
filters.RenameInputField.target=renamed
Input
{
"record": {
"input": "foo"
}
}
Output
{
"record": {
"renamed": "foo"
}
}
SplitFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter
.
The SplitFilter
splits a field’s value of type string into an array by using a specific separator.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
split | The comma-separated list of fields to split | string | - | high |
separator | The separator used for splitting a message field’s value to array | string | , | high |
target | The target field to put the resulting array into | string | - | high |
Example
Configuration
filters=SplitInputField
filters.SplitInputField.type=io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter
filters.SplitInputField.split=input
filters.SplitInputField.separator=,
Input
{
"record": {
"input": "val0,val1,val2"
}
}
Output
{
"record": {
"input": "val0,val1,val2",
"output": [
"val0",
"val1",
"val2"
]
}
}
XmlToJsonFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.XmlToJsonFilter
.
The XmlToJsonFilter parses an XML record-field and converts it to a JSON string. This filter is based on the org.json.XML library.
Configuration
Configuration | Description | Type | Default | Importance |
---|---|---|---|---|
source | The input field on which to apply the filter. | string | "message" | high |
source.charset | The charset to be used for reading the source. | string | "UTF-8" | high |
xml.parser.keep.strings | When parsing the XML into JSON, specifies whether values should be kept as strings (true) or converted to their likely JSON types (numeric, boolean, string). | boolean | false | high |
xml.parser.cDataTagName | The name of the key in a JSON Object that indicates a CDATA section. | string | "value" | high |
Example
Configuration
filters=XmlToJson
filters.XmlToJson.type=io.streamthoughts.kafka.connect.filepulse.filter.XmlToJsonFilter
filters.XmlToJson.xml.parser.keep.strings=false
filters.XmlToJson.xml.parser.cDataTagName=data
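As an illustration (the XML document below is made up and the exact output formatting may differ), a record would be converted roughly as follows; with keep.strings=false, numeric and boolean values are inferred:
Input
{
"record": {
"message": "<product><id>42</id><active>true</active></product>"
}
}
Output
{
"record": {
"message": "{\"product\":{\"id\":42,\"active\":true}}"
}
}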
XmlToStructFilter
The following provides usage information for : io.streamthoughts.kafka.connect.filepulse.filter.XmlToStructFilter
.
The XmlToStructFilter parses an XML record-field into a STRUCT.
Configuration
Configuration | Since | Description | Type | Default | Importance |
---|---|---|---|---|---|
source | | The input field on which to apply the filter. | string | "message" | High |
xml.force.array.on.fields | | The comma-separated list of fields for which an array-type must be forced | List | - | High |
xml.parser.validating.enabled | | Specifies that the parser will validate documents as they are parsed. | boolean | false | Low |
xml.parser.namespace.aware.enabled | | Specifies that the XML parser will provide support for XML namespaces. | boolean | false | Low |
xml.exclude.empty.elements | | Specifies that the reader should exclude elements having no field. | boolean | false | Low |
xml.exclude.node.attributes | | Specifies that the reader should exclude all node attributes. | boolean | false | Low |
xml.exclude.node.attributes.in.namespaces | | Specifies that the reader should only exclude node attributes in the defined list of namespaces. | List | false | Low |
xml.data.type.inference.enabled | | Specifies that the reader should try to infer the type of data nodes. | boolean | false | High |
xml.attribute.prefix | | If set, the name of attributes will be prepended with the specified prefix when they are added to a record. | string | "" | Low |
xml.content.field.name | 2.5.0 | Specifies the name to be used for naming the field that will contain the value of a TextNode element having attributes. | string | value | Low |
xml.field.name.characters.regex.pattern | 2.5.0 | Specifies the regex pattern to use for matching the characters in XML element names to replace when converting a document to a struct. | string | [.-] | Low |
xml.field.name.character.string.replacement | 2.5.0 | Specifies the replacement string to be used when converting a document to a struct. | string | _ | Low |
xml.force.content.field.for.paths | 2.5.0 | The comma-separated list of fields for which a content-field must be forced. | List | - | Low |
Example
Configuration
filters=ParseXmlDocument
filters.ParseXmlDocument.type=io.streamthoughts.kafka.connect.filepulse.filter.XmlToStructFilter
filters.ParseXmlDocument.source=message
filters.ParseXmlDocument.xml.parser.validating.enabled=true
filters.ParseXmlDocument.xml.parser.namespace.aware.enabled=true
filters.ParseXmlDocument.xml.exclude.empty.elements=true
filters.ParseXmlDocument.xml.data.type.inference.enabled=true
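As a sketch (the XML document is made up, and the exact placement of the parsed struct is an assumption), a record could be transformed as follows, with type inference turning numeric text nodes into numbers:
Input
{
"record": {
"message": "<product><id>42</id><name>pulse</name></product>"
}
}
Output
{
"record": {
"product": {
"id": 42,
"name": "pulse"
}
}
}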