FileSystem Listing

Learn how to configure Connect FilePulse for listing files from local or remote storage systems.

The FilePulseSourceConnector uses the FileSystemListing set in the connector’s configuration to periodically list the
object files that may be streamed into Kafka.

Supported Filesystems

Currently, Kafka Connect FilePulse supports the following implementations:

  • AmazonS3FileSystemListing
  • AzureBlobStorageFileSystemListing
  • GcsFileSystemListing
  • LocalFSDirectoryListing (default)

Local Filesystem (default)

The LocalFSDirectoryListing class can be used for listing files that exist in a local filesystem directory.

How to use it?



| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| (name not given) | Input directory to scan | string | - | HIGH |
| fs.listing.recursive.enabled | Flag indicating whether the local directory should be recursively scanned | boolean | true | MEDIUM |
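
As an illustration, a local directory listing could be configured roughly as follows. This is a minimal sketch, not a verified configuration: the fs.listing.class property, the fully qualified class name, the fs.listing.directory.path property, and the directory path are all assumptions to check against the FilePulse version in use.

```properties
# Assumed property and fully qualified class name for selecting the listing
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing
# Assumed property name for the input directory; the path is a placeholder
fs.listing.directory.path=/tmp/kafka-connect/input
# Scan sub-directories as well (true is the documented default)
fs.listing.recursive.enabled=true
```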

Supported File types

The LocalFSDirectoryListing will try to detect if a file needs to be decompressed by probing its content type or its extension (see the Javadoc for Files#probeContentType). Supported content types are:

  • GZIP : application/x-gzip
  • TAR : application/x-tar
  • ZIP : application/x-zip-compressed or application/zip

Amazon S3

The AmazonS3FileSystemListing class can be used for listing objects that exist in a specific Amazon S3 bucket.

How to use it?



| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| aws.access.key.id | AWS Access Key ID | string | - | HIGH |
| aws.secret.access.key | AWS Secret Access Key | string | - | HIGH |
| aws.secret.session.token | AWS Secret Session Token | string | - | HIGH |
| aws.credentials.provider.class | The AWSCredentialsProvider to use if no access key id and secret access key is configured. | class | com.amazonaws.auth.EnvironmentVariableCredentialsProvider | LOW |
| aws.s3.region | The AWS S3 Region, e.g. us-east-1 | string | Regions.DEFAULT_REGION.getName() | MEDIUM |
| aws.s3.service.endpoint | AWS S3 custom service endpoint. | string | - | MEDIUM |
| (name not given) | the client to use path-style access for all requests. | string | - | MEDIUM |
| aws.s3.bucket.name | The name of the Amazon S3 bucket. | string | - | HIGH |
| aws.s3.bucket.prefix | The prefix to be used for restricting the listing of the objects in the bucket | string | - | MEDIUM |
| (name not given) | AWS storage class to associate with an S3 object when it is copied by the connector (e.g., during a move operation). Accepted values are: STANDARD, GLACIER, REDUCED_REDUNDANCY, STANDARD_IA, ONEZONE_IA, INTELLIGENT_TIERING, DEEP_ARCHIVE | string | | LOW |
| aws.s3.backoff.delay.ms | The base back-off time (milliseconds) before retrying a request. | int | 100 | MEDIUM |
| aws.s3.backoff.max.delay.ms | The maximum back-off time (in milliseconds) before retrying a request. | int | 20_000 | MEDIUM |
| aws.s3.backoff.max.retries | The maximum number of retry attempts for failed retryable requests. | int | 3 | MEDIUM |
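
A configuration for listing an S3 bucket might look like the sketch below. The aws.* keys are taken from the table above; the fs.listing.class property, the fully qualified class name, and the bucket, prefix, and region values are assumptions used only for illustration.

```properties
# Assumed property and fully qualified class name for selecting the listing
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing
# Placeholder bucket, prefix, and region
aws.s3.region=us-east-1
aws.s3.bucket.name=my-input-bucket
aws.s3.bucket.prefix=incoming/
# Retry tuning, shown here with the documented defaults
aws.s3.backoff.delay.ms=100
aws.s3.backoff.max.delay.ms=20000
aws.s3.backoff.max.retries=3
```

No access key is set in this sketch, so credentials would be resolved by the provider given in aws.credentials.provider.class, which by default reads them from environment variables.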

Google Cloud Storage

The GcsFileSystemListing class can be used for listing objects that exist in a specific Google Cloud Storage bucket.

How to use it?



| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| gcs.credentials.path | The path to the GCP credentials file. Cannot be set when GCS_CREDENTIALS_JSON_CONFIG is provided. If no credentials are specified, the client library will look for credentials via the environment variable GOOGLE_APPLICATION_CREDENTIALS. | string | - | HIGH |
| gcs.credentials.json | The GCP credentials as a JSON string. Cannot be set when GCS_CREDENTIALS_PATH_CONFIG is provided. If no credentials are specified, the client library will look for credentials via the environment variable GOOGLE_APPLICATION_CREDENTIALS. | string | - | HIGH |
| gcs.bucket.name | The GCS bucket name to download the object files from. | string | - | HIGH |
| gcs.blobs.filter.prefix | The prefix to be used for filtering blobs whose names begin with it. | string | - | MEDIUM |
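
For illustration, a GCS listing could be configured along these lines. The gcs.* keys come from the table above; the fs.listing.class property, the fully qualified class name, and all values are assumptions.

```properties
# Assumed property and fully qualified class name for selecting the listing
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing
# Placeholder path; set either gcs.credentials.path or gcs.credentials.json, not both
gcs.credentials.path=/path/to/credentials.json
# Placeholder bucket and prefix
gcs.bucket.name=my-input-bucket
gcs.blobs.filter.prefix=incoming/
```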

Azure Blob Storage

The AzureBlobStorageFileSystemListing class can be used for listing objects that exist in a specific Azure Storage Container.

How to use it?



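| Configuration | Description | Type | Default | Importance |
|---|---|---|---|---|
| (name not given) | Storage account connection string. | string | - | HIGH |
| (name not given) | Azure storage account name. | string | - | HIGH |
| (name not given) | Azure storage account key. | string | - | HIGH |
| (name not given) | Azure storage container name. | string | - | MEDIUM |
| (name not given) | Prefix to be used for restricting the listing of the blobs in the container. | string | - | MEDIUM |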
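
The sketch below shows what an Azure Blob Storage listing configuration might look like. Every key in it is an assumption, since the property names are not given here: fs.listing.class, the fully qualified class name, and the azure.storage.* names are guesses that follow the naming pattern of the other listings and must be checked against the FilePulse version in use.

```properties
# Assumed property and fully qualified class name for selecting the listing
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AzureBlobStorageFileSystemListing
# Assumed property names; all values are placeholders
azure.storage.account.name=mystorageaccount
azure.storage.account.key=<account-key>
azure.storage.container.name=my-input-container
azure.storage.blob.prefix=incoming/
```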

Filtering input files

You can configure one or more FileFilter implementations to determine whether a file should be scheduled for processing or ignored. Files that are filtered out remain untouched on the file system until the next scan, when they are evaluated again to determine if they are now eligible for processing.

FilePulse ships with the following built-in filters:


The IgnoreHiddenFileFilter can be used to filter hidden files from being read.

Configuration example
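
A minimal sketch of enabling the hidden-file filter, assuming filters are registered through a fs.listing.filters property. That property, the package, and possibly the exact class name are assumptions and may differ between FilePulse versions.

```properties
# Assumed property, package, and class name; verify against your FilePulse version
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.IgnoreHiddenFileFilter
```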



The LastModifiedFileFilter can be used to filter all files that have been modified too recently, based on their last modified date property. A file is accepted only once its last modification is older than a minimum age (default: 5000).
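
As a sketch, the last-modified filter could be configured as follows. The fs.listing.filters property, the package and class name, and the file.filter.minimum.age.ms property (with its millisecond unit) are assumptions; the value is a placeholder.

```properties
# Assumed property, package, and class name; verify against your FilePulse version
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.LastModifiedFileFilter
# Assumed property name: only accept files left unmodified for at least 10 seconds
file.filter.minimum.age.ms=10000
```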


The RegexFileFilter can be used to filter all files that do not match the specified regex. The regex pattern used to match input files is set in the filter's configuration.
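
The regex filter might be configured as in the sketch below. The fs.listing.filters property, the package and class name, and the file.filter.regex.pattern property are assumptions; the pattern is only an example.

```properties
# Assumed property, package, and class name; verify against your FilePulse version
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileFilter
# Assumed property name: keep only files whose name ends with .csv
# (the backslash is doubled because .properties files treat it as an escape character)
file.filter.regex.pattern=.*\\.csv$
```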


The SizeFileListFilter can be used to filter all files that are smaller or larger than a specific byte size.
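
A possible configuration for the size filter is sketched below. The fs.listing.filters property, the package, and the two file.filter.*.size.bytes property names are assumptions; the bounds are placeholders.

```properties
# Assumed property and package; verify against your FilePulse version
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.SizeFileListFilter
# Assumed property names: ignore empty files and files larger than 100 MiB
file.filter.minimum.size.bytes=1
file.filter.maximum.size.bytes=104857600
```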

Last modified April 19, 2022: docs: update AWS config properties (d8b176bc)