Coverage for src / check_datapackage / check.py: 98%
311 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 08:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 08:54 +0000
1import os
2import re
3import sys
4from dataclasses import dataclass, field
5from functools import reduce
6from types import TracebackType
7from typing import Any, Callable, Iterator, Optional, cast
9from jsonpath import findall, resolve
10from jsonschema import Draft7Validator, FormatChecker, ValidationError
11from rich import print as rprint
13from check_datapackage.config import Config
14from check_datapackage.constants import (
15 DATA_PACKAGE_SCHEMA_PATH,
16 FIELD_TYPES,
17 GROUP_ERRORS,
18)
19from check_datapackage.exclusion import exclude
20from check_datapackage.extensions import apply_extensions
21from check_datapackage.internals import (
22 PropertyField,
23 _filter,
24 _flat_map,
25 _get_fields_at_jsonpath,
26 _map,
27)
28from check_datapackage.issue import Issue
29from check_datapackage.read_json import read_json
# Type alias for the standard Python exception hook signature, matching
# `sys.excepthook`: (exc_type, exc_value, exc_traceback) -> None.
PythonExceptionHook = Callable[
    [type[BaseException], BaseException, Optional[TracebackType]],
    None,
]

# Type alias for an IPython custom exception handler registered via
# `set_custom_exc` (includes the shell instance `self` and a trailing
# `tb_offset`); may return a list of traceback lines.
IPythonExceptionHandler = Callable[
    [Any, type[BaseException], BaseException, Optional[TracebackType], None],
    Optional[list[str]],
]
def _pretty_print_exception(
    exc_type: type[BaseException],
    exc_value: BaseException,
) -> None:
    """Print the exception type name (in red) and its message, no traceback."""
    rprint(f"\n[red]{exc_type.__name__}[/red]: {exc_value}")
51def _create_suppressed_traceback_hook(
52 exception_types: tuple[type[BaseException], ...],
53 old_hook: PythonExceptionHook,
54) -> PythonExceptionHook:
55 """Create a Python exception hook that suppresses tracebacks.
57 Args:
58 exception_types: Exception types to suppress tracebacks for.
59 old_hook: The previous exception hook to delegate unregistered exceptions to.
61 Returns:
62 A composable exception hook function.
63 """
65 def hook(
66 exc_type: type[BaseException],
67 exc_value: BaseException,
68 exc_traceback: Optional[TracebackType],
69 ) -> None:
70 if issubclass(exc_type, exception_types):
71 _pretty_print_exception(exc_type, exc_value)
72 else:
73 old_hook(exc_type, exc_value, exc_traceback)
75 return hook
def _create_suppressed_traceback_ipython_hook(
    exception_types: tuple[type[BaseException], ...],
    old_custom_tb: Optional[IPythonExceptionHandler],
) -> Callable[
    [Any, type[BaseException], BaseException, Optional[TracebackType], None],
    Optional[list[str]],
]:
    """Build an IPython custom exception handler that hides tracebacks.

    Args:
        exception_types: Exception types whose tracebacks are suppressed.
        old_custom_tb: The previously registered IPython handler, if any;
            unmatched exceptions are forwarded to it.

    Returns:
        A handler usable with IPython's `set_custom_exc`.
    """
    has_old_handler = old_custom_tb is not None

    def ipython_hook(
        self: Any,
        exc_type: type[BaseException],
        exc_value: BaseException,
        exc_traceback: Optional[TracebackType],
        tb_offset: None = None,
    ) -> Optional[list[str]]:
        # Suppressed exceptions: print compactly, return no traceback lines.
        if issubclass(exc_type, exception_types):
            _pretty_print_exception(exc_type, exc_value)
            return []
        # Otherwise delegate to any previously registered handler.
        if has_old_handler and old_custom_tb is not None:
            return old_custom_tb(self, exc_type, exc_value, exc_traceback, tb_offset)
        return None

    return ipython_hook
