========================== CSV with Headers as fields ========================== This example uses the ``HeaderToField`` transformation to copy data from the header(s) of each message into field(s) of the message value. .. NOTE:: The input data for this example is formatted as follows: id,first_name,last_name,email,gender,ip_address,last_login,account_balance,country,favorite_color 1,Jack,Garcia,jgarcia0@shop-pro.jp,Male,196.56.44.185,2015-09-30T15:29:03Z,347.77,IT,#4a2313 .. code-block:: json :caption: Configuration { "connector.class" : "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector", "finished.path" : "/tmp", "input.path" : "/tmp", "error.path" : "/tmp", "input.file.pattern" : "^users\\d+\\.csv", "topic" : "users", "key.schema" : "{\n \"name\" : \"com.example.users.UserKey\",\n \"type\" : \"STRUCT\",\n \"isOptional\" : false,\n \"fieldSchemas\" : {\n \"id\" : {\n \"type\" : \"INT64\",\n \"isOptional\" : false\n }\n }\n}\n", "value.schema" : "{\n \"name\" : \"com.example.users.User\",\n \"type\" : \"STRUCT\",\n \"isOptional\" : false,\n \"fieldSchemas\" : {\n \"id\" : {\n \"type\" : \"INT64\",\n \"isOptional\" : false\n },\n \"first_name\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"last_name\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"email\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"gender\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"ip_address\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"last_login\" : {\n \"name\" : \"org.apache.kafka.connect.data.Timestamp\",\n \"type\" : \"INT64\",\n \"version\" : 1,\n \"isOptional\" : false\n },\n \"account_balance\" : {\n \"name\" : \"org.apache.kafka.connect.data.Decimal\",\n \"type\" : \"BYTES\",\n \"version\" : 1,\n \"parameters\" : {\n \"scale\" : \"2\"\n },\n \"isOptional\" : true\n },\n \"country\" : {\n \"type\" : \"STRING\",\n \"isOptional\" : true\n },\n \"favorite_color\" : {\n \"type\" : \"STRING\",\n 
\"isOptional\" : true\n }\n }\n}\n", "transforms" : "headerToField", "transforms.headerToField.type" : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value", "transforms.headerToField.header.mappings" : "file.path:STRING:file_path,file.name:STRING:file_name,file.last.modified:INT64(Timestamp):file_last_modified" }