Source code for dialog2graph.metrics.no_llm_metrics.keys2graph.metrics_semantic

# experiments/exp2025_03_27_create_graphs_by_keys/keys2graph/metrics_semantic.py

"""
Advanced semantic-based metrics for comparing graphs
by embedding their utterances and matching them if similarity >= threshold.
"""

import os
import math
import openai
from typing import List, Dict, Any

from . import config


def _init_openai():
    """
    Configure the module-level ``openai`` client from environment variables.

    ``OPENAI_API_KEY`` supplies the API key (empty string when unset) and
    ``OPENAI_API_BASE`` supplies the endpoint URL (the public OpenAI v1
    endpoint when unset).
    """
    env = os.environ
    openai.api_key = env.get("OPENAI_API_KEY", "")
    openai.api_base = env.get("OPENAI_API_BASE", "https://api.openai.com/v1")


def get_text_embedding_openai(texts: List[str], model: str) -> List[List[float]]:
    """
    Get embeddings from OpenAI for a list of text strings, using a single
    request for the whole list.

    Args:
        texts: Strings to embed.
        model: Name of the embedding model passed to the API.

    Returns:
        The ``"embedding"`` field of every entry in the response's ``"data"``
        list, in the order the response provides them. An empty list is
        returned when ``texts`` is empty.
    """
    # Nothing to embed: avoid sending a request with an empty input list.
    if not texts:
        return []

    _init_openai()
    response = openai.Embedding.create(input=texts, model=model)
    return [item["embedding"] for item in response["data"]]
def cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
    """
    Return the cosine similarity of two vectors.

    The dot product pairs components with ``zip``, so trailing components of
    the longer vector do not contribute to it; each norm is computed over the
    full vector. When either norm is below 1e-9 the result is 0.0 instead of
    dividing by (almost) zero.
    """

    def _magnitude(vec):
        return math.sqrt(sum(component * component for component in vec))

    inner = sum(x * y for x, y in zip(vec1, vec2))
    magnitude1 = _magnitude(vec1)
    magnitude2 = _magnitude(vec2)
    degenerate = magnitude1 < 1e-9 or magnitude2 < 1e-9
    return 0.0 if degenerate else inner / (magnitude1 * magnitude2)
def _semantic_match_utterances( utter1: List[str], utter2: List[str], threshold: float, embed_model: str ) -> bool: """ Returns True if there's at least one pair (u1 in utter1, u2 in utter2) with cos sim >= threshold. """ if not utter1 or not utter2: return False emb1_list = get_text_embedding_openai(utter1, model=embed_model) emb2_list = get_text_embedding_openai(utter2, model=embed_model) for e1 in emb1_list: for e2 in emb2_list: sim = cosine_similarity(e1, e2) if sim >= threshold: return True return False
def match_graph_triplets_semantic(G1: Any, G2: Any, threshold: float, embed_model: str):
    """
    Similar to match_graph_triplets, but edges and nodes are paired by
    semantic similarity of their utterances.

    ``G1.graph`` and ``G2.graph`` are used through ``.nodes``,
    ``.nodes(data=True)`` and ``.edges(data=True)`` (presumably
    networkx-style graphs -- confirm against the caller). Every node and
    edge data dict must carry an ``"utterances"`` entry, either a list or a
    single value.

    Each candidate pair is checked with ``_semantic_match_utterances``, which
    embeds both sides on every call; the first candidate of the second graph
    that matches wins.

    Returns:
        ``(node_mapping, edge_mapping)``.
        ``node_mapping`` has a key for every node id of both graphs; ids of
        the first graph map to the first matching node id of the second
        graph (or None), ids that only occur in the second graph stay None.
        ``edge_mapping`` maps ``"source->target"`` of each edge of the first
        graph to ``"source->target"`` of its first matching edge in the
        second graph, or to None.
    """

    def _as_list(value):
        # "utterances" may hold a single value instead of a list.
        return value if isinstance(value, list) else [value]

    first_graph = G1.graph
    second_graph = G2.graph

    node_mapping = {node_id: None for node_id in first_graph.nodes}
    node_mapping.update({node_id: None for node_id in second_graph.nodes})

    first_edges = list(first_graph.edges(data=True))
    second_edges = list(second_graph.edges(data=True))
    edge_mapping = {}

    # Edges: pair each edge of the first graph with the first matching edge
    # of the second graph.
    for source, target, attrs in first_edges:
        utterances = _as_list(attrs["utterances"])
        counterpart = next(
            (
                (other_source, other_target)
                for other_source, other_target, other_attrs in second_edges
                if _semantic_match_utterances(
                    utterances,
                    _as_list(other_attrs["utterances"]),
                    threshold,
                    embed_model,
                )
            ),
            None,
        )
        if counterpart:
            edge_mapping[f"{source}->{target}"] = f"{counterpart[0]}->{counterpart[1]}"
        else:
            edge_mapping[f"{source}->{target}"] = None

    # Nodes: same first-match strategy.
    for node_id, attrs in first_graph.nodes(data=True):
        utterances = _as_list(attrs["utterances"])
        node_mapping[node_id] = next(
            (
                other_id
                for other_id, other_attrs in second_graph.nodes(data=True)
                if _semantic_match_utterances(
                    utterances,
                    _as_list(other_attrs["utterances"]),
                    threshold,
                    embed_model,
                )
            ),
            None,
        )

    return node_mapping, edge_mapping
