Middleware System¶
Middleware lets you intercept and modify the crawl flow at key points.
Middleware Hooks¶
Middleware provides four hooks:
| Hook | When Called | Use Cases |
|---|---|---|
| `pre_fetch` | Before fetching a URL | Add headers, filter URLs, log requests |
| `post_fetch` | After a successful fetch | Transform content, cache responses |
| `post_extract` | After extraction | Enrich items, filter links |
| `on_error` | When an error occurs | Custom error handling, recovery |
Creating Middleware¶
Inherit from Middleware and override the hooks you need:
from databrew import Middleware, MiddlewareContext
class MyMiddleware(Middleware):
    """Example middleware that prints a line from each of the four hooks."""

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Called before each fetch
        print(f"Fetching: {ctx.url}")
        return ctx

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Called after a successful fetch. `content` is declared optional
        # (PageContent | None), so guard it before dereferencing.
        if ctx.content:
            # NOTE(review): if `content.content` is a str, len() counts
            # characters rather than bytes — confirm the PageContent type.
            print(f"Fetched: {len(ctx.content.content)} bytes")
        return ctx

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Called after extraction
        if ctx.extract_result:
            print(f"Found {len(ctx.extract_result.items)} items")
        return ctx

    async def on_error(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Called when an error occurs. `error` is declared optional
        # (CrawlError | None), so guard it before reading `.message`.
        if ctx.error:
            print(f"Error: {ctx.error.message}")
        return ctx
Using Middleware¶
Pass middleware to the Orchestrator:
from databrew import Orchestrator
# `store`, `fetcher`, `extractor` and `policy` are assumed to be built
# elsewhere (for example via `create_components`, as in the complete example).
# `AnotherMiddleware` is a placeholder for any other Middleware subclass.
# List order matters: pre_fetch/post_fetch run first-to-last, while
# post_extract/on_error run last-to-first.
orchestrator = Orchestrator(
    store=store,
    fetcher=fetcher,
    extractor=extractor,
    policy=policy,
    middleware=[MyMiddleware(), AnotherMiddleware()],
)
MiddlewareContext¶
The context object passed through the middleware chain:
@dataclass
class MiddlewareContext:
    """Per-URL state handed to every middleware hook.

    Each hook receives this object and returns it, possibly modified, so a
    change made by one middleware (extra headers, ``skip``, entries in
    ``data``) is what the next middleware in the chain sees.
    """

    url: str  # Current URL
    url_type: str  # "item" or "pagination"
    headers: dict[str, str]  # Request headers (modify in pre_fetch)
    content: PageContent | None  # Fetched content (available from post_fetch on)
    extract_result: ExtractResult | None  # Extraction result (in post_extract)
    error: CrawlError | None  # Error if one occurred (in on_error)
    skip: bool  # Set True to skip this URL
    data: dict[str, Any]  # Scratch space for passing data between hooks
Execution Order¶
- `pre_fetch` and `post_fetch`: executed in order (first middleware first).
- `post_extract` and `on_error`: executed in reverse order (last middleware first).
pre_fetch: MW1 → MW2 → MW3
post_fetch: MW1 → MW2 → MW3
post_extract: MW3 → MW2 → MW1
on_error: MW3 → MW2 → MW1
Built-in Middleware¶
LoggingMiddleware¶
Logs all requests and responses:
import logging

from databrew import LoggingMiddleware

# Log every request and response at INFO level.
middleware = LoggingMiddleware(level=logging.INFO)
HeaderMiddleware¶
Add headers to all requests:
from databrew import HeaderMiddleware
# Every header in this mapping is added to each outgoing request.
middleware = HeaderMiddleware({
    "User-Agent": "MyBot/1.0",
    "Accept-Language": "en-US",
})
UrlFilterMiddleware¶
Filter URLs by pattern:
from databrew import UrlFilterMiddleware
# Patterns are written as regular expressions (raw strings, `$` anchors).
# NOTE(review): confirm which list wins when a URL matches both skip and allow.
middleware = UrlFilterMiddleware(
    skip_patterns=[r"/admin/", r"\.pdf$"],  # Skip these
    allow_patterns=[r"/products/"],  # Only allow these
)
RetryOnPatternMiddleware¶
Retry when content matches a pattern (e.g., CAPTCHA):
from databrew import RetryOnPatternMiddleware
# Retry a fetch whose content matches one of these patterns (e.g. a CAPTCHA
# or block page). NOTE(review): presumably these are plain substrings and
# max_retries caps retries per URL — confirm against the implementation.
middleware = RetryOnPatternMiddleware(
    patterns=["Please verify you are human", "Access denied"],
    max_retries=3,
)
ItemEnricherMiddleware¶
Enrich extracted items:
from databrew import ItemEnricherMiddleware
from datetime import datetime, timezone


def add_timestamp(item: dict) -> dict:
    """Stamp *item* with the crawl time and return the same dict.

    The timestamp is a timezone-aware UTC ISO 8601 string
    (e.g. ``2024-01-01T12:00:00.123456+00:00``). A naive ``datetime.now()``
    would record the crawler host's local time with no offset, which is
    ambiguous when crawls run on machines in different time zones.
    """
    item["crawled_at"] = datetime.now(timezone.utc).isoformat()
    return item
# `enricher` receives an extracted item dict and returns it.
# NOTE(review): presumably called once per item, with the return value
# replacing the item — confirm.
middleware = ItemEnricherMiddleware(enricher=add_timestamp)
LinkFilterMiddleware¶
Filter extracted links:
from databrew import LinkFilterMiddleware
# Item links and pagination links are filtered by separate pattern lists.
# NOTE(review): patterns look like regular expressions; matching links are
# presumably dropped from the extraction result — confirm.
middleware = LinkFilterMiddleware(
    skip_item_patterns=[r"/archive/", r"/old/"],
    # Matches pagination URLs ending in "page=1" (likely the first page).
    skip_pagination_patterns=[r"page=1$"],
)
Common Patterns¶
Authentication¶
class AuthMiddleware(Middleware):
    """Attach a bearer token to every outgoing request.

    Args:
        token: The bearer token sent in the ``Authorization`` header.
    """

    def __init__(self, token: str):
        # Run the base initializer so any state Middleware may set up is
        # not skipped by this override.
        super().__init__()
        self.token = token

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        ctx.headers["Authorization"] = f"Bearer {self.token}"
        return ctx
Skip Certain URLs¶
class SkipMiddleware(Middleware):
    """Mark any URL containing ``/admin/`` as skipped."""

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Leave the context untouched for URLs outside /admin/.
        if "/admin/" not in ctx.url:
            return ctx
        # Setting skip to True asks the crawler to skip this URL.
        ctx.skip = True
        return ctx
Transform Content¶
class ContentMiddleware(Middleware):
    """Rewrite fetched page text before it is handed to extraction."""

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        page = ctx.content
        if not page:
            # No content to transform; pass the context through.
            return ctx
        # Replace the page with a new PageContent that carries over every
        # other field and swaps in the rewritten text.
        ctx.content = PageContent(
            url=page.url,
            content=page.content.replace("old", "new"),
            content_type=page.content_type,
            status_code=page.status_code,
            headers=page.headers,
        )
        return ctx
Enrich Items with External Data¶
class GeocodingMiddleware(Middleware):
    """Add a ``coordinates`` field to every extracted item that has an ``address``."""

    async def geocode(self, address: str):
        """Return coordinates for *address*.

        Placeholder hook: override this method (or replace its body) with a
        call to your geocoding service. It is defined here so the example
        fails with a clear message instead of an AttributeError.
        """
        raise NotImplementedError("GeocodingMiddleware.geocode must be implemented")

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        result = ctx.extract_result
        if result and result.items:
            enriched = []
            for item in result.items:
                if "address" in item:
                    item["coordinates"] = await self.geocode(item["address"])
                enriched.append(item)
            # NOTE(review): ExtractResult.ok is given only items and links;
            # confirm no other ExtractResult fields need to be carried over.
            ctx.extract_result = ExtractResult.ok(
                items=enriched,
                pagination_links=result.pagination_links,
                item_links=result.item_links,
            )
        return ctx
Error Recovery¶
class RecoveryMiddleware(Middleware):
    """Clear rate-limit errors so the URL can be retried."""

    async def on_error(self, ctx: MiddlewareContext) -> MiddlewareContext:
        err = ctx.error
        # Only rate-limit errors are handled; anything else passes through.
        if not err or "rate limit" not in err.message.lower():
            return ctx
        # Clearing the error is what triggers the retry.
        # A delay could also be added here before returning.
        ctx.error = None
        return ctx
Pass Data Between Hooks¶
import time


class TimingMiddleware(Middleware):
    """Print how long each fetch took, using ``ctx.data`` to carry the start time."""

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # perf_counter is monotonic, so the measurement is not skewed by
        # system clock adjustments the way time.time() can be.
        ctx.data["start_time"] = time.perf_counter()
        return ctx

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        start = ctx.data.get("start_time")
        # Without a recorded start time there is nothing to measure; skip the
        # report rather than printing a meaningless duration.
        if start is not None:
            elapsed = time.perf_counter() - start
            print(f"Fetch took {elapsed:.2f}s")
        return ctx
Error Handling in Middleware¶
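Errors raised in middleware hooks are caught and logged, so they don't stop the crawl. If a hook should recover on its own or log with more context, catch the exception inside the hook: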
import logging

logger = logging.getLogger(__name__)


class SafeMiddleware(Middleware):
    """Template for a hook that handles its own errors instead of raising them."""

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        try:
            # Your logic here
            pass
        except Exception as e:
            # Log but don't re-raise, so the hook still returns the context.
            logger.warning("Middleware error: %s", e)
        return ctx
Complete Example¶
import asyncio
from databrew import (
load_config,
create_components,
Orchestrator,
Middleware,
MiddlewareContext,
HeaderMiddleware,
LoggingMiddleware,
)
class ProgressMiddleware(Middleware):
    """Count URLs and extracted items, printing a running total after extraction."""

    def __init__(self):
        # Run the base initializer so any state Middleware may set up is
        # not skipped by this override.
        super().__init__()
        self.items = 0  # total items seen in post_extract
        self.urls = 0  # number of pre_fetch calls seen

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        self.urls += 1
        return ctx

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        if ctx.extract_result:
            self.items += len(ctx.extract_result.items)
            print(f"Progress: {self.urls} URLs, {self.items} items")
        return ctx
async def main():
    """Build the crawler from ``mysite.toml``, run it, and release its resources."""
    config = load_config("mysite.toml")
    components = create_components(config)
    try:
        orchestrator = Orchestrator(
            store=components.store,
            fetcher=components.fetcher,
            extractor=components.extractor,
            policy=components.policy,
            middleware=[
                HeaderMiddleware({"User-Agent": "MyBot/1.0"}),
                LoggingMiddleware(),
                ProgressMiddleware(),
            ],
        )
        result = await orchestrator.run()
        print(f"Done! {result.store.item_count()} items")
    finally:
        # Close resources even if the crawl raises; the nested finally
        # closes the store even if closing the fetcher fails.
        try:
            await components.fetcher.close()
        finally:
            components.store.close()
if __name__ == "__main__":
    # Guard so importing this module does not start a crawl.
    asyncio.run(main())