Coverage for src / check_datapackage / check.py: 98%
322 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 13:13 +0000
1import os
2import re
3import sys
4from dataclasses import dataclass, field
5from functools import reduce
6from types import TracebackType
7from typing import Any, Callable, Iterator, Optional, cast
9from jsonpath import findall, resolve
10from jsonschema import Draft7Validator, FormatChecker, ValidationError
11from rich import print as rprint
12from seedcase_soil import flat_fmap, fmap, keep, read_properties
14from check_datapackage.config import Config
15from check_datapackage.constants import (
16 DATA_PACKAGE_SCHEMA_ADDRESS,
17 FIELD_TYPES,
18 GROUP_ERRORS,
19)
20from check_datapackage.exclusion import exclude
21from check_datapackage.extensions import apply_extensions
22from check_datapackage.internals import (
23 PropertyField,
24 _get_fields_at_jsonpath,
25)
26from check_datapackage.issue import MISSING, Issue
# Type alias for a hook compatible with `sys.excepthook`:
# (exception type, exception value, traceback) -> None.
PythonExceptionHook = Callable[
    [type[BaseException], BaseException, Optional[TracebackType]],
    None,
]

# Type alias for an IPython custom exception handler: IPython passes the shell
# instance (`self`) first and a trailing `tb_offset`, and the handler may
# return a list of formatted traceback lines or None.
IPythonExceptionHandler = Callable[
    [Any, type[BaseException], BaseException, Optional[TracebackType], None],
    Optional[list[str]],
]
def _pretty_print_exception(
    exc_type: type[BaseException],
    exc_value: BaseException,
) -> None:
    """Print only the exception name (in red) and its message, no traceback."""
    summary = f"\n[red]{exc_type.__name__}[/red]: {exc_value}"
    rprint(summary)
def _create_suppressed_traceback_hook(
    exception_types: tuple[type[BaseException], ...],
    old_hook: PythonExceptionHook,
) -> PythonExceptionHook:
    """Create a Python exception hook that suppresses tracebacks.

    Args:
        exception_types: Exception types to suppress tracebacks for.
        old_hook: The previous exception hook to delegate unregistered exceptions to.

    Returns:
        A composable exception hook function.
    """

    def suppressing_hook(
        exc_type: type[BaseException],
        exc_value: BaseException,
        exc_traceback: Optional[TracebackType],
    ) -> None:
        # Registered exceptions get the short, traceback-free output; anything
        # else is handed to whichever hook was installed before this one.
        if not issubclass(exc_type, exception_types):
            old_hook(exc_type, exc_value, exc_traceback)
            return
        _pretty_print_exception(exc_type, exc_value)

    return suppressing_hook
def _create_suppressed_traceback_ipython_hook(
    exception_types: tuple[type[BaseException], ...],
    old_custom_tb: Optional[IPythonExceptionHandler],
) -> Callable[
    [Any, type[BaseException], BaseException, Optional[TracebackType], None],
    Optional[list[str]],
]:
    """Create an IPython exception hook that suppresses tracebacks.

    Args:
        exception_types: Exception types to suppress tracebacks for.
        old_custom_tb: The previous IPython custom exception handler, if any.

    Returns:
        A composable IPython exception hook function.
    """

    def ipython_hook(
        self: Any,
        exc_type: type[BaseException],
        exc_value: BaseException,
        exc_traceback: Optional[TracebackType],
        tb_offset: None = None,
    ) -> Optional[list[str]]:
        if issubclass(exc_type, exception_types):
            # Suppress: print the short form and return no traceback lines.
            _pretty_print_exception(exc_type, exc_value)
            return []
        if old_custom_tb is not None:
            # Delegate to the handler that was registered before ours.
            return old_custom_tb(self, exc_type, exc_value, exc_traceback, tb_offset)
        return None

    return ipython_hook
