Skip to content

Assess API

config

dita_etl.assess.config

Assessment-stage configuration dataclasses.

Loaded from config/assess.yaml once at startup and passed immutably through the assessment pipeline.

AssessConfig dataclass

Root configuration object for the assessment stage.

Parameters:

Name Type Description Default
intermediate str

Intermediate format name used in reports.

'docbook5'
shingling Shingling

MinHash shingling parameters.

Shingling()
scoring ScoringWeights

Scoring weights for readiness and risk.

ScoringWeights()
classification dict[str, list[str]]

Keyword lists used by the topic-type predictor.

(lambda: {'task_keywords': ['click', 'run', 'open', 'select', 'type', 'press'], 'task_landmarks': ['prerequisites', 'steps', 'results', 'troubleshooting'], 'reference_markers': ['parameters', 'options', 'syntax', 'defaults']})()
duplication Duplication

Near-duplicate handling settings.

Duplication()
limits Limits

Content length thresholds.

Limits()
Source code in dita_etl/assess/config.py
@dataclass
class AssessConfig:
    """Root configuration object for the assessment stage.

    :param intermediate: Intermediate format name used in reports.
    :param shingling: MinHash shingling parameters.
    :param scoring: Scoring weights for readiness and risk.
    :param classification: Keyword lists used by the topic-type predictor.
    :param duplication: Near-duplicate handling settings.
    :param limits: Content length thresholds.
    """

    intermediate: str = "docbook5"
    shingling: Shingling = field(default_factory=Shingling)
    scoring: ScoringWeights = field(default_factory=ScoringWeights)
    classification: dict[str, list[str]] = field(
        default_factory=lambda: {
            "task_keywords": ["click", "run", "open", "select", "type", "press"],
            "task_landmarks": ["prerequisites", "steps", "results", "troubleshooting"],
            "reference_markers": ["parameters", "options", "syntax", "defaults"],
        }
    )
    duplication: Duplication = field(default_factory=Duplication)
    limits: Limits = field(default_factory=Limits)

    # ------------------------------------------------------------------
    # Factory (imperative shell: file I/O lives only here)
    # ------------------------------------------------------------------

    @staticmethod
    def load(path: str) -> "AssessConfig":
        """Load an assessment configuration from a YAML file.

        :param path: Path to the YAML configuration file.
        :returns: Populated :class:`AssessConfig` instance.
        :raises FileNotFoundError: If *path* does not exist.
        """
        # Read explicitly as UTF-8 so parsing does not depend on the
        # platform's locale encoding (previously used the default codec).
        with open(path, encoding="utf-8") as fh:
            data: dict[str, Any] = yaml.safe_load(fh) or {}

        cfg = AssessConfig()
        # Fields whose YAML value is a mapping that updates the nested
        # dataclass attribute-by-attribute instead of replacing it wholesale.
        # (Only membership matters here, so a set is sufficient.)
        nested_fields = {"shingling", "scoring", "limits", "duplication"}
        for key, value in data.items():
            if key in nested_fields and isinstance(value, dict):
                target = getattr(cfg, key)
                for sub_key, sub_val in value.items():
                    setattr(target, sub_key, sub_val)
            else:
                # Scalar / list keys overwrite the top-level attribute directly.
                setattr(cfg, key, value)
        return cfg

load(path) staticmethod

Load an assessment configuration from a YAML file.

Parameters:

Name Type Description Default
path str

Path to the YAML configuration file.

required

Returns:

Type Description
'AssessConfig'

Populated :class:AssessConfig instance.

Raises:

Type Description
FileNotFoundError

If path does not exist.

Source code in dita_etl/assess/config.py
@staticmethod
def load(path: str) -> "AssessConfig":
    """Load an assessment configuration from a YAML file.

    :param path: Path to the YAML configuration file.
    :returns: Populated :class:`AssessConfig` instance.
    :raises FileNotFoundError: If *path* does not exist.
    """
    # Read explicitly as UTF-8 so parsing does not depend on the
    # platform's locale encoding (previously used the default codec).
    with open(path, encoding="utf-8") as fh:
        data: dict[str, Any] = yaml.safe_load(fh) or {}

    cfg = AssessConfig()
    # Fields whose YAML value is a mapping that updates the nested
    # dataclass attribute-by-attribute instead of replacing it wholesale.
    # (Only membership matters here, so a set is sufficient.)
    nested_fields = {"shingling", "scoring", "limits", "duplication"}
    for key, value in data.items():
        if key in nested_fields and isinstance(value, dict):
            target = getattr(cfg, key)
            for sub_key, sub_val in value.items():
                setattr(target, sub_key, sub_val)
        else:
            # Scalar / list keys overwrite the top-level attribute directly.
            setattr(cfg, key, value)
    return cfg

Shingling dataclass

MinHash shingling parameters for near-duplicate detection.

Parameters:

Name Type Description Default
stopwords str | None

Optional path to a stopword list (currently unused).

None
ngram int

Size of each shingle (token n-gram window).

7
minhash_num_perm int

Number of permutations for the MinHash signature.

64
threshold float

Jaccard similarity threshold above which two documents are considered near-duplicates.

0.88
Source code in dita_etl/assess/config.py
@dataclass
class Shingling:
    """MinHash shingling parameters for near-duplicate detection.

    :param stopwords: Optional path to a stopword list (currently unused).
    :param ngram: Size of each shingle (token n-gram window).
    :param minhash_num_perm: Number of permutations for the MinHash signature.
    :param threshold: Jaccard similarity threshold above which two documents
        are considered near-duplicates.
    """

    # Path to a stopword list file; accepted but not consumed anywhere
    # visible in this module.
    stopwords: str | None = None
    # Token n-gram window used when shingling document text.
    ngram: int = 7
    # Signature length; more permutations give tighter Jaccard estimates
    # at the cost of more hashing per shingle.
    minhash_num_perm: int = 64
    # Estimated-Jaccard cutoff for treating two documents as near-duplicates.
    threshold: float = 0.88

ScoringWeights dataclass

Weights used by the topicization-readiness and conversion-risk scorers.

Parameters:

Name Type Description Default
topicization_weights dict[str, int]

Per-metric additive weights for the readiness score (0-100).

(lambda: {'heading_ladder_valid': 10, 'avg_section_len_target': 15, 'tables_simple': 10, 'lists_depth_ok': 10, 'images_with_alt': 5})()
risk_weights dict[str, int]

Per-metric additive weights for the risk score (0-100).

(lambda: {'deep_nesting': 20, 'complex_tables': 25, 'unresolved_anchors': 15, 'mixed_inline_blocks': 10})()
Source code in dita_etl/assess/config.py
@dataclass
class ScoringWeights:
    """Weights used by the topicization-readiness and conversion-risk scorers.

    :param topicization_weights: Per-metric additive weights for the readiness
        score (0-100).
    :param risk_weights: Per-metric additive weights for the risk score (0-100).
    """

    # Each key names a boolean-ish metric; its weight is added to the
    # readiness score when the metric holds (see score_topicization).
    topicization_weights: dict[str, int] = field(
        default_factory=lambda: {
            "heading_ladder_valid": 10,
            "avg_section_len_target": 15,
            "tables_simple": 10,
            "lists_depth_ok": 10,
            "images_with_alt": 5,
        }
    )
    # Each key names a risk indicator; its weight is added to the risk
    # score when the indicator is present (see score_risk).
    risk_weights: dict[str, int] = field(
        default_factory=lambda: {
            "deep_nesting": 20,
            "complex_tables": 25,
            "unresolved_anchors": 15,
            "mixed_inline_blocks": 10,
        }
    )

Limits dataclass

Content length thresholds for scoring.

Parameters:

Name Type Description Default
target_section_tokens list[int]

[min, max] token range for an "ideally-sized" section.

(lambda: [50, 500])()
Source code in dita_etl/assess/config.py
@dataclass
class Limits:
    """Content length thresholds for scoring.

    :param target_section_tokens: ``[min, max]`` token range for an
        "ideally-sized" section.
    """

    # Two-element [min, max] list; sections whose average token count falls
    # inside this range earn the "avg_section_len_target" readiness weight.
    target_section_tokens: list[int] = field(default_factory=lambda: [50, 500])

Duplication dataclass

Near-duplicate handling settings.

Parameters:

Name Type Description Default
prefer_paths list[str]

Path prefixes that should be preferred when resolving duplicate clusters.

list()
action str

What to do with detected duplicates — "propose" emits recommendations; future values may include "remove" or "merge".

'propose'
Source code in dita_etl/assess/config.py
@dataclass
class Duplication:
    """Near-duplicate handling settings.

    :param prefer_paths: Path prefixes that should be preferred when resolving
        duplicate clusters.
    :param action: What to do with detected duplicates — ``"propose"`` emits
        recommendations; future values may include ``"remove"`` or ``"merge"``.
    """

    # Ordered list of path prefixes; empty by default (no preference).
    prefer_paths: list[str] = field(default_factory=list)
    # Only "propose" is implemented today — see the class docstring.
    action: str = "propose"

structure

dita_etl.assess.structure

Markdown structural analysis — pure functions.

Provides sectionization and structural-validity checks for Markdown source documents. All functions are pure: they take text and return data structures with no I/O or side effects.

sectionize_markdown(text)

Split a Markdown document into logical sections at heading boundaries.

Each section is represented as a dictionary with keys:

  • "level" – heading depth (1–6; 0 for the implicit preamble section).
  • "title" – heading text, or "Document" for the preamble.
  • "content" – body text between this heading and the next.

:Example:

.. code-block:: python

secs = sectionize_markdown("# Intro\n\nHello\n## Details\n\nMore")
assert secs[0]["title"] == "Intro"
assert secs[1]["title"] == "Details"

Parameters:

Name Type Description Default
text str

Raw Markdown source text.

required

Returns:

Type Description
list[dict[str, Any]]

Ordered list of section dictionaries.

Source code in dita_etl/assess/structure.py
def sectionize_markdown(text: str) -> list[dict[str, Any]]:
    """Split a Markdown document into logical sections at heading boundaries.

    Each section is represented as a dictionary with keys:

    * ``"level"`` – heading depth (1–6; 0 for the implicit preamble section).
    * ``"title"`` – heading text, or ``"Document"`` for the preamble.
    * ``"content"`` – body text between this heading and the next.

    :param text: Raw Markdown source text.
    :returns: Ordered list of section dictionaries.

    :Example:

    .. code-block:: python

        secs = sectionize_markdown("# Intro\\n\\nHello\\n## Details\\n\\nMore")
        assert secs[0]["title"] == "Intro"
        assert secs[1]["title"] == "Details"
    """
    sections: list[dict[str, Any]] = []
    current: dict[str, Any] = {"level": 0, "title": "Document", "content": []}

    def _flush() -> None:
        # Keep every real heading section (even with an empty body); drop the
        # implicit preamble only when it captured no lines at all.  This is
        # the same rule the final flush always used.
        if current["level"] > 0 or current["content"]:
            sections.append(
                {**current, "content": "\n".join(current["content"]).strip()}
            )

    for line in text.splitlines():
        match = _HEADING_RE.match(line)
        if match:
            # Fix: previously only non-empty bodies were appended here, so a
            # heading immediately followed by another heading was silently
            # dropped while the final flush kept such sections.  Use one
            # consistent flush rule everywhere.
            _flush()
            current = {
                "level": len(match.group(1)),
                "title": match.group(2).strip(),
                "content": [],
            }
        else:
            current["content"].append(line)

    _flush()
    return sections

heading_ladder_valid(sections)

Check that heading levels do not skip more than one level at a time.

A document that jumps from ## directly to #### (skipping ###) is considered invalid.

Parameters:

Name Type Description Default
sections list[dict[str, Any]]

Section list as returned by :func:sectionize_markdown.

required

Returns:

Type Description
bool

True if the heading ladder is valid; False otherwise.

Source code in dita_etl/assess/structure.py
def heading_ladder_valid(sections: list[dict[str, Any]]) -> bool:
    """Check that heading levels descend at most one step at a time.

    A document that jumps from ``##`` directly to ``####`` (skipping ``###``)
    is considered invalid.

    :param sections: Section list as returned by :func:`sectionize_markdown`.
    :returns: ``True`` if the heading ladder is valid; ``False`` otherwise.
    """
    # Normalise missing/zero levels to 1, exactly as the scorer expects.
    depths = [(section.get("level", 1) or 1) for section in sections]
    # Every consecutive pair may deepen by at most one level.
    return all(
        later <= earlier + 1
        for earlier, later in zip(depths, depths[1:])
    )

features

dita_etl.assess.features

Section feature extraction — pure functions.

Computes a feature vector for a single Markdown section. All functions are pure: identical inputs always produce identical outputs with no side effects.

count_tokens(text)

Count word tokens in text using a simple \w+ pattern.

Parameters:

Name Type Description Default
text str

Input text.

required

Returns:

Type Description
int

Number of token matches.

Source code in dita_etl/assess/features.py
def count_tokens(text: str) -> int:
    """Count word tokens in *text* using a simple ``\\w+`` pattern.

    :param text: Input text.
    :returns: Number of token matches.
    """
    # Iterate matches lazily; only the count is needed, not the strings.
    return sum(1 for _ in re.finditer(r"\w+", text))

imperative_density(text, verbs)

Compute the ratio of imperative verb occurrences to total token count.

Parameters:

Name Type Description Default
text str

Input text.

required
verbs list[str]

List of imperative verbs to search for (case-insensitive).

required

Returns:

Type Description
float

Ratio between 0.0 and 1.0.

Source code in dita_etl/assess/features.py
def imperative_density(text: str, verbs: list[str]) -> float:
    """Compute the ratio of imperative verb occurrences to total token count.

    :param text: Input text.
    :param verbs: List of imperative verbs to search for (case-insensitive).
    :returns: Ratio between 0.0 and 1.0.
    """
    # Denominator: total \w+ tokens, floored at 1 to avoid division by zero.
    denominator = len(re.findall(r"\w+", text)) or 1
    occurrences = 0
    for verb in verbs:
        # Whole-word, case-insensitive match for each verb individually.
        pattern = re.compile(r"\b" + re.escape(verb) + r"\b", re.IGNORECASE)
        occurrences += len(pattern.findall(text))
    return occurrences / denominator

extract_features(section, landmarks)

Compute a feature dictionary for a single document section.

Parameters:

Name Type Description Default
section dict[str, Any]

Section dict with "title" and "content" keys, as returned by :func:~dita_etl.assess.structure.sectionize_markdown.

required
landmarks dict[str, list[str]]

Classification keyword lists from :class:~dita_etl.assess.config.AssessConfig. Expected keys: "task_keywords", "task_landmarks", "reference_markers".

required

Returns:

Type Description
dict[str, Any]

Dictionary of feature names to scalar values.

Returned keys:

  • tokens – total word count.
  • ordered_lists – number of ordered list items.
  • unordered_lists – number of unordered list items.
  • tables – number of table rows.
  • images – number of inline images.
  • links – number of inline hyperlinks.
  • has_steps_title – whether the section title contains a task landmark.
  • imperative_density – ratio of imperative verbs to total tokens.
  • reference_markers – count of reference marker keywords found.
Source code in dita_etl/assess/features.py
def extract_features(
    section: dict[str, Any],
    landmarks: dict[str, list[str]],
) -> dict[str, Any]:
    """Compute a feature dictionary for a single document section.

    :param section: Section dict with ``"title"`` and ``"content"`` keys, as
        returned by :func:`~dita_etl.assess.structure.sectionize_markdown`.
    :param landmarks: Classification keyword lists from
        :class:`~dita_etl.assess.config.AssessConfig`. Expected keys:
        ``"task_keywords"``, ``"task_landmarks"``, ``"reference_markers"``.
    :returns: Dictionary of feature names to scalar values.

    Returned keys:

    * ``tokens`` – total word count.
    * ``ordered_lists`` – number of ordered list items.
    * ``unordered_lists`` – number of unordered list items.
    * ``tables`` – number of table rows.
    * ``images`` – number of inline images.
    * ``links`` – number of inline hyperlinks.
    * ``has_steps_title`` – whether the section title contains a task landmark.
    * ``imperative_density`` – ratio of imperative verbs to total tokens.
    * ``reference_markers`` – count of reference marker keywords found.
    """
    body: str = section.get("content", "")
    heading: str = section.get("title", "")
    # Lower-case once; the keyword scans below are all case-insensitive.
    heading_lc = heading.lower()
    body_lc = body.lower()

    steps_title = any(
        marker.lower() in heading_lc
        for marker in landmarks.get("task_landmarks", [])
    )
    # bool values sum as 0/1, yielding the marker hit count.
    marker_hits = sum(
        marker.lower() in body_lc
        for marker in landmarks.get("reference_markers", [])
    )

    return {
        "tokens": count_tokens(body),
        "ordered_lists": len(_ORDERED_LIST_RE.findall(body)),
        "unordered_lists": len(_UNORDERED_LIST_RE.findall(body)),
        "tables": len(_TABLE_RE.findall(body)),
        "images": len(_IMAGE_RE.findall(body)),
        "links": len(_LINK_RE.findall(body)),
        "has_steps_title": steps_title,
        "imperative_density": imperative_density(
            body, landmarks.get("task_keywords", [])
        ),
        "reference_markers": marker_hits,
    }

scoring

dita_etl.assess.scoring

Document-readiness and conversion-risk scorers — pure functions.

All functions are pure: no I/O, no side effects.

score_topicization(metrics, weights, target_range)

Compute a topicization-readiness score in the range 0–100.

Higher values indicate a document that is well-structured for conversion to DITA topics.

Parameters:

Name Type Description Default
metrics dict[str, Any]

Metrics dictionary as produced by :func:~dita_etl.assess.inventory.assess_file_markdown.

required
weights dict[str, int]

Per-metric additive weights from :class:~dita_etl.assess.config.ScoringWeights.

required
target_range list[int]

[min_tokens, max_tokens] for the ideal section length.

required

Returns:

Type Description
int

Integer readiness score clamped to [0, 100].

Source code in dita_etl/assess/scoring.py
def score_topicization(
    metrics: dict[str, Any],
    weights: dict[str, int],
    target_range: list[int],
) -> int:
    """Compute a topicization-readiness score in the range 0–100.

    Higher values indicate a document that is well-structured for conversion
    to DITA topics.

    :param metrics: Metrics dictionary as produced by
        :func:`~dita_etl.assess.inventory.assess_file_markdown`.
    :param weights: Per-metric additive weights from
        :class:`~dita_etl.assess.config.ScoringWeights`.
    :param target_range: ``[min_tokens, max_tokens]`` for the ideal section
        length.
    :returns: Integer readiness score clamped to [0, 100].
    """
    low, high = target_range[0], target_range[1]
    mean_tokens = metrics.get("avg_section_tokens", 0)

    # Map each weight key to whether its readiness criterion is satisfied.
    checks = {
        "heading_ladder_valid": bool(metrics.get("heading_ladder_valid")),
        "avg_section_len_target": low <= mean_tokens <= high,
        "tables_simple": bool(metrics.get("tables_simple")),
        "lists_depth_ok": bool(metrics.get("lists_depth_ok")),
        "images_with_alt": bool(metrics.get("images_with_alt")),
    }
    total = sum(weights.get(name, 0) for name, passed in checks.items() if passed)
    return _clamp(total, 0, 100)

score_risk(metrics, weights)

Compute a conversion-risk score in the range 0–100.

Higher values indicate a document with structural patterns that are difficult to convert reliably.

Parameters:

Name Type Description Default
metrics dict[str, Any]

Metrics dictionary as produced by :func:~dita_etl.assess.inventory.assess_file_markdown.

required
weights dict[str, int]

Per-metric additive weights from :class:~dita_etl.assess.config.ScoringWeights.

required

Returns:

Type Description
int

Integer risk score clamped to [0, 100].

Source code in dita_etl/assess/scoring.py
def score_risk(
    metrics: dict[str, Any],
    weights: dict[str, int],
) -> int:
    """Compute a conversion-risk score in the range 0–100.

    Higher values indicate a document with structural patterns that are
    difficult to convert reliably.

    :param metrics: Metrics dictionary as produced by
        :func:`~dita_etl.assess.inventory.assess_file_markdown`.
    :param weights: Per-metric additive weights from
        :class:`~dita_etl.assess.config.ScoringWeights`.
    :returns: Integer risk score clamped to [0, 100].
    """
    # Each flag contributes its configured weight when the metric is truthy.
    risk_flags = (
        "deep_nesting",
        "complex_tables",
        "unresolved_anchors",
        "mixed_inline_blocks",
    )
    total = sum(weights.get(flag, 0) for flag in risk_flags if metrics.get(flag))
    return _clamp(total, 0, 100)

predict

dita_etl.assess.predict

Topic-type prediction for individual sections — pure functions.

Split from scoring.py to give prediction its own single-responsibility module with a clear, testable interface.

predict_topic_type(section_feats, landmarks)

Predict the DITA topic type for a single section based on its features.

Rules are evaluated in priority order:

  1. Task: ordered list present and (imperative density > 0.005 or steps-style title detected).
  2. Reference: tables present or reference-marker keywords found.
  3. Concept: default fallback.

Parameters:

Name Type Description Default
section_feats dict[str, Any]

Feature dictionary as returned by :func:~dita_etl.assess.features.extract_features.

required
landmarks dict[str, list[str]]

Classification keyword lists (unused in current heuristic but retained for future expansion).

required

Returns:

Type Description
tuple[str, float, list[str]]

Tuple of (topic_type, confidence, reasons) where topic_type is one of "concept", "task", "reference"; confidence is a float in [0, 1]; and reasons is a list of human-readable strings explaining the prediction.

Source code in dita_etl/assess/predict.py
def predict_topic_type(
    section_feats: dict[str, Any],
    landmarks: dict[str, list[str]],
) -> tuple[str, float, list[str]]:
    """Predict the DITA topic type for a single section based on its features.

    Rules are evaluated in priority order:

    1. **Task**: ordered list present *and* (imperative density > 0.005 *or*
       steps-style title detected).
    2. **Reference**: tables present *or* reference-marker keywords found.
    3. **Concept**: default fallback.

    :param section_feats: Feature dictionary as returned by
        :func:`~dita_etl.assess.features.extract_features`.
    :param landmarks: Classification keyword lists (unused in current
        heuristic but retained for future expansion).
    :returns: Tuple of ``(topic_type, confidence, reasons)`` where
        *topic_type* is one of ``"concept"``, ``"task"``, ``"reference"``;
        *confidence* is a float in [0, 1]; and *reasons* is a list of
        human-readable strings explaining the prediction.
    """
    ordered_present = section_feats.get("ordered_lists", 0) > 0
    imperative_heavy = section_feats.get("imperative_density", 0.0) > 0.005
    steps_style_title = bool(section_feats.get("has_steps_title", False))

    # Rule 1 — procedural structure wins outright.
    if ordered_present and (imperative_heavy or steps_style_title):
        return "task", 0.85, ["ordered list + imperative/steps"]

    # Rule 2 — tabular data or reference vocabulary.
    table_like = section_feats.get("tables", 0) > 0
    marker_hits = section_feats.get("reference_markers", 0) > 0
    if table_like or marker_hits:
        return "reference", 0.80, ["tables or reference markers"]

    # Rule 3 — everything else reads as expository prose.
    return "concept", 0.60, ["expository default"]

dedupe

dita_etl.assess.dedupe

Near-duplicate detection via MinHash — pure functions.

Uses token n-gram shingling and MinHash signatures to efficiently cluster documents that are likely near-duplicates without comparing every pair of full texts.

All functions are pure: no I/O, no side effects.

shingle_tokens(text, n=7)

Tokenise text and return all overlapping n-gram shingles.

:Example:

.. code-block:: python

shingles = shingle_tokens("the quick brown fox", n=2)
# ["the quick", "quick brown", "brown fox"]

Parameters:

Name Type Description Default
text str

Input document text.

required
n int

Shingle size (token n-gram window).

7

Returns:

Type Description
list[str]

List of n-gram strings, lower-cased and space-separated.

Source code in dita_etl/assess/dedupe.py
def shingle_tokens(text: str, n: int = 7) -> list[str]:
    """Tokenise *text* and return all overlapping n-gram shingles.

    :param text: Input document text.
    :param n: Shingle size (token n-gram window).
    :returns: List of n-gram strings, lower-cased and space-separated.

    :Example:

    .. code-block:: python

        shingles = shingle_tokens("the quick brown fox", n=2)
        # ["the quick", "quick brown", "brown fox"]
    """
    # Lower-case each token individually after tokenising.
    words = [token.lower() for token in re.findall(r"\w+", text)]
    window_count = len(words) - n + 1
    if window_count <= 0:
        # Fewer tokens than the window size: no complete shingle exists.
        return []
    return [" ".join(words[start:start + n]) for start in range(window_count)]

minhash_signature(shingles, num_perm=128)

Compute a MinHash signature for a set of shingles.

Uses BLAKE2b with per-permutation personalisation bytes as a fast, independent hash family.

Parameters:

Name Type Description Default
shingles list[str]

List of shingle strings.

required
num_perm int

Number of hash permutations (signature length).

128

Returns:

Type Description
list[int]

List of num_perm integer values forming the MinHash signature.

