This guide covers all features of the nbs-ssh library in detail.
- SSHConnection
- Authentication
- SSHSupervisor
- Port Forwarding
- Automation (Expect/Respond)
- Evidence Bundles
- Event System
- Error Handling
- Cross-Platform Support
SSHConnection is the low-level async wrapper for SSH operations. It handles connection establishment, authentication, and command execution.
from nbs_ssh import SSHConnection
conn = SSHConnection(
host="example.com", # Required: SSH server hostname or IP
port=22, # SSH port (default: 22)
username="alice", # Username for authentication
auth=auth_config, # AuthConfig or list of AuthConfigs
known_hosts="~/.ssh/known_hosts", # Path to known_hosts (None to disable)
connect_timeout=30.0, # Connection timeout in seconds
keepalive=keepalive_config, # Optional KeepaliveConfig
event_collector=collector, # Optional EventCollector for in-memory events
event_log_path="session.jsonl", # Optional JSONL file for event persistence
)
Wait for a command to complete and get all output at once:
async with SSHConnection(...) as conn:
result = await conn.exec("ls -la /var/log")
print(result.stdout) # Standard output
print(result.stderr) # Standard error
print(result.exit_code) # Exit code (0 = success)
Get output as it arrives, useful for long-running commands:
async with SSHConnection(...) as conn:
async for event in conn.stream_exec("tail -f /var/log/syslog"):
if event.stream == "stdout":
print(event.data, end="")
elif event.stream == "stderr":
print(f"[ERR] {event.data}", end="")
elif event.stream == "exit":
print(f"Exit: {event.exit_code}")
nbs-ssh supports multiple authentication methods with automatic fallback.
from nbs_ssh import AuthConfig, AuthMethod
# Password authentication
auth = AuthConfig(method=AuthMethod.PASSWORD, password="secret")
# Private key authentication
auth = AuthConfig(
method=AuthMethod.PRIVATE_KEY,
key_path="~/.ssh/id_ed25519",
passphrase="key-passphrase", # Optional, for encrypted keys
)
# SSH agent authentication
auth = AuthConfig(method=AuthMethod.SSH_AGENT)
Convenience functions for common authentication patterns:
from nbs_ssh import (
create_password_auth,
create_key_auth,
create_agent_auth,
get_agent_available,
)
# Password
auth = create_password_auth("my-password")
# Private key (with optional passphrase)
auth = create_key_auth("~/.ssh/id_rsa", passphrase="secret")
# SSH agent
if get_agent_available():
auth = create_agent_auth()
Provide a list of AuthConfigs to try in order:
auth_configs = [
create_agent_auth(), # Try agent first
create_key_auth("~/.ssh/id_ed25519"), # Then key
create_password_auth("backup-password"), # Finally password
]
async with SSHConnection("host", username="alice", auth=auth_configs) as conn:
# Library tries each method until one succeeds
await conn.exec("whoami")
The legacy parameters are still supported for backwards compatibility:
# Password (legacy)
async with SSHConnection("host", username="alice", password="secret") as conn:
...
# Key list (legacy)
async with SSHConnection(
"host",
username="alice",
client_keys=["~/.ssh/id_rsa", "~/.ssh/id_ed25519"]
) as conn:
...
SSHSupervisor wraps SSHConnection with automatic reconnection, state management, and forward replay.
- Long-running scripts that need connection resilience
- Applications with port forwards that must survive reconnection
- Any scenario where transient network issues shouldn't cause failures
from nbs_ssh import SSHSupervisor, RetryPolicy
supervisor = SSHSupervisor(
host="example.com",
port=22,
username="alice",
auth=auth_config,
known_hosts="~/.ssh/known_hosts",
connect_timeout=30.0,
keepalive=keepalive_config,
event_collector=collector,
event_log_path="session.jsonl",
retry_policy=RetryPolicy( # Reconnection behaviour
max_retries=5,
base_delay_sec=2.0,
max_delay_sec=60.0,
exponential_base=2.0,
jitter=True,
),
)
from nbs_ssh import ConnectionState
# Available states:
# - DISCONNECTED: Not connected
# - CONNECTING: Initial connection in progress
# - CONNECTED: Connected and operational
# - RECONNECTING: Lost connection, attempting to reconnect
# - FAILED: Permanent failure (auth failed or max retries exceeded)
print(supervisor.state) # Current state
print(supervisor.is_connected) # True if CONNECTED
print(supervisor.reconnection_count) # Number of reconnections
async with SSHSupervisor(...) as supervisor:
# Wait up to 60 seconds for connection
connected = await supervisor.wait_connected(timeout=60.0)
if connected:
result = await supervisor.exec("uptime")
else:
print("Could not establish connection")
Control how reconnection attempts are made:
from nbs_ssh import RetryPolicy
# Aggressive retry (many attempts, short delays)
aggressive = RetryPolicy(
max_retries=10,
base_delay_sec=0.5,
max_delay_sec=30.0,
)
# Conservative retry (fewer attempts, longer delays)
conservative = RetryPolicy(
max_retries=3,
base_delay_sec=5.0,
max_delay_sec=120.0,
)
# No automatic retry
no_retry = RetryPolicy(max_retries=0)
The delay formula is: min(base_delay_sec * (exponential_base ^ attempt), max_delay_sec)
With jitter enabled, the delay is multiplied by a random factor between 1.0 and 1.25.
- Transient (auto-retry): ConnectionRefused, ConnectionTimeout, HostUnreachable
- Permanent (no retry): AuthFailed, HostKeyMismatch, NoMutualKex
nbs-ssh supports all SSH port forwarding types with automatic replay on reconnection.
Forward a local port to a remote destination via SSH:
async with SSHSupervisor(...) as supervisor:
# Traffic to localhost:3306 goes to database.internal:3306
handle = await supervisor.forward_local(
local_port=3306,
remote_host="database.internal",
remote_port=3306,
local_host="localhost", # Optional, default
)
print(f"Forward established on port {handle.local_port}")
# Use the forward...
await handle.close()
Expose a local service to the remote server:
async with SSHSupervisor(...) as supervisor:
# Remote server can access localhost:8080 via its own port 8080
handle = await supervisor.forward_remote(
remote_port=8080,
local_host="localhost",
local_port=8080,
remote_host="", # Bind to all interfaces on remote
)
Create a SOCKS proxy for tunnelling arbitrary traffic:
async with SSHSupervisor(...) as supervisor:
# SOCKS proxy on localhost:1080
handle = await supervisor.forward_dynamic(
local_port=1080,
local_host="localhost",
)
# Configure applications to use SOCKS proxy at localhost:1080
When using SSHSupervisor, all port forwards are automatically re-established after reconnection:
async with SSHSupervisor(...) as supervisor:
# Establish forwards
db = await supervisor.forward_local(3306, "db.server", 3306)
web = await supervisor.forward_local(8080, "web.server", 80)
# If connection drops and reconnects, forwards are replayed
# Check active forwards
active = supervisor.forward_manager.active_forwards
intents = supervisor.forward_manager.intents
handle.intent # ForwardIntent describing the forward
handle.local_port # Actual bound port (may differ if 0 was requested)
handle.is_active # True if forward is currently active
The automation engine enables interaction with interactive command-line programs.
Wait for specific output before continuing:
from nbs_ssh import AutomationEngine, ExpectPattern
async with SSHConnection(...) as conn:
stream = conn.stream_exec("mysql -u root -p")
engine = AutomationEngine(stream)
# Wait for password prompt
result = await engine.expect("Enter password: ", timeout=10.0)
if result.matched:
print(f"Found prompt in {result.duration_ms}ms")
elif result.timed_out:
print(f"Timeout! Buffer: {result.buffer}")
# Send text followed by newline
await engine.send("my-password")
# Send without newline
await engine.send("partial", add_newline=False)
Combine expect and send in one call:
result = await engine.expect_respond(
pattern="Enter password: ",
response="secret-password",
timeout=10.0,
delay=0.5, # Wait before sending response
)
from nbs_ssh import ExpectPattern, PatternType
# Literal match (exact substring)
literal = ExpectPattern("Password: ", pattern_type=PatternType.LITERAL)
# Regex match
regex = ExpectPattern(r"Port (\d+)", pattern_type=PatternType.REGEX)
result = await engine.expect(regex)
if result.matched:
port = result.groups[0] # Captured group
Run multiple expect/respond pairs:
from nbs_ssh import ExpectRespond, RespondAction, RespondDelay
sequence = [
ExpectRespond(
pattern=ExpectPattern("Username: "),
response=RespondAction("alice"),
),
ExpectRespond(
pattern=ExpectPattern("Password: "),
response=RespondAction("secret"),
delay=RespondDelay(0.5),
),
ExpectRespond(
pattern=ExpectPattern(r".*\$", pattern_type=PatternType.REGEX),
response=RespondAction(""), # No response needed
),
]
results = await engine.run_sequence(sequence)
for result in results:
print(f"Matched: {result.pattern.pattern}")
The automation engine maintains a complete transcript of all interactions:
transcript = engine.transcript
# Iterate entries
for entry in transcript.entries:
print(f"{entry.interaction_type}: {entry.content[:50]}")
# Export to JSONL
transcript.to_file("interaction.jsonl")
# Get as dict
data = transcript.to_dict()
Evidence bundles are diagnostic packages containing everything needed to debug SSH issues.
from nbs_ssh import SSHConnection, EventCollector
collector = EventCollector()
try:
async with SSHConnection(
"example.com",
username="alice",
auth=auth,
event_collector=collector,
) as conn:
await conn.exec("command")
except Exception as e:
# Capture evidence on failure
bundle = conn.get_evidence_bundle()
bundle.to_file("debug.json")
An evidence bundle includes:
- events: All JSONL events from the session
- transcript: Automation transcript (if provided)
- algorithms: Negotiated SSH algorithms (KEX, ciphers, MACs)
- timing: Connection and authentication timing
- host_info: Target host details
- disconnect_reason: Why the connection ended
- error_context: Additional error details
# JSON (single file, complete bundle)
bundle.to_file("session.json", format="json")
# JSONL (streaming format, one JSON object per line)
bundle.to_file("session.jsonl", format="jsonl")
By default, bundles redact sensitive information:
# Redacted (safe for sharing)
bundle.to_file("debug.json", redact=True)
# Unredacted (for internal debugging only)
bundle.to_file("debug_raw.json", redact=False)
Redacted items include:
- Passwords and passphrases
- Private key contents
- Long base64 strings
- IP addresses (partially)
from nbs_ssh import EvidenceBundle
bundle = EvidenceBundle.from_file("session.json")
print(f"Disconnect reason: {bundle.disconnect_reason}")
print(f"Connection took: {bundle.timing.connect_duration_ms}ms")
print(f"KEX algorithm: {bundle.algorithms.kex}")
for event in bundle.events:
print(f"{event.event_type}: {event.data}")
nbs-ssh uses structured JSONL events for AI-inspectable logging.
from nbs_ssh import EventType
# Available event types:
EventType.CONNECT # Connection initiated/established
EventType.AUTH # Authentication attempt/result
EventType.EXEC # Command execution
EventType.DISCONNECT # Connection closed
EventType.ERROR # Error occurred
EventType.KEEPALIVE_SENT # Keepalive sent
EventType.KEEPALIVE_RECEIVED # Keepalive response
EventType.KEEPALIVE_TIMEOUT # Keepalive failed
EventType.PROGRESS_WARNING # Application progress warning
EventType.STATE_CHANGE # Supervisor state change
EventType.FORWARD # Port forward event
from nbs_ssh import EventCollector
collector = EventCollector()
async with SSHConnection(..., event_collector=collector) as conn:
await conn.exec("whoami")
# Access collected events
for event in collector.events:
print(f"{event.event_type}: {event.data}")
# Filter by type
auth_events = collector.get_by_type(EventType.AUTH)
async with SSHConnection(
...,
event_log_path="session.jsonl"
) as conn:
await conn.exec("whoami")
# session.jsonl contains one JSON event per line
Each event contains:
{
"event_type": "EXEC",
"timestamp": 1234567890.123,
"data": {
"command": "ls -la",
"streaming": false,
"duration_ms": 123.4,
"exit_code": 0,
"stdout_len": 1024,
"stderr_len": 0
}
}
nbs-ssh provides a structured exception hierarchy for programmatic error handling.
SSHError (base)
├── SSHConnectionError
│ ├── ConnectionRefused # Server refused connection
│ ├── ConnectionTimeout # Connection timed out
│ └── HostUnreachable # Network unreachable
└── AuthenticationError
├── AuthFailed # Credentials rejected
├── HostKeyMismatch # Host key verification failed
├── NoMutualKex # No compatible algorithms
├── KeyLoadError # Cannot load private key
└── AgentError # SSH agent error
from nbs_ssh import (
ConnectionRefused,
ConnectionTimeout,
AuthFailed,
HostKeyMismatch,
KeyLoadError,
SSHError,
)
try:
async with SSHConnection(...) as conn:
await conn.exec("command")
except ConnectionRefused:
print("Server refused connection - check host/port")
except ConnectionTimeout:
print("Connection timed out - check network")
except AuthFailed:
print("Authentication failed - check credentials")
except HostKeyMismatch:
print("Host key mismatch - potential security issue!")
except KeyLoadError as e:
print(f"Cannot load key: {e}")
except SSHError as e:
print(f"SSH error ({e.error_type}): {e}")
All errors carry contextual information:
try:
async with SSHConnection(...) as conn:
...
except SSHError as e:
ctx = e.context
print(f"Host: {ctx.host}:{ctx.port}")
print(f"Username: {ctx.username}")
print(f"Original error: {ctx.original_error}")
nbs-ssh handles platform differences automatically.
from nbs_ssh import discover_keys, get_default_key_paths
# Find all available SSH keys
keys = discover_keys()
# Get default key paths (~/.ssh/id_ed25519, etc.)
defaults = get_default_key_paths()
from nbs_ssh import get_agent_available
if get_agent_available():
print("SSH agent is available")
from nbs_ssh import (
is_windows,
get_ssh_dir,
get_known_hosts_path,
expand_path,
)
# ~/.ssh on Unix, %USERPROFILE%\.ssh on Windows
ssh_dir = get_ssh_dir()
# Expand ~ and environment variables
expanded = expand_path("~/.ssh/id_rsa")
# Check platform
if is_windows():
# Handle Windows-specific paths
...
Configure SSH-level keepalive for connection health monitoring:
from nbs_ssh import SSHConnection, KeepaliveConfig
keepalive = KeepaliveConfig(
interval_sec=30.0, # Send keepalive every 30 seconds
max_count=3, # Disconnect after 3 missed responses
progress_timeout_sec=60.0, # App-level timeout (no output for 60s)
)
async with SSHConnection(..., keepalive=keepalive) as conn:
# Connection will be monitored for health
...
For application-level freeze detection:
from nbs_ssh import ProgressWatchdog
watchdog = ProgressWatchdog(
timeout_sec=60.0,
on_timeout=lambda: print("Application appears frozen!"),
)
watchdog.start()
try:
async for event in conn.stream_exec("long_command"):
watchdog.progress() # Reset timer on each output
process(event.data)
finally:
watchdog.stop()