111def _is_running_from_ipython() -> bool:
112 """Checks whether running in IPython interactive console or not."""
113 try:
114 from IPython import get_ipython # type: ignore[attr-defined]
115 except ImportError:
116 return False
117 else:
118 return get_ipython() is not None # type: ignore[no-untyped-call]
121def _setup_suppressed_tracebacks(
122 *exception_types: type[BaseException],
123) -> None:
124 """Set up exception hooks to hide tracebacks for specified exceptions.
126 This function is composable - multiple calls add to the existing hook
127 rather than replacing it. Each package only needs to register its own
128 exceptions.
130 Args:
131 *exception_types: Exception types to hide tracebacks for.
133 Raises:
134 TypeError: If any exception_type is not an exception class.
136 Examples:
137 ```python
138 # In package A
139 _setup_suppressed_tracebacks(ErrorA)
141 # In package B - adds to existing hook
142 _setup_suppressed_tracebacks(ErrorB, ErrorC)
143 # Now ErrorA, ErrorB, and ErrorC will all have suppressed tracebacks
144 ```
145 """
146 for exc_type in exception_types:
147 if not (isinstance(exc_type, type) and issubclass(exc_type, BaseException)):
148 raise TypeError(f"{exc_type!r} is not an exception class")
150 sys.excepthook = _create_suppressed_traceback_hook(exception_types, sys.excepthook)
152 if _is_running_from_ipython():
153 ip = get_ipython() # type: ignore # noqa: F821
154 old_custom_tb: Optional[IPythonExceptionHandler] = getattr(ip, "CustomTB", None)
155 ip.set_custom_exc(
156 (Exception,),
157 _create_suppressed_traceback_ipython_hook(exception_types, old_custom_tb),
158 )
class DataPackageError(Exception):
    """Convert Data Package issues to an error and hide the traceback."""

    def __init__(
        self,
        issues: list[Issue],
    ) -> None:
        """Create the DataPackageError from issues."""
        # The error message is the human-readable explanation of the issues.
        explanation = explain(issues)
        super().__init__(explanation)
def explain(issues: list[Issue]) -> str:
    """Explain the issues in a human-readable format.

    The explanation of the issue is outputted as a string. To display the
    string in a easily readable format, the `pretty_print()` function from
    `check-datapackage` can be used.

    Args:
        issues: A list of `Issue` objects to explain.

    Returns:
        A human-readable explanation of the issues.

    Examples:
        ```{python}
        import check_datapackage as cdp

        issue = cdp.Issue(
            jsonpath="$.resources[2].title",
            type="required",
            message="The `title` field is required but missing at the given JSON path.",
        )

        issues = cdp.explain([issue])

        # On its own
        issues
        # Normal print
        print(issues)
        # Pretty print with rich
        cdp.pretty_print(issues)
        ```
    """
    explanations: list[str] = fmap(issues, _create_explanation)
    count = len(explanations)
    # "1 issue was ..." vs "2 issues were ...".
    noun = "issue was" if count == 1 else "issues were"
    header = f"{count} {noun} found in your [u]datapackage.json[/u]:\n\n"
    return header + "\n".join(explanations)
def _create_explanation(issue: Issue) -> str:
    """Create an informative explanation of what went wrong in each issue."""
    display_jsonpath, property_name = _display_jsonpath_and_property(issue)
    if not property_name:
        # Top-level issue: the metadata itself wasn't a dictionary.
        return (
            "check() requires a dictionary with metadata,"
            f" but received {issue.instance}."
        )

    # Underline the offending value with carets, aligned under it.
    instance_text = str(issue.instance)
    carets = "^" * len(instance_text)
    separator = ": " if property_name else " "
    padding = " " * len(property_name)
    return (
        f"At {display_jsonpath}:\n"
        "|\n"
        f"| {property_name}{separator}{instance_text}\n"
        f"| {padding} [red]{carets}[/red]\n"
        f"{issue.message}\n"
    )
def _display_jsonpath_and_property(issue: Issue) -> tuple[str, str]:
    """Split an issue's JSON path into a display path and the property name."""
    if issue.jsonpath == "$":
        return "top level", ""

    # Split off the last path segment as the property name.
    parent_jsonpath, name = issue.jsonpath.rsplit(".", 1)
    return _display_jsonpath(parent_jsonpath), name
