Skip to content

feat: Alternative table provider as spark data source#110

Draft
timsaucer wants to merge 21 commits into
apache:mainfrom
timsaucer:feat/plain-c-scan-ffi
Draft

feat: Alternative table provider as spark data source#110
timsaucer wants to merge 21 commits into
apache:mainfrom
timsaucer:feat/plain-c-scan-ffi

Conversation

@timsaucer

Copy link
Copy Markdown
Member

DO NOT MERGE

This PR is so drafty you can't stand next to it without wearing a sweater.

Alternative to #103 attempting to follow the suggestion in: #104 (comment)

timsaucer and others added 20 commits June 12, 2026 13:58
…e-common

Move the standalone `native` crate into a root Cargo workspace and extract
shared JNI plumbing (error->exception mapping, Tokio runtime singleton,
StreamingReader) into a new `datafusion-jni-common` crate under `native-common/`.
`native/src/errors.rs` moves to `native-common/src/errors.rs`; the nine native
modules now import error/runtime helpers from `datafusion_jni_common`.

Build glue follows: single root `Cargo.lock`, `.cargo/config.toml` redirects
output to `rust-target/`, Makefile/CI/poms updated to build `--workspace` and
target `-p datafusion-jni`. Core javadoc build commands updated to match.

Pure refactor; no behavior change. First of a 6-PR stack splitting the Spark
DataSource V2 connector work.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…on-jni-common

Add [workspace.package] (version, edition, license, repository) to the root
manifest and have both crates inherit it via `*.workspace = true`, so a single
bump re-versions the workspace in lock step.

Make datafusion-jni-common publishable: drop `publish = false` and add a
`description` (crates.io requires it). All its dependencies are registry
crates, so nothing blocks publish. datafusion-jni stays `publish = false`
since it is a JVM-loaded cdylib, not a crates.io library.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The `**/META-INF/services/**` and `spark/scaffold/bridge-template/**` RAT
excludes guard files that do not exist until later splits in the stack. They
are dead config here. Re-added in the splits that introduce the files they
cover (META-INF/services in 04-spark-scala-connector, bridge-template in
05-bridge-scaffold).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The `**/*.md` exclude dropped every markdown file repo-wide from license
checking, not just docs. It was redundant (docs are already covered by
`docs/**`) and overly broad. Removing it; RAT still reports 0 unapproved
because the remaining markdown carries valid headers.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add a crate README so the crates.io listing has front-page content, and wire
it in via `readme = "README.md"`. The ASF license header is an HTML comment
(matching dev/release/README.md) so RAT approves it while it stays invisible
in the rendered markdown.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback on the workspace-foundation PR:

- development.md: trim the repo-layout section to the crates this PR
  actually ships (native, native-common). It was forward-referencing
  spark/, spark/bridge, datafusion-spark-bridge, and examples/native --
  none of which exist until later PRs in the stack -- and called the
  member list "three" while listing four. Later PRs (apache#105/apache#106/apache#107/apache#109)
  carry notes to re-add their own slice when those dirs land.

- rat_exclude_files.txt: the Rust lockfile moved to the workspace root,
  so the stale native/Cargo.lock entry left the root Cargo.lock with no
  RAT exclude for the source-tarball check (check-rat-report.py). Point
  it at Cargo.lock.

- native-common: dedupe the panic-payload downcast -- StreamingReader::next
  now calls errors::panic_message instead of repeating the String/&str
  match inline.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The Cargo workspace conversion redirects build output to rust-target/
(via .cargo/config.toml), but the dev/release scripts still built from
native/ and read native/target/release/, which is no longer populated
even when cargo runs inside native/ (config is discovered up-tree).

- build-native-libs.sh / build-release.sh: build from the repo root with
  `-p datafusion-jni` and copy from rust-target/{release,<triple>/release}/.
- verify-release-candidate.sh: run `cargo fmt --all` workspace-wide so the
  new native-common crate is covered (matches CI lint.yml).
- updating-datafusion-version.md: list the workspace.dependencies entries
  that actually exist (datafusion, -proto, -spark, -substrait); drop the
  stray datafusion-ffi reference.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
No release script or doc describes publishing the Rust crates to
crates.io, and the crate is an internal implementation detail of the
native crates (its README already says the API may change to track
their needs). Match `publish = false` on datafusion-jni so an
accidental `cargo publish` can't push it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Introduce datafusion-scan-ffi: a cdylib exposing a DataFusion
TableProvider scan through extern "C" entrypoints that speak only C
primitives and the standard Arrow C Data/Stream interface
(ArrowSchema/ArrowArrayStream). No JVM/JNI dependency, so the surface is
consumable from Java (via a thin shim or FFM), Python, Go, or Rust, and
is a candidate to live closer to DataFusion proper.

This is the JNI-free reshaping of PR apache#103's scan logic per review
feedback on PR apache#104: providers are compiled in and registered by name
(approach A), filters cross as datafusion.LogicalExprNode protobufs
(shared vocabulary with datafusion-ffi/Comet), and each scanned
partition is handed back as a zero-copy FFI_ArrowArrayStream.

- abi.rs: df_scan_{schema,create,partition_count,execute_partition,
  execute,close}, df_error_free, df_scan_abi_version
- scan.rs: build -> register -> project -> filter -> plan core
- registry.rs: name-keyed provider builders
- reader.rs: SendableRecordBatchStream -> panic-safe RecordBatchReader
- include/datafusion_scan.h: the C header
- tests/roundtrip.rs: drives the ABI and re-imports the stream via the
  Arrow C Stream interface, no JVM involved (6 tests)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Two messages defining the wire formats around datafusion-scan-ffi:

- ScanConfig (+ ListingSource, ScanPartition): the provider-config blob
  carried in the ABI's opaque `options`/`partition` arguments and decoded
  by the registered provider builder. Reuses the existing per-format
  read-option messages in a source oneof, with a `custom` bytes escape
  hatch for builders that define their own format.

- ScanRequest: the pushdown a query engine (Spark DataSourceV2, ...)
  captures during planning -- projection, filters (each a serialized
  datafusion.LogicalExprNode), limit, partition/batch tuning, config
  overrides. It is the engine-side staging object that the JNI/FFM shim
  explodes into df_scan_create's typed arguments, NOT a single blob
  passed through the ABI; keeping the C arguments typed stays
  FFM-friendly and language-neutral.

Validated with protoc (proto3, imports resolve).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add a build.rs (prost-build + vendored protoc, mirroring native/build.rs)
that compiles scan_config.proto / scan_request.proto and the per-format
read-option messages they embed, and expose the generated types as the
`proto` module so provider builders can decode the `options` blob and an
engine can build a `ScanRequest`.

build.rs honors a caller-set PROTOC before falling back to the vendored
binary. Java codegen needs no change: core's protobuf-maven-plugin
`compile-local` execution already scans the whole proto/ dir.

Adds tests/proto.rs: round-trips ScanConfig (with an embedded
CsvReadOptionsProto through the source oneof) and ScanRequest, proving
the cross-file imports resolve. Full suite now 8 tests; clippy clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Register a real file-backed provider builder under "datafusion.listing"
that decodes the ScanConfig blob into a DataFusion ListingTable: parses
the paths, maps the per-format read-option message (CSV/JSON/Parquet/
Avro/Arrow) to a FileFormat + ListingOptions, and either applies an
explicit Arrow-IPC schema or infers one from the data.

Schema inference needs the session, so ProviderBuilder now takes a
&SessionContext and scan::create builds the context before the provider
(then registers it on the same context). The demo builder ignores it.

Enables datafusion's `avro` feature for AvroFormat. Adds tests/listing.rs:
writes a CSV, scans it through the C ABI end to end (schema probe +
full-plan execute, summing ids), exercising the inference path. Full
suite now 10 tests; clippy + fmt clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Rust cdylib datafusion-scan-jni: a thin JVM adapter that marshals a
provider name + ScanConfig/ScanRequest byte[]s into the in-process scan
core of datafusion-scan-ffi and writes FFI_ArrowArrayStream /
FFI_ArrowSchema into the addresses arrow-java allocated. No Arrow data
crosses JNI -- batches flow through the Arrow C Stream interface. Six
Java_org_apache_datafusion_scan_NativeScan_* entry points; non-Java
consumers still use the df_scan_* C symbols directly.

Java side (in core, org.apache.datafusion.scan):
- NativeScan: raw native declarations
- ScanNativeLoader: loads datafusion_scan_jni from java.library.path
- DatafusionScan: AutoCloseable wrapper returning ArrowReader via
  Data.importArrayStream, mirroring DataFrame#collect; schema() probes
  via Data.importField

The control plane is ~6 pass-through methods; the data plane reuses
arrow-java's existing C Data interface. ScanRequest.limit is decoded but
not yet applied (follow-up to wire through the core + C ABI together).

cargo build/clippy clean, all six JNI symbols exported; mvn -pl core
compile builds the Java against arrow-c-data.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add DatafusionScanTest: builds a ScanConfig with the generated protobuf
builders, drives the datafusion.listing provider over a CSV entirely
from Java via DatafusionScan, and reads the result back through
Data.importArrayStream. Asserts inferred schema [id, name] and that the
rows scan correctly.

This closes the one ABI risk flagged in the design: the FFI_ArrowArrayStream
produced by arrow-rs 58 imports cleanly into arrow-java 19 through this
path. Point surefire's java.library.path at rust-target/<profile> so
System.loadLibrary finds the datafusion_scan_jni shim.

Tests run: 2, Failures: 0, Errors: 0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Extend DatafusionScanTest with two pushdown cases driven through a
ScanRequest:

- projectionPrunesColumns: addProjection("name") -> result schema is just
  [name], rows still 3.
- filterPushdownSelectsRows: a hand-built LogicalExprNode for `id >= 2`
  (Column / ScalarValue from datafusion_common, BinaryExprNode op "GtEq")
  serialized into ScanRequest.filters -> only ids 2 and 3 returned.

Proves the projection/filter path the ABI and proto plumb is exercised
end to end from Java, and that a Java-built datafusion.LogicalExprNode
decodes and applies in the scan core (same datafusion.proto both sides).

Tests run: 4, Failures: 0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Apply ScanRequest.limit end to end so all consumers agree:

- scan core: ScanRequest gains `limit: Option<usize>`, applied as
  df.limit(0, Some(n)) after filters.
- C ABI: df_scan_create gains an `int64 limit` parameter (negative means
  none); header updated.
- JNI shim: build_request maps the proto's optional limit through.

Tests: Rust limit_caps_row_count (demo provider, cap 2 across two
partitions) and Java limitCapsRows (limit 2 over a 3-row CSV). Existing
df_scan_create call sites pass -1. Rust suite 11, Java scan suite 5;
clippy/fmt clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A Spark DataSourceV2 ("datafusion") backed by a DataFusion TableProvider
through the plain-C scan ABI. Registered via DataSourceRegister, so
spark.read.format("datafusion").option("path", ...).load() works.

Wiring (org.apache.datafusion.spark):
- DatafusionTableProvider / DatafusionTable / DatafusionScanBuilder /
  DatafusionScanImpl / DatafusionInputPartition /
  DatafusionPartitionReaderFactory / DatafusionPartitionReader
- OptionsCodec: Spark options -> ScanConfig (csv/parquet/json listing)
- SchemaConverter: Arrow schema -> Spark StructType (our Arrow only)
- SparkFilters: Spark Filters -> datafusion.LogicalExprNode (comparisons,
  And/Or/Not, IsNull/IsNotNull over primitive literals)
- ArrowToInternalRow: row-based conversion, so the connector never shares
  Arrow with Spark's bundled copy (Arrow excluded from spark-sql in the
  pom). Columnar via ArrowColumnVector is a future optimization.