114def _is_running_from_ipython() -> bool:
115 """Checks whether running in IPython interactive console or not."""
116 try:
117 from IPython import get_ipython # type: ignore[attr-defined]
118 except ImportError:
119 return False
120 else:
121 return get_ipython() is not None # type: ignore[no-untyped-call]
124def _setup_suppressed_tracebacks(
125 *exception_types: type[BaseException],
126) -> None:
127 """Set up exception hooks to hide tracebacks for specified exceptions.
129 This function is composable - multiple calls add to the existing hook
130 rather than replacing it. Each package only needs to register its own
131 exceptions.
133 Args:
134 *exception_types: Exception types to hide tracebacks for.
136 Raises:
137 TypeError: If any exception_type is not an exception class.
139 Examples:
140 ```python
141 # In package A
142 _setup_suppressed_tracebacks(ErrorA)
144 # In package B - adds to existing hook
145 _setup_suppressed_tracebacks(ErrorB, ErrorC)
146 # Now ErrorA, ErrorB, and ErrorC will all have suppressed tracebacks
147 ```
148 """
149 for exc_type in exception_types:
150 if not (isinstance(exc_type, type) and issubclass(exc_type, BaseException)):
151 raise TypeError(f"{exc_type!r} is not an exception class")
153 sys.excepthook = _create_suppressed_traceback_hook(exception_types, sys.excepthook)
155 if _is_running_from_ipython():
156 ip = get_ipython() # type: ignore # noqa: F821
157 old_custom_tb: Optional[IPythonExceptionHandler] = getattr(ip, "CustomTB", None)
158 ip.set_custom_exc(
159 (Exception,),
160 _create_suppressed_traceback_ipython_hook(exception_types, old_custom_tb),
161 )
class DataPackageError(Exception):
    """Error raised from Data Package issues, displayed without a traceback.

    The error message is the human-readable output of `explain()` for the
    issues that triggered it.
    """

    def __init__(
        self,
        issues: list[Issue],
    ) -> None:
        """Build the error message by explaining the given issues."""
        super().__init__(explain(issues))
def explain(issues: list[Issue]) -> str:
    """Explain the issues in a human-readable format.

    The explanation of the issues is returned as a string. To display the
    string in an easily readable format, use the `pretty_print()` function
    from `check-datapackage`.

    Args:
        issues: A list of `Issue` objects to explain.

    Returns:
        A human-readable explanation of the issues.

    Examples:
        ```{python}
        import check_datapackage as cdp

        issue = cdp.Issue(
            jsonpath="$.resources[2].title",
            type="required",
            message="The `title` field is required but missing at the given JSON path.",
        )
        issues = cdp.explain([issue])

        # On its own
        issues
        # Normal print
        print(issues)
        # Pretty print with rich
        cdp.pretty_print(issues)
        ```
    """
    explanations = _map(issues, _create_explanation)
    count = len(explanations)
    # Grammar: "1 issue was" vs. "N issues were".
    suffix = " was" if count == 1 else "s were"
    header = f"{count} issue{suffix} found in your [u]datapackage.json[/u]:\n\n"
    return header + "\n".join(explanations)
def _create_explanation(issue: Issue) -> str:
    """Create an informative explanation of what went wrong in each issue.

    Args:
        issue: The issue to explain.

    Returns:
        A formatted snippet showing the offending property and its value,
        with carets underlining the value, followed by the issue message.
    """
    # Remove suffix '$' to account for root path when `[]` is passed to `check()`
    property_name = issue.jsonpath.removesuffix("$").split(".")[-1]
    if not property_name:
        return (
            "check() requires a dictionary with metadata,"
            f" but received {issue.instance}."
        )

    number_of_carets = len(str(issue.instance))
    # `property_name` is guaranteed non-empty here (guard above), so the
    # previous conditional separator `': ' if property_name else ' '` was
    # dead code; the separator is always ': '. A stray `# noqa: F401`
    # (unused import) on this return was also removed as meaningless.
    return (
        f"At {issue.jsonpath.removeprefix('$.')}:\n"
        "|\n"
        f"| {property_name}: {issue.instance}\n"
        f"| {' ' * len(property_name)} [red]{'^' * number_of_carets}[/red]\n"
        f"{issue.message}\n"
    )
