Configuration

The common configurations for deploying a File Pulse connector.

Commons configuration

Whatever the kind of files you are processing a connector should always be configured with the below properties. These configurations are described in detail in subsequent chapters.

Common Kafka Connect properties

Configuration Description Type Default Importance
topic The default output topic to write string - high
tasks.max The maximum number of tasks that should be created for this connector. string - high

Properties for listing and cleaning object files (FileSystemListing)

Configuration Description Type Default Importance
fs.listing.class Class which is used to list eligible files from the scanned file system. class io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker MEDIUM
fs.listing.filters Filters use to list eligible input files list - MEDIUM
fs.listing.interval.ms Time interval (in milliseconds) at wish to scan input directory long 10000 HIGH
fs.cleanup.policy.class The fully qualified name of the class which is used to cleanup files class - HIGH
max.scheduled.files Maximum number of files that can be schedules to tasks. long 1000 HIGH

Properties for transforming object file record(Filters Chain Definition)

Configuration Description Type Default Importance
filters List of filters aliases to apply on each data (order is important) list - MEDIUM

Properties for reading object file record(FileReaders)

Configuration Description Type Default Importance
tasks.reader.class The fully qualified name of the class which is used by tasks to read input files class io.streamthoughts.kafka.connect.filepulse.reader.RowFileReader HIGH

Properties for uniquely identifying object files and records (FileReaders)

Configuration Description Type Default Importance
offset.policy.class Class which is used to determine the source partition and offset that uniquely identify a input record class io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy HIGH

Properties for synchronizing Connector and Tasks

Configuration Description Type Default Importance
tasks.file.status.storage.class The FileObjectStateBackingStore class to be used for storing status state of file objects. Class io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore HIGH

Available implementations are :

  • io.streamthoughts.kafka.connect.filepulse.state.InMemoryStateBackingStore
  • io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore

Properties for configuring the KafkaFileObjectStateBackingStore class

Configuration Description Type Default Importance
tasks.file.status.storage.topic Name of the internal topic used by tasks and connector to report and monitor file progression. class connect-file-pulse-status HIGH
tasks.file.status.storage.bootstrap.servers A list of host/port pairs uses by the reporter for establishing the initial connection to the Kafka cluster. string - HIGH
tasks.file.status.storage.topic.partitions The number of partitions to be used for the status storage topic. int - LOW
tasks.file.status.storage.topic.replication.factor The replication factor to be used for the status storage topic. float - LOW

Override Internal Consumer/Producer Configuration

To override the default configuration for the internal consumer and producer clients used for reporting file status, you can use one of the following override prefixes :

  • tasks.file.status.storage.consumer.<consumer_property>
  • tasks.file.status.storage.producer.<producer_property>

Examples

Some configuration examples are available here.

Last modified June 11, 2021: fix(site): fix documentation (4424faf)