# Copyright 2025-, Semiotic AI, Inc.
# SPDX-License-Identifier: Apache-2.0
# system packages
import copy
import logging
from pathlib import Path
from typing import Optional, Union
# external packages
from graphql import (
DocumentNode,
EnumTypeDefinitionNode,
EnumValueDefinitionNode,
FieldDefinitionNode,
Node,
ObjectTypeDefinitionNode,
StringValueNode,
parse,
print_ast,
)
from graphdoc.data.helper import check_directory_path, check_file_path
# internal packages
from graphdoc.data.schema import SchemaObject
# logging
log = logging.getLogger(__name__)
[docs]
class Parser:
    """A class for parsing and handling of GraphQL objects."""

    # Default labels assigned to graphql AST node classes by ``_check_node_type``.
    DEFAULT_NODE_TYPES = {
        DocumentNode: "full schema",
        ObjectTypeDefinitionNode: "table schema",
        EnumTypeDefinitionNode: "enum schema",
        EnumValueDefinitionNode: "enum value",
    }

    # Node classes that ``fill_empty_descriptions`` descends into and describes.
    _DESCRIBABLE_NODE_TYPES = (
        FieldDefinitionNode,
        EnumValueDefinitionNode,
        ObjectTypeDefinitionNode,
        EnumTypeDefinitionNode,
    )

    def __init__(self, type_mapping: Optional[dict[type, str]] = None) -> None:
        """Store the node-type mapping used by this parser.

        :param type_mapping: Custom mapping of node types to strings. Defaults to
            DEFAULT_NODE_TYPES when omitted (or empty)
        :type type_mapping: Optional[dict[type, str]]
        """
        self.type_mapping = type_mapping or Parser.DEFAULT_NODE_TYPES

    @staticmethod
    def _check_node_type(
        node: Node, type_mapping: Optional[dict[type, str]] = None
    ) -> str:
        """Check the type of a schema node.

        The lookup uses the exact class of ``node``; classes missing from the
        mapping are reported as ``"unknown schema"``.

        :param node: The schema node to check
        :type node: Node
        :param type_mapping: Custom mapping of node types to strings. Defaults to
            DEFAULT_NODE_TYPES
        :type type_mapping: Optional[dict[type, str]]
        :return: The type of the schema node
        :rtype: str
        """
        # use provided mapping or fall back to defaults
        mapping = type_mapping or Parser.DEFAULT_NODE_TYPES
        return mapping.get(type(node), "unknown schema")

    @staticmethod
    def _child_nodes(node: Node) -> list[Node]:
        """Collect the direct child nodes of a GraphQL node.

        Every non-dunder attribute except ``description`` is inspected. Attributes
        holding a ``Node`` contribute that node; attributes holding a list or tuple
        contribute each ``Node`` item they contain.

        :param node: The GraphQL node whose children are collected
        :type node: Node
        :return: The child nodes, in ``dir(node)`` attribute order
        :rtype: list[Node]
        """
        children: list[Node] = []
        for attr in dir(node):
            # descriptions are handled by the callers, never traversed as children
            if attr.startswith("__") or attr == "description":
                continue
            value = getattr(node, attr, None)
            if isinstance(value, Node):
                children.append(value)
            elif isinstance(value, (list, tuple)):
                children.extend(item for item in value if isinstance(item, Node))
        return children

    @staticmethod
    def parse_schema_from_file(
        schema_file: Union[str, Path],
        schema_directory_path: Optional[Union[str, Path]] = None,
    ) -> DocumentNode:
        """Parse a schema from a file.

        When ``schema_directory_path`` is given, ``schema_file`` is resolved
        relative to it; otherwise ``schema_file`` is used as the path itself. The
        path is passed through ``check_directory_path`` / ``check_file_path`` first
        (presumably validating existence -- confirm against graphdoc.data.helper).

        :param schema_file: The name of the schema file
        :type schema_file: Union[str, Path]
        :param schema_directory_path: A path to a directory containing schemas
        :type schema_directory_path: Optional[Union[str, Path]]
        :return: The parsed schema
        :rtype: DocumentNode
        :raises Exception: If the schema cannot be read or parsed
        """
        if schema_directory_path:
            check_directory_path(schema_directory_path)
            schema_path = Path(schema_directory_path) / schema_file
        else:
            check_file_path(schema_file)
            schema_path = Path(schema_file)
        try:
            # read explicitly as UTF-8 so the result does not depend on the
            # platform's default locale encoding
            return parse(schema_path.read_text(encoding="utf-8"))
        except Exception as e:
            log.error(f"Error parsing schema from file: {e}")
            # bare raise keeps the original traceback intact
            raise

    @staticmethod
    def update_node_descriptions(node: Node, new_value: Optional[str] = None) -> Node:
        """Given a GraphQL node, recursively traverse the node and its children,
        updating all descriptions with the new value. Can also be used to remove
        descriptions by passing None as the new value.

        Only existing descriptions (``StringValueNode`` instances) are touched;
        nodes without a description are left without one. The node is modified in
        place.

        :param node: The GraphQL node to update
        :type node: Node
        :param new_value: The new description value. If None (or an empty string),
            the description will be removed.
        :type new_value: Optional[str]
        :return: The updated node
        :rtype: Node
        """
        description = getattr(node, "description", None)
        if isinstance(description, StringValueNode):
            if new_value:
                description.value = new_value
            else:
                node.description = None
        for child in Parser._child_nodes(node):
            Parser.update_node_descriptions(child, new_value)
        return node

    @staticmethod
    def count_description_pattern_matching(node: Node, pattern: str) -> dict[str, int]:
        """Counts the number of times a pattern matches a description in a node and its
        children.

        A node is counted when it has a ``description`` attribute. ``"empty"``
        counts descriptions that are None and ``"pattern"`` counts descriptions
        whose value contains ``pattern`` as a substring.

        :param node: The GraphQL node to count the pattern matches in
        :type node: Node
        :param pattern: The pattern to count the matches of
        :type pattern: str
        :return: A dictionary with the counts of matches, keyed by ``"total"``,
            ``"pattern"`` and ``"empty"``
        :rtype: dict[str, int]
        """
        counts = {
            "total": 0,
            "pattern": 0,
            "empty": 0,
        }

        def visit(current: Node) -> None:
            # tally this node, then walk its children depth-first
            if hasattr(current, "description"):
                counts["total"] += 1
                description = getattr(current, "description", None)
                if description is None:
                    counts["empty"] += 1
                elif pattern in description.value:
                    counts["pattern"] += 1
            for child in Parser._child_nodes(current):
                visit(child)

        visit(node)
        return counts

    @staticmethod
    def fill_empty_descriptions(
        node: Node,
        new_column_value: str = "Description for column: {}",
        new_table_value: str = "Description for table: {}",
        use_value_name: bool = True,
        value_name: Optional[str] = None,
    ) -> Node:
        """Recursively traverse the node and its children, filling in empty descriptions
        with the new column or table value. Do not update descriptions that already have
        a value. Default values are provided for the new column and table descriptions.

        Only field, enum value, object type and enum type definitions are descended
        into; each is described using its own name. Enum types always receive
        ``"Description for enum type: <name>"``. The node is modified in place.

        :param node: The GraphQL node to update
        :type node: Node
        :param new_column_value: The new column description value
        :type new_column_value: str
        :param new_table_value: The new table description value
        :type new_table_value: str
        :param use_value_name: Whether to use the value name in the description
        :type use_value_name: bool
        :param value_name: The name of the value. When None, the node's own name is
            used if it has one
        :type value_name: Optional[str]
        :return: The updated node
        :rtype: Node
        """
        if hasattr(node, "description") and getattr(node, "description", None) is None:
            if value_name is None:
                # called directly on a named node: describe it by its own name
                # instead of emitting the literal string "None"
                value_name = getattr(getattr(node, "name", None), "value", None)
            if isinstance(node, EnumTypeDefinitionNode):
                # TODO: we should add this back to the fill_empty_descriptions
                # parameter list
                update_value = f"Description for enum type: {value_name}"
            else:
                # tables use the table template, everything else is a column
                if isinstance(node, ObjectTypeDefinitionNode):
                    template = new_table_value
                else:
                    template = new_column_value
                # format with the value name if needed (table/column name)
                if use_value_name:
                    update_value = template.format(value_name)
                else:
                    update_value = template
            node.description = StringValueNode(value=update_value)
        for child in Parser._child_nodes(node):
            if not isinstance(child, Parser._DESCRIBABLE_NODE_TYPES):
                continue
            child_name = child.name.value
            if isinstance(child, ObjectTypeDefinitionNode):
                log.debug(
                    "found an instance of a ObjectTypeDefinitionNode: %s", child_name
                )
            Parser.fill_empty_descriptions(
                child,
                new_column_value,
                new_table_value,
                use_value_name,
                child_name,
            )
        return node

    @staticmethod
    def schema_equality_check(gold_node: Node, check_node: Node) -> bool:
        """A method to check if two schema nodes are equal. Only checks that the schemas
        structures are equal, not the descriptions.

        Both nodes are deep-copied before their descriptions are stripped, so the
        arguments are left unmodified. Equality is decided by comparing the printed
        form of the stripped copies.

        :param gold_node: The gold standard schema node
        :type gold_node: Node
        :param check_node: The schema node to check
        :type check_node: Node
        :return: Whether the schemas are equal
        :rtype: bool
        """
        gold_stripped = Parser.update_node_descriptions(copy.deepcopy(gold_node))
        check_stripped = Parser.update_node_descriptions(copy.deepcopy(check_node))
        return print_ast(gold_stripped) == print_ast(check_stripped)

    @staticmethod
    def schema_object_from_file(
        schema_file: Union[str, Path],
        category: Optional[str] = None,
        rating: Optional[int] = None,
    ) -> SchemaObject:
        """Parse a schema object from a file.

        The file is parsed, printed back to a string and labelled with the default
        node-type mapping. The object's key is the file path as given and its
        schema name is the file stem.

        :param schema_file: The path to the schema file
        :type schema_file: Union[str, Path]
        :param category: The category stored on the schema object
        :type category: Optional[str]
        :param rating: The rating stored on the schema object
        :type rating: Optional[int]
        :return: The schema object built from the file
        :rtype: SchemaObject
        :raises ValueError: If the file cannot be parsed or the object cannot be
            built; the underlying error is attached as the cause
        """
        try:
            schema_ast = Parser.parse_schema_from_file(schema_file)
            schema_str = print_ast(schema_ast)
            schema_type = Parser._check_node_type(schema_ast)
            return SchemaObject.from_dict(
                {
                    "key": str(schema_file),
                    "category": category,
                    "rating": rating,
                    "schema_name": str(Path(schema_file).stem),
                    "schema_type": schema_type,
                    "schema_str": schema_str,
                    "schema_ast": schema_ast,
                }
            )
        except Exception as e:
            log.error(f"Error parsing schema file {schema_file}: {e}")
            raise ValueError(
                f"Failed to parse schema from file {schema_file}: {e}"
            ) from e

    @staticmethod
    def parse_objects_from_full_schema_object(
        schema: SchemaObject, type_mapping: Optional[dict[type, str]] = None
    ) -> Optional[dict[str, SchemaObject]]:
        """Parse out all available tables from a full schema object.

        Object type and enum type definitions are extracted; every other
        definition is skipped. Each extracted object inherits the category and
        rating of ``schema`` and is keyed by ``"<schema.key>_<definition name>"``.

        :param schema: The full schema object to parse
        :type schema: SchemaObject
        :param type_mapping: Custom mapping of node types to strings. Defaults to
            DEFAULT_NODE_TYPES
        :type type_mapping: Optional[dict[type, str]]
        :return: The parsed objects (tables and enums), or None when the schema has
            no AST or its AST is not a full document
        :rtype: Union[dict, None]
        """
        if schema.schema_ast is None:
            log.info(f"Schema object has no schema_ast: {schema.schema_name}")
            return None
        if not isinstance(schema.schema_ast, DocumentNode):
            log.info(
                f"Schema object cannot be further decomposed: {schema.schema_name}"
            )
            return None

        tables = {}
        for definition in schema.schema_ast.definitions:
            if isinstance(definition, ObjectTypeDefinitionNode):
                log.debug("found table schema")
            elif isinstance(definition, EnumTypeDefinitionNode):
                log.debug("found enum schema")
            else:
                log.debug(f"skipping schema of type: {type(definition)}")
                continue
            # tables and enums are packaged identically from here on
            object_schema = SchemaObject.from_dict(
                {
                    "key": f"{schema.key}_{definition.name.value}",
                    "category": schema.category,
                    "rating": schema.rating,
                    "schema_name": definition.name.value,
                    "schema_type": Parser._check_node_type(definition, type_mapping),
                    "schema_str": print_ast(definition),
                    "schema_ast": definition,
                }
            )
            tables[object_schema.key] = object_schema
        return tables