From 9f78869ea7de9893e0a0ac2670e471fdb64cafdd Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 12 Jun 2026 13:23:43 +0200 Subject: [PATCH 1/2] build: consolidate Rust crates into a Cargo workspace + extract native-common Move the standalone `native` crate into a root Cargo workspace and extract shared JNI plumbing (error->exception mapping, Tokio runtime singleton, StreamingReader) into a new `datafusion-jni-common` crate under `native-common/`. `native/src/errors.rs` moves to `native-common/src/errors.rs`; the nine native modules now import error/runtime helpers from `datafusion_jni_common`. Build glue follows: single root `Cargo.lock`, `.cargo/config.toml` redirects output to `rust-target/`, Makefile/CI/poms updated to build `--workspace` and target `-p datafusion-jni`. Core javadoc build commands updated to match. Pure refactor; no behavior change. First of a 6-PR stack splitting the Spark DataSource V2 connector work. Co-Authored-By: Claude Opus 4.8 (1M context) --- .cargo/config.toml | 21 ++ .github/workflows/build.yml | 4 +- .github/workflows/lint.yml | 8 +- .gitignore | 1 + native/Cargo.lock => Cargo.lock | 269 ++++++++++-------- Cargo.toml | 47 +++ Makefile | 10 +- core/pom.xml | 4 +- .../org/apache/datafusion/SessionContext.java | 11 +- .../SessionContextRuntimeStatsTest.java | 2 +- .../SessionContextSubstraitTest.java | 2 +- docs/source/contributor-guide/development.md | 21 +- .../updating-datafusion-version.md | 10 +- native-common/Cargo.toml | 35 +++ {native => native-common}/src/errors.rs | 7 +- native-common/src/lib.rs | 104 +++++++ native/Cargo.toml | 41 ++- native/src/arrow.rs | 2 +- native/src/avro.rs | 2 +- native/src/cache_manager.rs | 2 +- native/src/csv.rs | 2 +- native/src/json.rs | 2 +- native/src/lib.rs | 78 +---- native/src/object_store.rs | 2 +- native/src/proto.rs | 2 +- native/src/runtime_metrics.rs | 6 +- native/src/schema.rs | 2 +- pom.xml | 17 +- 28 files changed, 467 insertions(+), 247 deletions(-) create mode 100644 .cargo/config.toml rename native/Cargo.lock => Cargo.lock (94%) create mode 100644 Cargo.toml create mode 100644 native-common/Cargo.toml rename {native => native-common}/src/errors.rs (97%) create mode 100644 native-common/src/lib.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..d7e0ee2 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Keep Cargo's workspace output out of `target/` so `mvn clean` (which deletes +# the root `target/`) does not nuke the Rust build cache. +[build] +target-dir = "rust-target" diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c5db936..da8e65a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -83,8 +83,8 @@ jobs: path: | ~/.cargo/registry ~/.cargo/git - native/target - key: ${{ runner.os }}-cargo-${{ hashFiles('native/Cargo.lock') }} + rust-target + key: ${{ runner.os }}-cargo-${{ hashFiles('Cargo.lock') }} restore-keys: ${{ runner.os }}-cargo- - name: Build native and run tests diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 4cf628f..952bf34 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -54,7 +54,7 @@ jobs: run: ./mvnw -q spotless:check - name: Check Rust formatting - run: cd native && cargo fmt --all -- --check + run: cargo fmt --all -- --check clippy: name: Clippy @@ -81,9 +81,9 @@ jobs: path: | ~/.cargo/registry ~/.cargo/git - native/target - key: ${{ runner.os }}-clippy-${{ hashFiles('native/Cargo.lock') }} + rust-target + key: ${{ runner.os }}-clippy-${{ hashFiles('Cargo.lock') }} restore-keys: ${{ runner.os }}-clippy- - name: Run clippy - run: cd native && cargo clippy --all-targets -- -D warnings + run: cargo clippy --workspace --all-targets -- -D warnings diff --git a/.gitignore b/.gitignore index 719a2a4..25c9216 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ target/ +rust-target/ *.class .idea/ .vscode/ diff --git a/native/Cargo.lock b/Cargo.lock similarity index 94% rename from native/Cargo.lock rename to Cargo.lock index 8c56280..286f96f 100644 --- a/native/Cargo.lock +++ b/Cargo.lock @@ -98,9 +98,9 @@ dependencies = [ [[package]] name = "ar_archive_writer" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb93bbb63b9c227414f6eb3a0adfddca591a8ce1e9b60661bb08969b87e340b" +checksum = "4087686b4b0a3427190bae57a1d9a478dbb2d40c5dc1bd6e2b6d797913bdd348" dependencies = [ "object", ] @@ -119,9 +119,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "607e64bb911ee4f90483e044fe78f175989148c2892e659a2cd25429e782ec54" +checksum = "378530e55cd479eda3c14eb345310799717e6f76d0c332041e8487022166b471" dependencies = [ "arrow-arith", "arrow-array", @@ -140,9 +140,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e754319ed8a85d817fe7adf183227e0b5308b82790a737b426c1124626b48118" +checksum = "a0ab212d2c1886e802f51c5212d78ebbcbb0bec980fff9dadc1eb8d45cd0b738" dependencies = [ "arrow-array", "arrow-buffer", @@ -154,9 +154,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841321891f247aa86c6112c80d83d89cb36e0addd020fa2425085b8eb6c3f579" +checksum = "cfd33d3e92f207444098c75b42de99d329562be0cf686b307b097cc52b4e999e" dependencies = [ "ahash", "arrow-buffer", @@ -173,9 +173,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f955dfb73fae000425f49c8226d2044dab60fb7ad4af1e24f961756354d996c9" +checksum = "0c6cd424c2693bcdbc150d843dc9d4d137dd2de4782ce6df491ad11a3a0416c0" dependencies = [ "bytes", "half", @@ -185,9 +185,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca5e686972523798f76bef355145bc1ae25a84c731e650268d31ab763c701663" +checksum = "4c5aefb56a2c02e9e2b30746241058b85f8983f0fcff2ba0c6d09006e1cded7f" dependencies = [ "arrow-array", "arrow-buffer", @@ -207,9 +207,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86c276756867fc8186ec380c72c290e6e3b23a1d4fb05df6b1d62d2e62666d48" +checksum = "e94e8cf7e517657a52b91ea1263acf38c4ca62a84655d72458a3359b12ab97de" dependencies = [ "arrow-array", "arrow-cast", @@ -222,9 +222,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3b5846209775b6dc8056d77ff9a032b27043383dd5488abd0b663e265b9373" +checksum = "3c88210023a2bfee1896af366309a3028fc3bcbd6515fa29a7990ee1baa08ee0" dependencies = [ "arrow-buffer", "arrow-schema", @@ -235,9 +235,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd8907ddd8f9fbabf91ec2c85c1d81fe2874e336d2443eb36373595e28b98dd5" +checksum = "238438f0834483703d88896db6fe5a7138b2230debc31b34c0336c2996e3c64f" dependencies = [ "arrow-array", "arrow-buffer", @@ -251,9 +251,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4518c59acc501f10d7dcae397fe12b8db3d81bc7de94456f8a58f9165d6f502" +checksum = "205ca2119e6d679d5c133c6f30e68f027738d95ed948cf77677ea69c7800036b" dependencies = [ "arrow-array", "arrow-buffer", @@ -276,9 +276,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efa70d9d6b1356f1fb9f1f651b84a725b7e0abb93f188cf7d31f14abfa2f2e6f" +checksum = "1bffd8fd2579286a5d63bac898159873e5094a79009940bcb42bbfce4f19f1d0" dependencies = [ "arrow-array", "arrow-buffer", @@ -289,9 +289,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faec88a945338192beffbbd4be0def70135422930caa244ac3cec0cd213b26b4" +checksum = "bab5994731204603c73ba69267616c50f80780774c6bb0476f1f830625115e0c" dependencies = [ "arrow-array", "arrow-buffer", @@ -302,9 +302,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18aa020f6bc8e5201dcd2d4b7f98c68f8a410ef37128263243e6ff2a47a67d4f" +checksum = "f633dbfdf39c039ada1bf9e34c694816eb71fbb7dc78f613993b7245e078a1ed" dependencies = [ "bitflags", "serde_core", @@ -313,9 +313,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a657ab5132e9c8ca3b24eb15a823d0ced38017fe3930ff50167466b02e2d592c" +checksum = "8cd065c54172ac787cf3f2f8d4107e0d3fdc26edba76fdf4f4cc170258942222" dependencies = [ "ahash", "arrow-array", @@ -327,9 +327,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6de2efbbd1a9f9780ceb8d1ff5d20421b35863b361e3386b4f571f1fc69fcb8" +checksum = "29dd7cda3ab9692f43a2e4acc444d760cc17b12bb6d8232ddf64e9bab7c06b42" dependencies = [ "arrow-array", "arrow-buffer", @@ -393,9 +393,9 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" [[package]] name = "base64" @@ -419,9 +419,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "2.11.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8" [[package]] name = "blake2" @@ -457,9 +457,9 @@ dependencies = [ [[package]] name = "bon" -version = "3.9.1" +version = "3.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f47dbe92550676ee653353c310dfb9cf6ba17ee70396e1f7cf0a2020ad49b2fe" +checksum = "b2f04f6fef12d70d42a77b1433c9e0f065238479a6cefc4f5bab105e9873a3c3" dependencies = [ "bon-macros", "rustversion", @@ -467,9 +467,9 @@ dependencies = [ [[package]] name = "bon-macros" -version = "3.9.1" +version = "3.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "519bd3116aeeb42d5372c29d982d16d0170d3d4a5ed85fc7dd91642ffff3c67c" +checksum = "7d0bd4c2f75335ad98052a37efb54f428b492f64340257143b3429c8a508fa7b" dependencies = [ "darling", "ident_case", @@ -482,9 +482,9 @@ dependencies = [ [[package]] name = "brotli" -version = "8.0.2" +version = "8.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +checksum = "8119e4516436f5708bbc474a9d395bf12f1b5395e93a92a56e647ac3388c8610" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -493,9 +493,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "5.0.0" +version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +checksum = "5962523e1b92ce1b5e793d9169b9943eece10d39f62550bc04bb605d75b94924" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -503,9 +503,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.20.2" +version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" [[package]] name = "byteorder" @@ -530,9 +530,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.62" +version = "1.2.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" dependencies = [ "find-msvc-tools", "jobserver", @@ -571,9 +571,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.44" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +checksum = "1aa79e62e7697b8e29b513a68abacf485adcd1fe8284a4316c5ae868e6633327" dependencies = [ "iana-time-zone", "num-traits", @@ -789,9 +789,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "6.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" dependencies = [ "cfg-if", "crossbeam-utils", @@ -1299,6 +1299,16 @@ dependencies = [ "datafusion-physical-expr-common", ] +[[package]] +name = "datafusion-java-example-bridge" +version = "0.1.0" +dependencies = [ + "arrow", + "datafusion", + "datafusion-spark-bridge", + "tokio", +] + [[package]] name = "datafusion-jni" version = "0.1.0" @@ -1306,6 +1316,7 @@ dependencies = [ "arrow", "async-trait", "datafusion", + "datafusion-jni-common", "datafusion-proto", "datafusion-substrait", "futures", @@ -1319,6 +1330,16 @@ dependencies = [ "url", ] +[[package]] +name = "datafusion-jni-common" +version = "0.1.0" +dependencies = [ + "datafusion", + "futures", + "jni", + "tokio", +] + [[package]] name = "datafusion-macros" version = "53.1.0" @@ -1527,6 +1548,21 @@ dependencies = [ "parking_lot", ] +[[package]] +name = "datafusion-spark-bridge" +version = "0.1.0" +dependencies = [ + "arrow", + "async-trait", + "datafusion", + "datafusion-jni-common", + "datafusion-proto", + "futures", + "jni", + "prost", + "tokio", +] + [[package]] name = "datafusion-sql" version = "53.1.0" @@ -1579,9 +1615,9 @@ dependencies = [ [[package]] name = "displaydoc" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f" dependencies = [ "proc-macro2", "quote", @@ -1596,9 +1632,9 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" [[package]] name = "either" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" [[package]] name = "equivalent" @@ -1904,9 +1940,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "http" -version = "1.4.0" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425" dependencies = [ "bytes", "itoa", @@ -1949,9 +1985,9 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" [[package]] name = "hyper" -version = "1.9.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +checksum = "55281c53a1894c864990125767da440a4e630446785086f52523b20033b74498" dependencies = [ "atomic-waker", "bytes", @@ -2241,13 +2277,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.98" +version = "0.3.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" +checksum = "f2025f20d7a4fa7785846e7b63d10a76d3f1cee98ee5cb79ea59703f95e42162" dependencies = [ "cfg-if", "futures-util", - "once_cell", "wasm-bindgen", ] @@ -2316,9 +2351,9 @@ dependencies = [ [[package]] name = "libbz2-rs-sys" -version = "0.2.3" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3a6a8c165077efc8f3a971534c50ea6a1a18b329ef4a66e897a7e3a1494565f" +checksum = "34b357333733e8260735ba5894eb928c02ecc69c78715f01a8019e7fa7f2db4c" [[package]] name = "libc" @@ -2375,9 +2410,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.29" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +checksum = "953f07c43838f8e6f9758cab68bf5bed85465e7587ebe0b823f1bcd81978ad3a" [[package]] name = "lru-slab" @@ -2406,9 +2441,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.8.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" [[package]] name = "miniz_oxide" @@ -2422,9 +2457,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" dependencies = [ "libc", "wasi", @@ -2570,9 +2605,9 @@ dependencies = [ [[package]] name = "parquet" -version = "58.2.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d7efd3052f7d6ef601085559a246bc991e9a8cc77e02753737df6322ce35f1" +checksum = "5dafa7d01085b62a47dd0c1829550a0a36710ea9c4fe358a05a85477cec8a908" dependencies = [ "ahash", "arrow-array", @@ -2734,9 +2769,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +checksum = "528ac67416ff8646872a3c02cad9cc4ee5dc9f9540c9b10771855c95cb2e5ae1" dependencies = [ "bytes", "prost-derive", @@ -2744,9 +2779,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" +checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042" dependencies = [ "heck", "itertools", @@ -2763,9 +2798,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", "itertools", @@ -2776,9 +2811,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" +checksum = "f94967dc7688f3054c7fac87473ffae4cc4c3904800e2d9f5b857246d8963b0a" dependencies = [ "prost", ] @@ -3035,9 +3070,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.3" +version = "1.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +checksum = "f1292b7759ae1cb9ec195452d1390a074f0cd8541ab7a5a8c31cd6db45d4a6ba" dependencies = [ "aho-corasick", "memchr", @@ -3064,9 +3099,9 @@ checksum = "cab834c73d247e67f4fae452806d17d3c7501756d98c8808d7c9c7aa7d18f973" [[package]] name = "regex-syntax" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +checksum = "d6f6ff9a378485b298a5286656da665ba74413d36db0979633275d2e708145d4" [[package]] name = "regress" @@ -3178,9 +3213,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +checksum = "dab5152771c58876a2146916e53e35057e1a4dfa2b9df0f0305b07f611fdea4d" dependencies = [ "openssl-probe", "rustls-pki-types", @@ -3361,9 +3396,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa", "memchr", @@ -3422,9 +3457,9 @@ dependencies = [ [[package]] name = "shlex" -version = "1.3.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" [[package]] name = "simd-adler32" @@ -3464,9 +3499,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "socket2" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", "windows-sys 0.61.2", @@ -3861,9 +3896,9 @@ checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" [[package]] name = "typenum" -version = "1.20.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" +checksum = "b6f5e870be6c3b371b77fe0ee0bafb859fa4964b4404c27de1d380043c4dda20" [[package]] name = "typify" @@ -3920,9 +3955,9 @@ checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-segmentation" -version = "1.13.2" +version = "1.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" +checksum = "c6f5d3c3b1bf09027a88a6bc961fc00497d651009560b5463668dc81b0fa87a8" [[package]] name = "unicode-width" @@ -3968,9 +4003,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.23.1" +version = "1.23.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +checksum = "144d6b123cef80b301b8f72a9e2ca4370ddec21950d0a103dd22c437006d2db7" dependencies = [ "getrandom 0.4.2", "js-sys", @@ -4029,9 +4064,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.121" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" +checksum = "a254a4b10c19a76f09a27640e7ffbf9bc30bf67e16a3bf28aaefa4920fe81563" dependencies = [ "cfg-if", "once_cell", @@ -4042,9 +4077,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.71" +version = "0.4.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96492d0d3ffba25305a7dc88720d250b1401d7edca02cc3bcd50633b424673b8" +checksum = "54568702fabf5d4849ce2b90fadfa64168a097eaf4b351ce9df8b687a0086aaf" dependencies = [ "js-sys", "wasm-bindgen", @@ -4052,9 +4087,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.121" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" +checksum = "24a40fc75b0ec6f3746ceb10d36f53a93dcd68a93b11b6445983945d79eba0dc" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4062,9 +4097,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.121" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" +checksum = "908f34bd9b9ce3d4caf07b72dfab63d61504d156856c6bd3cd87fa350cf3985b" dependencies = [ "bumpalo", "proc-macro2", @@ -4075,9 +4110,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.121" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" +checksum = "7acbf7616c27b194bbb550bf77ed0c2c3e5b7fd1260a93082b95fb7f47959b92" dependencies = [ "unicode-ident", ] @@ -4131,9 +4166,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.98" +version = "0.3.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b572dff8bcf38bad0fa19729c89bb5748b2b9b1d8be70cf90df697e3a8f32aa" +checksum = "6e0871acf327f283dc6da28a1696cdc64fb355ba9f935d052021fa77f35cce69" dependencies = [ "js-sys", "wasm-bindgen", @@ -4541,9 +4576,9 @@ checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" [[package]] name = "yoke" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" +checksum = "709fe23a0424b6a435d82152b1bd3fdfb0833487d5fa90d05d42762a9891fef5" dependencies = [ "stable_deref_trait", "yoke-derive", @@ -4564,18 +4599,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.48" +version = "0.8.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +checksum = "ce1022995ff5ff5d841ad7d994facc23098cd40152f2c1d11cd607c6f530653f" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.48" +version = "0.8.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +checksum = "1ae7f38b72ec2a254e2b87ef277cf2cd4fb97cbebf944faa6f33354da0867930" dependencies = [ "proc-macro2", "quote", @@ -4584,9 +4619,9 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" dependencies = [ "zerofrom-derive", ] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..4be0260 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[workspace] +resolver = "2" +members = [ + "native", + "native-common", +] + +# Every dependency used by any workspace member is declared here so version +# bumps live in one place and the resolver picks a single version of each +# crate across the workspace. Members reference these via `{ workspace = true }` +# and add per-crate flags (optional, features, default-features) at the use +# site. +[workspace.dependencies] +arrow = { version = "58", features = ["ffi"] } +async-trait = "0.1" +datafusion = { version = "53.1.0" } +datafusion-proto = "53.1.0" +datafusion-substrait = "53.1.0" +futures = "0.3" +jni = "0.21" +# Pinned to the major DataFusion 53.1 pulls in transitively (0.13.x) so we +# share the same `dyn ObjectStore` vtable and don't double-link. +object_store = { version = "0.13", default-features = false } +prost = "0.14" +prost-build = "0.14" +protoc-bin-vendored = "3" +tokio = { version = "1", features = ["rt-multi-thread"] } +# Optional, cfg-gated. See `native/Cargo.toml` for the build-flag dance. +tokio-metrics = "0.5" +url = "2" diff --git a/Makefile b/Makefile index 6d9b0ae..d6bcf2c 100644 --- a/Makefile +++ b/Makefile @@ -20,14 +20,14 @@ all: native jvm native: - cd native && cargo build + cargo build --workspace -# Build the native crate with the `runtime-metrics` Cargo feature enabled. +# Build the JNI crate with the `runtime-metrics` Cargo feature enabled. # Requires `--cfg tokio_unstable` because tokio-metrics gates its API there. # Default `make native` does not pull this in; callers who need # SessionContext.runtimeStats() pick this target explicitly. native-runtime-metrics: - cd native && RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics + RUSTFLAGS="--cfg tokio_unstable" cargo build -p datafusion-jni --features runtime-metrics jvm: ./mvnw package -DskipTests @@ -39,10 +39,10 @@ test: native # `:check` form inline in .github/workflows/lint.yml. format: ./mvnw -q spotless:apply - cd native && cargo fmt --all + cargo fmt --all clean: - cd native && cargo clean + cargo clean ./mvnw clean tpch-data: diff --git a/core/pom.xml b/core/pom.xml index 5ddf107..1e25736 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -102,8 +102,8 @@ under the License. - + value="${maven.multiModuleProjectDirectory}/rust-target/${datafusion.native.profile}/${datafusion.lib.filename}"/> + diff --git a/core/src/main/java/org/apache/datafusion/SessionContext.java b/core/src/main/java/org/apache/datafusion/SessionContext.java index ffc58dd..27d2b16 100644 --- a/core/src/main/java/org/apache/datafusion/SessionContext.java +++ b/core/src/main/java/org/apache/datafusion/SessionContext.java @@ -113,10 +113,11 @@ public DataFrame fromProto(byte[] planBytes) { * other Substrait-emitting tool — and hand them to DataFusion without round-tripping through SQL. * *

