
feat(table): expose registerStreamingTable for push-mode batch ingest #95

@LantaoJin


Is your feature request related to a problem or challenge?

PR #65 shipped a Java-implemented TableProvider and SessionContext.registerTable(String, TableProvider). That covered the pull shape: DataFusion calls scan(BufferAllocator) and reads the returned ArrowReader.

But it does not cover the push shape that event-driven batch sources need:

  • A coordinator that reduces over shard responses arriving incrementally -- the producer can't materialise an ArrowReader because the next batch hasn't arrived yet.
  • A Flight stream feeding into a query -- same problem; the producer is event-driven.
  • Any in-process producer that emits batches as side-effects of other work and doesn't know in advance how many will arrive.

To bridge these into PR #65 today, callers have to write a BlockingArrowReader adapter that buffers pushed batches and serves them through the pull interface. That's a serialisation point: the producer blocks waiting for loadNextBatch() to be called, or DataFusion blocks waiting for the next batch -- the two ends can never run truly concurrently. The adapter also has to invent its own backpressure semantics, error propagation, end-of-stream signalling, and thread-safety story.

DataFusion itself solves this on the Rust side with StreamingTable + PartitionStream plus an mpsc channel: the producer pushes Result<RecordBatch> into the sender, and the consumer (DataFusion's StreamingTableExec) polls the receiver as part of normal query execution. The two ends decouple via the channel buffer, with the bounded channel and the runtime providing backpressure and cancellation propagation.

Describe the solution you'd like

One new method on SessionContext returning a TableSink:

TableSink sink = ctx.registerStreamingTable("shard_results", schema, capacity);
// Producer thread (any thread, including outside any Tokio runtime):
try {
  while (hasMoreInput()) {
    sink.write(batch);   // backpressures when channel is full
  }
  sink.close();          // EOF: queries see end-of-stream cleanly
} catch (Throwable t) {
  sink.fail(t);          // signal error: queries see RuntimeException
}

// Proposed API surface (signatures only, bodies omitted):
public final class TableSink implements AutoCloseable {
  public void write(VectorSchemaRoot batch);   // exports via Data.exportVectorSchemaRoot
  public void close();                          // EOF
  public void fail(Throwable cause);            // error propagated to readers
}
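
To make the write / close / fail contract concrete, here is a minimal in-memory model using only the JDK. It is a sketch, not the proposed implementation: ModelSink, its next() method, and the EOF / Failure markers are hypothetical names, a plain generic T stands in for VectorSchemaRoot, and an ArrayBlockingQueue stands in for the Rust mpsc channel. One simplification to note: in this model close() and fail() also wait for a free slot, which a real implementation may well avoid.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of the sink contract; T stands in for an Arrow batch. */
final class ModelSink<T> implements AutoCloseable {
  // Sentinel placed on the channel to mark end-of-stream.
  private static final Object EOF = new Object();

  // Wrapper that carries a producer-side error to the consumer.
  private static final class Failure {
    final Throwable cause;
    Failure(Throwable cause) { this.cause = cause; }
  }

  private final BlockingQueue<Object> channel;

  ModelSink(int capacity) {
    this.channel = new ArrayBlockingQueue<>(capacity);
  }

  /** Blocks while the channel is full; this is the backpressure. */
  public void write(T batch) { put(batch); }

  /** Signals a clean end-of-stream to the consumer. */
  @Override
  public void close() { put(EOF); }

  /** Signals an error; the consumer sees it as a RuntimeException. */
  public void fail(Throwable cause) { put(new Failure(cause)); }

  /** Consumer side: next batch, or null at EOF. Do not call again after EOF. */
  @SuppressWarnings("unchecked")
  public T next() {
    Object item;
    try {
      item = channel.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a batch", e);
    }
    if (item == EOF) {
      return null;
    }
    if (item instanceof Failure) {
      throw new RuntimeException(((Failure) item).cause);
    }
    return (T) item;
  }

  private void put(Object item) {
    try {
      channel.put(item);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while sending", e);
    }
  }
}
```

Batches come out in the order they were written, close() surfaces as a null from next(), and fail(cause) surfaces as a RuntimeException wrapping cause, mirroring the comments in the usage snippet above.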

After registration the table can be referenced like any other registered table:

DataFrame df = ctx.sql("SELECT count(*) FROM shard_results");
ArrowReader r = df.executeStream(allocator);
// Producer thread continues writing as r.loadNextBatch() drains.

Single-scan semantics. The registered table can only be queried once. After that scan completes (or is cancelled), the sink is no longer usable and the table cannot be re-scanned. This is the natural semantic for an event-driven producer -- the data is consumed as it arrives -- and matches what every downstream Substrait/Calcite plan that uses streaming tables already assumes. This will be documented prominently in registerStreamingTable's Javadoc; trying to re-execute against the same registration throws.
This is intentional. Re-scannable streaming would require buffering every batch internally, which defeats the streaming use case. Callers who need to re-scan the same data should use the existing registerTable / SimpleTableProvider pull shape (PR #65) instead.
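
One way to picture the single-scan rule is a one-shot guard around whatever produces the stream. The sketch below is illustrative only: SingleScanGuard and its scan() method are hypothetical names, and the choice of IllegalStateException is an assumption, since the proposal only says that a second execution throws.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical one-shot guard: the first scan() succeeds, later ones throw. */
final class SingleScanGuard<S> {
  private final AtomicBoolean scanned = new AtomicBoolean(false);
  private final Supplier<S> streamFactory;

  SingleScanGuard(Supplier<S> streamFactory) {
    this.streamFactory = streamFactory;
  }

  S scan() {
    // compareAndSet makes the check race-free if two queries start at once.
    if (!scanned.compareAndSet(false, true)) {
      // Exception type is an assumption; the proposal only says "throws".
      throw new IllegalStateException(
          "streaming table already scanned; it cannot be re-scanned");
    }
    return streamFactory.get();
  }
}
```

Because the flag flips before the stream is handed out, a scan that is later cancelled still counts as the one permitted scan, which matches the "completes (or is cancelled)" wording above.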

Describe alternatives you've considered

  • BlockingArrowReader adapter on top of PR #65's registerTable. What every caller currently has to hand-roll. It works but pushes the channel + backpressure + EOF + error story onto every embedder. Bridging via the upstream-canonical StreamingTable shape is strictly less code and gets cancellation propagation for free.
  • Backpressure-free try_write. A non-blocking variant that returns false when the channel is full. Easy to add later as a follow-up if anyone wants it; not in scope here. Default write blocks, which is the contract every Java I/O caller expects.
  • Reuse PR #65's TableProvider interface and wrap mpsc internally. Considered. The problem: PR #65's scan(BufferAllocator) -> ArrowReader returns synchronously, so an mpsc-backed implementation has to block on loadNextBatch() waiting for the producer -- exactly the serialisation point we're trying to avoid. Going direct to StreamingTable + PartitionStream is the right layer.
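
For the non-blocking variant mentioned in the alternatives, the intended behaviour can be sketched with a bounded JDK queue. This is an assumption-laden illustration, not part of the proposal: TryWriteChannel, tryWrite (a Java-style spelling of try_write), and poll are hypothetical names, and T stands in for a batch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of a non-blocking write on a bounded channel. */
final class TryWriteChannel<T> {
  private final BlockingQueue<T> channel;

  TryWriteChannel(int capacity) {
    this.channel = new ArrayBlockingQueue<>(capacity);
  }

  /** Returns false immediately when the channel is full, instead of blocking. */
  boolean tryWrite(T batch) {
    return channel.offer(batch);
  }

  /** Consumer side: removes and returns the oldest batch, or null if empty. */
  T poll() {
    return channel.poll();
  }
}
```

With this shape the caller decides what to do on false (retry, drop, or fall back to a blocking write), which is why it can sit alongside the blocking default rather than replace it.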

Additional context

  • The OpenSearch backend's rust/src/api.rs:572 register_partition_stream is the prior-art template; it does almost exactly this. The Java side there uses a hand-rolled FFM bridge (sender_send) that can be replaced with this surface as soon as it lands.

Labels: enhancement