# API Reference

Databrew is a composition layer that orchestrates several component packages.
Internal subpackages (`databrew.core`, `databrew.state`) are documented here.
External dependencies (`fetchkit`, `extractkit`, `itemstore`) are summarised
with links to their repositories.
## databrew.core — Contracts and Utilities

Shared Pydantic models and helpers used across the framework.
### CrawlPolicy

Pydantic model that holds every behavioural rule for a crawl: retry logic, stopping conditions, concurrency, request pacing, and cross-run retry thresholds.
| Field | Type | Default | Description |
|---|---|---|---|
| `max_retries` | `int` | `3` | Max retry attempts per URL |
| `retry_delay` | `float` | `1.0` | Initial retry delay (seconds) |
| `backoff_factor` | `float` | `2.0` | Exponential backoff multiplier |
| `max_retry_delay` | `float` | `60.0` | Cap on retry delay (seconds) |
| `retryable_categories` | `set[str]` | `{"network", "server", "rate_limited"}` | Error categories eligible for retry |
| `max_requests` | `int \| None` | `None` | Stop after N requests (unlimited if `None`) |
| `max_consecutive_failures` | `int` | `50` | Stop after N consecutive failures |
| `max_error_rate` | `float` | `0.5` | Stop if the error rate exceeds this (0–1) |
| `min_requests_for_error_rate` | `int` | `20` | Minimum requests before the error-rate check applies |
| `stop_on_empty` | `bool` | `True` | Stop pagination when a page yields nothing |
| `stop_on_caught_up` | `bool` | `False` | Global stop on reaching already-scraped items |
| `caught_up_threshold` | `int` | `3` | Consecutive caught-up pages before a global stop |
| `concurrency` | `int` | `5` | Maximum concurrent requests |
| `delay` | `float` | `0.0` | Delay after each batch (seconds) |
| `jitter` | `float` | `0.1` | Random per-request jitter (seconds) |
| `max_failed_runs` | `int` | `3` | Runs a URL can fail before it is marked permanently failed |
| `items_from` | `str` | `"item"` | URL types to save items from (`"item"`, `"pagination"`, `"all"`) |
Key methods:

- `should_retry(error, attempts) -> bool` — decide whether to retry a failed request.
- `get_retry_delay(attempts) -> float` — exponential-backoff delay for the next attempt.
- `should_stop(requests_completed, consecutive_failures, total_failures, consecutive_caught_up) -> tuple[bool, str]` — evaluate all stopping rules and return `(stop, reason)`.
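The retry and stop rules can be sketched as plain functions. This is a simplified reimplementation for illustration, not databrew's actual code; the exponential-backoff formula and the order in which the stop rules are checked are assumptions, with defaults mirroring the table above:

```python
def get_retry_delay(attempts: int, retry_delay: float = 1.0,
                    backoff_factor: float = 2.0,
                    max_retry_delay: float = 60.0) -> float:
    """Exponential backoff: retry_delay * backoff_factor**attempts, capped."""
    return min(retry_delay * backoff_factor ** attempts, max_retry_delay)


def should_stop(requests_completed: int, consecutive_failures: int,
                total_failures: int,
                max_consecutive_failures: int = 50,
                max_error_rate: float = 0.5,
                min_requests_for_error_rate: int = 20) -> tuple[bool, str]:
    """Evaluate stopping rules and return (stop, reason)."""
    if consecutive_failures >= max_consecutive_failures:
        return True, "consecutive_failures"
    # Only apply the error-rate rule once enough requests have completed
    if (requests_completed >= min_requests_for_error_rate
            and total_failures / requests_completed > max_error_rate):
        return True, "error_rate"
    return False, ""
```

With the defaults, retry delays grow 1.0 → 2.0 → 4.0 → 8.0 seconds and cap at 60, and a crawl of fewer than 20 requests is never stopped for a high error rate.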
### CrawlStats

Dataclass that tracks runtime counters for progress reporting and policy decisions.
| Attribute | Type | Description |
|---|---|---|
| `urls_queued` | `int` | URLs added to the queue |
| `urls_processed` | `int` | URLs processed so far |
| `urls_succeeded` | `int` | Successful fetches |
| `urls_failed` | `int` | Terminal failures |
| `urls_retried` | `int` | Retried requests |
| `items_extracted` | `int` | Total items saved |
| `consecutive_failures` | `int` | Current failure streak |
| `consecutive_caught_up` | `int` | Consecutive pages where all items already existed |
Key methods:

- `record_success(items_count, caught_up)` — update counters after a successful fetch.
- `record_failure(will_retry)` — update counters after a failed fetch.
- `summary() -> dict` — snapshot suitable for logging.

Properties: `error_rate`, `elapsed_seconds`, `urls_per_second`.
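The counter-plus-derived-property pattern looks roughly like the following. This is a minimal stand-in for illustration, not databrew's actual class; the reset-on-success behaviour of the failure streak is an assumption:

```python
import time
from dataclasses import dataclass, field


@dataclass
class CrawlStatsSketch:
    # Simplified stand-in for CrawlStats, for illustration only
    urls_processed: int = 0
    urls_failed: int = 0
    items_extracted: int = 0
    consecutive_failures: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def record_success(self, items_count: int) -> None:
        self.urls_processed += 1
        self.items_extracted += items_count
        self.consecutive_failures = 0  # assumed: a success resets the streak

    def record_failure(self) -> None:
        self.urls_processed += 1
        self.urls_failed += 1
        self.consecutive_failures += 1

    @property
    def error_rate(self) -> float:
        # Guard against division by zero before any request completes
        return self.urls_failed / self.urls_processed if self.urls_processed else 0.0
```

Keeping `error_rate` as a derived property rather than a stored counter means it can never drift out of sync with the underlying counts.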
### HooksConfig

Pydantic model for lifecycle hook shell commands.
| Field | Type | Default | Description |
|---|---|---|---|
| `on_start` | `str \| None` | `None` | Command run before the crawl starts (exit non-zero to abort) |
| `on_failure` | `str \| None` | `None` | Command run when `max_consecutive_failures` is reached |
| `on_complete` | `str \| None` | `None` | Command run after the crawl finishes |
| `max_hook_retries` | `int` | `3` | Max times `on_failure` can fire per crawl |
| `hook_timeout` | `float` | `300.0` | Timeout per hook execution (seconds) |
Commands support template variables: `{name}`, `{failures}`, `{items}`, `{requests}`.
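For example, an `on_complete` command could interpolate the crawl name and counts. The command itself is hypothetical, and substitution via `str.format` is an assumption about how databrew expands the variables:

```python
# Hypothetical on_complete template; {failures} is available but unused here
template = "notify-send 'crawl {name}: {items} items in {requests} requests'"

# str.format-style substitution is assumed; unused variables are simply ignored
cmd = template.format(name="mysite", failures=0, items=1200, requests=1340)
print(cmd)
```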
### load_module_from_path

Load a Python module from an arbitrary file path. Used internally to load custom parsers and hooks defined in user scripts.

```python
from pathlib import Path

from databrew.core import load_module_from_path

module = load_module_from_path("my_parsers", Path("parsers.py"))
```
## databrew.state — Crawl State Management

URL queue, failure tracking, and item persistence coordination.
### StateStore

Unified coordinator for crawl state. Delegates to three subsystems:

- `UrlQueue` — SQLite-backed priority queue for pagination and item URLs.
- `StorageEngine` (`itemstore`) — append-only Parquet item storage.
- `FailureStore` — durable failure tracking in a dedicated SQLite file.
```python
store = StateStore(
    storage_path="data/mysite",
    id_field="property_id",
)
store.add_pagination_url("https://example.com/listings")
store.add_item_urls(["https://example.com/item/1"])
store.save_item({"property_id": "123", "price": 100000}, "https://example.com/item/1")
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `storage_path` | `Path \| str` | — | Directory for state files and items |
| `id_field` | `str \| None` | `None` | JSON field to use as the item ID |
| `max_pending_items` | `int` | `100` | Flush the item buffer at this count |
| `flush_policy` | `str` | `"finalize"` | `"finalize"` or `"periodic"` |
| `target_max_file_bytes` | `int \| None` | `None` | Target maximum Parquet file size (bytes) |
| `compression` | `str` | `"snappy"` | Parquet compression codec |
| `auto_compact` | `bool` | `True` | Auto-compact part files on close |
URL queue methods:

| Method | Description |
|---|---|
| `add_pagination_url(url, priority=0)` | Enqueue a pagination URL |
| `add_pagination_urls(urls, priority=0)` | Enqueue multiple pagination URLs |
| `add_item_url(url, priority=10)` | Enqueue an item URL |
| `add_item_urls(urls, priority=10)` | Enqueue multiple item URLs |
| `get_next_url() -> UrlTask \| None` | Dequeue the highest-priority URL |
| `mark_url_done(url)` | Mark as completed and clear any failure record |
| `mark_url_failed(url, error)` | Mark as failed and record in the durable store |
| `schedule_url_retry(url, delay_seconds, error)` | Schedule for delayed retry |
| `reset_in_progress() -> int` | Crash recovery: reset in-progress URLs to pending |
| `reset_failed_items(max_failed_runs) -> (reset, perm_failed)` | Reset failed item URLs for retry |
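Note that item URLs default to a higher priority (10) than pagination URLs (0), so queued items are drained before the next page is fetched. The queue semantics can be modelled with a plain SQLite table — an illustrative sketch, not databrew's actual schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE urls (
    url      TEXT PRIMARY KEY,
    url_type TEXT,
    priority INTEGER,
    status   TEXT DEFAULT 'pending')""")


def add_url(url: str, url_type: str = "item", priority: int = 10) -> None:
    # PRIMARY KEY + INSERT OR IGNORE deduplicates URLs across runs
    conn.execute(
        "INSERT OR IGNORE INTO urls (url, url_type, priority) VALUES (?, ?, ?)",
        (url, url_type, priority))


def get_next_url():
    # Highest priority first; mark in_progress so it is not handed out twice
    row = conn.execute(
        "SELECT url FROM urls WHERE status = 'pending' "
        "ORDER BY priority DESC LIMIT 1").fetchone()
    if row is None:
        return None
    conn.execute("UPDATE urls SET status = 'in_progress' WHERE url = ?", row)
    return row[0]


add_url("https://example.com/listings", url_type="pagination", priority=0)
add_url("https://example.com/item/1")  # item URLs default to priority 10
```

Here the item URL is dequeued before the pagination URL despite being added later, because priority, not insertion order, decides scheduling.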
Item storage methods:

| Method | Description |
|---|---|
| `has_item(item_id) -> bool` | Check for an item by ID |
| `has_item_for_url(url) -> bool` | Check for an item by source URL |
| `save_item(data, source_url) -> (is_new, key)` | Persist an extracted item |
| `item_count() -> int` | Total items stored |
| `get_files() -> list[Path]` | List Parquet part files |
Failure tracking methods:

| Method | Description |
|---|---|
| `get_failed_urls() -> list[dict]` | All durable failure records |
| `export_failure_snapshot()` | Write `_failed_urls.json` |
| `clear_failure_store()` | Clear all failure records and the snapshot |

Lifecycle: use as a context manager or call `close()` explicitly. Closing flushes pending items, syncs failures to the durable store, and exports the JSON snapshot.
### FailureStore

Dedicated SQLite store for durable failure tracking. Survives deletion of
`.state.db` and produces `_failed_urls.json` snapshots for cross-machine sync.
| Method | Description |
|---|---|
| `record_failure(url, url_hash, url_type, failed_runs, error)` | Upsert a failure record |
| `record_resolution(url_hash)` | Remove a record when the URL succeeds |
| `mark_permanently_failed(url_hash)` | Flag a URL as exhausted |
| `get_all() -> list[dict]` | All failure records |
| `export_json(path)` | Atomic write to JSON |
| `import_snapshot(path) -> int` | Merge records from JSON (cross-machine safe) |
| `count() -> int` | Total failure records |
| `clear()` | Delete all records |
| `close()` | Close the database connection |
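A cross-machine-safe merge like `import_snapshot` can be modelled by keying records on `url_hash` and keeping the record with the higher `failed_runs` count. The merge rule here is an assumption for illustration; databrew's actual conflict resolution may differ:

```python
import json


def merge_snapshot(local: dict, snapshot_text: str) -> int:
    """Merge a JSON snapshot into local (url_hash -> record); return merge count."""
    merged = 0
    for rec in json.loads(snapshot_text):
        key = rec["url_hash"]
        # Assumed rule: an incoming record wins only if it has seen more failures
        if key not in local or rec["failed_runs"] > local[key]["failed_runs"]:
            local[key] = rec
            merged += 1
    return merged
```

Because the merge is keyed on a stable hash and monotone in `failed_runs`, importing the same snapshot twice is a no-op, which is what makes the operation safe to repeat across machines.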
### UrlTask

Dataclass representing a URL to be processed.
| Field | Type | Default | Description |
|---|---|---|---|
| `url` | `str` | — | The URL |
| `url_hash` | `str` | — | SHA-1 hash of the URL |
| `url_type` | `str` | `"item"` | `"pagination"` or `"item"` |
| `attempts` | `int` | `0` | Retry attempts so far |
| `priority` | `int` | `0` | Queue priority (higher runs sooner) |
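A dataclass matching the table above might look like the following sketch. Deriving `url_hash` automatically in `__post_init__` is an assumption; the table only states that it is the SHA-1 of the URL:

```python
import hashlib
from dataclasses import dataclass


@dataclass
class UrlTask:
    # Sketch mirroring the field table; not databrew's actual definition
    url: str
    url_hash: str = ""
    url_type: str = "item"
    attempts: int = 0
    priority: int = 0

    def __post_init__(self) -> None:
        if not self.url_hash:
            # SHA-1 of the URL, per the field description above
            self.url_hash = hashlib.sha1(self.url.encode("utf-8")).hexdigest()


task = UrlTask(url="https://example.com/item/1")
```

Hashing the URL gives the queue and failure store a fixed-length, collision-resistant key regardless of URL length.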
## External Component Packages

Databrew delegates fetching, extraction, and storage to standalone packages. Import them directly when you need tighter control.
### fetchkit — HTTP and Browser Fetchers

Provides pluggable fetchers (HTTP via `httpx`, optional browser via `pydoll`), request pacing, and a fetcher registry.

Key exports: `HttpxFetcher`, `FetchResult`, `Content`, `RequestPacer`, `create_fetcher`, `register_fetcher`.

Repository: github.com/datakomari/fetchkit
### extractkit — HTML and JSON Extractors

CSS/XPath-based HTML extraction and JSON path extraction, with a parser registry for custom transform functions.

Key exports: `HtmlExtractor`, `JsonExtractor`, `ExtractResult`, `ItemLink`, `register_parser`.

Repository: github.com/datakomari/extractkit
### itemstore — Parquet Item Storage

Append-only rolling Parquet file storage with deduplication, auto-compaction, and a lightweight SQLite index.

Key exports: `StorageEngine`, `compact_storage`.

Repository: github.com/datakomari/itemstore
## Composition Layer

The top-level `databrew` package provides orchestration and lifecycle APIs:
```python
import databrew

# Configuration
config = databrew.load_config("site.toml")
components = databrew.create_components(config)

# Orchestration (inside an async context)
orchestrator = databrew.Orchestrator(components)
result = await orchestrator.run()

# Middleware
chain = databrew.MiddlewareChain([
    databrew.LoggingMiddleware(),
    databrew.HeaderMiddleware(headers={"User-Agent": "..."}),
])
```
Key exports: `load_config`, `create_components`, `WebsiteConfig`, `CrawlComponents`, `Orchestrator`, `CrawlResult`, `Middleware`, `MiddlewareChain`, `run_hook`, `HookContext`.

See the User Guide for configuration details and the Middleware section for extending the crawl pipeline.