def is_same_structure_semantic(
    G1: Any, G2: Any, threshold: float, embed_model: str
) -> bool:
    """
    Report whether the semantic matching of the two graphs left nothing
    unmapped.

    Runs ``match_graph_triplets_semantic`` and returns True only when every
    value of the returned node mapping and every value of the returned edge
    mapping is not None.
    """
    node_map, edge_map = match_graph_triplets_semantic(G1, G2, threshold, embed_model)
    all_nodes_mapped = all(target is not None for target in node_map.values())
    return all_nodes_mapped and all(
        target is not None for target in edge_map.values()
    )
def match_triplets_dg_semantic(
    G1: Any, dialogs: List[Any], threshold: float, embed_model: str
) -> Dict[str, Any]:
    """
    Placeholder for a semantic check of G1 against dialogs.

    None of the arguments is inspected; the same positive result is returned
    for every call.
    """
    outcome: Dict[str, Any] = {}
    outcome["value"] = True
    outcome["semantic_check"] = True
    return outcome
def triplet_match_accuracy_semantic(
    G1: Any, G2: Any, threshold: float, embed_model: str
) -> Dict[str, float]:
    """
    Compute the share of G1's nodes and edges that found a semantic match
    in G2.

    The mappings come from ``match_graph_triplets_semantic``. Node accuracy
    is the number of G1 nodes with a non-None mapping divided by the number
    of G1 nodes; edge accuracy is the number of non-None edge-mapping values
    divided by the number of G1 edges. Either ratio is 0.0 when its
    denominator is zero.

    Returns:
        ``{"node_accuracy": ..., "edge_accuracy": ...}``.
    """
    node_map, edge_map = match_graph_triplets_semantic(G1, G2, threshold, embed_model)

    node_ids = list(G1.graph.nodes())
    node_hits = len([nid for nid in node_ids if node_map.get(nid) is not None])
    node_acc = node_hits / len(node_ids) if node_ids else 0.0

    edge_list = list(G1.graph.edges())
    edge_hits = len([target for target in edge_map.values() if target is not None])
    edge_acc = edge_hits / len(edge_list) if edge_list else 0.0

    return {"node_accuracy": node_acc, "edge_accuracy": edge_acc}
def _semantic_jaccard_for_nodes( nodes1: List[Dict[str, Any]], nodes2: List[Dict[str, Any]], threshold: float, embed_model: str, ) -> float: matched1 = set() matched2 = set() for i, n1 in enumerate(nodes1): utt1 = ( n1["utterances"] if isinstance(n1["utterances"], list) else [n1["utterances"]] ) found_j = None for j, n2 in enumerate(nodes2): utt2 = ( n2["utterances"] if isinstance(n2["utterances"], list) else [n2["utterances"]] ) if _semantic_match_utterances(utt1, utt2, threshold, embed_model): found_j = j break if found_j is not None: matched1.add(i) matched2.add(found_j) intersection = len(matched1) union = len(nodes1) + len(nodes2) - intersection if union == 0: return 0.0 return intersection / union def _semantic_jaccard_for_edges( edges1: List[Dict[str, Any]], edges2: List[Dict[str, Any]], threshold: float, embed_model: str, ) -> float: matched1 = set() matched2 = set() for i, e1 in enumerate(edges1): utt1 = ( e1["utterances"] if isinstance(e1["utterances"], list) else [e1["utterances"]] ) found_j = None for j, e2 in enumerate(edges2): utt2 = ( e2["utterances"] if isinstance(e2["utterances"], list) else [e2["utterances"]] ) if _semantic_match_utterances(utt1, utt2, threshold, embed_model): found_j = j break if found_j is not None: matched1.add(i) matched2.add(found_j) intersection = len(matched1) union = len(edges1) + len(edges2) - intersection if union == 0: return 0.0 return intersection / union
def compare_two_graphs_semantically(
    graph1: Dict[str, Any], graph2: Dict[str, Any], threshold: float
) -> Dict[str, float]:
    """
    Score the semantic overlap of two dialog graphs given as dicts.

    The ``"nodes"`` and ``"edges"`` lists of both graphs (empty when the key
    is missing) are compared with the semantic Jaccard helpers. The embedding
    model name is obtained from ``config.get_embedding_model()``.

    Returns:
        ``{"semantic_jaccard_nodes": ..., "semantic_jaccard_edges": ...}``.
    """
    model_name = config.get_embedding_model()

    nodes1, nodes2 = graph1.get("nodes", []), graph2.get("nodes", [])
    edges1, edges2 = graph1.get("edges", []), graph2.get("edges", [])

    scores = {}
    scores["semantic_jaccard_nodes"] = _semantic_jaccard_for_nodes(
        nodes1, nodes2, threshold, model_name
    )
    scores["semantic_jaccard_edges"] = _semantic_jaccard_for_edges(
        edges1, edges2, threshold, model_name
    )
    return scores