23 changes: 23 additions & 0 deletions CLAUDE.md
@@ -16,6 +16,29 @@ The data flow is: **User code → Wrapper API → FFI boundary → Rust core →

Go uses the C FFI layer (`rust/ffi/`, header auto-generated by cbindgen). Java has its own JNI crate (`rust/jni/`). Python and TypeScript bind Rust directly via PyO3 and NAPI-RS respectively.

## Writing SDK client code — performance rules

These rules apply to **every example, snippet, doc comment, and test** you write across all SDKs, and to any client code you generate for a user. Ingestion is asynchronous and pipelined in all SDKs — getting this wrong is the single most common performance bug.

**The cardinal rule: never wait for an acknowledgment after every `ingest` call.**

- `ingest` (`ingest_record_offset` / `ingestRecordOffset` / `IngestRecordOffset`) returns as soon as the record is queued; sending and acknowledgment happen on background tasks. The returned offset is a *handle* you can wait on later — not a signal to wait now.
- Calling the wait method (`wait_for_offset` / `waitForOffset` / `WaitForOffset`, or `.join()` on a per-record future) inside the ingest loop forces one full server round-trip per record. Throughput collapses to ~1 record per round-trip — orders of magnitude below the SDK's capability.

**The correct pattern (use this in all examples by default):**

```text
for record in records:
    stream.ingest(record)   # queue only; do NOT wait here
stream.flush()              # wait once for all pending acks at the end
```

- For continuous/unbounded streams, call `flush()` periodically (every N records) rather than per record, or register an **ack callback** for async notification.
- Reserve per-record `wait_for_offset` for genuinely low-volume cases where each record must be confirmed durable before continuing — and label it as such.
- Prefer the batch API (`ingest_records_offset` / `ingestRecordsOffset` / `IngestRecordsOffset`) in hot paths; it amortizes the per-call FFI crossing.

When writing or reviewing a README, doc comment, or example, the **first** ingestion code a reader sees must demonstrate the loop-then-`flush()` pattern, not per-record waiting.
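
For the continuous-stream case in the rules above, a periodic flush could be sketched roughly as follows. This is a minimal illustration, not SDK code: the `ingester` interface, the `fakeStream` type, and the `ingestWithPeriodicFlush` helper are hypothetical names made up for this sketch, and the in-memory fake only counts calls instead of talking to a server.

```go
package main

import "fmt"

// ingester is a hypothetical stand-in for an SDK stream. The method names are
// illustrative and do not match any one SDK exactly.
type ingester interface {
	Ingest(record string) (int64, error)
	Flush() error
}

// fakeStream queues records in memory and counts Flush calls (assumption:
// a real stream would send records and track acks on background tasks).
type fakeStream struct {
	queued  int
	acked   int
	flushes int
}

// Ingest only queues the record and returns its offset; it never waits.
func (f *fakeStream) Ingest(record string) (int64, error) {
	f.queued++
	return int64(f.queued - 1), nil
}

// Flush marks everything queued so far as acknowledged.
func (f *fakeStream) Flush() error {
	f.acked = f.queued
	f.flushes++
	return nil
}

// ingestWithPeriodicFlush flushes once every `every` records, plus one final
// flush for any remainder, instead of waiting after each record.
func ingestWithPeriodicFlush(s ingester, records <-chan string, every int) error {
	pending := 0
	for r := range records {
		if _, err := s.Ingest(r); err != nil {
			return fmt.Errorf("ingest: %w", err)
		}
		pending++
		if pending == every {
			if err := s.Flush(); err != nil {
				return fmt.Errorf("flush: %w", err)
			}
			pending = 0
		}
	}
	if pending > 0 {
		return s.Flush()
	}
	return nil
}

func main() {
	records := make(chan string, 25)
	for i := 0; i < 25; i++ {
		records <- fmt.Sprintf(`{"id": %d}`, i)
	}
	close(records)

	s := &fakeStream{}
	if err := ingestWithPeriodicFlush(s, records, 10); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("acked=%d flushes=%d\n", s.acked, s.flushes)
}
```

With 25 records and a flush interval of 10, the fake stream is flushed three times (after records 10 and 20, and once for the final 5) rather than 25 times.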

## Versioning and Breaking Changes

**Strict semver.** Breaking changes are only allowed in major version bumps.
78 changes: 44 additions & 34 deletions README.md
@@ -14,25 +14,25 @@ Zerobus is a high-throughput streaming service for direct data ingestion into Da

## SDKs

| Language | Directory | Package |
|----------|-----------|---------|
| Rust | [`rust/`](rust/) | [`databricks-zerobus-ingest-sdk`](https://crates.io/crates/databricks-zerobus-ingest-sdk) |
| Python | [`python/`](python/) | [`databricks-zerobus-ingest-sdk`](https://pypi.org/project/databricks-zerobus-ingest-sdk/) |
| Go | [`go/`](go/) | [`github.com/databricks/zerobus-sdk/go`](https://pkg.go.dev/github.com/databricks/zerobus-sdk/go) |
| TypeScript | [`typescript/`](typescript/) | [`@databricks/zerobus-ingest-sdk`](https://www.npmjs.com/package/@databricks/zerobus-ingest-sdk) |
| Java | [`java/`](java/) | [`com.databricks:zerobus-ingest-sdk`](https://central.sonatype.com/artifact/com.databricks/zerobus-ingest-sdk) |
| Language | Directory | Package |
| ---------- | ---------------------------- | -------------------------------------------------------------------------------------------------------------- |
| Rust | [`rust/`](rust/) | [`databricks-zerobus-ingest-sdk`](https://crates.io/crates/databricks-zerobus-ingest-sdk) |
| Python | [`python/`](python/) | [`databricks-zerobus-ingest-sdk`](https://pypi.org/project/databricks-zerobus-ingest-sdk/) |
| Go | [`go/`](go/) | [`github.com/databricks/zerobus-sdk/go`](https://pkg.go.dev/github.com/databricks/zerobus-sdk/go) |
| TypeScript | [`typescript/`](typescript/) | [`@databricks/zerobus-ingest-sdk`](https://www.npmjs.com/package/@databricks/zerobus-ingest-sdk) |
| Java | [`java/`](java/) | [`com.databricks:zerobus-ingest-sdk`](https://central.sonatype.com/artifact/com.databricks/zerobus-ingest-sdk) |

## Platform Support

We try to provide prebuilt native binaries for the following platforms:

| Platform | Architecture |
|----------|-------------|
| Linux | x86_64 |
| Linux | aarch64 |
| Windows | x86_64 |
| macOS | x86_64 |
| macOS | aarch64 (Apple Silicon) |
| Platform | Architecture |
| -------- | ----------------------- |
| Linux | x86_64 |
| Linux | aarch64 |
| Windows | x86_64 |
| macOS | x86_64 |
| macOS | aarch64 (Apple Silicon) |

> **Note:** We do not currently have macOS CI runners, so macOS binaries are built locally and may not be available for every SDK or release. If your platform is not supported or you encounter compatibility issues, you can [build from source](CONTRIBUTING.md) or [file an issue](https://github.com/databricks/zerobus-sdk/issues).

@@ -110,21 +110,21 @@ Use `proto2` syntax with `optional` fields to correctly represent nullable Delta

##### Delta → Protobuf Type Mappings

| Delta Type | Proto2 Type |
|-----------|-------------|
| TINYINT, BYTE, INT, SMALLINT, SHORT | int32 |
| BIGINT, LONG | int64 |
| FLOAT | float |
| DOUBLE | double |
| STRING, VARCHAR | string |
| BOOLEAN | bool |
| BINARY | bytes |
| DATE | int32 |
| TIMESTAMP, TIMESTAMP_NTZ | int64 |
| ARRAY\<type\> | repeated type |
| MAP\<key, value\> | map\<key, value\> |
| STRUCT\<fields\> | nested message |
| VARIANT | string (JSON string) |
| Delta Type | Proto2 Type |
| ----------------------------------- | -------------------- |
| TINYINT, BYTE, INT, SMALLINT, SHORT | int32 |
| BIGINT, LONG | int64 |
| FLOAT | float |
| DOUBLE | double |
| STRING, VARCHAR | string |
| BOOLEAN | bool |
| BINARY | bytes |
| DATE | int32 |
| TIMESTAMP, TIMESTAMP_NTZ | int64 |
| ARRAY\<type\> | repeated type |
| MAP\<key, value\> | map\<key, value\> |
| STRUCT\<fields\> | nested message |
| VARIANT | string (JSON string) |

#### Schema Generation

@@ -139,15 +139,25 @@ Supported by all SDKs starting from version 2.0.0. Currently in Beta — the API

For sparse, one-row-at-a-time traffic, JSON or Protocol Buffers over the standard SDK gRPC path are usually simpler. See each SDK's `examples/arrow/` directory for usage.

### Acknowledgments and throughput

Ingestion is asynchronous in every SDK. An `ingest` call returns as soon as the record is queued — the SDK sends it and tracks its acknowledgment on a background task. To confirm that records were durably committed, call `flush()`; it returns once everything queued so far has been acknowledged.

The idiomatic flow is therefore **ingest in a loop, then `flush()`** — once at the end of a bounded batch, or periodically for a long-running stream. Where the SDK supports it, you can instead register an ack callback and be notified as records commit, without blocking at all.

Each `ingest` also returns the record's offset, and `wait_for_offset(offset)` blocks until that offset is acknowledged. That's useful when a particular record must be confirmed before you continue; because acknowledgments are ordered, waiting on the last offset of a run confirms the whole run. The one thing to avoid is waiting on every record inside a tight loop — that turns the asynchronous pipeline into a synchronous request/response and limits throughput to a single record per network round-trip.

See each SDK's README for exact method names and a runnable example.
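
To make the "acknowledgments are ordered" point concrete, here is a small Go model of a monotonic ack watermark. It is only a sketch of the idea described above: `ackTracker` and its methods are hypothetical names invented for this illustration and are not part of any SDK, and the background goroutine stands in for the server acknowledging offsets in order.

```go
package main

import (
	"fmt"
	"sync"
)

// ackTracker models a monotonic ack watermark (hypothetical type, not SDK API).
type ackTracker struct {
	mu        sync.Mutex
	cond      *sync.Cond
	watermark int64 // highest acknowledged offset; -1 means none yet
}

func newAckTracker() *ackTracker {
	t := &ackTracker{watermark: -1}
	t.cond = sync.NewCond(&t.mu)
	return t
}

// ack advances the watermark and wakes any waiters. It never moves backwards.
func (t *ackTracker) ack(offset int64) {
	t.mu.Lock()
	if offset > t.watermark {
		t.watermark = offset
	}
	t.mu.Unlock()
	t.cond.Broadcast()
}

// waitForOffset blocks until the watermark has reached the given offset.
func (t *ackTracker) waitForOffset(offset int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for t.watermark < offset {
		t.cond.Wait()
	}
}

// isAcked reports whether the offset is at or below the watermark.
func (t *ackTracker) isAcked(offset int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return offset <= t.watermark
}

func main() {
	t := newAckTracker()
	const n = 1000

	// Simulated server: acknowledges offsets 0..n-1 in order.
	go func() {
		for off := int64(0); off < n; off++ {
			t.ack(off)
		}
	}()

	// One wait on the last offset confirms the whole run.
	t.waitForOffset(n - 1)
	fmt.Println(t.isAcked(0), t.isAcked(n/2), t.isAcked(n-1))
}
```

Because the watermark only moves forward, a single wait on the last offset is enough: every earlier offset is at or below the watermark by the time the wait returns.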

## HTTP Proxy Support

All SDKs support HTTP CONNECT proxies via environment variables, following gRPC core conventions. The first variable found (in order) is used:

| Proxy | No-proxy |
|-------|----------|
| `grpc_proxy` / `GRPC_PROXY` | `no_grpc_proxy` / `NO_GRPC_PROXY` |
| `https_proxy` / `HTTPS_PROXY` | `no_proxy` / `NO_PROXY` |
| `http_proxy` / `HTTP_PROXY` | |
| Proxy | No-proxy |
| ----------------------------- | --------------------------------- |
| `grpc_proxy` / `GRPC_PROXY` | `no_grpc_proxy` / `NO_GRPC_PROXY` |
| `https_proxy` / `HTTPS_PROXY` | `no_proxy` / `NO_PROXY` |
| `http_proxy` / `HTTP_PROXY` | |

The `no_proxy` value is a comma-separated list of hostnames (suffix-matched) or `*` to bypass the proxy entirely.
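
The lookup order and the `no_proxy` matching described above can be sketched in Go as follows. This only illustrates the documented behavior and is not the SDKs' actual implementation: `pickProxy`, `bypassProxy`, and `proxyVars` are hypothetical names, the variable order simply follows the table read left to right and top to bottom, and the plain string suffix match is an assumption about what "suffix-matched" means.

```go
package main

import (
	"fmt"
	"strings"
)

// proxyVars lists the proxy variables in the table's reading order
// (assumption: lowercase is checked before uppercase within each row).
var proxyVars = []string{
	"grpc_proxy", "GRPC_PROXY",
	"https_proxy", "HTTPS_PROXY",
	"http_proxy", "HTTP_PROXY",
}

// pickProxy returns the first non-empty proxy variable found, in order.
// lookup has the same shape as os.LookupEnv so it can be swapped in directly.
func pickProxy(lookup func(string) (string, bool)) (name, value string, ok bool) {
	for _, n := range proxyVars {
		if v, found := lookup(n); found && v != "" {
			return n, v, true
		}
	}
	return "", "", false
}

// bypassProxy reports whether host matches a comma-separated no-proxy list.
// Entries are matched as plain suffixes; "*" bypasses the proxy entirely.
func bypassProxy(noProxy, host string) bool {
	for _, entry := range strings.Split(noProxy, ",") {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		if entry == "*" || strings.HasSuffix(host, entry) {
			return true
		}
	}
	return false
}

func main() {
	// A map stands in for the process environment to keep the sketch deterministic.
	env := map[string]string{
		"HTTPS_PROXY": "http://proxy-b.example:8080",
		"grpc_proxy":  "http://proxy-a.example:3128",
	}
	lookup := func(k string) (string, bool) { v, ok := env[k]; return v, ok }

	name, value, _ := pickProxy(lookup)
	fmt.Println(name, value)
	fmt.Println(bypassProxy("internal.example.com, localhost", "db.internal.example.com"))
}
```

In this sketch `grpc_proxy` wins over `HTTPS_PROXY` because it comes first in the list, and `db.internal.example.com` bypasses the proxy because it ends with `internal.example.com`.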

18 changes: 18 additions & 0 deletions go/CLAUDE.md
@@ -2,6 +2,24 @@

Go wrapper around the Rust core via cgo and the C FFI library.

## Client code patterns (performance)

When writing or reviewing client/example code, follow the idiomatic async flow.
`IngestRecordOffset()` (and `IngestRecordsOffset()` / `IngestBatch()`) return as

Review comment (Contributor):

`IngestBatch` is a method on `ZerobusArrowStream` (go/arrow_stream.go:168), not on `ZerobusStream`, where `IngestRecordOffset`/`IngestRecordsOffset` live. As written, this reads as if all three are on the same type. We should probably either drop `IngestBatch()` from the list or note that it belongs to the Arrow stream. Flagged by an LLM, so probably useful.

soon as the record is queued; the SDK sends it and tracks its acknowledgment in
the background.

- Ingest in a loop, then call `Flush()` to confirm durability — once for a
bounded batch, or periodically for a long-running stream.
- Acks are ordered, so if you only need to confirm a group of records, call
`WaitForOffset()` on the LAST offset — it confirms all prior offsets too.
- Use `WaitForOffset()` when a specific record must be confirmed before
continuing; prefer `Flush()` for bulk durability. Avoid calling
`WaitForOffset()` after every record in a tight loop, since that limits
throughput to one record per round-trip.
- There is no ack-callback API in the Go SDK; use `Flush()` / `WaitForOffset()`
(or the fire-and-forget `IngestRecordNowait`/`IngestRecordsNowait`).

## Structure

```
2 changes: 2 additions & 0 deletions go/NEXT_CHANGELOG.md
@@ -10,6 +10,8 @@

### Documentation

- Clarified throughput guidance in the README, godoc, and examples: ingest records in a loop without waiting and call `Flush()` once, rather than calling `WaitForOffset()` after every record. Documented that the ack watermark is monotonic, so waiting on the last offset confirms all prior records.

### Internal Changes

### API Changes
112 changes: 75 additions & 37 deletions go/README.md
@@ -188,15 +188,22 @@ func main() {
}
defer stream.Close()

// 4. Send record to server and get offset
// The offset is a logical sequence number assigned to this record
// 4. Send record to server and get offset.
// IngestRecordOffset returns as soon as the record is queued; the offset is a
// logical sequence number assigned to this record. The server round-trip
// happens in the background.
offset, err := stream.IngestRecordOffset(`{"id": 1, "message": "Hello"}`)
if err != nil {
log.Fatal(err)
}
log.Printf("Record queued for ingestion with offset %d", offset)

// 5. Wait for server to acknowledge the record is durably written
// 5. Wait for the server to acknowledge the record is durably written.
//
// WaitForOffset confirms this one record, which is all this example needs.
// For real workloads, the idiomatic flow is to ingest many records in a loop
// and call stream.Flush() once at the end. See the "Idiomatic high-throughput
// flow" under Usage Guide → Ingest Data.
if err := stream.WaitForOffset(offset); err != nil {
log.Fatal(err)
}
@@ -247,7 +254,9 @@ if err != nil {
}
log.Printf("Record queued with offset %d", offset)

// 4. Optionally wait for server acknowledgment
// 4. Optionally wait for server acknowledgment of this specific record.
// For bulk ingestion, ingest in a loop and call stream.Flush() once instead —
// see the "Idiomatic high-throughput flow" under Usage Guide → Ingest Data.
if err := stream.WaitForOffset(offset); err != nil {
log.Fatal(err)
}
@@ -477,21 +486,40 @@ defer stream.Close()

### 4. Ingest Data

**Single record:**
> **Acknowledgments and throughput.** Ingestion is asynchronous.
> `IngestRecordOffset()` returns as soon as the record is queued; the SDK sends it
> and tracks its acknowledgment in the background. To confirm records are durably
> committed, call `Flush()` — it returns once everything queued so far is
> acknowledged. The idiomatic flow is **ingest in a loop, then `Flush()`** (once
> for a bounded batch, or periodically for a long-running stream). Each ingest
> also returns the record's offset, and `WaitForOffset(offset)` blocks until that
> offset is acknowledged — handy when a specific record must be confirmed before
> continuing (acks are ordered, so waiting on the last offset confirms the whole
> run). Just avoid calling `WaitForOffset()` after every record in a tight loop,
> since that limits throughput to one record per round-trip.

**Idiomatic high-throughput flow: ingest in a loop, then `Flush()` once.**

```go
// JSON (string) - queues record and returns offset
offset, err := stream.IngestRecordOffset(`{"id": 1, "value": "hello"}`)
if err != nil {
// Ingest many records without waiting between them.
for i := 0; i < 100000; i++ {
jsonData := fmt.Sprintf(`{"id": %d, "timestamp": %d}`, i, time.Now().Unix())
if _, err := stream.IngestRecordOffset(jsonData); err != nil {
log.Printf("Record %d failed: %v", i, err)
continue
}
}

// Wait for ALL pending records to be acknowledged in a single call.
if err := stream.Flush(); err != nil {
log.Fatal(err)
}
log.Printf("Record queued at offset: %d", offset)
```

**Batch ingestion for high throughput:**
**Batch ingestion for even higher throughput:**

```go
// Ingest multiple records at once
// Ingest multiple records in one call (one offset for the whole batch).
records := []interface{}{
`{"id": 1, "value": "first"}`,
`{"id": 2, "value": "second"}`,
@@ -502,28 +530,26 @@ if err != nil {
log.Fatal(err)
}
log.Printf("Batch queued with offset: %d", batchOffset)
// ... ingest more batches ...
stream.Flush() // wait for everything at the end

Review comment (Contributor):

Other `Flush()` and `WaitForOffset()` calls in this README check for errors; this lone one does not. We could change it to something like:

if err := stream.Flush(); err != nil {
    log.Fatal(err)
}

```

**High throughput pattern:**
**Single record with explicit confirmation:**

```go
// Ingest many records without waiting
for i := 0; i < 100000; i++ {
jsonData := fmt.Sprintf(`{"id": %d, "timestamp": %d}`, i, time.Now().Unix())
offset, err := stream.IngestRecordOffset(jsonData)
if err != nil {
log.Printf("Record %d failed: %v", i, err)
continue
}

// Optional: log progress
if i%10000 == 0 {
log.Printf("Ingested %d records, latest offset: %d", i, offset)
}
// JSON (string) - queues record and returns offset.
offset, err := stream.IngestRecordOffset(`{"id": 1, "value": "hello"}`)
if err != nil {
log.Fatal(err)
}
log.Printf("Record queued at offset: %d", offset)

// Wait for all records to be acknowledged
stream.Flush()
// WaitForOffset confirms this specific record before continuing. For bulk
// ingestion, prefer ingesting in a loop and calling Flush() once (see the
// high-throughput flow above).
if err := stream.WaitForOffset(offset); err != nil {
log.Fatal(err)
}
```

**Concurrent ingestion with goroutines:**
@@ -620,13 +646,18 @@ if err := stream.WaitForOffset(batchOffset); err != nil {
}
log.Println("Batch confirmed")

// High-throughput:
// Idiomatic high-throughput flow: ingest in a loop, then Flush() once to
// confirm everything queued so far.
for i := 0; i < 1000; i++ {
_, _ := stream.IngestRecordOffset(record)
if _, err := stream.IngestRecordOffset(record); err != nil {
log.Printf("Record %d failed: %v", i, err)
}
}

// Use Flush() to wait for all pending acknowledgments at once
stream.Flush()
// Use Flush() to wait for all pending acknowledgments at once.
if err := stream.Flush(); err != nil {
log.Fatal(err)
}
```

### 6. Error Handling
@@ -898,16 +929,17 @@ The test suite includes:

1. **Reuse SDK Instances** - Create one `ZerobusSdk` per application and reuse for multiple streams
2. **Always Close Streams** - Use `defer stream.Close()` to ensure all data is flushed
3. **Choose the Right Ingestion Method**:
3. **Ingest, then `Flush()`** - `IngestRecordOffset()`/`IngestRecordsOffset()` return as soon as the record is queued and track acknowledgment in the background. The idiomatic flow is to ingest in a loop and call `Flush()` to confirm durability. Use `WaitForOffset()` when a specific record must be confirmed before continuing (acks are ordered, so the last offset confirms the whole group). Just avoid calling `WaitForOffset()` after every record in a tight loop, since that limits throughput to one record per round-trip.
4. **Choose the Right Ingestion Method**:
- Use `IngestRecordsOffset()` for high throughput batch ingestion
- Use `IngestRecordOffset()` when processing records individually
- Both return offsets directly; use `WaitForOffset()` to explicitly wait for acknowledgments
- The older `IngestRecord()` method is deprecated
4. **Tune Inflight Limits** - Adjust `MaxInflightRequests` based on memory and throughput needs
5. **Enable Recovery** - Always set `Recovery: true` in production environments
6. **Use Batch Ingestion** - For high throughput, ingest many records before calling `Flush()`
7. **Monitor Errors** - Log and alert on non-retryable errors
8. **Use Protocol Buffers for Production** - More efficient than JSON for high-volume scenarios
5. **Tune Inflight Limits** - Adjust `MaxInflightRequests` based on memory and throughput needs
6. **Enable Recovery** - Always set `Recovery: true` in production environments
7. **Use Batch Ingestion** - For high throughput, ingest many records before calling `Flush()`
8. **Monitor Errors** - Log and alert on non-retryable errors
9. **Use Protocol Buffers for Production** - More efficient than JSON for high-volume scenarios

Review comment (Contributor):

Duplicate number 9 in the list below; this line should be followed by 10/11/12.

9. **Secure Credentials** - Never hardcode secrets; use environment variables or secret managers
10. **Test Recovery** - Simulate failures to verify your error handling logic
11. **One Stream Per Goroutine** - Don't share streams across goroutines; create separate streams for concurrent ingestion
@@ -1120,6 +1152,12 @@ Waits for the server to acknowledge that a specific record has been durably writ

Unlike `Flush()` which waits for all pending records, this waits only for a specific offset, allowing more granular control.

> Use `WaitForOffset()` when a specific record must be confirmed before
> continuing; acks are ordered, so waiting on the last offset of a group confirms
> all prior offsets too. For bulk durability, prefer ingesting in a loop and
> calling `Flush()` once. Avoid calling `WaitForOffset()` after every record in a
> tight loop, since that limits throughput to one record per round-trip.

**Example:**
```go
// Send multiple records