# pylint: disable=no-name-in-module
from typing import Any, Literal, Iterable, Mapping
from abc import ABC, abstractmethod
from importlib import import_module
import re
import numpy as np
from spacy.vocab import Vocab
from spacy.vectors import Vectors
from ..grammar import Component, Phrase, Sent, Doc
from ..abc import init_class_attrs
from ..datastruct import DataTuple
from ..utils.misc import best_matches, cosine_similarity
# Structured specification: maps string keys to the value each key is
# compared against - a string, an iterable of strings or a grammar element
# (phrase, sentence or document).
SpecType = dict[str, str | Iterable[str] | Phrase | Sent | Doc]
# Annotation alias for a floating point array
# (presumably meant as a one-dimensional word vector).
FloatVec = np.ndarray[tuple[int], np.floating]
# Names accepted as the ``method`` argument of ``PhraseSimilarity``;
# the first entry is used as the default.
_sim_methods = ("components", "phrases", "recursive", "average")
[docs]
class GrammarSimilarity(ABC):
    """Abstract base class for structured similarity scorers.

    Concrete scorers are declared with a ``register_with`` class keyword.
    On subclass creation the scorer is stored as ``register_with.Similarity``
    and its docstring (with the ``Attributes`` header renamed to
    ``Parameters``) is appended to ``register_with.similarity.__doc__``.

    Attributes
    ----------
    element
        Grammar element the similarity is computed for.
    spec
        Specification the element is compared against.
    np
        Array module (e.g. :mod:`numpy`) matching the type
        of the vector table of the vocabulary.

    Raises
    ------
    RuntimeError
        If word vectors are not available.
    """
    __slots__ = ("element", "spec", "np")
    slot_names: tuple[str, ...] = ()

    def __init__(self, element: "GrammarElement", spec: Any) -> None:
        self.element = element
        # 'Vocab.has_vector' is a method in spaCy, hence it is always
        # truthy when used as an attribute; the width of the vector table
        # is what tells whether any vectors are loaded.
        if not self.vocab.vectors_length:
            raise RuntimeError("word vectors not available")
        self.spec = spec
        # Import the module that defines the array class of the vector table
        # so array operations use the same backend as the vectors.
        self.np = import_module(self.vocab.vectors.data.__class__.__module__)

    def __init_subclass__(cls, register_with: type["GrammarElement"]) -> None:
        """Register ``cls`` as the similarity scorer of ``register_with``."""
        init_class_attrs(cls, {
            "__slots__": "slot_names"
        }, check_slots=True)
        register_with.Similarity = cls
        ds = cls._get_docstring()
        ds = re.sub(r"(\n\s*)Attributes(\s*\n)", r"\1Parameters\2", ds)
        # The target docstring may be missing ('None'), e.g. when
        # the target method is undocumented.
        method = register_with.similarity
        method.__doc__ = (method.__doc__ or "") + ds

    # Properties --------------------------------------------------------------

    @property
    def vocab(self) -> Vocab:
        """Vocabulary of the document of the element."""
        return self.element.doc.vocab

    @property
    def vectors(self) -> Vectors:
        """Vector table of the vocabulary."""
        return self.vocab.vectors

    @property
    def similarity(self) -> float:
        """Similarity between ``self.element`` and ``self.spec``
        clipped to the ``[-1, 1]`` interval.
        """
        sim = self.get_similarity(self.element, self.spec)
        if isinstance(sim, np.ndarray) and sim.size == 1:
            # '.item()' works for any single-element array, while 'float()'
            # on arrays with 'ndim > 0' is deprecated in recent NumPy.
            sim = float(sim.item())
        return max(-1, min(sim, 1))

    @property
    @abstractmethod
    def config(self) -> dict[str, Any]:
        """Keyword arguments (other than ``element`` and ``spec``)
        needed to recreate the scorer.
        """
        return {}

    # Methods -----------------------------------------------------------------

    @abstractmethod
    def get_similarity(self, element: "GrammarElement", spec: Any) -> float:
        """Get structured similarity between ``self.element`` and ``self.spec``."""

    # Internals ---------------------------------------------------------------

    @classmethod
    def _get_docstring(cls) -> str:
        """Get class docstring without its first and last line."""
        doc = cls.__doc__ or ""
        return "\n"+"\n".join(doc.split("\n")[1:-1])
