+ value="${maven.multiModuleProjectDirectory}/rust-target/${datafusion.native.profile}/${datafusion.lib.filename}"/>
+
diff --git a/core/src/main/java/org/apache/datafusion/SessionContext.java b/core/src/main/java/org/apache/datafusion/SessionContext.java
index ffc58dd..27d2b16 100644
--- a/core/src/main/java/org/apache/datafusion/SessionContext.java
+++ b/core/src/main/java/org/apache/datafusion/SessionContext.java
@@ -113,10 +113,11 @@ public DataFrame fromProto(byte[] planBytes) {
* other Substrait-emitting tool — and hand them to DataFusion without round-tripping through SQL.
*
* Substrait support is gated behind the {@code substrait} Cargo feature on the native crate
- * and is off by default. Rebuild the native crate with {@code cargo build
- * --features substrait} (or {@code cargo build --features substrait,protoc} for hermetic builds
- * that vendor {@code protoc} via {@code cmake}) to enable it. If invoked against a native binary
- * built without the feature, this method throws {@link RuntimeException} pointing at the flag.
+ * and is off by default. Rebuild the native crate with {@code cargo build -p
+ * datafusion-jni --features substrait} (or {@code ... --features substrait,protoc} for hermetic
+ * builds that vendor {@code protoc} via {@code cmake}) to enable it. If invoked against a native
+ * binary built without the feature, this method throws {@link RuntimeException} pointing at the
+ * flag.
*
* @throws IllegalArgumentException if {@code planBytes} is {@code null}.
* @throws IllegalStateException if this context is closed.
@@ -183,7 +184,7 @@ public MemoryUsage memoryUsage() {
* Rebuild with:
*
*
{@code
- * RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics
+ * RUSTFLAGS="--cfg tokio_unstable" cargo build -p datafusion-jni --features runtime-metrics
* }
*
* If invoked against a native binary built without the feature, this method throws {@link
diff --git a/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java b/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java
index 120d179..d567275 100644
--- a/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java
+++ b/core/src/test/java/org/apache/datafusion/SessionContextRuntimeStatsTest.java
@@ -37,7 +37,7 @@
* #checkFeatureEnabled}. Run
*
*
{@code
- * (cd native && RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics)
+ * RUSTFLAGS="--cfg tokio_unstable" cargo build -p datafusion-jni --features runtime-metrics
* }
*
* before {@code ./mvnw test} to exercise this class.
diff --git a/core/src/test/java/org/apache/datafusion/SessionContextSubstraitTest.java b/core/src/test/java/org/apache/datafusion/SessionContextSubstraitTest.java
index 34db3b5..a2cfb0a 100644
--- a/core/src/test/java/org/apache/datafusion/SessionContextSubstraitTest.java
+++ b/core/src/test/java/org/apache/datafusion/SessionContextSubstraitTest.java
@@ -50,7 +50,7 @@
*
* The {@code substrait} Cargo feature is off by default in {@code native/Cargo.toml}; if the
* native crate was built without it, every test here is skipped (see {@link #checkFeatureEnabled}).
- * Run {@code (cd native && cargo build --features substrait)} before {@code ./mvnw test} to
+ * Run {@code cargo build -p datafusion-jni --features substrait} before {@code ./mvnw test} to
* exercise this class.
*/
class SessionContextSubstraitTest {
diff --git a/docs/source/contributor-guide/development.md b/docs/source/contributor-guide/development.md
index 984d77c..fdb00f4 100644
--- a/docs/source/contributor-guide/development.md
+++ b/docs/source/contributor-guide/development.md
@@ -42,7 +42,7 @@ This builds the native Rust crate and runs the JUnit tests. The steps can
be run individually:
```sh
-cd native && cargo build
+cargo build --workspace
./mvnw test
```
@@ -74,14 +74,25 @@ disk space.
The repository is a multi-module Maven build:
-- `pom.xml` — parent POM declaring the `core` and `examples` modules and
- shared plugin/dependency versions.
+- `Cargo.toml` — Rust workspace root declaring the three crate members
+ (`native`, `native-common`, `examples/native`, `spark/bridge`) and `[workspace.dependencies]`
+ that pin shared versions in one place. Cargo writes artifacts to
+ `rust-target/` (overridden in `.cargo/config.toml`) so `mvn clean` at the
+ repo root does not nuke the Rust build cache.
+- `pom.xml` — parent POM declaring the `core`, `spark`, and `examples`
+ modules and shared plugin/dependency versions.
- `core/` — `datafusion-java` library module (Java sources, tests, and
generated protobuf classes).
+- `spark/` — `datafusion-java-spark` Spark DataSource V2 connector
+ (Scala + Java, pure JVM) and its `spark/bridge/` Rust SDK crate
+ (`datafusion-spark-bridge`: widening, scan machinery, `export_bridge!`).
- `examples/` — `datafusion-java-examples` module containing runnable
examples that depend on the library; built alongside the library so they
- cannot fall out of sync with the API.
-- `native/` — Rust crate (JNI + Arrow C Data Interface).
+ cannot fall out of sync with the API. Includes `examples/native/`, a
+ small `export_bridge!` cdylib used by the Spark connector demo
+ (`ExampleBridgeProviderFactory` + the pyspark script under
+ `examples/python/`).
+- `native/` — `datafusion-jni` Rust crate (JNI + Arrow C Data Interface).
- `proto/` — Protobuf definitions shared between Java and Rust.
- `Makefile` — top-level build orchestration (`make test`, `make format`,
`make tpch-data`).
diff --git a/docs/source/contributor-guide/updating-datafusion-version.md b/docs/source/contributor-guide/updating-datafusion-version.md
index 56d50dc..ef6cd10 100644
--- a/docs/source/contributor-guide/updating-datafusion-version.md
+++ b/docs/source/contributor-guide/updating-datafusion-version.md
@@ -21,7 +21,9 @@ under the License.
Three things must move together when bumping DataFusion:
-1. `native/Cargo.toml` — the `datafusion` crate dependency.
+1. `Cargo.toml` (workspace root) — the `datafusion`, `datafusion-ffi`,
+ `datafusion-proto`, and `datafusion-substrait` entries in
+ `[workspace.dependencies]`. Members inherit from there.
2. `pom.xml` — the `` Maven property. **Must equal
the Cargo version**; a mismatch means JVM-built protobuf plans won't
deserialize on the native side.
@@ -32,9 +34,9 @@ Three things must move together when bumping DataFusion:
## Recipe
```sh
-# 1. Bump the Cargo dep
-$EDITOR native/Cargo.toml # set datafusion = ""
-(cd native && cargo update -p datafusion)
+# 1. Bump the workspace dep
+$EDITOR Cargo.toml # set datafusion = "" in [workspace.dependencies]
+cargo update -p datafusion
# 2. Bump the Maven property to match
$EDITOR pom.xml # set
diff --git a/native-common/Cargo.toml b/native-common/Cargo.toml
new file mode 100644
index 0000000..0a797b4
--- /dev/null
+++ b/native-common/Cargo.toml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-jni-common"
+version = "0.1.0"
+edition = "2021"
+publish = false
+
+[features]
+# `datafusion-jni` builds DataFusion with `avro`, which adds the
+# `DataFusionError::AvroError` variant our classifier maps to IoException.
+# Feature-forwarded so consumers that don't read Avro (the Spark helper)
+# don't pull the apache-avro stack into their cdylib.
+avro = ["datafusion/avro"]
+
+[dependencies]
+datafusion = { workspace = true }
+futures = { workspace = true }
+jni = { workspace = true }
+tokio = { workspace = true }
diff --git a/native/src/errors.rs b/native-common/src/errors.rs
similarity index 97%
rename from native/src/errors.rs
rename to native-common/src/errors.rs
index d926544..caa2540 100644
--- a/native/src/errors.rs
+++ b/native-common/src/errors.rs
@@ -96,8 +96,11 @@ fn classify(err: &DataFusionError) -> &'static str {
}
DataFusionError::IoError(_)
| DataFusionError::ObjectStore(_)
- | DataFusionError::ParquetError(_)
- | DataFusionError::AvroError(_) => "org/apache/datafusion/IoException",
+ | DataFusionError::ParquetError(_) => "org/apache/datafusion/IoException",
+ // The AvroError variant only exists when DataFusion is built with its
+ // `avro` feature, forwarded by this crate's own `avro` feature.
+ #[cfg(feature = "avro")]
+ DataFusionError::AvroError(_) => "org/apache/datafusion/IoException",
// ArrowError is a 21-variant grab bag -- only some of those variants
// are actually IO-shaped. DivideByZero / ArithmeticOverflow / Compute
// / Cast / InvalidArgument / Memory etc. are execution-time failures
diff --git a/native-common/src/lib.rs b/native-common/src/lib.rs
new file mode 100644
index 0000000..f143d43
--- /dev/null
+++ b/native-common/src/lib.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JNI plumbing shared by this workspace's native crates (`datafusion-jni`
+//! and `datafusion-spark-bridge`, and through the latter every bridge
+//! cdylib): the error-to-Java-exception mapping, the per-cdylib Tokio
+//! runtime singleton, and the async-stream-to-`FFI_ArrowArrayStream`
+//! bridge.
+//!
+//! Each cdylib statically links its own copy of this rlib, so [`runtime`] is
+//! a per-cdylib singleton -- exactly the behaviour each crate had when this
+//! code lived inline. Nothing here is exported with `#[no_mangle]`, so
+//! linking this crate into several cdylibs loaded in one JVM cannot collide.
+
+pub mod errors;
+
+use std::panic::{catch_unwind, AssertUnwindSafe};
+use std::sync::OnceLock;
+
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::ArrowError;
+use datafusion::arrow::record_batch::RecordBatchReader;
+use datafusion::execution::SendableRecordBatchStream;
+use futures::StreamExt;
+use tokio::runtime::{Handle, Runtime};
+
+static RT: OnceLock = OnceLock::new();
+
+/// The cdylib-wide Tokio runtime.
+pub fn runtime() -> &'static Runtime {
+ runtime_with_init(|_| {})
+}
+
+/// Same singleton as [`runtime`], with a hook that runs exactly once, when
+/// the runtime is created. `datafusion-jni` uses it to install its
+/// runtime-metrics accumulator so the sampling baseline coincides with
+/// runtime start; every later call (either entry point) returns the existing
+/// runtime without invoking the hook.
+pub fn runtime_with_init(init: impl FnOnce(&Handle)) -> &'static Runtime {
+ RT.get_or_init(|| {
+ let rt = Runtime::new().expect("failed to create Tokio runtime");
+ init(rt.handle());
+ rt
+ })
+}
+
+/// Bridges DataFusion's async [`SendableRecordBatchStream`] to the synchronous
+/// [`RecordBatchReader`] interface that `FFI_ArrowArrayStream` (and therefore
+/// the Java `ArrowReader`) consumes. Each call to `next()` drives one
+/// `runtime().block_on(stream.next())`, so memory pressure stays bounded by the
+/// executor pipeline plus a single in-flight batch.
+pub struct StreamingReader {
+ pub schema: SchemaRef,
+ pub stream: SendableRecordBatchStream,
+}
+
+impl Iterator for StreamingReader {
+ type Item = Result;
+
+ fn next(&mut self) -> Option {
+ // Arrow's C ABI invokes this iterator through FFI_ArrowArrayStream's
+ // vtable, outside the JNI handler's try_unwrap_or_throw guard. A panic
+ // here (buggy UDF, arrow cast that panics, runtime poison) would
+ // unwind across C/FFI -- undefined behaviour. Catch it and surface as
+ // an ArrowError so the Java side sees a normal exception instead.
+ let next = catch_unwind(AssertUnwindSafe(|| runtime().block_on(self.stream.next())));
+ match next {
+ Ok(item) => item.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e)))),
+ Err(panic) => {
+ let msg = if let Some(s) = panic.downcast_ref::() {
+ s.clone()
+ } else if let Some(s) = panic.downcast_ref::<&str>() {
+ (*s).to_string()
+ } else {
+ "rust panic with non-string payload".to_string()
+ };
+ Some(Err(ArrowError::ExternalError(
+ format!("panic in DataFrame stream: {msg}").into(),
+ )))
+ }
+ }
+ }
+}
+
+impl RecordBatchReader for StreamingReader {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
diff --git a/native/Cargo.toml b/native/Cargo.toml
index c462408..0f4ca83 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -23,8 +23,8 @@ publish = false
[lib]
# `rlib` alongside `cdylib` so `cargo test` has a Rust-level harness for
-# native-only invariants (e.g. error-classification routing through wrapped
-# DataFusionError chains). The `cdylib` is still the artifact the JVM loads.
+# native-only invariants (the error-classification tests now live in
+# `datafusion-jni-common`). The `cdylib` is still the artifact the JVM loads.
crate-type = ["cdylib", "rlib"]
[features]
@@ -69,24 +69,23 @@ protoc = ["datafusion-substrait?/protoc"]
runtime-metrics = ["dep:tokio-metrics"]
[dependencies]
-arrow = { version = "58", features = ["ffi"] }
-async-trait = "0.1"
-datafusion = { version = "53.1.0", features = ["avro"] }
-datafusion-proto = "53.1.0"
-datafusion-substrait = { version = "53.1.0", optional = true }
-futures = "0.3"
-jni = "0.21"
-# Pin to the same major as DataFusion 53.1 pulls in transitively (0.13.x)
-# so we share the same `dyn ObjectStore` vtable and don't double-link.
-object_store = { version = "0.13", default-features = false }
-prost = "0.14"
-tokio = { version = "1", features = ["rt-multi-thread"] }
-# Tokio runtime metrics. Optional + cfg-gated: this crate's API surface lives
-# behind `--cfg tokio_unstable`, so enabling the `runtime-metrics` feature also
-# requires the caller to set `RUSTFLAGS="--cfg tokio_unstable"` at build time.
-tokio-metrics = { version = "0.5", optional = true }
-url = "2"
+arrow = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { workspace = true, features = ["avro"] }
+# Shared JNI plumbing (error->exception mapping, runtime singleton,
+# StreamingReader). `avro` keeps the classifier's AvroError->IoException arm
+# in sync with the `avro` feature on `datafusion` above.
+datafusion-jni-common = { path = "../native-common", features = ["avro"] }
+datafusion-proto = { workspace = true }
+datafusion-substrait = { workspace = true, optional = true }
+futures = { workspace = true }
+jni = { workspace = true }
+object_store = { workspace = true }
+prost = { workspace = true }
+tokio = { workspace = true }
+tokio-metrics = { workspace = true, optional = true }
+url = { workspace = true }
[build-dependencies]
-prost-build = "0.14"
-protoc-bin-vendored = "3"
+prost-build = { workspace = true }
+protoc-bin-vendored = { workspace = true }
diff --git a/native/src/arrow.rs b/native/src/arrow.rs
index 2bbe7b0..67e5caf 100644
--- a/native/src/arrow.rs
+++ b/native/src/arrow.rs
@@ -23,10 +23,10 @@ use jni::sys::jlong;
use jni::JNIEnv;
use prost::Message;
-use crate::errors::{try_unwrap_or_throw, JniResult};
use crate::proto_gen::ArrowReadOptionsProto;
use crate::runtime;
use crate::schema::decode_optional_schema;
+use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult};
fn with_arrow_options(
env: &mut JNIEnv,
diff --git a/native/src/avro.rs b/native/src/avro.rs
index 85d4a07..257ae32 100644
--- a/native/src/avro.rs
+++ b/native/src/avro.rs
@@ -23,10 +23,10 @@ use jni::sys::jlong;
use jni::JNIEnv;
use prost::Message;
-use crate::errors::{try_unwrap_or_throw, JniResult};
use crate::proto_gen::AvroReadOptionsProto;
use crate::runtime;
use crate::schema::decode_optional_schema;
+use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult};
fn with_avro_options(
env: &mut JNIEnv,
diff --git a/native/src/cache_manager.rs b/native/src/cache_manager.rs
index 3b9e286..ec38dc8 100644
--- a/native/src/cache_manager.rs
+++ b/native/src/cache_manager.rs
@@ -34,8 +34,8 @@ use datafusion::execution::cache::cache_unit::{
};
use datafusion::execution::cache::DefaultListFilesCache;
-use crate::errors::JniResult;
use crate::proto_gen::CacheManagerOptionsProto;
+use datafusion_jni_common::errors::JniResult;
/// Build a [`CacheManagerConfig`] from the proto. Returns `Ok(None)` if the
/// caller did not set any cache-manager field, so the JNI layer can skip the
diff --git a/native/src/csv.rs b/native/src/csv.rs
index 3ae4627..b79ed59 100644
--- a/native/src/csv.rs
+++ b/native/src/csv.rs
@@ -26,12 +26,12 @@ use jni::sys::jlong;
use jni::JNIEnv;
use prost::Message;
-use crate::errors::{try_unwrap_or_throw, JniResult};
use crate::proto_gen::{
CsvReadOptionsProto, CsvWriteOptionsProto, FileCompressionType as ProtoFileCompressionType,
};
use crate::runtime;
use crate::schema::decode_optional_schema;
+use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult};
fn with_csv_options(
env: &mut JNIEnv,
diff --git a/native/src/json.rs b/native/src/json.rs
index 8eea32f..b87be78 100644
--- a/native/src/json.rs
+++ b/native/src/json.rs
@@ -27,12 +27,12 @@ use jni::sys::jlong;
use jni::JNIEnv;
use prost::Message;
-use crate::errors::{try_unwrap_or_throw, JniResult};
use crate::proto_gen::{
FileCompressionType as ProtoFileCompressionType, JsonWriteOptionsProto, NdJsonReadOptionsProto,
};
use crate::runtime;
use crate::schema::decode_optional_schema;
+use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult};
fn with_json_options(
env: &mut JNIEnv,
diff --git a/native/src/lib.rs b/native/src/lib.rs
index 4fd7a8a..6e1a79f 100644
--- a/native/src/lib.rs
+++ b/native/src/lib.rs
@@ -19,7 +19,6 @@ mod arrow;
mod avro;
mod cache_manager;
mod csv;
-mod errors;
mod jni_util;
mod json;
mod memory;
@@ -34,16 +33,13 @@ pub(crate) mod proto_gen {
include!(concat!(env!("OUT_DIR"), "/datafusion_java.rs"));
}
-use std::panic::{catch_unwind, AssertUnwindSafe};
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
-use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::ArrowError;
use datafusion::arrow::ffi_stream::FFI_ArrowArrayStream;
use datafusion::arrow::ipc::writer::StreamWriter;
-use datafusion::arrow::record_batch::{RecordBatchIterator, RecordBatchReader};
+use datafusion::arrow::record_batch::RecordBatchIterator;
use datafusion::common::{JoinType, UnnestOptions};
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrame;
@@ -51,11 +47,9 @@ use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::DataFusionError;
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::Expr;
use datafusion::logical_expr::{col, Partitioning, ScalarUDF, Signature, SortExpr};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
-use futures::StreamExt;
use jni::objects::{JBooleanArray, JByteArray, JClass, JObject, JObjectArray, JString};
use jni::sys::{jboolean, jbyte, jbyteArray, jint, jlong};
use jni::JNIEnv;
@@ -63,7 +57,10 @@ use jni::JavaVM;
use prost::Message;
use tokio::runtime::Runtime;
-use crate::errors::{try_unwrap_or_throw, JniResult};
+use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult};
+// Re-exported so sibling modules keep their crate-local `crate::StreamingReader` path.
+pub(crate) use datafusion_jni_common::StreamingReader;
+
use crate::proto_gen::ParquetReadOptionsProto;
use crate::proto_gen::SessionOptions;
use crate::schema::decode_optional_schema;
@@ -84,18 +81,15 @@ pub(crate) fn jvm() -> &'static JavaVM {
}
pub(crate) fn runtime() -> &'static Runtime {
- static RT: OnceLock = OnceLock::new();
- RT.get_or_init(|| {
- let rt = Runtime::new().expect("failed to create Tokio runtime");
- // Eagerly install the runtime-metrics accumulator (no-op when the
- // `runtime-metrics` Cargo feature is off). Initialising here -- not
- // lazily on the first `runtimeStats()` call -- means the
- // RuntimeMonitor's sampling baseline coincides with runtime start, so
- // poll/park/busy totals reflect activity from the first query onward
- // rather than from the first observation.
- crate::runtime_metrics::init(rt.handle());
- rt
- })
+ // The singleton itself lives in datafusion-jni-common (shared with the
+ // datafusion-spark-bridge SDK; each cdylib statically links its own
+ // copy, so the runtime stays per-library). The init hook eagerly installs the
+ // runtime-metrics accumulator (no-op when the `runtime-metrics` Cargo
+ // feature is off). Initialising here -- not lazily on the first
+ // `runtimeStats()` call -- means the RuntimeMonitor's sampling baseline
+ // coincides with runtime start, so poll/park/busy totals reflect activity
+ // from the first query onward rather than from the first observation.
+ datafusion_jni_common::runtime_with_init(crate::runtime_metrics::init)
}
/// Wrap the (already-built) `RuntimeEnvBuilder`'s memory pool with a
@@ -289,50 +283,6 @@ pub extern "system" fn Java_org_apache_datafusion_DataFrame_collectDataFrame<'lo
})
}
-/// Bridges DataFusion's async [`SendableRecordBatchStream`] to the synchronous
-/// [`RecordBatchReader`] interface that `FFI_ArrowArrayStream` (and therefore
-/// the Java `ArrowReader`) consumes. Each call to `next()` drives one
-/// `runtime().block_on(stream.next())`, so memory pressure stays bounded by the
-/// executor pipeline plus a single in-flight batch.
-struct StreamingReader {
- schema: SchemaRef,
- stream: SendableRecordBatchStream,
-}
-
-impl Iterator for StreamingReader {
- type Item = Result;
-
- fn next(&mut self) -> Option {
- // Arrow's C ABI invokes this iterator through FFI_ArrowArrayStream's
- // vtable, outside the JNI handler's try_unwrap_or_throw guard. A panic
- // here (buggy UDF, arrow cast that panics, runtime poison) would
- // unwind across C/FFI -- undefined behaviour. Catch it and surface as
- // an ArrowError so the Java side sees a normal exception instead.
- let next = catch_unwind(AssertUnwindSafe(|| runtime().block_on(self.stream.next())));
- match next {
- Ok(item) => item.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e)))),
- Err(panic) => {
- let msg = if let Some(s) = panic.downcast_ref::() {
- s.clone()
- } else if let Some(s) = panic.downcast_ref::<&str>() {
- (*s).to_string()
- } else {
- "rust panic with non-string payload".to_string()
- };
- Some(Err(ArrowError::ExternalError(
- format!("panic in DataFrame stream: {msg}").into(),
- )))
- }
- }
- }
-}
-
-impl RecordBatchReader for StreamingReader {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_DataFrame_executeStreamDataFrame<'local>(
mut env: JNIEnv<'local>,
diff --git a/native/src/object_store.rs b/native/src/object_store.rs
index eefccf2..985d721 100644
--- a/native/src/object_store.rs
+++ b/native/src/object_store.rs
@@ -28,9 +28,9 @@ use std::sync::Arc;
use datafusion::prelude::SessionContext;
use url::Url;
-use crate::errors::JniResult;
use crate::proto_gen::object_store_registration::Backend;
use crate::proto_gen::ObjectStoreRegistration;
+use datafusion_jni_common::errors::JniResult;
#[cfg(feature = "object-store-gcp")]
use crate::proto_gen::GcsOptions;
diff --git a/native/src/proto.rs b/native/src/proto.rs
index 4f187bc..c1315f9 100644
--- a/native/src/proto.rs
+++ b/native/src/proto.rs
@@ -28,8 +28,8 @@ use jni::sys::{jbyteArray, jlong};
use jni::JNIEnv;
use prost::Message;
-use crate::errors::{try_unwrap_or_throw, JniResult};
use crate::runtime;
+use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult};
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_createDataFrameFromProto<
diff --git a/native/src/runtime_metrics.rs b/native/src/runtime_metrics.rs
index e69410e..dd60dcb 100644
--- a/native/src/runtime_metrics.rs
+++ b/native/src/runtime_metrics.rs
@@ -38,7 +38,7 @@
//! 10 totalOverflowCount
#[cfg(not(feature = "runtime-metrics"))]
-use crate::errors::JniResult;
+use datafusion_jni_common::errors::JniResult;
/// Number of i64 values in the snapshot array; kept here so the Java side and
/// the feature-off stub agree on the layout.
@@ -51,7 +51,7 @@ mod imp {
use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};
use super::STATS_FIELD_COUNT;
- use crate::errors::JniResult;
+ use datafusion_jni_common::errors::JniResult;
/// `RuntimeMonitor::intervals().next()` returns *delta* metrics covering
/// the period since the previous call (or, on the very first call, since
@@ -196,7 +196,7 @@ pub fn runtime_stats() -> JniResult<[i64; STATS_FIELD_COUNT]> {
Err(
"datafusion-jni was built without the `runtime-metrics` Cargo feature; \
rebuild the native crate with \
- `RUSTFLAGS=\"--cfg tokio_unstable\" cargo build --features runtime-metrics` \
+ `RUSTFLAGS=\"--cfg tokio_unstable\" cargo build -p datafusion-jni --features runtime-metrics` \
to enable SessionContext.runtimeStats"
.into(),
)
diff --git a/native/src/schema.rs b/native/src/schema.rs
index 968a73a..0c3c7ab 100644
--- a/native/src/schema.rs
+++ b/native/src/schema.rs
@@ -20,7 +20,7 @@ use datafusion::arrow::ipc::reader::StreamReader;
use jni::objects::JByteArray;
use jni::JNIEnv;
-use crate::errors::JniResult;
+use datafusion_jni_common::errors::JniResult;
/// Decode an optional Arrow-IPC schema byte array passed in from Java.
/// Returns `None` if the byte-array reference is null.
diff --git a/pom.xml b/pom.xml
index 6210841..b92cf72 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,11 @@ under the License.
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.13.0
+
org.apache.maven.plugins
maven-surefire-plugin
@@ -159,6 +164,7 @@ under the License.
README.md
CONTRIBUTING.md
docs/**
+ **/*.md
.gitignore
.idea/**
@@ -173,12 +179,17 @@ under the License.
.mvn/**
**/target/**
- native/target/**
+ rust-target/**
tpch-data/**
-
- native/Cargo.lock
+
+ Cargo.lock
+
+ **/META-INF/services/**
dev/release/rat_exclude_files.txt
+
+ spark/scaffold/bridge-template/**