244def _display_jsonpath(jsonpath: str) -> str:
245 if jsonpath == "$":
246 return "top level"
247 return jsonpath.removeprefix("$.")
def check(
    properties: dict[str, Any], config: Config = Config(), error: bool = False
) -> list[Issue]:
    """Checks a Data Package's properties against the Data Package standard.

    Args:
        properties: A Data Package's metadata from `datapackage.json` as a Python
            dictionary.
        config: Configuration for the checks to be done. See the `Config`
            class for more details, especially about the default values.
            Note: the default `Config()` is created once at import time and is
            assumed immutable — TODO confirm `Config` is frozen.
        error: Whether to treat any issues found as errors. Defaults
            to `False`, meaning that issues will be returned as a list of `Issue`
            objects. Will internally run `explain()` on the Issues
            if set to `True`.

    Returns:
        A list of `Issue` objects representing any issues found
        while checking the properties. If no issues are found, an empty list
        is returned.

    Raises:
        DataPackageError: If `error` is `True` and any issues were found.
    """
    schema = read_properties(DATA_PACKAGE_SCHEMA_ADDRESS)

    if config.strict:
        # Strict mode upgrades the standard's "SHOULD" recommendations to
        # hard requirements before validating.
        _set_should_fields_to_required(schema)

    issues = _check_object_against_json_schema(properties, schema)
    # Key checks receive the issues found so far, so resources whose key
    # properties are already flagged can be skipped.
    issues += _check_keys(properties, issues)
    issues += apply_extensions(properties, config.extensions)
    issues = exclude(issues, config.exclusions)
    # De-duplicate and give the output a stable ordering.
    issues = sorted(set(issues))

    # Use by doing `CDP_DEBUG=true uv run ...`
    if os.getenv("CDP_DEBUG"):
        rprint("", properties)
        for issue in issues:
            rprint(issue)
            rprint(explain([issue]))

    if error and issues:
        raise DataPackageError(issues)

    return issues
def _check_keys(properties: dict[str, Any], issues: list[Issue]) -> list[Issue]:
    """Check that primary and foreign keys exist.

    Args:
        properties: The full Data Package properties.
        issues: Issues already found. Resources with an existing issue at the
            relevant key property are skipped, to avoid piling follow-up
            errors onto a property that is already flagged as malformed.

    Returns:
        The key-related issues found.
    """
    # Primary keys: resources with a non-empty `primaryKey`, or — as a special
    # case — a key that is the empty string, which still needs flagging.
    resources_with_pk = _get_fields_at_jsonpath(
        "$.resources[?(length(@.schema.primaryKey) > 0 || @.schema.primaryKey == '')]",
        properties,
    )
    resources_with_pk = _keep_resources_with_no_issue_at_property(
        resources_with_pk, issues, "schema.primaryKey"
    )
    key_issues = flat_fmap(resources_with_pk, _check_primary_key)

    # Foreign keys
    resources_with_fk = _get_fields_at_jsonpath(
        "$.resources[?(length(@.schema.foreignKeys) > 0)]",
        properties,
    )
    resources_with_fk = _keep_resources_with_no_issue_at_property(
        resources_with_fk, issues, "schema.foreignKeys"
    )
    key_issues += flat_fmap(
        resources_with_fk,
        lambda resource: _check_foreign_keys(resource, properties),
    )
    return key_issues
def _issues_at_property(
    resource: PropertyField, issues: list[Issue], jsonpath: str
) -> list[Issue]:
    """Return the issues located at or under `jsonpath` within `resource`."""
    prefix = f"{resource.jsonpath}.{jsonpath}"
    return [issue for issue in issues if prefix in issue.jsonpath]
def _keep_resources_with_no_issue_at_property(
    resources: list[PropertyField], issues: list[Issue], jsonpath: str
) -> list[PropertyField]:
    """Filter out resources that have an issue at or under the given `jsonpath`."""
    return [
        resource
        for resource in resources
        if not _issues_at_property(resource, issues, jsonpath)
    ]
def _check_primary_key(resource: PropertyField) -> list[Issue]:
    """Check that primary key fields exist in the resource."""
    pk_fields = resolve("/schema/primaryKey", resource.value)
    unknown_fields = _get_unknown_key_fields(
        _key_fields_as_str_list(pk_fields), resource.value
    )
    if unknown_fields:
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.primaryKey",
                type="primary-key",
                message=(
                    f"No fields found in resource for primary key fields: {unknown_fields}."
                ),
                instance=pk_fields,
            )
        ]
    return []
