FileSystem Listing

Learn how to configure Connect FilePulse to list files from local or remote storage systems.

The FilePulseSourceConnector periodically lists the object files that may be streamed into Kafka, using the FileSystemListing
implementation set in the connector’s configuration.

Supported Filesystems

Currently, Kafka Connect FilePulse supports the following implementations:

  • AmazonS3FileSystemListing
  • AzureBlobStorageFileSystemListing
  • GcsFileSystemListing
  • LocalFSDirectoryListing (default)

Local Filesystem (default)

The LocalFSDirectoryListing class can be used for listing files that exist in a local filesystem directory.

How to use it?

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| fs.listing.directory.path | The input directory to scan | string | - | HIGH |
| fs.listing.recursive.enabled | Flag indicating whether local directory should be recursively scanned | boolean | true | MEDIUM |
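
As a minimal sketch, the settings listed above could be combined as follows. The directory path is a hypothetical placeholder and should be replaced with a directory that exists on the Connect worker.

```properties
# Use the local filesystem listing (this is already the default implementation).
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing
# Hypothetical input directory, for illustration only.
fs.listing.directory.path=/var/data/incoming
# Also scan sub-directories (documented default: true).
fs.listing.recursive.enabled=true
```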

Supported File types

The LocalFSDirectoryListing will try to detect whether a file needs to be decompressed by probing its content type or its extension (see the Javadoc for Files#probeContentType). Supported content types are:

  • GZIP : application/x-gzip
  • TAR : application/x-tar
  • ZIP : application/x-zip-compressed or application/zip

Amazon S3

The AmazonS3FileSystemListing class can be used for listing objects that exist in a specific Amazon S3 bucket.

How to use it?

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| aws.access.key.id | AWS Access Key ID | string | - | HIGH |
| aws.secret.access.key | AWS Secret Access Key | string | - | HIGH |
| aws.secret.session.token | AWS Secret Session Token | string | - | HIGH |
| aws.credentials.provider.class | The AWSCredentialsProvider to use if no access key id and secret access key are configured. | class | com.amazonaws.auth.EnvironmentVariableCredentialsProvider | LOW |
| aws.s3.region | The AWS S3 Region, e.g. us-east-1 | string | Regions.DEFAULT_REGION.getName() | MEDIUM |
| aws.s3.service.endpoint | AWS S3 custom service endpoint. | string | - | MEDIUM |
| aws.s3.path.style.access.enabled | Configures the client to use path-style access for all requests. | string | - | MEDIUM |
| aws.s3.bucket.name | The name of the Amazon S3 bucket. | string | - | HIGH |
| aws.s3.bucket.prefix | The prefix to be used for restricting the listing of the objects in the bucket | string | - | MEDIUM |
| aws.s3.default.object.storage.class | The AWS storage class to associate with an S3 object when it is copied by the connector (e.g., during a move operation). Accepted values are: STANDARD, GLACIER, REDUCED_REDUNDANCY, STANDARD_IA, ONEZONE_IA, INTELLIGENT_TIERING, DEEP_ARCHIVE | string | | LOW |
| aws.s3.backoff.delay.ms | The base back-off time (in milliseconds) before retrying a request. | int | 100 | MEDIUM |
| aws.s3.backoff.max.delay.ms | The maximum back-off time (in milliseconds) before retrying a request. | int | 20_000 | MEDIUM |
| aws.s3.backoff.max.retries | The maximum number of retry attempts for failed retryable requests. | int | 3 | MEDIUM |
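
The following sketch shows how the settings above might fit together when no static access key is configured, so that credentials are resolved through the credentials provider class. The bucket name, prefix, and the commented-out endpoint are hypothetical placeholders, and the back-off values simply restate the documented defaults.

```properties
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing
aws.s3.region=us-east-1
# Hypothetical bucket and prefix, for illustration only.
aws.s3.bucket.name=my-example-bucket
aws.s3.bucket.prefix=incoming/
# No aws.access.key.id / aws.secret.access.key set: the provider class is used instead
# (the value below is the documented default).
aws.credentials.provider.class=com.amazonaws.auth.EnvironmentVariableCredentialsProvider
# Retry behaviour, written out with the documented default values.
aws.s3.backoff.delay.ms=100
aws.s3.backoff.max.delay.ms=20000
aws.s3.backoff.max.retries=3
# Assumption: an S3-compatible store behind a custom endpoint would be configured like this.
# aws.s3.service.endpoint=http://localhost:9000
# aws.s3.path.style.access.enabled=true
```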

Google Cloud Storage

The GcsFileSystemListing class can be used for listing objects that exist in a specific Google Cloud Storage bucket.

How to use it?

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| gcs.credentials.path | The path to the GCP credentials file. Cannot be set when gcs.credentials.json is provided. If no credentials are specified, the client library looks for credentials via the GOOGLE_APPLICATION_CREDENTIALS environment variable. | string | - | HIGH |
| gcs.credentials.json | The GCP credentials as a JSON string. Cannot be set when gcs.credentials.path is provided. If no credentials are specified, the client library looks for credentials via the GOOGLE_APPLICATION_CREDENTIALS environment variable. | string | - | HIGH |
| gcs.bucket.name | The GCS bucket name to download the object files from. | string | - | HIGH |
| gcs.blobs.filter.prefix | The prefix to be used for filtering blobs whose names begin with it. | string | - | MEDIUM |
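
A minimal sketch of a GCS listing configuration, using only the settings above, might look like this. The bucket name, prefix, and credentials path are hypothetical placeholders.

```properties
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing
# Hypothetical bucket and prefix, for illustration only.
gcs.bucket.name=my-example-bucket
gcs.blobs.filter.prefix=incoming/
# Set either the credentials path or the credentials JSON, never both.
# If neither is set, the client library falls back to GOOGLE_APPLICATION_CREDENTIALS.
gcs.credentials.path=/etc/secrets/gcp-credentials.json
```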

Azure Blob Storage

The AzureBlobStorageFileSystemListing class can be used for listing objects that exist in a specific Azure Storage Container.

How to use it?

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AzureBlobStorageFileSystemListing

Configuration

| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| azure.storage.connection.string | Azure storage account connection string. | string | - | HIGH |
| azure.storage.account.name | The Azure storage account name. | string | - | HIGH |
| azure.storage.account.key | The Azure storage account key. | string | - | HIGH |
| azure.storage.container.name | The Azure storage container name. | string | - | MEDIUM |
| azure.storage.blob.prefix | The prefix to be used for restricting the listing of the blobs in the container. | string | - | MEDIUM |
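
The sketch below combines the settings above into one possible configuration. It assumes that authenticating with the account name and key is sufficient on its own, without a connection string; the table above does not state which combinations are required, so treat this as an illustration. All values are hypothetical placeholders.

```properties
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AzureBlobStorageFileSystemListing
# Hypothetical account credentials (assumed here to be an alternative to
# azure.storage.connection.string).
azure.storage.account.name=myexampleaccount
azure.storage.account.key=<storage-account-key>
# Hypothetical container and blob prefix, for illustration only.
azure.storage.container.name=my-example-container
azure.storage.blob.prefix=incoming/
```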

Filtering input files

You can configure one or more FileFilter instances that determine whether a file should be scheduled for processing or ignored. Files that are filtered out are simply ignored and remain untouched on the file system. At the next scan, previously filtered files are evaluated again to determine whether they are now eligible for processing.

FilePulse ships with the following built-in filters:

IgnoreHiddenFileFilter

The IgnoreHiddenFileFilter can be used to exclude hidden files from being read.

Configuration example

fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.IgnoreHiddenFileListFilter

LastModifiedFileFilter

The LastModifiedFileFilter can be used to filter out all files that have been modified too recently, based on their last-modified date property.

fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.LastModifiedFileListFilter
# The minimum age (in milliseconds) since a file was last modified before it is accepted (default: 5000)
file.filter.minimum.age.ms=10000

RegexFileFilter

The RegexFileFilter can be used to filter all files that do not match the specified regex.

fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter
# The regex pattern used to match input files
file.filter.regex.pattern="\\.log$"

SizeFileListFilter

The SizeFileListFilter can be used to filter all files that are smaller or larger than a specific byte size.

fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.SizeFileListFilter
file.filter.minimum.size.bytes=0
file.filter.maximum.size.bytes=9223372036854775807
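
Since more than one filter can be configured, the sketch below shows how several of the built-in filters might be combined. It assumes that fs.listing.filters accepts a comma-separated list of class names, as is conventional for list-typed Kafka Connect settings, and that the size filter's fully qualified name follows the same package as the other filters. The size bounds are hypothetical.

```properties
# Assumption: multiple filters are given as a comma-separated list.
# The trailing backslash is standard .properties line continuation.
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.IgnoreHiddenFileListFilter,\
  io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter,\
  io.streamthoughts.kafka.connect.filepulse.fs.filter.SizeFileListFilter
# Regex pattern copied from the RegexFileFilter example above.
file.filter.regex.pattern="\\.log$"
# Hypothetical bounds: skip empty files and anything larger than 1 GiB.
file.filter.minimum.size.bytes=1
file.filter.maximum.size.bytes=1073741824
```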

Last modified June 9, 2022: release version 2.7.0 (3941b557)