Extending BrokerBridge with plugins
BrokerBridge uses a plugin system for data sources and LLM providers. To add a custom signal provider or an alternative AI model, implement one of two abstract base classes: DataSourcePlugin or LLMProviderPlugin.
Architecture overview
DataSourcePlugin
Provides trading signals. The pipeline calls fetch_signals() during each scan interval to get ranked signals.
Examples: IBKR market data, Unusual Whales, custom scanner
LLMProviderPlugin
Evaluates trade proposals. The decision engine calls generate_decision() and expects an approve/reject decision.
Examples: Anthropic, OpenAI, OpenRouter, Google AI
Both types support three auth modes: api_key, oauth, and connection.
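The flow described above can be sketched as a single scan pass: a data source returns ranked signals, and each one is turned into a proposal for the LLM provider to approve or reject. This is a minimal illustration, not BrokerBridge's actual pipeline. The stand-in dataclasses, the FakeDataSource and FakeLLMProvider classes, the 0.5 approval threshold, and the run_one_scan and scan_once helpers are all assumptions made for the example.

```python
import asyncio
from dataclasses import dataclass


# Stand-in types for illustration only; the real Signal, TradeProposal and
# AiTradeDecision classes in BrokerBridge may have different fields.
@dataclass
class Signal:
    symbol: str
    score: float


@dataclass
class TradeProposal:
    symbol: str
    score: float


@dataclass
class AiTradeDecision:
    approved: bool
    reason: str


class FakeDataSource:
    """Hypothetical data source that returns canned signals, best first."""

    async def fetch_signals(self, symbols: list[str]) -> list[Signal]:
        scores = {"AAPL": 0.4, "MSFT": 0.9, "TSLA": 0.7}
        found = [Signal(s, scores[s]) for s in symbols if s in scores]
        return sorted(found, key=lambda sig: sig.score, reverse=True)


class FakeLLMProvider:
    """Hypothetical provider that approves proposals scoring at least 0.5."""

    async def generate_decision(self, proposal: TradeProposal) -> AiTradeDecision:
        approved = proposal.score >= 0.5
        return AiTradeDecision(approved, f"score={proposal.score}")


async def run_one_scan(source, provider, symbols: list[str]) -> list[tuple[str, bool]]:
    """One scan pass: fetch ranked signals, then ask the provider about each."""
    signals = await source.fetch_signals(symbols)
    results = []
    for signal in signals:
        proposal = TradeProposal(signal.symbol, signal.score)
        decision = await provider.generate_decision(proposal)
        results.append((signal.symbol, decision.approved))
    return results


def scan_once(symbols: list[str]) -> list[tuple[str, bool]]:
    """Synchronous wrapper so the sketch can be run from a plain script."""
    return asyncio.run(run_one_scan(FakeDataSource(), FakeLLMProvider(), symbols))


if __name__ == "__main__":
    print(scan_once(["AAPL", "MSFT", "TSLA"]))
```

Because the scan pass only relies on fetch_signals() and generate_decision(), either side can be swapped for a different plugin without touching the other.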
DataSourcePlugin interface
```python
class DataSourcePlugin(ABC):
    name: str                 # Unique plugin identifier
    display_name: str         # Human-readable name
    auth_type: AuthType       # "api_key" | "oauth" | "connection"
    required: bool = False    # System won't start without it if True

    async def validate_credentials(self) -> bool: ...
    async def fetch_signals(self, symbols: list[str]) -> list[Signal]: ...
    async def health_check(self) -> PluginStatus: ...
```
LLMProviderPlugin interface
```python
class LLMProviderPlugin(ABC):
    name: str                 # Unique plugin identifier
    display_name: str         # Human-readable name
    auth_type: AuthType       # "api_key" | "oauth"
    models: list[str] = []    # Supported model IDs

    async def validate_credentials(self) -> bool: ...
    async def generate_decision(self, proposal: TradeProposal) -> AiTradeDecision: ...
    async def health_check(self) -> PluginStatus: ...
```
Tutorial: custom data source
my_scanner_plugin.py
```python
from datetime import datetime, timezone

import httpx

from brokerbridge.plugins.base import DataSourcePlugin
from brokerbridge.plugins.types import AuthType, PluginStatus, Signal


class MyScannerPlugin(DataSourcePlugin):
    name = "my_scanner"
    display_name = "My Custom Scanner"
    auth_type: AuthType = "api_key"

    def __init__(self, api_key: str):
        self._api_key = api_key

    async def validate_credentials(self) -> bool:
        # The key is treated as valid if the ping endpoint answers 200.
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                "https://api.example.com/ping",
                headers={"Authorization": f"Bearer {self._api_key}"},
            )
            return resp.status_code == 200

    async def fetch_signals(self, symbols: list[str]) -> list[Signal]:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://api.example.com/scan",
                headers={"Authorization": f"Bearer {self._api_key}"},
                json={"symbols": symbols},
            )
            resp.raise_for_status()  # Raise on 4xx/5xx responses
            data = resp.json()

        # Convert each raw item into a Signal tagged with this plugin's name.
        signals = []
        for item in data.get("signals", []):
            signals.append(Signal(
                symbol=item["symbol"],
                direction=item["direction"],
                score=item["score"],
                timestamp=datetime.now(timezone.utc),
                source=self.name,
            ))
        # Return ranked signals, highest score first.
        return sorted(signals, key=lambda s: s.score, reverse=True)

    async def health_check(self) -> PluginStatus:
        # Report network and HTTP errors as a disconnected status.
        try:
            valid = await self.validate_credentials()
            return PluginStatus(connected=valid)
        except Exception as e:
            return PluginStatus(connected=False, message=str(e))
```
Available plugins
| Plugin | Type | Status |
|---|---|---|
| IBKR Market Data | DataSource | Available |
| Anthropic Claude | LLMProvider | Available |
| OpenAI | LLMProvider | Available |
| OpenRouter | LLMProvider | Available |
| Google AI | LLMProvider | Available |
| Unusual Whales | DataSource | Coming soon |
| Databento | DataSource | Planned |
| Financial Modeling Prep | DataSource | Planned |
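The tutorial above covers only a data source. For the other plugin type, the sketch below shows what an LLMProviderPlugin subclass might look like, using an offline rule in place of a real model call so it runs without network access or BrokerBridge installed. The base class and dataclasses are stand-ins that mirror the interface listing; the max_loss_usd field, the RiskCapProvider class, the "rule-v1" model ID, and the use of @abstractmethod are assumptions for illustration, not confirmed parts of the BrokerBridge API.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


# Stand-ins so the sketch runs on its own. The real classes live in the
# brokerbridge package and may differ in fields and names.
@dataclass
class TradeProposal:
    symbol: str
    quantity: int
    max_loss_usd: float  # hypothetical field used by the rule below


@dataclass
class AiTradeDecision:
    approved: bool
    reasoning: str


@dataclass
class PluginStatus:
    connected: bool
    message: str = ""


class LLMProviderPlugin(ABC):
    """Stand-in base class mirroring the interface listing."""

    name: str
    display_name: str
    auth_type: str
    models: list[str] = []

    @abstractmethod
    async def validate_credentials(self) -> bool: ...

    @abstractmethod
    async def generate_decision(self, proposal: TradeProposal) -> AiTradeDecision: ...

    @abstractmethod
    async def health_check(self) -> PluginStatus: ...


class RiskCapProvider(LLMProviderPlugin):
    """Hypothetical offline provider: rejects proposals above a loss cap."""

    name = "risk_cap"
    display_name = "Risk Cap (offline stub)"
    auth_type = "api_key"  # declared for completeness; unused by this stub
    models = ["rule-v1"]

    def __init__(self, max_loss_usd: float):
        self._cap = max_loss_usd

    async def validate_credentials(self) -> bool:
        return True  # no remote service to authenticate against

    async def generate_decision(self, proposal: TradeProposal) -> AiTradeDecision:
        approved = proposal.max_loss_usd <= self._cap
        verdict = "within" if approved else "exceeds"
        reasoning = f"max loss {proposal.max_loss_usd} {verdict} cap {self._cap}"
        return AiTradeDecision(approved=approved, reasoning=reasoning)

    async def health_check(self) -> PluginStatus:
        return PluginStatus(connected=await self.validate_credentials())
```

A stub like this can be useful for exercising the decision path in tests before wiring in a hosted model; a real provider would replace the rule in generate_decision() with a call to its model API.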