Feat/table provider ffi#103

Draft
timsaucer wants to merge 22 commits into apache:main from timsaucer:feat/table-provider-ffi

@timsaucer (Member)

DO NOT MERGE. THIS IS A WORK IN PROGRESS.

Which issue does this PR close?

None, but willing to open one for discussion.

Rationale for this change

This PR provides two major pieces:

  • Support for Rust-backed table providers to interact with the Java session context
  • Exposing DataFusion table providers as Spark data sources

With these, table providers designed for DataFusion can be used in existing Spark workflows as columnar data sources. There may be an additional opportunity for Comet to identify these tables and use their underlying implementations directly. This extends the usefulness of DataFusion-backed sources.

What changes are included in this PR?

  • Addition of a new crate
  • Addition of new Java code to support FFI table providers
  • A DataFusion data source library for Spark that includes schema widening and type casting for types Spark does not support
  • A full end-to-end example using an in-memory DataFusion table with PySpark

Are these changes tested?

  • An end-to-end test is included
  • TODO: evaluate what additional testing is needed

Are there any user-facing changes?

Yes. There are user-facing changes, but they are purely additive.

timsaucer and others added 22 commits June 10, 2026 10:30
Vendors the generic connector-core from rerun-io/rerun-spark-connector into
a new `spark/` Maven module (Apache 2.0 headers, Scala 2.13), wiring it
against this repo's `SessionContext.registerFfiTable`. Adds an
`ExampleFfiProviderFactory` against the existing example MemTable cdylib and
a self-contained pyspark demo that exercises schema inference, scan,
projection, and predicate pushdown across the FFI boundary.

- spark/: DSv2 plumbing (DatafusionSource/Table/Scan/Reader), Arrow → Spark
  schema converter, Spark V2 Predicate → DataFusion LogicalExprNode
  translator, and a widening cdylib (`libdatafusion_spark_helper`) wrapping
  Spark-incompatible Arrow types via `arrow::compute::cast` before scans.
- spark/.../META-INF/services/...DataSourceRegister: registers the
  `datafusion` short name so `spark.read.format("datafusion")` resolves.
- examples/ExampleFfiProviderFactory: minimal FfiProviderFactory delegating
  to FfiTableProviderExampleNative.createMemTableProvider().
- examples/python/: pyspark demo + README documenting the uv venv,
  side-loaded Scala 2.13 Spark distro, `-Dmaven.repo.local` flow, and the
  `extraClassPath` Arrow 19 / flatbuffers 25 / protobuf 3.25 overrides
  needed because Spark 3.5.7 ships older versions.
- pom.xml: add `spark` module, allow `**/*.md` and `**/Cargo.lock` through
  RAT, exclude `**/META-INF/services/**` (line-delimited SPI files).
- DatafusionSource.scala / DatafusionColumnarPartitionReader.scala: drop
  reflective `registerFfiTable` calls (carried over from the upstream
  pre-merge state) for direct invocation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The three Rust crates (`native`, `examples/native`, `spark/native`) now
live under one root Cargo workspace with all dependencies declared in
`[workspace.dependencies]`. Members reference shared deps via
`{ workspace = true }`; per-crate flags (`optional`, extra `features`)
stay at the use site. Only one `Cargo.lock` to maintain.

Cargo writes to `rust-target/` (overridden via `.cargo/config.toml`) so
`mvn clean` at the repo root does not nuke the Rust build cache. Maven
antrun copies, Makefile, GitHub Actions caches, examples README, and the
`FfiTableProviderExampleNative` lookup paths all repoint there.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Demonstrate how Spark DataSource V2 options flow through the
FfiProviderFactory into the native MemTable build. The example
factory now accepts name_prefix, num_rows, and num_batches, encodes
them as a small length-prefixed binary blob, and the Rust cdylib
decodes the blob to size the generated table accordingly.
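As a rough illustration of the kind of length-prefixed blob described here, the sketch below packs the three options and decodes them again. The field order, integer widths, and endianness are assumptions made for the example; the actual layout used by the example factory and the Rust cdylib is not stated in this message.

```python
import struct


def encode_options(name_prefix: str, num_rows: int, num_batches: int) -> bytes:
    """Pack options as: u32 length + UTF-8 bytes, then two u64 counters.

    Little-endian and these widths are assumptions, not the connector's format.
    """
    prefix = name_prefix.encode("utf-8")
    return struct.pack("<I", len(prefix)) + prefix + struct.pack("<QQ", num_rows, num_batches)


def decode_options(blob: bytes) -> tuple[str, int, int]:
    """Inverse of encode_options; rejects blobs whose size does not match the prefix."""
    (n,) = struct.unpack_from("<I", blob, 0)
    end = 4 + n
    if len(blob) != end + 16:
        raise ValueError("truncated or oversized options blob")
    name_prefix = blob[4:end].decode("utf-8")
    num_rows, num_batches = struct.unpack_from("<QQ", blob, end)
    return name_prefix, num_rows, num_batches
```

The length check in the decoder is the part worth keeping in any real layout: the bytes cross a language boundary, so the reader should fail loudly rather than read past the end.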

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Replace `String[] listPartitions` with `PartitionInfo[]` carrying an opaque
per-partition byte payload and optional host hints, and pass that payload to
`createProvider(opts, partitionBytes)` on the executor. The partition record
overrides `preferredLocations()` so Spark co-locates tasks with the data.

This is the connector-API change needed before bridges can split a dataset
across N tasks (each materialising only its slice) or pin partitions to
specific Spark workers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e createProvider

Reflect the FfiProviderFactory signature change: `listPartitions` returns
`PartitionInfo[]` carrying the per-partition payload + preferred hosts, and
`createProvider` now takes `(opts, partitionBytes)`. Explain the slice/locality
semantics and refresh the "what runs where" table.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ng hook

Add a `reportPartitioning(opts)` default method on `FfiProviderFactory` that
returns a `ReportedPartitioning` (Transform[] keys, derived numPartitions) or
null for unknown. `DatafusionScan` now implements `SupportsReportPartitioning`
and emits `KeyGroupedPartitioning(keys, partitions.length)` when the bridge
opts in, `UnknownPartitioning` otherwise — letting Spark's optimizer skip
shuffles ahead of compatible joins/aggregations.

Move the `listPartitions` + `reportPartitioning` call sites into
`DatafusionScanBuilder.build()` so the Scan has both facts cached up front;
`DatafusionBatch.planInputPartitions` now reuses the cached `PartitionInfo[]`
instead of re-invoking the factory.

`ReportedPartitioning` ships with `identity(cols…)` and `bucket(n, cols…)`
convenience builders for the common cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
One provider per task makes createProvider cost dominate scans with
thousands of small partitions. Shared-scan mode (opt-in via
FfiProviderFactory.sharedScan) builds the provider once per
(executor JVM x query), plans it once, and maps one Spark task onto
each DataFusion output partition via the new
DataFrame.toPartitionedExecution / PartitionedExecution core API.

- SharedScanCache: scanId-keyed, refcounted, idle-TTL evicted;
  exactly-once build per task wave; failures not cached
- PinnedSessionConfig: driver-resolved session knobs shipped to
  executors (default target_partitions is core-count-dependent and
  would desync partition indices across machines); executors fail
  fast on partition-count divergence
- listPartitions(opts, filters) overload: bridges can prune whole
  partitions from pushed predicates
- PartitionInfo.partitionKeyValues + HasPartitionKey wiring: the
  reported KeyGroupedPartitioning was inert on Spark 3.3+ without
  per-partition key values
- legacy reader: close intermediate DataFrames in the filter fold
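To make the cache semantics in the first bullet concrete, here is a minimal sketch of a scanId-keyed, refcounted cache with idle-TTL eviction in which failed builds are not stored. The class shape, method names, and the choice to build while holding the lock are all assumptions for illustration; they are not the connector's `SharedScanCache` API.

```python
import threading
import time


class SharedScanCache:
    """Hypothetical sketch: scan_id-keyed, refcounted, idle-TTL evicted."""

    def __init__(self, idle_ttl_s=60.0, clock=time.monotonic):
        self._lock = threading.Lock()
        self._entries = {}  # scan_id -> [value, refcount, last_release_time]
        self._idle_ttl_s = idle_ttl_s
        self._clock = clock

    def acquire(self, scan_id, build):
        # Building under the lock keeps the sketch short and makes the build
        # exactly-once; a real cache would more likely use a per-key future.
        with self._lock:
            self._evict_idle()
            entry = self._entries.get(scan_id)
            if entry is None:
                value = build()  # if this raises, nothing is stored, so a retry rebuilds
                entry = [value, 0, self._clock()]
                self._entries[scan_id] = entry
            entry[1] += 1
            return entry[0]

    def release(self, scan_id):
        with self._lock:
            entry = self._entries[scan_id]
            entry[1] -= 1
            entry[2] = self._clock()  # idle timer starts at the last release

    def _evict_idle(self):
        now = self._clock()
        idle = [k for k, (_, refs, last) in self._entries.items()
                if refs == 0 and now - last >= self._idle_ttl_s]
        for k in idle:
            del self._entries[k]
```

Entries with a non-zero refcount are never evicted, so a task that is still streaming a partition cannot lose its scan to the TTL.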

Determinism contract (snapshot pinning, re-executable execute(i))
documented on FfiProviderFactory.sharedScan and in
SPARK_INTEGRATION.md; equal partition counts with different contents
are undetectable by construction.

BREAKING CHANGE: PartitionInfo's canonical record constructor gains a
fourth component (partitionKeyValues). Source-compatible via the
3-arg delegating constructor; binary compatibility with pre-existing
compiled bridges is broken.

The connector now consumes a bridge's FFI_TableProvider entirely in
process: FfiHelperNative.createScan widens the provider, builds the
pinned-config SessionContext, applies projection and pushed proto
filters, and plans once; partition streams cross back as
FFI_ArrowArrayStream. This drops the re-FFI widening hop and the SQL
string round-trip, and shrinks the datafusion-ffi version-lockstep
surface from three cdylibs to two (bridge <-> connector).

- extract datafusion-jni-common (error->exception mapping, runtime
  singleton, StreamingReader) shared by datafusion-jni and the
  connector cdylib
- revert the core additions the old path needed: registerFfiTable,
  DataFrame.filterFromProto/toPartitionedExecution, and
  PartitionedExecution, restoring core's public surface to main
- core jar remains a Spark dependency only for the generated
  datafusion.protobuf classes and typed exceptions; its cdylib no
  longer loads in Spark JVMs
- delete DatafusionSqlBuilder and the plain-Java FfiTableProviderExample

Verified: cargo and mvn suites green; pyspark demo passes legacy and
shared-scan modes with pushdown, projection, and partition parity.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
examples/README.md targets first-time users: prerequisites, two-step
build with the why behind install-vs-package and exec:exec, entry
points and expected output per example, troubleshooting.

SPARK_INTEGRATION.md becomes spark/README.md, rewritten as a
connector-builder guide: the three implementation points with file
paths, the Spark-task-vs-DataFusion-partition mapping in both scan
modes, and task sizing guidance (bin-pack small partitions via opaque
partitionBytes, keep per-stage task counts in the low thousands).
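One way a bridge might bin-pack small slices into fewer tasks is sketched below, using greedy first-fit-decreasing. The slice ids, the size estimates, and the JSON payload are hypothetical; the only point taken from this message is that the payload is opaque to the connector, so the bridge is free to put several slices in it.

```python
import json


def bin_pack(slice_sizes, target_bytes):
    """Group slice ids into tasks of at most target_bytes each (first-fit-decreasing).

    Returns one opaque payload per task. JSON is an illustrative encoding only.
    """
    bins = []  # each bin: [total_size, [slice ids]]
    # Largest first, ties broken by id so the packing is deterministic.
    for slice_id, size in sorted(slice_sizes.items(), key=lambda kv: (-kv[1], kv[0])):
        for b in bins:
            if b[0] + size <= target_bytes:
                b[0] += size
                b[1].append(slice_id)
                break
        else:
            bins.append([size, [slice_id]])  # oversized slices get a bin of their own
    return [json.dumps(sorted(ids)).encode("utf-8") for _, ids in bins]
```

With sizes `{"a": 60, "b": 50, "c": 40, "d": 30, "e": 10}` and a target of 100, five slices collapse into two tasks, which is the direction the sizing guidance is pushing.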

Also drop product-specific (Rerun) references from doc comments;
bridge examples now name neutral domains.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Bridges that own their provider's source shouldn't pay the
FFI_TableProvider hop or the datafusion-ffi ABI lockstep it forces.
The new spark/bridge rlib carries the widening + scan machinery,
provider-source-agnostic: each JNI body takes a closure that supplies
the provider. export_bridge! generates the full JNI surface for a
bridge's own cdylib under a bridge-chosen class name, so several
bridges can coexist in one Spark JVM; the builder receives the raw
option/partition bytes and returns a concrete Arc<dyn TableProvider>.

- datafusion-ffi import lives behind a default-on `ffi` feature;
  static bridges build with no-default-features and drop the
  dependency entirely
- datafusion-spark-helper shrinks to JNI shims for the generic
  io.datafusion.spark.FfiHelperNative path (symbol set unchanged)
- document the FFI_TaskContextProvider lifetime contract: the host
  SessionContext must outlive every provider built from it

JVM-side dispatch to bridge-named native classes is a follow-up;
until then export_bridge! cdylibs build but the connector still
routes through FfiHelperNative.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Static export_bridge! bridges had no JVM route: the connector called
FfiHelperNative statically, hardwiring every scan to the generic FFI
cdylib. The plumbing now talks to a ScanBackend obtained from
FfiProviderFactory.scanBackend(), so a bridge can point the connector
at its own JNI class (per-bridge class names; several bridges per
JVM) while the default FfiScanBackend keeps the FFI path intact.

Factory methods all gain working defaults — a minimal bridge
overrides exactly one method (createProvider or scanBackend):

- encodeOptions: OptionsCodec, key-sorted length-prefixed UTF-8
  pairs; sorted because shared-scan mode uses the bytes as the scan
  identity. Rust decoder in datafusion_spark_bridge::options, pinned
  to the Java encoder by a shared byte fixture in both test suites
- listPartitions: single whole-dataset partition
- createProvider: throws with guidance; static bridges never
  implement it
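The reason for sorting keys in `encodeOptions` can be shown with a small sketch: if the encoded bytes double as the scan identity, two option maps with the same contents must produce identical bytes regardless of insertion order. The 4-byte big-endian length prefix and the function name below are assumptions; the real byte layout is whatever the shared fixture in the test suites pins.

```python
import struct


def encode_pairs(options: dict[str, str]) -> bytes:
    """Encode options as key-sorted, length-prefixed UTF-8 key/value pairs.

    Assumed layout per string: u32 big-endian byte length, then the UTF-8 bytes.
    """
    out = bytearray()
    for key in sorted(options):  # sorting makes the output independent of insertion order
        for text in (key, options[key]):
            raw = text.encode("utf-8")
            out += struct.pack(">I", len(raw)) + raw
    return bytes(out)
```

Note that the sort order itself has to match between encoders in different languages; Python sorts strings by code point, which is another assumption here.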

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Bridges hand-rolled cdylib extraction and shipped multi-jar setups.
NativeLibraryLoader goes public with load(anchor, resourcePrefix,
name) — anchor-classloader lookup so it works under Spark's per-app
classloaders, idempotent per (prefix, name) across per-task factory
instantiation, failures clear the guard so retries can succeed.

The examples module becomes the living packaging template: the
example cdylib is bundled into the jar via an antrun copy + per-host
profiles (same pattern as the connector pom), and the 40-line
path-searching loader collapses to one NativeLibraryLoader.load call
with -Dexample.ffi.lib.path as the unpackaged-build escape hatch.
The pyspark demo drops its os.chdir(REPO_ROOT) crutch.

spark/README.md gains "Packaging your bridge": single shaded fat
jar with ServicesResourceTransformer, what stays provided, and the
hard rule — no shade relocations, JNI binds natives by class FQCN —
with the userClassPathFirst consequence for Spark's bundled Arrow.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
dev/new_bridge.py stamps a standalone bridge project from
dev/bridge-template/: cdylib crate on the datafusion-spark-bridge SDK
(no-default-features, working demo MemTable provider), the four Java
classes, DataSourceRegister entry, shaded-jar pom with the cdylib
bundled, pyspark smoke test, README. A new bridge is one Rust
function away from spark.read.format("<name>") with a single jar.

Details that bite without the generator: JNI symbol mangling for the
package (underscores need _1), Scala protected compiling to
JVM-public (factoryFqcn override must be public), an empty
[workspace] table so the generated crate survives workspace-rooted
parent directories, and no shade relocations anywhere near
JNI-bound classes.
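For readers unfamiliar with the mangling rule mentioned above, this sketch derives the native symbol name that the JVM looks up for a non-overloaded native method, following the escape rules in the JNI specification. The package and class in the usage note are hypothetical, and the sketch only handles characters in the Basic Multilingual Plane.

```python
def jni_mangle(name: str) -> str:
    """Escape one component of a JNI symbol name (BMP characters only)."""
    out = []
    for ch in name:
        if ch in "./":
            out.append("_")    # package separators become underscores
        elif ch == "_":
            out.append("_1")   # so a literal underscore has to be escaped
        elif ch == ";":
            out.append("_2")
        elif ch == "[":
            out.append("_3")
        elif ch.isascii() and ch.isalnum():
            out.append(ch)
        else:
            out.append("_0%04x" % ord(ch))  # e.g. '$' in nested class names
    return "".join(out)


def jni_symbol(class_fqcn: str, method: str) -> str:
    """Symbol for a non-overloaded native method; overloads also append a signature."""
    return "Java_" + jni_mangle(class_fqcn) + "_" + jni_mangle(method)
```

For a hypothetical class `com.acme_data.AcmeNative` with a native method `createScan`, this yields `Java_com_acme_1data_AcmeNative_createScan`. Forgetting the `_1` produces a symbol that links fine but is never found at call time.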

Also: dev/bridge-template/** excluded from RAT (generated projects
are user code, not ASF-headered) and spotless fixes for earlier
javadoc edits that only verify-phase builds had flagged.

Verified by generating an 'acme' bridge, building cdylib + shaded
jar, and scanning/filtering its demo table through PySpark.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Every known bridge owns its provider's Rust source, so the
FFI_TableProvider handover was speculative generality with real
costs: a second cdylib bundled in the connector jar, datafusion-ffi
ABI lockstep between artifacts, and pointer-ownership rules across
JNI. Static export_bridge! bridges are now the only path.

- delete the datafusion-spark-helper cdylib, FfiHelperNative,
  FfiScanBackend, and the bridge SDK's ffi module + feature;
  datafusion-ffi leaves the dependency tree entirely
- the connector jar goes pure JVM: no cargo prerequisite, no
  per-platform builds; native code ships inside each bridge's jar
- convert examples/native to an export_bridge! bridge
  (datafusion-java-example-bridge) — the committed, runnable
  concrete-provider example; demo renamed to bridge_demo.py

BREAKING CHANGE: FfiProviderFactory is renamed BridgeProviderFactory;
createProvider is gone and scanBackend() is the single required
method. Re-adding an FFI path later is mechanical: the ScanBackend
seam and scan.rs's provider-source closure are unchanged, and the
deleted code sits intact in this branch's history.

Verified: cargo + mvn suites, spotless/RAT verify, pyspark demo (both
scan modes), and a regenerated scaffold built + smoke-tested.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Doc comments still described two binding styles, a connector cdylib,
and a separate widening library — none of which exist. Module docs,
javadoc, error messages, and READMEs now describe the single path:
every bridge cdylib is an export_bridge! expansion over the
datafusion-spark-bridge SDK, widening included.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The default options encoding is OptionsCodec key/value strings, not
protobuf, so optionsProtoBytes was misleading. Rename to optionsBytes
across main, test, and examples. filterProtoBytes is left as-is — those
genuinely are LogicalExprNode proto bytes.

Also fix five doc references to the removed createProvider method (now
ScanBackend.createScan): two broken {@link}s in PartitionInfo.java that
would fail -Xdoclint, two comments in DatafusionInputPartition.scala,
and the bridge_demo.py note (which also claimed a non-existent stdout
line — reworded to the native build_provider).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The "legacy" naming implied a deprecated path, but this is all new,
unreleased code with no prior path. Rename the scan mode and scrub
"legacy" wording from comments to describe what the mode actually does.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The bridge generator and template are user-facing tooling, not maintainer
tooling like the rest of dev/ (release, changelog). Co-locate with the
spark/bridge SDK they wire to.

Update the script's repo-root resolution (parents[1] -> parents[2] for the
extra directory level) and all path references in pom.xml, spark/README.md,
and the template README.
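The `parents[1] -> parents[2]` change follows directly from how `pathlib` indexes ancestors: `parents[0]` is the containing directory, and each further index climbs one level. A small sketch, using a hypothetical new location since the exact destination directory is not spelled out in this message:

```python
from pathlib import PurePosixPath

# Old location, one directory below the repo root.
old_script = PurePosixPath("/repo/dev/new_bridge.py")
# Hypothetical new location, two directories below the repo root.
new_script = PurePosixPath("/repo/spark/tools/new_bridge.py")

old_root = old_script.parents[1]  # /repo/dev -> /repo
new_root = new_script.parents[2]  # /repo/spark/tools -> /repo/spark -> /repo
```

In the script itself the same indexing would typically be applied to `Path(__file__).resolve()` rather than a literal path.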

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>