diff --git a/references/rule-schema.json b/references/rule-schema.json new file mode 100644 index 0000000..bdf5a1a --- /dev/null +++ b/references/rule-schema.json @@ -0,0 +1,161 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://github.com/Wolfvin/CodeLens/blob/main/references/rule-schema.json", + "title": "CodeLens Rule YAML Schema", + "description": "JSON Schema for CodeLens rule YAML files. Used by `codelens rule-validate` to catch typos, unknown keys, missing required fields, and type mismatches before a rule is loaded by the engine. The schema describes the superset of taint-style (sources/sinks/sanitizers) and pattern-style (pattern/patterns) rules; cross-field constraints (pattern vs patterns mutually exclusive, fix requires pattern) are enforced separately by the validator because JSON Schema cannot express them cleanly.", + "type": "object", + "required": ["rules"], + "additionalProperties": false, + "properties": { + "rules": { + "type": "array", + "minItems": 1, + "items": {"$ref": "#/$defs/rule"} + } + }, + "$defs": { + "rule": { + "type": "object", + "required": ["id", "message", "severity", "language"], + "additionalProperties": false, + "properties": { + "id": { + "type": "string", + "minLength": 1, + "description": "Stable unique rule identifier, e.g. 'py/sql-injection' or 'owasp/A01/broken-access-control'. Used in findings, SARIF, and # ruleid: test markers." + }, + "name": { + "type": "string", + "description": "Human-readable rule name shown in reports." + }, + "message": { + "type": "string", + "minLength": 1, + "description": "Finding message displayed when the rule fires." + }, + "severity": { + "type": "string", + "enum": ["critical", "high", "medium", "low", "info"], + "description": "Severity level. Maps to SARIF result level: critical/high -> error, medium -> warning, low/info -> note." + }, + "language": { + "type": "string", + "minLength": 1, + "description": "Target language. CodeLens tree-sitter-supported: python, javascript, typescript, tsx, rust, html, css. Other languages are accepted but pattern parseability is skipped." + }, + "cwe": { + "type": "string", + "description": "CWE identifier, e.g. 'CWE-89'. Optional metadata." + }, + "owasp": { + "type": "string", + "description": "OWASP Top 10 category, e.g. 'A01:2021'. Optional metadata." + }, + "sources": { + "type": "array", + "items": {"type": "string"}, + "description": "Taint sources — where untrusted data enters (e.g. 'flask.request.args'). Used by taint-style rules." + }, + "sinks": { + "type": "array", + "items": {"type": "string"}, + "description": "Taint sinks — where untrusted data becomes dangerous (e.g. 'cursor.execute'). Used by taint-style rules." + }, + "sanitizers": { + "type": "array", + "items": {"type": "string"}, + "description": "Sanitizers — functions that make data safe (e.g. 'parameterized_query'). Used by taint-style rules." + }, + "pattern": { + "type": "string", + "description": "Pattern-style rule: a single AST pattern (Semgrep-compatible subset). Mutually exclusive with 'patterns'." + }, + "patterns": { + "type": "array", + "items": {"type": ["string", "object"]}, + "description": "Pattern-style rule: list of patterns (all must match). Mutually exclusive with 'pattern'." + }, + "pattern-either": { + "type": "array", + "items": {"type": "object"}, + "description": "Pattern-style rule: any of these patterns matches." + }, + "pattern-not": { + "type": ["string", "object"], + "description": "Pattern-style rule: this pattern must NOT match." + }, + "pattern-inside": { + "type": ["string", "object"], + "description": "Pattern-style rule: match must be inside this pattern." + }, + "pattern-not-inside": { + "type": ["string", "object"], + "description": "Pattern-style rule: match must NOT be inside this pattern." + }, + "pattern-regex": { + "type": "string", + "description": "Pattern-style rule: regex pattern (matched against source text, not AST)." + }, + "metavariable-regex": { + "type": "object", + "description": "Constrain a metavariable by regex." + }, + "metavariable-comparison": { + "type": "object", + "description": "Constrain a metavariable by Python expression." + }, + "fix": { + "type": "string", + "description": "Autofix replacement string (may reference metavariables). Requires 'pattern', 'patterns', or 'pattern-either'." + }, + "fix-regex": { + "type": "object", + "description": "Regex-based autofix. Requires 'pattern', 'patterns', or 'pattern-either'.", + "properties": { + "regex": {"type": "string"}, + "replacement": {"type": "string"}, + "count": {"type": "integer", "minimum": 0} + }, + "required": ["regex", "replacement"] + }, + "paths": { + "type": "object", + "description": "Per-rule path filter (gitignore-style globs).", + "properties": { + "include": {"type": "array", "items": {"type": "string"}}, + "exclude": {"type": "array", "items": {"type": "string"}} + } + }, + "metadata": { + "type": "object", + "description": "Free-form metadata dict. Serialized to SARIF 'properties' and JSON output 'metadata'." + }, + "options": { + "type": "object", + "description": "Per-rule engine options (constant_propagation, symbolic_propagation, taint_intrafile, etc.)." + }, + "timeout": { + "type": ["integer", "number"], + "minimum": 0, + "description": "Per-rule timeout in seconds (overrides global --timeout). Requires --allow-rule-timeout-control." + }, + "max-match-per-file": { + "type": "integer", + "minimum": 0, + "description": "Per-rule cap on reported matches per file." + }, + "project-depends-on": { + "type": "array", + "items": {"type": "object"}, + "description": "SCA rule: only match if project depends on the specified package(s).", + "properties": { + "namespace": {"type": "string"}, + "package": {"type": "string"}, + "version": {"type": "string"} + }, + "required": ["namespace", "package", "version"] + } + } + } + } +} diff --git a/scripts/commands/registry_validate.py b/scripts/commands/registry_validate.py new file mode 100644 index 0000000..2712bfd --- /dev/null +++ b/scripts/commands/registry_validate.py @@ -0,0 +1,44 @@ +"""registry-validate command — Validate registry against file system. + +Renamed from `validate` in v8.x to make room for `rule-validate` (rule YAML +validation). The old `validate` command name still works as a deprecated alias +(see ``scripts/commands/validate.py``) but prints a one-line stderr warning +and will be removed in a future release. +""" + +import sys + +from validate_engine import validate_registry +from commands import register_command + + +def add_args(parser): + """Register registry-validate arguments.""" + parser.add_argument( + "workspace", + nargs="?", + default=None, + help="Path to workspace root (auto-detected if omitted)", + ) + + +def execute(args, workspace): + """Execute the registry-validate command. + + Args: + args: Parsed argparse namespace with ``workspace``. + workspace: Resolved workspace root path. + + Returns: + Dict with the registry validation result (``validate_registry`` + return shape). + """ + return validate_registry(workspace) + + +register_command( + "registry-validate", + "Validate registry against file system (renamed from `validate`)", + add_args, + execute, +) diff --git a/scripts/commands/rule_test.py b/scripts/commands/rule_test.py new file mode 100644 index 0000000..cb3bf91 --- /dev/null +++ b/scripts/commands/rule_test.py @@ -0,0 +1,183 @@ +"""rule-test command — snapshot testing for rule YAML files. + +Runs a rule against positive/negative code samples (``.test.yaml``) and +verifies the rule fires (or doesn't fire) where expected via inline +``# ruleid: `` / ``# ok`` markers. All logic lives in +``scripts/rule_test_runner.py``; this file is the thin CLI wrapper. + +Usage:: + + codelens rule-test tests/rule_fixtures/py_sql_injection.yaml + codelens rule-test tests/rule_fixtures/ # run all rules in a dir + codelens rule-test --json tests/rule_fixtures/ + codelens rule-test --test-ignore-todo tests/rule_fixtures/ + +Exit codes: + 0 — all tests pass (or no tests ran) + 1 — at least one test failed or errored +""" + +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from typing import Any, Dict, List + +from commands import register_command +from rule_test_runner import ( + TestResult, + determine_exit_code, + run_tests, + run_tests_recursive, +) + + +def add_args(parser): + """Register rule-test CLI arguments.""" + parser.add_argument( + "rule_path", + help="Path to a rule YAML file or a directory of rule files", + ) + parser.add_argument( + "--test-ignore-todo", + action="store_true", + default=False, + help="Skip '# todoruleid:' markers (staged rules not yet enforced)", + ) + parser.add_argument( + "--json", + dest="json_output", + action="store_true", + default=False, + help="Output machine-readable JSON instead of human-readable text", + ) + + +def _format_human(results: List[TestResult]) -> str: + """Render test results as human-readable text. + + One block per rule: ``: PASS (3/3 samples)`` or fail with + a per-failure diff. Ends with a summary line. + """ + lines: List[str] = [] + total_pass = sum(1 for r in results if r.is_pass) + total_fail = sum(1 for r in results if not r.is_pass) + total_samples = sum(r.total for r in results) + total_passed_samples = sum(r.passed for r in results) + total_skipped = sum(r.skipped for r in results) + + for result in results: + rule_id = result.rule_id or Path(result.rule_path).stem + if result.error: + lines.append(f"\n{rule_id}: ERROR — {result.error}") + continue + + if result.total == 0: + lines.append(f"\n{rule_id}: SKIP (no samples)") + continue + + # Per-rule verdict line — the most important line for CI parsers. + verdict = "PASS" if result.is_pass else "FAIL" + sample_summary = f"{result.passed}/{result.total} samples" + if result.skipped: + sample_summary += f" ({result.skipped} skipped)" + lines.append(f"\n{rule_id}: {verdict} ({sample_summary})") + + # Per-failure detail so authors can fix the rule. + for failure in result.failures: + lines.append(f" ✗ {failure.sample_name} line {failure.line}: {failure.message}") + + # Summary line. + lines.append("\n" + "=" * 60) + if total_fail > 0: + lines.append( + f"FAIL: {total_fail}/{len(results)} rule(s) failed, " + f"{total_passed_samples}/{total_samples} samples passed " + f"({total_skipped} skipped)" + ) + else: + lines.append( + f"PASS: {total_pass}/{len(results)} rule(s), " + f"{total_passed_samples}/{total_samples} samples passed " + f"({total_skipped} skipped)" + ) + + return "\n".join(lines) + + +def _format_json(results: List[TestResult]) -> str: + """Render test results as JSON for CI / programmatic consumers.""" + payload: Dict[str, Any] = { + "status": "ok" if all(r.is_pass for r in results) else "fail", + "exit_code": determine_exit_code(results), + "total_rules": len(results), + "total_pass": sum(1 for r in results if r.is_pass), + "total_fail": sum(1 for r in results if not r.is_pass), + "total_samples": sum(r.total for r in results), + "total_passed_samples": sum(r.passed for r in results), + "total_skipped": sum(r.skipped for r in results), + "results": [r.to_dict() for r in results], + } + return json.dumps(payload, indent=2) + + +def execute(args, workspace): + """Execute the rule-test command. + + Returns a dict (so the result flows through the standard CodeLens + output formatter) AND sets the process exit code via ``sys.exit`` so + CI pipelines get the correct 0/1 signal. + + Args: + args: Parsed argparse namespace with ``rule_path``, ``test_ignore_todo``, + and ``json_output``. + workspace: Workspace root (unused — rule-test is path-based). + + Returns: + Dict with ``status``, ``exit_code``, ``results``, and the rendered + ``output`` string (human or JSON). + """ + raw_path = os.path.expanduser(args.rule_path) + path = Path(raw_path).resolve() + + if not path.exists(): + # Surface a clear error rather than crashing — the path may be a + # typo, and the user benefits from an actionable message. + print(f"Error: path does not exist: {path}", file=sys.stderr) + sys.exit(1) + + # A single file → run tests for that one rule. A directory → walk and + # run tests for every rule with a ``.test.yaml`` companion. + if path.is_file(): + results = [run_tests(path, ignore_todo=args.test_ignore_todo)] + else: + results = run_tests_recursive(path, ignore_todo=args.test_ignore_todo) + + exit_code = determine_exit_code(results) + + if args.json_output: + output = _format_json(results) + else: + output = _format_human(results) + + print(output) + sys.exit(exit_code) + + # Unreachable, but keeps the return-type contract honest for callers + # that import ``execute`` directly (e.g., tests). + return { + "status": "ok" if exit_code == 0 else "fail", + "exit_code": exit_code, + "results": [r.to_dict() for r in results], + "output": output, + } + + +register_command( + "rule-test", + "Run snapshot tests for rule YAML files (inline # ruleid: / # ok markers)", + add_args, + execute, +) diff --git a/scripts/commands/rule_validate.py b/scripts/commands/rule_validate.py new file mode 100644 index 0000000..5193411 --- /dev/null +++ b/scripts/commands/rule_validate.py @@ -0,0 +1,200 @@ +"""rule-validate command — validate rule YAML files for typos and schema errors. + +Catches the silent-skip class of bugs: typos (``pattern-eiter`` vs +``pattern-either``), unknown keys, missing required fields, invalid +``severity`` enum, unparseable ``pattern`` strings, and cross-field +violations (``pattern`` + ``patterns`` mutually exclusive, ``fix`` requires +``pattern``). All logic lives in ``scripts/rule_validator.py``; this file +is the thin CLI wrapper. + +Exit codes: + 0 — all rules valid (no errors, no warnings without ``--strict``) + 1 — at least one rule has an error + 2 — at least one rule has a warning AND ``--strict`` is set + +Usage:: + + codelens rule-validate scripts/rules/python_security.yaml + codelens rule-validate --strict scripts/rules/*.yaml + codelens rule-validate --json scripts/rules/python_security.yaml +""" + +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from typing import Any, Dict, List + +from commands import register_command +from rule_validator import ( + ValidationResult, + determine_exit_code, + validate_rule, + validate_rule_files, +) + +# Exit codes — kept as named constants so the command and its tests agree. +EXIT_OK = 0 +EXIT_ERROR = 1 +EXIT_WARNING_STRICT = 2 + + +def add_args(parser): + """Register rule-validate CLI arguments.""" + parser.add_argument( + "rule_path", + nargs="+", + help="Path(s) to rule YAML file(s) to validate (one or more)", + ) + parser.add_argument( + "--strict", + action="store_true", + default=False, + help="Treat warnings as errors for exit-code purposes (exit 2 instead of 0)", + ) + parser.add_argument( + "--json", + dest="json_output", + action="store_true", + default=False, + help="Output machine-readable JSON instead of human-readable text", + ) + + +def _format_human(results: List[ValidationResult], strict: bool) -> str: + """Render validation results as human-readable text. + + One block per rule file: header line, then errors (✗) and warnings (⚠), + each with file:line: message. Ends with a summary line. + """ + lines: List[str] = [] + total_errors = sum(len(r.errors) for r in results) + total_warnings = sum(len(r.warnings) for r in results) + total_rules = sum(r.rules_checked for r in results) + valid_files = sum(1 for r in results if r.is_valid) + + for result in results: + # Header: file path + ✓/✗ status badge. + status = "✓ valid" if result.is_valid else "✗ invalid" + lines.append(f"\n{result.rule_path} — {status} ({result.rules_checked} rules)") + lines.append("─" * 60) + + if not result.errors and not result.warnings: + lines.append(" No issues found.") + continue + + # Errors first (always surface the most important issues up top). + for issue in result.errors: + loc = f"line {issue.line}: " if issue.line else "" + lines.append(f" ✗ [{issue.category}] {loc}{issue.message}") + + # Then warnings. + for issue in result.warnings: + loc = f"line {issue.line}: " if issue.line else "" + lines.append(f" ⚠ [{issue.category}] {loc}{issue.message}") + + # Summary line — gives CI parsers and humans a one-line verdict. + lines.append("\n" + "=" * 60) + if total_errors > 0: + lines.append( + f"FAIL: {total_errors} error(s), {total_warnings} warning(s) " + f"across {len(results)} file(s), {total_rules} rule(s)" + ) + elif total_warnings > 0 and strict: + lines.append( + f"FAIL (--strict): {total_warnings} warning(s) treated as errors " + f"across {len(results)} file(s), {total_rules} rule(s)" + ) + elif total_warnings > 0: + lines.append( + f"PASS with warnings: {valid_files}/{len(results)} file(s) valid, " + f"{total_warnings} warning(s) (use --strict to fail on warnings)" + ) + else: + lines.append( + f"PASS: {len(results)}/{len(results)} file(s) valid, {total_rules} rule(s) checked" + ) + + return "\n".join(lines) + + +def _format_json(results: List[ValidationResult], strict: bool) -> str: + """Render validation results as JSON for CI / programmatic consumers.""" + payload: Dict[str, Any] = { + "status": "ok" if all(r.is_valid for r in results) else "error", + "strict": strict, + "exit_code": determine_exit_code(results, strict=strict), + "total_files": len(results), + "total_rules": sum(r.rules_checked for r in results), + "total_errors": sum(len(r.errors) for r in results), + "total_warnings": sum(len(r.warnings) for r in results), + "results": [r.to_dict() for r in results], + } + return json.dumps(payload, indent=2) + + +def execute(args, workspace): + """Execute the rule-validate command. + + Returns a dict (so the result flows through the standard CodeLens + output formatter) AND sets the process exit code via ``sys.exit`` so + CI pipelines get the correct 0/1/2 signal. + + Args: + args: Parsed argparse namespace with ``rule_path`` (list), + ``strict`` (bool), and ``json_output`` (bool). + workspace: Workspace root (unused — rule-validate is path-based). + + Returns: + Dict with ``status``, ``exit_code``, ``results``, and the rendered + ``output`` string (human or JSON). + """ + # Expand and deduplicate paths. ``args.rule_path`` is a list (nargs="+"). + paths: List[Path] = [] + seen: set = set() + for raw in args.rule_path: + # Expand ``~`` and resolve to absolute. We don't follow symlinks + # here — a missing file is reported as a validation error below. + p = Path(os.path.expanduser(raw)).resolve() + if p in seen: + continue + seen.add(p) + paths.append(p) + + # Validate each path. Missing files produce a single-error result + # rather than crashing — the validator's ``_parse_yaml`` already + # handles ``OSError`` and records it as a yaml_syntax error. + results = validate_rule_files(paths) + + exit_code = determine_exit_code(results, strict=args.strict) + + if args.json_output: + output = _format_json(results, args.strict) + else: + output = _format_human(results, args.strict) + + # Print to stdout so the report is pipeable, then exit with the + # contract code. We use ``sys.exit`` from inside the command (rather + # than returning a sentinel) because rule-validate is fundamentally a + # CI gate — the exit code IS the result. + print(output) + sys.exit(exit_code) + + # Unreachable, but keeps the return-type contract honest for callers + # that import ``execute`` directly (e.g., tests). + return { + "status": "ok" if exit_code == 0 else "error", + "exit_code": exit_code, + "results": [r.to_dict() for r in results], + "output": output, + } + + +register_command( + "rule-validate", + "Validate rule YAML files for typos, schema errors, and unparseable patterns", + add_args, + execute, +) diff --git a/scripts/commands/validate.py b/scripts/commands/validate.py index 2a7ca01..3fc14b3 100644 --- a/scripts/commands/validate.py +++ b/scripts/commands/validate.py @@ -1,16 +1,59 @@ -"""Validate command — Validate registry against file system.""" +"""validate command — DEPRECATED alias for ``registry-validate``. + +This command was renamed to ``registry-validate`` to make room for the new +``rule-validate`` command (rule YAML validation). It still works for one +release cycle but prints a deprecation warning to stderr. It will be removed +in a future release — switch to ``codelens registry-validate``. +""" + +import sys from validate_engine import validate_registry from commands import register_command +# Deprecation notice — printed once per invocation to stderr (NOT stdout, which +# is reserved for JSON/machine-readable output). Surfaced in both interactive +# and CI usage so users notice and migrate before the alias is removed. +_DEPRECATION_WARNING = ( + "[CodeLens] DEPRECATED: `codelens validate` is renamed to " + "`codelens registry-validate`. The old name still works for one release " + "cycle but will be removed. Use `registry-validate` for registry checks, " + "or `rule-validate` for rule YAML validation.\n" +) + def add_args(parser): - parser.add_argument("workspace", nargs="?", default=None, - help="Path to workspace root (auto-detected if omitted)") + """Register validate (deprecated alias) arguments — same as registry-validate.""" + parser.add_argument( + "workspace", + nargs="?", + default=None, + help="Path to workspace root (auto-detected if omitted)", + ) def execute(args, workspace): + """Execute the deprecated validate command. + + Prints a deprecation warning to stderr, then delegates to + ``validate_registry`` (same behavior as ``registry-validate``). + + Args: + args: Parsed argparse namespace with ``workspace``. + workspace: Resolved workspace root path. + + Returns: + Dict with the registry validation result. + """ + print(_DEPRECATION_WARNING, file=sys.stderr, end="") return validate_registry(workspace) -register_command("validate", "Validate registry against file system", add_args, execute) +# Register under the legacy name so existing scripts / muscle memory keep +# working. The new canonical name is registered in ``registry_validate.py``. +register_command( + "validate", + "DEPRECATED — use `registry-validate` instead", + add_args, + execute, +) diff --git a/scripts/rule_test_runner.py b/scripts/rule_test_runner.py new file mode 100644 index 0000000..5138c62 --- /dev/null +++ b/scripts/rule_test_runner.py @@ -0,0 +1,552 @@ +"""Rule Test Runner for CodeLens — snapshot testing for rule YAML files. + +Runs a rule against positive/negative code samples and verifies the rule +fires (or doesn't fire) where expected. The test format uses inline +``# ruleid: `` markers (expect a finding on this line) and +``# ok`` markers (expect no finding on this line) embedded in sample +source code, so authors can keep test fixtures next to the rule itself. + +Test file convention (inline format):: + + # tests/rule_fixtures/py_sql_injection.test.yaml + rule: py/sql-injection + samples: + - name: positive_basic + language: python + code: | + user = request.args.get('name') + cursor.execute("SELECT * FROM users WHERE name = '" + user + "'") # ruleid: py/sql-injection + - name: negative_sanitized + language: python + code: | + user = request.args.get('name') + safe = parameterized_query(user) + cursor.execute("SELECT ... WHERE name = %s", (safe,)) # ok + +The runner is decoupled from the CLI (``scripts/commands/rule_test.py``) +so it can be reused by CI pipelines and programmatic callers. + +Note: this runner exercises the existing taint/semantic engine +(``scripts/semantic_engine.py``) for taint-style rules (sources/sinks/ +sanitizers). Pattern-style rules (``pattern:`` field) are not yet +supported by the engine — samples for those rules are reported as +``skipped`` rather than failed, so authors can still scaffold tests +ahead of the pattern engine landing. +""" + +from __future__ import annotations + +import os +import re +import tempfile +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import yaml + +# ─── Public dataclasses ──────────────────────────────────────────────── + + +@dataclass +class TestFailure: + """A single failed test expectation. + + Attributes: + sample_name: Name of the sample (from the ``name:`` field). + line: 1-based line number where the expectation was set. + expected: ``"finding"`` (ruleid marker) or ``"no-finding"`` (ok marker). + actual: ``"finding"`` or ``"no-finding"`` -- what actually happened. + rule_id: The rule ID the expectation was about. + message: Human-readable explanation of the mismatch. + + Note: ``__test__ = False`` prevents pytest from collecting this + dataclass as a test class (its name starts with ``Test``). + """ + + __test__ = False # type: ignore[assignment] + + sample_name: str + line: int + expected: str + actual: str + rule_id: str + message: str + + def to_dict(self) -> Dict[str, Any]: + """Serialize to a JSON-friendly dict.""" + return asdict(self) + + +@dataclass +class TestResult: + """Aggregate test result for one rule file. + + Attributes: + rule_path: Path to the rule YAML file. + rule_id: Rule ID extracted from the rule file. + test_path: Path to the ``.test.yaml`` file (``None`` if not found). + total: Total number of samples run. + passed: Number of samples that passed all expectations. + failed: Number of samples with at least one failed expectation. + skipped: Number of samples skipped (e.g., pattern-style rule). + failures: List of ``TestFailure`` for failed expectations. + error: When the test file itself cannot be parsed/loaded. + + Note: ``__test__ = False`` prevents pytest from collecting this + dataclass as a test class (its name starts with ``Test``). + """ + + __test__ = False # type: ignore[assignment] + + rule_path: str + rule_id: str = "" + test_path: Optional[str] = None + total: int = 0 + passed: int = 0 + failed: int = 0 + skipped: int = 0 + failures: List[TestFailure] = field(default_factory=list) + error: Optional[str] = None + + @property + def is_pass(self) -> bool: + """``True`` if no failures and no error.""" + return self.failed == 0 and self.error is None + + def to_dict(self) -> Dict[str, Any]: + """Serialize to a JSON-friendly dict.""" + return { + "rule_path": self.rule_path, + "rule_id": self.rule_id, + "test_path": self.test_path, + "total": self.total, + "passed": self.passed, + "failed": self.failed, + "skipped": self.skipped, + "is_pass": self.is_pass, + "failures": [f.to_dict() for f in self.failures], + "error": self.error, + } + + +# ─── Marker parsing ──────────────────────────────────────────────────── + +# ``# ruleid: `` — expect a finding on this line. The marker +# can appear at end of code line OR on its own line above the code line +# (the runner handles both by attributing the expectation to the next +# non-comment, non-blank code line when the marker is standalone). +_RULEID_RE = re.compile(r"#\s*ruleid:\s*([\w/.\-]+)") + +# ``# ok`` — expect NO finding on this line. Same attribution rule. +_OK_RE = re.compile(r"#\s*ok\b") + +# ``# todoruleid: `` — same as ruleid but skipped when +# ``--test-ignore-todo`` is set. Useful for staging upcoming rules. +_TODORULEID_RE = re.compile(r"#\s*todoruleid:\s*([\w/.\-]+)") + + +@dataclass +class _Expectation: + """Internal: one parsed expectation from a sample's markers.""" + + line: int # 1-based line the expectation applies to + kind: str # "finding" or "no-finding" + rule_id: str # the rule id the expectation is about + is_todo: bool = False # True for todoruleid (skipped with --test-ignore-todo) + + +def _parse_markers(code: str, ignore_todo: bool = False) -> List[_Expectation]: + """Parse ``# ruleid:`` / ``# ok`` / ``# todoruleid:`` markers from code. + + Markers may appear: + * Inline at end of a code line → expectation applies to that line. + * Standalone on their own line → expectation applies to the next + non-comment, non-blank line below. + + Args: + code: The sample source code (multi-line string). + ignore_todo: When ``True``, ``# todoruleid:`` markers are dropped. + + Returns: + List of ``_Expectation``, one per marker found. + """ + expectations: List[_Expectation] = [] + lines = code.split("\n") + pending_finding: Optional[Tuple[str, bool]] = None # (rule_id, is_todo) + + for idx, line in enumerate(lines, start=1): + # Check for ``# ok`` first — it's the no-finding marker. + if _OK_RE.search(line): + # Inline ``# ok`` — applies to this line. + expectations.append(_Expectation(line=idx, kind="no-finding", rule_id="")) + pending_finding = None + continue + + # Check for ``# ruleid: `` (inline or standalone). + m = _RULEID_RE.search(line) + if m: + rule_id = m.group(1) + # If the line has code before the marker, it's inline. + code_before = line[: m.start()].rstrip() + if code_before: + expectations.append(_Expectation(line=idx, kind="finding", rule_id=rule_id)) + else: + # Standalone marker — attribute to next code line. + pending_finding = (rule_id, False) + continue + + # Check for ``# todoruleid: ``. + m = _TODORULEID_RE.search(line) + if m: + if ignore_todo: + continue + rule_id = m.group(1) + code_before = line[: m.start()].rstrip() + if code_before: + expectations.append( + _Expectation(line=idx, kind="finding", rule_id=rule_id, is_todo=True) + ) + else: + pending_finding = (rule_id, True) + continue + + # Plain code line — if there's a pending standalone marker, + # attribute it here. + if pending_finding is not None: + stripped = line.strip() + if stripped and not stripped.startswith("#"): + rule_id, is_todo = pending_finding + expectations.append( + _Expectation(line=idx, kind="finding", rule_id=rule_id, is_todo=is_todo) + ) + pending_finding = None + + return expectations + + +# ─── Rule loading ────────────────────────────────────────────────────── + + +def _load_rule(rule_path: Path) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: + """Load a single rule from a YAML file. + + Returns ``(rule_dict, error_message)``. If the file contains multiple + rules under ``rules:``, the first one is returned (test files are + expected to be one-rule-per-file). + """ + try: + data = yaml.safe_load(rule_path.read_text(encoding="utf-8")) + except yaml.YAMLError as exc: + return None, f"YAML parse error: {exc}" + except OSError as exc: + return None, f"Cannot read file: {exc}" + + if not isinstance(data, dict): + return None, f"Top-level YAML must be a mapping, got {type(data).__name__}" + + rules = data.get("rules") + if not isinstance(rules, list) or not rules: + return None, "No 'rules' list found (or list is empty)" + + first = rules[0] + if not isinstance(first, dict): + return None, f"First rule entry must be a mapping, got {type(first).__name__}" + + return first, None + + +def _find_test_file(rule_path: Path) -> Optional[Path]: + """Find the ``.test.yaml`` companion for a rule file. + + Convention: ``foo.yaml`` → ``foo.test.yaml`` (same directory). + Also accepts ``foo.yml`` → ``foo.test.yaml``. + """ + stem = rule_path.stem # ``foo`` from ``foo.yaml`` + candidate = rule_path.with_name(f"{stem}.test.yaml") + if candidate.exists(): + return candidate + # Also accept ``.test.yml``. + candidate = rule_path.with_name(f"{stem}.test.yml") + if candidate.exists(): + return candidate + return None + + +# ─── Sample execution ────────────────────────────────────────────────── + + +def _run_sample( + rule: Dict[str, Any], + sample: Dict[str, Any], + ignore_todo: bool = False, +) -> Tuple[List[TestFailure], int, bool]: + """Run one sample against the rule. + + Returns ``(failures, expectations_count, skipped)``. + + * ``failures`` — list of ``TestFailure`` for mismatched expectations. + * ``expectations_count`` — total expectations checked. + * ``skipped`` — ``True`` when the sample was skipped (pattern-style + rule, or unsupported language). + """ + code = sample.get("code", "") + language = sample.get("language", rule.get("language", "")) + sample_name = sample.get("name", "unnamed") + + # Pattern-style rules (``pattern:`` field) are not yet supported by + # the taint engine. Skip them with a clear message rather than + # failing — authors can scaffold tests ahead of the pattern engine. + if "pattern" in rule or "patterns" in rule: + return [], 0, True + + # The taint engine only knows how to analyze python / javascript / + # typescript. Other languages → skip. + if language not in ("python", "javascript", "typescript"): + return [], 0, True + + expectations = _parse_markers(code, ignore_todo=ignore_todo) + if not expectations: + # No markers → nothing to verify. Treat as a pass with 0 checks + # (the sample ran but had no expectations). + return [], 0, False + + # Write the sample to a temp file so the engine can analyze it. + # Using a real file (rather than in-memory) keeps the engine's + # file-path-based reporting intact. + suffix = ".py" if language == "python" else ".js" if language == "javascript" else ".ts" + with tempfile.NamedTemporaryFile( + mode="w", suffix=suffix, delete=False, encoding="utf-8" + ) as tmp: + tmp.write(code) + tmp_path = tmp.name + + try: + findings = _analyze_with_rule(rule, tmp_path, language) + finally: + try: + os.unlink(tmp_path) + except OSError: + pass + + # Build a set of (rule_id, line) for findings on the lines we care + # about. The engine reports line numbers as 1-based. + finding_lines: Dict[int, List[str]] = {} + for f in findings: + f_line = f.get("line") + f_rule = f.get("rule_id", "") + if f_line is not None: + finding_lines.setdefault(f_line, []).append(f_rule) + + failures: List[TestFailure] = [] + for exp in expectations: + rules_at_line = finding_lines.get(exp.line, []) + has_finding = bool(rules_at_line) + + if exp.kind == "finding": + if not has_finding: + failures.append( + TestFailure( + sample_name=sample_name, + line=exp.line, + expected="finding", + actual="no-finding", + rule_id=exp.rule_id, + message=( + f"Expected finding for rule '{exp.rule_id}' on line " + f"{exp.line}, but no finding was reported" + ), + ) + ) + elif exp.rule_id and exp.rule_id not in rules_at_line: + failures.append( + TestFailure( + sample_name=sample_name, + line=exp.line, + expected="finding", + actual=f"finding (wrong rule: {rules_at_line})", + rule_id=exp.rule_id, + message=( + f"Expected finding for rule '{exp.rule_id}' on line " + f"{exp.line}, but got findings for {rules_at_line}" + ), + ) + ) + elif exp.kind == "no-finding": + if has_finding: + failures.append( + TestFailure( + sample_name=sample_name, + line=exp.line, + expected="no-finding", + actual=f"finding ({rules_at_line})", + rule_id=",".join(rules_at_line), + message=( + f"Expected NO finding on line {exp.line}, but got " + f"findings for {rules_at_line}" + ), + ) + ) + + return failures, len(expectations), False + + +def _analyze_with_rule( + rule: Dict[str, Any], + file_path: str, + language: str, +) -> List[Dict[str, Any]]: + """Run the semantic engine with a single rule on a single file. + + Wraps ``semantic_engine.TaintAnalyzer`` so we can test one rule in + isolation (the engine normally loads all rules from ``scripts/rules/``). + """ + try: + from semantic_engine import TaintAnalyzer + except ImportError: + return [] + + analyzer = TaintAnalyzer(rules=[rule], language=language) + return analyzer.analyze_file(file_path) + + +# ─── Public entry point ──────────────────────────────────────────────── + + +def run_tests(rule_path: Path, ignore_todo: bool = False) -> TestResult: + """Run tests for a single rule file. + + Looks for ``.test.yaml`` next to the rule file, parses its + ``samples:``, runs each sample through the engine, and compares + findings to the inline ``# ruleid:`` / ``# ok`` markers. + + Args: + rule_path: Path to the rule YAML file. + ignore_todo: When ``True``, ``# todoruleid:`` markers are skipped. + + Returns: + ``TestResult`` with pass/fail counts and per-expectation failures. + """ + rule_path = Path(rule_path) + result = TestResult(rule_path=str(rule_path)) + + # Load the rule (we need its ``id`` and ``language`` for the test). + rule, err = _load_rule(rule_path) + if err is not None: + result.error = err + return result + + result.rule_id = rule.get("id", "") + + # Find the companion ``.test.yaml`` file. + test_path = _find_test_file(rule_path) + if test_path is None: + result.error = ( + f"No test file found. Expected '{rule_path.stem}.test.yaml' " + f"next to '{rule_path.name}'" + ) + return result + + result.test_path = str(test_path) + + # Parse the test file. + try: + test_data = yaml.safe_load(test_path.read_text(encoding="utf-8")) + except yaml.YAMLError as exc: + result.error = f"Test file YAML parse error: {exc}" + return result + + if not isinstance(test_data, dict): + result.error = f"Test file top-level must be a mapping, got {type(test_data).__name__}" + return result + + samples = test_data.get("samples") + if not isinstance(samples, list) or not samples: + result.error = "Test file must contain a 'samples' list (non-empty)" + return result + + result.total = len(samples) + + for sample in samples: + if not isinstance(sample, dict): + result.failed += 1 + result.failures.append( + TestFailure( + sample_name="unnamed", + line=0, + expected="sample", + actual="invalid", + rule_id=result.rule_id, + message=f"Sample must be a mapping, got {type(sample).__name__}", + ) + ) + continue + + failures, exp_count, skipped = _run_sample(rule, sample, ignore_todo=ignore_todo) + if skipped: + result.skipped += 1 + continue + + if failures: + result.failed += 1 + result.failures.extend(failures) + else: + result.passed += 1 + + return result + + +def run_tests_recursive( + path: Path, + ignore_todo: bool = False, +) -> List[TestResult]: + """Run tests for every rule file under a directory (or a single file). + + Walks the path looking for ``.yaml`` / ``.yml`` rule files. For each + one that has a companion ``.test.yaml``, runs the tests. Rule files + without a test companion are skipped silently (reported in the + returned list with ``error`` set, so callers can surface them). + + Args: + path: Directory or single rule file path. + ignore_todo: When ``True``, ``# todoruleid:`` markers are skipped. + + Returns: + List of ``TestResult``, one per rule file that has a test companion. + """ + path = Path(path) + rule_files: List[Path] = [] + + if path.is_file(): + if path.suffix in (".yaml", ".yml") and not path.name.endswith(".test.yaml"): + rule_files.append(path) + elif path.is_dir(): + # Walk and collect rule files (skip ``.test.yaml`` files — those + # are test fixtures, not rules themselves). + for entry in sorted(path.rglob("*.y*ml")): + if entry.name.endswith(".test.yaml") or entry.name.endswith(".test.yml"): + continue + if entry.name.startswith("."): + continue + rule_files.append(entry) + + results: List[TestResult] = [] + for rule_file in rule_files: + # Only include rules that have a test companion — keeps the + # output focused on what was actually tested. + if _find_test_file(rule_file) is not None: + results.append(run_tests(rule_file, ignore_todo=ignore_todo)) + + return results + + +def determine_exit_code(results: List[TestResult]) -> int: + """Determine the process exit code from test results. + + * ``0`` — all tests pass (or no tests ran). + * ``1`` — at least one test failed or errored. + """ + for r in results: + if not r.is_pass: + return 1 + return 0 diff --git a/scripts/rule_validator.py b/scripts/rule_validator.py new file mode 100644 index 0000000..5f7936a --- /dev/null +++ b/scripts/rule_validator.py @@ -0,0 +1,562 @@ +"""Rule Validator for CodeLens — validates rule YAML files. + +Catches the silent-skip class of bugs: typos, unknown keys, missing required +fields, invalid enum values, unparseable patterns, and cross-field violations. +Designed to be reused by the ``rule-validate`` CLI command, pre-commit hooks, +CI pipelines, and programmatic callers. + +Validation pipeline (4 stages, fail-fast per rule but continue across rules): + +1. **YAML syntax** — parse the file; report unclosed quotes, bad indentation, + duplicate keys, and other YAML errors with the line reported by the parser. +2. **Schema** — required fields (``id``, ``message``, ``severity``, ``language``), + enum ``severity`` (critical/high/medium/low/info), and unknown-key detection + (catches typos like ``pattern-eiter`` vs ``pattern-either``). +3. **Pattern parseability** — when a ``pattern`` field is present, compile it + with tree-sitter for the rule's ``language`` and report syntax errors. + Falls back to a graceful warning when tree-sitter or the language grammar is + unavailable (the validator never hard-fails on a missing optional dep). +4. **Cross-field** — ``pattern`` and ``patterns`` are mutually exclusive; + ``fix`` requires either ``pattern`` or ``patterns``. + +The dataclasses (``ValidationIssue``, ``ValidationResult``) are the public +contract — callers can serialize them to JSON, render human-readable reports, +or feed them into CI exit-code logic. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import yaml + +# ─── Public dataclasses ──────────────────────────────────────────────── + + +@dataclass +class ValidationIssue: + """A single validation finding (error or warning) for one rule. + + Attributes: + level: ``"error"`` or ``"warning"``. Errors fail validation; + warnings only fail when ``--strict`` is set. + category: Stable machine-readable category. One of: + ``yaml_syntax``, ``schema``, ``pattern``, ``cross_field``, + ``unknown_key``. + message: Human-readable description of the issue. + line: 1-based line number in the rule file, when known. ``None`` if + the issue is file-level (e.g., YAML parse failure) or the line + cannot be determined. + """ + + level: str + category: str + message: str + line: Optional[int] = None + + def to_dict(self) -> Dict[str, Any]: + """Serialize to a JSON-friendly dict.""" + return asdict(self) + + +@dataclass +class ValidationResult: + """Aggregate validation result for one rule file. + + Attributes: + rule_path: Path to the rule file that was validated. + is_valid: ``True`` if there are no errors (warnings alone do not + invalidate unless the caller applies ``--strict``). + errors: List of error-level issues. + warnings: List of warning-level issues. + rules_checked: Count of rule entries inside the YAML (the + ``rules:`` list). 0 if the file failed YAML parsing. + """ + + rule_path: str + is_valid: bool = True + errors: List[ValidationIssue] = field(default_factory=list) + warnings: List[ValidationIssue] = field(default_factory=list) + rules_checked: int = 0 + + @property + def has_warnings(self) -> bool: + """``True`` if any warning-level issue was recorded.""" + return len(self.warnings) > 0 + + def add_error(self, category: str, message: str, line: Optional[int] = None) -> None: + """Record an error and flip ``is_valid`` to ``False``.""" + self.errors.append(ValidationIssue("error", category, message, line)) + self.is_valid = False + + def add_warning(self, category: str, message: str, line: Optional[int] = None) -> None: + """Record a warning. Does not flip ``is_valid`` — caller applies ``--strict``.""" + self.warnings.append(ValidationIssue("warning", category, message, line)) + + def to_dict(self) -> Dict[str, Any]: + """Serialize to a JSON-friendly dict.""" + return { + "rule_path": self.rule_path, + "is_valid": self.is_valid, + "has_warnings": self.has_warnings, + "rules_checked": self.rules_checked, + "errors": [issue.to_dict() for issue in self.errors], + "warnings": [issue.to_dict() for issue in self.warnings], + } + + +# ─── Constants ───────────────────────────────────────────────────────── + +# Required top-level fields on every rule entry. Missing any → schema error. +REQUIRED_FIELDS: Tuple[str, ...] = ("id", "message", "severity", "language") + +# Allowed severity values (lowercase). Anything else → schema error. +VALID_SEVERITIES: Tuple[str, ...] = ("critical", "high", "medium", "low", "info") + +# Known optional fields on a rule entry. Anything outside this set (and +# outside REQUIRED_FIELDS) is flagged as an ``unknown_key`` warning — the +# rule still validates, but the author probably has a typo (e.g., +# ``pattern-eiter`` instead of ``pattern-either``). +KNOWN_OPTIONAL_FIELDS: Tuple[str, ...] = ( + "name", + "cwe", + "owasp", + # Pattern-style rules (Semgrep-compatible subset) + "pattern", + "patterns", + "pattern-either", + "pattern-not", + "pattern-inside", + "pattern-not-inside", + "pattern-regex", + "metavariable-regex", + "metavariable-comparison", + # Taint-style rules (CodeLens native — sources/sinks/sanitizers) + "sources", + "sinks", + "sanitizers", + # Autofix + "fix", + "fix-regex", + # Routing / metadata + "paths", + "metadata", + "options", + "timeout", + "max-match-per-file", + # Project-level rule option (depends-on) for SCA rules + "project-depends-on", +) + +# Languages CodeLens can compile patterns for via tree-sitter. Other +# languages are accepted (rule still validates) but pattern parseability +# is skipped with a warning. +TREE_SITTER_LANGUAGES: Tuple[str, ...] = ( + "python", + "javascript", + "typescript", + "tsx", + "rust", + "html", + "css", +) + + +# ─── Stage 1: YAML syntax ────────────────────────────────────────────── + + +def _parse_yaml(rule_path: Path) -> Tuple[Optional[Dict[str, Any]], Optional[ValidationIssue]]: + """Parse a YAML rule file. + + Args: + rule_path: Path to the ``.yaml`` / ``.yml`` file. + + Returns: + Tuple of (parsed_dict_or_None, parse_error_or_None). When YAML + parsing fails, the dict is ``None`` and the issue captures the + parser's error message + line (when the parser exposes one). + """ + try: + text = rule_path.read_text(encoding="utf-8") + except OSError as exc: + return None, ValidationIssue( + level="error", + category="yaml_syntax", + message=f"Cannot read file: {exc}", + line=None, + ) + + try: + # ``Loader=yaml.SafeLoader`` is the safe default; we only expect + # plain mappings/lists/strings in rule files. + data = yaml.safe_load(text) + except yaml.YAMLError as exc: + # ``exc.problem_mark`` carries the line/column for the parser's + # failure point when available — surface it so the user can jump + # straight to the typo. + line = None + mark = getattr(exc, "problem_mark", None) + if mark is not None: + line = mark.line + 1 # mark.line is 0-based + message = str(exc).split("\n", 1)[0] # first line is the human summary + return None, ValidationIssue( + level="error", + category="yaml_syntax", + message=f"YAML parse error: {message}", + line=line, + ) + + if data is None: + return None, ValidationIssue( + level="error", + category="yaml_syntax", + message="File is empty or contains only comments", + line=None, + ) + + if not isinstance(data, dict): + return None, ValidationIssue( + level="error", + category="yaml_syntax", + message=f"Top-level YAML must be a mapping, got {type(data).__name__}", + line=1, + ) + + return data, None + + +# ─── Stage 2: Schema validation ──────────────────────────────────────── + + +def _validate_schema( + rule: Dict[str, Any], + rule_index: int, + result: ValidationResult, +) -> None: + """Validate one rule entry against the schema (required fields + enums).""" + # Required fields — missing any is a hard error. + for field_name in REQUIRED_FIELDS: + value = rule.get(field_name) + if value is None or (isinstance(value, str) and not value.strip()): + result.add_error( + "schema", + f"Rule #{rule_index}: missing required field '{field_name}'", + ) + + # Severity enum — must be one of the allowed values (case-insensitive + # match against VALID_SEVERITIES, but we flag the original casing so + # the author can fix it). + severity = rule.get("severity") + if severity is not None: + if not isinstance(severity, str): + result.add_error( + "schema", + f"Rule #{rule_index}: 'severity' must be a string, got {type(severity).__name__}", + ) + elif severity.lower() not in VALID_SEVERITIES: + allowed = ", ".join(VALID_SEVERITIES) + result.add_error( + "schema", + f"Rule #{rule_index}: invalid severity '{severity}' " + f"(allowed: {allowed})", + ) + + # Language must be a non-empty string. Unknown languages are not a hard + # error (the rule may still be useful for taint analysis that doesn't + # need tree-sitter), but we warn so authors notice typos. + language = rule.get("language") + if language is not None and not isinstance(language, str): + result.add_error( + "schema", + f"Rule #{rule_index}: 'language' must be a string, got {type(language).__name__}", + ) + + # Unknown-key detection — catches typos like ``pattern-eiter``. This is + # the highest-value check for rule authors; it's a warning (not error) + # because CodeLens may legitimately add new fields in the future without + # backfilling this allowlist immediately. + known = set(REQUIRED_FIELDS) | set(KNOWN_OPTIONAL_FIELDS) + for key in rule.keys(): + if key not in known: + # Suggest the closest known field (simple edit-distance heuristic). + suggestion = _suggest_field(key, known) + hint = f" Did you mean '{suggestion}'?" if suggestion else "" + result.add_warning( + "unknown_key", + f"Rule #{rule_index}: unknown field '{key}'.{hint}", + ) + + +def _suggest_field(typo: str, known: set) -> Optional[str]: + """Suggest the closest known field name for a typo. + + Uses a simple Levenshtein-distance heuristic (threshold ≤ 2 edits). + Returns ``None`` when no known field is close enough. + """ + best: Optional[Tuple[int, str]] = None + for candidate in known: + dist = _edit_distance(typo, candidate) + if dist <= 2 and (best is None or dist < best[0]): + best = (dist, candidate) + return best[1] if best else None + + +def _edit_distance(a: str, b: str) -> int: + """Compute Levenshtein edit distance between two strings.""" + if a == b: + return 0 + if not a: + return len(b) + if not b: + return len(a) + prev = list(range(len(b) + 1)) + for i, ca in enumerate(a, start=1): + curr = [i] + for j, cb in enumerate(b, start=1): + cost = 0 if ca == cb else 1 + curr.append(min(prev[j] + 1, curr[j - 1] + 1, prev[j - 1] + cost)) + prev = curr + return prev[-1] + + +# ─── Stage 3: Pattern parseability ───────────────────────────────────── + + +def _validate_pattern( + rule: Dict[str, Any], + rule_index: int, + result: ValidationResult, +) -> None: + """Compile ``pattern`` field with tree-sitter to catch syntax errors. + + Skips gracefully (with a warning) when tree-sitter or the language + grammar is not installed — the validator must not hard-fail on a + missing optional dep. + """ + pattern = rule.get("pattern") + if pattern is None: + # ``patterns`` (list) is also accepted but not compiled here — + # each entry would need its own AST check. We leave that to a + # future enhancement and only validate the scalar ``pattern``. + return + + if not isinstance(pattern, str): + result.add_error( + "pattern", + f"Rule #{rule_index}: 'pattern' must be a string, got {type(pattern).__name__}", + ) + return + + language = rule.get("language", "") + if not isinstance(language, str) or language.lower() not in TREE_SITTER_LANGUAGES: + # Not a tree-sitter-supported language — skip parseability check + # but warn so the author knows the pattern wasn't compiled. + if isinstance(language, str) and language: + result.add_warning( + "pattern", + f"Rule #{rule_index}: pattern parseability check skipped " + f"(language '{language}' not tree-sitter-supported)", + ) + return + + try: + from grammar_loader import GrammarLoader + except ImportError: + result.add_warning( + "pattern", + f"Rule #{rule_index}: pattern parseability check skipped " + "(grammar_loader unavailable)", + ) + return + + loader = GrammarLoader() + parser = loader.get_parser(language.lower()) + if parser is None: + result.add_warning( + "pattern", + f"Rule #{rule_index}: pattern parseability check skipped " + f"(tree-sitter grammar for '{language}' not installed)", + ) + return + + # Tree-sitter pattern syntax is close to (but not identical to) the + # target language. We compile the pattern as-is and check for ``ERROR`` + # nodes — false positives are possible ( metavariables like ``$X`` + # will show up as parse errors in some languages), so we only emit a + # warning, not a hard error. + try: + tree = parser.parse(pattern.encode("utf-8")) + except Exception as exc: # pragma: no cover — defensive + result.add_warning( + "pattern", + f"Rule #{rule_index}: pattern parse raised: {exc}", + ) + return + + root = tree.root_node + if root.has_error: + # Walk for the first ERROR node to surface a useful location. + err_line = _find_error_line(root) + result.add_warning( + "pattern", + f"Rule #{rule_index}: pattern may have a syntax error " + f"(tree-sitter reported ERROR at line {err_line}). " + "Note: metavariables like $X can trigger false positives.", + line=err_line, + ) + + +def _find_error_line(root) -> Optional[int]: + """Return the 1-based line of the first ERROR node, or ``None``.""" + stack = [root] + while stack: + node = stack.pop() + if node.type == "ERROR": + return node.start_point[0] + 1 + for child in node.children: + stack.append(child) + return None + + +# ─── Stage 4: Cross-field validation ─────────────────────────────────── + + +def _validate_cross_field( + rule: Dict[str, Any], + rule_index: int, + result: ValidationResult, +) -> None: + """Validate cross-field constraints that schema checks can't express.""" + has_pattern = "pattern" in rule and rule["pattern"] is not None + has_patterns = "patterns" in rule and rule["patterns"] is not None + has_pattern_either = "pattern-either" in rule and rule["pattern-either"] is not None + has_fix = "fix" in rule and rule["fix"] is not None + has_fix_regex = "fix-regex" in rule and rule["fix-regex"] is not None + + # ``pattern`` and ``patterns`` are mutually exclusive — using both is + # ambiguous (which one wins?) and almost always a mistake. + if has_pattern and has_patterns: + result.add_error( + "cross_field", + f"Rule #{rule_index}: 'pattern' and 'patterns' are mutually exclusive", + ) + + # ``fix`` (or ``fix-regex``) requires a pattern field to apply to. + # Without one, the fix has nothing to fix. + if (has_fix or has_fix_regex) and not (has_pattern or has_patterns or has_pattern_either): + result.add_error( + "cross_field", + f"Rule #{rule_index}: 'fix'/'fix-regex' requires 'pattern', " + "'patterns', or 'pattern-either'", + ) + + +# ─── Public entry point ──────────────────────────────────────────────── + + +def validate_rule(rule_path: Path) -> ValidationResult: + """Validate a single rule YAML file. + + Runs all 4 validation stages (YAML syntax → schema → pattern + parseability → cross-field) and aggregates results into a single + ``ValidationResult``. The function never raises — callers get a + structured result they can serialize, render, or feed into CI logic. + + Args: + rule_path: Path to the ``.yaml`` / ``.yml`` file to validate. + + Returns: + ``ValidationResult`` with ``is_valid=False`` if any errors were + found. Warnings are recorded separately and only fail validation + when the caller applies ``--strict``. + """ + rule_path = Path(rule_path) + result = ValidationResult(rule_path=str(rule_path)) + + # Stage 1: YAML syntax + data, parse_error = _parse_yaml(rule_path) + if parse_error is not None: + result.add_error( + parse_error.category, + parse_error.message, + parse_error.line, + ) + return result # cannot continue without parsed YAML + + # The top-level mapping must contain a ``rules:`` list. + rules_list = data.get("rules") + if rules_list is None: + result.add_error( + "schema", + "Top-level mapping must contain a 'rules' list", + ) + return result + + if not isinstance(rules_list, list): + result.add_error( + "schema", + f"'rules' must be a list, got {type(rules_list).__name__}", + ) + return result + + result.rules_checked = len(rules_list) + + # Validate each rule entry. + for index, rule in enumerate(rules_list, start=1): + if not isinstance(rule, dict): + result.add_error( + "schema", + f"Rule #{index}: must be a mapping, got {type(rule).__name__}", + ) + continue + + _validate_schema(rule, index, result) + _validate_pattern(rule, index, result) + _validate_cross_field(rule, index, result) + + return result + + +def validate_rule_files(paths: List[Path]) -> List[ValidationResult]: + """Validate multiple rule files. + + Args: + paths: List of rule file paths (each ``.yaml`` / ``.yml``). + + Returns: + List of ``ValidationResult``, one per input path. Order matches + the input order. + """ + return [validate_rule(path) for path in paths] + + +def determine_exit_code(results: List[ValidationResult], strict: bool = False) -> int: + """Determine the process exit code from validation results. + + Exit code semantics (matches the ``rule-validate`` CLI contract): + + * ``0`` — all rules valid (no errors, no warnings, or warnings without + ``--strict``). + * ``1`` — at least one rule has an error. + * ``2`` — at least one rule has a warning AND ``--strict`` is set (no + errors). + + Args: + results: List of ``ValidationResult`` from ``validate_rule_files``. + strict: When ``True``, warnings are treated as errors for exit-code + purposes (but still reported as warnings in the output). + + Returns: + Exit code (0, 1, or 2). + """ + has_error = any(not r.is_valid for r in results) + has_warning = any(r.has_warnings for r in results) + + if has_error: + return 1 + if has_warning and strict: + return 2 + return 0 diff --git a/tests/rule_fixtures/_fix_without_pattern.yaml b/tests/rule_fixtures/_fix_without_pattern.yaml new file mode 100644 index 0000000..46dd47a --- /dev/null +++ b/tests/rule_fixtures/_fix_without_pattern.yaml @@ -0,0 +1,11 @@ +# Cross-field violation: fix without pattern. +rules: + - id: py/fix-without-pattern + message: "This rule has a fix but no pattern" + severity: high + language: python + sources: + - input + sinks: + - eval + fix: "safe_eval($X)" diff --git a/tests/rule_fixtures/_invalid_severity.yaml b/tests/rule_fixtures/_invalid_severity.yaml new file mode 100644 index 0000000..4b5988d --- /dev/null +++ b/tests/rule_fixtures/_invalid_severity.yaml @@ -0,0 +1,10 @@ +# Invalid severity enum fixture for rule-validate testing. +rules: + - id: py/bad-severity + message: "This rule has an invalid severity value" + severity: bogus + language: python + sources: + - input + sinks: + - eval diff --git a/tests/rule_fixtures/_malformed_yaml.yaml b/tests/rule_fixtures/_malformed_yaml.yaml new file mode 100644 index 0000000..c5b7db8 --- /dev/null +++ b/tests/rule_fixtures/_malformed_yaml.yaml @@ -0,0 +1,11 @@ +# Malformed YAML fixture for rule-validate testing. +# This file has intentionally invalid YAML (unclosed quote). +rules: + - id: py/broken + message: "This rule has an unclosed quote + severity: critical + language: python + sources: + - input + sinks: + - eval diff --git a/tests/rule_fixtures/_missing_required.yaml b/tests/rule_fixtures/_missing_required.yaml new file mode 100644 index 0000000..185dd71 --- /dev/null +++ b/tests/rule_fixtures/_missing_required.yaml @@ -0,0 +1,10 @@ +# Missing required field fixture for rule-validate testing. +# This rule is missing the 'severity' field. +rules: + - id: py/missing-severity + message: "This rule is missing the severity field" + language: python + sources: + - input + sinks: + - eval diff --git a/tests/rule_fixtures/_mutually_exclusive.yaml b/tests/rule_fixtures/_mutually_exclusive.yaml new file mode 100644 index 0000000..45ba9eb --- /dev/null +++ b/tests/rule_fixtures/_mutually_exclusive.yaml @@ -0,0 +1,10 @@ +# Cross-field violation fixture for rule-validate testing. +# 'pattern' and 'patterns' are mutually exclusive. +rules: + - id: py/mutually-exclusive + message: "This rule uses both pattern and patterns" + severity: high + language: python + pattern: eval($X) + patterns: + - eval($X) diff --git a/tests/rule_fixtures/_typo_and_unknown.yaml b/tests/rule_fixtures/_typo_and_unknown.yaml new file mode 100644 index 0000000..8ca4bd2 --- /dev/null +++ b/tests/rule_fixtures/_typo_and_unknown.yaml @@ -0,0 +1,11 @@ +# Unknown key + typo fixture for rule-validate testing. +# 'pattern-eiter' is a typo for 'pattern-either'. +# 'extra_unknown_field' is not a known field. +rules: + - id: py/typo-and-unknown + message: "This rule has a typo and an unknown field" + severity: high + language: python + pattern-eiter: + - pattern: eval($X) + extra_unknown_field: true diff --git a/tests/rule_fixtures/js_command_injection.test.yaml b/tests/rule_fixtures/js_command_injection.test.yaml new file mode 100644 index 0000000..d0278ac --- /dev/null +++ b/tests/rule_fixtures/js_command_injection.test.yaml @@ -0,0 +1,26 @@ +# Test fixture for js/command-injection rule. +rule: js/command-injection +samples: + - name: positive_req_body_to_exec + language: javascript + code: | + const cmd = req.body.command; + child_process.exec(cmd); // ruleid: js/command-injection + + - name: positive_req_query_to_execSync + language: javascript + code: | + const userCmd = req.query.cmd; + execSync(userCmd); // ruleid: js/command-injection + + - name: negative_sanitized_with_shell + language: javascript + code: | + const cmd = req.body.command; + const safe = shell.escape(cmd); + child_process.exec(safe); // ok + + - name: negative_static_command + language: javascript + code: | + child_process.exec("ls -la"); // ok diff --git a/tests/rule_fixtures/js_command_injection.yaml b/tests/rule_fixtures/js_command_injection.yaml new file mode 100644 index 0000000..e9742e8 --- /dev/null +++ b/tests/rule_fixtures/js_command_injection.yaml @@ -0,0 +1,24 @@ +# Migrated from scripts/rules/javascript_security.yaml for rule-test fixture. +rules: + - id: js/command-injection + name: Command Injection + language: javascript + severity: critical + cwe: CWE-78 + message: "User input flows into system command without sanitization" + sources: + - req.body + - req.params + - req.query + - req.headers + - req.cookies + - process.argv + - process.env + sinks: + - child_process.exec + - exec( + - execSync( + - spawn( + sanitizers: + - execFile + - shell diff --git a/tests/rule_fixtures/js_path_traversal.test.yaml b/tests/rule_fixtures/js_path_traversal.test.yaml new file mode 100644 index 0000000..bb465e1 --- /dev/null +++ b/tests/rule_fixtures/js_path_traversal.test.yaml @@ -0,0 +1,26 @@ +# Test fixture for js/path-traversal rule. +rule: js/path-traversal +samples: + - name: positive_req_query_to_readFileSync + language: javascript + code: | + const filename = req.query.file; + const data = fs.readFileSync(filename); // ruleid: js/path-traversal + + - name: positive_req_params_to_readFile + language: javascript + code: | + const path = req.params.path; + fs.readFile(path, 'utf8', callback); // ruleid: js/path-traversal + + - name: negative_sanitized_with_path_resolve + language: javascript + code: | + const filename = req.query.file; + const safe = path.resolve(filename); + const data = fs.readFileSync(safe); // ok + + - name: negative_static_path + language: javascript + code: | + const data = fs.readFileSync("/etc/hostname"); // ok diff --git a/tests/rule_fixtures/js_path_traversal.yaml b/tests/rule_fixtures/js_path_traversal.yaml new file mode 100644 index 0000000..254249c --- /dev/null +++ b/tests/rule_fixtures/js_path_traversal.yaml @@ -0,0 +1,24 @@ +# Migrated from scripts/rules/javascript_security.yaml for rule-test fixture. +rules: + - id: js/path-traversal + name: Path Traversal + language: javascript + severity: critical + cwe: CWE-22 + message: "User input flows into file system path without validation" + sources: + - req.body + - req.params + - req.query + - req.headers + - process.argv + sinks: + - fs.readFile + - fs.readFileSync + - fs.writeFile + - fs.createReadStream + sanitizers: + - path.normalize + - path.resolve + - startsWith + - path.basename diff --git a/tests/rule_fixtures/js_prototype_pollution.test.yaml b/tests/rule_fixtures/js_prototype_pollution.test.yaml new file mode 100644 index 0000000..969fae0 --- /dev/null +++ b/tests/rule_fixtures/js_prototype_pollution.test.yaml @@ -0,0 +1,27 @@ +# Test fixture for js/prototype-pollution rule. +rule: js/prototype-pollution +samples: + - name: positive_req_body_to_merge + language: javascript + code: | + const userInput = req.body; + lodash.merge(target, userInput); // ruleid: js/prototype-pollution + + - name: positive_req_query_to_lodash_set + language: javascript + code: | + const path = req.query.path; + const value = req.query.value; + lodash.set(obj, path, value); // ruleid: js/prototype-pollution + + - name: negative_sanitized_with_hasOwnProperty + language: javascript + code: | + const userInput = req.body; + const safe = Object.keys(userInput).filter(k => hasOwnProperty(userInput, k)); + Object.assign(target, safe); // ok + + - name: negative_static_merge + language: javascript + code: | + Object.assign(target, {a: 1, b: 2}); // ok diff --git a/tests/rule_fixtures/js_prototype_pollution.yaml b/tests/rule_fixtures/js_prototype_pollution.yaml new file mode 100644 index 0000000..b3ae922 --- /dev/null +++ b/tests/rule_fixtures/js_prototype_pollution.yaml @@ -0,0 +1,23 @@ +# Migrated from scripts/rules/javascript_security.yaml for rule-test fixture. +rules: + - id: js/prototype-pollution + name: Prototype Pollution + language: javascript + severity: high + cwe: CWE-1321 + message: "User input flows into object merge without prototype protection" + sources: + - req.body + - req.params + - req.query + - req.headers + - JSON.parse + sinks: + - Object.assign( + - .merge( + - lodash.merge + - lodash.set + sanitizers: + - hasOwnProperty + - Object.create(null) + - Object.freeze diff --git a/tests/rule_fixtures/js_sql_injection.test.yaml b/tests/rule_fixtures/js_sql_injection.test.yaml new file mode 100644 index 0000000..2bfc9ad --- /dev/null +++ b/tests/rule_fixtures/js_sql_injection.test.yaml @@ -0,0 +1,27 @@ +# Test fixture for js/sql-injection rule. +rule: js/sql-injection +samples: + - name: positive_req_body_to_query + language: javascript + code: | + const username = req.body.username; + db.query("SELECT * FROM users WHERE name = '" + username + "'"); // ruleid: js/sql-injection + + - name: positive_req_query_to_execute + language: javascript + code: | + const id = req.query.id; + connection.execute("SELECT * FROM products WHERE id = " + id); // ruleid: js/sql-injection + + - name: negative_sanitized_with_escape + language: javascript + code: | + const username = req.body.username; + const safe = escape(username); + db.query("SELECT * FROM users WHERE name = '" + safe + "'"); // ok + + - name: negative_no_taint_reaches_sink + language: javascript + code: | + const data = "static_value"; + db.query("SELECT * FROM accounts WHERE name = '" + data + "'"); // ok diff --git a/tests/rule_fixtures/js_sql_injection.yaml b/tests/rule_fixtures/js_sql_injection.yaml new file mode 100644 index 0000000..ba1c0ce --- /dev/null +++ b/tests/rule_fixtures/js_sql_injection.yaml @@ -0,0 +1,23 @@ +# Migrated from scripts/rules/javascript_security.yaml for rule-test fixture. +rules: + - id: js/sql-injection + name: SQL Injection + language: javascript + severity: critical + cwe: CWE-89 + message: "User input flows into SQL query without parameterization" + sources: + - req.body + - req.params + - req.query + - req.headers + - req.cookies + sinks: + - .query( + - .execute( + - .raw( + - knex.raw( + sanitizers: + - parameterized + - placeholder + - escape diff --git a/tests/rule_fixtures/js_xss_dom.test.yaml b/tests/rule_fixtures/js_xss_dom.test.yaml new file mode 100644 index 0000000..15a9b15 --- /dev/null +++ b/tests/rule_fixtures/js_xss_dom.test.yaml @@ -0,0 +1,26 @@ +# Test fixture for js/xss-dom rule. +rule: js/xss-dom +samples: + - name: positive_location_to_innerHTML + language: javascript + code: | + const userInput = window.location.hash; + document.getElementById('output').innerHTML = userInput; // ruleid: js/xss-dom + + - name: positive_location_search_to_document_write + language: javascript + code: | + const params = location.search; + document.write(params); // ruleid: js/xss-dom + + - name: negative_sanitized_with_escapeHtml + language: javascript + code: | + const userInput = window.location.hash; + const safe = escapeHtml(userInput); + document.getElementById('output').innerHTML = safe; // ok + + - name: negative_static_content + language: javascript + code: | + document.getElementById('output').innerHTML = "