def check(
    properties: dict[str, Any], config: Config = Config(), error: bool = False
) -> list[Issue]:
    """Checks a Data Package's properties against the Data Package standard.

    Args:
        properties: A Data Package's metadata from `datapackage.json` as a Python
            dictionary.
        config: Configuration for the checks to be done. See the `Config`
            class for more details, especially about the default values.
            NOTE(review): the `Config()` default is evaluated once at import
            time and shared across calls — confirm `Config` is immutable.
        error: Whether to treat any issues found as errors. Defaults
            to `False`, meaning that issues will be returned as a list of `Issue`
            objects. Will internally run `explain()` on the Issues
            if set to `True`.

    Returns:
        A list of `Issue` objects representing any issues found
        while checking the properties. If no issues are found, an empty list
        is returned.

    Raises:
        DataPackageError: If `error` is `True` and at least one issue is found.
    """
    schema = read_json(DATA_PACKAGE_SCHEMA_PATH)

    # Strict mode upgrades the standard's 'SHOULD' recommendations to
    # hard requirements before validating.
    if config.strict:
        _set_should_fields_to_required(schema)

    issues = _check_object_against_json_schema(properties, schema)
    issues += _check_keys(properties, issues)
    issues += apply_extensions(properties, config.extensions)
    issues = exclude(issues, config.exclusions)
    # Deduplicate and order deterministically (Issue must be hashable/orderable).
    issues = sorted(set(issues))

    # Debug output. Use by doing `CDP_DEBUG=true uv run ...`
    if os.getenv("CDP_DEBUG"):
        rprint("", properties)
        for issue in issues:
            rprint(issue)
            rprint(explain([issue]))

    if error and issues:
        raise DataPackageError(issues)

    return issues
def _check_keys(properties: dict[str, Any], issues: list[Issue]) -> list[Issue]:
    """Check that primary and foreign keys exist."""
    # Primary keys: resources declaring a non-empty (or empty-string) primaryKey.
    pk_resources = _get_fields_at_jsonpath(
        "$.resources[?(length(@.schema.primaryKey) > 0 || @.schema.primaryKey == '')]",
        properties,
    )
    # Skip resources whose primaryKey already has schema-level issues.
    pk_resources = _keep_resources_with_no_issue_at_property(
        pk_resources, issues, "schema.primaryKey"
    )
    key_issues = _flat_map(pk_resources, _check_primary_key)

    # Foreign keys: resources declaring at least one foreign key.
    fk_resources = _get_fields_at_jsonpath(
        "$.resources[?(length(@.schema.foreignKeys) > 0)]",
        properties,
    )
    # Skip resources whose foreignKeys already have schema-level issues.
    fk_resources = _keep_resources_with_no_issue_at_property(
        fk_resources, issues, "schema.foreignKeys"
    )
    key_issues += _flat_map(
        fk_resources,
        lambda resource: _check_foreign_keys(resource, properties),
    )
    return key_issues
def _issues_at_property(
    resource: PropertyField, issues: list[Issue], jsonpath: str
) -> list[Issue]:
    """Return the issues located at or under `jsonpath` within `resource`."""
    full_path = f"{resource.jsonpath}.{jsonpath}"
    return _filter(issues, lambda issue: full_path in issue.jsonpath)
def _keep_resources_with_no_issue_at_property(
    resources: list[PropertyField], issues: list[Issue], jsonpath: str
) -> list[PropertyField]:
    """Filter out resources that have an issue at or under the given `jsonpath`."""

    def _is_clean(resource: PropertyField) -> bool:
        # A resource is kept only when no issue touches the given property.
        return not _issues_at_property(resource, issues, jsonpath)

    return _filter(resources, _is_clean)
def _check_primary_key(resource: PropertyField) -> list[Issue]:
    """Check that primary key fields exist in the resource."""
    primary_key = resolve("/schema/primaryKey", resource.value)
    key_fields = _key_fields_as_str_list(primary_key)
    unknown_fields = _get_unknown_key_fields(key_fields, resource.value)
    if unknown_fields:
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.primaryKey",
                type="primary-key",
                message=(
                    f"No fields found in resource for primary key fields: {unknown_fields}."
                ),
                instance=primary_key,
            )
        ]
    return []
