25 changes: 17 additions & 8 deletions docs/advanced/config-files.md
@@ -72,15 +72,19 @@ batch_size: 10 # Default: 10, chunks per batch
max_workers: 1 # Default: 1, concurrent workers per batch
max_retries: 3 # Default: 3, API retry attempts
base_delay: 1.0 # Default: 1.0, seconds between retries
tokens_per_minute: null # Default: null, max tokens per minute
requests_per_minute: null # Default: null, max requests per minute
rate_limit_tokens: null # Default: null, max tokens per rate limit period
rate_limit_requests: null # Default: null, max requests per rate limit period
rate_limit_period_seconds: 60.0 # Default: 60.0, rate limit window in seconds
max_completion_tokens: 4096 # Default: 4096, max completion tokens per request

# API Pass-through
api_kwargs: {} # Default: {}, extra kwargs sent to the LLM API (e.g. {store: false})

# Cost Management
track_cost: true # Default: true
max_budget: null # Default: null, max spend in dollars (requires track_cost: true)
model_input_cost_per_1M_tokens: null # Default: auto-detected from model database
model_output_cost_per_1M_tokens: null # Default: auto-detected from model database
model_input_cost_per_1M_tokens: null # Default: auto-detected via tokencost
model_output_cost_per_1M_tokens: null # Default: auto-detected via tokencost

# Data Preprocessing
target_column: "text" # Default: "text", input text column name
@@ -181,8 +185,9 @@ batch_size: 20
max_workers: 4
max_retries: 3
base_delay: 1.0
tokens_per_minute: 500000
requests_per_minute: 500
rate_limit_tokens: 500000
rate_limit_requests: 500
rate_limit_period_seconds: 60.0

# Cost tracking
track_cost: true
@@ -256,11 +261,15 @@ Contains all LLM-related settings including provider, model, prompts, processing
| `batch_size` | int | 10 | Number of chunks processed per batch |
| `max_workers` | int | 1 | Concurrent workers (within each batch) |
| `base_delay` | float | 1.0 | Seconds between retry attempts |
| `rate_limit_tokens` | int | null | Max tokens per rate limit period |
| `rate_limit_requests` | int | null | Max requests per rate limit period |
| `rate_limit_period_seconds` | float | 60.0 | Rate limit window in seconds |
| `max_completion_tokens` | int | 4096 | Maximum completion tokens per request |
| `api_kwargs` | dict | {} | Extra kwargs passed through to the LLM API call |
| `track_cost` | bool | true | Enable cost tracking |
| `max_budget` | float | null | Maximum budget in dollars (requires `track_cost: true`) |
| `model_input_cost_per_1M_tokens` | float | null | Custom input token cost (auto-detected from model database if null) |
| `model_output_cost_per_1M_tokens` | float | null | Custom output token cost (auto-detected from model database if null) |
| `model_input_cost_per_1M_tokens` | float | null | Custom input token cost (auto-detected via tokencost if null) |
| `model_output_cost_per_1M_tokens` | float | null | Custom output token cost (auto-detected via tokencost if null) |

### Data Preprocessing Config

2 changes: 1 addition & 1 deletion docs/advanced/large-jobs.md
@@ -107,7 +107,7 @@ delm = DELM(

**Best Practice**: Set `max_workers` ≤ `batch_size`. Having more workers than chunks in a batch just wastes resources. For example, if `batch_size=10` and `max_workers=20`, you'll have 10 idle workers.

**Warning**: More workers = more concurrent API calls = higher rate limit usage. If you hit "429 Too Many Requests" errors, you may need to reduce `max_workers` or increase `base_delay`. A better solution might be to specify the exact TPM and RPM parameters for your specific provider and model.
**Warning**: More workers = more concurrent API calls = higher rate limit usage. If you hit "429 Too Many Requests" errors, you may need to reduce `max_workers` or increase `base_delay`. A better solution is to set `rate_limit_tokens` and `rate_limit_requests` to match your provider's limits (with `rate_limit_period_seconds` to set the time window).

### Overwrite vs Resume

4 changes: 3 additions & 1 deletion docs/reference/config.md
@@ -67,11 +67,13 @@ LLM and extraction settings.

**Attributes:**
- `provider`, `model`, `temperature`
- `batch_size`, `max_workers`, `max_retries`, `base_delay`, `tokens_per_minute`, `requests_per_minute`
- `batch_size`, `max_workers`, `max_retries`, `base_delay`
- `rate_limit_tokens`, `rate_limit_requests`, `rate_limit_period_seconds`
- `max_completion_tokens`
- `track_cost`, `max_budget`
- `model_input_cost_per_1M_tokens`, `model_output_cost_per_1M_tokens`
- `prompt_template`, `system_prompt`
- `api_kwargs`

---

6 changes: 4 additions & 2 deletions docs/reference/delm.md
@@ -34,9 +34,11 @@ delm = DELM(
| `max_workers` | `int` | `1` | Concurrent workers per batch |
| `max_retries` | `int` | `3` | Retry attempts for failed requests |
| `base_delay` | `float` | `1.0` | Exponential backoff base delay (seconds) |
| `tokens_per_minute` | `int` | `null` | Maximum tokens per minute |
| `requests_per_minute` | `int` | `null` | Maximum requests per minute |
| `rate_limit_tokens` | `int` | `null` | Maximum tokens per rate limit period |
| `rate_limit_requests` | `int` | `null` | Maximum requests per rate limit period |
| `rate_limit_period_seconds` | `float` | `60.0` | Rate limit window in seconds |
| `max_completion_tokens` | `int` | `4096` | Max completion tokens per request |
| `api_kwargs` | `dict \| None` | `None` | Extra kwargs passed through to the LLM API call |


### Cost Tracking
2 changes: 1 addition & 1 deletion docs/user-guide/cost-management.md
@@ -6,7 +6,7 @@ DELM provides tools to estimate costs before running a job, track costs during e

Before running a large extraction job, you should estimate the potential cost. DELM offers two methods for this: a free input-only estimate and a more accurate sample-based estimate.

**Note on Pricing**: Since model prices change frequently, you should configure your `DELM` instance with current pricing if the defaults are outdated.
**Note on Pricing**: DELM uses the [tokencost](https://github.com/AgentOps-AI/tokencost) package to look up model prices automatically (400+ models supported). If your model isn't in the database or prices have changed, override with `model_input_cost_per_1M_tokens` and `model_output_cost_per_1M_tokens`.
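As a rough illustration of how per-1M-token prices turn into a dollar figure, the arithmetic can be sketched as below. `estimate_cost_usd` is a hypothetical helper written for this note, not part of DELM or tokencost, and the prices are made-up placeholders rather than real pricing for any model.

```python
def estimate_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_cost_per_1m: float,
    output_cost_per_1m: float,
) -> float:
    """Hypothetical helper: dollar cost of a request given per-1M-token prices."""
    input_cost = (input_tokens / 1_000_000) * input_cost_per_1m
    output_cost = (output_tokens / 1_000_000) * output_cost_per_1m
    return input_cost + output_cost


# Placeholder prices for illustration only (not real pricing).
cost = estimate_cost_usd(
    input_tokens=200_000,
    output_tokens=50_000,
    input_cost_per_1m=0.50,
    output_cost_per_1m=1.50,
)
print(f"Estimated cost: ${cost:.4f}")
```

The two override parameters play the role of `input_cost_per_1m` and `output_cost_per_1m` here: when they are set, they would be used in place of the looked-up prices.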

### 1. Input Token Estimation (Free)

43 changes: 43 additions & 0 deletions docs/user-guide/model-configuration.md
@@ -135,6 +135,49 @@ delm = DELM(
)
```

## API Pass-through (`api_kwargs`)

The `api_kwargs` parameter lets you pass arbitrary keyword arguments through to the underlying LLM API call. This is useful for provider-specific options that DELM doesn't expose directly.

```python
from delm import DELM

# Disable request storage on Fireworks AI
delm = DELM(
schema=my_schema,
provider="openai",
model="accounts/fireworks/models/llama-v3p1-8b-instruct",
base_url="https://api.fireworks.ai/inference/v1",
api_kwargs={"store": False},
)
```

Any keys in `api_kwargs` are unpacked into the `instructor` `create_with_completion` call alongside `model`, `temperature`, `messages`, etc.
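A minimal sketch of what that unpacking implies, using a stand-in function rather than the real `instructor` client. `fake_create` and the model name are assumptions for illustration only. Because the extra keys are passed with `**`, a key that duplicates an argument already passed explicitly (such as `model`) raises a `TypeError` in plain Python, so it is probably safest to keep `api_kwargs` limited to options DELM does not already set.

```python
def fake_create(**kwargs):
    """Stand-in for the real API call: returns the keyword arguments it received."""
    return kwargs


api_kwargs = {"store": False}

# Extra keys are merged in alongside the explicitly passed arguments.
received = fake_create(model="my-model", temperature=0.0, **api_kwargs)
print(received)

# A key that collides with an explicit argument is rejected by Python itself.
try:
    fake_create(model="my-model", **{"model": "other-model"})
    collided = False
except TypeError as exc:
    collided = True
    print(f"TypeError: {exc}")
```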

## Rate Limiting

Control how fast DELM sends API requests using token-bucket rate limiting. You can specify the number of tokens and/or requests allowed within a configurable time window.

```python
from delm import DELM

# 500k tokens and 500 requests per minute (default period)
delm = DELM(
schema=my_schema,
rate_limit_tokens=500_000,
rate_limit_requests=500,
)

# 33k tokens per second (for high-throughput providers)
delm = DELM(
schema=my_schema,
rate_limit_tokens=33_000,
rate_limit_period_seconds=1.0,
)
```

If a single request requires more tokens than the limit (e.g., a 99k-token prompt with a 33k token/sec limit), the limiter waits until the bucket is fully refilled before allowing that request, then naturally delays subsequent requests until the "debt" is recovered.
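The "debt" behavior described above can be sketched with a small token bucket that is allowed to go negative. This is an illustrative model only, not DELM's `BucketRateLimiter`: the class name, the `reserve` method, and the explicit `now` argument (used in place of a real clock so the example is deterministic) are all assumptions, and the sketch is not thread-safe.

```python
class DebtTokenBucket:
    """Illustrative token bucket whose level may go negative ("debt")."""

    def __init__(self, capacity: float, period_seconds: float, now: float = 0.0):
        self.capacity = float(capacity)
        self.rate = self.capacity / period_seconds  # tokens refilled per second
        self.level = self.capacity                  # start with a full bucket
        self.updated = now

    def _refill(self, now: float) -> None:
        elapsed = max(0.0, now - self.updated)
        self.level = min(self.capacity, self.level + elapsed * self.rate)
        self.updated = max(self.updated, now)

    def reserve(self, cost: float, now: float) -> float:
        """Charge `cost` tokens and return how many seconds the caller should wait."""
        self._refill(now)
        # An oversized request can never be covered, so it only needs a full bucket.
        needed = min(cost, self.capacity)
        wait = max(0.0, (needed - self.level) / self.rate)
        # Credit the refill that accrues during the wait, then charge the full cost.
        self.level = min(self.capacity, self.level + wait * self.rate) - cost
        self.updated += wait
        return wait


bucket = DebtTokenBucket(capacity=33_000, period_seconds=1.0)
wait_small = bucket.reserve(16_500, now=0.0)     # fits in the bucket right away
wait_oversized = bucket.reserve(99_000, now=0.0) # waits for a full bucket, then goes into debt
wait_after = bucket.reserve(1_000, now=0.5)      # delayed until the debt is repaid
print(wait_small, wait_oversized, round(wait_after, 2))
```

In this model the first request is admitted immediately, the oversized request waits 0.5 s for the half-empty bucket to refill, and the next small request waits roughly 2 s while the 66k-token deficit is paid back at 33k tokens per second.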

## API Keys

DELM reads API keys from environment variables:
1 change: 1 addition & 0 deletions pyproject.toml
@@ -35,6 +35,7 @@ dependencies = [
"beautifulsoup4>=4.11.0",
"python-docx>=0.8.11",
"tiktoken>=0.5.0",
"tokencost>=0.1.0",
]

[project.optional-dependencies]
46 changes: 32 additions & 14 deletions src/delm/config.py
@@ -59,13 +59,15 @@ class LLMExtractionConfig(BaseConfig):
batch_size: int
max_workers: int
base_delay: float
tokens_per_minute: Optional[int]
requests_per_minute: Optional[int]
rate_limit_tokens: Optional[int]
rate_limit_requests: Optional[int]
rate_limit_period_seconds: float
track_cost: bool
max_budget: Optional[float]
model_input_cost_per_1M_tokens: Optional[float]
model_output_cost_per_1M_tokens: Optional[float]
max_completion_tokens: int
api_kwargs: dict

def get_provider_string(self) -> str:
"""Return the combined provider string for Instructor.
@@ -117,13 +119,17 @@ def validate(self):
raise ValueError(
f"base_delay must be non-negative. base_delay: {self.base_delay}, Suggestion: Use a non-negative float"
)
if self.tokens_per_minute is not None and self.tokens_per_minute <= 0:
if self.rate_limit_tokens is not None and self.rate_limit_tokens <= 0:
raise ValueError(
f"tokens_per_minute must be positive. tokens_per_minute: {self.tokens_per_minute}, Suggestion: Use a positive integer"
f"rate_limit_tokens must be positive. rate_limit_tokens: {self.rate_limit_tokens}, Suggestion: Use a positive integer"
)
if self.requests_per_minute is not None and self.requests_per_minute <= 0:
if self.rate_limit_requests is not None and self.rate_limit_requests <= 0:
raise ValueError(
f"requests_per_minute must be positive. requests_per_minute: {self.requests_per_minute}, Suggestion: Use a positive integer"
f"rate_limit_requests must be positive. rate_limit_requests: {self.rate_limit_requests}, Suggestion: Use a positive integer"
)
if self.rate_limit_period_seconds <= 0:
raise ValueError(
f"rate_limit_period_seconds must be positive. rate_limit_period_seconds: {self.rate_limit_period_seconds}"
)
if not isinstance(self.track_cost, bool):
raise ValueError(
@@ -146,6 +152,10 @@ def validate(self):
raise ValueError(
f"max_completion_tokens must be greater than 0. max_completion_tokens: {self.max_completion_tokens}"
)
if not isinstance(self.api_kwargs, dict):
raise ValueError(
f"api_kwargs must be a dict. api_kwargs: {self.api_kwargs}"
)

def to_dict(self) -> dict:
return {
@@ -160,13 +170,15 @@ def to_dict(self) -> dict:
"batch_size": self.batch_size,
"max_workers": self.max_workers,
"base_delay": self.base_delay,
"tokens_per_minute": self.tokens_per_minute,
"requests_per_minute": self.requests_per_minute,
"rate_limit_tokens": self.rate_limit_tokens,
"rate_limit_requests": self.rate_limit_requests,
"rate_limit_period_seconds": self.rate_limit_period_seconds,
"track_cost": self.track_cost,
"max_budget": self.max_budget,
"model_input_cost_per_1M_tokens": self.model_input_cost_per_1M_tokens,
"model_output_cost_per_1M_tokens": self.model_output_cost_per_1M_tokens,
"max_completion_tokens": self.max_completion_tokens,
"api_kwargs": self.api_kwargs,
}


@@ -449,13 +461,15 @@ def __init__(
max_workers: int = 1,
max_retries: int = 3,
base_delay: float = 1.0,
tokens_per_minute: Optional[int] = None,
requests_per_minute: Optional[int] = None,
rate_limit_tokens: Optional[int] = None,
rate_limit_requests: Optional[int] = None,
rate_limit_period_seconds: float = 60.0,
track_cost: bool = True,
max_budget: Optional[float] = None,
model_input_cost_per_1M_tokens: Optional[float] = None,
model_output_cost_per_1M_tokens: Optional[float] = None,
max_completion_tokens: int = 4096,
api_kwargs: Optional[Dict[str, Any]] = None,
# Data Preprocessing (flat)
target_column: str = "text",
drop_target_column: bool = False,
@@ -513,13 +527,15 @@ def __init__(
max_workers=max_workers,
max_retries=max_retries,
base_delay=base_delay,
tokens_per_minute=tokens_per_minute,
requests_per_minute=requests_per_minute,
rate_limit_tokens=rate_limit_tokens,
rate_limit_requests=rate_limit_requests,
rate_limit_period_seconds=rate_limit_period_seconds,
track_cost=track_cost,
max_budget=max_budget,
model_input_cost_per_1M_tokens=model_input_cost_per_1M_tokens,
model_output_cost_per_1M_tokens=model_output_cost_per_1M_tokens,
max_completion_tokens=max_completion_tokens,
api_kwargs=api_kwargs if api_kwargs is not None else {},
)
self.data_preprocessing_cfg = DataPreprocessingConfig(
target_column=target_column,
@@ -577,13 +593,15 @@ def from_dict(cls, data: Dict[str, Any]) -> "DELMConfig":
max_workers=data["max_workers"],
max_retries=data["max_retries"],
base_delay=data["base_delay"],
tokens_per_minute=data["tokens_per_minute"],
requests_per_minute=data["requests_per_minute"],
rate_limit_tokens=data["rate_limit_tokens"],
rate_limit_requests=data["rate_limit_requests"],
rate_limit_period_seconds=data.get("rate_limit_period_seconds", 60.0),
track_cost=data["track_cost"],
max_budget=data["max_budget"],
model_input_cost_per_1M_tokens=data["model_input_cost_per_1M_tokens"],
model_output_cost_per_1M_tokens=data["model_output_cost_per_1M_tokens"],
max_completion_tokens=data.get("max_completion_tokens", 4096),
api_kwargs=data.get("api_kwargs", {}),
target_column=data["target_column"],
drop_target_column=data["drop_target_column"],
splitting_strategy=data["splitting_strategy"],
1 change: 1 addition & 0 deletions src/delm/core/extraction_manager.py
@@ -457,6 +457,7 @@ def _instructor_extract():
],
max_completion_tokens=self.model_config.max_completion_tokens,
max_retries=0,
**self.model_config.api_kwargs,
)
)
except Exception as e:
39 changes: 25 additions & 14 deletions src/delm/delm.py
@@ -58,13 +58,15 @@ def __init__(
max_workers: int = 1,
max_retries: int = 3,
base_delay: float = 1.0,
tokens_per_minute: Optional[int] = None,
requests_per_minute: Optional[int] = None,
rate_limit_tokens: Optional[int] = None,
rate_limit_requests: Optional[int] = None,
rate_limit_period_seconds: float = 60.0,
track_cost: bool = True,
max_budget: Optional[float] = None,
model_input_cost_per_1M_tokens: Optional[float] = None,
model_output_cost_per_1M_tokens: Optional[float] = None,
max_completion_tokens: int = 4096,
api_kwargs: Optional[Dict[str, Any]] = None,
# Data Preprocessing (flat)
target_column: str = "text",
drop_target_column: bool = False,
@@ -115,14 +117,18 @@ def __init__(
max_workers: Maximum number of concurrent workers for parallel processing.
max_retries: Maximum number of retry attempts for failed API calls.
base_delay: Base delay in seconds for exponential backoff between retries.
tokens_per_minute: Rate limit for tokens per minute.
requests_per_minute: Rate limit for requests per minute.
rate_limit_tokens: Maximum number of tokens allowed per rate limit period.
rate_limit_requests: Maximum number of requests allowed per rate limit period.
rate_limit_period_seconds: Duration of the rate limit window in seconds
(default 60). For example, set to 2 for "tokens per 2 seconds".
track_cost: Whether to track API costs during extraction.
max_budget: Maximum budget in dollars. Extraction stops if exceeded.
model_input_cost_per_1M_tokens: Override input token cost per 1M tokens.
Uses built-in pricing if not specified.
Uses tokencost pricing if not specified.
model_output_cost_per_1M_tokens: Override output token cost per 1M tokens.
Uses built-in pricing if not specified.
Uses tokencost pricing if not specified.
api_kwargs: Extra keyword arguments passed through to the underlying LLM API
call (e.g. ``{"store": False}`` for Fireworks AI).

target_column: Name of the column containing text to extract from.
drop_target_column: Whether to drop the original target column after
@@ -170,13 +176,15 @@ def __init__(
max_workers=max_workers,
max_retries=max_retries,
base_delay=base_delay,
tokens_per_minute=tokens_per_minute,
requests_per_minute=requests_per_minute,
rate_limit_tokens=rate_limit_tokens,
rate_limit_requests=rate_limit_requests,
rate_limit_period_seconds=rate_limit_period_seconds,
track_cost=track_cost,
max_budget=max_budget,
model_input_cost_per_1M_tokens=model_input_cost_per_1M_tokens,
model_output_cost_per_1M_tokens=model_output_cost_per_1M_tokens,
max_completion_tokens=max_completion_tokens,
api_kwargs=api_kwargs,
target_column=target_column,
drop_target_column=drop_target_column,
splitting_strategy=splitting_strategy,
@@ -264,13 +272,15 @@ def from_config(
max_workers=config.llm_extraction_cfg.max_workers,
max_retries=config.llm_extraction_cfg.max_retries,
base_delay=config.llm_extraction_cfg.base_delay,
tokens_per_minute=config.llm_extraction_cfg.tokens_per_minute,
requests_per_minute=config.llm_extraction_cfg.requests_per_minute,
rate_limit_tokens=config.llm_extraction_cfg.rate_limit_tokens,
rate_limit_requests=config.llm_extraction_cfg.rate_limit_requests,
rate_limit_period_seconds=config.llm_extraction_cfg.rate_limit_period_seconds,
track_cost=config.llm_extraction_cfg.track_cost,
max_budget=config.llm_extraction_cfg.max_budget,
model_input_cost_per_1M_tokens=config.llm_extraction_cfg.model_input_cost_per_1M_tokens,
model_output_cost_per_1M_tokens=config.llm_extraction_cfg.model_output_cost_per_1M_tokens,
max_completion_tokens=config.llm_extraction_cfg.max_completion_tokens,
api_kwargs=config.llm_extraction_cfg.api_kwargs,
target_column=config.data_preprocessing_cfg.target_column,
drop_target_column=config.data_preprocessing_cfg.drop_target_column,
splitting_strategy=config.data_preprocessing_cfg.splitting_strategy,
@@ -512,12 +522,13 @@ def _initialize_components(self) -> None:
)

if (
self.config.llm_extraction_cfg.tokens_per_minute
or self.config.llm_extraction_cfg.requests_per_minute
self.config.llm_extraction_cfg.rate_limit_tokens
or self.config.llm_extraction_cfg.rate_limit_requests
):
self.rate_limiter = BucketRateLimiter(
tokens_per_minute=self.config.llm_extraction_cfg.tokens_per_minute,
requests_per_minute=self.config.llm_extraction_cfg.requests_per_minute,
rate_limit_tokens=self.config.llm_extraction_cfg.rate_limit_tokens,
rate_limit_requests=self.config.llm_extraction_cfg.rate_limit_requests,
period_seconds=self.config.llm_extraction_cfg.rate_limit_period_seconds,
)
else:
self.rate_limiter = NoOpRateLimiter()