Service Proxy Packs
Service Proxy packs connect external services (APIs, SaaS platforms) to KruxOS through the Service Proxy safety chain: read replica, write buffer, batch protection, and rollback.
Architecture
graph LR
    A[Agent] --> B[Capability]
    B --> C[Service Proxy]
    C --> D[Read Replica]
    C --> E[Write Buffer]
    C --> F[Batch Protection]
    C --> G[Rollback Engine]
    D --> H[External Service API]
    E --> H
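The split between the read path and the write path in the diagram can be illustrated with a toy model. This is only a sketch of the idea, not KruxOS code: `ToyProxy` and its attributes are hypothetical names invented for the illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyProxy:
    """Toy stand-in for the Service Proxy (hypothetical, not the KruxOS API)."""

    replica: dict = field(default_factory=dict)       # local copy of service data
    write_buffer: list = field(default_factory=list)  # queued writes, not yet sent
    api_calls: list = field(default_factory=list)     # calls that reached the service

    def read(self, key):
        # Reads are answered from the replica and never touch the external API.
        return self.replica.get(key)

    def write(self, operation, params):
        # Writes are only queued here; nothing is sent yet.
        self.write_buffer.append((operation, params))

    def flush(self):
        # Releasing the buffer is the only path to the external API.
        while self.write_buffer:
            self.api_calls.append(self.write_buffer.pop(0))


proxy = ToyProxy(replica={"channels": ["general"]})
channels = proxy.read("channels")
proxy.write("slack.send_message", {"channel": "general", "text": "hi"})
calls_before_flush = list(proxy.api_calls)
proxy.flush()
calls_after_flush = list(proxy.api_calls)
```

In this model the external service sees nothing until `flush()` runs, which is the property the write buffer is meant to provide.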
Scaffold a proxy pack
Scaffolding a proxy pack creates the standard pack structure plus proxy-specific files:
my-slack-pack/
├── pack.yaml
├── definitions/
│   └── slack.yaml          # Capability definitions
├── implementations/
│   ├── slack.py            # Capability implementations
│   └── proxy_adapter.py    # Service Proxy adapter
├── tests/
│   └── test_slack.py
└── README.md
Implement the proxy adapter
The proxy adapter defines how to sync data, execute writes, and roll back:
"""Slack Service Proxy adapter."""

from kruxos.packs.proxy import ProxyAdapter, SyncResult, WriteResult


class SlackAdapter(ProxyAdapter):
    """Adapter for Slack API via Service Proxy."""

    service_name = "slack"

    async def sync(self) -> SyncResult:
        """Sync channels and recent messages to local replica."""
        # Fetch from Slack API
        channels = await self.api_client.get("conversations.list")
        messages = []
        for channel in channels["channels"]:
            history = await self.api_client.get(
                "conversations.history",
                params={"channel": channel["id"], "limit": 100},
            )
            messages.extend(history["messages"])

        # Store in local replica
        await self.replica.upsert_many("channels", channels["channels"])
        await self.replica.upsert_many("messages", messages)

        return SyncResult(
            entries_synced=len(channels["channels"]) + len(messages),
            next_sync_in_seconds=120,
        )

    async def execute_write(self, operation: str, params: dict) -> WriteResult:
        """Execute a buffered write against the Slack API."""
        if operation == "slack.send_message":
            result = await self.api_client.post(
                "chat.postMessage",
                json={
                    "channel": params["channel"],
                    "text": params["text"],
                },
            )
            # Keep the channel and timestamp so rollback() can delete the message.
            return WriteResult(
                success=True,
                external_id=result["ts"],
                rollback_data={"channel": params["channel"], "ts": result["ts"]},
            )
        # Fail loudly instead of implicitly returning None for unknown operations.
        raise ValueError(f"Unsupported operation: {operation}")

    async def rollback(self, operation: str, rollback_data: dict) -> bool:
        """Roll back a write operation. Returns False if it cannot be undone."""
        if operation == "slack.send_message":
            await self.api_client.post(
                "chat.delete",
                json={
                    "channel": rollback_data["channel"],
                    "ts": rollback_data["ts"],
                },
            )
            return True
        return False
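The `sync()` example reads a single page of `conversations.list`. Slack's Web API paginates list methods with a cursor, returned in `response_metadata.next_cursor`, so a workspace with many channels may need several requests. The sketch below shows one way to follow the cursor. It assumes the client exposes `get(method, params=...)` as in the adapter; `fetch_all_channels` and `FakeClient` are hypothetical names used only to make the example runnable on its own.

```python
import asyncio


async def fetch_all_channels(api_client, page_size=200):
    """Collect every page of conversations.list by following next_cursor."""
    channels = []
    cursor = ""
    while True:
        params = {"limit": page_size}
        if cursor:
            params["cursor"] = cursor
        page = await api_client.get("conversations.list", params=params)
        channels.extend(page["channels"])
        # An empty or missing next_cursor marks the last page.
        cursor = page.get("response_metadata", {}).get("next_cursor", "")
        if not cursor:
            return channels


class FakeClient:
    """Hypothetical stand-in for self.api_client that serves two canned pages."""

    def __init__(self):
        self.pages = {
            "": {"channels": [{"id": "C1"}], "response_metadata": {"next_cursor": "p2"}},
            "p2": {"channels": [{"id": "C2"}], "response_metadata": {"next_cursor": ""}},
        }

    async def get(self, method, params=None):
        # Pick the canned page that matches the requested cursor.
        return self.pages[(params or {}).get("cursor", "")]


all_channels = asyncio.run(fetch_all_channels(FakeClient()))
```

Inside an adapter, the same helper could replace the single `conversations.list` call, with `self.api_client` passed in place of the fake.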
Define capabilities with proxy annotations
- name: slack.search_messages
  version: "1.0"
  purpose: "Searches Slack messages in the local replica."
  when_to_use: |
    Use slack.search_messages to find messages matching a query.
    Searches the local replica only; makes no Slack API calls.
  inputs:
    - name: query
      type: String
      required: true
      description: "Search text."
    - name: channel
      type: String
      required: false
      description: "Filter to a specific channel."
  outputs:
    - name: messages
      type: Array
      description: "Matching messages."
  side_effects: []
  permission_tier: autonomous
  tags: ["slack", "read", "safe"]

- name: slack.send_message
  version: "1.0"
  purpose: "Sends a message to a Slack channel. Buffered for 2 minutes."
  when_to_use: |
    Use slack.send_message to post a message to a channel. The send
    is buffered for 2 minutes and can be cancelled during that time.
  inputs:
    - name: channel
      type: String
      required: true
      description: "Channel name or ID."
    - name: text
      type: String
      required: true
      description: "Message text."
  outputs:
    - name: write_id
      type: String
      description: "Buffered write ID for cancellation."
    - name: buffer_until
      type: DateTime
      description: "When the message will be sent."
  side_effects:
    - description: "Buffered write; 2 minute delay."
      reversible: true
  permission_tier: notify
  tags: ["slack", "write", "buffered", "cancellable"]
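The `write_id` and `buffer_until` outputs of `slack.send_message` suggest how a buffered write behaves: it is held until a deadline and can be cancelled before then. The following is a minimal sketch of that behaviour, not the KruxOS write buffer itself. `ToyWriteBuffer` and its methods are hypothetical, and the clock is passed in explicitly so the example is deterministic.

```python
from datetime import datetime, timedelta, timezone
import uuid


class ToyWriteBuffer:
    """Holds writes until their buffer window has passed (illustrative only)."""

    def __init__(self, buffer_seconds=120):
        self.buffer_seconds = buffer_seconds
        self.pending = {}  # write_id -> (buffer_until, operation, params)

    def enqueue(self, operation, params, now):
        write_id = uuid.uuid4().hex
        buffer_until = now + timedelta(seconds=self.buffer_seconds)
        self.pending[write_id] = (buffer_until, operation, params)
        # Mirrors the two outputs declared for slack.send_message.
        return {"write_id": write_id, "buffer_until": buffer_until}

    def cancel(self, write_id):
        # Cancelling only succeeds while the write is still pending.
        return self.pending.pop(write_id, None) is not None

    def due(self, now):
        # Remove and return (write_id, operation, params) for elapsed windows.
        ready = [wid for wid, (until, _, _) in self.pending.items() if until <= now]
        return [(wid, *self.pending.pop(wid)[1:]) for wid in ready]


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
buf = ToyWriteBuffer(buffer_seconds=120)
first = buf.enqueue("slack.send_message", {"channel": "general", "text": "Hello!"}, t0)
second = buf.enqueue("slack.send_message", {"channel": "general", "text": "Oops"}, t0)
cancelled = buf.cancel(second["write_id"])          # withdrawn inside the window
early = buf.due(t0 + timedelta(seconds=60))         # nothing is due after 1 minute
released = buf.due(t0 + timedelta(seconds=120))     # the first write is now due
```

Only the write that was not cancelled is released, and only once the 2 minute window has elapsed.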
Configure proxy settings in pack.yaml
name: my-slack-pack
version: "1.0.0"
description: "Slack integration for KruxOS"

proxy:
  adapter: implementations/proxy_adapter.py:SlackAdapter
  sync_interval_seconds: 120
  write_buffer_seconds: 120
  batch_limits:
    - operation: "slack.send_message"
      max_per_hour: 20
      escalate_to: approval_required
  rollback_enabled: true

secrets:
  - name: SLACK_BOT_TOKEN
    description: "Slack Bot User OAuth Token (xoxb-...)"
    required: true
  - name: SLACK_SIGNING_SECRET
    description: "Slack app signing secret"
    required: true
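The `batch_limits` entry reads as "after 20 `slack.send_message` calls in an hour, escalate to `approval_required`". One plausible way to implement such a rule is a sliding one-hour window, sketched below. How KruxOS actually counts calls is not stated here, so the windowing, the choice to count escalated calls, and the `HourlyLimit` class are all assumptions.

```python
from collections import deque


class HourlyLimit:
    """Sliding one-hour window over call timestamps for one operation."""

    def __init__(self, max_per_hour, escalate_to):
        self.max_per_hour = max_per_hour
        self.escalate_to = escalate_to
        self.calls = deque()  # timestamps in seconds, oldest first

    def tier_for(self, now, default_tier):
        # Forget calls that are an hour old or older.
        while self.calls and now - self.calls[0] >= 3600:
            self.calls.popleft()
        # Assumption: every call counts toward the window, escalated or not.
        self.calls.append(now)
        # Calls beyond the limit get the stricter tier.
        if len(self.calls) > self.max_per_hour:
            return self.escalate_to
        return default_tier


limit = HourlyLimit(max_per_hour=20, escalate_to="approval_required")
tiers = [limit.tier_for(now=i, default_tier="notify") for i in range(21)]
later = limit.tier_for(now=3600 + 20, default_tier="notify")
```

With these values the first 20 calls keep the `notify` tier, the 21st is escalated, and a call made after the window has emptied is back at `notify`.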
Testing proxy packs
import pytest

from kruxos.packs.testing import ProxyTestHarness


@pytest.fixture
def harness():
    return ProxyTestHarness("my-slack-pack")


@pytest.mark.asyncio
async def test_sync(harness):
    """Test that sync populates the replica."""
    await harness.trigger_sync()
    result = await harness.invoke("slack.search_messages", query="hello")
    assert result.success


@pytest.mark.asyncio
async def test_send_is_buffered(harness):
    """Test that sends are buffered, not immediate."""
    result = await harness.invoke(
        "slack.send_message",
        channel="general",
        text="Hello!",
    )
    assert result.data["write_id"]  # Got a buffer ID
    assert not harness.api_calls  # No API call yet


@pytest.mark.asyncio
async def test_rollback(harness):
    """Test that sent messages can be rolled back."""
    result = await harness.invoke(
        "slack.send_message",
        channel="general",
        text="Oops",
    )
    await harness.flush_writes()  # Execute buffered write
    await harness.rollback(result.data["write_id"])
    # Message should be deleted
Safety features inherited from the proxy
All Service Proxy packs automatically get:
| Feature | Description |
|---|---|
| Read replica | Reads hit local SQLite, not the external API |
| Write buffer | Writes are delayed (configurable) and cancellable |
| Batch protection | Limits on operations per hour/session |
| Rollback | Undo recent writes if the adapter implements rollback() |
| Dead letter queue | Failed writes are retried with exponential backoff |
| Disconnect | Clean disconnect revokes tokens, stops sync, deletes replica |
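The dead letter queue row mentions retries with exponential backoff. As a rough illustration of what that schedule looks like, the sketch below doubles the delay after each failed attempt up to a cap. The base delay, the cap, the attempt count, and the function names are assumptions chosen for the example, not KruxOS defaults.

```python
def backoff_delays(attempts, base_seconds=2.0, cap_seconds=300.0):
    """Delay before each retry: base * 2**n, capped at cap_seconds."""
    return [min(cap_seconds, base_seconds * 2 ** n) for n in range(attempts)]


def retry_write(send, max_attempts=5):
    """Call send() until it succeeds; return (succeeded, delays waited)."""
    waited = []
    for delay in backoff_delays(max_attempts):
        try:
            send()
            return True, waited
        except ConnectionError:
            # A real queue would sleep or reschedule here; we only record it.
            waited.append(delay)
    return False, waited


# Hypothetical write that fails twice and then succeeds.
failures = iter([True, True, False])


def flaky_send():
    if next(failures):
        raise ConnectionError("service unavailable")


ok, waited = retry_write(flaky_send)
```

Here the write succeeds on the third attempt after waiting 2 and then 4 seconds. A write that exhausts every attempt returns `False` and would stay in the queue for inspection.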