Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cecli/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,24 @@ def get_parser(default_config_files, git_root):
" (default: False)"
),
)
group.add_argument(
"--session-encrypt",
action=argparse.BooleanOptionalAction,
default=False,
help=(
"Encrypt saved sessions on disk (AES-256-GCM). Requires CECLI_SESSION_KEY or"
" --session-key-file (default: False)"
),
)
group.add_argument(
"--session-key-file",
metavar="SESSION_KEY_FILE",
default=None,
help=(
"File containing a urlsafe-base64 32-byte session encryption key"
" (default: use CECLI_SESSION_KEY only)"
),
).complete = shtab.FILE
group.add_argument(
"--mcp-servers",
metavar="MCP_CONFIG_JSON",
Expand Down
110 changes: 110 additions & 0 deletions cecli/session_crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Optional AES-256-GCM encryption for on-disk cecli session files."""

from __future__ import annotations

import base64
import json
import os
from pathlib import Path
from typing import Any

MAGIC = b"CECLI_ENCRYPTED_SESSION_v1\n"
KEY_ENV = "CECLI_SESSION_KEY"
KEY_BYTES = 32


class SessionCryptoError(Exception):
"""Session encrypt/decrypt failed."""


def is_encrypted_payload(data: bytes) -> bool:
return data.startswith(MAGIC)


def resolve_key(*, key_file: str | Path | None = None) -> bytes | None:
"""Load a 32-byte key from CECLI_SESSION_KEY (urlsafe base64) or a key file."""
raw = os.environ.get(KEY_ENV, "").strip()
if raw:
key = _decode_key_b64(raw)
if key is not None:
return key
if key_file:
path = Path(key_file).expanduser()
if path.is_file():
text = path.read_text(encoding="utf-8").strip()
key = _decode_key_b64(text)
if key is not None:
return key
return None


def _decode_key_b64(text: str) -> bytes | None:
try:
padded = text + "=" * (-len(text) % 4)
key = base64.urlsafe_b64decode(padded.encode("ascii"))
except (ValueError, UnicodeEncodeError):
return None
if len(key) != KEY_BYTES:
return None
return key


def encrypt_session_dict(session_data: dict[str, Any], key: bytes) -> bytes:
if len(key) != KEY_BYTES:
raise SessionCryptoError(f"Session key must be {KEY_BYTES} bytes.")
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
except ImportError as err:
raise SessionCryptoError(
"Session encryption requires the cryptography package (pip install cryptography)."
) from err

plaintext = json.dumps(session_data, ensure_ascii=False).encode("utf-8")
nonce = os.urandom(12)
ciphertext = AESGCM(key).encrypt(nonce, plaintext, None)
payload = base64.urlsafe_b64encode(nonce + ciphertext).decode("ascii")
return MAGIC + payload.encode("ascii") + b"\n"


def decrypt_session_bytes(data: bytes, key: bytes) -> dict[str, Any]:
if len(key) != KEY_BYTES:
raise SessionCryptoError(f"Session key must be {KEY_BYTES} bytes.")
if not is_encrypted_payload(data):
try:
parsed = json.loads(data.decode("utf-8"))
except json.JSONDecodeError as err:
raise SessionCryptoError("Invalid session file (not JSON).") from err
if not isinstance(parsed, dict):
raise SessionCryptoError("Invalid session format.")
return parsed

body = data[len(MAGIC) :].strip()
if not body:
raise SessionCryptoError("Encrypted session file is empty.")
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
except ImportError as err:
raise SessionCryptoError(
"Session encryption requires the cryptography package (pip install cryptography)."
) from err

try:
blob = base64.urlsafe_b64decode(body + b"=" * (-len(body) % 4))
except ValueError as err:
raise SessionCryptoError("Encrypted session payload is invalid.") from err
if len(blob) < 13:
raise SessionCryptoError("Encrypted session payload is too short.")
nonce, ciphertext = blob[:12], blob[12:]
try:
plaintext = AESGCM(key).decrypt(nonce, ciphertext, None)
except Exception as err:
raise SessionCryptoError(
"Could not decrypt session (wrong key or corrupted file)."
) from err
try:
parsed = json.loads(plaintext.decode("utf-8"))
except json.JSONDecodeError as err:
raise SessionCryptoError("Decrypted session is not valid JSON.") from err
if not isinstance(parsed, dict):
raise SessionCryptoError("Invalid session format.")
return parsed
101 changes: 89 additions & 12 deletions cecli/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from typing import Dict, List, Optional

from cecli import models
from cecli import models, session_crypto
from cecli.helpers.conversation import ConversationService, MessageTag


Expand All @@ -22,6 +22,65 @@ def _get_session_directory(self) -> Path:
os.makedirs(session_dir, exist_ok=True)
return session_dir

def _session_encrypt_settings(self) -> tuple[bool, bytes | None]:
args = getattr(self.coder, "args", None)
if not args or not getattr(args, "session_encrypt", False):
return False, None
key_file = getattr(args, "session_key_file", None)
return True, session_crypto.resolve_key(key_file=key_file)

def _read_session_file(self, session_file: Path) -> dict | None:
try:
data = session_file.read_bytes()
except OSError as e:
self.io.tool_error(f"Error reading session: {e}")
return None
try:
if session_crypto.is_encrypted_payload(data):
args = getattr(self.coder, "args", None)
key_file = getattr(args, "session_key_file", None) if args else None
key = session_crypto.resolve_key(key_file=key_file)
if not key:
self.io.tool_error(
"Session is encrypted but no key is configured "
f"({session_crypto.KEY_ENV} or --session-key-file)."
)
return None
return session_crypto.decrypt_session_bytes(data, key)
parsed = json.loads(data.decode("utf-8"))
if not isinstance(parsed, dict):
self.io.tool_error("Invalid session format.")
return None
return parsed
except session_crypto.SessionCryptoError as e:
self.io.tool_error(str(e))
return None
except json.JSONDecodeError as e:
self.io.tool_error(f"Error loading session: {e}")
return None

def _write_session_file(self, session_file: Path, session_data: dict) -> bool:
encrypt_enabled, key = self._session_encrypt_settings()
try:
if encrypt_enabled:
if not key:
self.io.tool_error(
"Session encryption is enabled but no key is configured "
f"({session_crypto.KEY_ENV} or --session-key-file)."
)
return False
session_file.write_bytes(session_crypto.encrypt_session_dict(session_data, key))
else:
with open(session_file, "w", encoding="utf-8") as f:
json.dump(session_data, f, indent=2)
return True
except session_crypto.SessionCryptoError as e:
self.io.tool_error(str(e))
return False
except OSError as e:
self.io.tool_error(f"Error saving session: {e}")
return False

def save_session(self, session_name: str, output=True) -> bool:
"""Save the current chat session to a named file."""
if not session_name:
Expand All @@ -39,11 +98,12 @@ def save_session(self, session_name: str, output=True) -> bool:

try:
session_data = self._build_session_data(session_name)
with open(session_file, "w", encoding="utf-8") as f:
json.dump(session_data, f, indent=2)
if not self._write_session_file(session_file, session_data):
return False

if output:
self.io.tool_output(f"Session saved: {session_file}")
suffix = " (encrypted)" if self._session_encrypt_settings()[0] else ""
self.io.tool_output(f"Session saved: {session_file}{suffix}")

return True

Expand All @@ -63,8 +123,27 @@ def list_sessions(self) -> List[Dict]:
sessions = []
for session_file in sorted(session_files, key=lambda x: x.stat().st_mtime, reverse=True):
try:
with open(session_file, "r", encoding="utf-8") as f:
session_data = json.load(f)
raw = session_file.read_bytes()
if session_crypto.is_encrypted_payload(raw):
_, key = self._session_encrypt_settings()
if not key:
sessions.append(
{
"name": session_file.stem,
"file": session_file,
"model": "encrypted",
"edit_format": "—",
"num_messages": 0,
"num_files": 0,
"encrypted": True,
}
)
continue
session_data = session_crypto.decrypt_session_bytes(raw, key)
else:
session_data = json.loads(raw.decode("utf-8"))
if not isinstance(session_data, dict):
raise ValueError("not a session object")

session_info = {
"name": session_file.stem,
Expand All @@ -80,6 +159,7 @@ def list_sessions(self) -> List[Dict]:
+ len(session_data.get("files", {}).get("read_only", []))
+ len(session_data.get("files", {}).get("read_only_stubs", []))
),
"encrypted": session_crypto.is_encrypted_payload(raw),
}
sessions.append(session_info)

Expand All @@ -99,15 +179,12 @@ async def load_session(self, session_identifier: str, switch=True) -> bool:
if not session_file:
return False

try:
with open(session_file, "r", encoding="utf-8") as f:
session_data = json.load(f)
except Exception as e:
self.io.tool_error(f"Error loading session: {e}")
session_data = self._read_session_file(session_file)
if session_data is None:
return False

# Verify session format
if not isinstance(session_data, dict) or "version" not in session_data:
if "version" not in session_data:
self.io.tool_error("Invalid session format.")
return False

Expand Down
11 changes: 11 additions & 0 deletions cecli/website/docs/usage/sessions.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ Sessions are stored as JSON files in the `.cecli/sessions/` directory within you
### Version Control
- Consider adding `.cecli/sessions/` to your `.gitignore` if sessions contain sensitive information

### Optional encryption (AES-256-GCM)

When enabled, session files on disk are encrypted (plaintext JSON is unchanged when disabled).

```bash
export CECLI_SESSION_KEY="$(python -c 'import os,base64; print(base64.urlsafe_b64encode(os.urandom(32)).decode())')"
cecli --session-encrypt --auto-save
```

Or use `--session-key-file` pointing at a file with the same urlsafe-base64 32-byte key. BrightVision stores the key in the OS keychain and sets `CECLI_SESSION_KEY` for the Vision API process.

## Troubleshooting

### Session Not Found
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tomlkit>=0.14.0
truststore
xxhash>=3.6.0
py-cymbal>=0.1.24
cryptography>=42.0.0

# Replaced networkx with rustworkx for better performance in repomap
rustworkx>=0.15.0
Expand Down
24 changes: 24 additions & 0 deletions tests/basic/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Shared fixtures for cecli basic tests."""

import base64
import os

import pytest

from cecli import session_crypto


@pytest.fixture
def session_key32():
return os.urandom(session_crypto.KEY_BYTES)


@pytest.fixture
def session_key_b64(session_key32):
return base64.urlsafe_b64encode(session_key32).decode().rstrip("=")


@pytest.fixture
def session_key_env(monkeypatch, session_key32, session_key_b64):
monkeypatch.setenv(session_crypto.KEY_ENV, session_key_b64)
return session_key32
31 changes: 31 additions & 0 deletions tests/basic/test_session_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""CLI args for session encryption and auto-save."""

from cecli.args import get_parser


def test_session_encrypt_defaults_off():
parser = get_parser([], "/tmp/project")
args = parser.parse_args([])
assert args.session_encrypt is False
assert args.session_key_file is None
assert args.auto_save is False
assert args.auto_load is False
assert args.auto_save_session_name == "auto-save"


def test_session_encrypt_flag():
parser = get_parser([], "/tmp/project")
args = parser.parse_args(["--session-encrypt"])
assert args.session_encrypt is True


def test_session_encrypt_no_flag():
parser = get_parser([], "/tmp/project")
args = parser.parse_args(["--no-session-encrypt"])
assert args.session_encrypt is False


def test_session_key_file_flag():
parser = get_parser([], "/tmp/project")
args = parser.parse_args(["--session-key-file", "/tmp/key.bin"])
assert args.session_key_file == "/tmp/key.bin"
Loading
Loading