def _check_foreign_keys(
    resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
    """Check that foreign key source and destination fields exist."""
    # Safe, as only FKs of the correct type here
    foreign_keys = cast(
        list[dict[str, Any]], resolve("/schema/foreignKeys", resource.value)
    )

    def _points_to_other_resource(fk: dict[str, Any]) -> bool:
        # A non-empty `reference.resource` means the FK targets another resource.
        return "resource" in fk["reference"] and fk["reference"]["resource"] != ""

    external_fks = _filter(foreign_keys, _points_to_other_resource)
    internal_fks = _filter(foreign_keys, lambda fk: fk not in external_fks)

    issues = _flat_map(foreign_keys, lambda fk: _check_fk_source_fields(fk, resource))
    issues += _flat_map(
        internal_fks,
        lambda fk: _check_fk_dest_fields_same_resource(fk, resource),
    )
    issues += _flat_map(
        external_fks,
        lambda fk: _check_fk_dest_fields_diff_resource(fk, resource, properties),
    )
    return issues
380def _key_fields_as_str_list(key_fields: Any) -> list[str]:
381 """Returns the list representation of primary and foreign key fields.
383 Key fields can be represented either as a string (containing one field name)
384 or a list of strings.
386 The input should contain a correctly typed `key_fields` object.
387 """
388 if not isinstance(key_fields, list):
389 key_fields = [key_fields]
390 return cast(list[str], key_fields)
def _get_unknown_key_fields(
    key_fields: list[str], properties: dict[str, Any], resource_path: str = ""
) -> str:
    """Return the key fields that don't exist on the specified resource.

    The result is a comma-separated string of repr-quoted field names;
    empty if all key fields are known.
    """
    known_fields = findall(f"{resource_path}schema.fields[*].name", properties)
    missing = _filter(key_fields, lambda field: field not in known_fields)
    return ", ".join(_map(missing, lambda field: f"{field!r}"))
def _check_fk_source_fields(
    foreign_key: dict[str, Any], resource: PropertyField
) -> list[Issue]:
    """Check that foreign key source fields exist and have the correct number."""
    issues: list[Issue] = []
    source_fields = resolve("/fields", foreign_key)
    source_field_list = _key_fields_as_str_list(source_fields)

    unknown_fields = _get_unknown_key_fields(source_field_list, resource.value)
    if unknown_fields:
        issues.append(
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
                type="foreign-key-source-fields",
                message=(
                    "No fields found in resource for foreign key source fields: "
                    f"{unknown_fields}."
                ),
                instance=source_fields,
            )
        )

    # Source and destination field counts must match for a valid mapping.
    dest_fields = _key_fields_as_str_list(resolve("/reference/fields", foreign_key))
    if len(dest_fields) != len(source_field_list):
        issues.append(
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
                type="foreign-key-source-fields",
                message=(
                    "The number of foreign key source fields must be the same as "
                    "the number of foreign key destination fields."
                ),
                instance=source_fields,
            )
        )
    return issues
def _check_fk_dest_fields_same_resource(
    foreign_key: dict[str, Any],
    resource: PropertyField,
) -> list[Issue]:
    """Check that foreign key destination fields exist on the same resource."""
    dest_fields = resolve("/reference/fields", foreign_key)
    unknown_fields = _get_unknown_key_fields(
        _key_fields_as_str_list(dest_fields), resource.value
    )
    if not unknown_fields:
        return []
    return [
        Issue(
            jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
            type="foreign-key-destination-fields",
            message=(
                "No fields found in resource for foreign key "
                f"destination fields: {unknown_fields}."
            ),
            instance=dest_fields,
        )
    ]
def _check_fk_dest_fields_diff_resource(
    foreign_key: dict[str, Any], resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
    """Check that foreign key destination fields exist on the destination resource."""
    dest_fields = resolve("/reference/fields", foreign_key)
    dest_field_list = _key_fields_as_str_list(dest_fields)
    # Safe, as only keys of the correct type here
    dest_resource_name = cast(str, resolve("/reference/resource", foreign_key))
    dest_resource_path = f"resources[?(@.name == '{dest_resource_name}')]"

    # The referenced resource must exist before its fields can be checked.
    if not findall(dest_resource_path, properties):
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.resource",
                type="foreign-key-destination-resource",
                message=(
                    f"The destination resource {dest_resource_name!r} of this foreign "
                    "key doesn't exist in the package."
                ),
                instance=dest_resource_name,
            )
        ]

    unknown_fields = _get_unknown_key_fields(
        dest_field_list, properties, f"{dest_resource_path}."
    )
    if not unknown_fields:
        return []
    return [
        Issue(
            jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
            type="foreign-key-destination-fields",
            message=(
                f"No fields found in destination resource {dest_resource_name!r} "
                f"for foreign key destination fields: {unknown_fields}."
            ),
            instance=dest_fields,
        )
    ]
