Schema Less Json Source Connector

com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector

This connector is used to stream <https://en.wikipedia.org/wiki/JSON_Streaming>_ JSON files from a directory while converting the data based on the schema supplied in the configuration.

Important

This connector does not try to convert the json records to a schema. The recommended converter to use is the StringConverter. Example: value.converter=org.apache.kafka.connect.storage.StringConverter

Configuration

File System

error.path

The directory to place files in which have error(s). This directory must exist and be writable by the user running Kafka Connect.

Importance: HIGH

Type: STRING

Validator: Absolute path to a directory that exists and is writable.

input.file.pattern

Regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches().

Importance: HIGH

Type: STRING

input.path

The directory to read files that will be processed. This directory must exist and be writable by the user running Kafka Connect.

Importance: HIGH

Type: STRING

Validator: Absolute path to a directory that exists and is writable.

finished.path

The directory to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect.

Importance: HIGH

Type: STRING

halt.on.error

Should the task halt when it encounters an error or continue to the next file.

Importance: HIGH

Type: BOOLEAN

Default Value: true

cleanup.policy

Determines how the connector should cleanup the files that have been successfully processed. NONE leaves the files in place which could cause them to be reprocessed if the connector is restarted. DELETE removes the file from the filesystem. MOVE will move the file to a finished directory. MOVEBYDATE will move the file to a finished directory with subdirectories by date

Importance: MEDIUM

Type: STRING

Default Value: MOVE

Validator: Matches: NONE, DELETE, MOVE, MOVEBYDATE

task.partitioner

The task partitioner implementation is used when the connector is configured to use more than one task. This is used by each task to identify which files will be processed by that task. This ensures that each file is only assigned to one task.

Importance: MEDIUM

Type: STRING

Default Value: ByName

Validator: Matches: ByName

file.buffer.size.bytes

The size of buffer for the BufferedInputStream that will be used to interact with the file system.

Importance: LOW

Type: INT

Default Value: 131072

Validator: [1,…]

file.minimum.age.ms

The amount of time in milliseconds after the file was last written to before the file can be processed.

Importance: LOW

Type: LONG

Default Value: 0

Validator: [0,…]

files.sort.attributes

The attributes each file will use to determine the sort order. Name is name of the file. Length is the length of the file preferring larger files first. LastModified is the LastModified attribute of the file preferring older files first.

Importance: LOW

Type: LIST

Default Value: [NameAsc]

Validator: Matches: NameAsc, NameDesc, LengthAsc, LengthDesc, LastModifiedAsc, LastModifiedDesc

processing.file.extension

Before a file is processed, a flag is created in its directory to indicate the file is being handled. The flag file has the same name as the file, but with this property appended as a suffix.

Importance: LOW

Type: STRING

Default Value: .PROCESSING

Validator: Matches regex( ^.*..+$ )

General

topic

The Kafka topic to write the data to.

Importance: HIGH

Type: STRING

batch.size

The number of records that should be returned with each batch.

Importance: LOW

Type: INT

Default Value: 1000

empty.poll.wait.ms

The amount of time to wait if a poll returns an empty list of records.

Importance: LOW

Type: LONG

Default Value: 500

Validator: [1,…,9223372036854775807]

task.count

Internal setting to the connector used to instruct a task on which files to select. The connector will override this setting.

Importance: LOW

Type: INT

Default Value: 1

Validator: [1,…]

task.index

Internal setting to the connector used to instruct a task on which files to select. The connector will override this setting.

Importance: LOW

Type: INT

Default Value: 0

Validator: [0,…]

Timestamps

timestamp.mode

Determines how the connector will set the timestamp for the [ConnectRecord](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/connector/ConnectRecord.html#timestamp()). If set to Field then the timestamp will be read from a field in the value. This field cannot be optional and must be a [Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html). Specify the field in timestamp.field. If set to FILE_TIME then the last modified time of the file will be used. If set to PROCESS_TIME the time the record is read will be used.

Importance: MEDIUM

Type: STRING

Default Value: PROCESS_TIME

Validator: Matches: FIELD, FILE_TIME, PROCESS_TIME

Metadata

metadata.field

The name of the field in the value where the metadata will be stored.

Importance: LOW

Type: STRING

Default Value: metadata

metadata.location

Location that metadata about the input file will be stored. FIELD - Metadata about the file will be stored in a field in the value of the record. HEADERS - Metadata about the input file will be stored as headers on the record. NONE - no metadata about the input file will be stored.

Importance: LOW

Type: STRING

Default Value: HEADERS

Validator: Matches: NONE, HEADERS, FIELD