Partitions ship config+request bytes (never a native handle); each
executor rebuilds and runs its partition. New Maven module datafusion-spark
(Java, Spark 3.5.3 / Scala 2.13), provided scope, with the Java 17
add-opens + java.library.path surefire args.

End-to-end test against a local SparkSession: schema inference, full
scan, projection, and filter pushdown over a CSV. Tests run: 4. Spotless
clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Switch the Spark connector to zero-copy columnar reads:

- Target Spark 4.0.0 (Arrow 18.1.0). Declare arrow-java `provided` (and
  arrow-c-data explicitly, also provided), so the cluster's Arrow is the
  single arrow-java in the executor JVM -- shared by our stream import
  AND Spark's ArrowColumnVector. We compile against the target Spark's
  Arrow but never bundle it.
- DatafusionColumnarPartitionReader: wrap the imported Arrow vectors
  directly in ArrowColumnVector -> ColumnarBatch, no per-cell copy. The
  ArrowReader owns the vectors; we don't double-close via the batch.
- Factory: supportColumnarReads -> true, createColumnarReader.
- Remove the row-based reader + ArrowToInternalRow.

datafusion-java (core) is untouched -- still Arrow 19 for standalone use;
its Arrow transitive is just excluded from this module so the
Spark-provided Arrow wins. The Rust side is unchanged: the C Data
interface is version-independent.