506def _set_should_fields_to_required(schema: dict[str, Any]) -> dict[str, Any]:
507 """Set 'SHOULD' fields to 'REQUIRED' in the schema."""
508 should_fields = ("name", "id", "licenses")
509 name_pattern = r"^[a-z0-9._-]+$"
511 # From https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string
512 semver_pattern = (
513 r"^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)"
514 r"(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0"
515 r"|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>"
516 r"[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$"
517 )
519 # Convert to required
520 schema["required"].extend(should_fields)
521 schema["properties"]["name"]["pattern"] = name_pattern
522 schema["properties"]["version"]["pattern"] = semver_pattern
523 schema["properties"]["contributors"]["items"]["required"] = ["title"]
524 schema["properties"]["sources"]["items"]["required"] = ["title"]
525 schema["properties"]["resources"]["items"]["properties"]["name"]["pattern"] = (
526 name_pattern
527 )
528 return schema
def _check_object_against_json_schema(
    json_object: dict[str, Any], schema: dict[str, Any]
) -> list[Issue]:
    """Checks that `json_object` matches the given JSON schema.

    Structural, type and format constraints are all checked. All schema violations are
    collected before issues are returned.

    Args:
        json_object: The JSON object to check.
        schema: The JSON schema to check against.

    Returns:
        A list of issues. An empty list, if no issues are found.

    Raises:
        jsonschema.exceptions.SchemaError: If the given schema is invalid.
    """
    # Fail fast on an invalid schema before validating the object itself.
    Draft7Validator.check_schema(schema)
    validator = Draft7Validator(schema, format_checker=FormatChecker())
    errors = validator.iter_errors(json_object)
    return _validation_errors_to_issues(errors)
@dataclass(frozen=True)
class SchemaError:
    """A flat, hashable view of a `jsonschema.ValidationError` for processing.

    Attributes:
        message (str): The error message generated by `jsonschema`.
        type (str): The type of the error (e.g., a JSON schema type such as "required",
            "type", "pattern", or "format", or a custom type).
        schema_path (str): The path to the violated check in the JSON schema,
            with components joined by '/'.
        jsonpath (str): The JSON path to the field that violates the check.
        instance (Any): The part of the object that failed the check.
        schema_value (Optional[Any]): The expected value that is checked against,
            which is part of the schema violated by this error.
        parent (Optional[SchemaError]): The error group the error belongs to, if any.
    """

    message: str
    type: str
    schema_path: str
    jsonpath: str
    instance: Any
    schema_value: Optional[Any] = None
    parent: Optional["SchemaError"] = None
def _validation_errors_to_issues(
    validation_errors: Iterator[ValidationError],
) -> list[Issue]:
    """Transforms `jsonschema.ValidationError`s to more compact `Issue`s.

    Args:
        validation_errors: The `jsonschema.ValidationError`s to transform.

    Returns:
        A list of `Issue`s.
    """
    schema_errors = _flat_map(validation_errors, _validation_error_to_schema_errors)
    # Each grouped (oneOf/anyOf) parent error gets special post-processing,
    # applied left to right over the running error list.
    parents = _filter(schema_errors, lambda error: error.type in GROUP_ERRORS)
    for parent in parents:
        schema_errors = _handle_grouped_error(schema_errors, parent)
    return _map(schema_errors, _create_issue)
@dataclass(frozen=True)
class SchemaErrorEdits:
    """Expresses which errors to add to or remove from schema errors."""

    # Errors to append to the working set of schema errors.
    add: list[SchemaError] = field(default_factory=list)
    # Errors to drop from the working set of schema errors.
    remove: list[SchemaError] = field(default_factory=list)