Source code in dita_etl/assess/dedupe.py
def minhash_signature(shingles: list[str], num_perm: int = 128) -> list[int]:
    """Compute a MinHash signature for a set of shingles.

    Uses BLAKE2b with per-permutation personalisation bytes as a fast,
    independent hash family.

    :param shingles: List of shingle strings.
    :param num_perm: Number of hash permutations (signature length).
    :returns: List of *num_perm* integer values forming the MinHash signature.
    """
    # Personalisation bytes are invariant per permutation — build them once
    # instead of re-encoding inside the shingle loop.
    persons = [str(perm).encode().ljust(16, b"\x00")[:16] for perm in range(num_perm)]
    # Start every slot at the maximum 63-bit value; each shingle can only
    # lower it.
    signature = [2**63 - 1] * num_perm
    for shingle in shingles:
        payload = shingle.encode("utf-8")
        for slot, person in enumerate(persons):
            digest = hashlib.blake2b(payload, digest_size=8, person=person).digest()
            hashed = int.from_bytes(digest, "big")
            if hashed < signature[slot]:
                signature[slot] = hashed
    return signature

jaccard_from_signatures(sig1, sig2)

Estimate the Jaccard similarity of two sets from their MinHash signatures.

Parameters:

Name Type Description Default
sig1 list[int]

MinHash signature for the first set.

required
sig2 list[int]

MinHash signature for the second set.

required

Returns:

Type Description
float

Estimated Jaccard similarity in [0.0, 1.0]; returns 0.0 if either signature is empty.

Source code in dita_etl/assess/dedupe.py
def jaccard_from_signatures(sig1: list[int], sig2: list[int]) -> float:
    """Estimate the Jaccard similarity of two sets from their MinHash signatures.

    :param sig1: MinHash signature for the first set.
    :param sig2: MinHash signature for the second set.
    :returns: Estimated Jaccard similarity in [0.0, 1.0]; returns 0.0 if
        either signature is empty.
    """
    if sig1 and sig2:
        # Fraction of agreeing slots is an unbiased Jaccard estimator.
        agreeing = sum(a == b for a, b in zip(sig1, sig2))
        return agreeing / len(sig1)
    return 0.0

cluster_near_duplicates(items, ngram, num_perm, threshold)

Group documents into near-duplicate clusters using MinHash.

Uses a greedy O(n²) clustering approach (sufficient for typical document-set sizes of hundreds to low thousands).

Parameters:

Name Type Description Default
items list[tuple[str, str]]

List of (key, text) pairs where key is a document identifier (e.g. a file path) and text is the document content.

required
ngram int

Shingle size for token n-grams.

required
num_perm int

Number of MinHash permutations.

required
threshold float

Jaccard similarity threshold; document pairs above this value are placed in the same cluster.

required

Returns:

Type Description
list[list[str]]

List of clusters, each cluster being a list of document keys. Every key appears in exactly one cluster.

Source code in dita_etl/assess/dedupe.py
def cluster_near_duplicates(
    items: list[tuple[str, str]],
    ngram: int,
    num_perm: int,
    threshold: float,
) -> list[list[str]]:
    """Group documents into near-duplicate clusters using MinHash.

    Uses a greedy O(n²) clustering approach (sufficient for typical
    document-set sizes of hundreds to low thousands).

    :param items: List of ``(key, text)`` pairs where *key* is a document
        identifier (e.g. a file path) and *text* is the document content.
    :param ngram: Shingle size for token n-grams.
    :param num_perm: Number of MinHash permutations.
    :param threshold: Jaccard similarity threshold; document pairs above this
        value are placed in the same cluster.
    :returns: List of clusters, each cluster being a list of document keys.
        Every key appears in exactly one cluster.
    """
    # Signature per document key (a duplicate key keeps its last text,
    # matching dict-comprehension semantics).
    sigs: dict[str, list[int]] = {}
    for key, text in items:
        sigs[key] = minhash_signature(shingle_tokens(text, n=ngram), num_perm=num_perm)

    ordered_keys = list(sigs)
    assigned: set[str] = set()
    result: list[list[str]] = []

    for pos, anchor in enumerate(ordered_keys):
        if anchor in assigned:
            continue
        # Greedily pull every not-yet-assigned later key that is similar
        # enough to the anchor into this cluster.
        members = [anchor]
        assigned.add(anchor)
        for candidate in ordered_keys[pos + 1:]:
            if candidate in assigned:
                continue
            similarity = jaccard_from_signatures(sigs[anchor], sigs[candidate])
            if similarity >= threshold:
                members.append(candidate)
                assigned.add(candidate)
        result.append(members)

    return result