The connector must be configured with a specific FSDirectoryWalker
that is responsible for scanning an input directory to find files eligible to be streamed into Kafka.
The default FSDirectoryWalker implementation is: io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
The FilePulseSourceConnector periodically triggers a file system scan of the input directory specified in the
connector configuration. The scan is executed in a background thread that invokes the configured FSDirectoryWalker.
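The periodic background scan described above can be pictured with a small, generic Java sketch. It is not the connector's actual implementation: the class `PeriodicDirectoryScan`, its methods, and the thread name are hypothetical names chosen for illustration, and the sketch only lists the top level of a directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration; not part of FilePulse.
final class PeriodicDirectoryScan {

    /** Lists the regular files directly under dir (no recursion in this sketch). */
    static List<Path> scanOnce(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs scanOnce on a single daemon thread, waiting intervalMs between scans. */
    static ScheduledExecutorService start(Path dir, long intervalMs, Consumer<List<Path>> onScan) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "directory-scan");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleWithFixedDelay(() -> {
            try {
                onScan.accept(scanOnce(dir));
            } catch (RuntimeException e) {
                // Swallow the failure so the next scheduled scan still runs.
                System.err.println("Scan failed: " + e.getMessage());
            }
        }, 0L, intervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

A caller would invoke `start(dir, 10000L, files -> ...)` once and call `shutdownNow()` on the returned executor when the scan is no longer needed. A fixed delay (rather than a fixed rate) is used here so that a slow scan never overlaps the next one; whether the connector makes the same choice is not stated in this page.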
Configuring Directory Scan (using
|Description|Type|Default|Importance|
|---|---|---|---|
|The class used to scan the file system|class|io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker|medium|
|The input directory to scan|string|-|high|
|Time interval in milliseconds at which the input directory is scanned|long|10000|high|
|The comma-separated list of fully qualified class names of the filters to be used to list eligible input files|list|-|medium|
|Boolean indicating whether the local directory should be recursively scanned|boolean|true|medium|
|Specifies the timeout (in milliseconds) for the connector to allow tasks to be reconfigured when new files are detected, even if some tasks are still being processed|long|Long.MAX_VALUE|medium|
Filtering input files
You can configure one or more
FileFilter implementations that are used to determine whether a file should be scheduled for processing or ignored.
All files that are filtered out are simply ignored and remain untouched on the file system until the next scan.
At the next scan, previously filtered files will be evaluated again to determine if they are now eligible for processing.
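The filtering step can be sketched as a plain predicate chain. This is a minimal illustration and not the FilePulse API: `FilterChainSketch` and `eligible` are hypothetical names, and the rule that a file must be accepted by every configured filter is an assumption made for the sketch.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical illustration; not part of FilePulse.
final class FilterChainSketch {

    /**
     * Returns the candidates accepted by every filter (assumed AND semantics).
     * Rejected candidates are only skipped: nothing is moved, renamed, or deleted.
     */
    static <T> List<T> eligible(List<T> candidates, List<Predicate<T>> filters) {
        return candidates.stream()
                .filter(candidate -> filters.stream().allMatch(f -> f.test(candidate)))
                .collect(Collectors.toList());
    }
}
```

Because the method keeps no memory of earlier rejections, calling it again on the next scan re-evaluates every file from scratch, which matches the behavior described above: a file rejected now can become eligible later.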
FilePulse ships with the following built-in filters:
IgnoreHiddenFileFilter can be used to exclude hidden files from being read.
LastModifiedFileFilter can be used to exclude files that have been modified too recently, based on their last modified date property.
    fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.LastModifiedFileFilter
    # Minimum time in milliseconds since the last modification before a file can be accepted (default: 5000)
    file.filter.minimum.age.ms=10000
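The idea behind a minimum-age filter can be sketched as follows. This is an illustration only, not the filter's source code: `MinimumAgeCheck` and its methods are hypothetical, and treating a file whose age is exactly equal to the threshold as accepted is an assumption.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration; not part of FilePulse.
final class MinimumAgeCheck {

    /** True when at least minimumAgeMs have elapsed since the last modification. */
    static boolean isOldEnough(long lastModifiedMs, long nowMs, long minimumAgeMs) {
        return nowMs - lastModifiedMs >= minimumAgeMs;
    }

    /** Applies the same check to a file on disk using its last-modified timestamp. */
    static boolean accept(Path file, long minimumAgeMs) throws IOException {
        long lastModifiedMs = Files.getLastModifiedTime(file).toMillis();
        return isOldEnough(lastModifiedMs, System.currentTimeMillis(), minimumAgeMs);
    }
}
```

With a threshold of 10000 ms, a file written 5 seconds ago is skipped on this scan and picked up on a later one, which helps avoid reading files that are still being written.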
RegexFileFilter can be used to filter files that do not match the specified regex.
    fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
    # The regex pattern used to match input files
    file.filter.regex.pattern="\\.log$"
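When writing a pattern for a regex-based filter, it helps to know whether the pattern must cover the whole file name or only part of it. This page does not say which mode the filter uses, so the sketch below simply contrasts the two with `java.util.regex`; `RegexNameCheck` and its methods are hypothetical names.

```java
import java.util.regex.Pattern;

// Hypothetical illustration; not part of FilePulse.
final class RegexNameCheck {

    /** True when the pattern occurs anywhere in the file name (partial match). */
    static boolean found(String fileName, Pattern pattern) {
        return pattern.matcher(fileName).find();
    }

    /** True only when the pattern covers the entire file name (full match). */
    static boolean matchesWhole(String fileName, Pattern pattern) {
        return pattern.matcher(fileName).matches();
    }
}
```

With `Pattern.compile("\\.log$")`, `found("app.log", ...)` is true while `matchesWhole("app.log", ...)` is false; a pattern such as `.*\\.log$` satisfies both. If a pattern appears not to select any file, trying the full-match form is a reasonable first check.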
Supported File types
The LocalFSDirectoryWalker tries to detect whether a file needs to be decompressed by probing its content type or its extension (javadoc: Files#probeContentType).
The connector supports the following content types :
- GZIP :
- TAR :
- ZIP :
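A detection step of this kind can be sketched with `Files.probeContentType`, falling back to the file extension when the probe returns nothing useful. The sketch is illustrative and does not reproduce the walker's logic: `CompressionProbe` is a hypothetical class, and both the MIME substrings and the extensions it checks are assumptions, since probe results vary by platform.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;

// Hypothetical illustration; not part of FilePulse.
final class CompressionProbe {

    enum Kind { GZIP, TAR, ZIP, NONE }

    /** Guesses the kind from a MIME type string; the substrings are assumptions. */
    static Kind byContentType(String contentType) {
        if (contentType == null) return Kind.NONE;
        String type = contentType.toLowerCase(Locale.ROOT);
        if (type.contains("gzip")) return Kind.GZIP; // checked first: "gzip" contains "zip"
        if (type.contains("tar")) return Kind.TAR;
        if (type.contains("zip")) return Kind.ZIP;
        return Kind.NONE;
    }

    /** Fallback: guesses the kind from the file name extension. */
    static Kind byExtension(String fileName) {
        String name = fileName.toLowerCase(Locale.ROOT);
        if (name.endsWith(".gz")) return Kind.GZIP;
        if (name.endsWith(".tar")) return Kind.TAR;
        if (name.endsWith(".zip")) return Kind.ZIP;
        return Kind.NONE;
    }

    /** Probes the content type first, then falls back to the extension. */
    static Kind detect(Path file) throws IOException {
        Kind kind = byContentType(Files.probeContentType(file)); // probe may return null
        if (kind != Kind.NONE) return kind;
        Path name = file.getFileName();
        return name == null ? Kind.NONE : byExtension(name.toString());
    }
}
```

The extension fallback matters because `Files.probeContentType` returns `null` when no installed file type detector recognizes the file.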