Extending BrokerBridge with plugins

BrokerBridge uses a plugin system for data sources and LLM providers. Add custom signal providers or alternative AI models by implementing one of two abstract base classes.

Architecture overview

DataSourcePlugin

Provides trading signals. The pipeline calls fetch_signals() during each scan interval to get ranked signals.

Examples: IBKR market data, Unusual Whales, custom scanner

LLMProviderPlugin

Evaluates trade proposals. The decision engine calls generate_decision() and expects an approve/reject decision.

Examples: Anthropic, OpenAI, OpenRouter, Google AI

Both types support three auth modes: api_key, oauth, and connection.

DataSourcePlugin interface

python
class DataSourcePlugin(ABC):
    name: str                    # Unique plugin identifier
    display_name: str            # Human-readable name
    auth_type: AuthType          # "api_key" | "oauth" | "connection"
    required: bool = False       # System won't start without it if True

    async def validate_credentials(self) -> bool: ...
    async def fetch_signals(self, symbols: list[str]) -> list[Signal]: ...
    async def health_check(self) -> PluginStatus: ...

LLMProviderPlugin interface

python
class LLMProviderPlugin(ABC):
    name: str                    # Unique plugin identifier
    display_name: str            # Human-readable name
    auth_type: AuthType          # "api_key" | "oauth"
    models: list[str] = []       # Supported model IDs

    async def validate_credentials(self) -> bool: ...
    async def generate_decision(self, proposal: TradeProposal) -> AiTradeDecision: ...
    async def health_check(self) -> PluginStatus: ...

Tutorial: custom data source

my_scanner_plugin.py
from datetime import datetime, timezone
import httpx
from brokerbridge.plugins.base import DataSourcePlugin
from brokerbridge.plugins.types import AuthType, PluginStatus, Signal


class MyScannerPlugin(DataSourcePlugin):
    name = "my_scanner"
    display_name = "My Custom Scanner"
    auth_type: AuthType = "api_key"

    def __init__(self, api_key: str):
        self._api_key = api_key

    async def validate_credentials(self) -> bool:
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                "https://api.example.com/ping",
                headers={"Authorization": f"Bearer {self._api_key}"},
            )
            return resp.status_code == 200

    async def fetch_signals(self, symbols: list[str]) -> list[Signal]:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://api.example.com/scan",
                headers={"Authorization": f"Bearer {self._api_key}"},
                json={"symbols": symbols},
            )
            resp.raise_for_status()
            data = resp.json()

        signals = []
        for item in data.get("signals", []):
            signals.append(Signal(
                symbol=item["symbol"],
                direction=item["direction"],
                score=item["score"],
                timestamp=datetime.now(timezone.utc),
                source=self.name,
            ))
        return sorted(signals, key=lambda s: s.score, reverse=True)

    async def health_check(self) -> PluginStatus:
        try:
            valid = await self.validate_credentials()
            return PluginStatus(connected=valid)
        except Exception as e:
            return PluginStatus(connected=False, message=str(e))

Available plugins

PluginTypeStatus
IBKR Market DataDataSourceAvailable
Anthropic ClaudeLLMProviderAvailable
OpenAILLMProviderAvailable
OpenRouterLLMProviderAvailable
Google AILLMProviderAvailable
Unusual WhalesDataSourceComing soon
Data BentoDataSourcePlanned
Financial Modeling PrepDataSourcePlanned