[docs]
class PhraseSimilarity(GrammarSimilarity, register_with=Phrase):
r"""Structured similarity between phrases and sentences.
All methods defined here are designed to ensure that:
* Similarity of a phrase with respect to itself is ``1``.
* Similarity ``x ~ y == y ~ x``.
In some case the above may be true only approximately due to
accumulation of floating point imprecision.
Attributes
----------
element
Grammar phrase to compare.
spec
Specification against which the phrase is to be compared.
Can be another phrase, a string or an iterable of strings,
which should be single words. A single strings is splitted at
whitespace and turned into multiple words.
Finally, an averaged word vector for all words is computed.
Alternatively, a specification can have a form
of a dictionary mapping names of phrase parts or components
(see :attr:`segram.grammar.phrases.Phrase.part_names`
and :attr:`segram.grammar.phrase.Phrase.component_names`)
to either strings or iterables of strings convertible to word
vectors (as previously) or other phrases.
Importantly, phrases can be also compared against
:class:`segram.grammar.Sent` and :class:`segram.grammar.Doc`
objects as long as they are comprised of a single sentence.
See :class:`SentSimilarity` for details.
method
Method for calculating similarity between phrases:
``components``
Components are grouped in buckets by type
(verbs, nouns, prepositions and descriptions)
and averaged vectors are compared between
the same types. Finally, a weighted average
(with weights defined by the ``weight`` parameter)
is taken and rescaled with a factor ``shared / union``,
where ``shared`` is the numebr of types present in
both elements and ``union`` is the total number of unique
types among both of them. Thus, the final result is akin
to a fuzzy Jaccard similarity:
.. math::
J = \frac{|A \cap B|}{|A \cup B|}
``phrases``
As above but based on phrase parts and phrase head compoents.
See :attr:`segram.grammar.Phrase.part_names` for a full list.
``both``
As above but components and phrases are used
together.
``average``
Simple average vectors calculated over all component
head tokens are used. In this case weights are ignored.
``recursive``
NOTE. Currently not implemented.
First, head components are compared between two phrases,
and then the same rule is applied recursively to all
parts (subjects, direct objects etc.) where for each
type elements of the two phrases are matched in pairs
to maximize similarity. As previously, weights can be
applied to different types and a Jaccard-like rescaling
is applied. Additionaly, importance of nested phrases
may be discounted using ``decay_rate`` parameter by
rescaling each weight with a factor of ``decay_rate**depth``,
where ``depth`` is calculated relative to the depth
of the ``self.phrase``.
weights
Dictionary mapping phrase part or component names to arbitrary
weights (which must be positive). The weights do not have to be
normalized and sum up to one.
decay_rate
Additional parameter used when ``method="recursive"``,
which controls the rate at which contributions coming
from nested subphrases are discounted.
only, ignore
Lists of part or component names to selectively use or ignore.
Both arguments cannot be used at the same time.
Raises
------
RuntimeError
If word vectors are not available.
"""
__slots__ = ("method", "weights", "decay_rate", "only", "ignore")
def __init__(
self,
element: Phrase,
spec: Phrase | str | Iterable[str] | SpecType,
method: Literal[*_sim_methods] = _sim_methods[0],
*,
weights: dict[str, float | int] | None = None,
decay_rate: float = 1,
only: str | Iterable[str] = (),
ignore: str | Iterable[str] = ()
) -> None:
super().__init__(element, spec)
if method not in _sim_methods:
raise ValueError(f"'method' has to be one of {_sim_methods}")
if only and ignore:
raise ValueError("'only' and 'ignore' cannot be used at the same time")
weights = weights or {}
if any(v < 0 for v in weights.values()):
raise ValueError("weights must be non-negative")
if decay_rate <= 0:
raise ValueError("'decay_rate' must be positive")
self.method = method
self.weights = weights
self.decay_rate = decay_rate
self.only = only
self.ignore = ignore
# Properties --------------------------------------------------------------
@property
def config(self) -> dict[str, Any]:
return {
"method": self.method,
"weights": self.weights,
"decay_rate": self.decay_rate,
"only": self.only,
"ignore": self.ignore
}
# Methods -----------------------------------------------------------------
[docs]
def get_similarity(self, element: Phrase, spec: SpecType) -> float:
r"""Structured similarity between ``self.phrase`` and ``self.spec``."""
phrase = element
if isinstance(spec, Doc):
spec = self._make_sent(Doc)
if isinstance(spec, Sent):
proots = spec.proots
return sum(self.get_similarity(phrase, p) for p in proots) \
/ len(proots)
if isinstance(spec, Phrase):
if self.method == "recursive":
return self._sim_recursive(phrase, spec)
if self.method == "average":
return cosine_similarity(phrase.vector, spec.vector)
return self._sim_parts(phrase, spec)
if isinstance(self.spec, str | Iterable | Mapping):
return self._sim_custom(phrase, spec)
pcn = phrase.cname()
raise ValueError(
f"cannot compare '{pcn}' with '{self.__class__.__name__}'"
)
# Internals ---------------------------------------------------------------
def _sim_recursive(self, phrase: Phrase, other: Phrase, depth: int = 0) -> float:
raise NotImplementedError("'recursive' method is not yet implemented")
# sim = 0
# total_weight = 0
# if self._is_name_ok((name := "head")):
# total_weight += self.weights.get(name, 1)
# sim += self._sim(phrase.head, other.head) * total_weight
# active_parts = set(phrase.active_parts).union(other.active_parts)
# for name in active_parts:
# if not self._is_name_ok(name):
# continue
# sps = getattr(phrase, name)
# if phrase in sps.flat:
# # This is to prevent infinite recursion
# # happening for verb phrases/clauses
# continue
# w = self.weights.get(name, 1) * self.decay_rate**(depth+1)
# total_weight += w
# ops = getattr(other, name)
# if not sps or not ops:
# continue
# # denom = max(len(ops), len(sps))
# best = best_matches(sps, ops, self._sim_recursive, depth=depth+1)
# add_sim = sum(x for x, *_ in best)
# # sim += add_sim * w / denom
# sim += add_sim * w
# if not total_weight:
# return .0
# return sim / total_weight
def _sim_parts(self, phrase: Phrase, other: Phrase) -> float:
sdict = self._get_parts(phrase)
odict = self._get_parts(other)
shared = set(sdict).intersection(odict)
denom = sum(self.weights.get(k, 1) for k in set(sdict).union(odict))
if not denom:
return .0
num = sum(self.weights.get(k, 1) for k in shared)
sdict = {
k: v for k, v in sdict.items()
if k in shared and self._is_name_ok(k)
}
W = self.np.array([
self.weights.get(k, 1) for k in shared
], dtype=self.vocab.vectors.data.dtype)
w_total = W.sum()
if not w_total:
return .0
odict = { k: odict[k] for k in sdict }
svec = DataTuple(sdict.values()) \
.map(lambda x: sum(c.vector for c in x)) \
.pipe(self.np.vstack)
ovec = DataTuple(odict.values()) \
.map(lambda x: sum(c.vector for c in x)) \
.pipe(self.np.vstack)
cos = cosine_similarity(svec, ovec, aligned=True, nans_as_zeros=False)
sim = self.np.nansum(cos * W) * (num / denom) / W.sum()
return sim
def _sim_custom(self, phrase: Phrase, spec: SpecType) -> float:
if isinstance(spec, Mapping):
invalid = set(spec) \
- set(phrase.component_names) \
- set(phrase.component_names) \
- {"head"}
if invalid:
raise ValueError(f"incorrect specification fields: {invalid}")
pdict = { k: getattr(phrase, k) for k in spec }
sim = 0
denom = 0
num = 0
total_weight = 0
for key, _spec in spec.items():
denom += 1
if key not in pdict:
continue
num += 1
w = self.weights.get(key, 1)
total_weight += 1
parts = pdict[key]
if not parts:
continue
if isinstance(_spec, Doc):
_spec = self._make_sent(_spec)
if isinstance(_spec, Phrase | Sent):
sim += max(self.get_similarity(p, _spec) for p in parts) \
* w
elif isinstance(_spec, Iterable):
_spec = self._get_text_vector(_spec)
sim += max(cosine_similarity(p.vector, _spec) for p in parts) \
* w
else:
raise ValueError(f"invalid specification '{_spec}' for key '{key}'")
if not denom or not total_weight:
return .0
sim *= (num / denom) / total_weight
else:
spec = self._get_text_vector(spec)
sim = cosine_similarity(phrase.vector, spec)
return sim
def _is_name_ok(self, name: str) -> bool:
if self.ignore:
return name not in self.ignore
if self.only:
return name in self.only
return True
def _get_parts(self, phrase: Phrase) -> dict[str, DataTuple[Phrase | Component]]:
pdict = {}
if self.method == "components":
keys = phrase.component_names
elif self.method == "phrases":
keys = ("head", *phrase.controlled_names)
else:
raise ValueError(
f"cannot calculate by parts comparison for method '{self.method}'"
)
if self.ignore:
keys = [ k for k in keys if k not in self.ignore ]
elif self.only:
keys = [ k for k in keys if k in self.only]
pdict = { k: v for k in keys if (v := getattr(phrase, k)) }
return pdict
def _get_text_vector(
self,
toks: str | Iterable[str]
) -> np.ndarray[tuple[int], np.floating]:
if isinstance(toks, str):
toks = toks.strip().split()
toks = tuple(toks)
if not toks:
raise ValueError("cannot fetch word vectors; empty token list")
vec = sum(self._get_single_vec(tok) for tok in toks) / len(toks)
if vec.size == 0:
raise ValueError("all provided tokens are out-of-vocabulary")
return vec
def _get_single_vec(self, tok: str | int) -> np.ndarray[tuple[int], np.floating]:
try:
return self.vectors[tok]
except KeyError:
vlen = self.vocab.vectors_length
dtype = self.vectors.data.dtype
return self.np.zeros(vlen, dtype=dtype)
def _make_sent(self, doc: Doc) -> Phrase:
if len(doc.sents[:2]) != 1:
raise ValueError(
"only documents with exactly one sentence "
"can be compared with phrases"
)
return doc.sents[0]
[docs]
class SentSimilarity(PhraseSimilarity, register_with=Sent):
    """Structured similarity between sentences and phrases."""
    # pylint: disable=protected-access
    __doc__ += PhraseSimilarity._get_docstring()

    @property
    def phrase(self) -> None:
        """Always raises; sentence scorers expose no ``phrase`` attribute."""
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute 'phrase'")

    def get_similarity(self, element: Sent, spec: SpecType) -> float:
        """Structured similarity between sentence ``element`` and ``spec``.

        Sentence specifications (or single-sentence documents) are compared
        using averaged vectors (``"average"``), components
        (``"components"``) or, for the remaining methods, by matching phrase
        roots of both sentences in pairs. For any other specification
        the best score among phrase roots of the sentence is used.
        ``.0`` is returned when there are no phrase roots to compare.
        """
        # pylint: disable=arguments-renamed
        sent = element
        if isinstance(spec, Doc):
            spec = self._make_sent(spec)
        if isinstance(spec, Sent):
            if self.method == "average":
                return cosine_similarity(sent.vector, spec.vector)
            if self.method == "components":
                return self._sim_parts(sent, spec)
            proots = sent.proots
            oroots = spec.proots
            # Normalize by the larger number of roots,
            # so unmatched roots lower the score
            n_roots = max(len(proots), len(oroots))
            if not n_roots:
                return .0
            return sum(score for score, *_ in best_matches(
                proots, oroots,
                lambda s, o: s.Similarity(s, o, **self.config).similarity
            )) / n_roots
        return max((
            p.Similarity(p, spec, **self.config).similarity
            for p in sent.proots
        ), default=.0)
[docs]
class DocSimilarity(GrammarSimilarity, register_with=Doc):
    """Structured similarity between documents.

    .. warning::

        Currently only doc-doc comparisons based on average token vectors
        are implemented.
    """
    # pylint: disable=protected-access
    __doc__ += PhraseSimilarity._get_docstring()

    # Properties --------------------------------------------------------------

    @property
    def config(self) -> dict[str, Any]:
        """Empty configuration.

        The abstract ``config`` property of the base class has to be
        overridden, as otherwise the class cannot be instantiated.
        """
        return {}

    # Methods -----------------------------------------------------------------

    def get_similarity(self, element: Doc, spec: Doc) -> float:
        """Similarity between two documents.

        The computation is delegated to the ``similarity`` method
        of the ``doc`` attribute of ``element``.

        Raises
        ------
        NotImplementedError
            If ``spec`` is not a document.
        """
        if not isinstance(spec, Doc):
            raise NotImplementedError(
                "'Doc.similarity' is currently implemented only "
                "for comparisons with other documents"
            )
        return element.doc.similarity(spec.doc)