def _check_foreign_keys(
    resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
    """Check that foreign key source and destination fields exist."""
    # Safe, as only FKs of the correct type here
    foreign_keys = cast(
        list[dict[str, Any]], resolve("/schema/foreignKeys", resource.value)
    )

    def points_to_other_resource(fk: dict[str, Any]) -> bool:
        # A non-empty `resource` in the reference means the destination
        # fields live on another resource in the package.
        reference = fk["reference"]
        return "resource" in reference and reference["resource"] != ""

    fks_other_resource = [fk for fk in foreign_keys if points_to_other_resource(fk)]
    fks_own_resource = [fk for fk in foreign_keys if fk not in fks_other_resource]

    issues = flat_fmap(foreign_keys, lambda fk: _check_fk_source_fields(fk, resource))
    issues += flat_fmap(
        fks_own_resource,
        lambda fk: _check_fk_dest_fields_same_resource(fk, resource),
    )
    issues += flat_fmap(
        fks_other_resource,
        lambda fk: _check_fk_dest_fields_diff_resource(fk, resource, properties),
    )

    return issues
390def _key_fields_as_str_list(key_fields: Any) -> list[str]:
391 """Returns the list representation of primary and foreign key fields.
393 Key fields can be represented either as a string (containing one field name)
394 or a list of strings.
396 The input should contain a correctly typed `key_fields` object.
397 """
398 if not isinstance(key_fields, list):
399 key_fields = [key_fields]
400 return cast(list[str], key_fields)
def _get_unknown_key_fields(
    key_fields: list[str], properties: dict[str, Any], resource_path: str = ""
) -> str:
    """Return the key fields that don't exist on the specified resource."""
    known_fields = findall(f"{resource_path}schema.fields[*].name", properties)
    # Quote each missing field name and join them for use in a message.
    unknown = [f"{field!r}" for field in key_fields if field not in known_fields]
    return ", ".join(unknown)
def _check_fk_source_fields(
    foreign_key: dict[str, Any], resource: PropertyField
) -> list[Issue]:
    """Check that foreign key source fields exist and have the correct number."""
    source_fields = resolve("/fields", foreign_key)
    source_field_list = _key_fields_as_str_list(source_fields)
    dest_field_list = _key_fields_as_str_list(resolve("/reference/fields", foreign_key))

    issues: list[Issue] = []
    unknown_fields = _get_unknown_key_fields(source_field_list, resource.value)
    if unknown_fields:
        issues.append(
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
                type="foreign-key-source-fields",
                message=(
                    "No fields found in resource for foreign key source fields: "
                    f"{unknown_fields}."
                ),
                instance=source_fields,
            )
        )
    # Source and destination fields are matched positionally, so counts must
    # line up.
    if len(source_field_list) != len(dest_field_list):
        issues.append(
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
                type="foreign-key-source-fields",
                message=(
                    "The number of foreign key source fields must be the same as "
                    "the number of foreign key destination fields."
                ),
                instance=source_fields,
            )
        )
    return issues
def _check_fk_dest_fields_same_resource(
    foreign_key: dict[str, Any],
    resource: PropertyField,
) -> list[Issue]:
    """Check that foreign key destination fields exist on the same resource."""
    dest_fields = resolve("/reference/fields", foreign_key)
    unknown_fields = _get_unknown_key_fields(
        _key_fields_as_str_list(dest_fields), resource.value
    )
    if unknown_fields:
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
                type="foreign-key-destination-fields",
                message=(
                    "No fields found in resource for foreign key "
                    f"destination fields: {unknown_fields}."
                ),
                instance=dest_fields,
            )
        ]
    return []
def _check_fk_dest_fields_diff_resource(
    foreign_key: dict[str, Any], resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
    """Check that foreign key destination fields exist on the destination resource.

    Args:
        foreign_key: The foreign key definition being checked.
        resource: The resource the foreign key is defined on.
        properties: The full Data Package properties, used to look up the
            destination resource by name.

    Returns:
        Issues for a missing destination resource or missing destination
        fields; an empty list if everything resolves.
    """
    dest_fields = resolve("/reference/fields", foreign_key)
    dest_field_list = _key_fields_as_str_list(dest_fields)
    # Safe, as only keys of the correct type here
    dest_resource_name = cast(str, resolve("/reference/resource", foreign_key))

    # NOTE(review): the name is interpolated into a JSONPath filter; a name
    # containing a single quote would break the expression — confirm that
    # upstream name validation rules that out.
    dest_resource_path = f"resources[?(@.name == '{dest_resource_name}')]"
    if not findall(dest_resource_path, properties):
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.resource",
                type="foreign-key-destination-resource",
                message=(
                    f"The destination resource {dest_resource_name!r} of this foreign "
                    "key doesn't exist in the package."
                ),
                instance=dest_resource_name,
            )
        ]

    unknown_fields = _get_unknown_key_fields(
        dest_field_list, properties, f"{dest_resource_path}."
    )
    if not unknown_fields:
        return []

    return [
        Issue(
            jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
            type="foreign-key-destination-fields",
            message=(
                f"No fields found in destination resource {dest_resource_name!r} "
                f"for foreign key destination fields: {unknown_fields}."
            ),
            instance=dest_fields,
        )
    ]