Hello

"; // ok diff --git a/tests/rule_fixtures/js_xss_dom.yaml b/tests/rule_fixtures/js_xss_dom.yaml new file mode 100644 index 0000000..eb67dc4 --- /dev/null +++ b/tests/rule_fixtures/js_xss_dom.yaml @@ -0,0 +1,25 @@ +# Migrated from scripts/rules/javascript_security.yaml for rule-test fixture. +rules: + - id: js/xss-dom + name: DOM Cross-Site Scripting + language: javascript + severity: critical + cwe: CWE-79 + message: "User input flows into DOM without sanitization (XSS)" + sources: + - window.location + - window.location.href + - location.search + - document.cookie + - message.data + sinks: + - .innerHTML + - .outerHTML + - document.write + sanitizers: + - textContent + - innerText + - DOMPurify + - escapeHtml + - encodeURI + - createTextNode diff --git a/tests/rule_fixtures/py_command_injection.test.yaml b/tests/rule_fixtures/py_command_injection.test.yaml new file mode 100644 index 0000000..48cc30d --- /dev/null +++ b/tests/rule_fixtures/py_command_injection.test.yaml @@ -0,0 +1,26 @@ +# Test fixture for py/command-injection rule. +rule: py/command-injection +samples: + - name: positive_input_to_os_system + language: python + code: | + cmd = input("Enter command: ") + os.system(cmd) # ruleid: py/command-injection + + - name: positive_input_to_subprocess_run + language: python + code: | + user_input = input("Enter args: ") + subprocess.run(user_input, shell=True) # ruleid: py/command-injection + + - name: negative_sanitized_with_shlex_quote + language: python + code: | + cmd = input("Enter command: ") + safe = shlex.quote(cmd) + os.system("echo " + safe) # ok + + - name: negative_static_command + language: python + code: | + os.system("ls -la") # ok diff --git a/tests/rule_fixtures/py_command_injection.yaml b/tests/rule_fixtures/py_command_injection.yaml new file mode 100644 index 0000000..4788c5c --- /dev/null +++ b/tests/rule_fixtures/py_command_injection.yaml @@ -0,0 +1,27 @@ +# Migrated from scripts/rules/python_security.yaml for rule-test fixture. +rules: + - id: py/command-injection + name: Command Injection + language: python + severity: critical + cwe: CWE-78 + message: "User input flows into OS command without sanitization" + sources: + - flask.request.args + - flask.request.form + - flask.request.json + - input + - sys.stdin + - os.environ + sinks: + - os.system + - os.popen + - subprocess.call + - subprocess.run + - subprocess.Popen + - subprocess.check_output + sanitizers: + - shlex.quote + - shlex.split + - validate_command + - whitelist_command diff --git a/tests/rule_fixtures/py_path_traversal.test.yaml b/tests/rule_fixtures/py_path_traversal.test.yaml new file mode 100644 index 0000000..9137de5 --- /dev/null +++ b/tests/rule_fixtures/py_path_traversal.test.yaml @@ -0,0 +1,26 @@ +# Test fixture for py/path-traversal rule. +rule: py/path-traversal +samples: + - name: positive_input_to_open + language: python + code: | + filename = input("Enter filename: ") + f = open(filename) # ruleid: py/path-traversal + + - name: positive_flask_args_to_open + language: python + code: | + path = flask.request.args.get('file') + f = open(path) # ruleid: py/path-traversal + + - name: negative_sanitized_with_realpath + language: python + code: | + filename = input("Enter filename: ") + safe = os.path.realpath(filename) + f = open(safe) # ok + + - name: negative_static_path + language: python + code: | + f = open("/etc/passwd") # ok diff --git a/tests/rule_fixtures/py_path_traversal.yaml b/tests/rule_fixtures/py_path_traversal.yaml new file mode 100644 index 0000000..5d24e13 --- /dev/null +++ b/tests/rule_fixtures/py_path_traversal.yaml @@ -0,0 +1,25 @@ +# Migrated from scripts/rules/python_security.yaml for rule-test fixture. +rules: + - id: py/path-traversal + name: Path Traversal + language: python + severity: high + cwe: CWE-22 + message: "User input flows into file path without validation" + sources: + - flask.request.args + - flask.request.form + - flask.request.json + - input + - sys.stdin + sinks: + - open + - os.path.join + - pathlib.Path + - shutil.copy + - shutil.move + sanitizers: + - os.path.realpath + - os.path.abspath + - validate_path + - secure_filename diff --git a/tests/rule_fixtures/py_sql_injection.test.yaml b/tests/rule_fixtures/py_sql_injection.test.yaml new file mode 100644 index 0000000..0f530d8 --- /dev/null +++ b/tests/rule_fixtures/py_sql_injection.test.yaml @@ -0,0 +1,29 @@ +# Test fixture for py/sql-injection rule. +# Inline markers: `# ruleid: ` (expect finding on this line), +# `# ok` (expect no finding on this line). +rule: py/sql-injection +samples: + - name: positive_basic_input_to_execute + language: python + code: | + user = input("Enter name: ") + cursor.execute("SELECT * FROM accounts WHERE name = '" + user + "'") # ruleid: py/sql-injection + + - name: positive_flask_args_to_execute + language: python + code: | + name = flask.request.args.get('name') + cursor.execute("SELECT * FROM accounts WHERE name = '%s'" % name) # ruleid: py/sql-injection + + - name: negative_sanitized_with_parameterized_query + language: python + code: | + user = input("Enter name: ") + safe = parameterized_query(user) + cursor.execute("SELECT * FROM accounts WHERE name = %s", (safe,)) # ok + + - name: negative_no_taint_reaches_sink + language: python + code: | + name = "static_value" + cursor.execute("SELECT * FROM accounts WHERE name = %s", (name,)) # ok diff --git a/tests/rule_fixtures/py_sql_injection.yaml b/tests/rule_fixtures/py_sql_injection.yaml new file mode 100644 index 0000000..c08a047 --- /dev/null +++ b/tests/rule_fixtures/py_sql_injection.yaml @@ -0,0 +1,25 @@ +# Migrated from scripts/rules/python_security.yaml for rule-test fixture. +# Single-rule file so `codelens rule-test` can exercise it in isolation. +rules: + - id: py/sql-injection + name: SQL Injection + language: python + severity: critical + cwe: CWE-89 + message: "User input flows into SQL query without parameterization" + sources: + - flask.request.args + - flask.request.form + - flask.request.json + - input + - sys.stdin + sinks: + - cursor.execute + - db.execute + - connection.execute + - session.execute + - engine.execute + sanitizers: + - parameterized_query + - escape_string + - psycopg2.sql.SQL diff --git a/tests/rule_fixtures/py_ssrf.test.yaml b/tests/rule_fixtures/py_ssrf.test.yaml new file mode 100644 index 0000000..042c035 --- /dev/null +++ b/tests/rule_fixtures/py_ssrf.test.yaml @@ -0,0 +1,26 @@ +# Test fixture for py/ssrf rule. +rule: py/ssrf +samples: + - name: positive_input_to_requests_get + language: python + code: | + url = input("Enter URL: ") + response = requests.get(url) # ruleid: py/ssrf + + - name: positive_flask_args_to_urlopen + language: python + code: | + target = flask.request.args.get('url') + urllib.request.urlopen(target) # ruleid: py/ssrf + + - name: negative_sanitized_with_validate_url + language: python + code: | + url = input("Enter URL: ") + safe = validate_url(url) + response = requests.get(safe) # ok + + - name: negative_static_url + language: python + code: | + response = requests.get("https://api.example.com/health") # ok diff --git a/tests/rule_fixtures/py_ssrf.yaml b/tests/rule_fixtures/py_ssrf.yaml new file mode 100644 index 0000000..f46eab2 --- /dev/null +++ b/tests/rule_fixtures/py_ssrf.yaml @@ -0,0 +1,24 @@ +# Migrated from scripts/rules/python_security.yaml for rule-test fixture. +rules: + - id: py/ssrf + name: Server-Side Request Forgery + language: python + severity: high + cwe: CWE-918 + message: "User-controlled URL flows into HTTP request" + sources: + - flask.request.args + - flask.request.form + - flask.request.json + - input + sinks: + - requests.get + - requests.post + - requests.put + - requests.delete + - urllib.request.urlopen + - http.client.HTTPConnection + sanitizers: + - validate_url + - url_allowed + - is_internal_url diff --git a/tests/rule_fixtures/py_xss_template.test.yaml b/tests/rule_fixtures/py_xss_template.test.yaml new file mode 100644 index 0000000..c4e4874 --- /dev/null +++ b/tests/rule_fixtures/py_xss_template.test.yaml @@ -0,0 +1,26 @@ +# Test fixture for py/xss-template rule. +rule: py/xss-template +samples: + - name: positive_flask_args_to_mark_safe + language: python + code: | + user_input = flask.request.args.get('comment') + html = mark_safe(user_input) # ruleid: py/xss-template + + - name: positive_flask_form_to_render_template_string + language: python + code: | + name = flask.request.form.get('name') + render_template_string("

