CSV with headers as fields
This example uses a transformation to copy data from the headers of each record into fields of the record's value.
Note
The data for the following example is formatted as follows.
id,first_name,last_name,email,gender,ip_address,last_login,account_balance,country,favorite_color
1,Jack,Garcia,jgarcia0@shop-pro.jp,Male,196.56.44.185,2015-09-30T15:29:03Z,347.77,IT,#4a2313
{
  "connector.class" : "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
  "finished.path" : "/tmp",
  "input.path" : "/tmp",
  "error.path" : "/tmp",
  "input.file.pattern" : "^users\\d+\\.csv",
  "topic" : "users",
  "csv.first.row.as.header" : "true",
  "key.schema" : "{\n \"name\" : \"com.example.users.UserKey\",\n \"type\" : \"STRUCT\",\n \"isOptional\" : false,\n \"fieldSchemas\" : {\n \"id\" : {\n \"type\" : \"INT64\",\n \"isOptional\" : false\n }\n }\n}\n",
  "value.schema" : "{\n \"name\" : \"com.example.users.User\",\n \"type\" : \"STRUCT\",\n \"isOptional\" : false,\n \"fieldSchemas\" : {\n \"id\" : {\n \"type\" : \"INT64\",\n \"isOptional\" : false\n },\n \"first_name\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"last_name\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"email\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"gender\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"ip_address\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"last_login\" : {\n \"name\" : \"org.apache.kafka.connect.data.Timestamp\",\n \"type\" : \"INT64\",\n \"version\" : 1,\n \"isOptional\" : false\n },\n \"account_balance\" : {\n \"name\" : \"org.apache.kafka.connect.data.Decimal\",\n \"type\" : \"BYTES\",\n \"version\" : 1,\n \"parameters\" : {\n \"scale\" : \"2\"\n },\n \"isOptional\" : true\n },\n \"country\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"favorite_color\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n }\n }\n}\n",
  "transforms" : "headerToField",
  "transforms.headerToField.type" : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
  "transforms.headerToField.header.mappings" : "file.path:STRING:file_path,file.name:STRING:file_name,file.last.modified:INT64(Timestamp):file_last_modified"
}