E2E test (schema, full scan, projection, filter pushdown) green on Spark
4.0 columnar. Tests run: 4. Spotless clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
DatafusionScanBuilder now implements SupportsPushDownLimit: pushLimit
records the limit, build() sets ScanRequest.limit, and the scan core
applies it (df.limit after filters). pushLimit returns true -- DataFusion
enforces the bound exactly and a limited plan coalesces to one partition,
so the total row count is guaranteed.

Tests: DatafusionScanBuilderTest decodes the built ScanRequest to prove
limit/projection/filter are actually encoded (isolated from Spark's own
handling, which would mask non-pushdown); DatafusionSourceTest gains an
E2E limit case. Added a package-private scanRequestBytes() accessor for
the unit test. spark suite: 9 tests. Spotless clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Canonical design for the plain-C scan ABI, JNI shim, and Spark
DataSourceV2 connector: the two-plane (control / Arrow-C-data) model, the
df_scan_* ABI, the scan_config/scan_request wire formats, the
provided-Arrow strategy that keeps core on Arrow 19 while enabling
zero-copy columnar on Spark 4.0, the DSv2 mapping with projection/filter/
limit pushdown, the test matrix, a decisions log, and remaining gaps.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@paleolimbot paleolimbot left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for putting this together! It's a good thing I live in an old house in Winnipeg and own a number of excellent sweaters.

I left a few suggestions but in general I like this approach, or perhaps, I feel as though I can do a better job reviewing this than a more JNI heavy approach because I know the Arrow C interfaces better than I know JNI. In this approach we are basically building a mini ADBC but it's a constrained subset of the things ADBC is trying to do, so I think it is OK.

When there's consensus on the approach I'm happy to review any of them 🙂

Comment thread native-jni/src/lib.rs
Comment on lines +18 to +27
//! Thin JNI shim over the plain-C scan core (`datafusion-scan-ffi`).
//!
//! This is the JVM's path to the scan ABI. It is deliberately minimal: it
//! marshals Java arguments (a `String` provider name and two `byte[]` blobs)
//! into the in-process scan core, hands back an opaque handle as a `jlong`,
//! and -- for the data plane -- writes a standard `FFI_ArrowArrayStream` (or
//! `FFI_ArrowSchema`) into the address arrow-java allocated. **No Arrow data
//! crosses the JNI boundary**: batches flow through the Arrow C Stream
//! interface, which arrow-java imports with `Data.importArrayStream`.
//!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly just highlighting for future me the key pieces here...this is the JNI piece. I'm biased because I don't know JNI but I do know Arrow, so the fact that this version only has a few hundred lines of JNI appeals to me.