516def _set_should_fields_to_required(schema: dict[str, Any]) -> dict[str, Any]:
517 """Set 'SHOULD' fields to 'REQUIRED' in the schema."""
518 should_fields = ("name", "id", "licenses")
519 name_pattern = r"^[a-z0-9._-]+$"
521 # From https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string
522 semver_pattern = (
523 r"^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)"
524 r"(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0"
525 r"|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>"
526 r"[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$"
527 )
529 # Convert to required
530 schema["required"].extend(should_fields)
531 schema["properties"]["name"]["pattern"] = name_pattern
532 schema["properties"]["version"]["pattern"] = semver_pattern
533 schema["properties"]["contributors"]["items"]["required"] = ["title"]
534 schema["properties"]["sources"]["items"]["required"] = ["title"]
535 schema["properties"]["resources"]["items"]["properties"]["name"]["pattern"] = (
536 name_pattern
537 )
538 return schema
def _check_object_against_json_schema(
    json_object: dict[str, Any], schema: dict[str, Any]
) -> list[Issue]:
    """Checks that `json_object` matches the given JSON schema.

    Structural, type and format constraints are all checked. All schema violations
    are collected before issues are returned.

    Args:
        json_object: The JSON object to check.
        schema: The JSON schema to check against.

    Returns:
        A list of issues. An empty list, if no issues are found.

    Raises:
        jsonschema.exceptions.SchemaError: If the given schema is invalid.
    """
    # Fail fast on a malformed schema instead of producing confusing issues.
    Draft7Validator.check_schema(schema)
    validator = Draft7Validator(schema, format_checker=FormatChecker())
    errors = validator.iter_errors(json_object)
    return _validation_errors_to_issues(errors)
@dataclass(frozen=True)
class SchemaError:
    """A simpler representation of `ValidationError` for easier processing.

    Frozen, so instances get value equality and hashing, which the grouped
    error handling relies on when comparing and removing errors.

    Attributes:
        message (str): The error message generated by `jsonschema`.
        type (str): The type of the error (e.g., a JSON schema type such as "required",
            "type", "pattern", or "format", or a custom type).
        schema_path (str): The path to the violated check in the JSON schema.
            Path components are separated by '/'.
        jsonpath (str): The JSON path to the field that violates the check.
        instance (Any): The part of the object that failed the check.
        schema_value (Optional[Any]): The expected value that is checked against,
            which is part of the schema violated by this error.
        parent (Optional[SchemaError]): The error group the error belongs to, if any.
    """

    message: str
    type: str
    schema_path: str
    jsonpath: str
    instance: Any
    schema_value: Optional[Any] = None
    parent: Optional["SchemaError"] = None
def _validation_errors_to_issues(
    validation_errors: Iterator[ValidationError],
) -> list[Issue]:
    """Transforms `jsonschema.ValidationError`s to more compact `Issue`s.

    Args:
        validation_errors: The `jsonschema.ValidationError`s to transform.

    Returns:
        A list of `Issue`s.
    """
    schema_errors = flat_fmap(validation_errors, _validation_error_to_schema_errors)
    # Parents of `oneOf`/`anyOf` groups need special, per-schema-path handling.
    group_parents = [error for error in schema_errors if error.type in GROUP_ERRORS]
    for parent in group_parents:
        schema_errors = _handle_grouped_error(schema_errors, parent)
    return [_create_issue(error) for error in schema_errors]
@dataclass(frozen=True)
class SchemaErrorEdits:
    """Expresses which errors to add to or remove from schema errors."""

    # Errors to append to the working list of schema errors.
    add: list[SchemaError] = field(default_factory=list)
    # Errors to drop from the working list of schema errors.
    remove: list[SchemaError] = field(default_factory=list)