def _handle_S_resources_x(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Do not flag missing `path` and `data` separately."""
    edits = SchemaErrorEdits()
    child_errors = _get_errors_in_group(schema_errors, parent_error)

    # If the parent error is caused by other errors, remove it
    if child_errors:
        edits.remove.append(parent_error)

    required_errors = _filter(child_errors, _path_or_data_required_error)
    # If path and data are both missing, replace them with one combined error.
    if len(required_errors) > 1:
        edits.add.append(
            SchemaError(
                message=(
                    "This resource has no `path` or `data` field. "
                    "One of them must be provided."
                ),
                type="required",
                jsonpath=parent_error.jsonpath,
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )

    # Remove all required errors on path and data
    edits.remove.extend(required_errors)
    return edits
def _handle_S_resources_x_path(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type.

    If `path` is a string, flag errors for the string-based schema.
    If `path` is an array, flag errors for the array-based schema.
    """
    edits = SchemaErrorEdits()
    child_errors = _get_errors_in_group(schema_errors, parent_error)
    type_errors = _filter(child_errors, _is_path_type_error)

    if type_errors:
        edits.remove.append(parent_error)

    # If the only error is that $.resources[x].path is of the wrong type,
    # add a more informative error.
    # NOTE(review): if the group is empty this condition also holds and
    # `type_errors[0]` would raise IndexError — confirm groups are never empty.
    if len(child_errors) == len(type_errors):
        edits.add.append(
            SchemaError(
                message="The `path` property must be either a string or an array.",
                type="type",
                jsonpath=type_errors[0].jsonpath,
                schema_path=type_errors[0].schema_path,
                instance=parent_error.instance,
            )
        )

    # Remove all original type errors on $.resources[x].path
    edits.remove.extend(type_errors)
    return edits
def _handle_S_resources_x_schema_fields_x(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant field type.

    E.g., if the field type is `string`, flag errors for the string-based schema only.

    Args:
        parent_error: The group (`oneOf`) error for a resource schema field.
        schema_errors: All schema errors currently being processed.

    Returns:
        The additions/removals to apply to the error list.
    """
    # The group error itself is always replaced by more specific errors.
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    parent_instance = parent_error.instance
    # A non-dict field definition cannot be inspected further; only drop the parent.
    if not isinstance(parent_instance, dict):
        return edits

    # Fields without an explicit type are treated as "string".
    field_type: str = parent_instance.get("type", "string")

    # The field's type is unknown
    if field_type not in FIELD_TYPES:
        unknown_field_error = SchemaError(
            message=(
                "The type property in this resource schema field is incorrect. "
                f"The value can only be one of these types: {', '.join(FIELD_TYPES)}."
            ),
            type="enum",
            jsonpath=f"{parent_error.jsonpath}.type",
            schema_path=parent_error.schema_path,
            instance=parent_instance,
        )
        # Replace all errors with an unknown field error
        edits.add.append(unknown_field_error)
        edits.remove.extend(errors_in_group)
        return edits

    # The field's type is known; keep only errors for this field type.
    # Assumes the schema's `fields/items/oneOf` sub-schemas are ordered like
    # FIELD_TYPES, so the index selects this field type's sub-schema — TODO confirm.
    schema_index = FIELD_TYPES.index(field_type)

    errors_for_other_types = _filter(
        errors_in_group,
        lambda error: f"fields/items/oneOf/{schema_index}/" not in error.schema_path,
    )
    edits.remove.extend(errors_for_other_types)
    return edits
def _handle_S_resources_x_schema_fields_x_constraints_enum(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant field type and simplify errors.

    Args:
        parent_error: The group (`oneOf`) error for a field's enum constraint.
        schema_errors: All schema errors currently being processed.

    Returns:
        The additions/removals to apply to the error list.
    """
    # The group error itself is always removed.
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    # Remove errors for other field types
    if _not_field_type_error(parent_error):
        edits.remove.extend(errors_in_group)
        return edits

    # Errors on individual enum values (jsonpath ends with an index, not "enum").
    value_errors = _filter(
        errors_in_group,
        lambda error: not error.jsonpath.endswith("enum"),
    )

    # If there are only value errors (list equality relies on _filter
    # preserving order), simplify them into one combined error
    if value_errors == errors_in_group:
        edits.add.append(_get_enum_values_error(parent_error, value_errors))

    # Otherwise, keep only top-level enum errors
    edits.remove.extend(value_errors)
    return edits
def _get_enum_values_error(
    parent_error: SchemaError,
    value_errors: list[SchemaError],
) -> SchemaError:
    """Build one simplified error summarizing invalid enum values."""
    value_types = set(_map(parent_error.instance, lambda value: type(value)))
    if len(value_types) == 1:
        # All enum values share a type, but it is not an allowed one.
        allowed_types = set(_map(value_errors, lambda error: str(error.schema_value)))
        message = (
            "The enum value type is not correct. Enum values should be "
            f"one of {', '.join(allowed_types)}."
        )
    else:
        message = "All enum values must be the same type."
    first = value_errors[0]
    return SchemaError(
        message=message,
        type="type",
        schema_path=first.schema_path,
        jsonpath=_strip_index(first.jsonpath),
        instance=first.instance,
    )
def _not_field_type_error(parent_error: SchemaError) -> bool:
    """Whether `parent_error` belongs to a different field type's sub-schema."""
    grandparent = parent_error.parent
    if grandparent is None:
        return True
    # NOTE(review): assumes `grandparent.instance` is a dict here — confirm.
    field_type: str = grandparent.instance.get("type", "string")
    if field_type not in FIELD_TYPES:
        return True
    schema_index = FIELD_TYPES.index(field_type)
    return f"fields/items/oneOf/{schema_index}/" not in parent_error.schema_path
def _handle_S_resources_x_schema_primary_key(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type and simplify errors."""
    PRIMARY_KEY_TYPES: tuple[type[Any], ...] = (list, str)
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    key_type = type(parent_error.instance)
    if key_type not in PRIMARY_KEY_TYPES:
        # Neither a string nor an array: replace the whole group with a
        # single informative type error.
        edits.remove.extend(errors_in_group)
        edits.add.append(
            SchemaError(
                message="The `primaryKey` property must be a string or an array.",
                type="type",
                jsonpath=parent_error.jsonpath,
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )
        return edits

    # Keep only errors produced by the sub-schema matching the key's type.
    schema_for_type = f"primaryKey/oneOf/{PRIMARY_KEY_TYPES.index(key_type)}/"
    edits.remove.extend(
        _filter(
            errors_in_group,
            lambda error: schema_for_type not in error.schema_path,
        )
    )
    return edits
def _handle_S_resources_x_schema_foreign_keys(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type and simplify errors.

    The sub-schema to use is determined based on the type of the top-level foreign
    key fields property.

    Args:
        parent_error: The group (`oneOf`) error for a foreign key entry.
        schema_errors: All schema errors currently being processed.

    Returns:
        The additions/removals to apply to the error list.
    """
    FOREIGN_KEY_TYPES: tuple[type[Any], ...] = (list, str)
    # The group error itself is always removed.
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    parent_instance = parent_error.instance
    key_exists = isinstance(parent_instance, dict) and "fields" in parent_instance

    # If the key type is correct, use that schema
    if (
        key_exists
        and (key_type := type(parent_instance["fields"])) in FOREIGN_KEY_TYPES
    ):
        # Keep only errors from the sub-schema matching the key's type.
        schema_part = f"foreignKeys/items/oneOf/{FOREIGN_KEY_TYPES.index(key_type)}/"
        edits.remove.extend(
            _filter(
                errors_in_group,
                lambda error: schema_part not in error.schema_path,
            )
        )
        return edits

    # If the key type is incorrect, remove all errors that depend on it
    key_type_errors = _filter(
        errors_in_group,
        lambda error: (
            error.schema_path.endswith("fields/type")
            or "reference/properties/fields" in error.schema_path
        ),
    )
    edits.remove.extend(key_type_errors)

    # If the key exists, flag incorrect type
    if key_exists:
        edits.add.append(
            SchemaError(
                message=(
                    "The `fields` property of a foreign key must be a string or "
                    "an array."
                ),
                type="type",
                jsonpath=f"{parent_error.jsonpath}.fields",
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )

    return edits
def _handle_licenses(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only include one error if both `name` and `path` are missing."""
    child_errors = _get_errors_in_group(schema_errors, parent_error)
    combined_error = SchemaError(
        message=(
            "Licenses must have at least one of the following properties: "
            "`name`, `path`."
        ),
        type="required",
        schema_path=parent_error.schema_path,
        jsonpath=parent_error.jsonpath,
        instance=parent_error.instance,
    )
    # Replace the whole group (children plus parent) with the single error.
    return SchemaErrorEdits(
        remove=child_errors + [parent_error],
        add=[combined_error],
    )
# Dispatch table mapping a JSON-schema path suffix to the handler that
# post-processes grouped (oneOf/anyOf) errors whose `schema_path` ends with
# that suffix. Consumed by `_handle_grouped_error`.
_schema_path_to_handler: list[
    tuple[str, Callable[[SchemaError, list[SchemaError]], SchemaErrorEdits]]
] = [
    ("resources/items/oneOf", _handle_S_resources_x),
    ("resources/items/properties/path/oneOf", _handle_S_resources_x_path),
    ("fields/items/oneOf", _handle_S_resources_x_schema_fields_x),
    (
        "constraints/properties/enum/oneOf",
        _handle_S_resources_x_schema_fields_x_constraints_enum,
    ),
    ("primaryKey/oneOf", _handle_S_resources_x_schema_primary_key),
    ("foreignKeys/items/oneOf", _handle_S_resources_x_schema_foreign_keys),
    ("licenses/items/anyOf", _handle_licenses),
]
def _handle_grouped_error(
    schema_errors: list[SchemaError], parent_error: SchemaError
) -> list[SchemaError]:
    """Handle grouped schema errors that need special treatment.

    Args:
        schema_errors: All remaining schema errors.
        parent_error: The parent error of a group.

    Returns:
        The schema errors after processing.
    """
    to_add: list[SchemaError] = []
    to_remove: list[SchemaError] = []
    # Collect edits from every handler whose schema-path suffix matches,
    # in table order, then apply all removals and additions at once.
    for schema_path, handler in _schema_path_to_handler:
        if parent_error.schema_path.endswith(schema_path):
            edits = handler(parent_error, schema_errors)
            to_add.extend(edits.add)
            to_remove.extend(edits.remove)

    kept = _filter(schema_errors, lambda error: error not in to_remove)
    return kept + to_add
def _validation_error_to_schema_errors(error: ValidationError) -> list[SchemaError]:
    """Flatten a `ValidationError` and its nested context into `SchemaError`s."""
    result = [_create_schema_error(error)]
    # Grouped errors (oneOf/anyOf) carry their sub-errors in `context`.
    if error.context:
        result += _flat_map(error.context, _validation_error_to_schema_errors)
    return result
def _get_full_json_path_from_error(error: ValidationError) -> str:
    """Returns the full `json_path` to the error.

    For 'required' errors, the field name is extracted from the error message.

    Args:
        error: The error to get the full `json_path` for.

    Returns:
        The full `json_path` of the error.
    """
    if str(error.validator) != "required":
        return error.json_path
    # 'required' errors point at the containing object; append the missing
    # property name parsed out of the standard jsonschema message.
    match = re.search("'(.*)' is a required property", error.message)
    return f"{error.json_path}.{match.group(1)}" if match else error.json_path
def _create_schema_error(error: ValidationError) -> SchemaError:
    """Convert one `ValidationError` (including its parent chain) to a `SchemaError`."""
    parent = _create_schema_error(error.parent) if error.parent else None  # type: ignore[arg-type]
    return SchemaError(
        message=error.message,
        type=str(error.validator),
        jsonpath=_get_full_json_path_from_error(error),
        schema_path="/".join(_map(error.absolute_schema_path, str)),
        instance=error.instance,
        schema_value=error.validator_value,
        parent=parent,
    )
def _path_or_data_required_error(error: SchemaError) -> bool:
    """Whether `error` is a 'required' error on a `path` or `data` property."""
    if error.type != "required":
        return False
    return error.jsonpath.endswith(("path", "data"))
def _is_path_type_error(error: SchemaError) -> bool:
    """Whether `error` is a 'type' error on a `path` property."""
    return error.jsonpath.endswith("path") and error.type == "type"
def _create_issue(error: SchemaError) -> Issue:
    """Convert an internal `SchemaError` into a public `Issue`."""
    return Issue(
        jsonpath=error.jsonpath,
        type=error.type,
        message=error.message,
        instance=error.instance,
    )
def _get_errors_in_group(
    schema_errors: list[SchemaError], parent_error: SchemaError
) -> list[SchemaError]:
    """Return the errors whose parent is `parent_error`."""
    return [error for error in schema_errors if error.parent == parent_error]
1006def _strip_index(jsonpath: str) -> str:
1007 return re.sub(r"\[\d+\]$", "", jsonpath)
# Set up exception hooks at module load time so that DataPackageError is
# displayed without a traceback in both plain Python and IPython sessions.
_setup_suppressed_tracebacks(DataPackageError)