Comment on lines +84 to +94
// Plan a scan. On success writes an owned handle to *out_handle (release with
// df_scan_close). projection is an array of column-name DfStr (empty = all);
// filters is an array of serialized datafusion.LogicalExprNode DfBytes;
// target_partitions / batch_size <= 0 keep DataFusion defaults; limit < 0 means
// no row limit.
int32_t df_scan_create(DfStr provider, DfBytes options, DfBytes partition,
int32_t target_partitions, int32_t batch_size, int64_t limit,
const DfKeyValue* config_overrides, size_t config_overrides_len,
const DfStr* projection, size_t projection_len,
const DfBytes* filters, size_t filters_len,
DfScanHandle** out_handle, char** out_err);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do this without the C symbols (just structs, like the Arrow C Data/Stream interfaces). The reason that's nice is that symbols can be hard to guarantee uniqueness (what if you have two Rust crates that want to export table provider and they're statically linked in the same pile of crates?). This is roughly how datafusion-python works with the generated FFI from datafusion-ffi.

struct DfScanHandle {
    int32_t (*partition_count)(const DfScanHandle* self);
    // -1 partition ID for coalesced stream
    int32_t (*execute)(const DfScanHandle* self, int32_t partition,
                                  struct ArrowArrayStream* out_stream);
    const char* (*get_last_error)(struct DfScanHandle* self);
    void* private_data;
    void (*release)(struct DfScanHandle* self);
};

struct DfSimpleTableProvider {
  int32_t (*schema)(struct DFTableProvider* self, struct ArrowSchema* out_schema);
  int32_t (*scan_create)(DfStr provider, DfBytes options, DfBytes partition,
                       int32_t target_partitions, int32_t batch_size, int64_t limit,
                       const DfKeyValue* config_overrides, size_t config_overrides_len,
                       const DfStr* projection, size_t projection_len,
                       const DfBytes* filters, size_t filters_len,
                       DfScanHandle** out_handle);
  const char* (*get_last_error)(struct DFTableProvider* self);
  void* private_data;
  void (*release)(struct DFTableProvider* self);
};

For Spark, a TableProvider exporter could define an entrypoint and build a cdylib. The entrypoint would have to have a common signature (like how AdbcInitFunc works).

int32_t MyProviderInit(void* table_provider, const char* name, int32_t df_abi_version);

...but Python packages that export one of these could just work with capsules.

Document how the scan core, provider registry, and common crate are
reusable behind an ADBC front-end, what adbc_core trait impls add, the
suggested shared native-exec-core layout, and why both front-ends should
coexist (Spark pre-resolved/runtime pushdown does not always re-serialize
to SQL).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@timsaucer

Copy link
Copy Markdown
Member Author

In this approach we are basically building a mini ADBC but it's a constrained subset of the things ADBC is trying to do

Yes, but I'm starting to think more about this. I did some analysis with my agent of where the gaps in the ADBC were and it was suggesting the problems with not having explicit push down filters. However I think if we use the substrait interface instead of the SQL interface then that argument disappears. I'm slightly outside my comfort zone on some of these aspects.

What I'm going to try next - instead lean in towards your suggestion of ADBC and see if that can support all of the spark options I'd need and still get the pushdown. Also I need to make sure the task/partition mapping would still work.

@paleolimbot

paleolimbot commented Jun 16, 2026

Copy link
Copy Markdown
Member

You're kind to try all these!

However I think if we use the substrait interface instead of the SQL interface then that argument disappears.

Also note that while these names are "substrait" and "SQL", the only API requirement is that the first one is "bytes" and the second one is "string". We can and should (personal opinion) abuse them (e.g., statement.set_option("datafusion.query_format", "protobuf"), statement.set_option("datafusion.statement.filters", b"xxxxx")).

@tokoko

tokoko commented Jun 16, 2026

Copy link
Copy Markdown

hey, just FYI, there's an issue in spark apache/spark#54603 for an adbc data source and a mostly working implementation (with sql pushdown) in a separate repo that you might want to check out.

@timsaucer

Copy link
Copy Markdown
Member Author

hey, just FYI, there's an issue in spark apache/spark#54603 for an adbc data source and a mostly working implementation (with sql pushdown) in a separate repo that you might want to check out.

Thank you! Looks very interesting. I will definitely follow that PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants