Skip to content

Services API

Scanner service — top-level validation orchestrator.

:class:Scanner is the Facade for the entire validation pipeline. Callers (CLI, tests, external code) only need to interact with this class; they do not need to know about parsers, loaders, or evaluators.

Design pattern: Facade — :class:Scanner presents a single, simple interface over the multi-step parse → load → evaluate pipeline.

Usage::

from markdown_validator.services.scanner import Scanner

scanner = Scanner()
report = scanner.validate(
    markdown_file="docs/article.md",
    rules_file="rules/tutorial.json",
)
print(report.passed, report.score, report.total_rules)

Scanner

Orchestrates the end-to-end markdown validation pipeline.

Parameters:

Name Type Description Default
repository RuleSetRepository | None

Optional custom :class:~markdown_validator.infrastructure.loader.RuleSetRepository. If omitted, a default instance is created. Inject a custom repository in tests to avoid filesystem access.

None
Source code in markdown_validator/services/scanner.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class Scanner:
    """Orchestrates the end-to-end markdown validation pipeline.

    :param repository: Optional custom :class:`~markdown_validator.infrastructure.loader.RuleSetRepository`.
        If omitted, a default instance is created.  Inject a custom
        repository in tests to avoid filesystem access.
    """

    def __init__(self, repository: RuleSetRepository | None = None) -> None:
        self._repo = repository or RuleSetRepository()

    def validate(
        self,
        markdown_file: str | Path,
        rules_file: str | Path,
    ) -> ScanReport:
        """Validate a single Markdown file against a rule-set.

        :param markdown_file: Path to the ``.md`` file to validate.
        :param rules_file: Path to the rule-set JSON file.
        :return: A frozen :class:`~markdown_validator.domain.models.ScanReport`
            with per-rule results and an aggregate ``passed`` flag.
        :raises FileNotFoundError: If either file does not exist.
        :raises ValueError: If the rule-set JSON is invalid.
        """
        md_path = Path(markdown_file)
        logger.info("Scanner.validate: file=%s rules=%s", md_path, rules_file)

        rule_set = self._repo.load(rules_file)
        return self.validate_with_ruleset(md_path, rule_set)

    def validate_with_ruleset(
        self,
        markdown_file: str | Path,
        rule_set: RuleSetModel,
    ) -> ScanReport:
        """Validate a Markdown file against an already-loaded rule set.

        This method is useful when the rule set has already been loaded (e.g.,
        to validate multiple files against the same rules without re-reading
        the JSON on each call).

        :param markdown_file: Path to the ``.md`` file to validate.
        :param rule_set: Pre-loaded, validated rule set.
        :return: A frozen :class:`~markdown_validator.domain.models.ScanReport`.
        :raises FileNotFoundError: If *markdown_file* does not exist.
        """
        md_path = Path(markdown_file)
        doc = parse_document(md_path)

        results: list[ValidationResult] = []
        for rule in rule_set.all_rules:
            result = evaluate_rule(rule, doc)
            results.append(result)
            logger.debug(
                "Scanner: rule id=%d %r%s",
                rule.id,
                rule.name,
                "PASS" if result.passed else "FAIL",
            )

        score = sum(1 for r in results if r.passed)
        # A scan passes only when all Required rules pass
        passed = all(
            r.passed for r in results if r.level == "Required"
        )

        report = ScanReport(
            filepath=str(md_path.resolve()),
            score=score,
            total_rules=len(results),
            passed=passed,
            results=results,
        )
        logger.info(
            "Scanner.validate: score=%d/%d passed=%s file=%s",
            score,
            len(results),
            passed,
            md_path,
        )
        return report

    def validate_directory(
        self,
        directory: str | Path,
        rules_file: str | Path,
    ) -> list[ScanReport]:
        """Validate all ``.md`` files under *directory*.

        The rule set is loaded once and reused for every file.

        :param directory: Root directory to scan recursively.
        :param rules_file: Path to the rule-set JSON file.
        :return: List of :class:`~markdown_validator.domain.models.ScanReport`
            objects, one per file.
        :raises FileNotFoundError: If the rule-set file does not exist.
        :raises NotADirectoryError: If *directory* is not a directory.
        """
        from markdown_validator.infrastructure.parser import find_markdown_files

        rule_set = self._repo.load(rules_file)
        files = find_markdown_files(directory)
        logger.info(
            "Scanner.validate_directory: %d files in %s", len(files), directory
        )

        reports: list[ScanReport] = []
        for md_path in files:
            try:
                report = self.validate_with_ruleset(md_path, rule_set)
                reports.append(report)
            except (FileNotFoundError, ValueError) as exc:
                logger.error("Scanner.validate_directory: skipping %s%s", md_path, exc)

        return reports

validate(markdown_file: str | Path, rules_file: str | Path) -> ScanReport

Validate a single Markdown file against a rule-set.

Parameters:

Name Type Description Default
markdown_file str | Path

Path to the .md file to validate.

required
rules_file str | Path

Path to the rule-set JSON file.

required

Returns:

Type Description
ScanReport

A frozen :class:~markdown_validator.domain.models.ScanReport with per-rule results and an aggregate passed flag.

Raises:

Type Description
FileNotFoundError

If either file does not exist.

ValueError

If the rule-set JSON is invalid.

Source code in markdown_validator/services/scanner.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def validate(
    self,
    markdown_file: str | Path,
    rules_file: str | Path,
) -> ScanReport:
    """Validate a single Markdown file against a rule-set.

    :param markdown_file: Path to the ``.md`` file to validate.
    :param rules_file: Path to the rule-set JSON file.
    :return: A frozen :class:`~markdown_validator.domain.models.ScanReport`
        with per-rule results and an aggregate ``passed`` flag.
    :raises FileNotFoundError: If either file does not exist.
    :raises ValueError: If the rule-set JSON is invalid.
    """
    md_path = Path(markdown_file)
    logger.info("Scanner.validate: file=%s rules=%s", md_path, rules_file)

    rule_set = self._repo.load(rules_file)
    return self.validate_with_ruleset(md_path, rule_set)

validate_directory(directory: str | Path, rules_file: str | Path) -> list[ScanReport]

Validate all .md files under directory.

The rule set is loaded once and reused for every file.

Parameters:

Name Type Description Default
directory str | Path

Root directory to scan recursively.

required
rules_file str | Path

Path to the rule-set JSON file.

required

Returns:

Type Description
list[ScanReport]

List of :class:~markdown_validator.domain.models.ScanReport objects, one per file.

Raises:

Type Description
FileNotFoundError

If the rule-set file does not exist.

NotADirectoryError

If directory is not a directory.

Source code in markdown_validator/services/scanner.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def validate_directory(
    self,
    directory: str | Path,
    rules_file: str | Path,
) -> list[ScanReport]:
    """Validate all ``.md`` files under *directory*.

    The rule set is loaded once and reused for every file.

    :param directory: Root directory to scan recursively.
    :param rules_file: Path to the rule-set JSON file.
    :return: List of :class:`~markdown_validator.domain.models.ScanReport`
        objects, one per file.
    :raises FileNotFoundError: If the rule-set file does not exist.
    :raises NotADirectoryError: If *directory* is not a directory.
    """
    from markdown_validator.infrastructure.parser import find_markdown_files

    rule_set = self._repo.load(rules_file)
    files = find_markdown_files(directory)
    logger.info(
        "Scanner.validate_directory: %d files in %s", len(files), directory
    )

    reports: list[ScanReport] = []
    for md_path in files:
        try:
            report = self.validate_with_ruleset(md_path, rule_set)
            reports.append(report)
        except (FileNotFoundError, ValueError) as exc:
            logger.error("Scanner.validate_directory: skipping %s%s", md_path, exc)

    return reports

validate_with_ruleset(markdown_file: str | Path, rule_set: RuleSetModel) -> ScanReport

Validate a Markdown file against an already-loaded rule set.

This method is useful when the rule set has already been loaded (e.g., to validate multiple files against the same rules without re-reading the JSON on each call).

Parameters:

Name Type Description Default
markdown_file str | Path

Path to the .md file to validate.

required
rule_set RuleSetModel

Pre-loaded, validated rule set.

required

Returns:

Type Description
ScanReport

A frozen :class:~markdown_validator.domain.models.ScanReport.

Raises:

Type Description
FileNotFoundError

If markdown_file does not exist.

