Json Source Connector¶
com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector
This connector is used to stream JSON files (see JSON streaming: https://en.wikipedia.org/wiki/JSON_Streaming) from a directory while converting the data based on the schema supplied in the configuration.
Important
There are some caveats to running this connector with schema.generation.enabled = true. If schema generation is enabled, the connector will start by reading one of the files that match input.file.pattern in the path specified by input.path. If there are no files when the connector starts or is restarted, the connector will fail to start. If other files contain fields that are not present in the file used for generation, those fields will not be detected. The recommended approach is to specify a schema that the files will be parsed with. This ensures that data written by this connector to Kafka will be consistent across files that have inconsistent columns. For example, if some files have an optional column that is not always included, create a schema that includes the column and mark it as optional.
Tip
To get a starting point for a schema, you can use the following command to generate an all-String schema. This will give you the basic structure of a schema. From there you can change the types to match what you expect.
mvn clean package
export CLASSPATH="$(find target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-spooldir -type f -name '*.jar' | tr '\n' ':')"
kafka-run-class com.github.jcustenborder.kafka.connect.spooldir.AbstractSchemaGenerator -t json -f src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/json/FieldsMatch.data -c config/JsonExample.properties -i id
Note
If you want to import JSON node by node in the file and do not care about schemas, do not use this connector with Schema Generation enabled. Take a look at the Schema Less Json Source Connector.
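For reference, a minimal configuration for this connector might look like the sketch below. The connector name, topic, and directory paths are placeholders for illustration, and topic and tasks.max belong to the connector's general configuration rather than the properties documented in this section.
name=json-spooldir-source
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector
tasks.max=1
# Placeholder topic and paths - adjust for your environment.
topic=spooldir-json-topic
input.path=/var/spooldir/input
finished.path=/var/spooldir/finished
error.path=/var/spooldir/error
input.file.pattern=^.*\.json$
schema.generation.enabled=true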
Configuration¶
File System¶
error.path¶
The directory in which to place files that have error(s). This directory must exist and be writable by the user running Kafka Connect.
Importance: HIGH
Type: STRING
Validator: Absolute path to a directory that exists and is writable.
input.file.pattern¶
Regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches().
Importance: HIGH
Type: STRING
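For example, to pick up only files ending in .json, a pattern like the one below could be used; it is only an illustration. Because the entire filename must match, a bare json (without the leading .*) would not match anything. Note that if the value is loaded from a Java properties file, the backslash may need to be doubled (\\.) since the properties parser treats backslashes as escapes.
input.file.pattern=^.*\.json$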
input.path¶
The directory from which to read the files that will be processed. This directory must exist and be writable by the user running Kafka Connect.
Importance: HIGH
Type: STRING
Validator: Absolute path to a directory that exists and is writable.
finished.path¶
The directory to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect.
Importance: HIGH
Type: STRING
halt.on.error¶
Whether the task should halt when it encounters an error or continue to the next file.
Importance: HIGH
Type: BOOLEAN
Default Value: true
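For example, to keep the connector running past problem files while routing them to the error directory, something like the following could be used; the path is a placeholder.
halt.on.error=false
error.path=/var/spooldir/error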
cleanup.policy¶
Determines how the connector should clean up files that have been successfully processed. NONE leaves the files in place, which could cause them to be reprocessed if the connector is restarted. DELETE removes the file from the filesystem. MOVE moves the file to a finished directory. MOVEBYDATE moves the file to a finished directory with subdirectories by date.
Importance: MEDIUM
Type: STRING
Default Value: MOVE
Validator: Matches: NONE, DELETE, MOVE, MOVEBYDATE
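As an illustration, moving processed files into date-based subdirectories under the finished directory might be configured as follows; the path is a placeholder.
cleanup.policy=MOVEBYDATE
finished.path=/var/spooldir/finished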
task.partitioner¶
The task partitioner implementation is used when the connector is configured to use more than one task. This is used by each task to identify which files will be processed by that task. This ensures that each file is only assigned to one task.
Importance: MEDIUM
Type: STRING
Default Value: ByName
Validator: Matches: ByName
cleanup.policy.maintain.relative.path¶
If input.path.walk.recursively is enabled and this flag is set to true, the walked sub-directories that contained files will be retained as-is under input.path. The files within those sub-directories will be moved (with a copy of the sub-directory structure) or deleted according to the configured cleanup.policy, but the parent sub-directory structure will remain.
Importance: LOW
Type: BOOLEAN
file.buffer.size.bytes¶
The size of the buffer for the BufferedInputStream that will be used to interact with the file system.
Importance: LOW
Type: INT
Default Value: 131072
Validator: [1,…]
file.minimum.age.ms¶
The amount of time in milliseconds after the file was last written to before the file can be processed.
Importance: LOW
Type: LONG
Default Value: 0
Validator: [0,…]
files.sort.attributes¶
The attributes each file will use to determine the sort order. Name is the name of the file. Length is the length of the file, preferring larger files first. LastModified is the LastModified attribute of the file, preferring older files first.
Importance: LOW
Type: LIST
Default Value: [NameAsc]
Validator: Matches: NameAsc, NameDesc, LengthAsc, LengthDesc, LastModifiedAsc, LastModifiedDesc
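For instance, to process the oldest files first and break ties by name, a value such as the following could be set; it is illustrative only.
files.sort.attributes=LastModifiedAsc,NameAsc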
input.path.walk.recursively¶
If enabled, any sub-directories dropped under input.path will be recursively walked looking for files matching the configured input.file.pattern. After processing is complete, the discovered sub-directory structure (as well as the files within it) will be handled according to the configured cleanup.policy (i.e. moved or deleted). For each discovered file, the walked sub-directory path will be set as a header named file.relative.path.
Importance: LOW
Type: BOOLEAN
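A sketch combining recursive walking with retention of the relative directory layout (see cleanup.policy.maintain.relative.path above) might look like this; whether to retain the layout depends on your downstream tooling.
input.path.walk.recursively=true
cleanup.policy=MOVE
cleanup.policy.maintain.relative.path=true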
processing.file.extension¶
Before a file is processed, a flag is created in its directory to indicate the file is being handled. The flag file has the same name as the file, but with this property appended as a suffix.
Importance: LOW
Type: STRING
Default Value: .PROCESSING
Validator: Matches regex( ^.*\..+$ )
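If a different marker suffix is preferred it can be overridden; the value below is an example and, per the validator above, must contain a dot.
processing.file.extension=.LOCKED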
General¶
batch.size¶
The number of records that should be returned with each batch.
Importance: LOW
Type: INT
Default Value: 1000
empty.poll.wait.ms¶
The amount of time to wait if a poll returns an empty list of records.
Importance: LOW
Type: LONG
Default Value: 500
Validator: [1,…,9223372036854775807]
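As a rough illustration, a lower-latency polling setup might reduce both values; these numbers are examples, not recommendations.
batch.size=100
empty.poll.wait.ms=250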
task.count¶
Internal setting to the connector used to instruct a task on which files to select. The connector will override this setting.
Importance: LOW
Type: INT
Default Value: 1
Validator: [1,…]
task.index¶
Internal setting to the connector used to instruct a task on which files to select. The connector will override this setting.
Importance: LOW
Type: INT
Default Value: 0
Validator: [0,…]
Schema¶
Schema Generation¶
schema.generation.enabled¶
Flag to determine if schemas should be dynamically generated. If set to true, key.schema and value.schema can be omitted, but schema.generation.key.name and schema.generation.value.name must be set.
Importance: MEDIUM
Type: BOOLEAN
schema.generation.key.fields¶
The field(s) to use to build a key schema. This is only used during schema generation.
Importance: MEDIUM
Type: LIST
schema.generation.key.name¶
The name of the generated key schema.
Importance: MEDIUM
Type: STRING
Default Value: com.github.jcustenborder.kafka.connect.model.Key
schema.generation.value.name¶
The name of the generated value schema.
Importance: MEDIUM
Type: STRING
Default Value: com.github.jcustenborder.kafka.connect.model.Value
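Putting the schema-generation settings together, a configuration that generates schemas keyed on an id field (the same field used in the Tip above; your files may use a different key) could look like the following. The schema names are placeholders.
schema.generation.enabled=true
schema.generation.key.fields=id
schema.generation.key.name=com.example.users.UserKey
schema.generation.value.name=com.example.users.UserValue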
Timestamps¶
timestamp.field¶
The field in the value schema that will contain the parsed timestamp for the record. This field cannot be marked as optional and must be a [Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html)
Importance: MEDIUM
Type: STRING
timestamp.mode¶
Determines how the connector will set the timestamp for the [ConnectRecord](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/connector/ConnectRecord.html#timestamp()). If set to FIELD then the timestamp will be read from a field in the value. This field cannot be optional and must be a [Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html). Specify the field in timestamp.field. If set to FILE_TIME then the last modified time of the file will be used. If set to PROCESS_TIME the time the record is read will be used.
Importance: MEDIUM
Type: STRING
Default Value: PROCESS_TIME
Validator: Matches: FIELD, FILE_TIME, PROCESS_TIME
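For example, to take the record timestamp from a Timestamp field in the value (here an assumed field named created_at), the two settings are used together:
timestamp.mode=FIELD
timestamp.field=created_at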
parser.timestamp.date.formats¶
The date formats that are expected in the file. This is a list of strings that will be used to parse the date fields in order. The most accurate date format should be first in the list. See the Java documentation for more info: https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html
Importance: LOW
Type: LIST
Default Value: [yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd' 'HH:mm:ss]
parser.timestamp.timezone¶
The timezone that all of the dates will be parsed with.
Importance: LOW
Type: STRING
Default Value: UTC
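If the files carry dates in a different format or timezone, both settings can be overridden; the pattern and timezone below are illustrative only.
parser.timestamp.date.formats=yyyy-MM-dd'T'HH:mm:ss.SSS,yyyy-MM-dd
parser.timestamp.timezone=America/Chicago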