SDK Client Reference
Detailed API reference for the Ripperdoc Python SDK
RipperdocSDKClient
The main class for session-based interactions with the Ripperdoc CLI.
Constructor
from ripperdoc_agent_sdk import RipperdocSDKClient, RipperdocAgentOptions
options = RipperdocAgentOptions(model="main")
client = RipperdocSDKClient(options=options)Parameters
The client accepts a RipperdocAgentOptions object for configuration. See RipperdocAgentOptions for all available options.
Methods
connect
Connect to the Ripperdoc CLI subprocess and initialize the session.
async def connect(prompt: Optional[str] = None) -> NoneParameters:
prompt: Optional prompt to send after connecting
Example:
await client.connect()
# or
await client.connect(prompt="Hello, Ripperdoc!")disconnect
Close the subprocess connection and clean up resources.
async def disconnect() -> NoneExample:
try:
await client.connect()
# ... do work ...
finally:
await client.disconnect()query
Send a prompt and start streaming the response.
async def query(prompt: str, session_id: str = "default") -> NoneParameters:
prompt: The prompt to sendsession_id: Session identifier for the conversation (default: "default")
Raises:
RuntimeError: If a query is already in progress
Example:
await client.query("What files are in this project?")receive_messages
Yield messages for the active query.
async def receive_messages() -> AsyncIterator[Message]Yields:
Message: Messages from the conversation
Message Types:
UserMessage: User message with content blocksAssistantMessage: Assistant response with content blocksSystemMessage: System information (init, errors, etc.)ResultMessage: Final result with usage information
Example:
async for message in client.receive_messages():
if message.type == "assistant":
for block in message.content:
if block.type == "text":
print(block.text)
elif message.type == "result":
print(f"Done! Usage: {message.result.usage}")receive_response
Yield messages until and including a ResultMessage.
async def receive_response() -> AsyncIterator[Message]This is a convenience wrapper around receive_messages() that automatically terminates after the ResultMessage.
Example:
async for message in client.receive_response():
print(message)
# Automatically stops after ResultMessageinterrupt
Request cancellation of the active query.
async def interrupt() -> NoneExample:
task = asyncio.create_task(client.query("Long task..."))
# ... later ...
await client.interrupt()set_permission_mode
Change permission mode during conversation.
async def set_permission_mode(mode: PermissionMode) -> NoneParameters:
mode: Permission mode ("default","acceptEdits","bypassPermissions","plan")
Raises:
ValueError: If the mode is invalid
Example:
await client.set_permission_mode("acceptEdits")set_model
Change the AI model during conversation.
async def set_model(model: Optional[str]) -> NoneParameters:
model: The model to use, or None to use default
Example:
await client.set_model("sonnet")get_server_info
Get server initialization information.
async def get_server_info() -> Optional[Dict[str, Any]]Returns:
- Dictionary with server info, or None if not connected
Example:
info = await client.get_server_info()
if info:
print(f"Session: {info['session_id']}")
print(f"Model: {info['model']}")Properties
session_id
The current session ID.
session_id: Optional[str]turn_count
The number of turns in the current session.
turn_count: intuser
The user identifier for this session.
user: Optional[str]history
The conversation history.
history: List[Any]Context Manager
Use as an async context manager for automatic cleanup:
async with RipperdocSDKClient(options=options) as client:
await client.query("Hello!")
async for message in client.receive_messages():
print(message)
# Automatically disconnectsquery Function
For one-shot queries without session management.
async def query(
*,
prompt: Union[str, AsyncIterable[dict[str, Any]]],
options: Optional[RipperdocAgentOptions] = None,
) -> AsyncIterator[Message]Parameters:
prompt: The prompt (string) or async iterable of content blocksoptions: Optional configuration (defaults toRipperdocAgentOptions()if None)
Yields:
Message: Messages from the conversation
Example:
from ripperdoc_agent_sdk import query, RipperdocAgentOptions
options = RipperdocAgentOptions(model="main")
async for message in query(
prompt="What is the capital of France?",
options=options
):
if message.type == "assistant":
for block in message.content:
if block.type == "text":
print(block.text)RipperdocAgentOptions
Configuration for SDK behavior.
Constructor
options = RipperdocAgentOptions(
# Model configuration
model: str = "main",
max_thinking_tokens: int = 0,
# Permission mode
permission_mode: PermissionMode = "default",
# Tool configuration
allowed_tools: Optional[Sequence[str]] = None,
disallowed_tools: Optional[Sequence[str]] = None,
# Behavior settings
verbose: bool = False,
max_turns: Optional[int] = None,
query_timeout: float = 300.0,
# Context
cwd: Optional[Union[str, Path]] = None,
context: Dict[str, str] = {},
system_prompt: Optional[str] = None,
additional_instructions: Optional[Union[str, Sequence[str]]] = None,
# Session management
resume: Optional[str] = None,
continue_conversation: bool = False,
fork_session: bool = False,
# Custom permissions
permission_checker: Optional[Callable] = None,
# MCP servers
mcp_servers: Optional[Dict[str, McpServerConfig]] = None,
# Programmatic agents
agents: Optional[Dict[str, AgentConfig]] = None,
# Programmatic hooks
hooks: Optional[Dict[str, List[HookMatcher]]] = None,
# Environment
env: Optional[Dict[str, str]] = None,
additional_directories: Optional[List[str]] = None,
# Other
cli_path: Optional[str] = None,
stderr: Optional[StderrCallback] = None,
include_partial_messages: bool = False,
)Option Reference
| Option | Type | Default | Description |
|---|---|---|---|
model | str | "main" | Model profile to use |
max_thinking_tokens | int | 0 | Max tokens for thinking mode (0 = disabled) |
permission_mode | str | "default" | Permission mode |
allowed_tools | List[str] | None | Whitelist of tools |
disallowed_tools | List[str] | None | Blacklist of tools |
verbose | bool | False | Enable verbose output |
max_turns | int | None | Max conversation turns (None = unlimited) |
query_timeout | float | 300.0 | Query timeout in seconds |
cwd | str/Path | None | Working directory |
context | Dict[str, str] | {} | Custom context variables |
system_prompt | str | None | Override system prompt |
additional_instructions | str/List[str] | None | Additional instructions |
resume | str | None | Session ID to resume |
continue_conversation | bool | False | Continue most recent conversation |
fork_session | bool | False | Create branch when resuming |
permission_checker | Callable | None | Custom permission checker |
mcp_servers | Dict[str, McpServerConfig] | None | MCP server configurations |
agents | Dict[str, AgentConfig] | None | Programmatic agents |
hooks | Dict[str, List[HookMatcher]] | None | Programmatic hooks |
env | Dict[str, str] | None | Environment variables |
cli_path | str | None | Path to CLI executable |
McpServerConfig
Configuration for an MCP server.
Constructor
from ripperdoc_agent_sdk import McpServerConfig
config = McpServerConfig(
type: str = "stdio", # "stdio", "sse", or "http"
command: Optional[str] = None, # Command for stdio servers
args: Optional[List[str]] = None, # Arguments for stdio servers
url: Optional[str] = None, # URL for SSE/HTTP servers
env: Optional[Dict[str, str]] = None, # Environment variables
headers: Optional[Dict[str, str]] = None, # Headers for SSE/HTTP
description: Optional[str] = None, # Server description
instructions: Optional[str] = None, # Server instructions
)AgentConfig
Programmatic configuration for a subagent.
Constructor
from ripperdoc_agent_sdk import AgentConfig
agent = AgentConfig(
description: str, # Description of when to use this agent
prompt: str, # System prompt for the agent
tools: Optional[List[str]] = None, # Tools available (["*"] for all)
model: Optional[str] = None, # Model: "sonnet", "opus", "haiku", or None to inherit
color: Optional[str] = None, # Display color
fork_context: bool = False, # Whether to fork context
)HookMatcher
Matcher configuration for a programmatic hook.
Constructor
from ripperdoc_agent_sdk import HookMatcher
matcher = HookMatcher(
callback: HookCallback, # Callback function
tool_pattern: Optional[str] = None, # Tool name pattern
)Hook Callback Signature
async def hook_callback(event: str, input_data: Dict[str, Any]) -> Dict[str, Any]:
# Return modified input or other hook response
return {}Permission Types
PermissionMode
from ripperdoc_agent_sdk import PermissionMode
# Valid modes:
PermissionMode.DEFAULT # "default" - Prompt for dangerous tools
PermissionMode.ACCEPT_EDITS # "acceptEdits" - Auto-accept file edits
PermissionMode.BYPASS_PERMISSIONS # "bypassPermissions" - Allow all
PermissionMode.PLAN # "plan" - Planning mode, no executionPermissionResultAllow
from ripperdoc_agent_sdk import PermissionResultAllow
# Allow with optional updated input
return PermissionResultAllow(updated_input={...})PermissionResultDeny
from ripperdoc_agent_sdk import PermissionResultDeny
# Deny with optional message and interrupt
return PermissionResultDeny(message="Reason here", interrupt=True)Message Types
Message
Base type for all messages. Union of:
UserMessageAssistantMessageSystemMessageResultMessageStreamEvent
UserMessage
class UserMessage:
type: Literal["user"]
content: List[ContentBlock]
role: Literal["user"]AssistantMessage
class AssistantMessage:
type: Literal["assistant"]
content: List[ContentBlock]
role: Literal["assistant"]SystemMessage
class SystemMessage:
type: Literal["system"]
subtype: str # "init", "error", etc.
data: Dict[str, Any]
role: Literal["system"]ResultMessage
class ResultMessage:
type: Literal["result"]
result: ResultData
class ResultData:
status: Literal["success", "error"]
usage: Optional[UsageInfo]
error: Optional[str]
class UsageInfo:
input_tokens: int
output_tokens: int
cache_creation_tokens: Optional[int]
cache_read_tokens: Optional[int]
cost_usd: Optional[float]
duration_ms: Optional[float]ContentBlock
Union of:
TextBlockThinkingBlockToolUseBlockToolResultBlock
TextBlock
class TextBlock:
type: Literal["text"]
text: strThinkingBlock
class ThinkingBlock:
type: Literal["thinking"]
thinking: strToolUseBlock
class ToolUseBlock:
type: Literal["tool_use"]
id: str
name: str
input: Dict[str, Any]ToolResultBlock
class ToolResultBlock:
type: Literal["tool_result"]
tool_use_id: str
content: Union[str, List[ContentBlock]]
is_error: Optional[bool]Error Types
SDKError
Base class for all SDK errors.
CLIConnectionError
Raised when connection to CLI fails.
CLINotFoundError
Raised when the CLI executable is not found.
ProcessError
Raised when the subprocess encounters an error.
CLIJSONDecodeError
Raised when JSON response from CLI cannot be decoded.
MessageParseError
Raised when a message cannot be parsed.
Examples
Basic Query
from ripperdoc_agent_sdk import query
async for message in query(prompt="What is 2 + 2?"):
if message.type == "assistant":
for block in message.content:
if block.type == "text":
print(block.text) # "4"With Options
from ripperdoc_agent_sdk import RipperdocSDKClient, RipperdocAgentOptions
options = RipperdocAgentOptions(
model="sonnet",
permission_mode="acceptEdits",
allowed_tools=["Read", "Glob", "Edit"]
)
async with RipperdocSDKClient(options=options) as client:
await client.query("List all Python files")
async for message in client.receive_messages():
if message.type == "assistant":
for block in message.content:
if block.type == "text":
print(block.text)Custom Permission Checker
from ripperdoc_agent_sdk import (
RipperdocAgentOptions,
PermissionResultAllow,
PermissionResultDeny
)
async def my_checker(tool_name, tool_input, context):
if tool_name == "Bash" and "rm" in tool_input.get("command", ""):
return PermissionResultDeny(message="No deletion allowed!")
return PermissionResultAllow()
options = RipperdocAgentOptions(
permission_checker=my_checker
)With MCP Servers
from ripperdoc_agent_sdk import RipperdocAgentOptions, McpServerConfig
options = RipperdocAgentOptions(
mcp_servers={
"filesystem": McpServerConfig(
type="stdio",
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem", "/path"]
)
}
)SDK MCP Server (In-Process Tools)
from ripperdoc_agent_sdk import tool, create_sdk_mcp_server, query
@tool("calculate", "Perform calculations", {"expression": str})
async def calculate(args):
try:
result = eval(args["expression"])
return {"content": [{"type": "text", "text": str(result)}]}
except Exception as e:
return {"content": [{"type": "text", "text": f"Error: {e}"}], "is_error": True}
server = create_sdk_mcp_server(name="calc", tools=[calculate])
async for message in query(
prompt="What is 123 * 456?",
options=RipperdocAgentOptions(mcp_servers={"calc": server})
):
if message.type == "assistant":
for block in message.content:
if block.type == "text":
print(block.text)Error Handling
from ripperdoc_agent_sdk import (
RipperdocSDKClient,
RipperdocAgentOptions,
CLINotFoundError,
SDKError
)
try:
client = RipperdocSDKClient()
await client.connect()
await client.query("Do something")
async for message in client.receive_messages():
print(message)
except CLINotFoundError:
print("Ripperdoc CLI not found!")
except SDKError as e:
print(f"SDK error: {e}")
finally:
await client.disconnect()Session Management
from ripperdoc_agent_sdk import RipperdocSDKClient, RipperdocAgentOptions
options = RipperdocAgentOptions(max_turns=10)
async with RipperdocSDKClient(options=options) as client:
print(f"Session ID: {client.session_id}")
# First query
await client.query("What files are here?")
async for msg in client.receive_messages():
pass
print(f"Turn count: {client.turn_count}")
# Second query
await client.query("Read main.py")
async for msg in client.receive_messages():
pass
print(f"Final turn count: {client.turn_count}")