Source code in markdown_validator/services/scanner.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def validate_with_ruleset(
    self,
    markdown_file: str | Path,
    rule_set: RuleSetModel,
) -> ScanReport:
    """Validate a Markdown file against an already-loaded rule set.

    This method is useful when the rule set has already been loaded (e.g.,
    to validate multiple files against the same rules without re-reading
    the JSON on each call).

    :param markdown_file: Path to the ``.md`` file to validate.
    :param rule_set: Pre-loaded, validated rule set.
    :return: A frozen :class:`~markdown_validator.domain.models.ScanReport`.
    :raises FileNotFoundError: If *markdown_file* does not exist.
    """
    md_path = Path(markdown_file)
    doc = parse_document(md_path)

    results: list[ValidationResult] = []
    for rule in rule_set.all_rules:
        result = evaluate_rule(rule, doc)
        results.append(result)
        logger.debug(
            "Scanner: rule id=%d %r%s",
            rule.id,
            rule.name,
            "PASS" if result.passed else "FAIL",
        )

    score = sum(1 for r in results if r.passed)
    # A scan passes only when all Required rules pass
    passed = all(
        r.passed for r in results if r.level == "Required"
    )

    report = ScanReport(
        filepath=str(md_path.resolve()),
        score=score,
        total_rules=len(results),
        passed=passed,
        results=results,
    )
    logger.info(
        "Scanner.validate: score=%d/%d passed=%s file=%s",
        score,
        len(results),
        passed,
        md_path,
    )
    return report

Workflow execution engine.

A workflow is an ordered sequence of steps that combines the results of individual validation rules into a higher-level pass/fail decision. Steps are encoded in a mini-language::

S-1,1-D,T-2,F-3,M-E

Each token is <source>-<target> where source/target are either a rule ID (integer) or a control symbol (S, D, T, F, M, E, R). Twelve step patterns are supported; see :meth:WorkflowEngine.run.

Design pattern: Chain of Responsibility — each step pattern is dispatched to its own handler method, making the state machine readable and individually testable.

WorkflowEngine

Executes workflow step sequences against a set of rule results.

Parameters:

Name Type Description Default
rule_results dict[int, bool]

Mapping from rule ID to the boolean pass/fail result for that rule, as produced by the scanner.

required
Source code in markdown_validator/services/workflow.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class WorkflowEngine:
    """Executes workflow step sequences against a set of rule results.

    :param rule_results: Mapping from rule ID to the boolean pass/fail result
        for that rule, as produced by the scanner.
    """

    def __init__(self, rule_results: dict[int, bool]) -> None:
        self._results = rule_results

    def run(self, workflow: WorkflowModel) -> WorkflowResult:
        """Execute a single workflow and return its result.

        :param workflow: The workflow definition (already normalised).
        :return: A :class:`~markdown_validator.domain.models.WorkflowResult`.
        """
        steps = _parse_steps(workflow.steps)
        state = _WorkflowState()

        for step_num, (source, target) in enumerate(steps, start=1):
            logger.debug(
                "WorkflowEngine.run: step %d%r-%r | wf=%s dec=%s",
                step_num,
                source,
                target,
                state.workflow_state,
                state.decision,
            )
            self._dispatch(source, target, state, step_num, workflow.name)

        passed = bool(state.workflow_state)
        logger.debug(
            "WorkflowEngine.run: workflow %r finished — passed=%s",
            workflow.name,
            passed,
        )
        return WorkflowResult(
            workflow_name=workflow.name,
            passed=passed,
            fix=workflow.fix if not passed else "",
        )

    def _dispatch(
        self,
        source: str | int,
        target: str | int,
        state: _WorkflowState,
        step_num: int,
        workflow_name: str,
    ) -> None:
        """Dispatch a single step to the appropriate handler.

        Implements all 12 workflow step patterns.

        :param source: Left-hand token of the step.
        :param target: Right-hand token of the step.
        :param state: Mutable workflow execution state (modified in place).
        :param step_num: 1-based step counter (for logging).
        :param workflow_name: Name of the enclosing workflow (for logging).
        """
        s_is_num = isinstance(source, int)
        t_is_num = isinstance(target, int)

        # Pattern 1: S-N  →  start, load rule N
        if source == "s" and t_is_num:
            state.workflow_state = self._rule_passed(target)  # type: ignore[arg-type]

        # Pattern 2: N-D  →  rule N becomes the decision
        elif s_is_num and target == "d":
            state.decision = self._rule_passed(source)  # type: ignore[arg-type]
            state.merge_active = True

        # Pattern 3: M-D  →  merge state becomes decision
        elif source == "m" and target == "d":
            state.workflow_state = state.decision
            state.merge_active = True
            state.decision = None

        # Pattern 4: T-N  →  if decision was True, load rule N
        elif source == "t" and t_is_num and state.decision is True:
            state.workflow_state = self._rule_passed(target)  # type: ignore[arg-type]

        # Pattern 5: F-N  →  if decision was False, load rule N
        elif source == "f" and t_is_num and state.decision is False:
            state.workflow_state = self._rule_passed(target)  # type: ignore[arg-type]

        # Pattern 6: T-R  →  if decision True, reverse (negate) it
        elif source == "t" and target == "r" and state.decision is True:
            state.decision = False

        # Pattern 7: F-R  →  if decision False, reverse (negate) it
        elif source == "f" and target == "r" and state.decision is False:
            state.decision = True

        # Pattern 8: N-M  →  rule N merges into merge state
        elif s_is_num and target == "m" and state.merge_active:
            state.workflow_state = state.decision
            state.merge_active = False  # type: ignore[assignment]

        # Pattern 9: M-N  →  exit merge, load rule N
        elif source == "m" and t_is_num and not state.merge_active:
            state.workflow_state = state.decision
            state.merge_active = False
            state.decision = None

        # Pattern 10: M-E  →  merge ends workflow
        elif source == "m" and target == "e":
            state.workflow_state = state.decision

        # Pattern 11: N-E  →  rule N ends workflow
        elif s_is_num and target == "e":
            state.workflow_state = self._rule_passed(source)  # type: ignore[arg-type]

        # Pattern 12: N-N  →  both rules must pass
        elif s_is_num and t_is_num:
            if not self._rule_passed(source):  # type: ignore[arg-type]
                state.workflow_state = False
            if not self._rule_passed(target):  # type: ignore[arg-type]
                state.workflow_state = False

        else:
            logger.warning(
                "WorkflowEngine: unrecognised step %r-%r in workflow %r (step %d)",
                source,
                target,
                workflow_name,
                step_num,
            )

    def _rule_passed(self, rule_id: int) -> bool:
        """Look up the pass/fail state of *rule_id*.

        :param rule_id: ID of the rule to look up.
        :return: ``True`` if the rule passed, ``False`` if it failed or is
            not present in the results.
        """
        result = self._results.get(rule_id)
        if result is None:
            logger.warning("WorkflowEngine: rule id=%d not found in results", rule_id)
            return False
        return result

run(workflow: WorkflowModel) -> WorkflowResult

Execute a single workflow and return its result.

Parameters:

Name Type Description Default
workflow WorkflowModel

The workflow definition (already normalised).

required

Returns:

Type Description
WorkflowResult

A :class:~markdown_validator.domain.models.WorkflowResult.

Source code in markdown_validator/services/workflow.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def run(self, workflow: WorkflowModel) -> WorkflowResult:
    """Execute a single workflow and return its result.

    :param workflow: The workflow definition (already normalised).
    :return: A :class:`~markdown_validator.domain.models.WorkflowResult`.
    """
    steps = _parse_steps(workflow.steps)
    state = _WorkflowState()

    for step_num, (source, target) in enumerate(steps, start=1):
        logger.debug(
            "WorkflowEngine.run: step %d%r-%r | wf=%s dec=%s",
            step_num,
            source,
            target,
            state.workflow_state,
            state.decision,
        )
        self._dispatch(source, target, state, step_num, workflow.name)

    passed = bool(state.workflow_state)
    logger.debug(
        "WorkflowEngine.run: workflow %r finished — passed=%s",
        workflow.name,
        passed,
    )
    return WorkflowResult(
        workflow_name=workflow.name,
        passed=passed,
        fix=workflow.fix if not passed else "",
    )

run_all_workflows(rule_set: RuleSetModel, scan_report: ScanReport) -> list[WorkflowResult]

Run every workflow defined in rule_set against scan_report results.

Parameters:

Name Type Description Default
rule_set RuleSetModel

The rule set containing workflow definitions.

required
scan_report ScanReport

The scan report produced by the scanner for the same document and rule set.

required

Returns:

Type Description
list[WorkflowResult]

List of :class:~markdown_validator.domain.models.WorkflowResult objects, one per workflow.

Source code in markdown_validator/services/workflow.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def run_all_workflows(
    rule_set: RuleSetModel,
    scan_report: ScanReport,
) -> list[WorkflowResult]:
    """Run every workflow defined in *rule_set* against *scan_report* results.

    :param rule_set: The rule set containing workflow definitions.
    :param scan_report: The scan report produced by the scanner for the same
        document and rule set.
    :return: List of :class:`~markdown_validator.domain.models.WorkflowResult`
        objects, one per workflow.
    """
    rule_results: dict[int, bool] = {r.rule_id: r.passed for r in scan_report.results}
    engine = WorkflowEngine(rule_results)

    results: list[WorkflowResult] = []
    for workflow in rule_set.workflows:
        wf_result = engine.run(workflow)
        results.append(wf_result)
        logger.info(
            "run_all_workflows: workflow=%r passed=%s",
            workflow.name,
            wf_result.passed,
        )
    return results