RSS

Connect FilePulse 2.0 is Available 🚀

Connect FilePulse 2.0 is finally here! Here is an overview of what is new:

Supported Cloud Storage

Previously, Connect FilePulse was designed to provide direct integration between legacy systems and Apache Kafka. But, it could be only used to process and integrate data records from the local filesystem on which the connector was deployed.

As more and more organizations move from on-premises to cloud infrastructure, we’ve seen a growing demand from developers for the connector to support cloud storage.

Connect FilePulse 2.0 brings you the capabilities for reading files across different storage systems. Using a single Kafka Connect Source Connector you can now read files from the local filesystem, Amazon S3, Azure Blob Storage and Google Cloud Storage.

In addition, the connector supports a variety of formats equally for all storage systems, e.g., text files, CSV, XML, JSON, Avro, etc. At the same time, you can still benefit from the powerful processing-filters mechanism of Connect FilePulse to process data records as they are read by the connector.

For example, here is the configuration for reading CSV object files from an Amazon S3 bucket.

name=connect-file-pulse-amazon-s3-csv
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic=connect-filepulse-csv-data-records
tasks.max=1

fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing
fs.listing.interval.ms=10000
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.csv$

fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy

aws.access.key.id=xxxxxxxxx
aws.secret.access.key=xxxxxxxxx
aws.s3.region=eu-west-3
aws.s3.bucket.name=connect-filepulse

tasks.reader.class=io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader

skip.headers=1
offset.attributes.string=uri

filters=ParseLine
filters.ParseLine.type=io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
filters.ParseLine.extractColumnName=headers
filters.ParseLine.trimColumn=true
filters.ParseLine.separator=;
tasks.file.status.storage.bootstrap.servers=kafka101:9092
tasks.file.status.storage.topic=connect-file-pulse-status
tasks.file.status.storage.topic.partitions=10
tasks.file.status.storage.topic.replication.factor=1

Auto-create Internal Topic

By default, Connect FilePulse uses the internal topic connect-file-pulse-status to track the current state of each file being scheduled and processed by tasks. This allows you to deploy Connect FilePulse is a distributed Kafka Connect cluster with each worker only processing a subset of files.

In version 2.0, this topic is will be automatically created by the connector if it doesn’t already exist. You can configure the number of partitions, as well as, the replication factor of this topic using the new properties tasks.file.status.storage.topic.partitions and tasks.file.status.storage.topic.replication.factor.

InMemoryFileObjectStateBackingStore

In version 2.0, we provide a new property tasks.file.status.storage.class that can be used to specify the class implementing the FileObjectStateBackingStore interface to be used for storing the status of each file. By default, Connect FilePulse uses the kafka-based implementation called i.s.k.c.f.state.KafkaFileObjectStateBackingStore.

But, in some context, it may be not necessary to deploy Connect FilePulse in distributed mode and this implementation can lead to additional costs if, for example, you are using a fully-managed Apache Kafka service. So now, we also provide the i.s.k.c.f.state.InMemoryFileObjectStateBackingStore implementation to only keep file status in-memory.

Improved Scalability

Connect FilePulse can be used to integrate a very large number of files in parallel. Unfortunately, too many files to process can result in a too-large message to produce in Kafka for configuring tasks (i.e. connect-config). To solve this blocking issue, in version 2.0, we have added the new property max.scheduled.files to limit the maximum number of files that can be scheduled at the same time (Default is 1000).

Improved Grok Expression

In a previous version, Connect FilePulse has brought the support for Grok expressions to parse data. Since this mechanism has been migrated to a new dedicated project kafka-connect-transform-grok in order to be able to use Grok expressions with Kafka Connect’s a SMTs. Now, Connect FilePulse directly depends on that project to provide the GrokFilter with a unified configuration.

Full Release Notes

Connect File Pulse 2.0 can be downloaded from the GitHub Releases Page.

Members of the open-source community who appear in these release notes:

  • @at0dd
  • @qgeffard

Thank you for your valuable contributions!

Features

  • 13eed7b feat(plugin): add support for auto-creating the internal topic used by ConnectFilePulse (#139)
  • 5c88877 feat(plugin): add InMemoryStateBackingStore for tracking status of file objects (#138)
  • 52adca9 feat(filesystems): add support for Google Cloud Storage (#121)
  • 7b49b81 feat(plugin): add new property max.scheduled.files (#122) (#123)
  • 390ad82 feat(filesystems): add support for AWS S3 (#111)
  • 92e3341 feat(filesystems): add support for Azure Blob Storage (#112)
  • 685618a refactor(filters): migrate GrokFilter to use classes from grok-transformer (#118)

Improvements & Bugfixes

  • 6ef5162 fix(api): fix decimal numbers not being correctly parsed (#142)
  • 6c779af refactor(filesystems): make cleanup policy storage aware
  • d13c236 fix(filesystems): make compression codec more robust to encoding
  • 7d2ddac docs(site): fix DateFilter formats config
  • e222414 fix(api): change digest value to string

SubTasks

  • 1658d35 refactor(api/filesystems): move FileInputIterator implementation to commons-fs
  • 06385b3 refactor(filesystems): add module filepulse-commons-fs
  • 57da04c subtask(all): refactor FilePulse API to support remote storages (#100)
  • ee4acad add github workflow
  • a3eb908 build(all): update to java 11
  • 98eb51f build(mvn): add maven-wrapper

Braking changes

  • Configurations for Connect FilePulse 1.x is not compatible with the version 2.x.

If you enjoyed reading this post, check out Connect FilePulse at GitHub and give us a ⭐!