diff --git a/cecli/args.py b/cecli/args.py index 387f4764e78..07b947e17ae 100644 --- a/cecli/args.py +++ b/cecli/args.py @@ -370,6 +370,24 @@ def get_parser(default_config_files, git_root): " (default: False)" ), ) + group.add_argument( + "--session-encrypt", + action=argparse.BooleanOptionalAction, + default=False, + help=( + "Encrypt saved sessions on disk (AES-256-GCM). Requires CECLI_SESSION_KEY or" + " --session-key-file (default: False)" + ), + ) + group.add_argument( + "--session-key-file", + metavar="SESSION_KEY_FILE", + default=None, + help=( + "File containing a urlsafe-base64 32-byte session encryption key" + " (default: use CECLI_SESSION_KEY only)" + ), + ).complete = shtab.FILE group.add_argument( "--mcp-servers", metavar="MCP_CONFIG_JSON", diff --git a/cecli/session_crypto.py b/cecli/session_crypto.py new file mode 100644 index 00000000000..7d68d711edb --- /dev/null +++ b/cecli/session_crypto.py @@ -0,0 +1,110 @@ +"""Optional AES-256-GCM encryption for on-disk cecli session files.""" + +from __future__ import annotations + +import base64 +import json +import os +from pathlib import Path +from typing import Any + +MAGIC = b"CECLI_ENCRYPTED_SESSION_v1\n" +KEY_ENV = "CECLI_SESSION_KEY" +KEY_BYTES = 32 + + +class SessionCryptoError(Exception): + """Session encrypt/decrypt failed.""" + + +def is_encrypted_payload(data: bytes) -> bool: + return data.startswith(MAGIC) + + +def resolve_key(*, key_file: str | Path | None = None) -> bytes | None: + """Load a 32-byte key from CECLI_SESSION_KEY (urlsafe base64) or a key file.""" + raw = os.environ.get(KEY_ENV, "").strip() + if raw: + key = _decode_key_b64(raw) + if key is not None: + return key + if key_file: + path = Path(key_file).expanduser() + if path.is_file(): + text = path.read_text(encoding="utf-8").strip() + key = _decode_key_b64(text) + if key is not None: + return key + return None + + +def _decode_key_b64(text: str) -> bytes | None: + try: + padded = text + "=" * (-len(text) % 4) + key = base64.urlsafe_b64decode(padded.encode("ascii")) + except (ValueError, UnicodeEncodeError): + return None + if len(key) != KEY_BYTES: + return None + return key + + +def encrypt_session_dict(session_data: dict[str, Any], key: bytes) -> bytes: + if len(key) != KEY_BYTES: + raise SessionCryptoError(f"Session key must be {KEY_BYTES} bytes.") + try: + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + except ImportError as err: + raise SessionCryptoError( + "Session encryption requires the cryptography package (pip install cryptography)." + ) from err + + plaintext = json.dumps(session_data, ensure_ascii=False).encode("utf-8") + nonce = os.urandom(12) + ciphertext = AESGCM(key).encrypt(nonce, plaintext, None) + payload = base64.urlsafe_b64encode(nonce + ciphertext).decode("ascii") + return MAGIC + payload.encode("ascii") + b"\n" + + +def decrypt_session_bytes(data: bytes, key: bytes) -> dict[str, Any]: + if len(key) != KEY_BYTES: + raise SessionCryptoError(f"Session key must be {KEY_BYTES} bytes.") + if not is_encrypted_payload(data): + try: + parsed = json.loads(data.decode("utf-8")) + except json.JSONDecodeError as err: + raise SessionCryptoError("Invalid session file (not JSON).") from err + if not isinstance(parsed, dict): + raise SessionCryptoError("Invalid session format.") + return parsed + + body = data[len(MAGIC) :].strip() + if not body: + raise SessionCryptoError("Encrypted session file is empty.") + try: + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + except ImportError as err: + raise SessionCryptoError( + "Session encryption requires the cryptography package (pip install cryptography)." + ) from err + + try: + blob = base64.urlsafe_b64decode(body + b"=" * (-len(body) % 4)) + except ValueError as err: + raise SessionCryptoError("Encrypted session payload is invalid.") from err + if len(blob) < 13: + raise SessionCryptoError("Encrypted session payload is too short.") + nonce, ciphertext = blob[:12], blob[12:] + try: + plaintext = AESGCM(key).decrypt(nonce, ciphertext, None) + except Exception as err: + raise SessionCryptoError( + "Could not decrypt session (wrong key or corrupted file)." + ) from err + try: + parsed = json.loads(plaintext.decode("utf-8")) + except json.JSONDecodeError as err: + raise SessionCryptoError("Decrypted session is not valid JSON.") from err + if not isinstance(parsed, dict): + raise SessionCryptoError("Invalid session format.") + return parsed diff --git a/cecli/sessions.py b/cecli/sessions.py index f1ee5a12570..69b44c54c29 100644 --- a/cecli/sessions.py +++ b/cecli/sessions.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Dict, List, Optional -from cecli import models +from cecli import models, session_crypto from cecli.helpers.conversation import ConversationService, MessageTag @@ -22,6 +22,65 @@ def _get_session_directory(self) -> Path: os.makedirs(session_dir, exist_ok=True) return session_dir + def _session_encrypt_settings(self) -> tuple[bool, bytes | None]: + args = getattr(self.coder, "args", None) + if not args or not getattr(args, "session_encrypt", False): + return False, None + key_file = getattr(args, "session_key_file", None) + return True, session_crypto.resolve_key(key_file=key_file) + + def _read_session_file(self, session_file: Path) -> dict | None: + try: + data = session_file.read_bytes() + except OSError as e: + self.io.tool_error(f"Error reading session: {e}") + return None + try: + if session_crypto.is_encrypted_payload(data): + args = getattr(self.coder, "args", None) + key_file = getattr(args, "session_key_file", None) if args else None + key = session_crypto.resolve_key(key_file=key_file) + if not key: + self.io.tool_error( + "Session is encrypted but no key is configured " + f"({session_crypto.KEY_ENV} or --session-key-file)." + ) + return None + return session_crypto.decrypt_session_bytes(data, key) + parsed = json.loads(data.decode("utf-8")) + if not isinstance(parsed, dict): + self.io.tool_error("Invalid session format.") + return None + return parsed + except session_crypto.SessionCryptoError as e: + self.io.tool_error(str(e)) + return None + except json.JSONDecodeError as e: + self.io.tool_error(f"Error loading session: {e}") + return None + + def _write_session_file(self, session_file: Path, session_data: dict) -> bool: + encrypt_enabled, key = self._session_encrypt_settings() + try: + if encrypt_enabled: + if not key: + self.io.tool_error( + "Session encryption is enabled but no key is configured " + f"({session_crypto.KEY_ENV} or --session-key-file)." + ) + return False + session_file.write_bytes(session_crypto.encrypt_session_dict(session_data, key)) + else: + with open(session_file, "w", encoding="utf-8") as f: + json.dump(session_data, f, indent=2) + return True + except session_crypto.SessionCryptoError as e: + self.io.tool_error(str(e)) + return False + except OSError as e: + self.io.tool_error(f"Error saving session: {e}") + return False + def save_session(self, session_name: str, output=True) -> bool: """Save the current chat session to a named file.""" if not session_name: @@ -39,11 +98,12 @@ def save_session(self, session_name: str, output=True) -> bool: try: session_data = self._build_session_data(session_name) - with open(session_file, "w", encoding="utf-8") as f: - json.dump(session_data, f, indent=2) + if not self._write_session_file(session_file, session_data): + return False if output: - self.io.tool_output(f"Session saved: {session_file}") + suffix = " (encrypted)" if self._session_encrypt_settings()[0] else "" + self.io.tool_output(f"Session saved: {session_file}{suffix}") return True @@ -63,8 +123,27 @@ def list_sessions(self) -> List[Dict]: sessions = [] for session_file in sorted(session_files, key=lambda x: x.stat().st_mtime, reverse=True): try: - with open(session_file, "r", encoding="utf-8") as f: - session_data = json.load(f) + raw = session_file.read_bytes() + if session_crypto.is_encrypted_payload(raw): + _, key = self._session_encrypt_settings() + if not key: + sessions.append( + { + "name": session_file.stem, + "file": session_file, + "model": "encrypted", + "edit_format": "—", + "num_messages": 0, + "num_files": 0, + "encrypted": True, + } + ) + continue + session_data = session_crypto.decrypt_session_bytes(raw, key) + else: + session_data = json.loads(raw.decode("utf-8")) + if not isinstance(session_data, dict): + raise ValueError("not a session object") session_info = { "name": session_file.stem, @@ -80,6 +159,7 @@ def list_sessions(self) -> List[Dict]: + len(session_data.get("files", {}).get("read_only", [])) + len(session_data.get("files", {}).get("read_only_stubs", [])) ), + "encrypted": session_crypto.is_encrypted_payload(raw), } sessions.append(session_info) @@ -99,15 +179,12 @@ async def load_session(self, session_identifier: str, switch=True) -> bool: if not session_file: return False - try: - with open(session_file, "r", encoding="utf-8") as f: - session_data = json.load(f) - except Exception as e: - self.io.tool_error(f"Error loading session: {e}") + session_data = self._read_session_file(session_file) + if session_data is None: return False # Verify session format - if not isinstance(session_data, dict) or "version" not in session_data: + if "version" not in session_data: self.io.tool_error("Invalid session format.") return False diff --git a/cecli/website/docs/usage/sessions.md b/cecli/website/docs/usage/sessions.md index ada211cea95..ff7dc663b9b 100644 --- a/cecli/website/docs/usage/sessions.md +++ b/cecli/website/docs/usage/sessions.md @@ -158,6 +158,17 @@ Sessions are stored as JSON files in the `.cecli/sessions/` directory within you ### Version Control - Consider adding `.cecli/sessions/` to your `.gitignore` if sessions contain sensitive information +### Optional encryption (AES-256-GCM) + +When enabled, session files on disk are encrypted (plaintext JSON is unchanged when disabled). + +```bash +export CECLI_SESSION_KEY="$(python -c 'import os,base64; print(base64.urlsafe_b64encode(os.urandom(32)).decode())')" +cecli --session-encrypt --auto-save +``` + +Or use `--session-key-file` pointing at a file with the same urlsafe-base64 32-byte key. BrightVision stores the key in the OS keychain and sets `CECLI_SESSION_KEY` for the Vision API process. + ## Troubleshooting ### Session Not Found diff --git a/requirements/requirements.in b/requirements/requirements.in index 895678008c9..26d155cda8c 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -32,6 +32,7 @@ tomlkit>=0.14.0 truststore xxhash>=3.6.0 py-cymbal>=0.1.24 +cryptography>=42.0.0 # Replaced networkx with rustworkx for better performance in repomap rustworkx>=0.15.0 diff --git a/tests/basic/conftest.py b/tests/basic/conftest.py new file mode 100644 index 00000000000..b5126377867 --- /dev/null +++ b/tests/basic/conftest.py @@ -0,0 +1,24 @@ +"""Shared fixtures for cecli basic tests.""" + +import base64 +import os + +import pytest + +from cecli import session_crypto + + +@pytest.fixture +def session_key32(): + return os.urandom(session_crypto.KEY_BYTES) + + +@pytest.fixture +def session_key_b64(session_key32): + return base64.urlsafe_b64encode(session_key32).decode().rstrip("=") + + +@pytest.fixture +def session_key_env(monkeypatch, session_key32, session_key_b64): + monkeypatch.setenv(session_crypto.KEY_ENV, session_key_b64) + return session_key32 diff --git a/tests/basic/test_session_args.py b/tests/basic/test_session_args.py new file mode 100644 index 00000000000..7576ee5c439 --- /dev/null +++ b/tests/basic/test_session_args.py @@ -0,0 +1,31 @@ +"""CLI args for session encryption and auto-save.""" + +from cecli.args import get_parser + + +def test_session_encrypt_defaults_off(): + parser = get_parser([], "/tmp/project") + args = parser.parse_args([]) + assert args.session_encrypt is False + assert args.session_key_file is None + assert args.auto_save is False + assert args.auto_load is False + assert args.auto_save_session_name == "auto-save" + + +def test_session_encrypt_flag(): + parser = get_parser([], "/tmp/project") + args = parser.parse_args(["--session-encrypt"]) + assert args.session_encrypt is True + + +def test_session_encrypt_no_flag(): + parser = get_parser([], "/tmp/project") + args = parser.parse_args(["--no-session-encrypt"]) + assert args.session_encrypt is False + + +def test_session_key_file_flag(): + parser = get_parser([], "/tmp/project") + args = parser.parse_args(["--session-key-file", "/tmp/key.bin"]) + assert args.session_key_file == "/tmp/key.bin" diff --git a/tests/basic/test_session_crypto.py b/tests/basic/test_session_crypto.py new file mode 100644 index 00000000000..965d6c14a4a --- /dev/null +++ b/tests/basic/test_session_crypto.py @@ -0,0 +1,104 @@ +"""Unit tests for cecli.session_crypto.""" + +import base64 +import json +import os + +import pytest + +from cecli import session_crypto + + +def test_roundtrip_encrypted(session_key32): + data = {"version": 1, "session_name": "t", "model": "gpt-4"} + blob = session_crypto.encrypt_session_dict(data, session_key32) + assert session_crypto.is_encrypted_payload(blob) + assert session_crypto.decrypt_session_bytes(blob, session_key32) == data + + +def test_plaintext_json_still_loads(session_key32): + raw = json.dumps({"version": 1}).encode("utf-8") + assert not session_crypto.is_encrypted_payload(raw) + out = session_crypto.decrypt_session_bytes(raw, session_key32) + assert out["version"] == 1 + + +def test_wrong_key_fails(session_key32): + blob = session_crypto.encrypt_session_dict({"version": 1}, session_key32) + with pytest.raises(session_crypto.SessionCryptoError): + session_crypto.decrypt_session_bytes(blob, os.urandom(32)) + + +def test_invalid_key_length_rejected(): + with pytest.raises(session_crypto.SessionCryptoError): + session_crypto.encrypt_session_dict({"version": 1}, b"short") + + +def test_resolve_key_from_env(session_key_env, session_key32): + assert session_crypto.resolve_key() == session_key32 + + +def test_resolve_key_from_file(tmp_path, session_key32): + path = tmp_path / "key.txt" + path.write_text(base64.urlsafe_b64encode(session_key32).decode(), encoding="utf-8") + assert session_crypto.resolve_key(key_file=path) == session_key32 + + +def test_resolve_key_missing_returns_none(monkeypatch): + monkeypatch.delenv(session_crypto.KEY_ENV, raising=False) + assert session_crypto.resolve_key() is None + + +def test_resolve_key_rejects_bad_env(monkeypatch): + monkeypatch.setenv(session_crypto.KEY_ENV, "not-valid-key-material") + assert session_crypto.resolve_key() is None + + +def test_magic_prefix_constant(): + assert session_crypto.MAGIC.startswith(b"CECLI_ENCRYPTED_SESSION") + + +def test_corrupt_ciphertext_raises(session_key32): + blob = session_crypto.MAGIC + b"not-valid-base64!!!\n" + with pytest.raises(session_crypto.SessionCryptoError): + session_crypto.decrypt_session_bytes(blob, session_key32) + + +def test_empty_encrypted_body_raises(session_key32): + blob = session_crypto.MAGIC + b"\n" + with pytest.raises(session_crypto.SessionCryptoError): + session_crypto.decrypt_session_bytes(blob, session_key32) + + +def test_encrypted_file_roundtrip_on_disk(tmp_path, session_key32): + path = tmp_path / "sess.json" + payload = { + "version": 1, + "session_name": "disk", + "chat_history": {"done_messages": [], "cur_messages": []}, + } + path.write_bytes(session_crypto.encrypt_session_dict(payload, session_key32)) + raw = path.read_bytes() + assert session_crypto.is_encrypted_payload(raw) + assert session_crypto.decrypt_session_bytes(raw, session_key32) == payload + + +def test_unicode_roundtrip(session_key32): + payload = {"version": 1, "session_name": "t", "todo_list": "— fix café naïve"} + blob = session_crypto.encrypt_session_dict(payload, session_key32) + assert session_crypto.decrypt_session_bytes(blob, session_key32) == payload + + +def test_cryptography_import_error(monkeypatch): + import builtins + + real_import = builtins.__import__ + + def fake_import(name, *args, **kwargs): + if name == "cryptography.hazmat.primitives.ciphers.aead": + raise ImportError("blocked for test") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", fake_import) + with pytest.raises(session_crypto.SessionCryptoError, match="cryptography"): + session_crypto.encrypt_session_dict({"version": 1}, os.urandom(32)) diff --git a/tests/basic/test_sessions.py b/tests/basic/test_sessions.py index c6611e12909..9262df2d147 100644 --- a/tests/basic/test_sessions.py +++ b/tests/basic/test_sessions.py @@ -1,5 +1,6 @@ import json import os +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock import pytest @@ -47,6 +48,17 @@ def mock_coder(): coder.mcp_manager = None coder.skills_manager = None coder.io.read_text.return_value = "some todo content" + coder.format_chat_chunks = MagicMock() + coder.args = SimpleNamespace( + model="test_model", + weak_model="test_weak_model", + editor_model="test_editor_model", + agent_model="test_agent_model", + editor_edit_format="editor-diff", + verbose=False, + session_encrypt=False, + session_key_file=None, + ) return coder diff --git a/tests/basic/test_sessions_manager.py b/tests/basic/test_sessions_manager.py new file mode 100644 index 00000000000..d56ae834e08 --- /dev/null +++ b/tests/basic/test_sessions_manager.py @@ -0,0 +1,219 @@ +"""SessionManager on-disk persistence and optional encryption.""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from cecli import session_crypto +from cecli.io import InputOutput +from cecli.sessions import SessionManager + + +def _prepare_workspace(coder, tmp_path) -> Path: + root = Path(tmp_path) + coder.abs_root_path.side_effect = lambda x: str(root / x) + (root / ".cecli" / "sessions").mkdir(parents=True, exist_ok=True) + (root / "file1.py").write_text("", encoding="utf-8") + return root + + +@pytest.fixture +def mock_coder(monkeypatch): + main_model = MagicMock() + main_model.name = "test_model" + main_model.weak_model.name = "weak" + main_model.editor_model.name = "editor" + main_model.agent_model.name = "agent" + main_model.editor_edit_format = "editor-diff" + main_model.retries = 0 + main_model.debug = False + + conv_manager = MagicMock() + conv_manager.get_messages_dict.return_value = [] + files_manager = MagicMock() + monkeypatch.setattr( + "cecli.sessions.ConversationService.get_manager", + lambda _coder: conv_manager, + ) + monkeypatch.setattr( + "cecli.sessions.ConversationService.get_files", + lambda _coder: files_manager, + ) + monkeypatch.setattr( + "cecli.sessions.models.Model", + lambda *args, **kwargs: main_model, + ) + + coder = MagicMock() + coder.abs_fnames = set() + coder.abs_read_only_fnames = set() + coder.abs_read_only_stubs_fnames = set() + coder.auto_commits = True + coder.auto_lint = True + coder.auto_test = False + coder.total_tokens_sent = 0 + coder.total_tokens_received = 0 + coder.total_cached_tokens = 0 + coder.total_cost = 0.0 + coder.edit_format = "diff" + coder.format_chat_chunks = MagicMock() + coder.get_rel_fname.side_effect = lambda x: os.path.basename(x) + coder.local_agent_folder.side_effect = lambda x: f".cecli/{x}" + coder.io = MagicMock(spec=InputOutput) + coder.agent_config = {} + coder.mcp_manager = None + coder.skills_manager = None + coder.main_model = main_model + coder.args = SimpleNamespace( + model="test_model", + weak_model="weak", + editor_model="editor", + agent_model="agent", + editor_edit_format="editor-diff", + verbose=False, + session_encrypt=False, + session_key_file=None, + ) + return coder + + +@pytest.fixture +def session_manager(mock_coder): + return SessionManager(mock_coder, mock_coder.io) + + +@pytest.fixture +def encrypt_coder(mock_coder, session_key_env): + mock_coder.args = SimpleNamespace( + model="test_model", + weak_model="weak", + editor_model="editor", + agent_model="agent", + editor_edit_format="editor-diff", + verbose=False, + session_encrypt=True, + session_key_file=None, + ) + return mock_coder + + +def test_save_plaintext_json(session_manager, mock_coder, tmp_path): + root = _prepare_workspace(mock_coder, tmp_path) + assert session_manager.save_session("plain", output=False) + path = root / ".cecli" / "sessions" / "plain.json" + raw = path.read_bytes() + assert raw.startswith(b"{") + data = json.loads(raw.decode("utf-8")) + assert data["session_name"] == "plain" + assert data["version"] == 1 + + +def test_save_encrypted_blob(encrypt_coder, session_key32, tmp_path): + manager = SessionManager(encrypt_coder, encrypt_coder.io) + root = _prepare_workspace(encrypt_coder, tmp_path) + assert manager.save_session("secret", output=False) + path = root / ".cecli" / "sessions" / "secret.json" + raw = path.read_bytes() + assert session_crypto.is_encrypted_payload(raw) + assert session_crypto.decrypt_session_bytes(raw, session_key32)["session_name"] == "secret" + + +def test_save_encrypt_without_key_fails(mock_coder, monkeypatch, tmp_path): + monkeypatch.delenv(session_crypto.KEY_ENV, raising=False) + _prepare_workspace(mock_coder, tmp_path) + mock_coder.args = SimpleNamespace( + model="test_model", + weak_model="weak", + editor_model="editor", + agent_model="agent", + editor_edit_format="editor-diff", + verbose=False, + session_encrypt=True, + session_key_file=None, + ) + assert SessionManager(mock_coder, mock_coder.io).save_session("nope", output=False) is False + + +def test_list_encrypted_with_key(encrypt_coder, tmp_path): + manager = SessionManager(encrypt_coder, encrypt_coder.io) + _prepare_workspace(encrypt_coder, tmp_path) + manager.save_session("listed", output=False) + rows = manager.list_sessions() + assert len(rows) == 1 + assert rows[0]["name"] == "listed" + assert rows[0].get("encrypted") is True + assert rows[0]["model"] == "test_model" + + +def test_list_encrypted_placeholder_without_key(encrypt_coder, monkeypatch, tmp_path): + manager = SessionManager(encrypt_coder, encrypt_coder.io) + _prepare_workspace(encrypt_coder, tmp_path) + manager.save_session("locked", output=False) + monkeypatch.delenv(session_crypto.KEY_ENV, raising=False) + encrypt_coder.args = SimpleNamespace( + model="test_model", + weak_model="weak", + editor_model="editor", + agent_model="agent", + editor_edit_format="editor-diff", + verbose=False, + session_encrypt=False, + session_key_file=None, + ) + rows = manager.list_sessions() + assert rows[0]["encrypted"] is True + assert rows[0]["model"] == "encrypted" + + +def test_read_legacy_plaintext_when_encrypt_enabled(encrypt_coder, tmp_path): + manager = SessionManager(encrypt_coder, encrypt_coder.io) + root = _prepare_workspace(encrypt_coder, tmp_path) + legacy = root / ".cecli" / "sessions" / "legacy.json" + legacy.write_text( + json.dumps({"version": 1, "session_name": "legacy", "model": "test_model"}), + encoding="utf-8", + ) + data = manager._read_session_file(legacy) + assert data is not None + assert data["session_name"] == "legacy" + + +@pytest.mark.asyncio +async def test_load_encrypted_without_switch(encrypt_coder, session_key32, tmp_path): + manager = SessionManager(encrypt_coder, encrypt_coder.io) + root = _prepare_workspace(encrypt_coder, tmp_path) + encrypt_coder.edit_format = "ask" + assert manager.save_session("enc", output=False) + encrypt_coder.edit_format = "diff" + path = root / ".cecli" / "sessions" / "enc.json" + assert await manager.load_session(str(path), switch=False) is True + loaded = session_crypto.decrypt_session_bytes(path.read_bytes(), session_key32) + assert loaded["edit_format"] == "ask" + + +@pytest.mark.asyncio +async def test_load_encrypted_using_env_key_only(encrypt_coder, session_key_env, tmp_path): + manager = SessionManager(encrypt_coder, encrypt_coder.io) + root = _prepare_workspace(encrypt_coder, tmp_path) + encrypt_coder.edit_format = "architect" + manager.save_session("env", output=False) + encrypt_coder.args = SimpleNamespace( + model="test_model", + weak_model="weak", + editor_model="editor", + agent_model="agent", + editor_edit_format="editor-diff", + verbose=False, + session_encrypt=False, + session_key_file=None, + ) + path = root / ".cecli" / "sessions" / "env.json" + assert await manager.load_session(str(path), switch=False) is True + loaded = session_crypto.decrypt_session_bytes(path.read_bytes(), session_key_env) + assert loaded["edit_format"] == "architect"