Scanning Files

The commons configuration for Connect File Pulse.

The connector must be configured with a specific FSDirectoryWalker
that will be responsible for scanning an input directory to find files eligible to be streamed in Kafka.

The default FSDirectoryWalker implementation is :

io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker.

The FilePulseSourceConnector periodically triggers a file system scan of the directory specified in the input.directory.path connector property. Scan is executed in a background-thread invoking the configured FSDirectoryWalker.

Configuring Directory Scan (using LocalFSDirectoryWalker)

ConfigurationDescriptionTypeDefaultImportance
fs.scanner.classThe class used to scan file systemclassio.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalkermedium
fs.scan.directory.pathThe input directory to scanstring-high
fs.scan.interval.msTime interval in milliseconds at wish the input directory is scannedlong10000high
fs.scan.filtersThe comma-separated list of fully qualified class names of the filter-filters to be uses to list eligible input fileslist-medium
fs.recursive.scan.enableBoolean indicating whether local directory should be recursively scannedbooleantruemedium

Filtering input files

You can configure one or more FileFilter that will be used to determine if a file should be scheduled for processing or ignored. All files that are filtered out are simply ignored and remain untouched on the file system until the next scan. At the next scan, previously filtered files will be evaluated again to determine if they are now eligible for processing.

FilePulse packs with the following built-in filters :

IgnoreHiddenFileFilter

The IgnoreHiddenFileFilter can be used to filter hidden files from being read.

Configuration example

fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.IgnoreHiddenFileListFilter

LastModifiedFileFilter

The LastModifiedFileFilter can be used to filter files that have been modified to recently based on their last modified date property.

fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.LastModifiedFileFilter
# The last modified time for a file can be accepted (default: 5000)
file.filter.minimum.age.ms=10000

RegexFileFilter

The RegexFileFilter can be used to filter files that do not match the specified regex.

fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
# The regex pattern used to matches input files
file.filter.regex.pattern="\\.log$"

Supported File types

LocalFSDirectoryWalker will try to detect if a file needs to be decompressed by probing its content type or its extension (javadoc : Files#probeContentType)

The connector supports the following content types :

  • GZIP : application/x-gzip
  • TAR : application/x-tar
  • ZIP : application/x-zip-compressed or application/zip