" + name + "

") # ruleid: py/xss-template + + - name: negative_sanitized_with_escape + language: python + code: | + user_input = flask.request.args.get('comment') + safe = escape(user_input) + html = mark_safe(safe) # ok + + - name: negative_static_template + language: python + code: | + render_template_string("

Hello World

") # ok diff --git a/tests/rule_fixtures/py_xss_template.yaml b/tests/rule_fixtures/py_xss_template.yaml new file mode 100644 index 0000000..9ff1963 --- /dev/null +++ b/tests/rule_fixtures/py_xss_template.yaml @@ -0,0 +1,23 @@ +# Migrated from scripts/rules/python_security.yaml for rule-test fixture. +rules: + - id: py/xss-template + name: Cross-Site Scripting (Template) + language: python + severity: high + cwe: CWE-79 + message: "User input rendered in template without escaping" + sources: + - flask.request.args + - flask.request.form + - flask.request.json + - django.http.request.GET + - django.http.request.POST + sinks: + - render_template_string + - mark_safe + - Markup + - dangerously_set_innerhtml + sanitizers: + - escape + - bleach.clean + - markupsafe.escape diff --git a/tests/test_rule_test.py b/tests/test_rule_test.py new file mode 100644 index 0000000..be1f3e0 --- /dev/null +++ b/tests/test_rule_test.py @@ -0,0 +1,347 @@ +"""Tests for the rule test runner (``scripts/rule_test_runner.py``). + +Verifies the snapshot-test framework: loading rule + test fixtures, +running samples through the semantic engine, comparing findings to +inline ``# ruleid:`` / ``# ok`` markers, and reporting pass/fail. +""" + +from __future__ import annotations + +import os +import sys +import tempfile +from pathlib import Path + +import pytest + +# Add scripts/ to path so we can import rule_test_runner +SCRIPT_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "scripts") +sys.path.insert(0, SCRIPT_DIR) + +from rule_test_runner import ( + TestFailure, + TestResult, + determine_exit_code, + run_tests, + run_tests_recursive, +) + + +# ─── Fixtures ────────────────────────────────────────────────────────── + +FIXTURES_DIR = Path(__file__).parent / "rule_fixtures" + + +# ─── Single rule tests ───────────────────────────────────────────────── + + +def test_run_tests_python_sql_injection(): + """The py/sql-injection fixture should pass all samples.""" + rule_path = FIXTURES_DIR / "py_sql_injection.yaml" + result = run_tests(rule_path) + + assert result.error is None, f"Unexpected error: {result.error}" + assert result.rule_id == "py/sql-injection" + assert result.total == 4 # 4 samples in the test file + # All samples should pass (2 positive + 2 negative) + assert result.passed == 4 + assert result.failed == 0 + assert result.is_pass + + +def test_run_tests_javascript_xss_dom(): + """The js/xss-dom fixture should pass all samples.""" + rule_path = FIXTURES_DIR / "js_xss_dom.yaml" + result = run_tests(rule_path) + + assert result.error is None, f"Unexpected error: {result.error}" + assert result.rule_id == "js/xss-dom" + assert result.total == 4 + assert result.passed == 4 + assert result.failed == 0 + assert result.is_pass + + +def test_run_tests_missing_test_file_errors(): + """A rule with no .test.yaml companion should report an error.""" + # Write a rule with no test companion + tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False, encoding="utf-8") + tmp.write(""" +rules: + - id: py/no-test + message: "No test file" + severity: high + language: python + sources: [input] + sinks: [eval] +""") + tmp.close() + try: + result = run_tests(Path(tmp.name)) + assert result.error is not None + assert "No test file found" in result.error + finally: + os.unlink(tmp.name) + + +def test_run_tests_malformed_test_yaml_errors(): + """A malformed .test.yaml should produce an error, not crash.""" + # Use a temp directory so the rule and test file have matching stems. + tmpdir = tempfile.mkdtemp() + rule_path = Path(tmpdir) / "myrule.yaml" + test_path = Path(tmpdir) / "myrule.test.yaml" + + rule_path.write_text( + """ +rules: + - id: py/malformed-test + message: "Malformed test" + severity: high + language: python + sources: [input] + sinks: [eval] +""", + encoding="utf-8", + ) + test_path.write_text("this: is: not: valid: yaml: [unclosed", encoding="utf-8") + + result = run_tests(rule_path) + assert result.error is not None + assert "parse" in result.error.lower() or "yaml" in result.error.lower() + + +def test_run_tests_empty_samples_errors(): + """A test file with no samples list should error.""" + tmpdir = tempfile.mkdtemp() + rule_path = Path(tmpdir) / "myrule.yaml" + test_path = Path(tmpdir) / "myrule.test.yaml" + + rule_path.write_text( + """ +rules: + - id: py/empty-samples + message: "Empty samples" + severity: high + language: python + sources: [input] + sinks: [eval] +""", + encoding="utf-8", + ) + test_path.write_text("rule: py/empty-samples\nsamples: []\n", encoding="utf-8") + + result = run_tests(rule_path) + assert result.error is not None + assert "samples" in result.error.lower() + + +# ─── Marker parsing ──────────────────────────────────────────────────── + + +def test_inline_ruleid_marker_detected(): + """An inline ``# ruleid:`` marker should produce a finding expectation.""" + from rule_test_runner import _parse_markers + + code = "eval(user_input) # ruleid: py/eval-usage\n" + exps = _parse_markers(code) + assert len(exps) == 1 + assert exps[0].kind == "finding" + assert exps[0].rule_id == "py/eval-usage" + assert exps[0].line == 1 + + +def test_inline_ok_marker_detected(): + """An inline ``# ok`` marker should produce a no-finding expectation.""" + from rule_test_runner import _parse_markers + + code = "safe_call() # ok\n" + exps = _parse_markers(code) + assert len(exps) == 1 + assert exps[0].kind == "no-finding" + assert exps[0].line == 1 + + +def test_standalone_ruleid_marker_attributes_to_next_line(): + """A standalone ``# ruleid:`` attributes to the next code line.""" + from rule_test_runner import _parse_markers + + code = "# ruleid: py/eval-usage\neval(user_input)\n" + exps = _parse_markers(code) + assert len(exps) == 1 + assert exps[0].kind == "finding" + assert exps[0].line == 2 # the code line, not the marker line + + +def test_todoruleid_marker_skipped_with_ignore_todo(): + """``# todoruleid:`` should be skipped when ignore_todo=True.""" + from rule_test_runner import _parse_markers + + code = "eval(user_input) # todoruleid: py/eval-usage\n" + # Without ignore_todo → expectation is kept + exps = _parse_markers(code, ignore_todo=False) + assert len(exps) == 1 + assert exps[0].is_todo is True + + # With ignore_todo → expectation is dropped + exps = _parse_markers(code, ignore_todo=True) + assert len(exps) == 0 + + +def test_multiple_markers_in_one_sample(): + """Multiple markers in one sample should all be parsed.""" + from rule_test_runner import _parse_markers + + code = """ +user = input() +eval(user) # ruleid: py/eval +safe_call() # ok +""" + exps = _parse_markers(code) + assert len(exps) == 2 + kinds = [e.kind for e in exps] + assert "finding" in kinds + assert "no-finding" in kinds + + +# ─── Directory tests ─────────────────────────────────────────────────── + + +def test_run_tests_recursive_directory(): + """Running tests on the fixtures directory should test all 10 rules.""" + results = run_tests_recursive(FIXTURES_DIR) + + # We created 10 valid rule fixtures (5 Python + 5 JS) + # The malformed/invalid fixtures (prefixed with _) don't have .test.yaml + # companions, so they're skipped by run_tests_recursive. + rule_ids = [r.rule_id for r in results] + assert "py/sql-injection" in rule_ids + assert "py/command-injection" in rule_ids + assert "py/path-traversal" in rule_ids + assert "py/ssrf" in rule_ids + assert "py/xss-template" in rule_ids + assert "js/xss-dom" in rule_ids + assert "js/sql-injection" in rule_ids + assert "js/command-injection" in rule_ids + assert "js/path-traversal" in rule_ids + assert "js/prototype-pollution" in rule_ids + + # All should pass + for r in results: + assert r.is_pass, f"Rule {r.rule_id} failed: {[f.to_dict() for f in r.failures]}" + + +def test_run_tests_recursive_single_file(): + """Passing a single file path should run tests for just that file.""" + rule_path = FIXTURES_DIR / "py_sql_injection.yaml" + results = run_tests_recursive(rule_path) + + assert len(results) == 1 + assert results[0].rule_id == "py/sql-injection" + assert results[0].is_pass + + +# ─── Exit code logic ────────────────────────────────────────────────── + + +def test_determine_exit_code_all_pass(): + """All passing results → exit 0.""" + results = [TestResult(rule_path="a.yaml", rule_id="a", total=2, passed=2)] + assert determine_exit_code(results) == 0 + + +def test_determine_exit_code_any_fail(): + """Any failing result → exit 1.""" + results = [ + TestResult(rule_path="a.yaml", rule_id="a", total=2, passed=2), + TestResult(rule_path="b.yaml", rule_id="b", total=2, passed=1, failed=1), + ] + assert determine_exit_code(results) == 1 + + +def test_determine_exit_code_error_counts_as_fail(): + """An errored result should count as failure → exit 1.""" + results = [TestResult(rule_path="a.yaml", rule_id="a", error="something went wrong")] + assert determine_exit_code(results) == 1 + + +# ─── Dataclass serialization ────────────────────────────────────────── + + +def test_test_failure_to_dict(): + """TestFailure should serialize to a clean dict.""" + failure = TestFailure( + sample_name="positive_basic", + line=42, + expected="finding", + actual="no-finding", + rule_id="py/sql-injection", + message="Expected finding on line 42", + ) + d = failure.to_dict() + assert d["sample_name"] == "positive_basic" + assert d["line"] == 42 + assert d["expected"] == "finding" + assert d["actual"] == "no-finding" + assert d["rule_id"] == "py/sql-injection" + assert d["message"] == "Expected finding on line 42" + + +def test_test_result_to_dict(): + """TestResult should serialize to a clean dict.""" + result = TestResult( + rule_path="test.yaml", + rule_id="py/test", + test_path="test.test.yaml", + total=3, + passed=2, + failed=1, + ) + failure = TestFailure( + sample_name="bad", + line=10, + expected="finding", + actual="no-finding", + rule_id="py/test", + message="oops", + ) + result.failures.append(failure) + + d = result.to_dict() + assert d["rule_path"] == "test.yaml" + assert d["rule_id"] == "py/test" + assert d["total"] == 3 + assert d["passed"] == 2 + assert d["failed"] == 1 + assert d["is_pass"] is False + assert len(d["failures"]) == 1 + assert d["failures"][0]["sample_name"] == "bad" + + +# ─── All 10 fixtures pass ────────────────────────────────────────────── + + +@pytest.mark.parametrize( + "rule_file", + [ + "py_sql_injection.yaml", + "py_command_injection.yaml", + "py_path_traversal.yaml", + "py_ssrf.yaml", + "py_xss_template.yaml", + "js_xss_dom.yaml", + "js_sql_injection.yaml", + "js_command_injection.yaml", + "js_path_traversal.yaml", + "js_prototype_pollution.yaml", + ], +) +def test_all_fixtures_pass(rule_file): + """Every migrated fixture should pass its snapshot tests.""" + rule_path = FIXTURES_DIR / rule_file + result = run_tests(rule_path) + + assert result.error is None, f"{rule_file}: error={result.error}" + assert result.is_pass, ( + f"{rule_file}: {result.passed}/{result.total} passed, " + f"{result.failed} failed. Failures: {[f.to_dict() for f in result.failures]}" + ) diff --git a/tests/test_rule_validate.py b/tests/test_rule_validate.py new file mode 100644 index 0000000..883034e --- /dev/null +++ b/tests/test_rule_validate.py @@ -0,0 +1,356 @@ +"""Tests for the rule validator (``scripts/rule_validator.py``). + +Covers all 4 validation stages: +1. YAML syntax (malformed YAML, unclosed quote) +2. Schema (missing required fields, invalid severity enum) +3. Pattern parseability (skipped gracefully when tree-sitter unavailable) +4. Cross-field (pattern + patterns mutually exclusive, fix requires pattern) + +Also tests the exit-code logic (``determine_exit_code``) for the 0/1/2 +contract. +""" + +from __future__ import annotations + +import os +import sys +import tempfile +from pathlib import Path + +import pytest + +# Add scripts/ to path so we can import rule_validator +SCRIPT_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "scripts") +sys.path.insert(0, SCRIPT_DIR) + +from rule_validator import ( + REQUIRED_FIELDS, + VALID_SEVERITIES, + ValidationIssue, + ValidationResult, + determine_exit_code, + validate_rule, + validate_rule_files, +) + + +# ─── Fixtures ────────────────────────────────────────────────────────── + +FIXTURES_DIR = Path(__file__).parent / "rule_fixtures" + + +def _write_tmp_rule(content: str) -> Path: + """Write a rule YAML to a temp file and return its path.""" + tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False, encoding="utf-8") + tmp.write(content) + tmp.close() + return Path(tmp.name) + + +# ─── Valid rule ──────────────────────────────────────────────────────── + + +def test_validate_valid_rule_passes(): + """A well-formed taint-style rule should validate with no errors.""" + rule_path = FIXTURES_DIR / "py_sql_injection.yaml" + result = validate_rule(rule_path) + + assert result.is_valid, f"Expected valid, got errors: {[e.to_dict() for e in result.errors]}" + assert result.rules_checked == 1 + assert len(result.errors) == 0 + # Warnings are OK (e.g., pattern parseability skip for taint-style rules) + + +def test_validate_javascript_rule_passes(): + """A JS taint-style rule should also validate cleanly.""" + rule_path = FIXTURES_DIR / "js_sql_injection.yaml" + result = validate_rule(rule_path) + + assert result.is_valid, f"Expected valid, got errors: {[e.to_dict() for e in result.errors]}" + assert result.rules_checked == 1 + + +def test_validate_multiple_rules_in_one_file(): + """A file with multiple rule entries should validate all of them.""" + content = """ +rules: + - id: py/rule-1 + message: "First rule" + severity: high + language: python + sources: [input] + sinks: [eval] + - id: py/rule-2 + message: "Second rule" + severity: critical + language: python + sources: [input] + sinks: [exec] +""" + path = _write_tmp_rule(content) + try: + result = validate_rule(path) + assert result.is_valid + assert result.rules_checked == 2 + finally: + path.unlink() + + +# ─── Stage 1: YAML syntax ───────────────────────────────────────────── + + +def test_validate_malformed_yaml_fails(): + """Malformed YAML (unclosed quote) should produce a yaml_syntax error.""" + rule_path = FIXTURES_DIR / "_malformed_yaml.yaml" + result = validate_rule(rule_path) + + assert not result.is_valid + assert len(result.errors) >= 1 + assert result.errors[0].category == "yaml_syntax" + assert "YAML parse error" in result.errors[0].message + + +def test_validate_empty_file_fails(): + """An empty file should produce a yaml_syntax error.""" + path = _write_tmp_rule("") + try: + result = validate_rule(path) + assert not result.is_valid + assert result.errors[0].category == "yaml_syntax" + finally: + path.unlink() + + +def test_validate_nonexistent_file_fails(): + """A missing file should produce a yaml_syntax error, not crash.""" + path = Path("/nonexistent/rule.yaml") + result = validate_rule(path) + + assert not result.is_valid + assert result.errors[0].category == "yaml_syntax" + assert "Cannot read file" in result.errors[0].message + + +def test_validate_top_level_not_mapping_fails(): + """Top-level YAML that is a list (not a mapping) should fail.""" + path = _write_tmp_rule("- id: foo\n message: bar\n severity: high\n language: python\n") + try: + result = validate_rule(path) + assert not result.is_valid + assert result.errors[0].category == "yaml_syntax" + finally: + path.unlink() + + +# ─── Stage 2: Schema validation ─────────────────────────────────────── + + +def test_validate_missing_required_field_fails(): + """A rule missing a required field (severity) should fail with schema error.""" + rule_path = FIXTURES_DIR / "_missing_required.yaml" + result = validate_rule(rule_path) + + assert not result.is_valid + schema_errors = [e for e in result.errors if e.category == "schema"] + assert len(schema_errors) >= 1 + assert any("severity" in e.message for e in schema_errors) + + +def test_validate_invalid_severity_enum_fails(): + """A severity outside the allowed enum should fail with schema error.""" + rule_path = FIXTURES_DIR / "_invalid_severity.yaml" + result = validate_rule(rule_path) + + assert not result.is_valid + schema_errors = [e for e in result.errors if e.category == "schema"] + assert len(schema_errors) >= 1 + assert any("invalid severity" in e.message for e in schema_errors) + + +def test_validate_all_required_fields_enforced(): + """Each required field, when missing, should produce a schema error.""" + # Build the content WITHOUT the field being tested, rather than using + # string replacement (which is fragile to indentation differences). + base_fields = { + "id": "py/test", + "message": '"test"', + "severity": "high", + "language": "python", + } + for field_name in REQUIRED_FIELDS: + fields = dict(base_fields) + del fields[field_name] + lines = [f" - {k}: {v}" for k, v in fields.items()] + content = "rules:\n" + "\n".join(lines) + "\n" + + path = _write_tmp_rule(content) + try: + result = validate_rule(path) + assert not result.is_valid, f"Missing {field_name} should fail" + assert any( + field_name in e.message and e.category == "schema" for e in result.errors + ), f"Missing {field_name} should produce schema error mentioning {field_name}" + finally: + path.unlink() + + +def test_validate_unknown_key_warns(): + """Unknown fields should produce a warning (not error), with typo suggestion.""" + rule_path = FIXTURES_DIR / "_typo_and_unknown.yaml" + result = validate_rule(rule_path) + + # Unknown keys are warnings, not errors — rule still validates + assert len(result.warnings) >= 1 + unknown_warnings = [w for w in result.warnings if w.category == "unknown_key"] + assert len(unknown_warnings) >= 1 + + # The typo 'pattern-eiter' should suggest 'pattern-either' + typo_warnings = [w for w in unknown_warnings if "pattern-eiter" in w.message] + assert len(typo_warnings) >= 1 + assert "pattern-either" in typo_warnings[0].message + + +# ─── Stage 4: Cross-field validation ────────────────────────────────── + + +def test_validate_mutually_exclusive_pattern_and_patterns(): + """Using both 'pattern' and 'patterns' should fail with cross_field error.""" + rule_path = FIXTURES_DIR / "_mutually_exclusive.yaml" + result = validate_rule(rule_path) + + assert not result.is_valid + cross_field_errors = [e for e in result.errors if e.category == "cross_field"] + assert len(cross_field_errors) >= 1 + assert any("mutually exclusive" in e.message for e in cross_field_errors) + + +def test_validate_fix_requires_pattern(): + """'fix' without 'pattern' or 'patterns' should fail with cross_field error.""" + rule_path = FIXTURES_DIR / "_fix_without_pattern.yaml" + result = validate_rule(rule_path) + + assert not result.is_valid + cross_field_errors = [e for e in result.errors if e.category == "cross_field"] + assert len(cross_field_errors) >= 1 + assert any("fix" in e.message and "requires" in e.message for e in cross_field_errors) + + +def test_validate_fix_with_pattern_passes(): + """'fix' with 'pattern' should pass cross_field validation.""" + content = """ +rules: + - id: py/fix-ok + message: "Fix with pattern" + severity: high + language: python + pattern: eval($X) + fix: "ast.literal_eval($X)" +""" + path = _write_tmp_rule(content) + try: + result = validate_rule(path) + cross_field_errors = [e for e in result.errors if e.category == "cross_field"] + assert len(cross_field_errors) == 0 + finally: + path.unlink() + + +# ─── Stage 3: Pattern parseability ──────────────────────────────────── + + +def test_validate_pattern_parseability_graceful_skip(): + """When tree-sitter is unavailable, pattern check should warn, not error.""" + content = """ +rules: + - id: py/pattern-rule + message: "Pattern rule" + severity: high + language: python + pattern: eval($X) +""" + path = _write_tmp_rule(content) + try: + result = validate_rule(path) + # Pattern parseability is a warning (metavariables cause false positives), + # and when tree-sitter is missing it's skipped with a warning. + # Either way, no hard error from pattern check. + pattern_errors = [e for e in result.errors if e.category == "pattern"] + assert len(pattern_errors) == 0 + finally: + path.unlink() + + +# ─── Exit code logic ────────────────────────────────────────────────── + + +def test_determine_exit_code_all_valid(): + """All valid results → exit 0.""" + results = [ValidationResult(rule_path="a.yaml", is_valid=True)] + assert determine_exit_code(results) == 0 + + +def test_determine_exit_code_with_errors(): + """Any error → exit 1.""" + results = [ + ValidationResult(rule_path="a.yaml", is_valid=True), + ValidationResult(rule_path="b.yaml", is_valid=False), + ] + assert determine_exit_code(results) == 1 + + +def test_determine_exit_code_warnings_without_strict(): + """Warnings without --strict → exit 0.""" + result = ValidationResult(rule_path="a.yaml", is_valid=True) + result.add_warning("unknown_key", "test warning") + assert determine_exit_code([result], strict=False) == 0 + + +def test_determine_exit_code_warnings_with_strict(): + """Warnings with --strict → exit 2.""" + result = ValidationResult(rule_path="a.yaml", is_valid=True) + result.add_warning("unknown_key", "test warning") + assert determine_exit_code([result], strict=True) == 2 + + +def test_determine_exit_code_errors_override_warnings_strict(): + """Errors take precedence over warnings — exit 1 even with --strict.""" + result = ValidationResult(rule_path="a.yaml", is_valid=False) + result.add_warning("unknown_key", "test warning") + result.add_error("schema", "test error") + assert determine_exit_code([result], strict=True) == 1 + + +# ─── Dataclass serialization ────────────────────────────────────────── + + +def test_validation_issue_to_dict(): + """ValidationIssue should serialize to a clean dict.""" + issue = ValidationIssue("error", "schema", "test message", line=42) + d = issue.to_dict() + assert d == {"level": "error", "category": "schema", "message": "test message", "line": 42} + + +def test_validation_result_to_dict(): + """ValidationResult should serialize to a clean dict.""" + result = ValidationResult(rule_path="test.yaml") + result.add_error("schema", "error msg") + result.add_warning("unknown_key", "warning msg") + + d = result.to_dict() + assert d["rule_path"] == "test.yaml" + assert d["is_valid"] is False + assert d["has_warnings"] is True + assert len(d["errors"]) == 1 + assert len(d["warnings"]) == 1 + assert d["errors"][0]["category"] == "schema" + + +def test_validate_rule_files_multiple(): + """validate_rule_files should return one result per file.""" + paths = [ + FIXTURES_DIR / "py_sql_injection.yaml", + FIXTURES_DIR / "js_xss_dom.yaml", + ] + results = validate_rule_files(paths) + assert len(results) == 2 + assert all(r.is_valid for r in results)