diff --git a/Cargo.lock b/Cargo.lock index 9bd245b86e..ba7df70abb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,6 +428,15 @@ dependencies = [ "zstd", ] +[[package]] +name = "ar_archive_writer" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7eb93bbb63b9c227414f6eb3a0adfddca591a8ce1e9b60661bb08969b87e340b" +dependencies = [ + "object", +] + [[package]] name = "arbitrary" version = "1.4.2" @@ -484,6 +493,27 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "arrow" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4754a624e5ae42081f464514be454b39711daae0458906dacde5f4c632f33a8" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + [[package]] name = "arrow-arith" version = "57.3.0" @@ -509,6 +539,7 @@ dependencies = [ "arrow-data", "arrow-schema", "chrono", + "chrono-tz", "half", "hashbrown 0.16.1", "num-complex", @@ -543,12 +574,28 @@ dependencies = [ "atoi", "base64 0.22.1", "chrono", + "comfy-table", "half", "lexical-core", "num-traits", "ryu", ] +[[package]] +name = "arrow-csv" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da746f4180004e3ce7b83c977daf6394d768332349d3d913998b10a120b790a" +dependencies = [ + "arrow-array", + "arrow-cast", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "regex", +] + [[package]] name = "arrow-data" version = "57.3.0" @@ -613,11 +660,29 @@ dependencies = [ "arrow-select", ] +[[package]] +name = "arrow-row" +version = "57.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"18228633bad92bff92a95746bbeb16e5fc318e8382b75619dec26db79e4de4c0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", +] + [[package]] name = "arrow-schema" version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c872d36b7bf2a6a6a2b40de9156265f0242910791db366a2c17476ba8330d68" +dependencies = [ + "bitflags 2.10.0", + "serde", + "serde_core", +] [[package]] name = "arrow-select" @@ -978,6 +1043,48 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "aws-config" +version = "1.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11493b0bad143270fb8ad284a096dd529ba91924c5409adeac856cc1bf047dbc" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 1.4.0", + "sha1", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f20799b373a1be121fe3005fba0c2090af9411573878f224df44b42727fcaf7" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + [[package]] name = "aws-lc-rs" version = "1.15.4" @@ -1000,6 +1107,325 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-runtime" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc0651c57e384202e47153c1260b84a9936e19803d747615edf199dc3b98d17" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "bytes-utils", + "fastrand", + "http 1.4.0", + "http-body 
1.0.1", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-dynamodb" +version = "1.108.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aec313992e654999e9106edb2ed5287e73495b05385b4baae9bbed1c999db88" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64a6eded248c6b453966e915d32aeddb48ea63ad17932682774eb026fbef5b1" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.98.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db96d720d3c622fcbe08bae1c4b04a72ce6257d8b0584cb5418da00ae20a344f" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.100.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fafbdda43b93f57f699c5dfe8328db590b967b8a820a13ccdd6687355dfcc7ca" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + 
"aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0b660013a6683ab23797778e21f1f854744fdf05f68204b4cca4c8c04b5d1f4" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.4.0", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ffcaf626bdda484571968400c326a244598634dc75fd451325a54ad1a59acfc" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.63.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba1ab2dc1c2c3749ead27180d333c42f11be8b0e934058fb4b2258ee8dbe5231" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http-client" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a2f165a7feee6f263028b899d0a181987f4fa7179a6411a32a439fba7c5f769" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "h2 0.4.13", + "http 1.4.0", + "hyper", + "hyper-rustls", + "hyper-util", + "pin-project-lite", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower", + "tracing", 
+] + +[[package]] +name = "aws-smithy-json" +version = "0.62.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9648b0bb82a2eedd844052c6ad2a1a822d1f8e3adee5fbf668366717e428856a" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-observability" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06c2315d173edbf1920da8ba3a7189695827002e4c0fc961973ab1c54abca9c" +dependencies = [ + "aws-smithy-runtime-api", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a56d79744fb3edb5d722ef79d86081e121d3b9422cb209eb03aea6aa4f21ebd" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028999056d2d2fd58a697232f9eec4a643cf73a71cf327690a7edad1d2af2110" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-client", + "aws-smithy-observability", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite", + "pin-utils", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876ab3c9c29791ba4ba02b780a3049e21ec63dabda09268b175272c3733a79e6" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.4.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b1117b3b2bbe166d11199b540ceed0d0f7676e36e7b962b5a437a9971eac75" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + 
"futures-core", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce02add1aa3677d022f8adf81dcbe3046a95f17a1b1e8979c145cd21d3d22b3" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47c8323699dd9b3c8d5b3c13051ae9cdef58fd179957c882f8374dd8725962d9" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.8.8" @@ -1012,7 +1438,7 @@ dependencies = [ "form_urlencoded", "futures-util", "http 1.4.0", - "http-body", + "http-body 1.0.1", "http-body-util", "hyper", "hyper-util", @@ -1043,7 +1469,7 @@ dependencies = [ "bytes", "futures-core", "http 1.4.0", - "http-body", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -1075,7 +1501,7 @@ dependencies = [ "either", "fs-err", "http 1.4.0", - "http-body", + "http-body 1.0.1", "hyper", "hyper-util", "pin-project-lite", @@ -1594,6 +2020,16 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "bytestring" version = "1.5.0" @@ -1761,6 +2197,16 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "chrono-tz" +version = "0.10.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3" +dependencies = [ + "chrono", + "phf", +] + [[package]] name = "cipher" version = "0.4.4" @@ -2238,6 +2684,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db05ffb6856bf0ecdf6367558a76a0e8a77b1713044eb92845c692100ed50190" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "convert_case" version = "0.10.0" @@ -2486,6 +2941,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "ctor" version = "0.6.3" @@ -2785,47 +3261,225 @@ dependencies = [ ] [[package]] -name = "data-encoding" -version = "2.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" - -[[package]] -name = "data-url" -version = "0.3.2" +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + +[[package]] +name = "data-url" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be1e0bca6c3637f992fc1cc7cbc52a78c1ef6db076dbf1059c4323d6a2048376" + +[[package]] +name = "dbus" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"21b3aa68d7e7abee336255bd7248ea965cc393f3e70411135a6f6a4b651345d4" +dependencies = [ + "libc", + "libdbus-sys", + "windows-sys 0.59.0", +] + +[[package]] +name = "dbus-secret-service" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "708b509edf7889e53d7efb0ffadd994cc6c2345ccb62f55cfd6b0682165e4fa6" +dependencies = [ + "dbus", + "openssl", + "zeroize", +] + +[[package]] +name = "debugid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +dependencies = [ + "serde", + "uuid", +] + +[[package]] +name = "delta_kernel" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06f7fc164b1557731fcc68a198e813811a000efade0f112d4f0a002e65042b83" +dependencies = [ + "arrow", + "bytes", + "chrono", + "comfy-table", + "crc", + "delta_kernel_derive", + "futures", + "indexmap 2.13.0", + "itertools 0.14.0", + "object_store", + "parquet", + "reqwest", + "roaring", + "rustc_version", + "serde", + "serde_json", + "strum", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", + "uuid", + "z85", +] + +[[package]] +name = "delta_kernel_derive" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86815a2c475835751ffa9b8d9ac8ed86cf86294304c42bedd1103d54f25ecbfe" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.115", +] + +[[package]] +name = "deltalake" +version = "0.30.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5ace194fd6a5db14d4b4973c5780cf4569650716594ffd25297343be2e7cb0c" +dependencies = [ + "ctor", + "delta_kernel", + "deltalake-aws", + "deltalake-azure", + "deltalake-core", + "deltalake-gcp", +] + +[[package]] +name = "deltalake-aws" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b60353287c8dc49bc21caa77c62e6eca4141bdcaf967365553dc62b518c7d2f1" +dependencies = [ + "async-trait", + "aws-config", + "aws-credential-types", + "aws-sdk-dynamodb", + "aws-sdk-sts", + "aws-smithy-runtime-api", + "backon", + "bytes", + "chrono", + "deltalake-core", + "futures", + "object_store", + "regex", + "thiserror 2.0.18", + "tokio", + "tracing", + "typed-builder 0.23.2", + "url", + "uuid", +] + +[[package]] +name = "deltalake-azure" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be1e0bca6c3637f992fc1cc7cbc52a78c1ef6db076dbf1059c4323d6a2048376" +checksum = "b8d28cb05f3254ddbcef665ee900f36bb6a34728d4ececb5f177cfdf2383f142" +dependencies = [ + "bytes", + "deltalake-core", + "object_store", + "thiserror 2.0.18", + "tokio", + "url", +] [[package]] -name = "dbus" -version = "0.9.10" +name = "deltalake-core" +version = "0.30.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b3aa68d7e7abee336255bd7248ea965cc393f3e70411135a6f6a4b651345d4" +checksum = "5b098d0ce09726f10a08b102c885a501ee18f06ea4aca864570508a9d5b620d1" dependencies = [ - "libc", - "libdbus-sys", - "windows-sys 0.59.0", + "arrow", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "async-trait", + "bytes", + "cfg-if", + "chrono", + "dashmap", + "delta_kernel", + "deltalake-derive", + "dirs", + "either", + "futures", + "humantime", + "indexmap 2.13.0", + "itertools 0.14.0", + "num_cpus", + "object_store", + "parking_lot", + "parquet", + "percent-encoding", + "percent-encoding-rfc3986", + "pin-project-lite", + "rand 0.8.5", + "regex", + "serde", + "serde_json", + "sqlparser", + "strum", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", + "uuid", + "validator", ] [[package]] -name = "dbus-secret-service" -version = "4.1.0" +name = "deltalake-derive" +version = "0.30.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "708b509edf7889e53d7efb0ffadd994cc6c2345ccb62f55cfd6b0682165e4fa6" +checksum = "3963d9fe965af7b1dea433271389e1e39c6a97ffdbc2e81d808f5b329e4577b3" dependencies = [ - "dbus", - "openssl", - "zeroize", + "convert_case 0.9.0", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.115", ] [[package]] -name = "debugid" -version = "0.8.0" +name = "deltalake-gcp" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +checksum = "6fc0da3f4db3e508d180650b0f802d63d494aafc2cec0f5031e85ef4f93dd78e" dependencies = [ - "serde", - "uuid", + "async-trait", + "bytes", + "deltalake-core", + "futures", + "object_store", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", ] [[package]] @@ -4593,6 +5247,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -4612,7 +5277,7 @@ dependencies = [ "bytes", "futures-core", "http 1.4.0", - "http-body", + "http-body 1.0.1", "pin-project-lite", ] @@ -4692,7 +5357,7 @@ dependencies = [ "futures-core", "h2 0.4.13", "http 1.4.0", - "http-body", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -4761,7 +5426,7 @@ dependencies = [ "futures-channel", "futures-util", "http 1.4.0", - "http-body", + "http-body 1.0.1", "hyper", "ipnet", "libc", @@ -5266,6 +5931,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_delta_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "dashmap", + "deltalake", + "iggy_connector_sdk", + "once_cell", + "serde", + "serde_json", + "simd-json", + "tokio", + "tracing", + "url", +] + [[package]] name = 
"iggy_connector_elasticsearch_sink" version = "0.3.1-edge.1" @@ -5629,6 +6312,7 @@ dependencies = [ "configs", "configs_derive", "ctor", + "deltalake", "figment", "futures", "harness_derive", @@ -6854,6 +7538,53 @@ dependencies = [ "objc2-core-foundation", ] +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "memchr", +] + +[[package]] +name = "object_store" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "chrono", + "form_urlencoded", + "futures", + "http 1.4.0", + "http-body-util", + "httparse", + "humantime", + "hyper", + "itertools 0.14.0", + "md-5", + "parking_lot", + "percent-encoding", + "quick-xml 0.38.4", + "rand 0.9.2", + "reqwest", + "ring", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", + "walkdir", + "wasm-bindgen-futures", + "web-time", +] + [[package]] name = "octocrab" version = "0.49.5" @@ -6872,7 +7603,7 @@ dependencies = [ "futures-util", "getrandom 0.2.17", "http 1.4.0", - "http-body", + "http-body 1.0.1", "http-body-util", "hyper", "hyper-rustls", @@ -6937,7 +7668,7 @@ dependencies = [ "futures", "getrandom 0.2.17", "http 1.4.0", - "http-body", + "http-body 1.0.1", "jiff", "log", "md-5", @@ -7240,6 +7971,7 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", + "object_store", "paste", "seq-macro", "simdutf8", @@ -7403,6 +8135,12 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "percent-encoding-rfc3986" +version = "0.1.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "3637c05577168127568a64e9dc5a6887da720efef07b3d9472d45f63ab191166" + [[package]] name = "pest" version = "2.8.6" @@ -7446,6 +8184,24 @@ dependencies = [ "sha2", ] +[[package]] +name = "phf" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981" +dependencies = [ + "siphasher", +] + [[package]] name = "pico-args" version = "0.5.0" @@ -7724,6 +8480,28 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn 2.0.115", +] + [[package]] name = "proc-macro-rules" version = "0.4.0" @@ -7882,6 +8660,16 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "psm" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3852766467df634d74f0b2d7819bf8dc483a0eb2e3b0f50f756f9cfe8b0d18d8" +dependencies = [ + "ar_archive_writer", + "cc", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -8223,6 +9011,26 @@ dependencies = [ "yasna", ] +[[package]] +name = "recursive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0786a43debb760f491b1bc0269fe5e84155353c67482b9e60d0cfb596054b43e" +dependencies = [ + "recursive-proc-macro-impl", + "stacker", +] + +[[package]] +name = "recursive-proc-macro-impl" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" +dependencies = [ + "quote", + "syn 2.0.115", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -8356,8 +9164,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2 0.4.13", "http 1.4.0", - "http-body", + "http-body 1.0.1", "http-body-util", "hyper", "hyper-rustls", @@ -8368,6 +9177,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls", + "rustls-native-certs", "rustls-pki-types", "serde", "serde_json", @@ -8546,7 +9356,7 @@ dependencies = [ "chrono", "futures", "http 1.4.0", - "http-body", + "http-body 1.0.1", "http-body-util", "pastey 0.2.1", "pin-project-lite", @@ -9642,6 +10452,16 @@ dependencies = [ "der", ] +[[package]] +name = "sqlparser" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4591acadbcf52f0af60eafbb2c003232b2b4cd8de5f0e9437cb8b1b59046cc0f" +dependencies = [ + "log", + "recursive", +] + [[package]] name = "sqlx" version = "0.8.6" @@ -9848,7 +10668,7 @@ checksum = "eb4dc4d33c68ec1f27d386b5610a351922656e1fdf5c05bbaad930cd1519479a" dependencies = [ "bytes", "futures-util", - "http-body", + "http-body 1.0.1", "http-body-util", "pin-project-lite", ] @@ -9859,6 +10679,19 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "stacker" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d74a23609d509411d10e2176dc2a4346e3b4aea2e7b1869f19fdedbc71c013" +dependencies = [ + "cc", + "cfg-if", + "libc", + "psm", + "windows-sys 0.59.0", +] + [[package]] 
name = "static-toml" version = "1.3.0" @@ -10631,7 +11464,7 @@ dependencies = [ "bytes", "h2 0.4.13", "http 1.4.0", - "http-body", + "http-body 1.0.1", "http-body-util", "hyper", "hyper-timeout", @@ -10703,7 +11536,7 @@ dependencies = [ "futures-core", "futures-util", "http 1.4.0", - "http-body", + "http-body 1.0.1", "http-body-util", "iri-string", "pin-project-lite", @@ -11209,6 +12042,36 @@ version = "0.15.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e8257fbc510f0a46eb602c10215901938b5c2a7d5e70fc11483b1d3c9b5b18c" +[[package]] +name = "validator" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b4a29d8709210980a09379f27ee31549b73292c87ab9899beee1c0d3be6303" +dependencies = [ + "idna", + "once_cell", + "regex", + "serde", + "serde_derive", + "serde_json", + "url", + "validator_derive", +] + +[[package]] +name = "validator_derive" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bac855a2ce6f843beb229757e6e570a42e837bcb15e5f449dd48d5747d41bf77" +dependencies = [ + "darling 0.20.11", + "once_cell", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.115", +] + [[package]] name = "valuable" version = "0.1.1" @@ -12246,6 +13109,12 @@ dependencies = [ "rustix 1.1.3", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "xmlwriter" version = "0.1.0" @@ -12366,6 +13235,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "z85" +version = "3.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6e61e59a957b7ccee15d2049f86e8bfd6f66968fcd88f018950662d9b86e675" + [[package]] name = "zerocopy" version = "0.8.39" diff --git a/Cargo.toml b/Cargo.toml index d2bbfe5670..b2a0ed56ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 
+33,7 @@ members = [ "core/configs_derive", "core/connectors/runtime", "core/connectors/sdk", + "core/connectors/sinks/delta_sink", "core/connectors/sinks/elasticsearch_sink", "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/postgres_sink", @@ -124,6 +125,7 @@ cyper = { version = "0.8.0", features = ["rustls"], default-features = false } cyper-axum = { version = "0.8.0" } darling = "0.23" dashmap = "6.1.0" +deltalake = { version = "0.30.2", features = ["azure", "gcs", "s3"] } derive-new = "0.7.0" derive_builder = "0.20.2" derive_more = { version = "2.1.1", features = ["full"] } diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index e3f450a7e9..606ffb98b3 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -32,6 +32,7 @@ anstyle-query: 1.1.5, "Apache-2.0 OR MIT", anstyle-wincon: 3.0.11, "Apache-2.0 OR MIT", anyhow: 1.0.101, "Apache-2.0 OR MIT", apache-avro: 0.21.0, "Apache-2.0", +ar_archive_writer: 0.5.1, "Apache-2.0 WITH LLVM-exception", arbitrary: 1.4.2, "Apache-2.0 OR MIT", arc-swap: 1.8.1, "Apache-2.0 OR MIT", arg_enum_proc_macro: 0.3.4, "MIT", @@ -39,14 +40,17 @@ argon2: 0.5.3, "Apache-2.0 OR MIT", array-init: 2.1.0, "Apache-2.0 OR MIT", arrayref: 0.3.9, "BSD-2-Clause", arrayvec: 0.7.6, "Apache-2.0 OR MIT", +arrow: 57.3.0, "Apache-2.0", arrow-arith: 57.3.0, "Apache-2.0", arrow-array: 57.3.0, "Apache-2.0", arrow-buffer: 57.3.0, "Apache-2.0", arrow-cast: 57.3.0, "Apache-2.0", +arrow-csv: 57.3.0, "Apache-2.0", arrow-data: 57.3.0, "Apache-2.0", arrow-ipc: 57.3.0, "Apache-2.0", arrow-json: 57.3.0, "Apache-2.0", arrow-ord: 57.3.0, "Apache-2.0", +arrow-row: 57.3.0, "Apache-2.0", arrow-schema: 57.3.0, "Apache-2.0", arrow-select: 57.3.0, "Apache-2.0", arrow-string: 57.3.0, "Apache-2.0", @@ -79,8 +83,27 @@ autotools: 0.2.7, "MIT", av-scenechange: 0.14.1, "MIT", av1-grain: 0.2.5, "BSD-2-Clause", avif-serialize: 0.8.8, "BSD-3-Clause", +aws-config: 1.8.15, "Apache-2.0", +aws-credential-types: 1.2.14, "Apache-2.0", aws-lc-rs: 1.15.4, "(Apache-2.0 OR ISC) AND 
ISC", aws-lc-sys: 0.37.1, "(Apache-2.0 OR ISC) AND ISC AND OpenSSL", +aws-runtime: 1.7.2, "Apache-2.0", +aws-sdk-dynamodb: 1.108.0, "Apache-2.0", +aws-sdk-sso: 1.96.0, "Apache-2.0", +aws-sdk-ssooidc: 1.98.0, "Apache-2.0", +aws-sdk-sts: 1.100.0, "Apache-2.0", +aws-sigv4: 1.4.2, "Apache-2.0", +aws-smithy-async: 1.2.14, "Apache-2.0", +aws-smithy-http: 0.63.6, "Apache-2.0", +aws-smithy-http-client: 1.1.12, "Apache-2.0", +aws-smithy-json: 0.62.5, "Apache-2.0", +aws-smithy-observability: 0.2.6, "Apache-2.0", +aws-smithy-query: 0.60.15, "Apache-2.0", +aws-smithy-runtime: 1.10.3, "Apache-2.0", +aws-smithy-runtime-api: 1.11.6, "Apache-2.0", +aws-smithy-types: 1.4.6, "Apache-2.0", +aws-smithy-xml: 0.60.15, "Apache-2.0", +aws-types: 1.3.14, "Apache-2.0", axum: 0.8.8, "MIT", axum-core: 0.5.6, "MIT", axum-macros: 0.5.0, "MIT", @@ -133,6 +156,7 @@ bytemuck: 1.25.0, "Apache-2.0 OR MIT OR Zlib", byteorder: 1.5.0, "MIT OR Unlicense", byteorder-lite: 0.1.0, "MIT OR Unlicense", bytes: 1.11.1, "MIT", +bytes-utils: 0.1.4, "Apache-2.0 OR MIT", bytestring: 1.5.0, "Apache-2.0 OR MIT", bzip2: 0.6.1, "Apache-2.0 OR MIT", camino: 1.2.2, "Apache-2.0 OR MIT", @@ -149,6 +173,7 @@ chacha20: 0.10.0, "Apache-2.0 OR MIT", charming: 0.6.0, "Apache-2.0 OR MIT", charming_macros: 0.1.0, "Apache-2.0 OR MIT", chrono: 0.4.43, "Apache-2.0 OR MIT", +chrono-tz: 0.10.4, "Apache-2.0 OR MIT", cipher: 0.4.4, "Apache-2.0 OR MIT", clang-sys: 1.8.1, "Apache-2.0", clap: 4.5.58, "Apache-2.0 OR MIT", @@ -189,6 +214,7 @@ const-random: 0.1.18, "Apache-2.0 OR MIT", const-random-macro: 0.1.16, "Apache-2.0 OR MIT", constant_time_eq: 0.4.2, "Apache-2.0 OR CC0-1.0 OR MIT-0", convert_case: 0.6.0, "MIT", +convert_case: 0.9.0, "MIT", convert_case: 0.10.0, "MIT", cooked-waker: 5.0.0, "MPL-2.0", cookie: 0.16.2, "Apache-2.0 OR MIT", @@ -216,6 +242,8 @@ crossterm_winapi: 0.9.1, "MIT", crunchy: 0.2.4, "MIT", crypto-bigint: 0.5.5, "Apache-2.0 OR MIT", crypto-common: 0.1.7, "Apache-2.0 OR MIT", +csv: 1.4.0, "MIT OR Unlicense", 
+csv-core: 0.1.13, "MIT OR Unlicense", ctor: 0.6.3, "Apache-2.0 OR MIT", ctor-proc-macro: 0.0.7, "Apache-2.0 OR MIT", ctr: 0.9.2, "Apache-2.0 OR MIT", @@ -243,6 +271,14 @@ data-url: 0.3.2, "Apache-2.0 OR MIT", dbus: 0.9.10, "Apache-2.0 OR MIT", dbus-secret-service: 4.1.0, "Apache-2.0 OR MIT", debugid: 0.8.0, "Apache-2.0", +delta_kernel: 0.19.2, "Apache-2.0", +delta_kernel_derive: 0.19.2, "Apache-2.0", +deltalake: 0.30.2, "Apache-2.0", +deltalake-aws: 0.13.1, "Apache-2.0", +deltalake-azure: 0.13.0, "Apache-2.0", +deltalake-core: 0.30.2, "Apache-2.0", +deltalake-derive: 0.30.0, "Apache-2.0", +deltalake-gcp: 0.14.0, "Apache-2.0", deno_core: 0.351.0, "MIT", deno_core_icudata: 0.74.0, "MIT", deno_error: 0.6.1, "MIT", @@ -408,6 +444,7 @@ home: 0.5.12, "Apache-2.0 OR MIT", hostname: 0.4.2, "MIT", http: 0.2.12, "Apache-2.0 OR MIT", http: 1.4.0, "Apache-2.0 OR MIT", +http-body: 0.4.6, "MIT", http-body: 1.0.1, "MIT", http-body-util: 0.1.3, "MIT", http-range: 0.1.5, "MIT", @@ -447,6 +484,7 @@ iggy-connectors: 0.3.1-edge.1, "Apache-2.0", iggy-mcp: 0.3.1-edge.1, "Apache-2.0", iggy_binary_protocol: 0.9.1-edge.1, "Apache-2.0", iggy_common: 0.9.1-edge.1, "Apache-2.0", +iggy_connector_delta_sink: 0.1.0, "Apache-2.0", iggy_connector_elasticsearch_sink: 0.3.1-edge.1, "Apache-2.0", iggy_connector_elasticsearch_source: 0.3.1-edge.1, "Apache-2.0", iggy_connector_iceberg_sink: 0.3.1-edge.1, "Apache-2.0", @@ -599,6 +637,8 @@ objc2: 0.6.3, "MIT", objc2-core-foundation: 0.3.1, "Apache-2.0 OR MIT OR Zlib", objc2-encode: 4.1.0, "MIT", objc2-io-kit: 0.3.1, "Apache-2.0 OR MIT OR Zlib", +object: 0.37.3, "Apache-2.0 OR MIT", +object_store: 0.12.5, "Apache-2.0 OR MIT", octocrab: 0.49.5, "Apache-2.0 OR MIT", oid-registry: 0.8.1, "Apache-2.0 OR MIT", once_cell: 1.21.3, "Apache-2.0 OR MIT", @@ -646,10 +686,13 @@ peg-runtime: 0.6.3, "MIT", pem: 3.0.6, "MIT", pem-rfc7468: 0.7.0, "Apache-2.0 OR MIT", percent-encoding: 2.3.2, "Apache-2.0 OR MIT", +percent-encoding-rfc3986: 0.1.3, "Apache-2.0 OR MIT", 
pest: 2.8.6, "Apache-2.0 OR MIT", pest_derive: 2.8.6, "Apache-2.0 OR MIT", pest_generator: 2.8.6, "Apache-2.0 OR MIT", pest_meta: 2.8.6, "Apache-2.0 OR MIT", +phf: 0.12.1, "MIT", +phf_shared: 0.12.1, "MIT", pico-args: 0.5.0, "MIT", pin-project: 1.1.10, "Apache-2.0 OR MIT", pin-project-internal: 1.1.10, "Apache-2.0 OR MIT", @@ -679,6 +722,8 @@ proc-macro-crate: 1.3.1, "Apache-2.0 OR MIT", proc-macro-crate: 3.4.0, "Apache-2.0 OR MIT", proc-macro-error: 1.0.4, "Apache-2.0 OR MIT", proc-macro-error-attr: 1.0.4, "Apache-2.0 OR MIT", +proc-macro-error-attr2: 2.0.0, "Apache-2.0 OR MIT", +proc-macro-error2: 2.0.1, "Apache-2.0 OR MIT", proc-macro-rules: 0.4.0, "Apache-2.0 OR MIT", proc-macro-rules-macros: 0.4.0, "Apache-2.0 OR MIT", proc-macro2: 1.0.106, "Apache-2.0 OR MIT", @@ -693,6 +738,7 @@ prost-reflect: 0.16.3, "Apache-2.0 OR MIT", prost-types: 0.14.3, "Apache-2.0", protox: 0.9.1, "Apache-2.0 OR MIT", protox-parse: 0.9.0, "Apache-2.0 OR MIT", +psm: 0.1.30, "Apache-2.0 OR MIT", ptr_meta: 0.1.4, "MIT", ptr_meta_derive: 0.1.4, "MIT", pxfm: 0.1.27, "Apache-2.0 OR BSD-3-Clause", @@ -723,6 +769,8 @@ raw-cpuid: 11.6.0, "MIT", rayon: 1.11.0, "Apache-2.0 OR MIT", rayon-core: 1.13.0, "Apache-2.0 OR MIT", rcgen: 0.14.7, "Apache-2.0 OR MIT", +recursive: 0.1.1, "MIT", +recursive-proc-macro-impl: 0.1.1, "MIT", redox_syscall: 0.5.18, "MIT", redox_syscall: 0.7.1, "MIT", redox_users: 0.5.2, "MIT", @@ -843,6 +891,7 @@ sourcemap: 9.3.2, "BSD-3-Clause", spin: 0.9.8, "MIT", spinning_top: 0.3.0, "Apache-2.0 OR MIT", spki: 0.7.3, "Apache-2.0 OR MIT", +sqlparser: 0.59.0, "Apache-2.0", sqlx: 0.8.6, "Apache-2.0 OR MIT", sqlx-core: 0.8.6, "Apache-2.0 OR MIT", sqlx-macros: 0.8.6, "Apache-2.0 OR MIT", @@ -852,6 +901,7 @@ sqlx-postgres: 0.8.6, "Apache-2.0 OR MIT", sqlx-sqlite: 0.8.6, "Apache-2.0 OR MIT", sse-stream: 0.2.1, "Apache-2.0 OR MIT", stable_deref_trait: 1.2.1, "Apache-2.0 OR MIT", +stacker: 0.1.23, "Apache-2.0 OR MIT", static-toml: 1.3.0, "MIT", static_assertions: 1.1.0, "Apache-2.0 OR 
MIT", strict-num: 0.1.1, "MIT", @@ -982,6 +1032,8 @@ uuid: 1.20.0, "Apache-2.0 OR MIT", v8: 137.3.0, "MIT", v_frame: 0.3.9, "BSD-2-Clause", v_htmlescape: 0.15.8, "Apache-2.0 OR MIT", +validator: 0.19.0, "MIT", +validator_derive: 0.19.0, "MIT", valuable: 0.1.1, "MIT", value-trait: 0.12.1, "Apache-2.0 OR MIT", vcpkg: 0.2.15, "Apache-2.0 OR MIT", @@ -1095,6 +1147,7 @@ writeable: 0.6.2, "Unicode-3.0", wyz: 0.5.1, "MIT", x509-parser: 0.18.1, "Apache-2.0 OR MIT", xattr: 1.6.1, "Apache-2.0 OR MIT", +xmlparser: 0.13.6, "Apache-2.0 OR MIT", xmlwriter: 0.1.0, "MIT", y4m: 0.8.0, "MIT", yansi: 1.0.1, "Apache-2.0 OR MIT", @@ -1105,6 +1158,7 @@ yew-router: 0.19.0, "Apache-2.0 OR MIT", yew-router-macro: 0.19.0, "Apache-2.0 OR MIT", yoke: 0.8.1, "Unicode-3.0", yoke-derive: 0.8.1, "Unicode-3.0", +z85: 3.0.7, "Apache-2.0 OR MIT", zerocopy: 0.8.39, "Apache-2.0 OR BSD-2-Clause OR MIT", zerocopy-derive: 0.8.39, "Apache-2.0 OR BSD-2-Clause OR MIT", zerofrom: 0.1.6, "Unicode-3.0", diff --git a/core/connectors/runtime/example_config/connectors/delta_sink.toml b/core/connectors/runtime/example_config/connectors/delta_sink.toml new file mode 100644 index 0000000000..c750596b1a --- /dev/null +++ b/core/connectors/runtime/example_config/connectors/delta_sink.toml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "delta" +enabled = true +version = 0 +name = "Delta Lake sink" +path = "target/release/libiggy_connector_delta_sink" +verbose = true + +[[streams]] +stream = "qw" +topics = ["records"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "delta_sink_connector" + +[plugin_config] +table_uri = "file:///tmp/iggy_testing/iggy_delta_table" diff --git a/core/connectors/sinks/delta_sink/Cargo.toml b/core/connectors/sinks/delta_sink/Cargo.toml new file mode 100644 index 0000000000..11cf15902a --- /dev/null +++ b/core/connectors/sinks/delta_sink/Cargo.toml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy_connector_delta_sink" +version = "0.1.0" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true } +dashmap = { workspace = true } +deltalake = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +url = "2" diff --git a/core/connectors/sinks/delta_sink/README.md b/core/connectors/sinks/delta_sink/README.md new file mode 100644 index 0000000000..ae75c2bc79 --- /dev/null +++ b/core/connectors/sinks/delta_sink/README.md @@ -0,0 +1,94 @@ +# Delta Lake Sink Connector + +The Delta Lake Sink Connector allows you to consume messages from Iggy topics and store them in Delta Lake tables. + +## Features + +- **Support for local filesystem, AWS S3, Azure Blob Storage, and Google Cloud Storage** +- **Intelligent type coercion** to match Delta table schemas (e.g. 
ISO 8601 strings to timestamps) +- **Transactional writes** with atomic flush-and-commit operations + +## Configuration example + +### Local filesystem + +```toml +[plugin_config] +table_uri = "file:///tmp/iggy_delta_table" +``` + +### AWS S3 + +```toml +[plugin_config] +table_uri = "s3://my-bucket/delta-tables/users" +storage_backend_type = "s3" +aws_s3_access_key = "your-access-key" +aws_s3_secret_key = "your-secret-key" +aws_s3_region = "us-east-1" +aws_s3_endpoint_url = "https://s3.amazonaws.com" +aws_s3_allow_http = false +``` + +### Azure Blob Storage + +```toml +[plugin_config] +table_uri = "az://my-container/delta-tables/users" +storage_backend_type = "azure" +azure_storage_account_name = "mystorageaccount" +azure_storage_account_key = "account-key" +azure_storage_sas_token = "sas-token" +azure_container_name = "my-container" +``` + +### Google Cloud Storage + +```toml +[plugin_config] +table_uri = "gs://my-bucket/delta-tables/users" +storage_backend_type = "gcs" +gcs_service_account_key = '{"type": "service_account", "project_id": "...", ...}' +gcs_bucket = "my-bucket" +``` + +## Configuration Options + +### Core + +- **table_uri** (required): Path or URI to the Delta table. Supported schemes: `file://`, `s3://`, `az://`, `gs://`. +- **storage_backend_type** (optional): The cloud storage backend to use. One of `"s3"`, `"azure"`, or `"gcs"`. Omit for local filesystem tables. + +### AWS S3 + +Required when `storage_backend_type = "s3"`. + +- **aws_s3_access_key**: AWS access key ID. +- **aws_s3_secret_key**: AWS secret access key. +- **aws_s3_region**: AWS region (e.g. `us-east-1`). +- **aws_s3_endpoint_url**: S3 endpoint URL. Useful for S3-compatible services like MinIO. +- **aws_s3_allow_http**: Set to `true` to allow HTTP connections (for local development). + +### Azure Blob Storage + +Required when `storage_backend_type = "azure"`. + +- **azure_storage_account_name**: Azure storage account name. 
+- **azure_storage_account_key**: Azure storage account key. +- **azure_storage_sas_token**: Shared Access Signature token. +- **azure_container_name**: Azure container name. + +### Google Cloud Storage + +Required when `storage_backend_type = "gcs"`. + +- **gcs_service_account_key**: GCS service account JSON key (as a string). +- **gcs_bucket**: GCS bucket name. + +## Type Coercion + +The connector automatically coerces JSON values to match the Delta table schema: + +- **Timestamp fields**: ISO 8601 / RFC 3339 formatted strings (e.g. `"2021-11-11T22:11:58Z"`) are converted to microsecond timestamps. Numeric timestamps pass through unchanged. +- **String fields**: Non-string values (numbers, booleans, objects, arrays) are converted to their string representation. +- **Nested fields**: Coercions are applied recursively to nested structs and arrays. diff --git a/core/connectors/sinks/delta_sink/config.toml b/core/connectors/sinks/delta_sink/config.toml new file mode 100644 index 0000000000..c2f6781142 --- /dev/null +++ b/core/connectors/sinks/delta_sink/config.toml @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "sink" +key = "delta" +enabled = true +version = 0 +name = "Delta Lake sink" +path = "../../target/release/libiggy_connector_delta_sink" +verbose = true + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "delta_sink_connector" + +# table_uri: Path or URI of the Delta table to write to. +# - Local: "file:///path/to/delta_table" +# - S3: "s3://bucket/path/to/delta_table" +# - Azure: "az://container/path/to/delta_table" +# - GCS: "gs://bucket/path/to/delta_table" + +[plugin_config] +table_uri = "file:///tmp/iggy_delta_table" + +# storage_backend_type: "s3", "azure", or "gcs". Omit for local filesystem. + +# S3 options: +# storage_backend_type = "s3" +# aws_s3_access_key = "your-access-key" +# aws_s3_secret_key = "your-secret-key" +# aws_s3_region = "us-east-1" +# aws_s3_endpoint_url = "http://localhost:9000" +# aws_s3_allow_http = true + +# Azure options: +# storage_backend_type = "azure" +# azure_storage_account_name = "your-account" +# azure_storage_account_key = "your-key" +# azure_storage_sas_token = "your-sas-token" +# azure_container_name = "your-container" + +# GCS options: +# storage_backend_type = "gcs" +# gcs_service_account_key = '{"type": "service_account", ...}' +# gcs_bucket = "your-bucket" diff --git a/core/connectors/sinks/delta_sink/src/coercions.rs b/core/connectors/sinks/delta_sink/src/coercions.rs new file mode 100644 index 0000000000..f5fd637ac1 --- /dev/null +++ b/core/connectors/sinks/delta_sink/src/coercions.rs @@ -0,0 +1,600 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use deltalake::kernel::Schema as DeltaSchema; +use deltalake::kernel::{DataType, PrimitiveType}; + +use chrono::prelude::*; +use serde_json::Value; +use std::collections::HashMap; +use std::str::FromStr; +

/// One entry of a [`CoercionTree`]: what has to happen to the JSON value stored under a
/// given field name.
#[derive(Debug, Clone, PartialEq)]
// NOTE(review): every variant is constructed and matched in this module, so this allow
// looks unnecessary — confirm and drop it, or state why it is needed.
#[allow(unused)]
enum CoercionNode {
    /// The value itself is converted.
    Coercion(Coercion),
    /// The value is a JSON object; the nested tree is applied to its fields.
    Tree(CoercionTree),
    /// The value is a JSON array of objects; the nested tree is applied to each element.
    ArrayTree(CoercionTree),
    /// The value is a JSON array; the conversion is applied to each element.
    ArrayPrimitive(Coercion),
}

/// The conversions applied to a single JSON value.
#[derive(Debug, Clone, PartialEq)]
enum Coercion {
    /// Replace any non-string value with its JSON text.
    ToString,
    /// Replace a parsable timestamp string with its epoch-microseconds number.
    ToTimestamp,
}

/// Field name → coercion, for one struct level of the table schema.
///
/// Only fields that need a coercion have an entry.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct CoercionTree {
    root: HashMap<String, CoercionNode>,
}

/// Returns a [`CoercionTree`] so the schema can be walked efficiently level by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree { + let mut root = HashMap::new(); + + for field in schema.fields() { + if let Some(node) = build_coercion_node(field.data_type()) { + root.insert(field.name().to_string(), node); + } + } + + CoercionTree { root } +} + +fn build_coercion_node(data_type: &DataType) -> Option { + match data_type { + DataType::Primitive(primitive) => match primitive { + PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)), + PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)), + _ => None, + }, + DataType::Struct(st) => { + let nested_context = create_coercion_tree(st); + if !nested_context.root.is_empty() { + Some(CoercionNode::Tree(nested_context)) + } else { + None + } + } + DataType::Array(array) => { + build_coercion_node(array.element_type()).and_then(|node| match node { + CoercionNode::Coercion(c) => Some(CoercionNode::ArrayPrimitive(c)), + CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)), + _ => None, + }) + } + _ => None, + } +} + +/// Applies all data coercions specified by the [`CoercionTree`] to the [`Value`]. 
///
/// `value` is modified in place. A top-level value that is not a JSON object is left
/// untouched, as are fields that are absent from the object.
pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) {
    let Some(fields) = value.as_object_mut() else {
        return;
    };

    for (field_name, node) in &coercion_tree.root {
        if let Some(field_value) = fields.get_mut(field_name) {
            apply_coercion(field_value, node);
        }
    }
}

/// Applies a single [`CoercionNode`] to `value`, recursing into objects and arrays.
///
/// A value whose JSON shape does not match the node (e.g. a non-array where an array is
/// expected, or `null`) is left unchanged.
fn apply_coercion(value: &mut Value, node: &CoercionNode) {
    match node {
        CoercionNode::Coercion(coercion) => apply_primitive_coercion(value, coercion),
        // A nested struct is coerced exactly like a top-level message.
        CoercionNode::Tree(tree) => coerce(value, tree),
        CoercionNode::ArrayPrimitive(coercion) => {
            if let Some(items) = value.as_array_mut() {
                for item in items {
                    apply_primitive_coercion(item, coercion);
                }
            }
        }
        CoercionNode::ArrayTree(tree) => {
            if let Some(items) = value.as_array_mut() {
                // Borrow the subtree; cloning it per array would copy the whole map for
                // every message.
                for item in items {
                    coerce(item, tree);
                }
            }
        }
    }
}

/// Converts one JSON value in place according to `coercion`.
fn apply_primitive_coercion(value: &mut Value, coercion: &Coercion) {
    match coercion {
        Coercion::ToString => {
            // Any non-string (including `null`) becomes its JSON text.
            if !value.is_string() {
                *value = Value::String(value.to_string());
            }
        }
        Coercion::ToTimestamp => {
            // Only strings are candidates; numbers and unparsable strings pass through.
            if let Some(parsed) = value.as_str().and_then(string_to_timestamp) {
                *value = parsed;
            }
        }
    }
}

+fn string_to_timestamp(string: &str) -> Option { + let parsed = DateTime::from_str(string); + if let Err(e) = parsed { + tracing::error!( + "Error coercing timestamp from string. String: {}. 
Error: {}", + string, + e + ) + } + parsed + .ok() + .map(|dt: DateTime| Value::Number(dt.timestamp_micros().into())) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use std::sync::LazyLock; + + #[test] + fn test_string_to_timestamp_valid() { + let result = string_to_timestamp("2010-01-01T22:11:58Z"); + assert!(result.is_some()); + // exceeds nanos size + let result = string_to_timestamp("2400-01-01T22:11:58Z"); + assert!(result.is_some()); + } + + #[test] + fn test_string_to_timestamp_invalid() { + assert!(string_to_timestamp("not a date").is_none()); + assert!(string_to_timestamp("").is_none()); + assert!(string_to_timestamp("2021-13-01T00:00:00Z").is_none()); + assert!(string_to_timestamp("1636668718000000").is_none()); + } + + static SCHEMA: LazyLock = LazyLock::new(|| { + json!({ + "type": "struct", + "fields": [ + { "name": "level1_string", "type": "string", "nullable": true, "metadata": {} }, + { "name": "level1_integer", "type": "integer", "nullable": true, "metadata": {} }, + { "name": "level1_timestamp", "type": "timestamp", "nullable": true, "metadata": {} }, + { + "name": "level2", + "type": { + "type": "struct", + "fields": [ + { + "name": "level2_string", + "type": "string", + "nullable": true, "metadata": {} + }, + { + "name": "level2_int", + "type": "integer", + "nullable": true, "metadata": {} + }, + { + "name": "level2_timestamp", + "type": "timestamp", + "nullable": true, "metadata": {} + }] + }, + "nullable": true, "metadata": {} + }, + { + "name": "array_timestamp", + "type": { + "type": "array", + "containsNull": true, + "elementType": "timestamp", + }, + "nullable": true, "metadata": {}, + }, + { + "name": "array_string", + "type": { + "type": "array", + "containsNull": true, + "elementType": "string", + }, + "nullable": true, "metadata": {}, + }, + { + "name": "array_int", + "type": { + "type": "array", + "containsNull": true, + "elementType": "integer", + }, + "nullable": true, "metadata": {}, + }, + { + "name": 
"array_struct", + "type": { + "type": "array", + "containsNull": true, + "elementType": { + "type": "struct", + "fields": [ + { + "name": "level2_string", + "type": "string", + "nullable": true, "metadata": {} + }, + { + "name": "level2_int", + "type": "integer", + "nullable": true, "metadata": {} + }, + { + "name": "level2_timestamp", + "type": "timestamp", + "nullable": true, "metadata": {} + }, + ], + }, + }, + "nullable": true, "metadata": {}, + } + ] + }) + }); + + #[test] + fn test_coercion_tree() { + let delta_schema: DeltaSchema = serde_json::from_value(SCHEMA.clone()).unwrap(); + + let tree = create_coercion_tree(&delta_schema); + + let mut top_level_keys: Vec<&String> = tree.root.keys().collect(); + top_level_keys.sort(); + + let level2 = tree.root.get("level2"); + let level2_root = match level2 { + Some(CoercionNode::Tree(tree)) => tree.root.clone(), + _ => unreachable!(""), + }; + let mut level2_keys: Vec<&String> = level2_root.keys().collect(); + level2_keys.sort(); + + let array_struct = tree.root.get("array_struct"); + let array_struct_root = match array_struct { + Some(CoercionNode::ArrayTree(tree)) => tree.root.clone(), + _ => unreachable!(""), + }; + + assert_eq!( + vec![ + "array_string", + "array_struct", + "array_timestamp", + "level1_string", + "level1_timestamp", + "level2" + ], + top_level_keys + ); + + assert_eq!(vec!["level2_string", "level2_timestamp"], level2_keys); + + assert_eq!( + CoercionNode::Coercion(Coercion::ToString), + tree.root.get("level1_string").unwrap().to_owned() + ); + assert_eq!( + CoercionNode::Coercion(Coercion::ToTimestamp), + tree.root.get("level1_timestamp").unwrap().to_owned() + ); + assert_eq!( + CoercionNode::Coercion(Coercion::ToString), + level2_root.get("level2_string").unwrap().to_owned() + ); + assert_eq!( + CoercionNode::Coercion(Coercion::ToTimestamp), + level2_root.get("level2_timestamp").unwrap().to_owned() + ); + assert_eq!( + CoercionNode::ArrayPrimitive(Coercion::ToString), + 
tree.root.get("array_string").unwrap().to_owned() + ); + assert_eq!( + CoercionNode::ArrayPrimitive(Coercion::ToTimestamp), + tree.root.get("array_timestamp").unwrap().to_owned() + ); + assert_eq!( + CoercionNode::Coercion(Coercion::ToString), + array_struct_root.get("level2_string").unwrap().to_owned() + ); + assert_eq!( + CoercionNode::Coercion(Coercion::ToTimestamp), + array_struct_root + .get("level2_timestamp") + .unwrap() + .to_owned() + ); + } + + #[test] + fn test_coercions() { + let delta_schema: DeltaSchema = serde_json::from_value(SCHEMA.clone()).unwrap(); + + let coercion_tree = create_coercion_tree(&delta_schema); + + let mut messages = vec![ + json!({ + "level1_string": "a", + "level1_integer": 0, + // Timestamp passed in as an i64. We won't coerce it, but it will work anyway. + "level1_timestamp": 1636668718000000i64, + "level2": { + "level2_string": { "x": "x", "y": "y" }, + "level2_timestamp": "2021-11-11T22:11:58Z" + }, + "array_timestamp": ["2021-11-17T01:02:03Z", "2021-11-17T02:03:04Z"], + "array_string": ["a", "b", {"a": 1}], + "array_int": [1, 2, 3], + "array_struct": [ + { + "level2_string": r#"{"a":1}"#, + "level2_int": 1, + "level2_timestamp": "2021-11-17T00:00:01Z" + }, + { + "level2_string": { "a": 2 }, + "level2_int": 2, + "level2_timestamp": 1637107202000000i64 + }, + ] + }), + json!({ + "level1_string": { "a": "a", "b": "b"}, + "level1_integer": 42, + // Complies with ISO 8601 and RFC 3339. We WILL coerce it. + "level1_timestamp": "2021-11-11T22:11:58Z" + }), + json!({ + "level1_integer": 99, + }), + json!({ + // Complies with ISO 8601 and RFC 3339. We WILL coerce it. + "level1_timestamp": "2021-11-11T22:11:58+00:00", + }), + json!({ + // RFC 3339 but not ISO 8601. We WILL coerce it. + "level1_timestamp": "2021-11-11T22:11:58-00:00", + }), + json!({ + // ISO 8601 but not RFC 3339. We WON'T coerce it. + "level1_timestamp": "20211111T22115800Z", + }), + json!({ + // This is a Java date style timestamp. We WON'T coerce it. 
+ "level1_timestamp": "2021-11-11 22:11:58", + }), + json!({ + "level1_timestamp": "This definitely is not a timestamp", + }), + json!({ + // This is valid epoch micros, but typed as a string on the way in. We WON'T coerce it. + "level1_timestamp": "1636668718000000", + }), + ]; + + for message in messages.iter_mut() { + coerce(message, &coercion_tree); + } + + let expected = vec![ + json!({ + "level1_string": "a", + "level1_integer": 0, + // Timestamp passed in as an i64. We won't coerce it, but it will work anyway. + "level1_timestamp": 1636668718000000i64, + "level2": { + "level2_string": r#"{"x":"x","y":"y"}"#, + "level2_timestamp": 1636668718000000i64 + }, + "array_timestamp": [1637110923000000i64, 1637114584000000i64], + "array_string": ["a", "b", r#"{"a":1}"#], + "array_int": [1, 2, 3], + "array_struct": [ + { + "level2_string": "{\"a\":1}", + "level2_int": 1, + "level2_timestamp": 1637107201000000i64 + }, + { + "level2_string": r#"{"a":2}"#, + "level2_int": 2, + "level2_timestamp": 1637107202000000i64 + }, + ] + }), + json!({ + "level1_string": r#"{"a":"a","b":"b"}"#, + "level1_integer": 42, + // Complies with ISO 8601 and RFC 3339. We WILL coerce it. + "level1_timestamp": 1636668718000000i64 + }), + json!({ + "level1_integer": 99, + }), + json!({ + // Complies with ISO 8601 and RFC 3339. We WILL coerce it. + "level1_timestamp": 1636668718000000i64 + }), + json!({ + // RFC 3339 but not ISO 8601. We WILL coerce it. + "level1_timestamp": 1636668718000000i64 + }), + json!({ + // ISO 8601 but not RFC 3339. We WON'T coerce it. + "level1_timestamp": "20211111T22115800Z", + }), + json!({ + // This is a Java date style timestamp. We WON'T coerce it. + "level1_timestamp": "2021-11-11 22:11:58", + }), + json!({ + "level1_timestamp": "This definitely is not a timestamp", + }), + json!({ + // This is valid epoch micros, but typed as a string on the way in. We WON'T coerce it. 
+ "level1_timestamp": "1636668718000000", + }), + ]; + + for i in 0..messages.len() { + assert_eq!(messages[i], expected[i]); + } + } + + #[test] + fn test_empty_schema() { + let schema_json = json!({ + "type": "struct", + "fields": [] + }); + let delta_schema: DeltaSchema = serde_json::from_value(schema_json).unwrap(); + let tree = create_coercion_tree(&delta_schema); + assert!(tree.root.is_empty()); + } + + #[test] + fn test_coerce_empty_object() { + let delta_schema: DeltaSchema = serde_json::from_value(SCHEMA.clone()).unwrap(); + let tree = create_coercion_tree(&delta_schema); + let mut value = json!({}); + coerce(&mut value, &tree); + assert_eq!(value, json!({})); + } + + #[test] + fn test_coerce_null_values() { + let delta_schema: DeltaSchema = serde_json::from_value(SCHEMA.clone()).unwrap(); + let tree = create_coercion_tree(&delta_schema); + let mut value = json!({ + "level1_string": null, + "level1_timestamp": null, + "level2": null, + "array_timestamp": null, + "array_struct": null, + }); + let expected = value.clone(); + coerce(&mut value, &tree); + // Null values should pass through without panicking. + // ToString coercion converts null to "null" since it's not a string. 
+ let mut expected = expected; + *expected.get_mut("level1_string").unwrap() = json!("null"); + assert_eq!(value, expected); + } + + #[test] + fn test_coerce_nested_struct_with_missing_fields() { + let delta_schema: DeltaSchema = serde_json::from_value(SCHEMA.clone()).unwrap(); + let tree = create_coercion_tree(&delta_schema); + let mut value = json!({ + "level2": { + "level2_int": 42 + // level2_string and level2_timestamp are missing + } + }); + let expected = value.clone(); + coerce(&mut value, &tree); + assert_eq!(value, expected); + } + + #[test] + fn test_coerce_empty_arrays() { + let delta_schema: DeltaSchema = serde_json::from_value(SCHEMA.clone()).unwrap(); + let tree = create_coercion_tree(&delta_schema); + let mut value = json!({ + "array_timestamp": [], + "array_string": [], + "array_struct": [], + }); + let expected = value.clone(); + coerce(&mut value, &tree); + assert_eq!(value, expected); + } + + #[test] + fn test_coerce_non_object_top_level() { + let delta_schema: DeltaSchema = serde_json::from_value(SCHEMA.clone()).unwrap(); + let tree = create_coercion_tree(&delta_schema); + + // Array at top level + let mut value = json!([1, 2, 3]); + let expected = value.clone(); + coerce(&mut value, &tree); + assert_eq!(value, expected); + + // Primitive at top level + let mut value = json!("hello"); + let expected = value.clone(); + coerce(&mut value, &tree); + assert_eq!(value, expected); + + // Null at top level + let mut value = json!(null); + let expected = value.clone(); + coerce(&mut value, &tree); + assert_eq!(value, expected); + } + + #[test] + fn test_coerce_tostring_various_types() { + let delta_schema: DeltaSchema = serde_json::from_value(SCHEMA.clone()).unwrap(); + let tree = create_coercion_tree(&delta_schema); + + // Boolean + let mut value = json!({ "level1_string": true }); + coerce(&mut value, &tree); + assert_eq!(value["level1_string"], json!("true")); + + // Float + let mut value = json!({ "level1_string": 3.15 }); + coerce(&mut value, 
&tree); + assert_eq!(value["level1_string"], json!("3.15")); + + // Integer + let mut value = json!({ "level1_string": 42 }); + coerce(&mut value, &tree); + assert_eq!(value["level1_string"], json!("42")); + + // Null gets stringified + let mut value = json!({ "level1_string": null }); + coerce(&mut value, &tree); + assert_eq!(value["level1_string"], json!("null")); + + // Array gets stringified + let mut value = json!({ "level1_string": [1, 2, 3] }); + coerce(&mut value, &tree); + assert_eq!(value["level1_string"], json!("[1,2,3]")); + } +} diff --git a/core/connectors/sinks/delta_sink/src/lib.rs b/core/connectors/sinks/delta_sink/src/lib.rs new file mode 100644 index 0000000000..ba9c79de76 --- /dev/null +++ b/core/connectors/sinks/delta_sink/src/lib.rs @@ -0,0 +1,90 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use deltalake::DeltaTable; +use deltalake::writer::JsonWriter; +use iggy_connector_sdk::sink_connector; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; + +mod coercions; +mod sink; +mod storage; + +use crate::coercions::CoercionTree; + +sink_connector!(DeltaSink); + +#[derive(Debug)] +pub struct DeltaSink { + id: u32, + config: DeltaSinkConfig, + state: Mutex>, +} + +#[derive(Debug)] +struct SinkState { + table: DeltaTable, + writer: JsonWriter, + coercion_tree: CoercionTree, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeltaSinkConfig { + pub table_uri: String, + #[serde(default)] + pub storage_backend_type: Option, + + // AWS S3 + #[serde(default)] + pub aws_s3_access_key: Option, + #[serde(default)] + pub aws_s3_secret_key: Option, + #[serde(default)] + pub aws_s3_region: Option, + #[serde(default)] + pub aws_s3_endpoint_url: Option, + #[serde(default)] + pub aws_s3_allow_http: Option, + + // Azure Blob Storage + #[serde(default)] + pub azure_storage_account_name: Option, + #[serde(default)] + pub azure_storage_account_key: Option, + #[serde(default)] + pub azure_storage_sas_token: Option, + #[serde(default)] + pub azure_container_name: Option, + + // Google Cloud Storage + #[serde(default)] + pub gcs_service_account_key: Option, + #[serde(default)] + pub gcs_bucket: Option, +} + +impl DeltaSink { + pub fn new(id: u32, config: DeltaSinkConfig) -> Self { + DeltaSink { + id, + config, + state: Mutex::new(None), + } + } +} diff --git a/core/connectors/sinks/delta_sink/src/sink.rs b/core/connectors/sinks/delta_sink/src/sink.rs new file mode 100644 index 0000000000..71f19f6513 --- /dev/null +++ b/core/connectors/sinks/delta_sink/src/sink.rs @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = + match deltalake::open_table_with_storage_options(table_url, storage_options.clone()) + .await + { + Ok(table) => table, + Err(e) => { + error!("Failed to load Delta table: {e}"); + return Err(Error::InitError(format!("Failed to load Delta table: {e}"))); + } + }; + + let kernel_schema = table + .snapshot() + .map_err(|e| { + error!("Failed to get table snapshot: 
{e}"); + Error::InitError(format!("Failed to get table snapshot: {e}")) + })? + .schema(); + let coercion_tree = create_coercion_tree(&kernel_schema); + + let writer = JsonWriter::for_table(&table).map_err(|e| { + error!("Failed to create JsonWriter: {e}"); + Error::InitError(format!("Failed to create JsonWriter: {e}")) + })?; + + *self.state.lock().await = Some(SinkState { + table, + writer, + coercion_tree, + }); + + info!( + "Delta Lake sink connector with ID: {} opened successfully.", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + debug!( + "Delta sink with ID: {} received: {} messages, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.partition_id, + messages_metadata.current_offset, + ); + + // Extract JSON values from consumed messages + let mut json_values: Vec = Vec::with_capacity(messages.len()); + for msg in &messages { + match &msg.payload { + Payload::Json(simd_value) => { + json_values.push(owned_value_to_serde_json(simd_value)); + } + other => { + error!( + "Unsupported payload type: {other}. Delta sink only supports JSON payloads." 
+ ); + return Err(Error::InvalidPayloadType); + } + } + } + + if json_values.is_empty() { + debug!("No JSON values to write"); + return Ok(()); + } + + let mut state_guard = self.state.lock().await; + let state = state_guard.as_mut().ok_or_else(|| { + error!("Delta sink state not initialized — was open() called?"); + Error::InvalidConfig + })?; + + // Apply coercions to match Delta table schema + for value in &mut json_values { + coerce(value, &state.coercion_tree); + } + + // Write JSON values to internal Parquet buffers + state.writer.write(json_values).await.map_err(|e| { + error!("Failed to write to Delta writer: {e}"); + Error::Storage(format!("Failed to write to Delta writer: {e}")) + })?; + + // Flush buffers to object store and commit to Delta log + let version = match state.writer.flush_and_commit(&mut state.table).await { + Ok(v) => v, + Err(e) => { + state.writer.reset(); + error!("Failed to flush and commit to Delta table: {e}"); + return Err(Error::Storage(format!("Failed to flush and commit: {e}"))); + } + }; + + debug!( + "Delta sink with ID: {} committed version {}", + self.id, version + ); + + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + if let Some(mut state) = self.state.lock().await.take() + && let Err(e) = state.writer.flush_and_commit(&mut state.table).await + { + error!( + "Delta sink with ID: {} failed to flush on close: {e}", + self.id + ); + return Err(Error::Storage(format!("Failed to flush on close: {e}"))); + } + info!("Delta Lake sink connector with ID: {} is closed.", self.id); + Ok(()) + } +} + +fn owned_value_to_serde_json(value: &simd_json::OwnedValue) -> serde_json::Value { + match value { + simd_json::OwnedValue::Static(s) => match s { + simd_json::StaticNode::Null => serde_json::Value::Null, + simd_json::StaticNode::Bool(b) => serde_json::Value::Bool(*b), + simd_json::StaticNode::I64(n) => serde_json::Value::Number((*n).into()), + simd_json::StaticNode::U64(n) => serde_json::Value::Number((*n).into()), + 
simd_json::StaticNode::F64(n) => serde_json::Number::from_f64(*n) + .map(serde_json::Value::Number) + .unwrap_or(serde_json::Value::Null), + }, + simd_json::OwnedValue::String(s) => serde_json::Value::String(s.to_string()), + simd_json::OwnedValue::Array(arr) => { + serde_json::Value::Array(arr.iter().map(owned_value_to_serde_json).collect()) + } + simd_json::OwnedValue::Object(obj) => { + let map: serde_json::Map = obj + .iter() + .map(|(k, v)| (k.to_string(), owned_value_to_serde_json(v))) + .collect(); + serde_json::Value::Object(map) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use simd_json::{OwnedValue, StaticNode}; + + #[test] + fn test_null() { + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::Null)), + serde_json::Value::Null + ); + } + + #[test] + fn test_bool() { + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::Bool(true))), + serde_json::Value::Bool(true) + ); + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::Bool(false))), + serde_json::Value::Bool(false) + ); + } + + #[test] + fn test_i64() { + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::I64(-42))), + serde_json::json!(-42) + ); + } + + #[test] + fn test_u64() { + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::U64(100))), + serde_json::json!(100) + ); + } + + #[test] + fn test_f64_finite() { + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::F64(1.5))), + serde_json::json!(1.5) + ); + } + + #[test] + fn test_f64_nan_becomes_null() { + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::F64(f64::NAN))), + serde_json::Value::Null + ); + } + + #[test] + fn test_f64_infinity_becomes_null() { + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::F64(f64::INFINITY))), + serde_json::Value::Null + ); + assert_eq!( + owned_value_to_serde_json(&OwnedValue::Static(StaticNode::F64(f64::NEG_INFINITY))), + 
serde_json::Value::Null + ); + } + + #[test] + fn test_string() { + assert_eq!( + owned_value_to_serde_json(&OwnedValue::String("hello".into())), + serde_json::Value::String("hello".to_string()) + ); + } + + #[test] + fn test_array() { + let input = OwnedValue::Array(Box::new(vec![ + OwnedValue::Static(StaticNode::Null), + OwnedValue::Static(StaticNode::Bool(true)), + ])); + assert_eq!( + owned_value_to_serde_json(&input), + serde_json::json!([null, true]) + ); + } + + #[test] + fn test_object() { + let mut obj = simd_json::owned::Object::new(); + obj.insert("k".into(), OwnedValue::Static(StaticNode::I64(1))); + let input = OwnedValue::Object(Box::new(obj)); + assert_eq!( + owned_value_to_serde_json(&input), + serde_json::json!({"k": 1}) + ); + } + + #[test] + fn test_nested_object() { + let mut inner = simd_json::owned::Object::new(); + inner.insert("b".into(), OwnedValue::String("v".into())); + let mut outer = simd_json::owned::Object::new(); + outer.insert("a".into(), OwnedValue::Object(Box::new(inner))); + let input = OwnedValue::Object(Box::new(outer)); + assert_eq!( + owned_value_to_serde_json(&input), + serde_json::json!({"a": {"b": "v"}}) + ); + } +} diff --git a/core/connectors/sinks/delta_sink/src/storage.rs b/core/connectors/sinks/delta_sink/src/storage.rs new file mode 100644 index 0000000000..e48523bdbb --- /dev/null +++ b/core/connectors/sinks/delta_sink/src/storage.rs @@ -0,0 +1,277 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSinkConfig; +use iggy_connector_sdk::Error; +use std::collections::HashMap; + +pub(crate) fn build_storage_options( + config: &DeltaSinkConfig, +) -> Result, Error> { + let mut opts = HashMap::new(); + + match config.storage_backend_type.as_deref() { + Some("s3") => { + let access_key = config + .aws_s3_access_key + .as_ref() + .ok_or(Error::InvalidConfig)?; + let secret_key = config + .aws_s3_secret_key + .as_ref() + .ok_or(Error::InvalidConfig)?; + let region = config.aws_s3_region.as_ref().ok_or(Error::InvalidConfig)?; + let endpoint_url = config + .aws_s3_endpoint_url + .as_ref() + .ok_or(Error::InvalidConfig)?; + let allow_http = config.aws_s3_allow_http.ok_or(Error::InvalidConfig)?; + + opts.insert("AWS_ACCESS_KEY_ID".into(), access_key.clone()); + opts.insert("AWS_SECRET_ACCESS_KEY".into(), secret_key.clone()); + opts.insert("AWS_REGION".into(), region.clone()); + opts.insert("AWS_ENDPOINT_URL".into(), endpoint_url.clone()); + opts.insert("AWS_ALLOW_HTTP".into(), allow_http.to_string()); + opts.insert("AWS_S3_ALLOW_HTTP".into(), allow_http.to_string()); + } + Some("azure") => { + let account_name = config + .azure_storage_account_name + .as_ref() + .ok_or(Error::InvalidConfig)?; + let account_key = config + .azure_storage_account_key + .as_ref() + .ok_or(Error::InvalidConfig)?; + let sas_token = config + .azure_storage_sas_token + .as_ref() + .ok_or(Error::InvalidConfig)?; + let container_name = config + .azure_container_name + .as_ref() + .ok_or(Error::InvalidConfig)?; + + 
opts.insert("AZURE_STORAGE_ACCOUNT_NAME".into(), account_name.clone()); + opts.insert("AZURE_STORAGE_ACCOUNT_KEY".into(), account_key.clone()); + opts.insert("AZURE_STORAGE_SAS_TOKEN".into(), sas_token.clone()); + opts.insert("AZURE_CONTAINER_NAME".into(), container_name.clone()); + } + Some("gcs") => { + let service_account_key = config + .gcs_service_account_key + .as_ref() + .ok_or(Error::InvalidConfig)?; + let bucket = config.gcs_bucket.as_ref().ok_or(Error::InvalidConfig)?; + + opts.insert( + "GOOGLE_SERVICE_ACCOUNT_KEY".into(), + service_account_key.clone(), + ); + opts.insert("GCS_BUCKET".into(), bucket.clone()); + } + Some(_) => { + return Err(Error::InvalidConfig); + } + None => {} + } + + Ok(opts) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::DeltaSinkConfig; + + fn default_config() -> DeltaSinkConfig { + DeltaSinkConfig { + table_uri: "file:///tmp/test".into(), + storage_backend_type: None, + aws_s3_access_key: None, + aws_s3_secret_key: None, + aws_s3_region: None, + aws_s3_endpoint_url: None, + aws_s3_allow_http: None, + azure_storage_account_name: None, + azure_storage_account_key: None, + azure_storage_sas_token: None, + azure_container_name: None, + gcs_service_account_key: None, + gcs_bucket: None, + } + } + + #[test] + fn no_backend_type_returns_empty() { + let config = default_config(); + let opts = build_storage_options(&config).unwrap(); + assert!(opts.is_empty()); + } + + fn s3_config() -> DeltaSinkConfig { + DeltaSinkConfig { + storage_backend_type: Some("s3".into()), + aws_s3_access_key: Some("AKID".into()), + aws_s3_secret_key: Some("SECRET".into()), + aws_s3_region: Some("us-east-1".into()), + aws_s3_endpoint_url: Some("http://localhost:9000".into()), + aws_s3_allow_http: Some(true), + ..default_config() + } + } + + fn azure_config() -> DeltaSinkConfig { + DeltaSinkConfig { + storage_backend_type: Some("azure".into()), + azure_storage_account_name: Some("myaccount".into()), + azure_storage_account_key: Some("mykey".into()), 
+ azure_storage_sas_token: Some("mysas".into()), + azure_container_name: Some("mycontainer".into()), + ..default_config() + } + } + + fn gcs_config() -> DeltaSinkConfig { + DeltaSinkConfig { + storage_backend_type: Some("gcs".into()), + gcs_service_account_key: Some("{\"key\": \"value\"}".into()), + gcs_bucket: Some("mybucket".into()), + ..default_config() + } + } + + #[test] + fn s3_backend_maps_all_fields() { + let opts = build_storage_options(&s3_config()).unwrap(); + assert_eq!(opts.get("AWS_ACCESS_KEY_ID").unwrap(), "AKID"); + assert_eq!(opts.get("AWS_SECRET_ACCESS_KEY").unwrap(), "SECRET"); + assert_eq!(opts.get("AWS_REGION").unwrap(), "us-east-1"); + assert_eq!( + opts.get("AWS_ENDPOINT_URL").unwrap(), + "http://localhost:9000" + ); + assert_eq!(opts.get("AWS_ALLOW_HTTP").unwrap(), "true"); + assert_eq!(opts.get("AWS_S3_ALLOW_HTTP").unwrap(), "true"); + } + + #[test] + fn s3_backend_missing_access_key_errors() { + let mut config = s3_config(); + config.aws_s3_access_key = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn s3_backend_missing_secret_key_errors() { + let mut config = s3_config(); + config.aws_s3_secret_key = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn s3_backend_missing_region_errors() { + let mut config = s3_config(); + config.aws_s3_region = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn s3_backend_missing_endpoint_url_errors() { + let mut config = s3_config(); + config.aws_s3_endpoint_url = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn s3_backend_missing_allow_http_errors() { + let mut config = s3_config(); + config.aws_s3_allow_http = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn azure_backend_maps_all_fields() { + let opts = build_storage_options(&azure_config()).unwrap(); + assert_eq!(opts.get("AZURE_STORAGE_ACCOUNT_NAME").unwrap(), "myaccount"); + 
assert_eq!(opts.get("AZURE_STORAGE_ACCOUNT_KEY").unwrap(), "mykey"); + assert_eq!(opts.get("AZURE_STORAGE_SAS_TOKEN").unwrap(), "mysas"); + assert_eq!(opts.get("AZURE_CONTAINER_NAME").unwrap(), "mycontainer"); + } + + #[test] + fn azure_backend_missing_account_name_errors() { + let mut config = azure_config(); + config.azure_storage_account_name = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn azure_backend_missing_account_key_errors() { + let mut config = azure_config(); + config.azure_storage_account_key = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn azure_backend_missing_sas_token_errors() { + let mut config = azure_config(); + config.azure_storage_sas_token = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn azure_backend_missing_container_name_errors() { + let mut config = azure_config(); + config.azure_container_name = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn gcs_backend_maps_all_fields() { + let opts = build_storage_options(&gcs_config()).unwrap(); + assert_eq!( + opts.get("GOOGLE_SERVICE_ACCOUNT_KEY").unwrap(), + "{\"key\": \"value\"}" + ); + assert_eq!(opts.get("GCS_BUCKET").unwrap(), "mybucket"); + } + + #[test] + fn gcs_backend_missing_service_account_key_errors() { + let mut config = gcs_config(); + config.gcs_service_account_key = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn gcs_backend_missing_bucket_errors() { + let mut config = gcs_config(); + config.gcs_bucket = None; + assert!(build_storage_options(&config).is_err()); + } + + #[test] + fn unknown_backend_type_errors() { + let config = DeltaSinkConfig { + storage_backend_type: Some("unknown".into()), + ..default_config() + }; + assert!(build_storage_options(&config).is_err()); + } +} diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 13afb04dcf..0b396eb9e5 100644 --- a/core/integration/Cargo.toml +++ 
b/core/integration/Cargo.toml @@ -36,6 +36,7 @@ compio = { workspace = true } configs = { workspace = true } configs_derive = { workspace = true } ctor = { workspace = true } +deltalake = { workspace = true } figment = { workspace = true } futures = { workspace = true } harness_derive = { workspace = true } diff --git a/core/integration/tests/connectors/delta/delta_sink.rs b/core/integration/tests/connectors/delta/delta_sink.rs new file mode 100644 index 0000000000..099e2dcf80 --- /dev/null +++ b/core/integration/tests/connectors/delta/delta_sink.rs @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::connectors::create_test_messages; +use crate::connectors::fixtures::{DeltaFixture, DeltaS3Fixture}; +use bytes::Bytes; +use iggy::prelude::{IggyMessage, Partitioning}; +use iggy_binary_protocol::MessageClient; +use iggy_common::Identifier; +use iggy_connector_sdk::api::SinkInfoResponse; +use integration::harness::seeds; +use integration::iggy_harness; +use reqwest::Client; + +const API_KEY: &str = "test-api-key"; +const DELTA_SINK_KEY: &str = "delta"; +const VERSION_POLL_ATTEMPTS: usize = 30; +const VERSION_POLL_INTERVAL_MS: u64 = 500; +const BULK_VERSION_POLL_ATTEMPTS: usize = 60; + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/delta/sink.toml")), + seed = seeds::connector_stream +)] +async fn delta_sink_initializes_and_runs(harness: &TestHarness, fixture: DeltaFixture) { + let api_address = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let http_client = Client::new(); + + let response = http_client + .get(format!("{}/sinks", api_address)) + .header("api-key", API_KEY) + .send() + .await + .expect("Failed to get sinks"); + + assert_eq!(response.status(), 200); + let sinks: Vec = response.json().await.expect("Failed to parse sinks"); + + assert_eq!(sinks.len(), 1); + assert_eq!(sinks[0].key, DELTA_SINK_KEY); + assert!(sinks[0].enabled); + + drop(fixture); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/delta/sink.toml")), + seed = seeds::connector_stream +)] +async fn delta_sink_consumes_json_messages(harness: &TestHarness, fixture: DeltaFixture) { + let client = harness.root_client().await.unwrap(); + let api_address = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let http_client = Client::new(); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let message_count = 5; + let 
test_messages = create_test_messages(message_count); + let mut messages: Vec = test_messages + .iter() + .enumerate() + .map(|(i, msg)| { + let payload = serde_json::to_vec(msg).expect("Failed to serialize message"); + IggyMessage::builder() + .id((i + 1) as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + let version_count = fixture + .wait_for_delta_log(1, VERSION_POLL_ATTEMPTS, VERSION_POLL_INTERVAL_MS) + .await + .expect("Data should be written to Delta table"); + + assert!(version_count >= 1); + + let response = http_client + .get(format!("{}/sinks", api_address)) + .header("api-key", API_KEY) + .send() + .await + .expect("Failed to get sinks"); + + assert_eq!(response.status(), 200); + let sinks: Vec = response.json().await.expect("Failed to parse sinks"); + + assert_eq!(sinks.len(), 1); + assert_eq!(sinks[0].key, DELTA_SINK_KEY); + assert!(sinks[0].last_error.is_none()); + + drop(fixture); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/delta/sink.toml")), + seed = seeds::connector_stream +)] +async fn delta_sink_handles_bulk_messages(harness: &TestHarness, fixture: DeltaFixture) { + let client = harness.root_client().await.unwrap(); + let api_address = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let http_client = Client::new(); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let message_count = 100; + let test_messages = create_test_messages(message_count); + let mut messages: Vec = test_messages + .iter() + .enumerate() + .map(|(i, msg)| { + let payload = serde_json::to_vec(msg).expect("Failed to serialize message"); + IggyMessage::builder() + .id((i + 1) 
as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + let version_count = fixture + .wait_for_delta_log(1, BULK_VERSION_POLL_ATTEMPTS, VERSION_POLL_INTERVAL_MS) + .await + .expect("Data should be written to Delta table"); + + assert!(version_count >= 1); + + let response = http_client + .get(format!("{}/sinks", api_address)) + .header("api-key", API_KEY) + .send() + .await + .expect("Failed to get sinks"); + + assert_eq!(response.status(), 200); + let sinks: Vec = response.json().await.expect("Failed to parse sinks"); + + assert_eq!(sinks.len(), 1); + assert!(sinks[0].last_error.is_none()); + + drop(fixture); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/delta/sink.toml")), + seed = seeds::connector_stream +)] +async fn delta_sink_writes_to_s3(harness: &TestHarness, fixture: DeltaS3Fixture) { + let client = harness.root_client().await.unwrap(); + let api_address = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let http_client = Client::new(); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let message_count = 5; + let test_messages = create_test_messages(message_count); + let mut messages: Vec = test_messages + .iter() + .enumerate() + .map(|(i, msg)| { + let payload = serde_json::to_vec(msg).expect("Failed to serialize message"); + IggyMessage::builder() + .id((i + 1) as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + // For S3, we can't check local 
delta log files. + // Instead, poll the connector runtime API to verify successful consumption. + let max_attempts = 30; + let interval_ms = 500; + let mut last_error = None; + + for _ in 0..max_attempts { + let response = http_client + .get(format!("{}/sinks", api_address)) + .header("api-key", API_KEY) + .send() + .await + .expect("Failed to get sinks"); + + let sinks: Vec = response.json().await.expect("Failed to parse sinks"); + if !sinks.is_empty() && sinks[0].last_error.is_none() && sinks[0].enabled { + last_error = None; + // Give the sink time to process the messages + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + // Verify still healthy after processing + let response = http_client + .get(format!("{}/sinks", api_address)) + .header("api-key", API_KEY) + .send() + .await + .expect("Failed to get sinks"); + let sinks: Vec = + response.json().await.expect("Failed to parse sinks"); + assert_eq!(sinks.len(), 1); + assert_eq!(sinks[0].key, DELTA_SINK_KEY); + assert!( + sinks[0].last_error.is_none(), + "Sink reported error: {:?}", + sinks[0].last_error + ); + break; + } + + last_error = sinks.first().and_then(|s| s.last_error.clone()); + tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await; + } + + assert!( + last_error.is_none(), + "Delta S3 sink had errors: {:?}", + last_error + ); + + drop(fixture); +} diff --git a/core/integration/tests/connectors/delta/mod.rs b/core/integration/tests/connectors/delta/mod.rs new file mode 100644 index 0000000000..8b166b7d46 --- /dev/null +++ b/core/integration/tests/connectors/delta/mod.rs @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod delta_sink; diff --git a/core/integration/tests/connectors/delta/sink.toml b/core/integration/tests/connectors/delta/sink.toml new file mode 100644 index 0000000000..8fd034fa1a --- /dev/null +++ b/core/integration/tests/connectors/delta/sink.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sinks/delta_sink" diff --git a/core/integration/tests/connectors/fixtures/delta/fixture.rs b/core/integration/tests/connectors/fixtures/delta/fixture.rs new file mode 100644 index 0000000000..e1bc2822bc --- /dev/null +++ b/core/integration/tests/connectors/fixtures/delta/fixture.rs @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use async_trait::async_trait; +use deltalake::kernel::{DataType, PrimitiveType, StructField}; +use deltalake::operations::create::CreateBuilder; +use integration::harness::{TestBinaryError, TestFixture}; +use std::collections::HashMap; +use std::path::PathBuf; +use tempfile::TempDir; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tracing::info; +use uuid::Uuid; + +const ENV_SINK_TABLE_URI: &str = "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_TABLE_URI"; +const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_DELTA_PATH"; +const ENV_SINK_STORAGE_BACKEND_TYPE: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_STORAGE_BACKEND_TYPE"; +const ENV_SINK_AWS_S3_ACCESS_KEY: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ACCESS_KEY"; +const ENV_SINK_AWS_S3_SECRET_KEY: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_SECRET_KEY"; +const ENV_SINK_AWS_S3_REGION: &str = "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_REGION"; +const ENV_SINK_AWS_S3_ENDPOINT_URL: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ENDPOINT_URL"; +const ENV_SINK_AWS_S3_ALLOW_HTTP: &str = + 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ALLOW_HTTP"; + +const MINIO_IMAGE: &str = "minio/minio"; +const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z"; +const MINIO_PORT: u16 = 9000; +const MINIO_CONSOLE_PORT: u16 = 9001; +const MINIO_ACCESS_KEY: &str = "admin"; +const MINIO_SECRET_KEY: &str = "password"; +const MINIO_BUCKET: &str = "delta-warehouse"; + +pub struct DeltaFixture { + _temp_dir: TempDir, + table_path: PathBuf, +} + +impl DeltaFixture { + pub async fn wait_for_delta_log( + &self, + min_versions: usize, + max_attempts: usize, + interval_ms: u64, + ) -> Result { + let delta_log_dir = self.table_path.join("_delta_log"); + + for _ in 0..max_attempts { + let count = Self::count_delta_versions(&delta_log_dir); + if count >= min_versions { + info!("Found {count} delta log versions (required: {min_versions})"); + return Ok(count); + } + tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await; + } + + let final_count = Self::count_delta_versions(&delta_log_dir); + Err(TestBinaryError::InvalidState { + message: format!( + "Expected at least {min_versions} delta log versions, found {final_count} after {max_attempts} attempts" + ), + }) + } + + async fn create_table(table_uri: &str) -> Result<(), TestBinaryError> { + let columns = vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Long), true), + StructField::new("name", DataType::Primitive(PrimitiveType::String), true), + StructField::new("count", DataType::Primitive(PrimitiveType::Integer), true), + StructField::new("amount", DataType::Primitive(PrimitiveType::Double), true), + StructField::new("active", DataType::Primitive(PrimitiveType::Boolean), true), + StructField::new("timestamp", DataType::Primitive(PrimitiveType::Long), true), + ]; + CreateBuilder::new() + .with_location(table_uri) + .with_columns(columns) + .await + .map_err(|error| TestBinaryError::FixtureSetup { + fixture_type: "DeltaFixture".to_string(), + message: format!("Failed to create Delta table: 
{error}"), + })?; + Ok(()) + } + + fn count_delta_versions(delta_log_dir: &std::path::Path) -> usize { + std::fs::read_dir(delta_log_dir) + .map(|entries| { + entries + .filter_map(|e| e.ok()) + .filter(|e| e.path().extension().is_some_and(|ext| ext == "json")) + .count() + }) + .unwrap_or(0) + } +} + +#[async_trait] +impl TestFixture for DeltaFixture { + async fn setup() -> Result { + let temp_dir = TempDir::new().map_err(|error| TestBinaryError::FixtureSetup { + fixture_type: "DeltaFixture".to_string(), + message: format!("Failed to create temp directory: {error}"), + })?; + + let table_path = temp_dir.path().join("delta_table"); + let table_uri = format!("file://{}", table_path.display()); + Self::create_table(&table_uri).await?; + info!( + "Delta fixture created with table path: {}", + table_path.display() + ); + + Ok(Self { + _temp_dir: temp_dir, + table_path, + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let table_uri = format!("file://{}", self.table_path.display()); + + let mut envs = HashMap::new(); + envs.insert(ENV_SINK_TABLE_URI.to_string(), table_uri); + envs.insert( + ENV_SINK_PATH.to_string(), + "../../target/debug/libiggy_connector_delta_sink".to_string(), + ); + envs + } +} + +pub struct DeltaS3Fixture { + #[allow(dead_code)] + minio: ContainerAsync, + minio_endpoint: String, +} + +impl DeltaS3Fixture { + async fn start_minio( + network: &str, + container_name: &str, + ) -> Result<(ContainerAsync, String), TestBinaryError> { + let container = GenericImage::new(MINIO_IMAGE, MINIO_TAG) + .with_exposed_port(MINIO_PORT.tcp()) + .with_exposed_port(MINIO_CONSOLE_PORT.tcp()) + .with_wait_for(WaitFor::message_on_stderr("API:")) + .with_network(network) + .with_container_name(container_name) + .with_env_var("MINIO_ROOT_USER", MINIO_ACCESS_KEY) + .with_env_var("MINIO_ROOT_PASSWORD", MINIO_SECRET_KEY) + .with_cmd(vec!["server", "/data", "--console-address", ":9001"]) + .with_mapped_port(0, MINIO_PORT.tcp()) + .with_mapped_port(0, 
MINIO_CONSOLE_PORT.tcp()) + .start() + .await + .map_err(|error| TestBinaryError::FixtureSetup { + fixture_type: "DeltaS3Fixture".to_string(), + message: format!("Failed to start MinIO container: {error}"), + })?; + + info!("Started MinIO container for Delta S3 tests"); + + let mapped_port = container + .ports() + .await + .map_err(|error| TestBinaryError::FixtureSetup { + fixture_type: "DeltaS3Fixture".to_string(), + message: format!("Failed to get ports: {error}"), + })? + .map_to_host_port_ipv4(MINIO_PORT) + .ok_or_else(|| TestBinaryError::FixtureSetup { + fixture_type: "DeltaS3Fixture".to_string(), + message: "No mapping for MinIO port".to_string(), + })?; + + let endpoint = format!("http://localhost:{mapped_port}"); + info!("MinIO container available at {endpoint}"); + + Ok((container, endpoint)) + } + + async fn create_bucket(minio_endpoint: &str) -> Result<(), TestBinaryError> { + use std::process::Command; + + let host = minio_endpoint.trim_start_matches("http://"); + let mc_host = format!("http://{}:{}@{}", MINIO_ACCESS_KEY, MINIO_SECRET_KEY, host); + + let output = Command::new("docker") + .args([ + "run", + "--rm", + "--network=host", + "-e", + &format!("MC_HOST_minio={}", mc_host), + "minio/mc", + "mb", + "--ignore-existing", + &format!("minio/{}", MINIO_BUCKET), + ]) + .output() + .map_err(|error| TestBinaryError::FixtureSetup { + fixture_type: "DeltaS3Fixture".to_string(), + message: format!("Failed to run mc command: {error}"), + })?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + let stdout = String::from_utf8_lossy(&output.stdout); + return Err(TestBinaryError::FixtureSetup { + fixture_type: "DeltaS3Fixture".to_string(), + message: format!("Failed to create bucket: stderr={stderr}, stdout={stdout}"), + }); + } + + info!("Created MinIO bucket: {MINIO_BUCKET}"); + Ok(()) + } + + async fn create_table(minio_endpoint: &str) -> Result<(), TestBinaryError> { + let table_uri = 
format!("s3://{MINIO_BUCKET}/delta_table"); + let columns = vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Long), true), + StructField::new("name", DataType::Primitive(PrimitiveType::String), true), + StructField::new("count", DataType::Primitive(PrimitiveType::Integer), true), + StructField::new("amount", DataType::Primitive(PrimitiveType::Double), true), + StructField::new("active", DataType::Primitive(PrimitiveType::Boolean), true), + StructField::new("timestamp", DataType::Primitive(PrimitiveType::Long), true), + ]; + let storage_options = HashMap::from([ + ("AWS_ACCESS_KEY_ID".into(), MINIO_ACCESS_KEY.into()), + ("AWS_SECRET_ACCESS_KEY".into(), MINIO_SECRET_KEY.into()), + ("AWS_REGION".into(), "us-east-1".into()), + ("AWS_ENDPOINT_URL".into(), minio_endpoint.into()), + ("AWS_ALLOW_HTTP".into(), "true".into()), + ("AWS_S3_ALLOW_HTTP".into(), "true".into()), + ]); + CreateBuilder::new() + .with_location(table_uri) + .with_storage_options(storage_options) + .with_columns(columns) + .await + .map_err(|error| TestBinaryError::FixtureSetup { + fixture_type: "DeltaS3Fixture".to_string(), + message: format!("Failed to create Delta table in MinIO: {error}"), + })?; + Ok(()) + } +} + +#[async_trait] +impl TestFixture for DeltaS3Fixture { + async fn setup() -> Result { + let id = Uuid::new_v4(); + let network = format!("iggy-delta-s3-{id}"); + let minio_name = format!("minio-delta-{id}"); + + let (minio, minio_endpoint) = Self::start_minio(&network, &minio_name).await?; + Self::create_bucket(&minio_endpoint).await?; + Self::create_table(&minio_endpoint).await?; + + info!("Delta S3 fixture ready with MinIO at {minio_endpoint}"); + + Ok(Self { + minio, + minio_endpoint, + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let table_uri = format!("s3://{MINIO_BUCKET}/delta_table"); + + let mut envs = HashMap::new(); + envs.insert( + ENV_SINK_PATH.to_string(), + "../../target/debug/libiggy_connector_delta_sink".to_string(), + ); + 
envs.insert(ENV_SINK_TABLE_URI.to_string(), table_uri); + envs.insert(ENV_SINK_STORAGE_BACKEND_TYPE.to_string(), "s3".to_string()); + envs.insert( + ENV_SINK_AWS_S3_ACCESS_KEY.to_string(), + MINIO_ACCESS_KEY.to_string(), + ); + envs.insert( + ENV_SINK_AWS_S3_SECRET_KEY.to_string(), + MINIO_SECRET_KEY.to_string(), + ); + envs.insert(ENV_SINK_AWS_S3_REGION.to_string(), "us-east-1".to_string()); + envs.insert( + ENV_SINK_AWS_S3_ENDPOINT_URL.to_string(), + self.minio_endpoint.clone(), + ); + envs.insert(ENV_SINK_AWS_S3_ALLOW_HTTP.to_string(), "true".to_string()); + envs + } +} diff --git a/core/integration/tests/connectors/fixtures/delta/mod.rs b/core/integration/tests/connectors/fixtures/delta/mod.rs new file mode 100644 index 0000000000..001dbed12c --- /dev/null +++ b/core/integration/tests/connectors/fixtures/delta/mod.rs @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +mod fixture; + +pub use fixture::{DeltaFixture, DeltaS3Fixture}; diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 3c0ac0d93b..e34afeb56a 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -17,12 +17,14 @@ * under the License. */ +mod delta; mod elasticsearch; mod iceberg; mod postgres; mod quickwit; mod wiremock; +pub use delta::{DeltaFixture, DeltaS3Fixture}; pub use elasticsearch::{ElasticsearchSinkFixture, ElasticsearchSourcePreCreatedFixture}; pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture}; pub use postgres::{ diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index a85453d52c..13d0de8bd2 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -18,6 +18,7 @@ */ mod api; +mod delta; mod elasticsearch; mod fixtures; mod http_config_provider;