FileSystem Listing

Learn how to configure Connect FilePulse for listing files from local or remote storage system.

The FilePulseSourceConnector periodically lists object files that may be streamed into Kafka using the FileSystemListing
configured in the connector's configuration.

Supported Filesystems

Currently, Kafka Connect FilePulse supports the following implementations:

  • AmazonS3FileSystemListing
  • AzureBlobStorageFileSystemListing
  • GcsFileSystemListing
  • LocalFSDirectoryListing (default)

Local Filesystem (default)

The LocalFSDirectoryListing class can be used for listing files that exist in a local filesystem directory.

How to use it ?

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing

Configuration

Configuration Description Type Default Importance
fs.listing.directory.path The input directory to scan string - HIGH
fs.listing.recursive.enabled Flag indicating whether local directory should be recursively scanned boolean true MEDIUM

Supported File types

The LocalFSDirectoryListing will try to detect if a file needs to be decompressed by probing its content type or its extension (javadoc : Files#probeContentType) Supported content-types are:

  • GZIP : application/x-gzip
  • TAR : application/x-tar
  • ZIP : application/x-zip-compressed or application/zip

Amazon S3

The AmazonS3FileSystemListing class can be used for listing objects that exist in a specific Amazon S3 bucket.

How to use it ?

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing

Configuration

Configuration Description Type Default Importance
aws.access.key.id AWS Access Key ID AWS string - HIGH
aws.secret.access.key AWS Secret Access Key string - HIGH
aws.secret.session.token AWS Secret Session Token string - HIGH
aws.s3.region The AWS S3 Region, e.g. us-east-1 string Regions.DEFAULT_REGION.getName() MEDIUM
aws.s3.path.style.access.enabled Configures the client to use path-style access for all requests. string - MEDIUM
aws.s3.bucket.name The name of the Amazon S3 bucket. string - HIGH
aws.s3.bucket.prefix The prefix to be used for restricting the listing of the objects in the bucket string - MEDIUM
aws.credentials.provider.class The AWSCredentialsProvider to use if no access key id and secret access key is configured. class com.amazonaws.auth.EnvironmentVariableCredentialsProvider LOW

Google Cloud Storage

The GcsFileSystemListing class can be used for listing objects that exist in a specific Google Cloud Storage bucket.

How to use it ?

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing

Configuration

Configuration Description Type Default Importance
gcs.credentials.path The path to GCP credentials file. Cannot be set when GCS_CREDENTIALS_JSON_CONFIG is provided. If no credentials is specified the client library will look for credentials via the environment variable GOOGLE_APPLICATION_CREDENTIALS. string - HIGH
gcs.credentials.json The GCP credentials as JSON string. Cannot be set when GCS_CREDENTIALS_PATH_CONFIG is provided. If no credentials is specified the client library will look for credentials via the environment variable GOOGLE_APPLICATION_CREDENTIALS. string - HIGH
gcs.bucket.name The GCS bucket name to download the object files from. string - HIGH
gcs.blobs.filter.prefix The prefix to be used for filtering blobs whose names begin with it. string - MEDIUM

Azure Blob Storage

The AzureBlobStorageConfig class can be used for listing objects that exist in a specific Azure Storage Container.

How to use it ?

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AzureBlobStorageConfig

Configuration

Configuration Description Type Default Importance
azure.storage.connection.string Azure storage account connection string. string - HIGH
azure.storage.account.name The Azure storage account name. string - HIGH
azure.storage.account.key The Azure storage account key. string - HIGH
azure.storage.container.name The Azure storage container name. string - MEDIUM
azure.storage.blob.prefix The prefix to be used for restricting the listing of the blobs in the container. string - MEDIUM

Filtering input files

You can configure one or more FileFilter that will be used to determine if a file should be scheduled for processing or ignored. All files that are filtered out are simply ignored and remain untouched on the file system until the next scan. At the next scan, previously filtered files will be evaluated again to determine if they are now eligible for processing.

FilePulse packs with the following built-in filters :

IgnoreHiddenFileFilter

The IgnoreHiddenFileFilter can be used to filter hidden files from being read.

Configuration example

fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.IgnoreHiddenFileListFilter

LastModifiedFileFilter

The LastModifiedFileFilter can be used to filter files that have been modified to recently based on their last modified date property.

fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.LastModifiedFileFilter
# The last modified time for a file can be accepted (default: 5000)
file.filter.minimum.age.ms=10000

RegexFileFilter

The RegexFileFilter can be used to filter files that do not match the specified regex.

fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
# The regex pattern used to matches input files
file.filter.regex.pattern="\\.log$"
Last modified August 2, 2021: release version 2.1.0 (94509d4)