Configuration
Common configuration
Whatever kind of files you are processing, a connector should always be configured with the properties below. These configurations are described in detail in the subsequent chapters.
Common Kafka Connect properties
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| topic | The default output topic to write to. | string | - | high | 
| tasks.max | The maximum number of tasks that should be created for this connector. | string | - | high | 
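For example, a minimal FilePulse connector configuration submitted to the Kafka Connect REST API might look like the sketch below. The connector name and topic are placeholders, and a real configuration also needs the properties described in the following sections.

{
   "name": "filepulse-example",
   "config": {
      "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
      "topic": "connect-file-pulse-topic",
      "tasks.max": "1"
   }
}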
Properties for listing and cleaning object files (FileSystemListing)
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| fs.listing.class | Class which is used to list eligible files from the scanned file system. | class | - | MEDIUM | 
| fs.listing.filters | Filters used to list eligible input files. | list | - | MEDIUM | 
| fs.listing.interval.ms | Time interval (in milliseconds) at which to scan the input directory. | long | 10000 | HIGH | 
| fs.listing.task.delegation.enabled | Boolean indicating whether the file listing process should be delegated to tasks. | boolean | false | LOW | 
| fs.cleanup.policy.class | The fully qualified name of the class which is used to clean up files. | class | - | HIGH | 
| fs.cleanup.policy.triggered.on | Specifies the file status that triggers a cleanup. Valid values are: COMPLETED, COMMITTED. | string | COMPLETED | MEDIUM | 
| max.scheduled.files | Maximum number of files that can be scheduled to tasks. | long | 1000 | HIGH | 
| allow.tasks.reconfiguration.after.timeout.ms | Specifies the timeout (in milliseconds) after which the connector allows tasks to be reconfigured when new files are detected, even if some files are still being processed. | long | - | LOW | 
| task.partitioner.class | The TaskPartitioner to be used for partitioning files to tasks. | class | io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner | HIGH | 
| tasks.halt.on.error | Should a task halt when it encounters an error or continue to the next file. | boolean | false | HIGH | 
| tasks.file.processing.order.by | The strategy to be used for sorting files for processing. Valid values are: LAST_MODIFIED, URI, CONTENT_LENGTH, CONTENT_LENGTH_DESC. | string | LAST_MODIFIED | MEDIUM | 
| tasks.empty.poll.wait.ms | The amount of time in milliseconds a task should wait if a poll returns an empty list of records. | long | 500 | HIGH | 
| ignore.committed.offsets | Should a task ignore committed offsets while scheduling a file. | boolean | false | LOW | 
| value.connect.schema | The schema for the record value. | string | - | MEDIUM | 
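As an illustration, here is a sketch of the listing and cleanup part of a configuration, assuming the local filesystem implementation; the LocalFSDirectoryListing class, its fs.listing.directory.path property, and the LogCleanupPolicy class shown below depend on the filesystem plugin you actually use.

{
   "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
   "fs.listing.directory.path": "/tmp/connect-data",
   "fs.listing.interval.ms": "10000",
   "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
   "fs.cleanup.policy.triggered.on": "COMPLETED",
   "max.scheduled.files": "1000"
}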
Properties for transforming object file records (Filters Chain Definition)
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| filters | List of filter aliases to apply to each record (order is important). | list | - | MEDIUM | 
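Filters are declared by alias and then configured under their alias prefix. A minimal sketch, assuming a single built-in DelimitedRowFilter under the hypothetical alias ParseLine; the available filter classes and their own properties are described in the chapter on filters.

{
   "filters": "ParseLine",
   "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
}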
Properties for reading object file record(FileReaders)
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| tasks.reader.class | The fully qualified name of the class which is used by tasks to read input files. | class | - | HIGH | 
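For example, to read files line by line from the local filesystem, assuming the LocalRowFileInputReader class; the reader should be chosen consistently with the configured fs.listing.class.

{
   "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader"
}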
Properties for uniquely identifying object files and records (SourceOffsetPolicy)
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| offset.policy.class | Class which is used to determine the source partition and offset that uniquely identify an input record. | class | io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy | HIGH | 
Properties for synchronizing Connector and Tasks
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| tasks.file.status.storage.class | The FileObjectStateBackingStore class to be used for storing the status state of file objects. | class | io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore | HIGH | 
Available implementations are:
- io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore
- io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore
Limitation
The InMemoryFileObjectStateBackingStore implementation is not fault-tolerant and should only be used when running Kafka Connect in standalone mode or with a single worker.
Properties for configuring the KafkaFileObjectStateBackingStore class
| Configuration | Description | Type | Default | Importance | 
|---|---|---|---|---|
| tasks.file.status.storage.topic | Name of the internal topic used by tasks and the connector to report and monitor file progression. | string | connect-file-pulse-status | HIGH | 
| tasks.file.status.storage.bootstrap.servers | A list of host/port pairs used by the reporter for establishing the initial connection to the Kafka cluster. | string | - | HIGH | 
| tasks.file.status.storage.topic.partitions | The number of partitions to be used for the status storage topic. | int | - | LOW | 
| tasks.file.status.storage.topic.replication.factor | The replication factor to be used for the status storage topic. | float | - | LOW | 
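A sketch of a Kafka-backed status storage configuration, using only the properties listed above; the bootstrap servers and topic sizing are placeholders to adapt to your cluster.

{
   "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
   "tasks.file.status.storage.topic": "connect-file-pulse-status",
   "tasks.file.status.storage.bootstrap.servers": "kafka:9092",
   "tasks.file.status.storage.topic.partitions": "10",
   "tasks.file.status.storage.topic.replication.factor": "1"
}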
Properties for configuring the InMemoryFileObjectStateBackingStore class
| Configuration | Description | Type | Default | Importance | Since | 
|---|---|---|---|---|---|
| tasks.file.status.storage.cache.max.size.capacity | Specifies the max size capacity of the LRU in-memory cache. | int | 10000 | LOW | v2.5.0 | 
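For a standalone worker, a minimal sketch switching to the in-memory store (keeping in mind the limitation above):

{
   "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore",
   "tasks.file.status.storage.cache.max.size.capacity": "10000"
}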
In addition, to override the default configuration for the internal consumer and producer clients, you can use one of the following override prefixes:
- tasks.file.status.storage.consumer.<consumer_property>
- tasks.file.status.storage.producer.<producer_property>
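For example, a sketch tuning the internal clients used by the KafkaFileObjectStateBackingStore; the two standard Kafka client properties below are only illustrative.

{
   "tasks.file.status.storage.producer.acks": "all",
   "tasks.file.status.storage.consumer.max.poll.records": "500"
}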
Examples
Some configuration examples are available here.
Defining Connect Record Schema
The optional value.connect.schema configuration property can be used to set the Connect record-value schema that the connector should use.
If no schema is passed through the connector configuration, a schema will be resolved for each record produced.
The value.connect.schema must be passed as a JSON string that respects the following schema (using Avro representation):
{
   "type":"record",
   "name":"Schema",
   "fields":[
      {
         "name":"name",
         "type":"string",
         "doc": "The name of this schema"
      },
      {
         "name":"type",
         "type":{
            "type":"enum",
            "name":"Type",
            "symbols":[
               "STRUCT",
               "STRING",
               "BOOLEAN",
               "INT8",
               "INT16",
               "INT32",
               "INT64",
               "FLOAT32",
               "FLOAT64",
               "BYTES",
               "MAP",
               "ARRAY"
            ]
         },
         "doc": "The type of this schema"
      },
      {
         "name":"doc",
         "type":[
            "null",
            "string"
         ],
         "default":null,  
         "doc": "The documentation for this schema"
      },
      {
         "name":"fieldSchemas",
         "type":[
            "null",
            {
               "type":"map",
               "values":"Schema"
            }
         ],
         "default":null,
         "doc": "The fields for this Schema. Throws a DataException if this schema is not a struct."
      },
      {
         "name":"valueSchema",
         "type":[
            "null",
            {
               "type":"map",
               "values":"Schema"
            }
         ],
         "default":null,
         "doc": "The value schema for this map or array schema. Throws a DataException if this schema is not a map or array."
      },
      {
         "name":"keySchema",
         "type":[
            "null",
            {
               "type":"map",
               "values":"Schema"
            }
         ],
         "default":null,
         "doc": "The key schema for this map schema. Throws a DataException if this schema is not a map."
      },
      {
         "name":"defaultValue",
         "type":[
            "null",
            "string"
         ],
         "default":null
      },
      {
         "name":"isOptional",
         "type":"boolean",
         "default":false,
         "doc": "true if this field is optional, false otherwise"
      },
      {
         "name":"version",
         "type":[
            "null",
            "integer"
         ],
         "default":null,
         "doc": "The optional version of the schema. If a version is included, newer versions *must* be larger than older ones."
      }
   ]
}
Example:
{
   "name":"com.example.User",
   "type":"STRUCT",
   "isOptional":false,
   "fieldSchemas":{
      "id":{
         "type":"INT64",
         "isOptional":false
      },
      "first_name":{
         "type":"STRING",
         "isOptional":true
      },
      "last_name":{
         "type":"STRING",
         "isOptional":true
      },
      "email":{
         "type":"STRING",
         "isOptional":true
      },
      "gender":{
         "type":"STRING",
         "isOptional":true
      },
      "country":{
         "type":"STRING",
         "isOptional":true
      },
      "favorite_colors":{
         "type":"ARRAY",
         "isOptional":true,
         "valueSchema": {"type": "STRING"}
      }
   }
}
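In a connector configuration, the schema above is passed as the string value of the value.connect.schema property. A minimal sketch, reduced to the single id field for brevity:

{
   "value.connect.schema": "{\"name\":\"com.example.User\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"id\":{\"type\":\"INT64\",\"isOptional\":false}}}"
}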