Substrait support is gated behind the {@code substrait} Cargo feature on the native crate - * and is off by default. Rebuild the native crate with {@code cargo build - * --features substrait} (or {@code cargo build --features substrait,protoc} for hermetic builds - * that vendor {@code protoc} via {@code cmake}) to enable it. If invoked against a native binary - * built without the feature, this method throws {@link RuntimeException} pointing at the flag. + * and is off by default. Rebuild the native crate with {@code cargo build -p + * datafusion-jni --features substrait} (or {@code ... --features substrait,protoc} for hermetic + * builds that vendor {@code protoc} via {@code cmake}) to enable it. If invoked against a native + * binary built without the feature, this method throws {@link RuntimeException} pointing at the + * flag. * * @throws IllegalArgumentException if {@code planBytes} is {@code null}. * @throws IllegalStateException if this context is closed. @@ -183,7 +184,7 @@ public MemoryUsage memoryUsage() { * Rebuild with: * *

{@code
-   * RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics
+   * RUSTFLAGS="--cfg tokio_unstable" cargo build -p datafusion-jni --features runtime-metrics
    * }
* *

If invoked against a native binary built without the feature, this method throws {@link diff --git a/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java b/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java index 120d179..d567275 100644 --- a/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java +++ b/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java @@ -37,7 +37,7 @@ * #checkFeatureEnabled}. Run * *

{@code
- * (cd native && RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics)
+ * RUSTFLAGS="--cfg tokio_unstable" cargo build -p datafusion-jni --features runtime-metrics
  * }
* * before {@code ./mvnw test} to exercise this class. diff --git a/core/src/test/java/org/apache/datafusion/SessionContextSubstraitTest.java b/core/src/test/java/org/apache/datafusion/SessionContextSubstraitTest.java index 34db3b5..a2cfb0a 100644 --- a/core/src/test/java/org/apache/datafusion/SessionContextSubstraitTest.java +++ b/core/src/test/java/org/apache/datafusion/SessionContextSubstraitTest.java @@ -50,7 +50,7 @@ * *

The {@code substrait} Cargo feature is off by default in {@code native/Cargo.toml}; if the * native crate was built without it, every test here is skipped (see {@link #checkFeatureEnabled}). - * Run {@code (cd native && cargo build --features substrait)} before {@code ./mvnw test} to + * Run {@code cargo build -p datafusion-jni --features substrait} before {@code ./mvnw test} to * exercise this class. */ class SessionContextSubstraitTest { diff --git a/docs/source/contributor-guide/development.md b/docs/source/contributor-guide/development.md index 984d77c..fdb00f4 100644 --- a/docs/source/contributor-guide/development.md +++ b/docs/source/contributor-guide/development.md @@ -42,7 +42,7 @@ This builds the native Rust crate and runs the JUnit tests. The steps can be run individually: ```sh -cd native && cargo build +cargo build --workspace ./mvnw test ``` @@ -74,14 +74,25 @@ disk space. The repository is a multi-module Maven build: -- `pom.xml` — parent POM declaring the `core` and `examples` modules and - shared plugin/dependency versions. +- `Cargo.toml` — Rust workspace root declaring the three crate members + (`native`, `native-common`, `examples/native`, `spark/bridge`) and `[workspace.dependencies]` + that pin shared versions in one place. Cargo writes artifacts to + `rust-target/` (overridden in `.cargo/config.toml`) so `mvn clean` at the + repo root does not nuke the Rust build cache. +- `pom.xml` — parent POM declaring the `core`, `spark`, and `examples` + modules and shared plugin/dependency versions. - `core/` — `datafusion-java` library module (Java sources, tests, and generated protobuf classes). +- `spark/` — `datafusion-java-spark` Spark DataSource V2 connector + (Scala + Java, pure JVM) and its `spark/bridge/` Rust SDK crate + (`datafusion-spark-bridge`: widening, scan machinery, `export_bridge!`). - `examples/` — `datafusion-java-examples` module containing runnable examples that depend on the library; built alongside the library so they - cannot fall out of sync with the API. -- `native/` — Rust crate (JNI + Arrow C Data Interface). + cannot fall out of sync with the API. Includes `examples/native/`, a + small `export_bridge!` cdylib used by the Spark connector demo + (`ExampleBridgeProviderFactory` + the pyspark script under + `examples/python/`). +- `native/` — `datafusion-jni` Rust crate (JNI + Arrow C Data Interface). - `proto/` — Protobuf definitions shared between Java and Rust. - `Makefile` — top-level build orchestration (`make test`, `make format`, `make tpch-data`). diff --git a/docs/source/contributor-guide/updating-datafusion-version.md b/docs/source/contributor-guide/updating-datafusion-version.md index 56d50dc..ef6cd10 100644 --- a/docs/source/contributor-guide/updating-datafusion-version.md +++ b/docs/source/contributor-guide/updating-datafusion-version.md @@ -21,7 +21,9 @@ under the License. Three things must move together when bumping DataFusion: -1. `native/Cargo.toml` — the `datafusion` crate dependency. +1. `Cargo.toml` (workspace root) — the `datafusion`, `datafusion-ffi`, + `datafusion-proto`, and `datafusion-substrait` entries in + `[workspace.dependencies]`. Members inherit from there. 2. `pom.xml` — the `` Maven property. **Must equal the Cargo version**; a mismatch means JVM-built protobuf plans won't deserialize on the native side. @@ -32,9 +34,9 @@ Three things must move together when bumping DataFusion: ## Recipe ```sh -# 1. Bump the Cargo dep -$EDITOR native/Cargo.toml # set datafusion = "" -(cd native && cargo update -p datafusion) +# 1. Bump the workspace dep +$EDITOR Cargo.toml # set datafusion = "" in [workspace.dependencies] +cargo update -p datafusion # 2. Bump the Maven property to match $EDITOR pom.xml # set diff --git a/native-common/Cargo.toml b/native-common/Cargo.toml new file mode 100644 index 0000000..0a797b4 --- /dev/null +++ b/native-common/Cargo.toml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-jni-common" +version = "0.1.0" +edition = "2021" +publish = false + +[features] +# `datafusion-jni` builds DataFusion with `avro`, which adds the +# `DataFusionError::AvroError` variant our classifier maps to IoException. +# Feature-forwarded so consumers that don't read Avro (the Spark helper) +# don't pull the apache-avro stack into their cdylib. +avro = ["datafusion/avro"] + +[dependencies] +datafusion = { workspace = true } +futures = { workspace = true } +jni = { workspace = true } +tokio = { workspace = true } diff --git a/native/src/errors.rs b/native-common/src/errors.rs similarity index 97% rename from native/src/errors.rs rename to native-common/src/errors.rs index d926544..caa2540 100644 --- a/native/src/errors.rs +++ b/native-common/src/errors.rs @@ -96,8 +96,11 @@ fn classify(err: &DataFusionError) -> &'static str { } DataFusionError::IoError(_) | DataFusionError::ObjectStore(_) - | DataFusionError::ParquetError(_) - | DataFusionError::AvroError(_) => "org/apache/datafusion/IoException", + | DataFusionError::ParquetError(_) => "org/apache/datafusion/IoException", + // The AvroError variant only exists when DataFusion is built with its + // `avro` feature, forwarded by this crate's own `avro` feature. + #[cfg(feature = "avro")] + DataFusionError::AvroError(_) => "org/apache/datafusion/IoException", // ArrowError is a 21-variant grab bag -- only some of those variants // are actually IO-shaped. DivideByZero / ArithmeticOverflow / Compute // / Cast / InvalidArgument / Memory etc. are execution-time failures diff --git a/native-common/src/lib.rs b/native-common/src/lib.rs new file mode 100644 index 0000000..f143d43 --- /dev/null +++ b/native-common/src/lib.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! JNI plumbing shared by this workspace's native crates (`datafusion-jni` +//! and `datafusion-spark-bridge`, and through the latter every bridge +//! cdylib): the error-to-Java-exception mapping, the per-cdylib Tokio +//! runtime singleton, and the async-stream-to-`FFI_ArrowArrayStream` +//! bridge. +//! +//! Each cdylib statically links its own copy of this rlib, so [`runtime`] is +//! a per-cdylib singleton -- exactly the behaviour each crate had when this +//! code lived inline. Nothing here is exported with `#[no_mangle]`, so +//! linking this crate into several cdylibs loaded in one JVM cannot collide. + +pub mod errors; + +use std::panic::{catch_unwind, AssertUnwindSafe}; +use std::sync::OnceLock; + +use datafusion::arrow::array::RecordBatch; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::ArrowError; +use datafusion::arrow::record_batch::RecordBatchReader; +use datafusion::execution::SendableRecordBatchStream; +use futures::StreamExt; +use tokio::runtime::{Handle, Runtime}; + +static RT: OnceLock = OnceLock::new(); + +/// The cdylib-wide Tokio runtime. +pub fn runtime() -> &'static Runtime { + runtime_with_init(|_| {}) +} + +/// Same singleton as [`runtime`], with a hook that runs exactly once, when +/// the runtime is created. `datafusion-jni` uses it to install its +/// runtime-metrics accumulator so the sampling baseline coincides with +/// runtime start; every later call (either entry point) returns the existing +/// runtime without invoking the hook. +pub fn runtime_with_init(init: impl FnOnce(&Handle)) -> &'static Runtime { + RT.get_or_init(|| { + let rt = Runtime::new().expect("failed to create Tokio runtime"); + init(rt.handle()); + rt + }) +} + +/// Bridges DataFusion's async [`SendableRecordBatchStream`] to the synchronous +/// [`RecordBatchReader`] interface that `FFI_ArrowArrayStream` (and therefore +/// the Java `ArrowReader`) consumes. Each call to `next()` drives one +/// `runtime().block_on(stream.next())`, so memory pressure stays bounded by the +/// executor pipeline plus a single in-flight batch. +pub struct StreamingReader { + pub schema: SchemaRef, + pub stream: SendableRecordBatchStream, +} + +impl Iterator for StreamingReader { + type Item = Result; + + fn next(&mut self) -> Option { + // Arrow's C ABI invokes this iterator through FFI_ArrowArrayStream's + // vtable, outside the JNI handler's try_unwrap_or_throw guard. A panic + // here (buggy UDF, arrow cast that panics, runtime poison) would + // unwind across C/FFI -- undefined behaviour. Catch it and surface as + // an ArrowError so the Java side sees a normal exception instead. + let next = catch_unwind(AssertUnwindSafe(|| runtime().block_on(self.stream.next()))); + match next { + Ok(item) => item.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e)))), + Err(panic) => { + let msg = if let Some(s) = panic.downcast_ref::() { + s.clone() + } else if let Some(s) = panic.downcast_ref::<&str>() { + (*s).to_string() + } else { + "rust panic with non-string payload".to_string() + }; + Some(Err(ArrowError::ExternalError( + format!("panic in DataFrame stream: {msg}").into(), + ))) + } + } + } +} + +impl RecordBatchReader for StreamingReader { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} diff --git a/native/Cargo.toml b/native/Cargo.toml index c462408..0f4ca83 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -23,8 +23,8 @@ publish = false [lib] # `rlib` alongside `cdylib` so `cargo test` has a Rust-level harness for -# native-only invariants (e.g. error-classification routing through wrapped -# DataFusionError chains). The `cdylib` is still the artifact the JVM loads. +# native-only invariants (the error-classification tests now live in +# `datafusion-jni-common`). The `cdylib` is still the artifact the JVM loads. crate-type = ["cdylib", "rlib"] [features] @@ -69,24 +69,23 @@ protoc = ["datafusion-substrait?/protoc"] runtime-metrics = ["dep:tokio-metrics"] [dependencies] -arrow = { version = "58", features = ["ffi"] } -async-trait = "0.1" -datafusion = { version = "53.1.0", features = ["avro"] } -datafusion-proto = "53.1.0" -datafusion-substrait = { version = "53.1.0", optional = true } -futures = "0.3" -jni = "0.21" -# Pin to the same major as DataFusion 53.1 pulls in transitively (0.13.x) -# so we share the same `dyn ObjectStore` vtable and don't double-link. -object_store = { version = "0.13", default-features = false } -prost = "0.14" -tokio = { version = "1", features = ["rt-multi-thread"] } -# Tokio runtime metrics. Optional + cfg-gated: this crate's API surface lives -# behind `--cfg tokio_unstable`, so enabling the `runtime-metrics` feature also -# requires the caller to set `RUSTFLAGS="--cfg tokio_unstable"` at build time. -tokio-metrics = { version = "0.5", optional = true } -url = "2" +arrow = { workspace = true } +async-trait = { workspace = true } +datafusion = { workspace = true, features = ["avro"] } +# Shared JNI plumbing (error->exception mapping, runtime singleton, +# StreamingReader). `avro` keeps the classifier's AvroError->IoException arm +# in sync with the `avro` feature on `datafusion` above. +datafusion-jni-common = { path = "../native-common", features = ["avro"] } +datafusion-proto = { workspace = true } +datafusion-substrait = { workspace = true, optional = true } +futures = { workspace = true } +jni = { workspace = true } +object_store = { workspace = true } +prost = { workspace = true } +tokio = { workspace = true } +tokio-metrics = { workspace = true, optional = true } +url = { workspace = true } [build-dependencies] -prost-build = "0.14" -protoc-bin-vendored = "3" +prost-build = { workspace = true } +protoc-bin-vendored = { workspace = true } diff --git a/native/src/arrow.rs b/native/src/arrow.rs index 2bbe7b0..67e5caf 100644 --- a/native/src/arrow.rs +++ b/native/src/arrow.rs @@ -23,10 +23,10 @@ use jni::sys::jlong; use jni::JNIEnv; use prost::Message; -use crate::errors::{try_unwrap_or_throw, JniResult}; use crate::proto_gen::ArrowReadOptionsProto; use crate::runtime; use crate::schema::decode_optional_schema; +use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult}; fn with_arrow_options( env: &mut JNIEnv, diff --git a/native/src/avro.rs b/native/src/avro.rs index 85d4a07..257ae32 100644 --- a/native/src/avro.rs +++ b/native/src/avro.rs @@ -23,10 +23,10 @@ use jni::sys::jlong; use jni::JNIEnv; use prost::Message; -use crate::errors::{try_unwrap_or_throw, JniResult}; use crate::proto_gen::AvroReadOptionsProto; use crate::runtime; use crate::schema::decode_optional_schema; +use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult}; fn with_avro_options( env: &mut JNIEnv, diff --git a/native/src/cache_manager.rs b/native/src/cache_manager.rs index 3b9e286..ec38dc8 100644 --- a/native/src/cache_manager.rs +++ b/native/src/cache_manager.rs @@ -34,8 +34,8 @@ use datafusion::execution::cache::cache_unit::{ }; use datafusion::execution::cache::DefaultListFilesCache; -use crate::errors::JniResult; use crate::proto_gen::CacheManagerOptionsProto; +use datafusion_jni_common::errors::JniResult; /// Build a [`CacheManagerConfig`] from the proto. Returns `Ok(None)` if the /// caller did not set any cache-manager field, so the JNI layer can skip the diff --git a/native/src/csv.rs b/native/src/csv.rs index 3ae4627..b79ed59 100644 --- a/native/src/csv.rs +++ b/native/src/csv.rs @@ -26,12 +26,12 @@ use jni::sys::jlong; use jni::JNIEnv; use prost::Message; -use crate::errors::{try_unwrap_or_throw, JniResult}; use crate::proto_gen::{ CsvReadOptionsProto, CsvWriteOptionsProto, FileCompressionType as ProtoFileCompressionType, }; use crate::runtime; use crate::schema::decode_optional_schema; +use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult}; fn with_csv_options( env: &mut JNIEnv, diff --git a/native/src/json.rs b/native/src/json.rs index 8eea32f..b87be78 100644 --- a/native/src/json.rs +++ b/native/src/json.rs @@ -27,12 +27,12 @@ use jni::sys::jlong; use jni::JNIEnv; use prost::Message; -use crate::errors::{try_unwrap_or_throw, JniResult}; use crate::proto_gen::{ FileCompressionType as ProtoFileCompressionType, JsonWriteOptionsProto, NdJsonReadOptionsProto, }; use crate::runtime; use crate::schema::decode_optional_schema; +use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult}; fn with_json_options( env: &mut JNIEnv, diff --git a/native/src/lib.rs b/native/src/lib.rs index 4fd7a8a..6e1a79f 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -19,7 +19,6 @@ mod arrow; mod avro; mod cache_manager; mod csv; -mod errors; mod jni_util; mod json; mod memory; @@ -34,16 +33,13 @@ pub(crate) mod proto_gen { include!(concat!(env!("OUT_DIR"), "/datafusion_java.rs")); } -use std::panic::{catch_unwind, AssertUnwindSafe}; use std::path::PathBuf; use std::sync::{Arc, OnceLock}; -use datafusion::arrow::array::RecordBatch; use datafusion::arrow::datatypes::SchemaRef; -use datafusion::arrow::error::ArrowError; use datafusion::arrow::ffi_stream::FFI_ArrowArrayStream; use datafusion::arrow::ipc::writer::StreamWriter; -use datafusion::arrow::record_batch::{RecordBatchIterator, RecordBatchReader}; +use datafusion::arrow::record_batch::RecordBatchIterator; use datafusion::common::{JoinType, UnnestOptions}; use datafusion::config::TableParquetOptions; use datafusion::dataframe::DataFrame; @@ -51,11 +47,9 @@ use datafusion::dataframe::DataFrameWriteOptions; use datafusion::error::DataFusionError; use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode}; use datafusion::execution::runtime_env::RuntimeEnvBuilder; -use datafusion::execution::SendableRecordBatchStream; use datafusion::logical_expr::Expr; use datafusion::logical_expr::{col, Partitioning, ScalarUDF, Signature, SortExpr}; use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; -use futures::StreamExt; use jni::objects::{JBooleanArray, JByteArray, JClass, JObject, JObjectArray, JString}; use jni::sys::{jboolean, jbyte, jbyteArray, jint, jlong}; use jni::JNIEnv; @@ -63,7 +57,10 @@ use jni::JavaVM; use prost::Message; use tokio::runtime::Runtime; -use crate::errors::{try_unwrap_or_throw, JniResult}; +use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult}; +// Re-exported so sibling modules keep their crate-local `crate::StreamingReader` path. +pub(crate) use datafusion_jni_common::StreamingReader; + use crate::proto_gen::ParquetReadOptionsProto; use crate::proto_gen::SessionOptions; use crate::schema::decode_optional_schema; @@ -84,18 +81,15 @@ pub(crate) fn jvm() -> &'static JavaVM { } pub(crate) fn runtime() -> &'static Runtime { - static RT: OnceLock = OnceLock::new(); - RT.get_or_init(|| { - let rt = Runtime::new().expect("failed to create Tokio runtime"); - // Eagerly install the runtime-metrics accumulator (no-op when the - // `runtime-metrics` Cargo feature is off). Initialising here -- not - // lazily on the first `runtimeStats()` call -- means the - // RuntimeMonitor's sampling baseline coincides with runtime start, so - // poll/park/busy totals reflect activity from the first query onward - // rather than from the first observation. - crate::runtime_metrics::init(rt.handle()); - rt - }) + // The singleton itself lives in datafusion-jni-common (shared with the + // datafusion-spark-bridge SDK; each cdylib statically links its own + // copy, so the runtime stays per-library). The init hook eagerly installs the + // runtime-metrics accumulator (no-op when the `runtime-metrics` Cargo + // feature is off). Initialising here -- not lazily on the first + // `runtimeStats()` call -- means the RuntimeMonitor's sampling baseline + // coincides with runtime start, so poll/park/busy totals reflect activity + // from the first query onward rather than from the first observation. + datafusion_jni_common::runtime_with_init(crate::runtime_metrics::init) } /// Wrap the (already-built) `RuntimeEnvBuilder`'s memory pool with a @@ -289,50 +283,6 @@ pub extern "system" fn Java_org_apache_datafusion_DataFrame_collectDataFrame<'lo }) } -/// Bridges DataFusion's async [`SendableRecordBatchStream`] to the synchronous -/// [`RecordBatchReader`] interface that `FFI_ArrowArrayStream` (and therefore -/// the Java `ArrowReader`) consumes. Each call to `next()` drives one -/// `runtime().block_on(stream.next())`, so memory pressure stays bounded by the -/// executor pipeline plus a single in-flight batch. -struct StreamingReader { - schema: SchemaRef, - stream: SendableRecordBatchStream, -} - -impl Iterator for StreamingReader { - type Item = Result; - - fn next(&mut self) -> Option { - // Arrow's C ABI invokes this iterator through FFI_ArrowArrayStream's - // vtable, outside the JNI handler's try_unwrap_or_throw guard. A panic - // here (buggy UDF, arrow cast that panics, runtime poison) would - // unwind across C/FFI -- undefined behaviour. Catch it and surface as - // an ArrowError so the Java side sees a normal exception instead. - let next = catch_unwind(AssertUnwindSafe(|| runtime().block_on(self.stream.next()))); - match next { - Ok(item) => item.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e)))), - Err(panic) => { - let msg = if let Some(s) = panic.downcast_ref::() { - s.clone() - } else if let Some(s) = panic.downcast_ref::<&str>() { - (*s).to_string() - } else { - "rust panic with non-string payload".to_string() - }; - Some(Err(ArrowError::ExternalError( - format!("panic in DataFrame stream: {msg}").into(), - ))) - } - } - } -} - -impl RecordBatchReader for StreamingReader { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - #[no_mangle] pub extern "system" fn Java_org_apache_datafusion_DataFrame_executeStreamDataFrame<'local>( mut env: JNIEnv<'local>, diff --git a/native/src/object_store.rs b/native/src/object_store.rs index eefccf2..985d721 100644 --- a/native/src/object_store.rs +++ b/native/src/object_store.rs @@ -28,9 +28,9 @@ use std::sync::Arc; use datafusion::prelude::SessionContext; use url::Url; -use crate::errors::JniResult; use crate::proto_gen::object_store_registration::Backend; use crate::proto_gen::ObjectStoreRegistration; +use datafusion_jni_common::errors::JniResult; #[cfg(feature = "object-store-gcp")] use crate::proto_gen::GcsOptions; diff --git a/native/src/proto.rs b/native/src/proto.rs index 4f187bc..c1315f9 100644 --- a/native/src/proto.rs +++ b/native/src/proto.rs @@ -28,8 +28,8 @@ use jni::sys::{jbyteArray, jlong}; use jni::JNIEnv; use prost::Message; -use crate::errors::{try_unwrap_or_throw, JniResult}; use crate::runtime; +use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult}; #[no_mangle] pub extern "system" fn Java_org_apache_datafusion_SessionContext_createDataFrameFromProto< diff --git a/native/src/runtime_metrics.rs b/native/src/runtime_metrics.rs index e69410e..dd60dcb 100644 --- a/native/src/runtime_metrics.rs +++ b/native/src/runtime_metrics.rs @@ -38,7 +38,7 @@ //! 10 totalOverflowCount #[cfg(not(feature = "runtime-metrics"))] -use crate::errors::JniResult; +use datafusion_jni_common::errors::JniResult; /// Number of i64 values in the snapshot array; kept here so the Java side and /// the feature-off stub agree on the layout. @@ -51,7 +51,7 @@ mod imp { use tokio_metrics::{RuntimeIntervals, RuntimeMonitor}; use super::STATS_FIELD_COUNT; - use crate::errors::JniResult; + use datafusion_jni_common::errors::JniResult; /// `RuntimeMonitor::intervals().next()` returns *delta* metrics covering /// the period since the previous call (or, on the very first call, since @@ -196,7 +196,7 @@ pub fn runtime_stats() -> JniResult<[i64; STATS_FIELD_COUNT]> { Err( "datafusion-jni was built without the `runtime-metrics` Cargo feature; \ rebuild the native crate with \ - `RUSTFLAGS=\"--cfg tokio_unstable\" cargo build --features runtime-metrics` \ + `RUSTFLAGS=\"--cfg tokio_unstable\" cargo build -p datafusion-jni --features runtime-metrics` \ to enable SessionContext.runtimeStats" .into(), ) diff --git a/native/src/schema.rs b/native/src/schema.rs index 968a73a..0c3c7ab 100644 --- a/native/src/schema.rs +++ b/native/src/schema.rs @@ -20,7 +20,7 @@ use datafusion::arrow::ipc::reader::StreamReader; use jni::objects::JByteArray; use jni::JNIEnv; -use crate::errors::JniResult; +use datafusion_jni_common::errors::JniResult; /// Decode an optional Arrow-IPC schema byte array passed in from Java. /// Returns `None` if the byte-array reference is null. diff --git a/pom.xml b/pom.xml index 6210841..b92cf72 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,11 @@ under the License. + + org.apache.maven.plugins + maven-compiler-plugin + 3.13.0 + org.apache.maven.plugins maven-surefire-plugin @@ -159,6 +164,7 @@ under the License. README.md CONTRIBUTING.md docs/** + **/*.md .gitignore .idea/** @@ -173,12 +179,17 @@ under the License. .mvn/** **/target/** - native/target/** + rust-target/** tpch-data/** - - native/Cargo.lock + + Cargo.lock + + **/META-INF/services/** dev/release/rat_exclude_files.txt + + spark/scaffold/bridge-template/** From 28efd7c971b1200a84202fb75f585a17436027f2 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 12 Jun 2026 13:24:28 +0200 Subject: [PATCH 2/2] feat(spark): add datafusion-spark-bridge SDK for static bridges New `spark/bridge` workspace crate providing the `export_bridge!` macro that generates the six JNI entry points a Spark connector bridge exposes (providerSchemaIpc, createScan, partitionCount, executeStreamPartition, executeStream, closeScan). Includes the options decoder, scan planning/execution glue, and the Arrow type-widening layer (wraps any TableProvider for Spark type compatibility). Self-contained SDK with no Java/Scala coupling. Depends only on datafusion-jni-common. Second of the 6-PR connector stack. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 10 - Cargo.toml | 1 + spark/bridge/Cargo.toml | 34 +++ spark/bridge/src/lib.rs | 213 ++++++++++++++++ spark/bridge/src/options.rs | 158 ++++++++++++ spark/bridge/src/scan.rs | 325 +++++++++++++++++++++++++ spark/bridge/src/widening.rs | 376 +++++++++++++++++++++++++++++ spark/bridge/tests/export_macro.rs | 52 ++++ 8 files changed, 1159 insertions(+), 10 deletions(-) create mode 100644 spark/bridge/Cargo.toml create mode 100644 spark/bridge/src/lib.rs create mode 100644 spark/bridge/src/options.rs create mode 100644 spark/bridge/src/scan.rs create mode 100644 spark/bridge/src/widening.rs create mode 100644 spark/bridge/tests/export_macro.rs diff --git a/Cargo.lock b/Cargo.lock index 286f96f..307f24b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1299,16 +1299,6 @@ dependencies = [ "datafusion-physical-expr-common", ] -[[package]] -name = "datafusion-java-example-bridge" -version = "0.1.0" -dependencies = [ - "arrow", - "datafusion", - "datafusion-spark-bridge", - "tokio", -] - [[package]] name = "datafusion-jni" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 4be0260..fffea43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ resolver = "2" members = [ "native", "native-common", + "spark/bridge", ] # Every dependency used by any workspace member is declared here so version diff --git a/spark/bridge/Cargo.toml b/spark/bridge/Cargo.toml new file mode 100644 index 0000000..8ed4684 --- /dev/null +++ b/spark/bridge/Cargo.toml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-spark-bridge" +version = "0.1.0" +edition = "2021" +publish = false +description = "SDK for building Spark connector bridges over DataFusion TableProviders" + +[dependencies] +arrow = { workspace = true } +async-trait = { workspace = true } +datafusion = { workspace = true } +datafusion-jni-common = { path = "../../native-common" } +datafusion-proto = { workspace = true } +futures = { workspace = true } +jni = { workspace = true } +prost = { workspace = true } +tokio = { workspace = true } diff --git a/spark/bridge/src/lib.rs b/spark/bridge/src/lib.rs new file mode 100644 index 0000000..52ef1c1 --- /dev/null +++ b/spark/bridge/src/lib.rs @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! SDK for building Spark connector bridges over DataFusion `TableProvider`s. +//! +//! Everything the Spark connector needs DataFusion-side lives here: the +//! Spark-type [`widening`] layer, and the [`scan`] machinery (session from +//! pinned config, projection, proto filters, planning, partition streams). +//! A bridge cdylib depends on this crate and invokes [`export_bridge!`] with +//! a builder that constructs its concrete `TableProvider` from option / +//! partition bytes — one cdylib, no FFI provider boundary; the only foreign +//! interface is JNI plus Arrow's C stream for the results. + +pub mod options; +pub mod scan; +pub mod widening; + +// Re-exported so `export_bridge!` expansions resolve these crates inside the +// bridge author's crate without extra dependencies, and so builder signatures +// can be written against `datafusion_spark_bridge::datafusion::...`. +pub use datafusion; +pub use datafusion_jni_common::errors::JniResult; +pub use jni; + +use tokio::runtime::Handle; + +/// Execution environment handed to a bridge's provider builder. +/// +/// Provider construction frequently needs async IO (remote catalogs, +/// object-store metadata); run it on the bridge runtime via [`block_on`] +/// rather than creating a runtime of your own. +/// +/// [`block_on`]: BridgeContext::block_on +pub struct BridgeContext { + handle: &'static Handle, +} + +impl BridgeContext { + /// Used by `export_bridge!` expansions; not part of the public API. + #[doc(hidden)] + pub fn get() -> Self { + BridgeContext { + handle: runtime_handle(), + } + } + + /// The cdylib-wide Tokio runtime handle (also the runtime scans run on). + pub fn handle(&self) -> &Handle { + self.handle + } + + /// Block the current (JVM) thread on `fut`, driving it on the bridge + /// runtime. + pub fn block_on(&self, fut: F) -> F::Output { + self.handle.block_on(fut) + } +} + +/// Per-cdylib Tokio runtime (the singleton from `datafusion-jni-common`). +pub(crate) fn runtime_handle() -> &'static Handle { + datafusion_jni_common::runtime().handle() +} + +/// Generate the JNI entry points for a bridge cdylib. +/// +/// `jni_class` is the **underscore-mangled** binary name of the Java class +/// declaring the matching `native` methods: dots become underscores +/// (`com.example.mybridge.BridgeNative` → `"com_example_mybridge_BridgeNative"`). +/// If the class or package name itself contains an underscore, JNI mangling +/// requires it written as `_1`. Per-bridge class names are what let several +/// bridges coexist in one Spark JVM. +/// +/// `build_provider` is anything callable as +/// `Fn(&BridgeContext, &[u8], &[u8]) -> JniResult>`, +/// receiving the options bytes and partition bytes your JVM factory encoded. +/// The schema probe calls it with empty partition bytes; the scan path passes +/// each task's payload. Return errors boxed from `DataFusionError` to surface +/// as the typed `org.apache.datafusion.*` exception hierarchy. +/// +/// The generated Java-side surface (declare these as `static native` on the +/// class named by `jni_class`): +/// +/// ```java +/// static native byte[] providerSchemaIpc(byte[] options, byte[] partition); +/// static native long createScan(byte[] options, byte[] partition, +/// int targetPartitions, int batchSize, String[] optionKeys, +/// String[] optionValues, String[] projectionColumns, byte[][] filterProtos); +/// static native int partitionCount(long scanHandle); +/// static native void executeStreamPartition(long scanHandle, int partition, long ffiStreamAddr); +/// static native void executeStream(long scanHandle, long ffiStreamAddr); +/// static native void closeScan(long scanHandle); +/// ``` +#[macro_export] +macro_rules! export_bridge { + (jni_class: $cls:literal, build_provider: $builder:expr $(,)?) => { + const _: () = { + use $crate::jni::objects::{JByteArray, JClass, JObjectArray}; + use $crate::jni::sys::{jbyteArray, jint, jlong}; + use $crate::jni::JNIEnv; + + fn __df_bridge_build( + env: &mut JNIEnv, + options: &JByteArray, + partition: &JByteArray, + ) -> $crate::JniResult> + { + let opts: Vec = if options.is_null() { + Vec::new() + } else { + env.convert_byte_array(options)? + }; + let part: Vec = if partition.is_null() { + Vec::new() + } else { + env.convert_byte_array(partition)? + }; + let ctx = $crate::BridgeContext::get(); + ($builder)(&ctx, opts.as_slice(), part.as_slice()) + } + + #[export_name = concat!("Java_", $cls, "_providerSchemaIpc")] + extern "system" fn __df_bridge_provider_schema_ipc<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + options: JByteArray<'local>, + partition: JByteArray<'local>, + ) -> jbyteArray { + $crate::scan::provider_schema_ipc(&mut env, |env| { + __df_bridge_build(env, &options, &partition) + }) + } + + #[export_name = concat!("Java_", $cls, "_createScan")] + #[allow(clippy::too_many_arguments)] + extern "system" fn __df_bridge_create_scan<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + options: JByteArray<'local>, + partition: JByteArray<'local>, + target_partitions: jint, + batch_size: jint, + option_keys: JObjectArray<'local>, + option_values: JObjectArray<'local>, + projection_columns: JObjectArray<'local>, + filter_protos: JObjectArray<'local>, + ) -> jlong { + $crate::scan::create_scan( + &mut env, + |env| __df_bridge_build(env, &options, &partition), + target_partitions, + batch_size, + &option_keys, + &option_values, + &projection_columns, + &filter_protos, + ) + } + + #[export_name = concat!("Java_", $cls, "_partitionCount")] + extern "system" fn __df_bridge_partition_count<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + ) -> jint { + $crate::scan::partition_count(&mut env, handle) + } + + #[export_name = concat!("Java_", $cls, "_executeStreamPartition")] + extern "system" fn __df_bridge_execute_stream_partition<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + partition: jint, + ffi_stream_addr: jlong, + ) { + $crate::scan::execute_stream_partition(&mut env, handle, partition, ffi_stream_addr) + } + + #[export_name = concat!("Java_", $cls, "_executeStream")] + extern "system" fn __df_bridge_execute_stream<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + ffi_stream_addr: jlong, + ) { + $crate::scan::execute_stream(&mut env, handle, ffi_stream_addr) + } + + #[export_name = concat!("Java_", $cls, "_closeScan")] + extern "system" fn __df_bridge_close_scan<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + ) { + $crate::scan::close_scan(&mut env, handle) + } + }; + }; +} diff --git a/spark/bridge/src/options.rs b/spark/bridge/src/options.rs new file mode 100644 index 0000000..117ca9d --- /dev/null +++ b/spark/bridge/src/options.rs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Decoder for the connector's default options wire format. +//! +//! `BridgeProviderFactory.encodeOptions`'s default (`OptionsCodec` on the JVM +//! side) encodes the Spark options map as length-prefixed UTF-8 pairs, +//! sorted by key: big-endian `i32` entry count, then per entry key length, +//! key bytes, value length, value bytes. Key-sorting makes the bytes a pure +//! function of the map contents — the shared-scan determinism contract uses +//! the options bytes as the scan identity. +//! +//! Bridges using the default JVM encoding read their options here: +//! +//! ```ignore +//! let opts = datafusion_spark_bridge::options::decode_options(options_bytes)?; +//! let url = opts.get("url").ok_or("missing required option 'url'")?; +//! ``` +//! +//! The two implementations are pinned to each other by the shared fixture in +//! the tests below; `OptionsCodecTest` on the JVM side asserts the same +//! bytes. + +use std::collections::BTreeMap; + +/// Decode bytes produced by the JVM `OptionsCodec.encode` (or +/// [`encode_options`]). Empty input decodes as an empty map. +pub fn decode_options(bytes: &[u8]) -> Result, String> { + let mut out = BTreeMap::new(); + if bytes.is_empty() { + return Ok(out); + } + let mut cursor = Cursor { bytes, pos: 0 }; + let count = cursor.read_len("entry count")?; + for i in 0..count { + let key = cursor.read_string(&format!("key of entry {i}"))?; + let value = cursor.read_string(&format!("value of entry {i}"))?; + out.insert(key, value); + } + if cursor.pos != bytes.len() { + return Err(format!( + "options blob has {} trailing byte(s) after {count} entries", + bytes.len() - cursor.pos + )); + } + Ok(out) +} + +/// Encode in the same format (key-sorted via `BTreeMap`). Primarily for +/// tests and Rust-side tooling; production encoding normally happens on the +/// JVM driver. +pub fn encode_options(options: &BTreeMap) -> Vec { + let mut out = Vec::new(); + out.extend_from_slice(&(options.len() as i32).to_be_bytes()); + for (key, value) in options { + out.extend_from_slice(&(key.len() as i32).to_be_bytes()); + out.extend_from_slice(key.as_bytes()); + out.extend_from_slice(&(value.len() as i32).to_be_bytes()); + out.extend_from_slice(value.as_bytes()); + } + out +} + +struct Cursor<'a> { + bytes: &'a [u8], + pos: usize, +} + +impl Cursor<'_> { + fn read_len(&mut self, what: &str) -> Result { + if self.bytes.len() - self.pos < 4 { + return Err(format!("options blob truncated reading {what}")); + } + let raw = i32::from_be_bytes(self.bytes[self.pos..self.pos + 4].try_into().unwrap()); + self.pos += 4; + usize::try_from(raw).map_err(|_| format!("negative length for {what}: {raw}")) + } + + fn read_string(&mut self, what: &str) -> Result { + let len = self.read_len(&format!("length of {what}"))?; + if self.bytes.len() - self.pos < len { + return Err(format!("options blob truncated reading {what}")); + } + let slice = &self.bytes[self.pos..self.pos + len]; + self.pos += len; + String::from_utf8(slice.to_vec()).map_err(|e| format!("{what} is not UTF-8: {e}")) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Shared fixture: must stay byte-identical to the one asserted by the + /// JVM-side `OptionsCodecTest`. {"table": "t1", "url": "grpc://h:1"} + /// encodes (sorted: table < url) as below. + fn fixture_bytes() -> Vec { + let mut b = Vec::new(); + b.extend_from_slice(&2i32.to_be_bytes()); + for (k, v) in [("table", "t1"), ("url", "grpc://h:1")] { + b.extend_from_slice(&(k.len() as i32).to_be_bytes()); + b.extend_from_slice(k.as_bytes()); + b.extend_from_slice(&(v.len() as i32).to_be_bytes()); + b.extend_from_slice(v.as_bytes()); + } + b + } + + #[test] + fn decodes_fixture() { + let map = decode_options(&fixture_bytes()).unwrap(); + assert_eq!(map.len(), 2); + assert_eq!(map.get("table").map(String::as_str), Some("t1")); + assert_eq!(map.get("url").map(String::as_str), Some("grpc://h:1")); + } + + #[test] + fn round_trips() { + let mut map = BTreeMap::new(); + map.insert("b".to_string(), "2".to_string()); + map.insert("a".to_string(), "1".to_string()); + map.insert("unicode".to_string(), "héllo→world".to_string()); + let bytes = encode_options(&map); + assert_eq!(decode_options(&bytes).unwrap(), map); + } + + #[test] + fn empty_input_is_empty_map() { + assert!(decode_options(&[]).unwrap().is_empty()); + let empty = encode_options(&BTreeMap::new()); + assert!(decode_options(&empty).unwrap().is_empty()); + } + + #[test] + fn rejects_truncation_and_trailing_bytes() { + let bytes = fixture_bytes(); + assert!(decode_options(&bytes[..bytes.len() - 1]) + .unwrap_err() + .contains("truncated")); + let mut extended = bytes.clone(); + extended.push(0); + assert!(decode_options(&extended).unwrap_err().contains("trailing")); + } +} diff --git a/spark/bridge/src/scan.rs b/spark/bridge/src/scan.rs new file mode 100644 index 0000000..ad27dff --- /dev/null +++ b/spark/bridge/src/scan.rs @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Planning and execution of a Spark scan. +//! +//! Every function here is the body of one JNI entry point generated by a +//! bridge's `export_bridge!` expansion, which supplies only how the provider +//! is obtained, as a `make` closure. The provider is wrapped in a +//! [`WideningTableProvider`] here, so every bridge gets identical +//! Spark-compatible Arrow types. +//! +//! [`create_scan`] registers the widened provider on a private +//! `SessionContext` built from the caller-pinned config, applies the pruned +//! projection and the proto-encoded pushed filters, and plans exactly once. +//! The returned handle supports: +//! +//! - [`partition_count`] — output partitions of the physical plan +//! (shared-scan mode probes this on the driver and indexes tasks by it); +//! - [`execute_stream_partition`] — an independent stream over ONE plan +//! partition, concurrently callable from multiple JVM threads +//! (`ExecutionPlan` and `TaskContext` are `Send + Sync`; each call only +//! clones their `Arc`s). Re-executing the same partition index (Spark +//! task retry / speculative execution) opens its own stream, but only +//! succeeds when every operator in that partition's pipeline supports +//! repeated `execute()` — stateless scans do, `RepartitionExec` +//! pipelines do not; +//! - [`execute_stream`] — the whole plan as one stream (per-partition +//! mode, where the provider itself is the task's slice); +//! - [`close_scan`] — drop the plan. The single unsafe interleaving is +//! closing a handle that still has an in-flight call; the Java consumer +//! (the shared-scan cache) prevents it with a refcount covering every +//! open reader. +//! +//! Pinned-config determinism: the driver resolves `target_partitions` / +//! `batch_size` / option overrides once and ships them to every executor, so +//! a plan that yields N partitions on the driver yields N everywhere. This +//! module applies whatever it is handed and stays policy-free. + +use std::sync::Arc; + +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::ffi_stream::FFI_ArrowArrayStream; +use datafusion::arrow::ipc::writer::StreamWriter; +use datafusion::catalog::TableProvider; +use datafusion::dataframe::DataFrame; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::{execute_stream as df_execute_stream, ExecutionPlan}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult}; +use datafusion_jni_common::StreamingReader; +use datafusion_proto::logical_plan::from_proto::parse_expr; +use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec; +use datafusion_proto::protobuf::LogicalExprNode; +use jni::objects::{JByteArray, JObjectArray, JString}; +use jni::sys::{jbyteArray, jint, jlong}; +use jni::JNIEnv; +use prost::Message; + +use crate::runtime_handle; +use crate::widening::WideningTableProvider; + +/// Registration name of the (single) provider on the scan's private context. +/// Never surfaces in SQL — the plan is built through the DataFrame API — so +/// no quoting/collision concerns. +const SCAN_TABLE_NAME: &str = "df_spark_scan"; + +struct ScanState { + /// Kept alive for the plan's lifetime; the registered provider and the + /// runtime env both hang off it. + _ctx: SessionContext, + plan: Arc, + task_ctx: Arc, +} + +fn widen(provider: Arc) -> Arc { + Arc::new(WideningTableProvider::new(provider)) +} + +fn collect_string_array(env: &mut JNIEnv, arr: &JObjectArray) -> JniResult> { + if arr.is_null() { + return Ok(Vec::new()); + } + let len = env.get_array_length(arr)?; + let mut owned: Vec = Vec::with_capacity(len as usize); + for i in 0..len { + let elem = env.get_object_array_element(arr, i)?; + let jstr: JString = elem.into(); + owned.push(env.get_string(&jstr)?.into()); + } + Ok(owned) +} + +fn collect_byte_arrays(env: &mut JNIEnv, arr: &JObjectArray) -> JniResult>> { + if arr.is_null() { + return Ok(Vec::new()); + } + let len = env.get_array_length(arr)?; + let mut owned: Vec> = Vec::with_capacity(len as usize); + for i in 0..len { + let elem = env.get_object_array_element(arr, i)?; + let bytes: JByteArray = elem.into(); + owned.push(env.convert_byte_array(&bytes)?); + } + Ok(owned) +} + +/// Driver-side schema probe: widened Arrow schema of the provider, as IPC +/// bytes (deserialized JVM-side with `MessageSerializer.deserializeSchema`). +/// `make` runs once; the provider drops before returning. +pub fn provider_schema_ipc( + env: &mut JNIEnv, + make: impl FnOnce(&mut JNIEnv) -> JniResult>, +) -> jbyteArray { + try_unwrap_or_throw(env, std::ptr::null_mut(), |env| -> JniResult { + let widened = widen(make(env)?); + let schema = widened.schema(); + let mut buf: Vec = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut buf, schema.as_ref())?; + writer.finish()?; + } + let arr = env.byte_array_from_slice(&buf)?; + Ok(arr.into_raw()) + }) +} + +/// Build the scan: widen the provider from `make`, register it on a private +/// context with the pinned config, apply projection + pushed filters, plan +/// once. +/// +/// `target_partitions` / `batch_size` <= 0 leave the DataFusion defaults; +/// `option_keys`/`option_values` are parallel arrays of config overrides; +/// empty `projection_columns` selects all columns; each element of +/// `filter_protos` is a serialized `datafusion.LogicalExprNode`. +#[allow(clippy::too_many_arguments)] +pub fn create_scan( + env: &mut JNIEnv, + make: impl FnOnce(&mut JNIEnv) -> JniResult>, + target_partitions: jint, + batch_size: jint, + option_keys: &JObjectArray, + option_values: &JObjectArray, + projection_columns: &JObjectArray, + filter_protos: &JObjectArray, +) -> jlong { + try_unwrap_or_throw(env, 0, |env| -> JniResult { + let widened = widen(make(env)?); + + let keys = collect_string_array(env, option_keys)?; + let values = collect_string_array(env, option_values)?; + if keys.len() != values.len() { + return Err(format!( + "option key/value arrays differ in length: {} vs {}", + keys.len(), + values.len() + ) + .into()); + } + let projection = collect_string_array(env, projection_columns)?; + let filters = collect_byte_arrays(env, filter_protos)?; + + let mut config = SessionConfig::new(); + if target_partitions > 0 { + config = config.with_target_partitions(target_partitions as usize); + } + if batch_size > 0 { + config = config.with_batch_size(batch_size as usize); + } + for (key, value) in keys.iter().zip(values.iter()) { + config.options_mut().set(key, value)?; + } + + let ctx = SessionContext::new_with_config(config); + ctx.register_table(SCAN_TABLE_NAME, widened)?; + + let mut df: DataFrame = runtime_handle().block_on(ctx.table(SCAN_TABLE_NAME))?; + if !projection.is_empty() { + let refs: Vec<&str> = projection.iter().map(String::as_str).collect(); + df = df.select_columns(&refs)?; + } + for bytes in &filters { + let node = LogicalExprNode::decode(bytes.as_slice())?; + // TaskContext implements FunctionRegistry; the default codec is + // enough because the translator only emits column/literal/builtin + // expressions. + let registry = df.task_ctx(); + let expr = parse_expr(&node, ®istry, &DefaultLogicalExtensionCodec {})?; + df = df.filter(expr)?; + } + + // task_ctx() borrows; capture before create_physical_plan consumes df. + let task_ctx = Arc::new(df.task_ctx()); + let plan = runtime_handle().block_on(df.create_physical_plan())?; + + let state = ScanState { + _ctx: ctx, + plan, + task_ctx, + }; + Ok(Box::into_raw(Box::new(state)) as jlong) + }) +} + +/// Output partition count of the planned physical plan. +pub fn partition_count(env: &mut JNIEnv, handle: jlong) -> jint { + try_unwrap_or_throw(env, 0, |_env| -> JniResult { + if handle == 0 { + return Err("scan handle is null".into()); + } + let state = unsafe { &*(handle as *const ScanState) }; + Ok(state + .plan + .properties() + .output_partitioning() + .partition_count() as jint) + }) +} + +/// Open an independent stream over one plan partition, writing an +/// `FFI_ArrowArrayStream` into the caller-allocated struct at +/// `ffi_stream_addr`. +pub fn execute_stream_partition( + env: &mut JNIEnv, + handle: jlong, + partition: jint, + ffi_stream_addr: jlong, +) { + try_unwrap_or_throw(env, (), |_env| -> JniResult<()> { + if handle == 0 { + return Err("scan handle is null".into()); + } + if ffi_stream_addr == 0 { + return Err("ffi stream address is null".into()); + } + let state = unsafe { &*(handle as *const ScanState) }; + + let partition_count = state + .plan + .properties() + .output_partitioning() + .partition_count(); + if partition < 0 || partition as usize >= partition_count { + return Err(format!( + "partition index {partition} out of range: plan has {partition_count} partition(s)" + ) + .into()); + } + + let plan = Arc::clone(&state.plan); + let task_ctx = Arc::clone(&state.task_ctx); + let schema: SchemaRef = plan.schema(); + + // ExecutionPlan::execute is synchronous, but operators may + // tokio::spawn at execute() time (RepartitionExec et al.), which + // requires a runtime context to be entered. + let stream = { + let _guard = runtime_handle().enter(); + plan.execute(partition as usize, task_ctx)? + }; + + let reader = StreamingReader { schema, stream }; + let ffi = FFI_ArrowArrayStream::new(Box::new(reader)); + unsafe { + std::ptr::write(ffi_stream_addr as *mut FFI_ArrowArrayStream, ffi); + } + Ok(()) + }) +} + +/// Whole-plan stream for per-partition mode (the provider +/// itself is the task's slice, so all plan partitions merge into one reader). +pub fn execute_stream(env: &mut JNIEnv, handle: jlong, ffi_stream_addr: jlong) { + try_unwrap_or_throw(env, (), |_env| -> JniResult<()> { + if handle == 0 { + return Err("scan handle is null".into()); + } + if ffi_stream_addr == 0 { + return Err("ffi stream address is null".into()); + } + let state = unsafe { &*(handle as *const ScanState) }; + + let plan = Arc::clone(&state.plan); + let task_ctx = Arc::clone(&state.task_ctx); + let schema: SchemaRef = plan.schema(); + + // execute_stream coalesces multi-partition plans behind one stream. + let stream = { + let _guard = runtime_handle().enter(); + df_execute_stream(plan, task_ctx)? + }; + + let reader = StreamingReader { schema, stream }; + let ffi = FFI_ArrowArrayStream::new(Box::new(reader)); + unsafe { + std::ptr::write(ffi_stream_addr as *mut FFI_ArrowArrayStream, ffi); + } + Ok(()) + }) +} + +/// Drop the planned scan. Must not race an in-flight stream-open on the same +/// handle; the Java consumer's refcount enforces this. +pub fn close_scan(env: &mut JNIEnv, handle: jlong) { + try_unwrap_or_throw(env, (), |_env| -> JniResult<()> { + if handle == 0 { + return Err("scan handle is null".into()); + } + drop(unsafe { Box::from_raw(handle as *mut ScanState) }); + Ok(()) + }) +} diff --git a/spark/bridge/src/widening.rs b/spark/bridge/src/widening.rs new file mode 100644 index 0000000..86c4abf --- /dev/null +++ b/spark/bridge/src/widening.rs @@ -0,0 +1,376 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Kernel-level Arrow type widening for Spark consumption. +//! +//! Spark 3.5's `ArrowColumnVector` has no accessor for unsigned ints, Time*, +//! Float16, or non-microsecond Timestamp. The widening machinery here wraps +//! an inner `TableProvider` with one that exposes a "widened" schema — +//! UInt*→Int wider, Float16→Float32, Time*→Int wider, Timestamp(*, tz)→ +//! Timestamp(Microsecond, tz), recursing into List/LargeList/FixedSizeList +//! children — and applies `arrow::compute::cast` to each produced +//! RecordBatch column-wise. No SQL, no SessionContext, no view machinery. + +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +use arrow::array::RecordBatch; +use arrow::compute::cast; +use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef, TimeUnit}; +use async_trait::async_trait; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::{DataFusionError, Result}; +use datafusion::execution::TaskContext; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, +}; +use futures::stream::StreamExt; + +/// Compute the cast-target DataType for an Arrow type not directly readable +/// by Spark's `ArrowColumnVector`. Returns `None` if the type passes through. +pub fn arrow_cast_widening(dt: &DataType) -> Option { + match dt { + DataType::UInt8 => Some(DataType::Int16), + DataType::UInt16 => Some(DataType::Int32), + DataType::UInt32 => Some(DataType::Int64), + // UInt64 → Int64: lossy for values ≥ 2^63. Documented in REARCHITECTURE.md. + DataType::UInt64 => Some(DataType::Int64), + DataType::Float16 => Some(DataType::Float32), + DataType::Time32(_) => Some(DataType::Int32), + DataType::Time64(_) => Some(DataType::Int64), + DataType::Timestamp(unit, tz) => { + if *unit == TimeUnit::Microsecond { + None + } else { + Some(DataType::Timestamp(TimeUnit::Microsecond, tz.clone())) + } + } + DataType::List(field) => arrow_cast_widening(field.data_type()) + .map(|inner| DataType::List(widened_child(field, inner))), + DataType::LargeList(field) => arrow_cast_widening(field.data_type()) + .map(|inner| DataType::LargeList(widened_child(field, inner))), + // Spark 3.5's ArrowColumnVector cannot read FixedSizeList at all, so + // always convert it to a (variable) List — which Spark maps to + // ArrayType — widening the child element type when needed too. + DataType::FixedSizeList(field, _size) => { + let child = match arrow_cast_widening(field.data_type()) { + Some(inner) => widened_child(field, inner), + None => Arc::clone(field), + }; + Some(DataType::List(child)) + } + _ => None, + } +} + +fn widened_child(field: &Arc, new_type: DataType) -> Arc { + Arc::new(Field::new(field.name(), new_type, field.is_nullable())) +} + +/// Build the widened schema by walking inner fields and replacing types. +/// Returns the widened schema plus per-column target types (None where no cast). +fn widened_schema(inner: &ArrowSchema) -> (SchemaRef, Vec>) { + let mut fields = Vec::with_capacity(inner.fields().len()); + let mut targets = Vec::with_capacity(inner.fields().len()); + for f in inner.fields() { + match arrow_cast_widening(f.data_type()) { + Some(target) => { + fields.push(Arc::new(Field::new( + f.name(), + target.clone(), + f.is_nullable(), + ))); + targets.push(Some(target)); + } + None => { + fields.push(Arc::clone(f)); + targets.push(None); + } + } + } + (Arc::new(ArrowSchema::new(fields)), targets) +} + +/// TableProvider wrapping an inner provider, exposing a widened schema and +/// emitting RecordBatches whose columns have been cast to the widened types. +#[derive(Debug)] +pub struct WideningTableProvider { + inner: Arc, + widened: SchemaRef, + /// Targets indexed by the inner-schema column position; `None` = pass through. + targets: Vec>, +} + +impl WideningTableProvider { + pub fn new(inner: Arc) -> Self { + let (widened, targets) = widened_schema(&inner.schema()); + Self { + inner, + widened, + targets, + } + } +} + +#[async_trait] +impl TableProvider for WideningTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.widened) + } + + fn table_type(&self) -> TableType { + self.inner.table_type() + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + self.inner.supports_filters_pushdown(filters) + } + + async fn scan( + &self, + session: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let inner_plan = self.inner.scan(session, projection, filters, limit).await?; + let (projected_widened, projected_targets) = match projection { + Some(idxs) => { + let fields: Vec> = idxs + .iter() + .map(|i| Arc::clone(&self.widened.fields()[*i])) + .collect(); + let targets: Vec> = + idxs.iter().map(|i| self.targets[*i].clone()).collect(); + (Arc::new(ArrowSchema::new(fields)) as SchemaRef, targets) + } + None => (Arc::clone(&self.widened), self.targets.clone()), + }; + Ok(Arc::new(WideningExec::new( + inner_plan, + projected_widened, + projected_targets, + ))) + } +} + +/// ExecutionPlan that runs the inner plan and casts each output RecordBatch +/// column-wise per the supplied targets. Pure stream-map wrapper; no +/// buffering, no internal state. +pub struct WideningExec { + inner: Arc, + schema: SchemaRef, + /// One entry per output column; `None` = pass through. + targets: Vec>, + properties: Arc, +} + +impl WideningExec { + fn new( + inner: Arc, + schema: SchemaRef, + targets: Vec>, + ) -> Self { + let inner_props = inner.properties(); + let properties = Arc::new(PlanProperties::new( + EquivalenceProperties::new(Arc::clone(&schema)), + inner_props.partitioning.clone(), + inner_props.emission_type, + inner_props.boundedness, + )); + Self { + inner, + schema, + targets, + properties, + } + } +} + +impl fmt::Debug for WideningExec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WideningExec") + .field("schema", &self.schema) + .field("targets", &self.targets) + .finish() + } +} + +impl DisplayAs for WideningExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let cast_count = self.targets.iter().filter(|t| t.is_some()).count(); + write!(f, "WideningExec: casts={cast_count}") + } +} + +impl ExecutionPlan for WideningExec { + fn name(&self) -> &str { + "WideningExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if children.len() != 1 { + return Err(DataFusionError::Internal( + "WideningExec::with_new_children expects exactly one child".to_string(), + )); + } + Ok(Arc::new(WideningExec::new( + children.into_iter().next().unwrap(), + Arc::clone(&self.schema), + self.targets.clone(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let inner_stream = self.inner.execute(partition, context)?; + let schema = Arc::clone(&self.schema); + let targets = self.targets.clone(); + let mapped = inner_stream.map(move |batch_res| match batch_res { + Err(e) => Err(e), + Ok(batch) => cast_batch(&batch, &schema, &targets), + }); + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + mapped, + ))) + } +} + +fn cast_batch( + batch: &RecordBatch, + out_schema: &SchemaRef, + targets: &[Option], +) -> Result { + if batch.num_columns() != targets.len() { + return Err(DataFusionError::Internal(format!( + "WideningExec: produced batch has {} columns, expected {}", + batch.num_columns(), + targets.len() + ))); + } + let mut new_cols = Vec::with_capacity(batch.num_columns()); + for (col, target) in batch.columns().iter().zip(targets.iter()) { + match target { + Some(t) => new_cols.push(cast(col, t).map_err(DataFusionError::from)?), + None => new_cols.push(Arc::clone(col)), + } + } + RecordBatch::try_new(Arc::clone(out_schema), new_cols).map_err(DataFusionError::from) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn unsigned_ints_widen_to_signed_wider() { + assert_eq!(arrow_cast_widening(&DataType::UInt8), Some(DataType::Int16)); + assert_eq!( + arrow_cast_widening(&DataType::UInt16), + Some(DataType::Int32) + ); + assert_eq!( + arrow_cast_widening(&DataType::UInt32), + Some(DataType::Int64) + ); + assert_eq!( + arrow_cast_widening(&DataType::UInt64), + Some(DataType::Int64) + ); + } + + #[test] + fn float16_widens_to_float32() { + assert_eq!( + arrow_cast_widening(&DataType::Float16), + Some(DataType::Float32) + ); + } + + #[test] + fn time_widens_to_int() { + assert_eq!( + arrow_cast_widening(&DataType::Time32(TimeUnit::Millisecond)), + Some(DataType::Int32) + ); + assert_eq!( + arrow_cast_widening(&DataType::Time64(TimeUnit::Nanosecond)), + Some(DataType::Int64) + ); + } + + #[test] + fn timestamp_normalizes_unit_preserving_tz() { + let ns = DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC"))); + assert_eq!( + arrow_cast_widening(&ns), + Some(DataType::Timestamp( + TimeUnit::Microsecond, + Some(Arc::from("UTC")) + )) + ); + let us_no_tz = DataType::Timestamp(TimeUnit::Microsecond, None); + assert_eq!(arrow_cast_widening(&us_no_tz), None); + } + + #[test] + fn list_recurses_into_children() { + let inner_field = Arc::new(Field::new("item", DataType::UInt16, true)); + let list_ty = DataType::List(inner_field); + let widened = arrow_cast_widening(&list_ty).expect("should widen"); + match widened { + DataType::List(field) => assert_eq!(field.data_type(), &DataType::Int32), + other => panic!("expected List, got {other:?}"), + } + } + + #[test] + fn signed_int_passes_through() { + assert_eq!(arrow_cast_widening(&DataType::Int32), None); + assert_eq!(arrow_cast_widening(&DataType::Utf8), None); + } +} diff --git a/spark/bridge/tests/export_macro.rs b/spark/bridge/tests/export_macro.rs new file mode 100644 index 0000000..14751c8 --- /dev/null +++ b/spark/bridge/tests/export_macro.rs @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Compile-level test of `export_bridge!`: the macro must expand to valid +//! `extern "system"` items against a plain builder function. JNI entry +//! points can't be exercised without a live JVM, so the assertion here is +//! that this test crate links with the generated symbols present. + +use std::sync::Arc; + +use datafusion_spark_bridge::datafusion::arrow::datatypes::Schema; +use datafusion_spark_bridge::datafusion::catalog::TableProvider; +use datafusion_spark_bridge::datafusion::datasource::MemTable; +use datafusion_spark_bridge::{export_bridge, BridgeContext, JniResult}; + +fn build_provider( + _ctx: &BridgeContext, + _options: &[u8], + _partition: &[u8], +) -> JniResult> { + let schema = Arc::new(Schema::empty()); + let table = MemTable::try_new(schema, vec![vec![]])?; + Ok(Arc::new(table)) +} + +export_bridge! { + jni_class: "com_example_testbridge_BridgeNative", + build_provider: build_provider, +} + +#[test] +fn builder_contract_runs_outside_jvm() { + // Expansion + linking is the macro test; this additionally runs the + // builder through the same BridgeContext the expansion hands it. + let ctx = BridgeContext::get(); + let provider = build_provider(&ctx, &[], &[]).expect("builder failed"); + assert_eq!(provider.schema().fields().len(), 0); +}