diff --git a/CLAUDE.md b/CLAUDE.md index 8e788598..1f01cec8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -16,6 +16,29 @@ The data flow is: **User code → Wrapper API → FFI boundary → Rust core → Go uses the C FFI layer (`rust/ffi/`, header auto-generated by cbindgen). Java has its own JNI crate (`rust/jni/`). Python and TypeScript bind Rust directly via PyO3 and NAPI-RS respectively. +## Writing SDK client code — performance rules + +These rules apply to **every example, snippet, doc comment, and test** you write across all SDKs, and to any client code you generate for a user. Ingestion is asynchronous and pipelined in all SDKs — getting this wrong is the single most common performance bug. + +**The cardinal rule: never wait for an acknowledgment after every `ingest` call.** + +- `ingest` (`ingest_record_offset` / `ingestRecordOffset` / `IngestRecordOffset`) returns as soon as the record is queued; sending and acknowledgment happen on background tasks. The returned offset is a *handle* you can wait on later — not a signal to wait now. +- Calling the wait method (`wait_for_offset` / `waitForOffset` / `WaitForOffset`, or `.join()` on a per-record future) inside the ingest loop forces one full server round-trip per record. Throughput collapses to ~1 record per round-trip — orders of magnitude below the SDK's capability. + +**The correct pattern (use this in all examples by default):** + +```text +for record in records: + stream.ingest(record) # queue only — do NOT wait here +stream.flush() # wait once for all pending acks at the end +``` + +- For continuous/unbounded streams, call `flush()` periodically (every N records) rather than per record, or register an **ack callback** for async notification. +- Reserve per-record `wait_for_offset` for genuinely low-volume cases where each record must be confirmed durable before continuing — and label it as such. +- Prefer the batch API (`ingest_records_offset` / `ingestRecordsOffset` / `IngestRecordsOffset`) in hot paths; it amortizes the per-call FFI crossing. + +When writing or reviewing a README, doc comment, or example, the **first** ingestion code a reader sees must demonstrate the loop-then-`flush()` pattern, not per-record waiting. + ## Versioning and Breaking Changes **Strict semver.** Breaking changes are only allowed in major version bumps. diff --git a/README.md b/README.md index 9275f9dd..ce97072d 100644 --- a/README.md +++ b/README.md @@ -14,25 +14,25 @@ Zerobus is a high-throughput streaming service for direct data ingestion into Da ## SDKs -| Language | Directory | Package | -|----------|-----------|---------| -| Rust | [`rust/`](rust/) | [`databricks-zerobus-ingest-sdk`](https://crates.io/crates/databricks-zerobus-ingest-sdk) | -| Python | [`python/`](python/) | [`databricks-zerobus-ingest-sdk`](https://pypi.org/project/databricks-zerobus-ingest-sdk/) | -| Go | [`go/`](go/) | [`github.com/databricks/zerobus-sdk/go`](https://pkg.go.dev/github.com/databricks/zerobus-sdk/go) | -| TypeScript | [`typescript/`](typescript/) | [`@databricks/zerobus-ingest-sdk`](https://www.npmjs.com/package/@databricks/zerobus-ingest-sdk) | -| Java | [`java/`](java/) | [`com.databricks:zerobus-ingest-sdk`](https://central.sonatype.com/artifact/com.databricks/zerobus-ingest-sdk) | +| Language | Directory | Package | +| ---------- | ---------------------------- | -------------------------------------------------------------------------------------------------------------- | +| Rust | [`rust/`](rust/) | [`databricks-zerobus-ingest-sdk`](https://crates.io/crates/databricks-zerobus-ingest-sdk) | +| Python | [`python/`](python/) | [`databricks-zerobus-ingest-sdk`](https://pypi.org/project/databricks-zerobus-ingest-sdk/) | +| Go | [`go/`](go/) | [`github.com/databricks/zerobus-sdk/go`](https://pkg.go.dev/github.com/databricks/zerobus-sdk/go) | +| TypeScript | [`typescript/`](typescript/) | [`@databricks/zerobus-ingest-sdk`](https://www.npmjs.com/package/@databricks/zerobus-ingest-sdk) | +| Java | [`java/`](java/) | [`com.databricks:zerobus-ingest-sdk`](https://central.sonatype.com/artifact/com.databricks/zerobus-ingest-sdk) | ## Platform Support We try to provide prebuilt native binaries for the following platforms: -| Platform | Architecture | -|----------|-------------| -| Linux | x86_64 | -| Linux | aarch64 | -| Windows | x86_64 | -| macOS | x86_64 | -| macOS | aarch64 (Apple Silicon) | +| Platform | Architecture | +| -------- | ----------------------- | +| Linux | x86_64 | +| Linux | aarch64 | +| Windows | x86_64 | +| macOS | x86_64 | +| macOS | aarch64 (Apple Silicon) | > **Note:** We do not currently have macOS CI runners, so macOS binaries are built locally and may not be available for every SDK or release. If your platform is not supported or you encounter compatibility issues, you can [build from source](CONTRIBUTING.md) or [file an issue](https://github.com/databricks/zerobus-sdk/issues). @@ -110,21 +110,21 @@ Use `proto2` syntax with `optional` fields to correctly represent nullable Delta ##### Delta → Protobuf Type Mappings -| Delta Type | Proto2 Type | -|-----------|-------------| -| TINYINT, BYTE, INT, SMALLINT, SHORT | int32 | -| BIGINT, LONG | int64 | -| FLOAT | float | -| DOUBLE | double | -| STRING, VARCHAR | string | -| BOOLEAN | bool | -| BINARY | bytes | -| DATE | int32 | -| TIMESTAMP, TIMESTAMP_NTZ | int64 | -| ARRAY\ | repeated type | -| MAP\ | map\ | -| STRUCT\ | nested message | -| VARIANT | string (JSON string) | +| Delta Type | Proto2 Type | +| ----------------------------------- | -------------------- | +| TINYINT, BYTE, INT, SMALLINT, SHORT | int32 | +| BIGINT, LONG | int64 | +| FLOAT | float | +| DOUBLE | double | +| STRING, VARCHAR | string | +| BOOLEAN | bool | +| BINARY | bytes | +| DATE | int32 | +| TIMESTAMP, TIMESTAMP_NTZ | int64 | +| ARRAY\ | repeated type | +| MAP\ | map\ | +| STRUCT\ | nested message | +| VARIANT | string (JSON string) | #### Schema Generation @@ -139,15 +139,25 @@ Supported by all SDKs starting from version 2.0.0. Currently in Beta — the API For sparse, one-row-at-a-time traffic, JSON or Protocol Buffers over the standard SDK gRPC path are usually simpler. See each SDK's `examples/arrow/` directory for usage. +### Acknowledgments and throughput + +Ingestion is asynchronous in every SDK. An `ingest` call returns as soon as the record is queued — the SDK sends it and tracks its acknowledgment on a background task. To confirm that records were durably committed, call `flush()`; it returns once everything queued so far has been acknowledged. + +The idiomatic flow is therefore **ingest in a loop, then `flush()`** — once at the end of a bounded batch, or periodically for a long-running stream. Where the SDK supports it, you can instead register an ack callback and be notified as records commit, without blocking at all. + +Each `ingest` also returns the record's offset, and `wait_for_offset(offset)` blocks until that offset is acknowledged. That's useful when a particular record must be confirmed before you continue; because acknowledgments are ordered, waiting on the last offset of a run confirms the whole run. The one thing to avoid is waiting on every record inside a tight loop — that turns the asynchronous pipeline into a synchronous request/response and limits throughput to a single record per network round-trip. + +See each SDK's README for exact method names and a runnable example. + ## HTTP Proxy Support All SDKs support HTTP CONNECT proxies via environment variables, following gRPC core conventions. The first variable found (in order) is used: -| Proxy | No-proxy | -|-------|----------| -| `grpc_proxy` / `GRPC_PROXY` | `no_grpc_proxy` / `NO_GRPC_PROXY` | -| `https_proxy` / `HTTPS_PROXY` | `no_proxy` / `NO_PROXY` | -| `http_proxy` / `HTTP_PROXY` | | +| Proxy | No-proxy | +| ----------------------------- | --------------------------------- | +| `grpc_proxy` / `GRPC_PROXY` | `no_grpc_proxy` / `NO_GRPC_PROXY` | +| `https_proxy` / `HTTPS_PROXY` | `no_proxy` / `NO_PROXY` | +| `http_proxy` / `HTTP_PROXY` | | The `no_proxy` value is a comma-separated list of hostnames (suffix-matched) or `*` to bypass the proxy entirely. diff --git a/go/CLAUDE.md b/go/CLAUDE.md index 02320ec9..82933ad4 100644 --- a/go/CLAUDE.md +++ b/go/CLAUDE.md @@ -2,6 +2,24 @@ Go wrapper around the Rust core via cgo and the C FFI library. +## Client code patterns (performance) + +When writing or reviewing client/example code, follow the idiomatic async flow. +`IngestRecordOffset()` (and `IngestRecordsOffset()` / `IngestBatch()`) return as +soon as the record is queued; the SDK sends it and tracks its acknowledgment in +the background. + +- Ingest in a loop, then call `Flush()` to confirm durability — once for a + bounded batch, or periodically for a long-running stream. +- Acks are ordered, so if you only need to confirm a group of records, call + `WaitForOffset()` on the LAST offset — it confirms all prior offsets too. +- Use `WaitForOffset()` when a specific record must be confirmed before + continuing; prefer `Flush()` for bulk durability. Avoid calling + `WaitForOffset()` after every record in a tight loop, since that limits + throughput to one record per round-trip. +- There is no ack-callback API in the Go SDK; use `Flush()` / `WaitForOffset()` + (or the fire-and-forget `IngestRecordNowait`/`IngestRecordsNowait`). + ## Structure ``` diff --git a/go/NEXT_CHANGELOG.md b/go/NEXT_CHANGELOG.md index 22554783..766d1673 100644 --- a/go/NEXT_CHANGELOG.md +++ b/go/NEXT_CHANGELOG.md @@ -10,6 +10,8 @@ ### Documentation +- Clarified throughput guidance in the README, godoc, and examples: ingest records in a loop without waiting and call `Flush()` once, rather than calling `WaitForOffset()` after every record. Documented that the ack watermark is monotonic, so waiting on the last offset confirms all prior records. + ### Internal Changes ### API Changes diff --git a/go/README.md b/go/README.md index 7660b636..618db4a2 100644 --- a/go/README.md +++ b/go/README.md @@ -188,15 +188,22 @@ func main() { } defer stream.Close() - // 4. Send record to server and get offset - // The offset is a logical sequence number assigned to this record + // 4. Send record to server and get offset. + // IngestRecordOffset returns as soon as the record is queued; the offset is a + // logical sequence number assigned to this record. The server round-trip + // happens in the background. offset, err := stream.IngestRecordOffset(`{"id": 1, "message": "Hello"}`) if err != nil { log.Fatal(err) } log.Printf("Record queued for ingestion with offset %d", offset) - // 5. Wait for server to acknowledge the record is durably written + // 5. Wait for the server to acknowledge the record is durably written. + // + // WaitForOffset confirms this one record, which is all this example needs. + // For real workloads, the idiomatic flow is to ingest many records in a loop + // and call stream.Flush() once at the end. See the "Idiomatic high-throughput + // flow" under Usage Guide → Ingest Data. if err := stream.WaitForOffset(offset); err != nil { log.Fatal(err) } @@ -247,7 +254,9 @@ if err != nil { } log.Printf("Record queued with offset %d", offset) -// 4. Optionally wait for server acknowledgment +// 4. Optionally wait for server acknowledgment of this specific record. +// For bulk ingestion, ingest in a loop and call stream.Flush() once instead — +// see the "Idiomatic high-throughput flow" under Usage Guide → Ingest Data. if err := stream.WaitForOffset(offset); err != nil { log.Fatal(err) } @@ -477,21 +486,40 @@ defer stream.Close() ### 4. Ingest Data -**Single record:** +> **Acknowledgments and throughput.** Ingestion is asynchronous. +> `IngestRecordOffset()` returns as soon as the record is queued; the SDK sends it +> and tracks its acknowledgment in the background. To confirm records are durably +> committed, call `Flush()` — it returns once everything queued so far is +> acknowledged. The idiomatic flow is **ingest in a loop, then `Flush()`** (once +> for a bounded batch, or periodically for a long-running stream). Each ingest +> also returns the record's offset, and `WaitForOffset(offset)` blocks until that +> offset is acknowledged — handy when a specific record must be confirmed before +> continuing (acks are ordered, so waiting on the last offset confirms the whole +> run). Just avoid calling `WaitForOffset()` after every record in a tight loop, +> since that limits throughput to one record per round-trip. + +**Idiomatic high-throughput flow: ingest in a loop, then `Flush()` once.** ```go -// JSON (string) - queues record and returns offset -offset, err := stream.IngestRecordOffset(`{"id": 1, "value": "hello"}`) -if err != nil { +// Ingest many records without waiting between them. +for i := 0; i < 100000; i++ { + jsonData := fmt.Sprintf(`{"id": %d, "timestamp": %d}`, i, time.Now().Unix()) + if _, err := stream.IngestRecordOffset(jsonData); err != nil { + log.Printf("Record %d failed: %v", i, err) + continue + } +} + +// Wait for ALL pending records to be acknowledged in a single call. +if err := stream.Flush(); err != nil { log.Fatal(err) } -log.Printf("Record queued at offset: %d", offset) ``` -**Batch ingestion for high throughput:** +**Batch ingestion for even higher throughput:** ```go -// Ingest multiple records at once +// Ingest multiple records in one call (one offset for the whole batch). records := []interface{}{ `{"id": 1, "value": "first"}`, `{"id": 2, "value": "second"}`, @@ -502,28 +530,26 @@ if err != nil { log.Fatal(err) } log.Printf("Batch queued with offset: %d", batchOffset) +// ... ingest more batches ... +stream.Flush() // wait for everything at the end ``` -**High throughput pattern:** +**Single record with explicit confirmation:** ```go -// Ingest many records without waiting -for i := 0; i < 100000; i++ { - jsonData := fmt.Sprintf(`{"id": %d, "timestamp": %d}`, i, time.Now().Unix()) - offset, err := stream.IngestRecordOffset(jsonData) - if err != nil { - log.Printf("Record %d failed: %v", i, err) - continue - } - - // Optional: log progress - if i%10000 == 0 { - log.Printf("Ingested %d records, latest offset: %d", i, offset) - } +// JSON (string) - queues record and returns offset. +offset, err := stream.IngestRecordOffset(`{"id": 1, "value": "hello"}`) +if err != nil { + log.Fatal(err) } +log.Printf("Record queued at offset: %d", offset) -// Wait for all records to be acknowledged -stream.Flush() +// WaitForOffset confirms this specific record before continuing. For bulk +// ingestion, prefer ingesting in a loop and calling Flush() once (see the +// high-throughput flow above). +if err := stream.WaitForOffset(offset); err != nil { + log.Fatal(err) +} ``` **Concurrent ingestion with goroutines:** @@ -620,13 +646,18 @@ if err := stream.WaitForOffset(batchOffset); err != nil { } log.Println("Batch confirmed") -// High-throughput: +// Idiomatic high-throughput flow: ingest in a loop, then Flush() once to +// confirm everything queued so far. for i := 0; i < 1000; i++ { - _, _ := stream.IngestRecordOffset(record) + if _, err := stream.IngestRecordOffset(record); err != nil { + log.Printf("Record %d failed: %v", i, err) + } } -// Use Flush() to wait for all pending acknowledgments at once -stream.Flush() +// Use Flush() to wait for all pending acknowledgments at once. +if err := stream.Flush(); err != nil { + log.Fatal(err) +} ``` ### 6. Error Handling @@ -898,16 +929,17 @@ The test suite includes: 1. **Reuse SDK Instances** - Create one `ZerobusSdk` per application and reuse for multiple streams 2. **Always Close Streams** - Use `defer stream.Close()` to ensure all data is flushed -3. **Choose the Right Ingestion Method**: +3. **Ingest, then `Flush()`** - `IngestRecordOffset()`/`IngestRecordsOffset()` return as soon as the record is queued and track acknowledgment in the background. The idiomatic flow is to ingest in a loop and call `Flush()` to confirm durability. Use `WaitForOffset()` when a specific record must be confirmed before continuing (acks are ordered, so the last offset confirms the whole group). Just avoid calling `WaitForOffset()` after every record in a tight loop, since that limits throughput to one record per round-trip. +4. **Choose the Right Ingestion Method**: - Use `IngestRecordsOffset()` for high throughput batch ingestion - Use `IngestRecordOffset()` when processing records individually - Both return offsets directly; use `WaitForOffset()` to explicitly wait for acknowledgments - The older `IngestRecord()` method is deprecated -4. **Tune Inflight Limits** - Adjust `MaxInflightRequests` based on memory and throughput needs -5. **Enable Recovery** - Always set `Recovery: true` in production environments -6. **Use Batch Ingestion** - For high throughput, ingest many records before calling `Flush()` -7. **Monitor Errors** - Log and alert on non-retryable errors -8. **Use Protocol Buffers for Production** - More efficient than JSON for high-volume scenarios +5. **Tune Inflight Limits** - Adjust `MaxInflightRequests` based on memory and throughput needs +6. **Enable Recovery** - Always set `Recovery: true` in production environments +7. **Use Batch Ingestion** - For high throughput, ingest many records before calling `Flush()` +8. **Monitor Errors** - Log and alert on non-retryable errors +9. **Use Protocol Buffers for Production** - More efficient than JSON for high-volume scenarios 9. **Secure Credentials** - Never hardcode secrets; use environment variables or secret managers 10. **Test Recovery** - Simulate failures to verify your error handling logic 11. **One Stream Per Goroutine** - Don't share streams across goroutines; create separate streams for concurrent ingestion @@ -1120,6 +1152,12 @@ Waits for the server to acknowledge that a specific record has been durably writ Unlike `Flush()` which waits for all pending records, this waits only for a specific offset, allowing more granular control. +> Use `WaitForOffset()` when a specific record must be confirmed before +> continuing; acks are ordered, so waiting on the last offset of a group confirms +> all prior offsets too. For bulk durability, prefer ingesting in a loop and +> calling `Flush()` once. Avoid calling `WaitForOffset()` after every record in a +> tight loop, since that limits throughput to one record per round-trip. + **Example:** ```go // Send multiple records diff --git a/go/examples/arrow/main.go b/go/examples/arrow/main.go index 3bff7748..67fa303f 100644 --- a/go/examples/arrow/main.go +++ b/go/examples/arrow/main.go @@ -88,7 +88,10 @@ func main() { {{device: "sensor-002", temp: 22.8, humid: 59.5}}, } - var offsets []int64 + // Ingest batches in a loop. IngestBatch returns as soon as the batch is + // queued; the SDK sends it and tracks its acknowledgment in the background. + // We confirm everything below with a single Flush() — keeping the ingest loop + // free of per-batch waits is what sustains throughput. for i, readings := range batches { ipcBytes := serializeBatch(schema, alloc, readings) @@ -104,26 +107,20 @@ func main() { } log.Printf("Batch %d (%d rows) queued at offset %d", i, len(readings), offset) - offsets = append(offsets, offset) } // ── 7. Wait for server acknowledgments ─────────────────────────────────── // - // WaitForOffset blocks until the server confirms that the batch at the - // given offset is durably stored. Call Flush() to wait for all pending - // batches at once. + // Flush() blocks until ALL pending batches are durably stored — a single + // round-trip for the whole stream, and the idiomatic way to confirm + // durability. (If you only need to confirm up to a specific point, call + // WaitForOffset() on the last offset instead; the ack watermark is monotonic.) log.Println("Waiting for acknowledgments...") - for _, offset := range offsets { - if err := stream.WaitForOffset(offset); err != nil { - log.Fatalf("WaitForOffset(%d) failed: %v", offset, err) - } - log.Printf("Offset %d acknowledged", offset) - } - - // ── 8. Flush and close ──────────────────────────────────────────────────── if err := stream.Flush(); err != nil { log.Fatalf("Flush failed: %v", err) } + + // ── 8. Close ────────────────────────────────────────────────────────────── if err := stream.Close(); err != nil { log.Fatalf("Close failed: %v", err) } diff --git a/go/examples/json/single/main.go b/go/examples/json/single/main.go index ebd1f62a..685a656c 100644 --- a/go/examples/json/single/main.go +++ b/go/examples/json/single/main.go @@ -46,8 +46,12 @@ func main() { } defer stream.Close() + // Ingest records in a loop. IngestRecordOffset returns as soon as the record + // is queued; the SDK sends it and tracks its acknowledgment in the background. + // We collect the last offset and confirm everything below — keeping the ingest + // loop free of per-record waits is what sustains throughput. log.Println("Ingesting records...") - var offsets []int64 + var lastOffset int64 = -1 for i := 0; i < 5; i++ { // Change this string to match the schema of your table. jsonRecord := `{ @@ -67,16 +71,17 @@ func main() { } log.Printf("Ingested record %d at offset %d", i, offset) - offsets = append(offsets, offset) + lastOffset = offset } - // Wait for specific offsets to be acknowledged. - log.Println("Waiting for acknowledgments...") - for _, offset := range offsets { - if err := stream.WaitForOffset(offset); err != nil { - log.Fatalf("Failed to wait for offset %d: %v", offset, err) + // Wait once on the LAST offset. The ack watermark is monotonic, so confirming + // the last offset confirms all prior records too. (Equivalently, call + // stream.Flush() to wait for all pending records.) + if lastOffset >= 0 { + log.Println("Waiting for acknowledgments...") + if err := stream.WaitForOffset(lastOffset); err != nil { + log.Fatalf("Failed to wait for offset %d: %v", lastOffset, err) } - log.Printf("Record at offset %d acknowledged", offset) } log.Println("All records successfully ingested and acknowledged!") diff --git a/go/examples/proto/single/main.go b/go/examples/proto/single/main.go index a9fc46cf..284111b9 100644 --- a/go/examples/proto/single/main.go +++ b/go/examples/proto/single/main.go @@ -63,8 +63,12 @@ func main() { } defer stream.Close() + // Ingest records in a loop. IngestRecordOffset returns as soon as the record + // is queued; the SDK sends it and tracks its acknowledgment in the background. + // We collect the last offset and confirm everything below — keeping the ingest + // loop free of per-record waits is what sustains throughput. log.Println("Ingesting records...") - var offsets []int64 + var lastOffset int64 = -1 for i := 0; i < 5; i++ { // Create a message using the generated struct. // Change this message to match the schema of your table. @@ -90,16 +94,17 @@ func main() { log.Printf("Ingested record %d at offset %d (temp=%d, humidity=%d)", i, offset, *message.Temp, *message.Humidity) - offsets = append(offsets, offset) + lastOffset = offset } - // Wait for specific offsets to be acknowledged. - log.Println("Waiting for acknowledgments...") - for _, offset := range offsets { - if err := stream.WaitForOffset(offset); err != nil { - log.Fatalf("Failed to wait for offset %d: %v", offset, err) + // Wait once on the LAST offset. The ack watermark is monotonic, so confirming + // the last offset confirms all prior records too. (Equivalently, call + // stream.Flush() to wait for all pending records.) + if lastOffset >= 0 { + log.Println("Waiting for acknowledgments...") + if err := stream.WaitForOffset(lastOffset); err != nil { + log.Fatalf("Failed to wait for offset %d: %v", lastOffset, err) } - log.Printf("Record at offset %d acknowledged", offset) } log.Println("All records successfully ingested and acknowledged!") diff --git a/go/zerobus.go b/go/zerobus.go index 4fa8e067..7534033f 100644 --- a/go/zerobus.go +++ b/go/zerobus.go @@ -366,7 +366,15 @@ func (st *ZerobusStream) IngestRecord(payload interface{}) (*RecordAck, error) { // IngestRecordOffset ingests a record into the stream and returns the offset directly. // This is the preferred API for ingesting records. -// This method blocks until the record is queued and returns the offset. +// This method returns as soon as the record is queued; the SDK sends it and +// tracks its acknowledgment in the background. +// +// The idiomatic flow is to ingest in a loop and call Flush() to confirm +// durability. Use WaitForOffset() with the returned offset when you need to +// confirm a specific record before continuing (acks are ordered, so the last +// offset confirms the whole group); prefer Flush() for bulk durability. Avoid +// calling WaitForOffset() after every record in a tight loop, since that limits +// throughput to one record per round-trip. // // The payload parameter accepts either: // - []byte for Protocol Buffer encoded records @@ -380,17 +388,15 @@ func (st *ZerobusStream) IngestRecord(payload interface{}) (*RecordAck, error) { // // Examples: // -// // Ingest records and get offsets directly -// offset1, err := stream.IngestRecordOffset(`{"field": "value1"}`) -// if err != nil { +// // High throughput: ingest in a loop without waiting, then flush once. +// for _, r := range records { +// if _, err := stream.IngestRecordOffset(r); err != nil { +// log.Fatal(err) +// } +// } +// if err := stream.Flush(); err != nil { // log.Fatal(err) // } -// -// // For concurrent ingestion, use goroutines -// go func() { -// offset, err := stream.IngestRecordOffset(data) -// // handle result -// }() func (st *ZerobusStream) IngestRecordOffset(payload interface{}) (int64, error) { if st.ptr == nil { return -1, &ZerobusError{Message: "Stream has been closed", IsRetryable: false} @@ -508,7 +514,16 @@ func (st *ZerobusStream) IngestRecordsNowait(records []interface{}) error { // IngestRecordsOffset ingests a batch of records into the stream and returns one offset for the entire batch. // This is an optimized API for ingesting multiple records at once. -// This method blocks until all records are queued and returns the batch offset. +// This method returns as soon as the batch is queued; the server round-trip +// happens in the background. +// +// Prefer this batch API over single-record calls in hot paths. The idiomatic +// flow is to ingest your batches in a loop and call Flush() to confirm +// durability. Use WaitForOffset() with a returned offset when you need to +// confirm a specific batch before continuing (acks are ordered, so the last +// offset confirms the whole group); prefer Flush() for bulk durability. Avoid +// calling WaitForOffset() after every batch in a tight loop, since that limits +// throughput to one batch per round-trip. // // The records parameter accepts a slice where each element is either: // - []byte for Protocol Buffer encoded records @@ -587,15 +602,25 @@ func (st *ZerobusStream) IngestRecordsOffset(records []interface{}) (int64, erro // WaitForOffset blocks until the server acknowledges the record at the specified offset. // This allows explicit control over when to wait for acknowledgments. // -// Use this with offsets returned from IngestRecordOffset() to wait for specific records -// to be durably written without waiting for all pending records (unlike Flush). +// Use this with offsets returned from IngestRecordOffset() to confirm a specific +// record before continuing, without waiting for all pending records (unlike Flush). +// Acks are ordered, so waiting on the last offset of a group confirms all prior +// offsets too. +// +// Use this when you need to confirm a specific record; prefer Flush() for bulk +// durability (ingest in a loop, then Flush() once). Avoid calling WaitForOffset() +// after every record in a tight loop, since that limits throughput to one record +// per round-trip. // // Example: // -// offset, _ := stream.IngestRecordOffset(data) -// // Do other work... -// if err := stream.WaitForOffset(offset); err != nil { -// log.Printf("Record at offset %d failed: %v", offset, err) +// // Confirm a group of records with a single wait on the last offset. +// var last int64 +// for _, r := range records { +// last, _ = stream.IngestRecordOffset(r) +// } +// if err := stream.WaitForOffset(last); err != nil { // confirms all prior offsets too +// log.Printf("Record at offset %d failed: %v", last, err) // } func (st *ZerobusStream) WaitForOffset(offset int64) error { if st.ptr == nil { @@ -643,12 +668,20 @@ func (st *ZerobusStream) GetUnackedRecords() ([]interface{}, error) { // Flush blocks until all pending records have been acknowledged by the server. // This ensures durability guarantees before proceeding. // +// This is the idiomatic way to confirm durability for high-throughput ingestion: +// ingest many records via IngestRecordOffset()/IngestRecordsOffset() in a loop, +// then call Flush() once. Use WaitForOffset() instead when you only need to +// confirm a specific record rather than everything queued so far. +// // Returns an error if: // - Flush timeout is exceeded // - Any record fails with a non-retryable error // // Example: // +// for _, r := range records { +// stream.IngestRecordOffset(r) +// } // if err := stream.Flush(); err != nil { // log.Printf("Flush failed: %v", err) // } diff --git a/java/CLAUDE.md b/java/CLAUDE.md index f3e6b12f..e403b969 100644 --- a/java/CLAUDE.md +++ b/java/CLAUDE.md @@ -2,6 +2,26 @@ Java wrapper around the Rust core via JNI. +## Client code patterns (read before writing or reviewing examples/docs) + +Ingestion is asynchronous. `ingestRecordOffset()` (and `ingestRecordsOffset()`) return as +soon as the record/batch is queued — the SDK sends it and tracks its acknowledgment in the +background. + +- **Idiomatic flow:** ingest in a loop, then call `flush()` once (after a bounded batch, or + periodically for a long-running stream) to confirm durability. Equivalently, call + `waitForOffset()` on the **last** returned offset — acks are ordered, so confirming the + last offset confirms every prior record. +- **Async monitoring:** register an `AckCallback` (`onAck`/`onError`) to track progress + without blocking the ingest loop. +- **Per-record `waitForOffset()`:** use when a specific record must be confirmed before + continuing. Avoid calling it after every record in a tight loop, since that limits + throughput to one record per round-trip. +- New code should use `ZerobusProtoStream` / `ZerobusJsonStream` (offset-based) rather than + the deprecated `ZerobusStream`. With the deprecated API, keep the last future and `.join()` + once after the loop rather than joining per record (the latter is discouraged for the same + throughput reason). + ## Structure ``` diff --git a/java/NEXT_CHANGELOG.md b/java/NEXT_CHANGELOG.md index 33072ed2..7e027843 100644 --- a/java/NEXT_CHANGELOG.md +++ b/java/NEXT_CHANGELOG.md @@ -12,6 +12,8 @@ ### Documentation +- Reworked README and Javadoc to steer users toward the high-throughput ingestion pattern: ingest with `ingestRecordOffset()` in a loop without waiting, then `flush()` (or `waitForOffset()` on the last offset) once. Added a performance callout warning against waiting for an acknowledgment after every record, documented `AckCallback` as the non-blocking alternative, and rewrote the Quick Start example to use the offset-based API instead of the deprecated `ingestRecord().join()` loop. + ### Internal Changes ### Breaking Changes diff --git a/java/README.md b/java/README.md index c1c88591..e6b26143 100644 --- a/java/README.md +++ b/java/README.md @@ -421,6 +421,10 @@ For detailed documentation and examples, see [tools/README.md](https://github.co #### 4. Write Your Client Code +> The idiomatic flow is **ingest in a loop, then `flush()`** once at the end. See +> [Acknowledgments and throughput](#acknowledgments-and-throughput) below the example for +> how acknowledgment works and when to use `waitForOffset()` or an `AckCallback`. + Create `src/main/java/com/example/ZerobusClient.java`: ```java @@ -441,21 +445,19 @@ public class ZerobusClient { // Initialize SDK ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); - // Configure table properties - TableProperties tableProperties = new TableProperties<>( + // Create stream (recommended offset-based proto stream) + ZerobusProtoStream stream = sdk.createProtoStream( tableName, - AirQuality.getDefaultInstance() - ); - - // Create stream - ZerobusStream stream = sdk.createStream( - tableProperties, + AirQuality.getDescriptor().toProto(), clientId, clientSecret ).join(); try { - // Ingest records + long lastOffset = -1; + + // Ingest in a loop. ingestRecordOffset() returns as soon as the record is + // queued; the SDK sends it and tracks its acknowledgment in the background. for (int i = 0; i < 100; i++) { AirQuality record = AirQuality.newBuilder() .setDeviceName("sensor-" + (i % 10)) @@ -463,11 +465,13 @@ public class ZerobusClient { .setHumidity(50 + (i % 40)) .build(); - stream.ingestRecord(record).join(); // Wait for durability - - System.out.println("Ingested record " + (i + 1)); + lastOffset = stream.ingestRecordOffset(record); // returns immediately } + // Confirm everything is durably committed. flush() does the same; + // waiting on the last offset works because acks are ordered. + stream.waitForOffset(lastOffset); + System.out.println("Successfully ingested 100 records!"); } finally { stream.close(); @@ -518,12 +522,24 @@ java -cp "lib/*:out" com.example.ZerobusClient You should see output like: ``` -Ingested record 1 -Ingested record 2 -... Successfully ingested 100 records! ``` +### Acknowledgments and throughput + +Ingestion is asynchronous. `ingestRecordOffset()` returns as soon as the record is queued; +the SDK sends it and tracks its acknowledgment in the background. To confirm records are +durably committed, call `flush()` — it returns once everything queued so far is +acknowledged. The idiomatic flow is **ingest in a loop, then `flush()`** (once for a +bounded batch, or periodically for a long-running stream); or register an +[`AckCallback`](#ackcallback-interface) to be notified as records commit. + +Each ingest also returns the record's offset, and `waitForOffset(offset)` blocks until that +offset is acknowledged — handy when a specific record must be confirmed before continuing +(acks are ordered, so waiting on the last offset confirms the whole run). Just avoid calling +`waitForOffset()` (or `.join()` on the deprecated per-record future) after every record in a +tight loop, since that limits throughput to one record per round-trip. + ## Usage Examples The `examples/` directory contains complete working examples organized by stream type: @@ -633,7 +649,13 @@ The SDK provides two ingestion styles: ### Offset-Based API (Recommended) -Use `ZerobusProtoStream` or `ZerobusJsonStream` for all new code. They use offset-based returns that avoid `CompletableFuture` allocation overhead: +Use `ZerobusProtoStream` or `ZerobusJsonStream` for all new code. They use offset-based returns that avoid `CompletableFuture` allocation overhead. + +> Each `ingestRecordOffset()` call returns as soon as the record is queued; the SDK sends +> it and tracks its acknowledgment in the background. The idiomatic flow is to ingest in a +> loop, then confirm durability **once** — with `flush()`, or by passing the last offset to +> `waitForOffset()` (acks are ordered, so the last offset confirms every prior record). See +> [Acknowledgments and throughput](#acknowledgments-and-throughput) for the full picture. ```java ZerobusProtoStream stream = sdk.createProtoStream( @@ -643,7 +665,7 @@ ZerobusProtoStream stream = sdk.createProtoStream( try { long lastOffset = -1; - // Ingest records as fast as possible + // Ingest in a loop for (int i = 0; i < 1000000; i++) { AirQuality record = AirQuality.newBuilder() .setDeviceName("sensor-" + (i % 100)) @@ -651,11 +673,11 @@ try { .setHumidity(50 + i % 40) .build(); - // Returns immediately after queuing (non-blocking) + // Returns immediately after queuing lastOffset = stream.ingestRecordOffset(record); } - // Wait for all records to be acknowledged + // Confirm all records are acknowledged stream.waitForOffset(lastOffset); } finally { stream.close(); @@ -670,15 +692,21 @@ try { The future-based API is still available for backward compatibility but will be removed in a future release: ```java -// DEPRECATED - use ingestRecordOffset() instead +// DEPRECATED - use ingestRecordOffset() instead. +// Each ingestRecord() returns immediately; keep the last future and join once after +// the loop to confirm durability (joining after every record limits throughput). try { + CompletableFuture lastFuture = null; for (int i = 0; i < 1000; i++) { AirQuality record = AirQuality.newBuilder() .setDeviceName("sensor-" + i) .setTemp(20 + i % 15) .build(); - stream.ingestRecord(record).join(); // Deprecated + lastFuture = stream.ingestRecord(record); // Deprecated; non-blocking + } + if (lastFuture != null) { + lastFuture.join(); // wait once, after the loop } } finally { stream.close(); @@ -1411,20 +1439,54 @@ void onError(long offsetId, String errorMessage) ``` Called when an error occurs for records at or after `offsetId`. +**Track durability progress without blocking.** Register an `AckCallback` to observe +acknowledgments as they arrive on a background thread while you keep ingesting — a natural +fit for high-throughput or long-running streams where you want progress notifications +rather than blocking on `flush()` or `waitForOffset()`: + +```java +AckCallback callback = new AckCallback() { + @Override public void onAck(long offsetId) { + // records up to offsetId are durable (watermark is monotonic) + } + @Override public void onError(long offsetId, String errorMessage) { + System.err.println("Error at offset " + offsetId + ": " + errorMessage); + } +}; + +StreamConfigurationOptions options = StreamConfigurationOptions.builder() + .setAckCallback(callback) + .build(); + +ZerobusProtoStream stream = sdk.createProtoStream( + tableName, descriptor, clientId, clientSecret, options).join(); + +// Ingest without blocking; the callback fires as acks arrive. +for (AirQuality record : records) { + stream.ingestRecordOffset(record); +} +stream.flush(); // drain remaining acks before close +``` + +Implementations must be thread-safe and lightweight (callbacks run on internal +processing threads). + ## Best Practices 1. **Reuse SDK instances**: Create one `ZerobusSdk` instance per application 2. **Stream lifecycle**: Always close streams in a `finally` block or use try-with-resources 3. **Use offset-based API for high throughput**: `ingestRecordOffset()` avoids `CompletableFuture` overhead -4. **Batch records when possible**: Use `ingestRecordsOffset()` for multiple records -5. **Configure `maxInflightRecords`**: Adjust based on your throughput and memory requirements -6. **Implement proper error handling**: Distinguish between retriable and non-retriable errors -7. **Use `AckCallback` for monitoring**: Track acknowledgment progress without blocking -8. **Proto generation**: Use the built-in `GenerateProto` tool to generate proto files from table schemas -9. **Choose the right API**: - - `ingestRecord()` → Simple use cases, moderate throughput (deprecated) - - `ingestRecordOffset()` + `waitForOffset()` → High throughput, fine-grained control (recommended) -10. **Recovery pattern**: Use `sdk.recreateStream(closedStream)` to automatically re-ingest unacknowledged records, or manually use `getUnackedBatches()` after stream close +4. **Ingest in a loop, then `flush()`**: Confirm durability once after a batch with `flush()` (or `waitForOffset()` on the last offset, since acks are ordered). Use per-record waits only when a specific record must be confirmed before continuing. +5. **Batch records when possible**: Use `ingestRecordsOffset()` for multiple records +6. **Configure `maxInflightRecords`**: Adjust based on your throughput and memory requirements +7. **Implement proper error handling**: Distinguish between retriable and non-retriable errors +8. **Use `AckCallback` for monitoring**: Track acknowledgment progress without blocking the ingest loop +9. **Proto generation**: Use the built-in `GenerateProto` tool to generate proto files from table schemas +10. **Choose the right API**: + - `ingestRecordOffset()` + final `flush()` / `waitForOffset(lastOffset)` → High throughput (recommended) + - `ingestRecordOffset()` + `waitForOffset()` per record → When a specific record must be confirmed before continuing + - `ingestRecord().join()` → Deprecated; prefer the offset-based API +11. **Recovery pattern**: Use `sdk.recreateStream(closedStream)` to automatically re-ingest unacknowledged records, or manually use `getUnackedBatches()` after stream close ## Community and Contributing diff --git a/java/examples/json/SingleRecordExample.java b/java/examples/json/SingleRecordExample.java index 2e545909..76351ab3 100644 --- a/java/examples/json/SingleRecordExample.java +++ b/java/examples/json/SingleRecordExample.java @@ -52,16 +52,19 @@ public static void main(String[] args) throws Exception { totalRecords++; System.out.println(" 1 record ingested and acknowledged (offset: " + offset + ")"); + // Idiomatic flow: ingest in a loop, then confirm durability once on the last + // offset. Acks are ordered, so waiting on the last offset confirms every prior + // record. long lastOffset = -1; for (int i = 0; i < 10; i++) { Map data = new HashMap<>(); data.put("device_name", "json-main-loop-" + i); data.put("temp", 21 + i); data.put("humidity", 51 + i); - lastOffset = stream.ingestRecordOffset(data, SingleRecordExample::toJson); + lastOffset = stream.ingestRecordOffset(data, SingleRecordExample::toJson); // returns immediately totalRecords++; } - stream.waitForOffset(lastOffset); + stream.waitForOffset(lastOffset); // confirm durability once System.out.println(" 10 records ingested, last acknowledged (offset: " + lastOffset + ")"); // === Pre-serialized: Raw JSON string === diff --git a/java/examples/legacy/LegacyStreamExample.java b/java/examples/legacy/LegacyStreamExample.java index 37a72489..959626a0 100644 --- a/java/examples/legacy/LegacyStreamExample.java +++ b/java/examples/legacy/LegacyStreamExample.java @@ -11,6 +11,10 @@ * *