def _handle_S_resources_x(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Do not flag missing `path` and `data` separately."""
    edits = SchemaErrorEdits()
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)
    if not errors_in_group:
        return edits

    # The parent error is caused by other errors, so drop it.
    edits.remove.append(parent_error)

    required_errors = [
        error for error in errors_in_group if _path_or_data_required_error(error)
    ]
    if len(required_errors) > 1:
        # Both `path` and `data` are missing: replace the two separate
        # "required" errors with a single, more informative one.
        edits.add.append(
            SchemaError(
                message=(
                    "This resource has no `path` or `data` field. "
                    "One of them must be provided."
                ),
                type="required",
                jsonpath=parent_error.jsonpath,
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )

    # Remove all required errors on path and data
    edits.remove.extend(required_errors)
    return edits
def _handle_S_resources_x_path(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type.

    If `path` is a string, flag errors for the string-based schema.
    If `path` is an array, flag errors for the array-based schema.
    """
    edits = SchemaErrorEdits()
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)
    type_errors = [error for error in errors_in_group if _is_path_type_error(error)]

    if type_errors:
        edits.remove.append(parent_error)

    if len(errors_in_group) == len(type_errors):
        # Every error in the group is a type error on `path`; collapse them
        # into one clearer message.
        edits.add.append(
            SchemaError(
                message="The `path` property must be either a string or an array.",
                type="type",
                jsonpath=type_errors[0].jsonpath,
                schema_path=type_errors[0].schema_path,
                instance=parent_error.instance,
            )
        )

    # Remove all original type errors on $.resources[x].path
    edits.remove.extend(type_errors)
    return edits
