Spooldir metadata¶
The following example takes the output from the Spooldir connector copies headers for the metadata to fields in the value.
{
"transforms" : "headerToField",
"transforms.headerToField.type" : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
"transforms.headerToField.header.mappings" : "file.path:STRING:file_path,file.name:STRING:file_name,file.last.modified:INT64(Timestamp):file_last_modified"
}
{
"topic" : "testing",
"kafkaPartition" : 1,
"valueSchema" : {
"type" : "STRUCT",
"isOptional" : false,
"fieldSchemas" : {
"firstName" : {
"type" : "STRING",
"isOptional" : true
},
"lastName" : {
"type" : "STRING",
"isOptional" : true
}
}
},
"value" : {
"schema" : {
"type" : "STRUCT",
"isOptional" : false,
"fieldSchemas" : {
"firstName" : {
"type" : "STRING",
"isOptional" : true
},
"lastName" : {
"type" : "STRING",
"isOptional" : true
}
}
},
"fieldValues" : [ {
"name" : "firstName",
"schema" : {
"type" : "STRING",
"isOptional" : true
},
"storage" : "example"
}, {
"name" : "lastName",
"schema" : {
"type" : "STRING",
"isOptional" : true
},
"storage" : "user"
} ]
},
"timestamp" : 123412351,
"timestampType" : "NO_TIMESTAMP_TYPE",
"offset" : 12345,
"headers" : [ {
"name" : "file.path",
"schema" : {
"type" : "STRING",
"isOptional" : false
},
"storage" : "/tmp/input/test1.csv"
}, {
"name" : "file.name",
"schema" : {
"type" : "STRING",
"isOptional" : false
},
"storage" : "test1.csv"
}, {
"name" : "file.last.modified",
"schema" : {
"name" : "org.apache.kafka.connect.data.Timestamp",
"type" : "INT64",
"isOptional" : false
},
"storage" : 1610656447123
} ]
}
Change(s) in the output are emphasized if delta(s) are detected.
{
"topic" : "testing",
"kafkaPartition" : 1,
"valueSchema" : {
"type" : "STRUCT",
"isOptional" : false,
"fieldSchemas" : {
"firstName" : {
"type" : "STRING",
"isOptional" : true
},
"lastName" : {
"type" : "STRING",
"isOptional" : true
},
"file_path" : {
"type" : "STRING",
"isOptional" : true
},
"file_name" : {
"type" : "STRING",
"isOptional" : true
},
"file_last_modified" : {
"name" : "org.apache.kafka.connect.data.Timestamp",
"type" : "INT64",
"version" : 1,
"isOptional" : true
}
}
},
"value" : {
"schema" : {
"type" : "STRUCT",
"isOptional" : false,
"fieldSchemas" : {
"firstName" : {
"type" : "STRING",
"isOptional" : true
},
"lastName" : {
"type" : "STRING",
"isOptional" : true
},
"file_path" : {
"type" : "STRING",
"isOptional" : true
},
"file_name" : {
"type" : "STRING",
"isOptional" : true
},
"file_last_modified" : {
"name" : "org.apache.kafka.connect.data.Timestamp",
"type" : "INT64",
"version" : 1,
"isOptional" : true
}
}
},
"fieldValues" : [ {
"name" : "firstName",
"schema" : {
"type" : "STRING",
"isOptional" : true
},
"storage" : "example"
}, {
"name" : "lastName",
"schema" : {
"type" : "STRING",
"isOptional" : true
},
"storage" : "user"
}, {
"name" : "file_path",
"schema" : {
"type" : "STRING",
"isOptional" : true
},
"storage" : "/tmp/input/test1.csv"
}, {
"name" : "file_name",
"schema" : {
"type" : "STRING",
"isOptional" : true
},
"storage" : "test1.csv"
}, {
"name" : "file_last_modified",
"schema" : {
"name" : "org.apache.kafka.connect.data.Timestamp",
"type" : "INT64",
"version" : 1,
"isOptional" : true
},
"storage" : 1610656447123
} ]
},
"timestamp" : 123412351,
"timestampType" : "NO_TIMESTAMP_TYPE",
"offset" : 12345,
"headers" : [ {
"name" : "file.path",
"schema" : {
"type" : "STRING",
"isOptional" : false
},
"storage" : "/tmp/input/test1.csv"
}, {
"name" : "file.name",
"schema" : {
"type" : "STRING",
"isOptional" : false
},
"storage" : "test1.csv"
}, {
"name" : "file.last.modified",
"schema" : {
"name" : "org.apache.kafka.connect.data.Timestamp",
"type" : "INT64",
"isOptional" : false
},
"storage" : 1610656447123
} ]
}