Getting Started

Get started with Connect File Pulse through a step-by-step tutorial.

In this tutorial, we will deploy a basic Connect File Pulse connector step by step.

The prerequisites for this tutorial are:

  • An IDE or text editor.
  • Maven 3+.
  • Docker (to run a Kafka 2.x cluster).

Start Docker Environment

Set the following environment variable before executing the commands below.

$ export GITHUB_REPO_MASTER=https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master

1) Run Confluent Platform with Connect File Pulse

$ curl -sSL $GITHUB_REPO_MASTER/docker-compose.yml -o docker-compose.yml
$ docker-compose up -d

2) Verify that the Connect worker is running (optional)

$ docker-compose logs "connect-file-pulse"

3) Check that the Connect File Pulse plugin is correctly loaded (optional)

$ curl -sX GET http://localhost:8083/connector-plugins | grep FilePulseSourceConnector
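If you prefer structured output over a raw grep, the same check can be done with jq (assumed to be installed on the host, as in the steps below):

```shell
# List all installed plugin classes and keep only the File Pulse entry
curl -s http://localhost:8083/connector-plugins | jq -r '.[].class' | grep FilePulse
```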

Example: Logs Parsing (Log4j)

This example starts a new connector instance that parses the Kafka Connect container's Log4j logs before writing them into the configured topic.

1) Start a new connector instance

$ curl -sSL $GITHUB_REPO_MASTER/config/connect-file-pulse-quickstart-log4j.json -o connect-file-pulse-quickstart-log4j.json
 
$ curl -sX POST http://localhost:8083/connectors \
-d @connect-file-pulse-quickstart-log4j.json \
--header "Content-Type: application/json" | jq

2) Check the connector status

$ curl -X GET http://localhost:8083/connectors/connect-file-pulse-quickstart-log4j | jq
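The command above returns the connector's configuration. To see whether the connector and its task are actually running, you can also query the standard /status sub-resource of the Connect REST API:

```shell
# Show the state of the connector and each of its tasks (e.g. RUNNING, FAILED)
curl -s http://localhost:8083/connectors/connect-file-pulse-quickstart-log4j/status \
  | jq '{connector: .connector.state, tasks: [.tasks[].state]}'
```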

3) Consume the output topic

$ docker exec -it -e KAFKA_OPTS="" connect kafka-avro-console-consumer \
--topic connect-file-pulse-quickstart-log4j \
--from-beginning \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://schema-registry:8081

(output)

...
{"loglevel":{"string":"INFO"},"logdate":{"string":"2019-06-16 20:41:15,247"},"message":{"string":"[main] Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectDistributed)"}}
{"loglevel":{"string":"INFO"},"logdate":{"string":"2019-06-16 20:41:15,270"},"message":{"string":"[main] Loading plugin from: /usr/share/java/schema-registry (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)"}}
{"loglevel":{"string":"INFO"},"logdate":{"string":"2019-06-16 20:41:16,115"},"message":{"string":"[main] Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/schema-registry/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)"}}
{"loglevel":{"string":"INFO"},"logdate":{"string":"2019-06-16 20:41:16,115"},"message":{"string":"[main] Added plugin 'org.apache.kafka.common.config.provider.FileConfigProvider' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)"}}
...

4) Observe Connect status

Connect File Pulse uses an internal topic to track the current state of the files being processed.

$ docker exec -it -e KAFKA_OPTS="" connect kafka-console-consumer \
--topic connect-file-pulse-status \
--from-beginning \
--bootstrap-server broker:29092

(output)

{"hostname":"f51d45f96ed5","status":"SCHEDULED","metadata":{"name":"kafka-connect.log","path":"/var/log/kafka","size":172559,"lastModified":1560772525000,"inode":1705406,"hash":661976312},"offset":{"position":-1,"rows":0,"timestamp":1560772525527}}
{"hostname":"f51d45f96ed5","status":"STARTED","metadata":{"name":"kafka-connect.log","path":"/var/log/kafka","size":172559,"lastModified":1560772525000,"inode":1705406,"hash":661976312},"offset":{"position":-1,"rows":0,"timestamp":1560772525719}}
{"hostname":"f51d45f96ed5","status":"READING","metadata":{"name":"kafka-connect.log","path":"/var/log/kafka","size":172559,"lastModified":1560772525000,"inode":1705406,"hash":661976312},"offset":{"position":174780,"rows":1911,"timestamp":1560772535322}}
...
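Each status record is plain JSON, so it can be piped through jq on the host (jq availability is an assumption) to get a compact view of each file's progress:

```shell
# Print one "file -> status" line per state change, then exit after 10 records
docker exec -i -e KAFKA_OPTS="" connect kafka-console-consumer \
  --topic connect-file-pulse-status \
  --from-beginning \
  --max-messages 10 \
  --bootstrap-server broker:29092 \
  | jq -r '"\(.metadata.name) -> \(.status)"'
```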

5) Stop all containers

$ docker-compose down

Example: CSV File Parsing

This example starts a new connector instance that parses a CSV file and filters rows based on column values before writing the records into Kafka.

1) Start a new connector instance

$ curl -sSL $GITHUB_REPO_MASTER/config/connect-file-pulse-quickstart-csv.json -o connect-file-pulse-quickstart-csv.json

$ curl -sX POST http://localhost:8083/connectors \
-d @connect-file-pulse-quickstart-csv.json \
--header "Content-Type: application/json" | jq

2) Copy the example CSV file into the container

$ curl -sSL $GITHUB_REPO_MASTER/examples/quickstart-musics-dataset.csv -o quickstart-musics-dataset.csv
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
$ docker cp quickstart-musics-dataset.csv connect:/tmp/kafka-connect/examples/quickstart-musics-dataset.csv
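To make sure the file actually landed in the directory the connector scans, you can list it inside the container:

```shell
# The CSV file should appear in the connector's input directory
docker exec -it connect ls -l /tmp/kafka-connect/examples
```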

3) Check the connector status

$ curl -X GET http://localhost:8083/connectors/connect-file-pulse-quickstart-csv | jq

4) Check for task completion

$ docker logs --tail="all" -f connect | grep "Orphan task detected"

5) Consume the output topic

$ docker exec -it connect kafka-avro-console-consumer \
--topic connect-file-pulse-quickstart-csv \
--from-beginning \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://schema-registry:8081
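When you are finished with this example, the connector can be unregistered through the standard DELETE endpoint of the Connect REST API before tearing the environment down with docker-compose down:

```shell
# Unregister the connector, then confirm it no longer appears in the list
curl -sX DELETE http://localhost:8083/connectors/connect-file-pulse-quickstart-csv
curl -s http://localhost:8083/connectors | jq
```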