Spooldir metadata

The following example takes the output from the Spooldir connector and copies the file metadata headers to fields in the value.

Configuration
{
  "transforms" : "headerToField",
  "transforms.headerToField.type" : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
  "transforms.headerToField.header.mappings" : "file.path:STRING:file_path,file.name:STRING:file_name,file.last.modified:INT64(Timestamp):file_last_modified"
}
Input
{
  "topic" : "testing",
  "kafkaPartition" : 1,
  "valueSchema" : {
    "type" : "STRUCT",
    "isOptional" : false,
    "fieldSchemas" : {
      "firstName" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "lastName" : {
        "type" : "STRING",
        "isOptional" : true
      }
    }
  },
  "value" : {
    "schema" : {
      "type" : "STRUCT",
      "isOptional" : false,
      "fieldSchemas" : {
        "firstName" : {
          "type" : "STRING",
          "isOptional" : true
        },
        "lastName" : {
          "type" : "STRING",
          "isOptional" : true
        }
      }
    },
    "fieldValues" : [ {
      "name" : "firstName",
      "schema" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "storage" : "example"
    }, {
      "name" : "lastName",
      "schema" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "storage" : "user"
    } ]
  },
  "timestamp" : 123412351,
  "timestampType" : "NO_TIMESTAMP_TYPE",
  "offset" : 12345,
  "headers" : [ {
    "name" : "file.path",
    "schema" : {
      "type" : "STRING",
      "isOptional" : false
    },
    "storage" : "/tmp/input/test1.csv"
  }, {
    "name" : "file.name",
    "schema" : {
      "type" : "STRING",
      "isOptional" : false
    },
    "storage" : "test1.csv"
  }, {
    "name" : "file.last.modified",
    "schema" : {
      "name" : "org.apache.kafka.connect.data.Timestamp",
      "type" : "INT64",
      "isOptional" : false
    },
    "storage" : 1610656447123
  } ]
}

Any differences between the input and the output are emphasized in the output below.

Output
{
  "topic" : "testing",
  "kafkaPartition" : 1,
  "valueSchema" : {
    "type" : "STRUCT",
    "isOptional" : false,
    "fieldSchemas" : {
      "firstName" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "lastName" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "file_path" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "file_name" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "file_last_modified" : {
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "type" : "INT64",
        "version" : 1,
        "isOptional" : true
      }
    }
  },
  "value" : {
    "schema" : {
      "type" : "STRUCT",
      "isOptional" : false,
      "fieldSchemas" : {
        "firstName" : {
          "type" : "STRING",
          "isOptional" : true
        },
        "lastName" : {
          "type" : "STRING",
          "isOptional" : true
        },
        "file_path" : {
          "type" : "STRING",
          "isOptional" : true
        },
        "file_name" : {
          "type" : "STRING",
          "isOptional" : true
        },
        "file_last_modified" : {
          "name" : "org.apache.kafka.connect.data.Timestamp",
          "type" : "INT64",
          "version" : 1,
          "isOptional" : true
        }
      }
    },
    "fieldValues" : [ {
      "name" : "firstName",
      "schema" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "storage" : "example"
    }, {
      "name" : "lastName",
      "schema" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "storage" : "user"
    }, {
      "name" : "file_path",
      "schema" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "storage" : "/tmp/input/test1.csv"
    }, {
      "name" : "file_name",
      "schema" : {
        "type" : "STRING",
        "isOptional" : true
      },
      "storage" : "test1.csv"
    }, {
      "name" : "file_last_modified",
      "schema" : {
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "type" : "INT64",
        "version" : 1,
        "isOptional" : true
      },
      "storage" : 1610656447123
    } ]
  },
  "timestamp" : 123412351,
  "timestampType" : "NO_TIMESTAMP_TYPE",
  "offset" : 12345,
  "headers" : [ {
    "name" : "file.path",
    "schema" : {
      "type" : "STRING",
      "isOptional" : false
    },
    "storage" : "/tmp/input/test1.csv"
  }, {
    "name" : "file.name",
    "schema" : {
      "type" : "STRING",
      "isOptional" : false
    },
    "storage" : "test1.csv"
  }, {
    "name" : "file.last.modified",
    "schema" : {
      "name" : "org.apache.kafka.connect.data.Timestamp",
      "type" : "INT64",
      "isOptional" : false
    },
    "storage" : 1610656447123
  } ]
}