-
Notifications
You must be signed in to change notification settings - Fork 28
feat(evaluators): add ATR threat detection evaluator #170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| .PHONY: help test lint lint-fix typecheck build | ||
|
|
||
| help: | ||
| @echo "Agent Control Evaluator - ATR Threat Rules - Makefile commands" | ||
| @echo " make test - run pytest" | ||
| @echo " make lint - run ruff check" | ||
| @echo " make lint-fix - run ruff check --fix" | ||
| @echo " make typecheck - run mypy" | ||
| @echo " make build - build package" | ||
|
|
||
| test: | ||
| uv run --with pytest --with pytest-asyncio --with pytest-cov pytest tests --cov=src --cov-report=xml:../../../coverage-evaluators-atr.xml -q | ||
|
|
||
| lint: | ||
| uv run --with ruff ruff check --config ../../../pyproject.toml src/ | ||
|
|
||
| lint-fix: | ||
| uv run --with ruff ruff check --config ../../../pyproject.toml --fix src/ | ||
|
|
||
| typecheck: | ||
| uv run --with mypy mypy --config-file ../../../pyproject.toml src/ | ||
|
|
||
| build: | ||
| uv build | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| # ATR Threat Rules Evaluator for Agent Control | ||
|
|
||
| Regex-based AI agent threat detection using [ATR (Agent Threat Rules)](https://agentthreatrule.org) community rules. | ||
|
|
||
| ## Features | ||
|
|
||
| - 20 bundled rules covering OWASP Agentic Top 10 categories | ||
| - Pure regex detection -- no API keys, no external calls | ||
| - Sub-5ms evaluation time | ||
| - Configurable severity threshold and category filtering | ||
| - Auto-discovered via Python entry points | ||
|
|
||
| ## Categories | ||
|
|
||
| | Category | Rules | Description | | ||
| |----------|-------|-------------| | ||
| | prompt-injection | 5 | Direct, indirect, jailbreak, system override, multi-turn | | ||
| | agent-manipulation | 2 | Cross-agent attacks, goal hijacking | | ||
| | context-exfiltration | 2 | Data exfil via tools, context window leaks | | ||
| | privilege-escalation | 2 | Unauthorized escalation, role assumption | | ||
| | tool-poisoning | 5 | Tool definition poisoning, hidden instructions, credentials, reverse shell | | ||
| | skill-compromise | 1 | Malicious skill installation | | ||
| | excessive-autonomy | 2 | Unauthorized actions, safety bypass | | ||
| | data-poisoning | 1 | Training data poisoning | | ||
|
|
||
| ## Configuration | ||
|
|
||
| ```python | ||
| from agent_control_evaluator_atr.threat_rules import ATRConfig | ||
|
|
||
| config = ATRConfig( | ||
| min_severity="medium", # "low", "medium", "high", "critical" | ||
| block_on_match=True, # matched=True when threat detected | ||
| categories=[], # empty = all categories | ||
| on_error="allow", # "allow" (fail-open) or "deny" (fail-closed) | ||
| ) | ||
| ``` | ||
|
|
||
| ## Installation | ||
|
|
||
| ```bash | ||
| uv pip install -e evaluators/contrib/atr | ||
| ``` | ||
|
|
||
| ## License | ||
|
|
||
| Apache-2.0. ATR rules are MIT-licensed. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| [project] | ||
| name = "agent-control-evaluator-atr" | ||
| version = "0.1.0" | ||
| description = "ATR (Agent Threat Rules) evaluator for agent-control" | ||
| readme = "README.md" | ||
| requires-python = ">=3.12" | ||
| license = { text = "Apache-2.0" } | ||
| authors = [{ name = "ATR Community" }] | ||
| dependencies = [ | ||
| "agent-control-evaluators>=3.0.0", | ||
| "agent-control-models>=3.0.0", | ||
| ] | ||
|
|
||
| [project.optional-dependencies] | ||
| dev = [ | ||
| "pytest>=8.0.0", | ||
| "pytest-asyncio>=0.23.0", | ||
| "pytest-cov>=4.0.0", | ||
| "ruff>=0.1.0", | ||
| "mypy>=1.8.0", | ||
| ] | ||
|
|
||
| [project.entry-points."agent_control.evaluators"] | ||
| "atr.threat_rules" = "agent_control_evaluator_atr.threat_rules:ATREvaluator" | ||
|
|
||
| [build-system] | ||
| requires = ["hatchling"] | ||
| build-backend = "hatchling.build" | ||
|
|
||
| [tool.hatch.build.targets.wheel] | ||
| packages = ["src/agent_control_evaluator_atr"] | ||
|
|
||
| [tool.ruff] | ||
| line-length = 100 | ||
| target-version = "py312" | ||
|
|
||
| [tool.ruff.lint] | ||
| select = ["E", "F", "I"] | ||
|
|
||
| [tool.uv.sources] | ||
| agent-control-evaluators = { path = "../../builtin", editable = true } | ||
| agent-control-models = { path = "../../../models", editable = true } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| __all__: list[str] = [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| from .config import ATRConfig | ||
| from .evaluator import ATREvaluator | ||
|
|
||
| __all__ = ["ATREvaluator", "ATRConfig"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from typing import Literal | ||
|
|
||
| from agent_control_evaluators import EvaluatorConfig | ||
| from pydantic import Field | ||
|
|
||
|
|
||
| class ATRConfig(EvaluatorConfig): | ||
| """Configuration for ATR (Agent Threat Rules) evaluator. | ||
|
|
||
| Attributes: | ||
| min_severity: Minimum severity level to match ("low", "medium", "high", "critical") | ||
| block_on_match: Whether to set matched=True when a threat is detected | ||
| categories: Category filter; empty list means all categories | ||
| on_error: Error policy ("allow" = fail-open, "deny" = fail-closed) | ||
| """ | ||
|
|
||
| min_severity: Literal["low", "medium", "high", "critical"] = "medium" | ||
| block_on_match: bool = True | ||
| categories: list[str] = Field(default_factory=list) | ||
| on_error: Literal["allow", "deny"] = "allow" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,222 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import re | ||
| from pathlib import Path | ||
| from typing import Any | ||
|
|
||
| from agent_control_evaluators import ( | ||
| Evaluator, | ||
| EvaluatorMetadata, | ||
| register_evaluator, | ||
| ) | ||
| from agent_control_models import EvaluatorResult | ||
|
|
||
| from .config import ATRConfig | ||
|
|
||
| _SEVERITY_ORDER: dict[str, int] = { | ||
| "low": 0, | ||
| "medium": 1, | ||
| "high": 2, | ||
| "critical": 3, | ||
| } | ||
|
|
||
| _SEVERITY_CONFIDENCE: dict[str, float] = { | ||
| "low": 0.6, | ||
| "medium": 0.75, | ||
| "high": 0.9, | ||
| "critical": 0.99, | ||
| } | ||
|
|
||
| _RULES_PATH = Path(__file__).parent / "rules.json" | ||
|
|
||
|
|
||
| def _load_rules(path: Path) -> list[dict[str, Any]]: | ||
| """Load ATR rules from the bundled JSON file.""" | ||
| with path.open(encoding="utf-8") as f: | ||
| data = json.load(f) | ||
| if not isinstance(data, list): | ||
| raise ValueError(f"Expected list of rules, got {type(data).__name__}") | ||
| return data | ||
|
|
||
|
|
||
| def _coerce_to_string(data: Any) -> str: | ||
| """Convert arbitrary input data to a string for pattern matching.""" | ||
| if data is None: | ||
| return "" | ||
| if isinstance(data, str): | ||
| return data | ||
| if isinstance(data, dict): | ||
| # Scan all common content fields, not just the first match | ||
| parts = [] | ||
| for key in ("content", "input", "output", "text", "message"): | ||
| if key in data and data[key] is not None: | ||
| parts.append(str(data[key])) | ||
| if parts: | ||
| return "\n".join(parts) | ||
| # Fall back to JSON serialization | ||
| try: | ||
| return json.dumps(data, ensure_ascii=False, sort_keys=True, default=str) | ||
| except TypeError: | ||
| return str(data) | ||
| if isinstance(data, (int, float, bool)): | ||
| return str(data) | ||
| if isinstance(data, (list, tuple)): | ||
| try: | ||
| return json.dumps(data, ensure_ascii=False, default=str) | ||
| except TypeError: | ||
| return str(data) | ||
| return str(data) | ||
|
|
||
|
|
||
| @register_evaluator | ||
| class ATREvaluator(Evaluator[ATRConfig]): | ||
| """ATR (Agent Threat Rules) evaluator. | ||
|
|
||
| Regex-based AI agent threat detection using community rules. | ||
| No external API calls or keys required. | ||
| """ | ||
|
|
||
| metadata = EvaluatorMetadata( | ||
| name="atr.threat_rules", | ||
| version="0.1.0", | ||
| description="Regex-based AI agent threat detection using ATR community rules", | ||
| requires_api_key=False, | ||
| timeout_ms=5000, | ||
| ) | ||
|
|
||
| config_model = ATRConfig | ||
|
|
||
| @classmethod | ||
| def is_available(cls) -> bool: | ||
| """Always available -- no optional dependencies.""" | ||
| return _RULES_PATH.exists() | ||
|
|
||
| def __init__(self, config: ATRConfig) -> None: | ||
| super().__init__(config) | ||
| self.config = config | ||
|
|
||
| # Load and filter rules eagerly | ||
| raw_rules = _load_rules(_RULES_PATH) | ||
|
|
||
| min_level = _SEVERITY_ORDER.get(self.config.min_severity, 1) | ||
| allowed_categories = set(self.config.categories) if self.config.categories else None | ||
|
|
||
| self._compiled_rules: list[dict[str, Any]] = [] | ||
| for rule in raw_rules: | ||
| severity = rule.get("severity", "medium").lower() | ||
| if _SEVERITY_ORDER.get(severity, 0) < min_level: | ||
| continue | ||
|
|
||
| category = rule.get("category", "") | ||
| if allowed_categories and category not in allowed_categories: | ||
| continue | ||
|
|
||
| compiled_patterns: list[dict[str, Any]] = [] | ||
| for p in rule.get("patterns", []): | ||
| try: | ||
| compiled_patterns.append({ | ||
| "regex": re.compile(p["pattern"], re.IGNORECASE), | ||
| "description": p.get("description", ""), | ||
| }) | ||
| except re.error: | ||
| # Skip invalid patterns rather than failing entirely | ||
| continue | ||
|
|
||
| if compiled_patterns: | ||
| self._compiled_rules.append({ | ||
| "id": rule.get("id", "unknown"), | ||
| "title": rule.get("title", ""), | ||
| "severity": severity, | ||
| "category": category, | ||
| "confidence": _SEVERITY_CONFIDENCE.get(severity, 0.75), | ||
| "patterns": compiled_patterns, | ||
| }) | ||
|
|
||
| async def evaluate(self, data: Any) -> EvaluatorResult: # noqa: D401 | ||
| """Evaluate input data against ATR threat rules.""" | ||
| if data is None: | ||
| return EvaluatorResult(matched=False, confidence=1.0, message="No data") | ||
|
|
||
| try: | ||
| text = _coerce_to_string(data) | ||
| except Exception as e: # noqa: BLE001 | ||
| return self._error_result(f"Failed to coerce input: {e}") | ||
|
|
||
| if not text: | ||
| return EvaluatorResult(matched=False, confidence=1.0, message="Empty input") | ||
|
|
||
| try: | ||
| return self._match_rules(text) | ||
| except Exception as e: # noqa: BLE001 | ||
| return self._error_result(f"ATR evaluation error: {e}") | ||
|
|
||
| def _match_rules(self, text: str) -> EvaluatorResult: | ||
| """Run all compiled rules against the text and return all matches.""" | ||
| all_findings: list[dict[str, Any]] = [] | ||
| max_confidence = 0.0 | ||
|
|
||
| for rule in self._compiled_rules: | ||
| for pattern_entry in rule["patterns"]: | ||
| regex: re.Pattern[str] = pattern_entry["regex"] | ||
| match = regex.search(text) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this loses an important part of the ATR model. The upstream rules are field/context scoped, but here every regex is run over one flattened text blob. For example, ATR-2026-00061 is intended for MCP tool args/tool responses, but this implementation will match something benign like
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can also block the event loop for longer than the evaluator timeout. |
||
| if match: | ||
| all_findings.append({ | ||
| "rule_id": rule["id"], | ||
| "title": rule["title"], | ||
| "severity": rule["severity"], | ||
| "category": rule["category"], | ||
| "matched_text": match.group()[:200], | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should not include the literal matched secret in result metadata. For credential rules, |
||
| "pattern_description": pattern_entry["description"], | ||
| }) | ||
| max_confidence = max(max_confidence, rule["confidence"]) | ||
| break # one match per rule is enough, but continue to other rules | ||
|
|
||
| if all_findings: | ||
| matched = self.config.block_on_match | ||
| return EvaluatorResult( | ||
| matched=matched, | ||
| confidence=max_confidence, | ||
| message=f"ATR: {len(all_findings)} threat(s) detected", | ||
| metadata={ | ||
| "findings": all_findings, | ||
| "count": len(all_findings), | ||
| "max_severity": all_findings[0]["severity"] if all_findings else None, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| # Keep backward-compatible single-match fields | ||
| "rule_id": all_findings[0]["rule_id"], | ||
| "title": all_findings[0]["title"], | ||
| "severity": all_findings[0]["severity"], | ||
| "category": all_findings[0]["category"], | ||
| "matched_text": all_findings[0]["matched_text"], | ||
| "pattern_description": all_findings[0]["pattern_description"], | ||
| }, | ||
| ) | ||
|
|
||
| return EvaluatorResult( | ||
| matched=False, | ||
| confidence=1.0, | ||
| message="ATR: No threats detected", | ||
| ) | ||
|
|
||
| def _error_result(self, error_detail: str) -> EvaluatorResult: | ||
| """Build an error result respecting the on_error policy.""" | ||
| fallback = self.config.on_error | ||
| if fallback == "deny": | ||
| # fail-closed: matched=True, error=None (to satisfy model validator) | ||
| return EvaluatorResult( | ||
| matched=True, | ||
| confidence=0.0, | ||
| message=f"ATR evaluation error (fail-closed): {error_detail}", | ||
| metadata={"error": error_detail, "fallback_action": "deny"}, | ||
| ) | ||
| # fail-open: matched=False, error set | ||
| return EvaluatorResult( | ||
| matched=False, | ||
| confidence=0.0, | ||
| message=f"ATR evaluation error: {error_detail}", | ||
| metadata={"error": error_detail, "fallback_action": "allow"}, | ||
| error=error_detail, | ||
| ) | ||
|
|
||
| async def aclose(self) -> None: | ||
| """No resources to clean up.""" | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is adding a new supported contrib package, I think we also need to wire it into the repo-level checks and release path. Right now the root
test-extrastarget only runs Galileo, and the release/build scripts only publish the Galileo contrib evaluator. If ATR should be maintained here, it should have rootatr-*targets, be included in contrib CI coverage, and be added to build/release/version metadata.