Configuration

The common configurations for deploying a File Pulse connector.

Common configuration

Whatever the kind of files you are processing, a connector should always be configured with the properties below. These configurations are described in detail in subsequent chapters.

Common Kafka Connect properties

| Configuration | Description | Type | Default | Importance |
|---------------|-------------|------|---------|------------|
| topic | The default output topic to write to. | string | - | HIGH |
| tasks.max | The maximum number of tasks that should be created for this connector. | string | - | HIGH |
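
As a minimal sketch (in standalone .properties form), these common properties could be set as follows; the connector name and topic are illustrative values, and the connector.class is assumed to be the FilePulseSourceConnector entry point shipped with the project:

# Minimal sketch of the common properties (illustrative values).
name=connect-file-pulse-quickstart
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
# Default output topic for records produced from object files.
topic=connect-file-pulse-topic
# Upper bound on the number of tasks the connector may create.
tasks.max=2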

Properties for listing and cleaning object files (FileSystemListing)

| Configuration | Description | Type | Default | Importance |
|---------------|-------------|------|---------|------------|
| fs.listing.class | Class which is used to list eligible files from the scanned file system. | class | - | MEDIUM |
| fs.listing.filters | Filters used to list eligible input files. | list | - | MEDIUM |
| fs.listing.interval.ms | Time interval (in milliseconds) at which to scan the input directory. | long | 10000 | HIGH |
| fs.listing.task.delegation.enabled | Boolean indicating whether the file listing process should be delegated to tasks. | boolean | false | LOW |
| fs.cleanup.policy.class | The fully qualified name of the class which is used to clean up files. | class | - | HIGH |
| fs.cleanup.policy.triggered.on | Specify the status on which a file gets cleaned up. Valid values are: COMPLETED, COMMITTED. | string | COMPLETED | MEDIUM |
| max.scheduled.files | Maximum number of files that can be scheduled to tasks. | long | 1000 | HIGH |
| allow.tasks.reconfiguration.after.timeout.ms | Specify the timeout (in milliseconds) for the connector to allow tasks to be reconfigured when new files are detected, even if some tasks are still being processed. | long | - | LOW |
| task.partitioner.class | The TaskPartitioner to be used for partitioning files to tasks. | class | io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner | HIGH |
| tasks.halt.on.error | Should a task halt when it encounters an error or continue to the next file. | boolean | false | HIGH |
| tasks.empty.poll.wait.ms | The amount of time in milliseconds a task should wait if a poll returns an empty list of records. | long | 500 | HIGH |
| ignore.committed.offsets | Should a task ignore committed offsets while scheduling a file. | boolean | false | LOW |
| value.connect.schema | The schema for the record-value. | string | - | MEDIUM |
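
As a sketch, the listing and cleanup properties could be combined as below for a local file system. The LocalFSDirectoryListing, RegexFileListFilter and LogCleanupPolicy classes, as well as the fs.listing.directory.path and file.filter.regex.pattern properties, come from the File Pulse distribution but are shown here as assumptions; paths and values are illustrative:

# Sketch: listing CSV files from a local directory (class and property names
# are taken from the File Pulse distribution and may vary with the version used).
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing
fs.listing.directory.path=/tmp/kafka-connect/examples/
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.csv$
# Scan the input directory every 10 seconds (the default).
fs.listing.interval.ms=10000
# Only log processed files once they reach the COMPLETED status.
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy
fs.cleanup.policy.triggered.on=COMPLETED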

Properties for transforming object file records (Filters Chain Definition)

| Configuration | Description | Type | Default | Importance |
|---------------|-------------|------|---------|------------|
| filters | List of filter aliases to apply to each record (order is important). | list | - | MEDIUM |
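
Each alias listed in filters is then configured through filters.&lt;alias&gt;.* properties. The sketch below assumes the DelimitedRowFilter shipped with File Pulse (as used in the project's quickstart); the alias and filter property names are illustrative and may differ between versions:

# Sketch: a single-filter chain that parses delimited rows (alias and
# filter properties are assumptions based on the project's quickstart).
filters=ParseLine
filters.ParseLine.type=io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
filters.ParseLine.extractColumnName=headers
filters.ParseLine.trimColumn=true
filters.ParseLine.separator=;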

Properties for reading object file records (FileReaders)

| Configuration | Description | Type | Default | Importance |
|---------------|-------------|------|---------|------------|
| tasks.reader.class | The fully qualified name of the class which is used by tasks to read input files. | class | - | HIGH |
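
For example, reading each line of a local text file as a record could look like this (the LocalRowFileInputReader class name is taken from the File Pulse distribution and assumed here):

# Sketch: read local files row by row (reader class assumed from the distribution).
tasks.reader.class=io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader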

Properties for uniquely identifying object files and records (SourceOffsetPolicy)

| Configuration | Description | Type | Default | Importance |
|---------------|-------------|------|---------|------------|
| offset.policy.class | Class which is used to determine the source partition and offset that uniquely identify an input record. | class | io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy | HIGH |
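
As a sketch, the default policy can be configured explicitly and, assuming the offset.attributes.string property supported by DefaultSourceOffsetPolicy, tuned to identify files from other attributes:

# Sketch: explicitly configure the default offset policy.
offset.policy.class=io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy
# Assumption: DefaultSourceOffsetPolicy exposes offset.attributes.string to
# select the file attributes used to identify an object file (e.g. name+hash).
offset.attributes.string=name+hash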

Properties for synchronizing Connector and Tasks

| Configuration | Description | Type | Default | Importance |
|---------------|-------------|------|---------|------------|
| tasks.file.status.storage.class | The FileObjectStateBackingStore class to be used for storing the status state of file objects. | class | io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore | HIGH |

Available implementations are:

  • io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore
  • io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore
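
For example, a development or standalone deployment could opt for the in-memory implementation listed above instead of the default Kafka-backed store (a sketch; the Kafka-backed store remains the usual choice for distributed deployments):

# Sketch: keep file-object state in memory (state is not shared across workers).
tasks.file.status.storage.class=io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore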

Properties for configuring the KafkaFileObjectStateBackingStore class

| Configuration | Description | Type | Default | Importance |
|---------------|-------------|------|---------|------------|
| tasks.file.status.storage.topic | Name of the internal topic used by tasks and the connector to report and monitor file progression. | string | connect-file-pulse-status | HIGH |
| tasks.file.status.storage.bootstrap.servers | A list of host/port pairs used by the reporter for establishing the initial connection to the Kafka cluster. | string | - | HIGH |
| tasks.file.status.storage.topic.partitions | The number of partitions to be used for the status storage topic. | int | - | LOW |
| tasks.file.status.storage.topic.replication.factor | The replication factor to be used for the status storage topic. | int | - | LOW |
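
A sketch combining these properties for the Kafka-backed status store (host names and sizing below are illustrative):

# Sketch: Kafka-backed status store (illustrative values).
tasks.file.status.storage.topic=connect-file-pulse-status
tasks.file.status.storage.bootstrap.servers=kafka-broker-1:9092,kafka-broker-2:9092
tasks.file.status.storage.topic.partitions=10
tasks.file.status.storage.topic.replication.factor=3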

Examples

Some configuration examples are available here.

Defining Connect Record Schema

The optional value.connect.schema configuration property can be used to set the Connect record schema that should be used. If no schema is passed through the connector configuration, a schema will be resolved for each record produced.

The value.connect.schema must be passed as a JSON string that respects the following schema (using Avro representation):

{
   "type":"record",
   "name":"Schema",
   "fields":[
      {
         "name":"name",
         "type":"string",
         "doc": "The name of this schema"
      },
      {
         "name":"type",
         "type":{
            "type":"enum",
            "name":"Type",
            "symbols":[
               "STRUCT",
               "STRING",
               "BOOLEAN",
               "INT8",
               "INT16",
               "INT32",
               "INT64",
               "FLOAT32",
               "FLOAT64",
               "BYTES",
               "MAP",
               "ARRAY"
            ]
         },
         "doc": "The type of this schema"
      },
      {
         "name":"doc",
         "type":[
            "null",
            "string"
         ],
         "default":null,  
         "doc": "The documentation for this schema"
      },
      {
         "name":"fieldSchemas",
         "type":[
            "null",
            {
               "type":"map",
               "values":"Schema"
            }
         ],
         "default":null,
         "doc": "The fields for this Schema. Throws a DataException if this schema is not a struct."
      },
      {
         "name":"valueSchema",
         "type":[
            "null",
            {
               "type":"map",
               "values":"Schema"
            }
         ],
         "default":null,
         "doc": "The value schema for this map or array schema. Throws a DataException if this schema is not a map or array."
      },
      {
         "name":"keySchema",
         "type":[
            "null",
            {
               "type":"map",
               "values":"Schema"
            }
         ],
         "default":null,
         "doc": "The key schema for this map schema. Throws a DataException if this schema is not a map."
      },
      {
         "name":"defaultValue",
         "type":[
            "null",
            "string"
         ],
         "default":null
      },
      {
         "name":"isOptional",
         "type":"boolean",
         "default":false,
         "doc": "true if this field is optional, false otherwise"
      },
      {
         "name":"version",
         "type":[
            "null",
            "integer"
         ],
         "default":null,
         "doc": "The optional version of the schema. If a version is included, newer versions *must* be larger than older ones."
      }
   ]
}

Example:

{
   "name":"com.example.User",
   "type":"STRUCT",
   "isOptional":false,
   "fieldSchemas":{
      "id":{
         "type":"INT64",
         "isOptional":false
      },
      "first_name":{
         "type":"STRING",
         "isOptional":true
      },
      "last_name":{
         "type":"STRING",
         "isOptional":true
      },
      "email":{
         "type":"STRING",
         "isOptional":true
      },
      "gender":{
         "type":"STRING",
         "isOptional":true
      },
      "country":{
         "type":"STRING",
         "isOptional":true
      },
      "favorite_colors":{
         "type":"ARRAY",
         "isOptional":true,
         "valueSchema": {"type": "STRING"}
      }
   }
}
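
Because the property is a plain string, the schema has to be serialized on a single line in the connector configuration. A compacted sketch of the example above (only a subset of the fields is kept for brevity):

# Sketch: value.connect.schema passed as a single-line JSON string
# (subset of the com.example.User example above).
value.connect.schema={"name":"com.example.User","type":"STRUCT","isOptional":false,"fieldSchemas":{"id":{"type":"INT64","isOptional":false},"email":{"type":"STRING","isOptional":true}}}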