Note: New code should use {@link ZerobusProtoStream} or {@link ZerobusJsonStream} instead. * + *

With the deprecated future-based API, keep the last future and {@code .join()} once after + * the loop to confirm durability, as shown below. (Joining after every record limits throughput + * to one record per round-trip.) + * *

Run with: {@code java -cp com.databricks.zerobus.examples.legacy.LegacyStreamExample} */ public class LegacyStreamExample { @@ -105,18 +109,25 @@ public static void main(String[] args) throws Exception { ZerobusStream newStream = sdk.recreateStream(stream).join(); System.out.println(" New stream created successfully"); - // Ingest a few more records on the new stream + // Ingest a few more records on the new stream. + // Idiomatic flow: keep the last future and join once after the loop to confirm + // durability. New code should prefer the offset-based ZerobusProtoStream / + // ZerobusJsonStream APIs. int newRecords = 0; try { + java.util.concurrent.CompletableFuture lastFuture = null; for (int i = 0; i < 3; i++) { AirQuality record = AirQuality.newBuilder() .setDeviceName("legacy-recreate-" + i) .setTemp(40 + i) .setHumidity(70 + i) .build(); - newStream.ingestRecord(record).join(); + lastFuture = newStream.ingestRecord(record); // returns immediately newRecords++; } + if (lastFuture != null) { + lastFuture.join(); // confirm durability once + } System.out.println(" " + newRecords + " new records ingested on recreated stream"); } finally { newStream.close(); diff --git a/java/examples/proto/SingleRecordExample.java b/java/examples/proto/SingleRecordExample.java index c5e44b16..17938325 100644 --- a/java/examples/proto/SingleRecordExample.java +++ b/java/examples/proto/SingleRecordExample.java @@ -57,6 +57,9 @@ public static void main(String[] args) throws Exception { totalRecords++; System.out.println(" 1 record ingested and acknowledged (offset: " + offset + ")"); + // Idiomatic flow: ingest in a loop, then confirm durability once on the last + // offset. Acks are ordered, so waiting on the last offset confirms every prior + // record. long lastOffset = -1; for (int i = 0; i < 10; i++) { AirQuality record = AirQuality.newBuilder() @@ -64,10 +67,10 @@ public static void main(String[] args) throws Exception { .setTemp(21 + i) .setHumidity(51 + i) .build(); - lastOffset = stream.ingestRecordOffset(record); + lastOffset = stream.ingestRecordOffset(record); // returns immediately totalRecords++; } - stream.waitForOffset(lastOffset); + stream.waitForOffset(lastOffset); // confirm durability once System.out.println(" 10 records ingested, last acknowledged (offset: " + lastOffset + ")"); // === Pre-encoded: byte arrays === diff --git a/java/src/main/java/com/databricks/zerobus/BaseZerobusStream.java b/java/src/main/java/com/databricks/zerobus/BaseZerobusStream.java index 8b710e7a..b4af01f8 100644 --- a/java/src/main/java/com/databricks/zerobus/BaseZerobusStream.java +++ b/java/src/main/java/com/databricks/zerobus/BaseZerobusStream.java @@ -71,7 +71,13 @@ protected BaseZerobusStream( /** * Waits for a specific offset to be acknowledged by the server. * - * @param offset the offset to wait for + *

Use this when you need to confirm a specific record before continuing. Because the + * acknowledgment watermark is monotonic, passing the last offset from an ingest loop + * confirms every prior record too. For confirming a whole batch, prefer {@link #flush()}; for + * non-blocking progress tracking, register an {@link AckCallback}. (Calling this after every + * record in a tight loop limits throughput to one record per round-trip.) + * + * @param offset the offset to wait for (typically the last offset returned by an ingest loop) * @throws ZerobusException if an error occurs or the wait times out */ public void waitForOffset(long offset) throws ZerobusException { @@ -82,6 +88,10 @@ public void waitForOffset(long offset) throws ZerobusException { /** * Flushes the stream, waiting for all queued records to be acknowledged. * + *

This is the idiomatic way to confirm durability: ingest in a loop, then call {@code flush()} + * once (after a bounded batch, or periodically for a long-running stream). It returns once + * everything queued so far is acknowledged. + * * @throws ZerobusException if an error occurs or the flush times out */ public void flush() throws ZerobusException { diff --git a/java/src/main/java/com/databricks/zerobus/ZerobusJsonStream.java b/java/src/main/java/com/databricks/zerobus/ZerobusJsonStream.java index 861e99f6..69ab5e95 100644 --- a/java/src/main/java/com/databricks/zerobus/ZerobusJsonStream.java +++ b/java/src/main/java/com/databricks/zerobus/ZerobusJsonStream.java @@ -137,6 +137,10 @@ public String getClientSecret() { *

This is the main method for JSON ingestion. The object is serialized using the provided * serializer function. * + *

Returns as soon as the record is queued; the SDK sends it and tracks its acknowledgment in + * the background. The idiomatic flow is to call this in a loop, then confirm durability once via + * {@link #flush()} (or {@link #waitForOffset(long)} on the last returned offset). + * *

Example with Gson: * *

{@code
@@ -179,6 +183,10 @@ public long ingestRecordOffset(String json) throws ZerobusException {
    * 

This is the main method for batch JSON ingestion. Each object is serialized using the * provided serializer function. * + *

Returns as soon as the batch is queued. The idiomatic flow is to ingest your batches in a + * loop, then confirm durability once via {@link #flush()} (or {@link #waitForOffset(long)} on the + * last offset). + * * @param objects the objects to serialize and ingest * @param serializer a function that converts each object to a JSON string * @param the type of the objects diff --git a/java/src/main/java/com/databricks/zerobus/ZerobusProtoStream.java b/java/src/main/java/com/databricks/zerobus/ZerobusProtoStream.java index acb0b609..908888b4 100644 --- a/java/src/main/java/com/databricks/zerobus/ZerobusProtoStream.java +++ b/java/src/main/java/com/databricks/zerobus/ZerobusProtoStream.java @@ -84,6 +84,10 @@ public String getClientSecret() { *

This is the main method for ingesting proto records. The message is automatically serialized * to bytes. * + *

Returns as soon as the record is queued; the SDK sends it and tracks its acknowledgment in + * the background. The idiomatic flow is to call this in a loop, then confirm durability once via + * {@link #flush()} (or {@link #waitForOffset(long)} on the last returned offset). + * * @param record the Protocol Buffer message to ingest * @param the message type * @return the offset ID assigned to this record @@ -116,6 +120,10 @@ public long ingestRecordOffset(byte[] encodedBytes) throws ZerobusException { * *

This is the main method for batch ingestion. All messages are automatically serialized. * + *

Returns as soon as the batch is queued. The idiomatic flow is to ingest your batches in a + * loop, then confirm durability once via {@link #flush()} (or {@link #waitForOffset(long)} on the + * last offset). + * * @param records the Protocol Buffer messages to ingest * @param the message type * @return the offset ID for the batch, or empty if the iterable is empty diff --git a/python/CLAUDE.md b/python/CLAUDE.md index 154e0f88..69adacbc 100644 --- a/python/CLAUDE.md +++ b/python/CLAUDE.md @@ -2,6 +2,22 @@ Python wrapper around the Rust core via PyO3 and maturin. +## Client code patterns + +When writing or reviewing client code that uses this SDK, follow the cross-SDK +performance flow from the root `CLAUDE.md`: + +- **Idiomatic flow:** ingest in a loop (`ingest_record_offset()` or + `ingest_record_nowait()`), then call `flush()` once to confirm durability. + `ingest_record_offset()` returns as soon as the record is queued; the SDK sends it and + tracks its acknowledgment in the background. +- In async code, an `AckCallback` is a good way to track durability without blocking. +- `wait_for_offset()` blocks until a specific offset is acknowledged — use it to confirm + a specific record before continuing. Acks are ordered, so waiting on the LAST offset + confirms all prior records. Avoid calling it after every record in a tight loop, since + that limits throughput to one record per round-trip. +- `ingest_record()` is **deprecated** — prefer the offset-based APIs. + ## Structure ``` diff --git a/python/NEXT_CHANGELOG.md b/python/NEXT_CHANGELOG.md index d25f0714..890988bf 100644 --- a/python/NEXT_CHANGELOG.md +++ b/python/NEXT_CHANGELOG.md @@ -10,6 +10,14 @@ ### Documentation +- Documented the high-throughput ingestion pattern across the README, docstrings, and + examples: ingest records in a loop without waiting, then `flush()` once, rather than + calling `wait_for_offset()` after every record. Added a prominent performance callout + to the README, throughput notes to the `ingest_record_offset`, `ingest_record_nowait`, + `wait_for_offset`, and `flush` docstrings (sync and async, including Arrow streams), and + steering comments to the examples. Added a "Client code patterns" section to + `python/CLAUDE.md`. + ### Internal Changes ### Breaking Changes diff --git a/python/README.md b/python/README.md index fcc851bd..b9da7910 100644 --- a/python/README.md +++ b/python/README.md @@ -13,6 +13,7 @@ A high-performance Python client for streaming data ingestion into Databricks De - [Installation](#installation) - [Quick Start](#quick-start) - [JSON (Simplest)](#option-1-json-simplest) + - [Acknowledgments and throughput](#acknowledgments-and-throughput) - [Protocol Buffers](#option-2-protocol-buffers) - [Configuration](#configuration) - [Error Handling](#error-handling) @@ -154,6 +155,21 @@ async def main(): asyncio.run(main()) ``` +### Acknowledgments and throughput + +Ingestion is asynchronous. `ingest_record_offset()` returns as soon as the record is +queued; the SDK sends it and tracks its acknowledgment in the background. To confirm +records are durably committed, call `flush()` — it returns once everything queued so far +is acknowledged. The idiomatic flow is **ingest in a loop, then `flush()`** (once for a +bounded batch, or periodically for a long-running stream); or register an +[`AckCallback`](#ackcallback) to be notified as records commit. + +Each ingest also returns the record's offset, and `wait_for_offset(offset)` blocks until +that offset is acknowledged — handy when a specific record must be confirmed before +continuing (acks are ordered, so waiting on the last offset confirms the whole run). Just +avoid calling `wait_for_offset()` after every record in a tight loop, since that limits +throughput to one record per round-trip. + ### Option 2: Protocol Buffers First, define a protobuf schema. Use `proto2` syntax with `optional` fields to match Delta table columns: @@ -352,12 +368,36 @@ for batch in unacked_batches: ## Performance Tips +The idiomatic flow is to ingest in a loop and `flush()` once — ingest calls queue +immediately and the SDK acknowledges records in the background, so a single `flush()` +confirms everything queued so far. The ack watermark is monotonic, so if you want a +durability checkpoint mid-stream, waiting on the last offset returned confirms every +prior record. In async code, an [`AckCallback`](#ackcallback) tracks durability without +blocking. Calling `wait_for_offset()` after every record in a tight loop limits +throughput to one record per round-trip, so save it for confirming a specific record. + | Method | Throughput | Use case | |--------|------------|----------| | `ingest_record_nowait()` | **Highest** | Fire-and-forget: no offset returned; maximum throughput when you do not need per-record ack tracking in the hot path | -| `ingest_record_offset()` | Medium | Recommended for most apps: returns an offset after queueing; call `wait_for_offset()` when you need durability confirmation | +| `ingest_record_offset()` | Medium | Recommended for most apps: returns an offset after queueing. Ingest in a loop, then `flush()` once | | `ingest_record()` | Low | **Deprecated** — prefer offset-based APIs | +**Idiomatic flow:** + +```python +for record in records: + stream.ingest_record_offset(record) # queues immediately, no round-trip +stream.flush() # one wait for everything +``` + +**Confirming a specific record** (waiting on the last offset confirms all prior records): + +```python +for record in records: + offset = stream.ingest_record_offset(record) +stream.wait_for_offset(offset) # confirm the run before continuing +``` + ## API Reference ### `ZerobusSdk` @@ -396,7 +436,8 @@ stream = await sdk.create_stream(client_id, client_secret, table_properties, opt - **JSON mode**: `dict` (SDK serializes) or `str` (pre-serialized JSON) - **Protobuf mode**: `Message` object (SDK serializes) or `bytes` (pre-serialized) -**Offset tracking:** +**Offset tracking** (use to confirm a specific record before continuing; for bulk +durability, ingest in a loop and `flush()` once): ```python # Sync @@ -410,6 +451,8 @@ offset = await stream.ingest_record_offset(record) await stream.wait_for_offset(offset) # Block until durably written ``` +Acks are ordered, so waiting on the last offset returned confirms all prior records too. + **Stream management:** ```python diff --git a/python/examples/async_example_arrow.py b/python/examples/async_example_arrow.py index 566ce045..a57b135f 100644 --- a/python/examples/async_example_arrow.py +++ b/python/examples/async_example_arrow.py @@ -151,7 +151,10 @@ async def main(): logger.info(f"\nAll batches submitted in {submit_duration:.2f} seconds") # ======================================================================== - # Wait for the last offset to be acknowledged + # Wait for the last offset to be acknowledged. + # Acks are ordered, so awaiting once on the LAST offset here (or just calling + # flush()) confirms every prior batch too — no need to wait per batch in the + # loop above. # ======================================================================== logger.info(f"Waiting for offset {offsets[-1]} to be acknowledged...") await stream.wait_for_offset(offsets[-1]) diff --git a/python/examples/async_example_json.py b/python/examples/async_example_json.py index 8ea2d4f2..dde720f7 100644 --- a/python/examples/async_example_json.py +++ b/python/examples/async_example_json.py @@ -182,7 +182,9 @@ async def main(): try: # ======================================================================== # Method 1: ingest_record_offset() - Get offset for each record - # Best for when you need individual offsets + # Best for when you need individual offsets. + # Idiomatic flow: ingest in a loop, then await flush() once (or use an + # AckCallback) to confirm everything is durably committed. # ======================================================================== logger.info("\n1. Using ingest_record_offset() - individual offsets") for i in range(min(10, NUM_RECORDS)): diff --git a/python/examples/async_example_proto.py b/python/examples/async_example_proto.py index f467f10f..65977ba9 100644 --- a/python/examples/async_example_proto.py +++ b/python/examples/async_example_proto.py @@ -180,6 +180,8 @@ async def main(): try: # ======================================================================== # Method 1: ingest_record_offset() - Get offset for each record + # Idiomatic flow: ingest in a loop, then await flush() once (or use an + # AckCallback) to confirm everything is durably committed. # ======================================================================== logger.info("\n1. Using ingest_record_offset() - individual offsets") for i in range(min(10, NUM_RECORDS)): diff --git a/python/examples/sync_example_arrow.py b/python/examples/sync_example_arrow.py index f94e8387..bf9c95a2 100644 --- a/python/examples/sync_example_arrow.py +++ b/python/examples/sync_example_arrow.py @@ -143,7 +143,10 @@ def main(): logger.info(f" Table ingested: {table.num_rows} rows, offset: {offset}") # ======================================================================== - # Wait for a specific offset to be acknowledged + # Wait for a specific offset to be acknowledged. + # Acks are ordered, so waiting once on the LAST offset here (or just calling + # flush()) confirms every prior batch too — no need to wait per batch in the + # loop above. # ======================================================================== logger.info(f"\nWaiting for offset {offset} to be acknowledged...") stream.wait_for_offset(offset) diff --git a/python/examples/sync_example_json.py b/python/examples/sync_example_json.py index 5cb3f7df..e7a65047 100644 --- a/python/examples/sync_example_json.py +++ b/python/examples/sync_example_json.py @@ -156,7 +156,9 @@ def main(): try: # ======================================================================== # Method 1: ingest_record_offset() - RECOMMENDED for single records - # Returns offset directly without intermediate acknowledgment object + # Returns offset directly without intermediate acknowledgment object. + # Idiomatic flow: ingest in a loop, then flush() once at the end (see below) + # to confirm everything is durably committed. # ======================================================================== logger.info("\n1. Using ingest_record_offset() - optimized API") for i in range(min(10, NUM_RECORDS)): diff --git a/python/examples/sync_example_proto.py b/python/examples/sync_example_proto.py index 1da7bcca..eb3314a8 100644 --- a/python/examples/sync_example_proto.py +++ b/python/examples/sync_example_proto.py @@ -156,6 +156,8 @@ def main(): try: # ======================================================================== # Method 1: ingest_record_offset() - RECOMMENDED for single records + # Idiomatic flow: ingest in a loop, then flush() once at the end (see below) + # to confirm everything is durably committed. # ======================================================================== logger.info("\n1. Using ingest_record_offset() - optimized API") for i in range(min(10, NUM_RECORDS)): diff --git a/python/zerobus/sdk/aio/zerobus_sdk.py b/python/zerobus/sdk/aio/zerobus_sdk.py index 468dcef5..951ac62e 100644 --- a/python/zerobus/sdk/aio/zerobus_sdk.py +++ b/python/zerobus/sdk/aio/zerobus_sdk.py @@ -99,11 +99,21 @@ async def wait_for_ack(): # Forward all other methods to the inner Rust stream async def ingest_record_offset(self, payload: Any): - """Submit record and return offset immediately (no waiting).""" + """Submit record and return offset immediately (no waiting). + + Resolves as soon as the record is queued; the SDK sends it and tracks its + acknowledgment in the background. The idiomatic flow is to ingest in a loop and + ``await flush()`` once to confirm durability (or use an ``AckCallback``). + """ return await self._inner.ingest_record_offset(payload) def ingest_record_nowait(self, payload: Any): - """Submit record without waiting (fire-and-forget).""" + """Submit record without waiting (fire-and-forget). + + Highest-throughput single-record API: returns no offset and is not awaited. + Use when you do not need per-record offsets; track durability with an + ``AckCallback`` and ``await flush()`` before close. + """ return self._inner.ingest_record_nowait(payload) async def ingest_records_offset(self, payloads): @@ -115,11 +125,21 @@ def ingest_records_nowait(self, payloads): return self._inner.ingest_records_nowait(payloads) async def wait_for_offset(self, offset: int): - """Wait for a specific offset to be acknowledged.""" + """Block until a specific offset is acknowledged. + + Use when you need to confirm a specific record before continuing; acks are + ordered, so waiting on the last offset returned confirms all prior records too. + For bulk durability, prefer ingesting in a loop and ``await flush()`` once (or an + ``AckCallback``). + """ return await self._inner.wait_for_offset(offset) async def flush(self): - """Flush all pending records.""" + """Flush all pending records, awaiting until they are acknowledged. + + The idiomatic way to confirm durability: ingest in a loop, then ``await flush()`` + once to confirm everything queued so far is committed. + """ return await self._inner.flush() async def close(self): @@ -183,11 +203,20 @@ async def ingest_batch(self, batch) -> int: return await self._inner.ingest_batch(ipc_bytes) async def wait_for_offset(self, offset: int): - """Wait for a specific offset to be acknowledged.""" + """Block until a specific offset is acknowledged. + + Use when you need to confirm a specific batch before continuing; acks are + ordered, so waiting on the last offset returned confirms all prior batches too. + For bulk durability, prefer ingesting batches in a loop and ``await flush()``. + """ return await self._inner.wait_for_offset(offset) async def flush(self): - """Flush all pending batches, waiting for acknowledgment.""" + """Flush all pending batches, awaiting until they are acknowledged. + + The idiomatic way to confirm durability: ingest batches in a loop, then + ``await flush()`` once to confirm everything queued so far is committed. + """ return await self._inner.flush() async def close(self): diff --git a/python/zerobus/sdk/sync/zerobus_sdk.py b/python/zerobus/sdk/sync/zerobus_sdk.py index dae964de..56e30a3a 100644 --- a/python/zerobus/sdk/sync/zerobus_sdk.py +++ b/python/zerobus/sdk/sync/zerobus_sdk.py @@ -65,11 +65,21 @@ def ingest_record(self, payload): return self._inner.ingest_record(payload) def ingest_record_offset(self, payload): - """Submit record and return offset immediately (no waiting).""" + """Submit record and return offset immediately (no waiting). + + Returns as soon as the record is queued; the SDK sends it and tracks its + acknowledgment in the background. The idiomatic flow is to ingest in a loop + and call ``flush()`` once to confirm durability (or use an ``AckCallback``). + """ return self._inner.ingest_record_offset(payload) def ingest_record_nowait(self, payload): - """Submit record without waiting (fire-and-forget).""" + """Submit record without waiting (fire-and-forget). + + Highest-throughput single-record API: returns no offset. Use when you do not + need per-record offsets; track durability with an ``AckCallback`` and call + ``flush()`` before close. + """ return self._inner.ingest_record_nowait(payload) def ingest_records_offset(self, payloads): @@ -81,11 +91,20 @@ def ingest_records_nowait(self, payloads): return self._inner.ingest_records_nowait(payloads) def wait_for_offset(self, offset: int): - """Wait for a specific offset to be acknowledged.""" + """Block until a specific offset is acknowledged. + + Use when you need to confirm a specific record before continuing; acks are + ordered, so waiting on the last offset returned confirms all prior records too. + For bulk durability, prefer ingesting in a loop and calling ``flush()`` once. + """ return self._inner.wait_for_offset(offset) def flush(self): - """Flush all pending records.""" + """Flush all pending records, blocking until they are acknowledged. + + The idiomatic way to confirm durability: ingest in a loop, then call ``flush()`` + once to confirm everything queued so far is committed. + """ return self._inner.flush() def close(self): @@ -158,11 +177,20 @@ def ingest_batch(self, batch) -> int: return self._inner.ingest_batch(ipc_bytes) def wait_for_offset(self, offset: int): - """Wait for a specific offset to be acknowledged.""" + """Block until a specific offset is acknowledged. + + Use when you need to confirm a specific batch before continuing; acks are + ordered, so waiting on the last offset returned confirms all prior batches too. + For bulk durability, prefer ingesting batches in a loop and calling ``flush()``. + """ return self._inner.wait_for_offset(offset) def flush(self): - """Flush all pending batches, waiting for acknowledgment.""" + """Flush all pending batches, blocking until they are acknowledged. + + The idiomatic way to confirm durability: ingest batches in a loop, then call + ``flush()`` once to confirm everything queued so far is committed. + """ return self._inner.flush() def close(self): diff --git a/rust/CLAUDE.md b/rust/CLAUDE.md index 34d8e131..9ad9028b 100644 --- a/rust/CLAUDE.md +++ b/rust/CLAUDE.md @@ -2,6 +2,15 @@ This is the core implementation. All other SDKs depend on it. +## Client code patterns (examples, doc comments, generated code) + +See the root `CLAUDE.md` "Writing SDK client code — performance rules" for the cross-SDK rule. In Rust specifically: + +- `ingest_record_offset()` / `ingest_records_offset()` return the `OffsetId` once the record is **queued**; the network round-trip is async. **Never call `wait_for_offset()` after every ingest in a loop** — it forces one server round-trip per record. +- Correct pattern: ingest in a loop, then `flush()` once (or periodically for unbounded streams). Or wait on only the **last** offset — the ack watermark is monotonic, so acking the last offset implies all prior are acked (`wait_for_offset_internal` waits for `last_received_offset >= target`). +- Register an `ack_callback` for async ack/error tracking instead of blocking. +- Any new example, `///` doc example, or test must show the loop-then-`flush()` pattern first; reserve per-record `wait_for_offset()` for clearly-labeled low-volume cases. + ## Structure ``` diff --git a/rust/NEXT_CHANGELOG.md b/rust/NEXT_CHANGELOG.md index fb8dacd5..9b5ef492 100644 --- a/rust/NEXT_CHANGELOG.md +++ b/rust/NEXT_CHANGELOG.md @@ -18,6 +18,8 @@ ### Documentation +- Reworked ingestion docs to lead with the high-throughput pattern (ingest in a loop, then `flush()` once) and explicitly warn against calling `wait_for_offset()` after every record. Updated the README, crate- and method-level doc comments (`ingest_record_offset`, `ingest_records_offset`, `wait_for_offset`, `flush`), and the `json`/`proto` single-record examples accordingly. + ### Internal Changes ### Breaking Changes diff --git a/rust/README.md b/rust/README.md index ea959fc5..789a5291 100644 --- a/rust/README.md +++ b/rust/README.md @@ -503,23 +503,31 @@ The SDK provides flexible ways to ingest data with different levels of abstracti | `JsonString` | JSON | Pre-serialized: pass JSON strings with explicit wrapper | | `String` | JSON | Backward-compatible: raw strings without wrapper | -#### Single Record Ingestion +> **How acknowledgment works:** `ingest_record_offset()` returns as soon as the record is **queued**; the SDK sends it and tracks its acknowledgment in the background. To confirm records are durably committed, call `flush()` — it returns once everything queued so far is acknowledged. The returned `OffsetId` is a handle you can also wait on individually with `wait_for_offset()` when a specific record must be confirmed before continuing. Avoid calling `wait_for_offset()` after *every* record in a loop, though: that waits out a full round-trip before sending the next record and limits throughput to one record per round-trip. + +#### Recommended: high-throughput ingestion + +Ingest in a loop, then `flush()` to confirm all pending acknowledgments. For long-running streams, `flush()` periodically to bound memory: ```rust use databricks_zerobus_ingest_sdk::ProtoMessage; -let record = YourMessage { id: Some(1), name: Some("Alice".to_string()) }; - -// Ingest and get offset (after queuing) -let offset = stream.ingest_record_offset(ProtoMessage(record)).await?; +for i in 0..100_000 { + let record = YourMessage { id: Some(i), /* ... */ }; + // Returns immediately once queued; flush() below confirms the batch. + let _offset = stream.ingest_record_offset(ProtoMessage(record)).await?; -// Wait for server acknowledgment -stream.wait_for_offset(offset).await?; + // Periodically flush to bound memory on long-running streams. + if (i + 1) % 10_000 == 0 { + stream.flush().await?; + } +} +stream.flush().await?; // Wait once for every pending acknowledgment. ``` #### Batch Ingestion -Ingest multiple records at once for higher throughput with all-or-nothing semantics. +For records you already have grouped, send them as one batch with all-or-nothing semantics. This is the most efficient option in hot paths — it amortizes per-call overhead: ```rust use databricks_zerobus_ingest_sdk::ProtoMessage; @@ -530,27 +538,21 @@ let records: Vec> = vec![ ProtoMessage(YourMessage { id: Some(3), /* ... */ }), ]; -// Returns Some(offset) for non-empty batches, None for empty batches -if let Some(offset) = stream.ingest_records_offset(records).await? { - stream.wait_for_offset(offset).await?; -} +// Returns Some(offset) for non-empty batches, None for empty batches. +// Queue many batches this way; flush() once when done. +let _offset = stream.ingest_records_offset(records).await?; ``` -#### High Throughput Pattern +#### Per-record confirmation -Ingest many records without waiting for each acknowledgment, then flush periodically: +When a specific record must be confirmed as durably committed before you continue — e.g. low-volume control messages — wait on its returned offset. This is the right tool for that case; for high-volume ingestion, prefer the loop-then-`flush()` pattern above, since waiting on each record limits throughput to one round-trip per record. ```rust -for i in 0..100_000 { - let record = YourMessage { id: Some(i), /* ... */ }; - let _offset = stream.ingest_record_offset(ProtoMessage(record)).await?; +use databricks_zerobus_ingest_sdk::ProtoMessage; - // Periodically flush to avoid unbounded memory growth - if (i + 1) % 10_000 == 0 { - stream.flush().await?; - } -} -stream.flush().await?; +let record = YourMessage { id: Some(1), name: Some("Alice".to_string()) }; +let offset = stream.ingest_record_offset(ProtoMessage(record)).await?; +stream.wait_for_offset(offset).await?; // OK for one-off / low-volume records. ``` See [`examples/`](https://github.com/databricks/zerobus-sdk/tree/main/rust/examples) for complete working examples with all wrapper types, serialization formats, and ingestion patterns. @@ -562,38 +564,24 @@ The recommended `ingest_record_offset()` and `ingest_records_offset()` methods r - `ingest_records_offset()` returns `Option` (None if the batch is empty) ```rust -// Ingest and get offset, after queuing the record. -let offset_id = stream.ingest_record_offset(data).await?; -println!("Record sent with offset Id: {}", offset_id); - -// Wait for acknowledgment when needed. -stream.wait_for_offset(offset_id).await?; -println!("Record committed at offset: {}", offset_id); - -// For batches, the method returns Option. -// None if the batch is empty. -let batch = vec![data1, data2, data3]; -if let Some(offset_id) = stream.ingest_records_offset(batch).await? { - println!("Batch sent with last offset: {}", offset_id); - stream.wait_for_offset(offset_id).await?; - println!("Batch committed"); -} else { - println!("Empty batch, no records ingested"); -} - -// High-throughput: collect offsets and wait selectively. -let mut offsets = Vec::new(); +// Preferred: ingest without waiting, then confirm everything at once. for i in 0..1000 { - let offset = stream.ingest_record_offset(record).await?; - offsets.push(offset); -} -// Wait for specific offsets as needed. -for offset in offsets { - stream.wait_for_offset(offset).await?; + // Returns the offset as soon as the record is queued; does NOT block on the ack. + let _offset = stream.ingest_record_offset(record).await?; } +stream.flush().await?; // Waits for all pending acknowledgments in one shot. -// Or use flush() to wait for all pending acknowledgments at once. +// Batches return Option (None if the batch is empty). Queue them the +// same way — keep ingesting, and flush() once when you're done. +let batch = vec![data1, data2, data3]; +let _offset = stream.ingest_records_offset(batch).await?; stream.flush().await?; + +// Low volume only: wait on a single returned offset when you must confirm that +// specific record is committed before continuing. Never do this inside a hot loop. +let offset_id = stream.ingest_record_offset(data).await?; +stream.wait_for_offset(offset_id).await?; +println!("Record committed at offset: {}", offset_id); ``` #### Using Acknowledgment Callbacks @@ -843,17 +831,18 @@ cargo test -p tests -- --nocapture 1. **Reuse SDK Instances** - Create one `ZerobusSdk` per application and reuse for multiple streams 2. **Always Close Streams** - Use `stream.close().await?` to ensure all data is flushed -3. **Choose the Right Ingestion Method**: +3. **Confirm with `flush()`, not per-record waits** - The idiomatic flow is to ingest in a loop and `flush()` once (or periodically). Use `wait_for_offset()` only when a specific record must be confirmed individually — calling it after every record serializes the pipeline into one round-trip per record. +4. **Choose the Right Ingestion Method**: - Use `ingest_records_offset()` for high throughput batch ingestion - Use `ingest_record_offset()` when processing records individually - - Both return offsets directly; use `wait_for_offset()` to explicitly wait for acknowledgments -4. **Tune Inflight Limits** - Adjust `max_inflight_requests` based on memory and throughput needs -5. **Enable Recovery** - Always set `recovery: true` in production environments -6. **Handle Ack Futures** - Use `tokio::spawn` for fire-and-forget or batch-wait for verification -7. **Monitor Errors** - Log and alert on non-retryable errors -8. **Validate Schemas** - Use the schema generation tool to ensure type safety (for Protocol Buffers) -9. **Secure Credentials** - Never hardcode secrets; use environment variables or secret managers -10. **Test Recovery** - Simulate failures to verify your error handling logic + - Both return offsets directly; confirm with `flush()` at the end, or `wait_for_offset()` only when you need per-record confirmation +5. **Tune Inflight Limits** - Adjust `max_inflight_requests` based on memory and throughput needs +6. **Enable Recovery** - Always set `recovery: true` in production environments +7. **Use Ack Callbacks for async tracking** - Register an `ack_callback` for metrics/logging instead of blocking on `wait_for_offset()` +8. **Monitor Errors** - Log and alert on non-retryable errors +9. **Validate Schemas** - Use the schema generation tool to ensure type safety (for Protocol Buffers) +10. **Secure Credentials** - Never hardcode secrets; use environment variables or secret managers +11. **Test Recovery** - Simulate failures to verify your error handling logic ## API Reference diff --git a/rust/examples/README.md b/rust/examples/README.md index aeb868b1..1863ad9c 100644 --- a/rust/examples/README.md +++ b/rust/examples/README.md @@ -160,8 +160,11 @@ let mut stream = sdk ### 3. Ingest and Acknowledge ```rust -let offset = stream.ingest_record_offset(data).await?; -stream.wait_for_offset(offset).await?; +for record in records { + // Returns once queued — do NOT wait on this offset inside the loop. + let _offset = stream.ingest_record_offset(record).await?; +} +stream.flush().await?; // Confirm all pending records at once. ``` ### 4. Close Stream @@ -172,13 +175,20 @@ stream.close().await?; ## Ingestion API +> ⚡ **Do not call `wait_for_offset()` after every record.** `ingest_record_offset` queues the +> record and returns immediately; the round-trip happens in the background. Waiting per record +> serializes the pipeline into one round-trip per record and collapses throughput. Ingest in a +> loop, then call `flush()` once (or wait on only the last offset). + ```rust -let offset = stream.ingest_record_offset(data).await?; -// Do other work, then wait when needed. -stream.wait_for_offset(offset).await?; +for record in records { + let _offset = stream.ingest_record_offset(record).await?; +} +// Confirm everything at once. +stream.flush().await?; ``` -`ingest_record_offset` returns the assigned `OffsetId` immediately after the record is queued. Call `wait_for_offset(offset)` to block until the record is durably acknowledged by the server. +`ingest_record_offset` returns the assigned `OffsetId` immediately after the record is queued. To confirm durability, call `flush()` after a run of records, or `wait_for_offset(offset)` on a single offset only when you must confirm that specific record before continuing (low volume). ## Single-Record vs Batch Ingestion diff --git a/rust/examples/json/README.md b/rust/examples/json/README.md index 6fb0528c..c8dcd41b 100644 --- a/rust/examples/json/README.md +++ b/rust/examples/json/README.md @@ -66,31 +66,33 @@ Stream closed successfully ### Code Highlights -The example demonstrates all three data-passing approaches: +The example demonstrates all three data-passing approaches. Each `ingest_record_offset()` +returns as soon as the record is queued; we ingest all of them and `flush()` once at the end +rather than waiting per record (waiting per record forces a server round-trip each time): ```rust use databricks_zerobus_ingest_sdk::{JsonValue, JsonString}; // 1. Auto-serializing: pass struct directly let order = Order { id: 1, customer_name: "Alice".to_string(), /* ... */ }; -let offset = stream.ingest_record_offset(JsonValue(order)).await?; -stream.wait_for_offset(offset).await?; +let _offset = stream.ingest_record_offset(JsonValue(order)).await?; // 2. Pre-serialized: pass JSON string with wrapper let json = r#"{ "id": 2, "customer_name": "Bob" }"#.to_string(); -let offset = stream.ingest_record_offset(JsonString(json)).await?; -stream.wait_for_offset(offset).await?; +let _offset = stream.ingest_record_offset(JsonString(json)).await?; // 3. Backward-compatible: pass raw string (no wrapper) let json = r#"{ "id": 3, "customer_name": "Carol" }"#.to_string(); -let offset = stream.ingest_record_offset(json).await?; -stream.wait_for_offset(offset).await?; +let _offset = stream.ingest_record_offset(json).await?; + +// Confirm all queued records at once. +stream.flush().await?; ``` **Building a JSON stream:** diff --git a/rust/examples/json/single.rs b/rust/examples/json/single.rs index 2458d9d9..50daf6d4 100644 --- a/rust/examples/json/single.rs +++ b/rust/examples/json/single.rs @@ -1,3 +1,11 @@ +//! Single-record JSON ingestion, demonstrating each record wrapper type. +//! +//! Note on throughput: `ingest_record_offset()` returns as soon as the record is queued. +//! This example ingests several records and then calls `flush()` ONCE to confirm them. +//! Do not call `wait_for_offset()` after every record in a real workload — that forces a +//! server round-trip per record and collapses throughput. For high volume, prefer the +//! batch API in `batch.rs`. + use std::error::Error; use databricks_zerobus_ingest_sdk::{JsonString, JsonValue, ZerobusSdk, ZerobusStream}; @@ -74,14 +82,10 @@ async fn ingest_with_offset_api(stream: &mut ZerobusStream) -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box - no wrapper needed, works the same as ProtoBytes. let order = TableOrders { @@ -115,15 +117,15 @@ async fn ingest_with_offset_api(stream: &mut ZerobusStream) -> Result<(), Box) -> Result<(), ZerobusError> { -/// // Ingest a single record -/// let offset = stream.ingest_record_offset(data).await?; -/// println!("Record sent with offset: {}", offset); +/// # async fn example(mut stream: ZerobusStream, records: Vec>) -> Result<(), ZerobusError> { +/// // Idiomatic flow: ingest in a loop (each call returns as soon as the record is +/// // queued), then flush() once to confirm the whole batch. +/// for data in records { +/// stream.ingest_record_offset(data).await?; +/// } /// -/// // Wait for acknowledgment -/// stream.wait_for_offset(offset).await?; -/// println!("Record acknowledged at offset: {}", offset); +/// // Returns once every queued record is acknowledged. +/// stream.flush().await?; /// -/// // Close the stream gracefully +/// // Close the stream gracefully (also flushes). /// stream.close().await?; /// # Ok(()) /// # } @@ -1009,6 +1012,15 @@ impl ZerobusStream { /// as an integer (after queuing) instead of wrapping it in a Future. Use `wait_for_offset()` /// to explicitly wait for server acknowledgment of this offset when needed. /// + /// # Performance + /// + /// This call returns as soon as the record is **queued**; sending and acknowledgment + /// happen asynchronously on background tasks. The idiomatic flow is to ingest in a loop + /// and call [`flush()`](Self::flush) once at the end (or periodically), or register an + /// `ack_callback` for async notification. Use per-record `wait_for_offset()` when a + /// specific record must be confirmed durable before continuing; calling it after every + /// record limits throughput to one server round-trip per record. + /// /// # Arguments /// /// * `payload` - A record that can be converted to `EncodedRecord` (either JSON string or protobuf bytes) @@ -1028,13 +1040,15 @@ impl ZerobusStream { /// # use databricks_zerobus_ingest_sdk::*; /// # use prost::Message; /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> { - /// # let my_record = vec![1, 2, 3]; // Example protobuf-encoded data - /// // Ingest and get offset immediately - /// let offset = stream.ingest_record_offset(my_record).await?; + /// # let records: Vec> = vec![vec![1, 2, 3]]; // Example protobuf-encoded data + /// // Ingest without waiting on each record — the call returns once queued. + /// for my_record in records { + /// stream.ingest_record_offset(my_record).await?; + /// } /// - /// // Later, wait for acknowledgment - /// stream.wait_for_offset(offset).await?; - /// println!("Record at offset {} has been acknowledged", offset); + /// // Confirm all pending records at once. Avoid wait_for_offset() per record: + /// // it serializes the pipeline into one server round-trip per record. + /// stream.flush().await?; /// # Ok(()) /// # } /// ``` @@ -1058,6 +1072,11 @@ impl ZerobusStream { /// (after queuing) instead of wrapping it in a Future. Use `wait_for_offset()` to explicitly /// wait for server acknowledgment when needed. /// + /// Batching is the most efficient way to ingest in hot paths — it amortizes per-call + /// overhead. When sending many batches, keep ingesting and call [`flush()`](Self::flush) + /// once at the end rather than waiting on each batch's offset (see the performance note + /// on [`ingest_record_offset()`](Self::ingest_record_offset)). + /// /// # Arguments /// /// * `payload` - An iterator of records (each item should be convertible to `EncodedRecord`) @@ -1688,6 +1707,14 @@ impl ZerobusStream { /// specified offset. Use this with offsets returned from `ingest_record_offset()` or /// `ingest_records_offset()` to explicitly control when to wait for acknowledgments. /// + /// # Performance + /// + /// Use this when a specific record must be confirmed before continuing. To confirm a run + /// of records, prefer [`flush()`](Self::flush), or wait on only the **last** offset + /// returned — offsets are monotonic, so acknowledgment of the last implies all prior are + /// acked. Calling this after every record in a loop limits throughput to one server + /// round-trip per record, since ingestion is otherwise pipelined. + /// /// # Arguments /// /// * `offset` - The logical offset ID to wait for (returned from `ingest_record_offset()` or `ingest_records_offset()`) @@ -1705,16 +1732,15 @@ impl ZerobusStream { /// ```no_run /// # use databricks_zerobus_ingest_sdk::*; /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> { - /// # let my_record = vec![1, 2, 3]; - /// // Ingest multiple records and collect their offsets - /// let mut offsets = Vec::new(); + /// // Ingest a run of records, keeping only the last offset. + /// let mut last_offset = None; /// for i in 0..100 { - /// let offset = stream.ingest_record_offset(vec![i as u8]).await?; - /// offsets.push(offset); + /// last_offset = Some(stream.ingest_record_offset(vec![i as u8]).await?); /// } /// - /// // Wait for specific offsets - /// for offset in offsets { + /// // Wait once on the last offset — offsets are monotonic, so this confirms all + /// // 100 records. (stream.flush() achieves the same without tracking offsets.) + /// if let Some(offset) = last_offset { /// stream.wait_for_offset(offset).await?; /// } /// println!("All records acknowledged"); diff --git a/typescript/CLAUDE.md b/typescript/CLAUDE.md index 50a962d8..21bb65f0 100644 --- a/typescript/CLAUDE.md +++ b/typescript/CLAUDE.md @@ -2,6 +2,21 @@ Node.js wrapper around the Rust core via NAPI-RS. +## Client code patterns + +When writing or reviewing client code that uses this SDK, prefer the idiomatic +ingest-then-flush flow. `ingestRecordOffset()` (and `ingestRecordsOffset()`) resolves as +soon as the record is queued — the SDK sends it and tracks its acknowledgment in the +background. Ingest in a loop, then call `flush()` once to confirm durability (once for a +bounded batch, or periodically for a long-running stream). + +Each ingest returns the record's offset, and `waitForOffset(offset)` confirms a specific +record when one must be acknowledged before continuing (acks are ordered, so the last +offset confirms the whole run); prefer `flush()` for bulk. Note that ack callbacks are NOT +part of the TypeScript API — the async confirmation tools are `flush()` and `waitForOffset()`. +The one thing to avoid is calling `waitForOffset()` after every record in a tight loop, +since that limits throughput to one record per round-trip. + ## Structure ``` diff --git a/typescript/NEXT_CHANGELOG.md b/typescript/NEXT_CHANGELOG.md index d5e2f40f..82eed47e 100644 --- a/typescript/NEXT_CHANGELOG.md +++ b/typescript/NEXT_CHANGELOG.md @@ -10,6 +10,13 @@ ### Documentation +- Clarified the high-throughput ingestion pattern across the README, API reference, JSDoc + doc comments (`ingestRecordOffset`, `ingestRecordsOffset`, `waitForOffset`, `flush`), and + examples: ingest in a loop without waiting, then wait once on the last offset (the ack + watermark is monotonic) or `flush()` once at the end. Added explicit warnings that calling + `waitForOffset()` after every record collapses throughput and should be reserved for + low-volume cases. + ### Internal Changes ### Breaking Changes diff --git a/typescript/README.md b/typescript/README.md index b67aec72..28f1353c 100644 --- a/typescript/README.md +++ b/typescript/README.md @@ -182,6 +182,8 @@ The SDK supports two serialization formats. **Protocol Buffers is the default** > **Note:** If you don't specify `recordType`, the SDK will use Protocol Buffers by default. To use JSON, explicitly set `recordType: RecordType.Json`. +> **Acknowledgments and throughput.** Ingestion is asynchronous. `ingestRecordOffset()` (and `ingestRecordsOffset()`) resolves as soon as the record is queued; the SDK sends it and tracks its acknowledgment in the background. To confirm records are durably committed, call `flush()` — it resolves once everything queued so far is acknowledged. The idiomatic flow is **ingest in a loop, then `flush()`** (once for a bounded batch, or periodically for a long-running stream). Each ingest also returns the record's offset, and `waitForOffset(offset)` resolves when that offset is acknowledged — handy when a specific record must be confirmed before continuing (acks are ordered, so waiting on the last offset confirms the whole run). Just avoid calling `waitForOffset()` after every record in a tight loop, since that limits throughput to one record per round-trip. The examples below follow this pattern. + ### Option 1: Using JSON (Quick Start) JSON mode is the simplest way to get started. You don't need to define or compile protobuf schemas, but you must explicitly specify `RecordType.Json`. @@ -875,13 +877,14 @@ Represents an active ingestion stream. async ingestRecordOffset(payload: Buffer | string | object): Promise ``` -**(Recommended)** Ingests a single record. The Promise resolves immediately after the record is queued (before server acknowledgment). Use `waitForOffset()` to wait for acknowledgment when needed. +**(Recommended)** Ingests a single record. The Promise resolves immediately after the record is queued (before server acknowledgment); the round-trip happens in the background. The idiomatic flow is to ingest in a loop and then `flush()` once to confirm everything queued so far. The returned offset, together with `waitForOffset()`, lets you confirm a specific record when needed — prefer that for bulk over waiting after each record, since per-record waiting limits throughput to one round-trip per record. ```typescript -// High-throughput pattern: send many, wait once +// Idiomatic flow: ingest in a loop, then flush once const offset1 = await stream.ingestRecordOffset(record1); // Resolves immediately const offset2 = await stream.ingestRecordOffset(record2); // Resolves immediately -await stream.waitForOffset(offset2); // Waits for server to acknowledge all records up to offset2 +await stream.flush(); // Resolves once everything queued so far is acknowledged +// (Or, to confirm a specific record: await stream.waitForOffset(offset2)) ``` --- @@ -890,7 +893,7 @@ await stream.waitForOffset(offset2); // Waits for server to acknowledge all rec async ingestRecordsOffset(payloads: Array): Promise ``` -**(Recommended)** Ingests multiple records as a batch. The Promise resolves immediately after the batch is queued (before server acknowledgment). Returns `null` for empty batches. +**(Recommended)** Ingests multiple records as a batch. The Promise resolves immediately after the batch is queued (before server acknowledgment); the round-trip happens in the background. Returns `null` for empty batches. As with `ingestRecordOffset()`, the idiomatic flow is to ingest in a loop and `flush()` once to confirm; reach for `waitForOffset()` when a specific batch must be confirmed before continuing. --- @@ -898,7 +901,7 @@ async ingestRecordsOffset(payloads: Array): Promise ``` -Waits for the server to acknowledge all records up to and including the specified offset ID. +Waits for the server to acknowledge all records up to and including the specified offset ID. Acks are ordered, so waiting on the **last** offset confirms every prior record too. Use this when a specific record must be confirmed before continuing; for confirming a bulk run, `flush()` is usually simpler. Avoid calling it after every record in a tight loop, since that limits throughput to one record per round-trip. --- @@ -998,7 +1001,7 @@ await stream.ingestRecords(buffers); async flush(): Promise ``` -Flushes all pending records and waits for acknowledgments. +Flushes all pending records and waits for acknowledgments. This is the recommended way to confirm a batch of `ingestRecordOffset()` / `ingestRecordsOffset()` calls: ingest in a loop without waiting, then `flush()` once at the end instead of calling `waitForOffset()` after every record. ```typescript async close(): Promise @@ -1115,6 +1118,7 @@ enum RecordType { 5. **Use Protocol Buffers for production**: Protocol Buffers (the default) provides better performance and schema validation. Use JSON only when you need schema flexibility or for quick prototyping. 6. **Store credentials securely**: Use environment variables, never hardcode credentials 7. **Use batch ingestion**: For high-throughput scenarios, use `ingestRecordsOffset()` instead of individual `ingestRecordOffset()` calls +8. **Ingest in a loop, then `flush()`**: `ingestRecordOffset()` / `ingestRecordsOffset()` resolve as soon as the record is queued; the SDK sends and tracks acknowledgment in the background. Confirm durability with a single `flush()` (once for a bounded batch, or periodically for a long-running stream). Each ingest returns an offset, and `waitForOffset(offset)` confirms a specific record when you need it (acks are ordered, so the last offset confirms the whole run). Just avoid calling `waitForOffset()` after every record in a tight loop, since that limits throughput to one record per round-trip. ## Platform Support diff --git a/typescript/examples/json/single.ts b/typescript/examples/json/single.ts index 5f20db71..913726bf 100644 --- a/typescript/examples/json/single.ts +++ b/typescript/examples/json/single.ts @@ -91,6 +91,11 @@ async function main() { async function ingestWithOffsetApi() { console.log('\n=== Offset-based API (Recommended) ==='); + // NOTE: Examples 1 and 2 below confirm each record with waitForOffset(), which is + // handy when a specific record must be confirmed before continuing (and fine for + // this single-record demo). For high volume, the idiomatic flow is to ingest in a + // loop and flush() once at the end — see example 3 below. + // 1. Auto-serializing: object - SDK handles JSON.stringify() const record1: AirQuality = { device_name: 'sensor-001', diff --git a/typescript/examples/proto/single.ts b/typescript/examples/proto/single.ts index d230cf41..f056e627 100644 --- a/typescript/examples/proto/single.ts +++ b/typescript/examples/proto/single.ts @@ -103,6 +103,11 @@ async function main() { async function ingestWithOffsetApi() { console.log('\n=== Offset-based API (Recommended) ==='); + // NOTE: Examples 1 and 2 below confirm each record with waitForOffset(), which is + // handy when a specific record must be confirmed before continuing (and fine for + // this single-record demo). For high volume, the idiomatic flow is to ingest in a + // loop and flush() once at the end — see example 3 below. + // 1. Auto-encoding: Message object - SDK handles encoding const record1 = AirQuality.create({ device_name: 'sensor-001', diff --git a/typescript/src/lib.rs b/typescript/src/lib.rs index 3d6ab129..c82b35bd 100644 --- a/typescript/src/lib.rs +++ b/typescript/src/lib.rs @@ -400,6 +400,13 @@ impl ZerobusStream { /// This is the recommended API for high-throughput scenarios where you want to /// decouple record ingestion from acknowledgment tracking. /// + /// **Acknowledgments:** the idiomatic flow is to ingest in a loop and then `flush()` + /// once to confirm everything queued so far. The returned offset, together with + /// `waitForOffset()`, lets you confirm a specific record when you need it (acks are + /// ordered, so the last offset confirms the whole run) — prefer `flush()` for bulk. + /// Avoid calling `waitForOffset()` after every record in a tight loop, since that + /// limits throughput to one record per round-trip. + /// /// # Arguments /// /// * `payload` - The record data (Buffer, string, protobuf message, or plain object) @@ -412,11 +419,14 @@ impl ZerobusStream { /// # Example /// /// ```typescript - /// // Promise resolves immediately with offset (before server ack) - /// const offset1 = await stream.ingestRecordOffset(record1); - /// const offset2 = await stream.ingestRecordOffset(record2); - /// // Wait for both to be acknowledged - /// await stream.waitForOffset(offset2); + /// // High-throughput pattern: ingest in a loop, wait once at the end. + /// let lastOffset; + /// for (const record of records) { + /// lastOffset = await stream.ingestRecordOffset(record); // resolves on queue, no round-trip + /// } + /// // The ack watermark is monotonic: waiting on the last offset confirms all prior records. + /// await stream.waitForOffset(lastOffset); + /// // Or simply: await stream.flush(); /// ``` #[napi(ts_return_type = "Promise")] pub fn ingest_record_offset(&self, env: Env, payload: Unknown) -> Result { @@ -454,6 +464,12 @@ impl ZerobusStream { /// the batch is queued, without waiting for server acknowledgment. Use /// `waitForOffset()` to wait for acknowledgment when needed. /// + /// **Acknowledgments:** the idiomatic flow is to ingest your batches in a loop and + /// then `flush()` once to confirm. The returned offset, together with `waitForOffset()`, + /// confirms a specific batch when you need it (acks are ordered, so the last offset + /// confirms the whole run) — prefer `flush()` for bulk. Avoid calling `waitForOffset()` + /// after every batch in a tight loop, since that limits throughput to one round-trip per batch. + /// /// # Arguments /// /// * `records` - Array of record data @@ -466,11 +482,14 @@ impl ZerobusStream { /// # Example /// /// ```typescript - /// // Promise resolves immediately with offset (before server ack) - /// const offset = await stream.ingestRecordsOffset(batch); - /// if (offset !== null) { - /// await stream.waitForOffset(offset); + /// // Ingest many batches without waiting, then flush once. + /// let lastOffset = null; + /// for (const batch of batches) { + /// const offset = await stream.ingestRecordsOffset(batch); // resolves on queue + /// if (offset !== null) lastOffset = offset; /// } + /// if (lastOffset !== null) await stream.waitForOffset(lastOffset); + /// // Or simply: await stream.flush(); /// ``` #[napi(ts_return_type = "Promise")] pub fn ingest_records_offset(&self, env: Env, records: Vec) -> Result { @@ -511,9 +530,12 @@ impl ZerobusStream { /// Waits for a specific offset to be acknowledged by the server. /// - /// Use this method with `ingestRecordOffset()` and `ingestRecordsOffset()` to - /// selectively wait for acknowledgments. This allows you to ingest many records - /// quickly and then wait only for specific offsets when needed. + /// Use this method with `ingestRecordOffset()` and `ingestRecordsOffset()` to confirm + /// a specific record before continuing. Acks are ordered, so waiting on the LAST offset + /// confirms every prior record too — you never need to wait on intermediate offsets. + /// For confirming a bulk run, `flush()` is usually simpler; reach for `waitForOffset()` + /// when one particular record must be confirmed. Avoid calling it after every record in + /// a tight loop, since that limits throughput to one record per round-trip. /// /// # Arguments /// @@ -527,12 +549,12 @@ impl ZerobusStream { /// # Example /// /// ```typescript - /// const offsets = []; + /// let lastOffset; /// for (const record of records) { - /// offsets.push(await stream.ingestRecordOffset(record)); + /// lastOffset = await stream.ingestRecordOffset(record); // no per-record wait /// } - /// // Wait for the last offset (implies all previous are also acknowledged) - /// await stream.waitForOffset(offsets[offsets.length - 1]); + /// // Wait for the last offset only (implies all previous are also acknowledged). + /// await stream.waitForOffset(lastOffset); /// ``` #[napi(ts_return_type = "Promise")] pub fn wait_for_offset(&self, env: Env, offset_id: BigInt) -> Result { @@ -560,6 +582,11 @@ impl ZerobusStream { /// This method ensures all previously ingested records have been sent to the server /// and acknowledged. It's useful for checkpointing or ensuring data durability. /// + /// This is the idiomatic way to confirm records ingested via `ingestRecordOffset()` / + /// `ingestRecordsOffset()`: ingest in a loop, then `flush()` once (for a bounded batch, + /// or periodically for a long-running stream). It resolves once everything queued so + /// far is acknowledged. + /// /// # Errors /// /// - Timeout errors if flush takes longer than configured timeout