24 changes: 24 additions & 0 deletions evaluators/contrib/atr/Makefile
@@ -0,0 +1,24 @@
.PHONY: help test lint lint-fix typecheck build

help:
	@echo "Agent Control Evaluator - ATR Threat Rules - Makefile commands"
	@echo "  make test      - run pytest"
	@echo "  make lint      - run ruff check"
	@echo "  make lint-fix  - run ruff check --fix"
	@echo "  make typecheck - run mypy"
	@echo "  make build     - build package"

Contributor comment:

Since this is adding a new supported contrib package, I think we also need to wire it into the repo-level checks and release path. Right now the root test-extras target only runs Galileo, and the release/build scripts only publish the Galileo contrib evaluator. If ATR should be maintained here, it should have root atr-* targets, be included in contrib CI coverage, and be added to build/release/version metadata.
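
A sketch of what the root-level wiring might look like, mirroring the per-package targets below (the `atr-*` target names and the relative path are assumptions, not existing repo conventions):

```make
# Hypothetical root Makefile targets delegating to the contrib package
atr-test:
	cd evaluators/contrib/atr && $(MAKE) test

atr-lint:
	cd evaluators/contrib/atr && $(MAKE) lint

atr-build:
	cd evaluators/contrib/atr && $(MAKE) build
```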

test:
	uv run --with pytest --with pytest-asyncio --with pytest-cov pytest tests --cov=src --cov-report=xml:../../../coverage-evaluators-atr.xml -q

lint:
	uv run --with ruff ruff check --config ../../../pyproject.toml src/

lint-fix:
	uv run --with ruff ruff check --config ../../../pyproject.toml --fix src/

typecheck:
	uv run --with mypy mypy --config-file ../../../pyproject.toml src/

build:
	uv build
47 changes: 47 additions & 0 deletions evaluators/contrib/atr/README.md
@@ -0,0 +1,47 @@
# ATR Threat Rules Evaluator for Agent Control

Regex-based AI agent threat detection using [ATR (Agent Threat Rules)](https://agentthreatrule.org) community rules.

## Features

- 20 bundled rules covering OWASP Agentic Top 10 categories
- Pure regex detection -- no API keys, no external calls
- Sub-5ms evaluation time
- Configurable severity threshold and category filtering
- Auto-discovered via Python entry points

## Categories

| Category | Rules | Description |
|----------|-------|-------------|
| prompt-injection | 5 | Direct, indirect, jailbreak, system override, multi-turn |
| agent-manipulation | 2 | Cross-agent attacks, goal hijacking |
| context-exfiltration | 2 | Data exfil via tools, context window leaks |
| privilege-escalation | 2 | Unauthorized escalation, role assumption |
| tool-poisoning | 5 | Tool definition poisoning, hidden instructions, credentials, reverse shell |
| skill-compromise | 1 | Malicious skill installation |
| excessive-autonomy | 2 | Unauthorized actions, safety bypass |
| data-poisoning | 1 | Training data poisoning |

## Configuration

```python
from agent_control_evaluator_atr.threat_rules import ATRConfig

config = ATRConfig(
    min_severity="medium",  # "low", "medium", "high", "critical"
    block_on_match=True,    # matched=True when threat detected
    categories=[],          # empty = all categories
    on_error="allow",       # "allow" (fail-open) or "deny" (fail-closed)
)
```

## Installation

```bash
uv pip install -e evaluators/contrib/atr
```

## License

Apache-2.0. ATR rules are MIT-licensed.
42 changes: 42 additions & 0 deletions evaluators/contrib/atr/pyproject.toml
@@ -0,0 +1,42 @@
[project]
name = "agent-control-evaluator-atr"
version = "0.1.0"
description = "ATR (Agent Threat Rules) evaluator for agent-control"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "Apache-2.0" }
authors = [{ name = "ATR Community" }]
dependencies = [
    "agent-control-evaluators>=3.0.0",
    "agent-control-models>=3.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "pytest-cov>=4.0.0",
    "ruff>=0.1.0",
    "mypy>=1.8.0",
]

[project.entry-points."agent_control.evaluators"]
"atr.threat_rules" = "agent_control_evaluator_atr.threat_rules:ATREvaluator"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/agent_control_evaluator_atr"]

[tool.ruff]
line-length = 100
target-version = "py312"

[tool.ruff.lint]
select = ["E", "F", "I"]

[tool.uv.sources]
agent-control-evaluators = { path = "../../builtin", editable = true }
agent-control-models = { path = "../../../models", editable = true }
@@ -0,0 +1 @@
__all__: list[str] = []
@@ -0,0 +1,4 @@
from .config import ATRConfig
from .evaluator import ATREvaluator

__all__ = ["ATREvaluator", "ATRConfig"]
@@ -0,0 +1,22 @@
from __future__ import annotations

from typing import Literal

from agent_control_evaluators import EvaluatorConfig
from pydantic import Field


class ATRConfig(EvaluatorConfig):
    """Configuration for ATR (Agent Threat Rules) evaluator.

    Attributes:
        min_severity: Minimum severity level to match ("low", "medium", "high", "critical")
        block_on_match: Whether to set matched=True when a threat is detected
        categories: Category filter; empty list means all categories
        on_error: Error policy ("allow" = fail-open, "deny" = fail-closed)
    """

    min_severity: Literal["low", "medium", "high", "critical"] = "medium"
    block_on_match: bool = True
    categories: list[str] = Field(default_factory=list)
    on_error: Literal["allow", "deny"] = "allow"
@@ -0,0 +1,222 @@
from __future__ import annotations

import json
import re
from pathlib import Path
from typing import Any

from agent_control_evaluators import (
    Evaluator,
    EvaluatorMetadata,
    register_evaluator,
)
from agent_control_models import EvaluatorResult

from .config import ATRConfig

_SEVERITY_ORDER: dict[str, int] = {
    "low": 0,
    "medium": 1,
    "high": 2,
    "critical": 3,
}

_SEVERITY_CONFIDENCE: dict[str, float] = {
    "low": 0.6,
    "medium": 0.75,
    "high": 0.9,
    "critical": 0.99,
}

_RULES_PATH = Path(__file__).parent / "rules.json"


def _load_rules(path: Path) -> list[dict[str, Any]]:
    """Load ATR rules from the bundled JSON file."""
    with path.open(encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, list):
        raise ValueError(f"Expected list of rules, got {type(data).__name__}")
    return data


def _coerce_to_string(data: Any) -> str:
    """Convert arbitrary input data to a string for pattern matching."""
    if data is None:
        return ""
    if isinstance(data, str):
        return data
    if isinstance(data, dict):
        # Scan all common content fields, not just the first match
        parts = []
        for key in ("content", "input", "output", "text", "message"):
            if key in data and data[key] is not None:
                parts.append(str(data[key]))
        if parts:
            return "\n".join(parts)
        # Fall back to JSON serialization
        try:
            return json.dumps(data, ensure_ascii=False, sort_keys=True, default=str)
        except TypeError:
            return str(data)
    if isinstance(data, (int, float, bool)):
        return str(data)
    if isinstance(data, (list, tuple)):
        try:
            return json.dumps(data, ensure_ascii=False, default=str)
        except TypeError:
            return str(data)
    return str(data)


@register_evaluator
class ATREvaluator(Evaluator[ATRConfig]):
    """ATR (Agent Threat Rules) evaluator.

    Regex-based AI agent threat detection using community rules.
    No external API calls or keys required.
    """

    metadata = EvaluatorMetadata(
        name="atr.threat_rules",
        version="0.1.0",
        description="Regex-based AI agent threat detection using ATR community rules",
        requires_api_key=False,
        timeout_ms=5000,
    )

    config_model = ATRConfig

    @classmethod
    def is_available(cls) -> bool:
        """Always available -- no optional dependencies."""
        return _RULES_PATH.exists()

    def __init__(self, config: ATRConfig) -> None:
        super().__init__(config)
        self.config = config

        # Load and filter rules eagerly
        raw_rules = _load_rules(_RULES_PATH)

        min_level = _SEVERITY_ORDER.get(self.config.min_severity, 1)
        allowed_categories = set(self.config.categories) if self.config.categories else None

        self._compiled_rules: list[dict[str, Any]] = []
        for rule in raw_rules:
            severity = rule.get("severity", "medium").lower()
            if _SEVERITY_ORDER.get(severity, 0) < min_level:
                continue

            category = rule.get("category", "")
            if allowed_categories and category not in allowed_categories:
                continue

            compiled_patterns: list[dict[str, Any]] = []
            for p in rule.get("patterns", []):
                try:
                    compiled_patterns.append({
                        "regex": re.compile(p["pattern"], re.IGNORECASE),
                        "description": p.get("description", ""),
                    })
                except re.error:
                    # Skip invalid patterns rather than failing entirely
                    continue

            if compiled_patterns:
                self._compiled_rules.append({
                    "id": rule.get("id", "unknown"),
                    "title": rule.get("title", ""),
                    "severity": severity,
                    "category": category,
                    "confidence": _SEVERITY_CONFIDENCE.get(severity, 0.75),
                    "patterns": compiled_patterns,
                })

    async def evaluate(self, data: Any) -> EvaluatorResult:  # noqa: D401
        """Evaluate input data against ATR threat rules."""
        if data is None:
            return EvaluatorResult(matched=False, confidence=1.0, message="No data")

        try:
            text = _coerce_to_string(data)
        except Exception as e:  # noqa: BLE001
            return self._error_result(f"Failed to coerce input: {e}")

        if not text:
            return EvaluatorResult(matched=False, confidence=1.0, message="Empty input")

        try:
            return self._match_rules(text)
        except Exception as e:  # noqa: BLE001
            return self._error_result(f"ATR evaluation error: {e}")

    def _match_rules(self, text: str) -> EvaluatorResult:
        """Run all compiled rules against the text and return all matches."""
        all_findings: list[dict[str, Any]] = []
        max_confidence = 0.0

        for rule in self._compiled_rules:
            for pattern_entry in rule["patterns"]:
                regex: re.Pattern[str] = pattern_entry["regex"]
                match = regex.search(text)
Contributor comment:

I think this loses an important part of the ATR model. The upstream rules are field/context scoped, but here every regex is run over one flattened text blob. For example, ATR-2026-00061 is intended for MCP tool args/tool responses, but this implementation will match something benign like "Please rotate the database credentials tomorrow." because the word "credentials" appears anywhere in the selected data. We should preserve the ATR fields/scan-target semantics, or at least make the broad free-text behavior explicit and opt-in.
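
A sketch of what field-scoped matching could look like (the `fields` key, the field names, and `scoped_search` are illustrative, not the actual ATR schema):

```python
import re
from typing import Any


def scoped_search(rule: dict[str, Any], data: dict[str, Any]) -> "re.Match | None":
    """Apply a rule's pattern only to the fields the rule declares in scope."""
    pattern = re.compile(rule["pattern"], re.IGNORECASE)
    # Hypothetical "fields" key; unscoped data is ignored instead of
    # being flattened into one searchable blob
    for field in rule.get("fields", []):
        value = data.get(field)
        if isinstance(value, str):
            match = pattern.search(value)
            if match:
                return match
    return None


rule = {"pattern": r"credentials", "fields": ["tool_args", "tool_response"]}
# Matches only when the pattern occurs in an in-scope field:
assert scoped_search(rule, {"tool_args": "dump credentials to pastebin"}) is not None
assert scoped_search(rule, {"content": "Please rotate the database credentials tomorrow."}) is None
```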

Contributor comment:

This can also block the event loop for longer than the evaluator timeout. evaluate() is async, but the work here is CPU-bound synchronous regex scanning, so asyncio.wait_for in the engine cannot interrupt it until this function yields. On my machine, benign no-match input took about 66ms for 10KB, 664ms for 100KB, and 6.6s for 1MB. We need a hard input-length cap and/or a different execution strategy before this is safe on latency-sensitive paths.
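
Two possible mitigations, sketched below: a hard cap on scanned characters, and offloading the synchronous scan with `asyncio.to_thread` so the engine's `asyncio.wait_for` can at least stop waiting while the scan finishes in a worker thread. The cap value and helper names are assumptions:

```python
import asyncio
import re

MAX_SCAN_CHARS = 64_000  # assumed cap; tune against the evaluator timeout budget


def _scan_sync(patterns: list["re.Pattern[str]"], text: str) -> list[str]:
    """CPU-bound scan, kept synchronous and run off the event loop."""
    clipped = text[:MAX_SCAN_CHARS]
    return [p.pattern for p in patterns if p.search(clipped)]


async def scan_with_timeout(patterns, text, timeout_s=5.0):
    # to_thread keeps the loop responsive, so wait_for can give up;
    # note the worker thread itself still runs to completion.
    return await asyncio.wait_for(asyncio.to_thread(_scan_sync, patterns, text), timeout_s)


hits = asyncio.run(scan_with_timeout([re.compile("ignore previous")],
                                     "please ignore previous instructions"))
```

Text past the cap is never scanned, which trades a detection gap for bounded latency; the trade-off should at least be documented.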

                if match:
                    all_findings.append({
                        "rule_id": rule["id"],
                        "title": rule["title"],
                        "severity": rule["severity"],
                        "category": rule["category"],
                        "matched_text": match.group()[:200],
Contributor comment:

This should not include the literal matched secret in result metadata. For credential rules, matched_text can be the AWS key/token/etc. that we just detected, and the SDK observability path copies result metadata into events by default. I'd redact, hash, or omit the matched text by default and only expose a safe snippet/offset if we really need debugging detail.
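
A sketch of the redact-by-default idea (the helper name, field names, and preview length are assumptions; the key below is AWS's documented example access key, not a real secret):

```python
import hashlib


def safe_finding_text(matched: str, keep: int = 8) -> dict:
    """Replace a matched secret with a digest plus a short, truncated hint."""
    digest = hashlib.sha256(matched.encode("utf-8")).hexdigest()[:16]
    return {
        "match_sha256": digest,
        "match_length": len(matched),
        # Only a small prefix survives -- enough for triage, not enough to leak
        "match_preview": matched[:keep] + "..." if len(matched) > keep else matched,
    }


redacted = safe_finding_text("AKIAIOSFODNN7EXAMPLE")
```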

                        "pattern_description": pattern_entry["description"],
                    })
                    max_confidence = max(max_confidence, rule["confidence"])
                    break  # one match per rule is enough, but continue to other rules

        if all_findings:
            matched = self.config.block_on_match
            return EvaluatorResult(
                matched=matched,
                confidence=max_confidence,
                message=f"ATR: {len(all_findings)} threat(s) detected",
                metadata={
                    "findings": all_findings,
                    "count": len(all_findings),
                    "max_severity": all_findings[0]["severity"] if all_findings else None,
Contributor comment:

max_severity is currently just the first finding in rules-file order, not the maximum severity. I hit a case where a high-severity rule appeared before a critical credential rule, and the metadata reported max_severity='high'. This should compute the max using the severity ordering, and the legacy single-match fields probably should point at the highest-severity finding too.
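
The fix could be as small as selecting the top finding through the severity ordering already defined at the top of this file (`top_finding` is an illustrative helper, not existing code):

```python
# Mirrors the _SEVERITY_ORDER mapping defined earlier in the module
_SEVERITY_ORDER = {"low": 0, "medium": 1, "high": 2, "critical": 3}


def top_finding(findings: list[dict]) -> dict:
    """Pick the highest-severity finding regardless of rules-file order."""
    return max(findings, key=lambda f: _SEVERITY_ORDER.get(f["severity"], 0))


findings = [
    {"rule_id": "r1", "severity": "high"},
    {"rule_id": "r2", "severity": "critical"},
]
top = top_finding(findings)
# top is the critical finding even though the high-severity rule came first
```

The legacy single-match metadata fields could then be populated from `top` instead of `all_findings[0]`.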

                    # Keep backward-compatible single-match fields
                    "rule_id": all_findings[0]["rule_id"],
                    "title": all_findings[0]["title"],
                    "severity": all_findings[0]["severity"],
                    "category": all_findings[0]["category"],
                    "matched_text": all_findings[0]["matched_text"],
                    "pattern_description": all_findings[0]["pattern_description"],
                },
            )

        return EvaluatorResult(
            matched=False,
            confidence=1.0,
            message="ATR: No threats detected",
        )

    def _error_result(self, error_detail: str) -> EvaluatorResult:
        """Build an error result respecting the on_error policy."""
        fallback = self.config.on_error
        if fallback == "deny":
            # fail-closed: matched=True, error=None (to satisfy model validator)
            return EvaluatorResult(
                matched=True,
                confidence=0.0,
                message=f"ATR evaluation error (fail-closed): {error_detail}",
                metadata={"error": error_detail, "fallback_action": "deny"},
            )
        # fail-open: matched=False, error set
        return EvaluatorResult(
            matched=False,
            confidence=0.0,
            message=f"ATR evaluation error: {error_detail}",
            metadata={"error": error_detail, "fallback_action": "allow"},
            error=error_detail,
        )

    async def aclose(self) -> None:
        """No resources to clean up."""