Configuration

The common configurations for deploying a File Pulse connector.

Common configuration

Whatever kind of files you are processing, a connector should always be configured with the properties below. These configurations are described in detail in subsequent chapters.

Common Kafka Connect properties

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| topic | The default output topic to write to. | string | - | HIGH |
| tasks.max | The maximum number of tasks that should be created for this connector. | int | 1 | HIGH |
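
As a rough sketch, these two properties sit at the top level of a connector configuration. The connector class name below is the one File Pulse 2.x is believed to use, and the topic name is a made-up placeholder; neither comes from the table above, so check both against your installation.

```json
{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "topic": "example-output-topic",
  "tasks.max": "1"
}
```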

Properties for listing and cleaning object files (FileSystemListing)

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| fs.listing.class | Class which is used to list eligible files from the scanned file system. | class | - | MEDIUM |
| fs.listing.filters | A comma-separated list of FileListFilter classes used to list eligible input files. | list | - | MEDIUM |
| fs.listing.interval.ms | Time interval (in milliseconds) at which to scan the input directory. | long | 10000 | HIGH |
| fs.listing.task.delegation.enabled | Boolean indicating whether the file listing process should be delegated to tasks. | boolean | false | LOW |
| fs.cleanup.policy.class | The fully qualified name of the class which is used to clean up files. | class | - | HIGH |
| fs.cleanup.policy.triggered.on | The status at which a file is cleaned up. Valid values are: COMPLETED, COMMITTED. | string | COMPLETED | MEDIUM |
| max.scheduled.files | Maximum number of files that can be scheduled to tasks. | long | 1000 | HIGH |
| allow.tasks.reconfiguration.after.timeout.ms | The timeout (in milliseconds) after which the connector allows tasks to be reconfigured when new files are detected, even if some tasks are still processing. | long | - | LOW |
| task.partitioner.class | The TaskPartitioner to be used for partitioning files to tasks. | class | io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner | HIGH |
| tasks.halt.on.error | Whether a task should halt when it encounters an error or continue to the next file. | boolean | false | HIGH |
| tasks.file.processing.order.by | The strategy to be used for sorting files for processing. Valid values are: LAST_MODIFIED, URI, CONTENT_LENGTH, CONTENT_LENGTH_DESC. | string | LAST_MODIFIED | MEDIUM |
| tasks.empty.poll.wait.ms | The amount of time in milliseconds a task should wait if a poll returns an empty list of records. | long | 500 | HIGH |
| ignore.committed.offsets | Whether a task should ignore committed offsets while scheduling a file. | boolean | false | LOW |
| value.connect.schema | The schema for the record value. | string | - | MEDIUM |
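
The fragment below is a minimal sketch of how the listing and cleanup properties might be combined. The two class names (LocalFSDirectoryListing and LogCleanupPolicy) are assumptions based on classes believed to ship with File Pulse 2.x; they are not named in the table above. Any settings specific to the chosen listing class are omitted, so this is not a complete configuration.

```json
{
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.interval.ms": "10000",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "fs.cleanup.policy.triggered.on": "COMMITTED",
  "max.scheduled.files": "1000",
  "tasks.halt.on.error": "false",
  "tasks.file.processing.order.by": "LAST_MODIFIED",
  "tasks.empty.poll.wait.ms": "500"
}
```

Apart from fs.cleanup.policy.triggered.on, the values shown are the documented defaults, written out only to make the shape of the configuration visible.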

Properties for transforming object file records (Filters Chain Definition)

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| filters | List of filter aliases to apply to each record (order is important). | list | - | MEDIUM |
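
As an illustration, a filter chain might be declared as follows. The aliases ParseLine and AddSource are hypothetical, and the filters.<alias>.type convention and the two filter class names are assumptions taken from the filter chain chapter rather than from this page. Filter-specific settings are left out, so the fragment is incomplete on its own.

```json
{
  "filters": "ParseLine, AddSource",
  "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
  "filters.AddSource.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
}
```

Because order is important, ParseLine would run before AddSource in this sketch.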

Properties for reading object file records (FileReaders)

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| tasks.reader.class | The fully qualified name of the class which is used by tasks to read input files. | class | - | HIGH |

Properties for uniquely identifying object files and records (FileReaders)

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| offset.policy.class | Class which is used to determine the source partition and offset that uniquely identify an input record. | class | io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy | HIGH |

Properties for synchronizing Connector and Tasks

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| tasks.file.status.storage.class | The FileObjectStateBackingStore class to be used for storing the status of file objects. | class | io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore | HIGH |

Available implementations are:

  • io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore
  • io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore

Properties for configuring the KafkaFileObjectStateBackingStore class

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| tasks.file.status.storage.topic | Name of the internal topic used by tasks and the connector to report and monitor file progression. | string | connect-file-pulse-status | HIGH |
| tasks.file.status.storage.bootstrap.servers | A list of host/port pairs used by the reporter for establishing the initial connection to the Kafka cluster. | string | - | HIGH |
| tasks.file.status.storage.topic.partitions | The number of partitions to be used for the status storage topic. | int | - | LOW |
| tasks.file.status.storage.topic.replication.factor | The replication factor to be used for the status storage topic. | float | - | LOW |
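
A minimal sketch of a Kafka-backed status store, using only the properties listed above. The bootstrap address, partition count and replication factor are placeholder values for a local single-broker setup, not recommendations.

```json
{
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "tasks.file.status.storage.bootstrap.servers": "localhost:9092",
  "tasks.file.status.storage.topic.partitions": "10",
  "tasks.file.status.storage.topic.replication.factor": "1"
}
```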

Properties for configuring the InMemoryFileObjectStateBackingStore class

| Configuration | Description | Type | Default | Importance | Since |
|---|---|---|---|---|---|
| tasks.file.status.storage.cache.max.size.capacity | Specifies the max size capacity of the LRU in-memory cache. | int | 10000 | LOW | v2.5.0 |
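
Switching to the in-memory store could look like the sketch below, which uses only the class and property named on this page. The capacity shown is the documented default, written out for illustration.

```json
{
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore",
  "tasks.file.status.storage.cache.max.size.capacity": "10000"
}
```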

In addition, to override the default configuration of the internal Kafka consumer and producer clients, you can use the following override prefixes:

  • tasks.file.status.storage.consumer.<consumer_property>
  • tasks.file.status.storage.producer.<producer_property>
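
For instance, assuming the status topic lives on a cluster that requires TLS, the prefixes could be applied as sketched below. security.protocol and acks are standard Kafka client properties; the values are illustrative only.

```json
{
  "tasks.file.status.storage.consumer.security.protocol": "SSL",
  "tasks.file.status.storage.producer.security.protocol": "SSL",
  "tasks.file.status.storage.producer.acks": "all"
}
```

Everything after the prefix is presumably handed to the corresponding client as-is, so any valid consumer or producer property should be usable in the same way.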

Examples

Some configuration examples are available here.

Defining Connect Record Schema

The optional value.connect.schema config property can be used to set the connect-record schema that should be used. If no schema is passed through the connector configuration, a schema is resolved for each record produced.

The value.connect.schema must be passed as a JSON string that respects the following schema (using Avro representation):

{
   "type":"record",
   "name":"Schema",
   "fields":[
      {
         "name":"name",
         "type":"string",
         "doc": "The name of this schema"
      },
      {
         "name":"type",
         "type":{
            "type":"enum",
            "name":"Type",
            "symbols":[
               "STRUCT",
               "STRING",
               "BOOLEAN",
               "INT8",
               "INT16",
               "INT32",
               "INT64",
               "FLOAT32",
               "FLOAT64",
               "BYTES",
               "MAP",
               "ARRAY"
            ]
         },
         "doc": "The type of this schema"
      },
      {
         "name":"doc",
         "type":[
            "null",
            "string"
         ],
         "default":null,  
         "doc": "The documentation for this schema"
      },
      {
         "name":"fieldSchemas",
         "type":[
            "null",
            {
               "type":"map",
               "values":"Schema"
            }
         ],
         "default":null,
         "doc": "The fields for this Schema. Throws a DataException if this schema is not a struct."
      },
      {
         "name":"valueSchema",
         "type":[
            "null",
            "Schema"
         ],
         "default":null,
         "doc": "The value schema for this map or array schema. Throws a DataException if this schema is not a map or array."
      },
      {
         "name":"keySchema",
         "type":[
            "null",
            "Schema"
         ],
         "default":null,
         "doc": "The key schema for this map schema. Throws a DataException if this schema is not a map."
      },
      {
         "name":"defaultValue",
         "type":[
            "null",
            "string"
         ],
         "default":null
      },
      {
         "name":"isOptional",
         "type":"boolean",
         "default":false,
         "doc": "true if this field is optional, false otherwise"
      },
      {
         "name":"version",
         "type":[
            "null",
            "int"
         ],
         "default":null,
         "doc": "The optional version of the schema. If a version is included, newer versions *must* be larger than older ones."
      }
   ]
}

Example:

{
   "name":"com.example.User",
   "type":"STRUCT",
   "isOptional":false,
   "fieldSchemas":{
      "id":{
         "type":"INT64",
         "isOptional":false
      },
      "first_name":{
         "type":"STRING",
         "isOptional":true
      },
      "last_name":{
         "type":"STRING",
         "isOptional":true
      },
      "email":{
         "type":"STRING",
         "isOptional":true
      },
      "gender":{
         "type":"STRING",
         "isOptional":true
      },
      "country":{
         "type":"STRING",
         "isOptional":true
      },
      "favorite_colors":{
         "type":"ARRAY",
         "isOptional":true,
         "valueSchema": {"type": "STRING"}
      }
   }
}
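
Because value.connect.schema must be passed as a JSON string, embedding it in a JSON connector configuration means escaping the inner quotes. A minimal sketch, using a trimmed version of the com.example.User schema above (only the id and first_name fields are kept, for brevity):

```json
{
  "value.connect.schema": "{\"name\":\"com.example.User\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"id\":{\"type\":\"INT64\",\"isOptional\":false},\"first_name\":{\"type\":\"STRING\",\"isOptional\":true}}}"
}
```

In a .properties file the same schema can be written on one line without the backslash escapes, since the value is not nested inside another JSON document.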
Last modified March 16, 2024: ci: release version 2.14.0 🎉 (49999b8f)