def _handle_S_resources_x_schema_fields_x(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant field type.

    E.g., if the field type is `string`, flag errors for the string-based schema only.

    Args:
        parent_error: The `oneOf` parent error for a resource schema field.
        schema_errors: All remaining schema errors.

    Returns:
        The edits to apply to the schema errors.
    """
    # The parent `oneOf` error is always replaced by more specific errors.
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    parent_instance = parent_error.instance
    if not isinstance(parent_instance, dict):
        # The field isn't an object at all; nothing more specific to say.
        return edits

    # Falls back to "string" when no type is given (assumed to match the
    # standard's default field type — confirm).
    field_type: str = parent_instance.get("type", "string")

    # The field's type is unknown
    if field_type not in FIELD_TYPES:
        unknown_field_error = SchemaError(
            message=(
                "The type property in this resource schema field is incorrect. "
                f"The value can only be one of these types: {', '.join(FIELD_TYPES)}."
            ),
            type="enum",
            jsonpath=f"{parent_error.jsonpath}.type",
            schema_path=parent_error.schema_path,
            instance=parent_instance,
        )
        # Replace all errors with an unknown field error
        edits.add.append(unknown_field_error)
        edits.remove.extend(errors_in_group)
        return edits

    # The field's type is known; keep only errors for this field type.
    # The index into FIELD_TYPES selects the matching `oneOf` sub-schema
    # (assumed to be ordered the same way — confirm against the schema).
    schema_index = FIELD_TYPES.index(field_type)

    errors_for_other_types = keep(
        errors_in_group,
        lambda error: f"fields/items/oneOf/{schema_index}/" not in error.schema_path,
    )
    edits.remove.extend(errors_for_other_types)
    return edits
def _handle_S_resources_x_schema_fields_x_constraints_enum(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant field type and simplify errors.

    Args:
        parent_error: The `oneOf` parent error for an enum constraint.
        schema_errors: All remaining schema errors.

    Returns:
        The edits to apply to the schema errors.
    """
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    # Remove errors for other field types
    if _not_field_type_error(parent_error):
        edits.remove.extend(errors_in_group)
        return edits

    # Errors on individual enum values, as opposed to errors on the `enum`
    # property itself.
    value_errors = keep(
        errors_in_group,
        lambda error: not error.jsonpath.endswith("enum"),
    )

    # If the group contains only value errors, replace them with one
    # simplified error.
    if value_errors == errors_in_group:
        edits.add.append(_get_enum_values_error(parent_error, value_errors))

    # The individual value errors are dropped in both cases; any top-level
    # enum errors in the group remain.
    edits.remove.extend(value_errors)
    return edits
def _get_enum_values_error(
    parent_error: SchemaError,
    value_errors: list[SchemaError],
) -> SchemaError:
    """Summarize per-value enum errors into a single error.

    Args:
        parent_error: The group's parent error; its `instance` is the list of
            enum values being checked.
        value_errors: The individual errors on the enum's values.

    Returns:
        One `SchemaError` describing the whole enum value list.
    """
    value_types = {type(value) for value in parent_error.instance}
    if len(value_types) == 1:
        # Sort so the message is deterministic: iterating a set of strings
        # varies between runs (hash randomization), and these messages feed
        # into de-duplication and user-facing output.
        allowed_types = sorted({str(error.schema_value) for error in value_errors})
        message = (
            "The enum value type is not correct. Enum values should be "
            f"one of {', '.join(allowed_types)}."
        )
    else:
        message = "All enum values must be the same type."
    return SchemaError(
        message=message,
        type="type",
        schema_path=value_errors[0].schema_path,
        jsonpath=_strip_index(value_errors[0].jsonpath),
        instance=value_errors[0].instance,
    )
def _not_field_type_error(parent_error: SchemaError) -> bool:
    """Whether this error does not belong to the sub-schema for the field's type."""
    parent = parent_error.parent
    if parent is None:
        return True
    field_type: str = parent.instance.get("type", "string")
    if field_type not in FIELD_TYPES:
        return True
    schema_index = FIELD_TYPES.index(field_type)
    return f"fields/items/oneOf/{schema_index}/" not in parent_error.schema_path
def _handle_S_resources_x_schema_primary_key(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type and simplify errors."""
    PRIMARY_KEY_TYPES: tuple[type[Any], ...] = (list, str)
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    key_type = type(parent_error.instance)
    if key_type not in PRIMARY_KEY_TYPES:
        # Wrong type altogether: replace the whole group with one clear error.
        edits.remove.extend(errors_in_group)
        edits.add.append(
            SchemaError(
                message="The `primaryKey` property must be a string or an array.",
                type="type",
                jsonpath=parent_error.jsonpath,
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )
        return edits

    # Keep only the errors produced by the sub-schema matching the key's type.
    schema_for_type = f"primaryKey/oneOf/{PRIMARY_KEY_TYPES.index(key_type)}/"
    edits.remove.extend(
        error for error in errors_in_group if schema_for_type not in error.schema_path
    )
    return edits
def _handle_S_resources_x_schema_foreign_keys(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type and simplify errors.

    The sub-schema to use is determined based on the type of the top-level foreign
    key fields property.
    """
    # Indexed into the `oneOf` sub-schema paths below (assumed to mirror the
    # sub-schema order: list first, string second — confirm against schema).
    FOREIGN_KEY_TYPES: tuple[type[Any], ...] = (list, str)
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    parent_instance = parent_error.instance
    key_exists = isinstance(parent_instance, dict) and "fields" in parent_instance

    # If the key type is correct, use that schema
    if (
        key_exists
        and (key_type := type(parent_instance["fields"])) in FOREIGN_KEY_TYPES
    ):
        schema_part = f"foreignKeys/items/oneOf/{FOREIGN_KEY_TYPES.index(key_type)}/"
        # Drop all errors produced by the sub-schemas for the other types.
        edits.remove.extend(
            keep(
                errors_in_group,
                lambda error: schema_part not in error.schema_path,
            )
        )
        return edits

    # If the key type is incorrect, remove all errors that depend on it
    key_type_errors = keep(
        errors_in_group,
        lambda error: (
            error.schema_path.endswith("fields/type")
            or "reference/properties/fields" in error.schema_path
        ),
    )
    edits.remove.extend(key_type_errors)

    # If the key exists, flag incorrect type
    if key_exists:
        edits.add.append(
            SchemaError(
                message=(
                    "The `fields` property of a foreign key must be a string or "
                    "an array."
                ),
                type="type",
                jsonpath=f"{parent_error.jsonpath}.fields",
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )

    return edits
def _handle_licenses(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only include one error if both `name` and `path` are missing."""
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)
    # One combined error replaces the parent and all of its children.
    combined_error = SchemaError(
        message=(
            "Licenses must have at least one of the following properties: "
            "`name`, `path`."
        ),
        type="required",
        schema_path=parent_error.schema_path,
        jsonpath=parent_error.jsonpath,
        instance=parent_error.instance,
    )
    return SchemaErrorEdits(
        add=[combined_error],
        remove=[*errors_in_group, parent_error],
    )
# Maps a schema-path suffix to the handler that post-processes the grouped
# (`oneOf`/`anyOf`) errors arising at that point in the Data Package schema.
_schema_path_to_handler: list[
    tuple[str, Callable[[SchemaError, list[SchemaError]], SchemaErrorEdits]]
] = [
    ("resources/items/oneOf", _handle_S_resources_x),
    ("resources/items/properties/path/oneOf", _handle_S_resources_x_path),
    ("fields/items/oneOf", _handle_S_resources_x_schema_fields_x),
    (
        "constraints/properties/enum/oneOf",
        _handle_S_resources_x_schema_fields_x_constraints_enum,
    ),
    ("primaryKey/oneOf", _handle_S_resources_x_schema_primary_key),
    ("foreignKeys/items/oneOf", _handle_S_resources_x_schema_foreign_keys),
    ("licenses/items/anyOf", _handle_licenses),
]
def _handle_grouped_error(
    schema_errors: list[SchemaError], parent_error: SchemaError
) -> list[SchemaError]:
    """Handle grouped schema errors that need special treatment.

    Args:
        schema_errors: All remaining schema errors.
        parent_error: The parent error of a group.

    Returns:
        The schema errors after processing.
    """
    # Collect the edits from every handler whose schema-path suffix matches,
    # in registration order, then apply them all at once.
    to_add: list[SchemaError] = []
    to_remove: list[SchemaError] = []
    for schema_path, handler in _schema_path_to_handler:
        if parent_error.schema_path.endswith(schema_path):
            edits = handler(parent_error, schema_errors)
            to_add.extend(edits.add)
            to_remove.extend(edits.remove)

    kept = [error for error in schema_errors if error not in to_remove]
    return kept + to_add
def _validation_error_to_schema_errors(error: ValidationError) -> list[SchemaError]:
    """Flatten a `ValidationError` and its context tree into `SchemaError`s."""
    errors = [_create_schema_error(error)]
    # `context` holds the child errors of a `oneOf`/`anyOf` group, if any.
    for child in error.context or []:
        errors.extend(_validation_error_to_schema_errors(child))
    return errors
def _get_full_json_path_from_error(error: ValidationError) -> str:
    """Returns the full `json_path` to the error.

    For 'required' errors, the field name is extracted from the error message.

    Args:
        error: The error to get the full `json_path` for.

    Returns:
        The full `json_path` of the error.
    """
    if str(error.validator) == "required" and (
        match := re.search("'(.*)' is a required property", error.message)
    ):
        return f"{error.json_path}.{match.group(1)}"
    return error.json_path
def _create_schema_error(error: ValidationError) -> SchemaError:
    """Build a `SchemaError` (including its parent chain) from a `ValidationError`."""
    parent = _create_schema_error(error.parent) if error.parent else None  # type: ignore[arg-type]
    return SchemaError(
        message=error.message,
        type=str(error.validator),
        jsonpath=_get_full_json_path_from_error(error),
        schema_path="/".join(str(part) for part in error.absolute_schema_path),
        instance=error.instance,
        schema_value=error.validator_value,
        parent=parent,
    )
def _path_or_data_required_error(error: SchemaError) -> bool:
    """Whether this is a 'required' error on a `path` or `data` property."""
    if error.type != "required":
        return False
    return error.jsonpath.endswith(("path", "data"))
def _is_path_type_error(error: SchemaError) -> bool:
    """Whether this is a type error on a `path` property."""
    return error.jsonpath.endswith("path") and error.type == "type"
def _create_issue(error: SchemaError) -> Issue:
    """Convert a `SchemaError` into a user-facing `Issue`."""
    # A missing required property has no instance to show, so mark it MISSING.
    instance = MISSING if _is_missing_required_property(error) else error.instance
    return Issue(
        message=error.message,
        jsonpath=error.jsonpath,
        type=error.type,
        instance=instance,
    )
def _is_missing_required_property(error: SchemaError) -> bool:
    """Whether the error flags a required property that is absent entirely."""
    if error.type != "required":
        return False
    return re.fullmatch(r"'.+' is a required property", error.message) is not None
def _get_errors_in_group(
    schema_errors: list[SchemaError], parent_error: SchemaError
) -> list[SchemaError]:
    """Return the errors whose parent is `parent_error`."""
    return [error for error in schema_errors if error.parent == parent_error]
1020def _strip_index(jsonpath: str) -> str:
1021 return re.sub(r"\[\d+\]$", "", jsonpath)
# Set up exception hooks at module load time, so that a DataPackageError is
# reported without a traceback wherever this package is imported
# (see `_setup_suppressed_tracebacks`).
_setup_suppressed_tracebacks(DataPackageError)