Skip to content

Middleware System

Middleware lets you intercept and modify the crawl flow at key points.

Middleware Hooks

Middleware provides four hooks:

| Hook | When Called | Use Cases |
|------|-------------|-----------|
| `pre_fetch` | Before fetching a URL | Add headers, filter URLs, log requests |
| `post_fetch` | After a successful fetch | Transform content, cache responses |
| `post_extract` | After extraction | Enrich items, filter links |
| `on_error` | When an error occurs | Custom error handling, recovery |

Creating Middleware

Inherit from Middleware and override the hooks you need:

from databrew import Middleware, MiddlewareContext

class MyMiddleware(Middleware):
    """Example middleware that prints a line at each of the four hooks.

    Every hook receives the MiddlewareContext for the current URL and
    returns it (optionally modified).
    """

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Called before each fetch
        print(f"Fetching: {ctx.url}")
        return ctx

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Called after successful fetch. `ctx.content` is typed
        # `PageContent | None`, so guard before dereferencing it.
        if ctx.content:
            print(f"Fetched: {len(ctx.content.content)} bytes")
        return ctx

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Called after extraction
        if ctx.extract_result:
            print(f"Found {len(ctx.extract_result.items)} items")
        return ctx

    async def on_error(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # Called when an error occurs. `ctx.error` is typed
        # `CrawlError | None`, so guard before reading `.message`.
        if ctx.error:
            print(f"Error: {ctx.error.message}")
        return ctx

Using Middleware

Pass middleware to the Orchestrator:

from databrew import Orchestrator

# Order matters: pre_fetch/post_fetch run first-to-last through this list,
# post_extract/on_error run last-to-first.
middleware_chain = [MyMiddleware(), AnotherMiddleware()]

orchestrator = Orchestrator(
    store=store,
    fetcher=fetcher,
    extractor=extractor,
    policy=policy,
    middleware=middleware_chain,
)

MiddlewareContext

The context object passed through the middleware chain:

@dataclass
class MiddlewareContext:
    """State for the URL currently being crawled, passed through the middleware chain.

    Each hook reads and/or modifies these fields and returns the context.
    Which fields are populated depends on the hook (see the per-field notes).
    """
    url: str                           # Current URL
    url_type: str                      # "item" or "pagination"
    headers: dict[str, str]            # Request headers (modify in pre_fetch)
    content: PageContent | None        # Fetched content (in post_fetch+)
    extract_result: ExtractResult | None  # Extraction result (in post_extract)
    error: CrawlError | None           # Error if one occurred (in on_error)
    skip: bool                         # Set True to skip this URL
    data: dict[str, Any]               # Pass data between hooks

Execution Order

  • pre_fetch and post_fetch: executed in order (first middleware first)
  • post_extract and on_error: executed in reverse order (last middleware first)
pre_fetch:    MW1 → MW2 → MW3
post_fetch:   MW1 → MW2 → MW3
post_extract: MW3 → MW2 → MW1
on_error:     MW3 → MW2 → MW1

Built-in Middleware

LoggingMiddleware

Logs all requests and responses:

from databrew import LoggingMiddleware
import logging

log_level = logging.INFO  # any standard `logging` level works here
middleware = LoggingMiddleware(level=log_level)

HeaderMiddleware

Add headers to all requests:

from databrew import HeaderMiddleware

# Headers merged into every outgoing request.
default_headers = {
    "User-Agent": "MyBot/1.0",
    "Accept-Language": "en-US",
}
middleware = HeaderMiddleware(default_headers)

UrlFilterMiddleware

Filter URLs by pattern:

from databrew import UrlFilterMiddleware

blocked = [r"/admin/", r"\.pdf$"]  # Skip these
allowed = [r"/products/"]           # Only allow these
middleware = UrlFilterMiddleware(skip_patterns=blocked, allow_patterns=allowed)

RetryOnPatternMiddleware

Retry when the fetched content matches a pattern (e.g., a CAPTCHA or "access denied" page):

from databrew import RetryOnPatternMiddleware

# Text that signals a block page rather than real content.
captcha_markers = ["Please verify you are human", "Access denied"]
middleware = RetryOnPatternMiddleware(patterns=captcha_markers, max_retries=3)

ItemEnricherMiddleware

Enrich extracted items:

from databrew import ItemEnricherMiddleware
from datetime import datetime, timezone

def add_timestamp(item: dict) -> dict:
    """Record the crawl time on *item* and return the same dict.

    The timestamp is timezone-aware UTC in ISO 8601 form. A naive local
    time would be ambiguous once data from different machines or across
    DST changes is combined.
    """
    item["crawled_at"] = datetime.now(tz=timezone.utc).isoformat()
    return item

middleware = ItemEnricherMiddleware(
    enricher=add_timestamp,  # takes an item dict and returns it enriched
)

LinkFilterMiddleware

Filter extracted links:

from databrew import LinkFilterMiddleware

item_link_skips = [r"/archive/", r"/old/"]
pagination_link_skips = [r"page=1$"]

middleware = LinkFilterMiddleware(
    skip_item_patterns=item_link_skips,
    skip_pagination_patterns=pagination_link_skips,
)

Common Patterns

Authentication

class AuthMiddleware(Middleware):
    """Attach a bearer token to every outgoing request."""

    def __init__(self, token: str):
        # Let the base class run its own initialisation (if any) first.
        super().__init__()
        self.token = token

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        # pre_fetch runs before each fetch, so the header is set on every request.
        ctx.headers["Authorization"] = f"Bearer {self.token}"
        return ctx

Skip Certain URLs

class SkipMiddleware(Middleware):
    """Mark admin URLs as skipped before they are fetched."""

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        is_admin_page = "/admin/" in ctx.url
        if is_admin_page:
            ctx.skip = True  # Skip this URL
        return ctx

Transform Content

class ContentMiddleware(Middleware):
    """Rewrite fetched page content before it reaches the extractor."""

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        page = ctx.content
        if not page:
            return ctx
        # Build a replacement PageContent that copies every field except the
        # (modified) content.
        ctx.content = PageContent(
            url=page.url,
            content=page.content.replace("old", "new"),
            content_type=page.content_type,
            status_code=page.status_code,
            headers=page.headers,
        )
        return ctx

Enrich Items with External Data

class GeocodingMiddleware(Middleware):
    """Add a ``coordinates`` field to extracted items that have an ``address``."""

    async def geocode(self, address: str):
        """Resolve *address* to coordinates.

        Placeholder: override this (or replace the body) with a call to your
        geocoding service. It is awaited by ``post_extract``, so it must be
        ``async``.
        """
        raise NotImplementedError("GeocodingMiddleware.geocode must be implemented")

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        result = ctx.extract_result
        if not (result and result.items):
            return ctx

        # Items are dicts and are enriched in place.
        for item in result.items:
            if "address" in item:
                item["coordinates"] = await self.geocode(item["address"])

        ctx.extract_result = ExtractResult.ok(
            items=list(result.items),
            pagination_links=result.pagination_links,
            item_links=result.item_links,
        )
        return ctx

Error Recovery

class RecoveryMiddleware(Middleware):
    """Clear rate-limit errors from the context."""

    async def on_error(self, ctx: MiddlewareContext) -> MiddlewareContext:
        err = ctx.error
        if not err:
            return ctx
        if "rate limit" in err.message.lower():
            # Clear error to trigger retry (a delay could also be added here)
            ctx.error = None
        return ctx

Pass Data Between Hooks

class TimingMiddleware(Middleware):
    """Print how long each fetch took.

    ``pre_fetch`` stores a start time in ``ctx.data``; ``post_fetch`` reads it
    back to compute the elapsed time.
    """

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        import time
        # perf_counter is monotonic, so the measurement is unaffected by
        # system clock adjustments (unlike time.time()).
        ctx.data["start_time"] = time.perf_counter()
        return ctx

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        import time
        start = ctx.data.get("start_time")
        # Without a recorded start there is nothing meaningful to report;
        # defaulting to 0 would print a bogus, enormous duration.
        if start is not None:
            elapsed = time.perf_counter() - start
            print(f"Fetch took {elapsed:.2f}s")
        return ctx

Error Handling in Middleware

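Errors raised in middleware hooks are caught and logged, so they don't stop the crawl. To handle a failure yourself (for example, to log it with more context), catch it inside the hook: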

class SafeMiddleware(Middleware):
    """Show how to contain failures inside a hook instead of propagating them."""

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        import logging
        logger = logging.getLogger(__name__)
        try:
            # Your logic here
            pass
        except Exception as e:
            # Best-effort: log with traceback but don't re-raise, so the
            # context is returned unchanged and the crawl continues.
            logger.warning("Middleware error: %s", e, exc_info=True)
        return ctx

Complete Example

import asyncio
from databrew import (
    load_config,
    create_components,
    Orchestrator,
    Middleware,
    MiddlewareContext,
    HeaderMiddleware,
    LoggingMiddleware,
)

class ProgressMiddleware(Middleware):
    """Count URLs and extracted items and print a running total."""

    def __init__(self):
        # Let the base class run its own initialisation (if any) first.
        super().__init__()
        self.items = 0  # total items seen in post_extract
        self.urls = 0   # incremented once per pre_fetch call

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        self.urls += 1
        return ctx

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        if ctx.extract_result:
            self.items += len(ctx.extract_result.items)
            print(f"Progress: {self.urls} URLs, {self.items} items")
        return ctx

async def main():
    """Run the crawl defined in ``mysite.toml`` with custom middleware.

    The fetcher and store are closed even if the crawl raises or is cancelled.
    """
    config = load_config("mysite.toml")
    components = create_components(config)

    try:
        orchestrator = Orchestrator(
            store=components.store,
            fetcher=components.fetcher,
            extractor=components.extractor,
            policy=components.policy,
            middleware=[
                HeaderMiddleware({"User-Agent": "MyBot/1.0"}),
                LoggingMiddleware(),
                ProgressMiddleware(),
            ],
        )

        result = await orchestrator.run()
        print(f"Done! {result.store.item_count()} items")
    finally:
        # Nested so the store is still closed if closing the fetcher fails.
        try:
            await components.fetcher.close()
        finally:
            components.store.close()


if __name__ == "__main__":
    asyncio.run(main())