Coverage for src / check_datapackage / check.py: 98%

311 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 08:54 +0000

1import os 

2import re 

3import sys 

4from dataclasses import dataclass, field 

5from functools import reduce 

6from types import TracebackType 

7from typing import Any, Callable, Iterator, Optional, cast 

8 

9from jsonpath import findall, resolve 

10from jsonschema import Draft7Validator, FormatChecker, ValidationError 

11from rich import print as rprint 

12 

13from check_datapackage.config import Config 

14from check_datapackage.constants import ( 

15 DATA_PACKAGE_SCHEMA_PATH, 

16 FIELD_TYPES, 

17 GROUP_ERRORS, 

18) 

19from check_datapackage.exclusion import exclude 

20from check_datapackage.extensions import apply_extensions 

21from check_datapackage.internals import ( 

22 PropertyField, 

23 _filter, 

24 _flat_map, 

25 _get_fields_at_jsonpath, 

26 _map, 

27) 

28from check_datapackage.issue import Issue 

29from check_datapackage.read_json import read_json 

30 

# Type alias for a `sys.excepthook`-style callable:
# (exc_type, exc_value, exc_traceback) -> None.
PythonExceptionHook = Callable[
    [type[BaseException], BaseException, Optional[TracebackType]],
    None,
]

# Type alias for an IPython custom exception handler. Compared to the plain
# Python hook, it additionally receives the shell instance (`self`) and a
# `tb_offset`, and may return a list of traceback lines.
IPythonExceptionHandler = Callable[
    [Any, type[BaseException], BaseException, Optional[TracebackType], None],
    Optional[list[str]],
]

42 

43 

def _pretty_print_exception(
    exc_type: type[BaseException],
    exc_value: BaseException,
) -> None:
    """Print the exception name (in red) and its message, without a traceback."""
    formatted = f"\n[red]{exc_type.__name__}[/red]: {exc_value}"
    rprint(formatted)

49 

50 

51def _create_suppressed_traceback_hook( 

52 exception_types: tuple[type[BaseException], ...], 

53 old_hook: PythonExceptionHook, 

54) -> PythonExceptionHook: 

55 """Create a Python exception hook that suppresses tracebacks. 

56 

57 Args: 

58 exception_types: Exception types to suppress tracebacks for. 

59 old_hook: The previous exception hook to delegate unregistered exceptions to. 

60 

61 Returns: 

62 A composable exception hook function. 

63 """ 

64 

65 def hook( 

66 exc_type: type[BaseException], 

67 exc_value: BaseException, 

68 exc_traceback: Optional[TracebackType], 

69 ) -> None: 

70 if issubclass(exc_type, exception_types): 

71 _pretty_print_exception(exc_type, exc_value) 

72 else: 

73 old_hook(exc_type, exc_value, exc_traceback) 

74 

75 return hook 

76 

77 

78def _create_suppressed_traceback_ipython_hook( 

79 exception_types: tuple[type[BaseException], ...], 

80 old_custom_tb: Optional[IPythonExceptionHandler], 

81) -> Callable[ 

82 [Any, type[BaseException], BaseException, Optional[TracebackType], None], 

83 Optional[list[str]], 

84]: 

85 """Create an IPython exception hook that suppresses tracebacks. 

86 

87 Args: 

88 exception_types: Exception types to suppress tracebacks for. 

89 old_custom_tb: The previous IPython custom exception handler, if any. 

90 

91 Returns: 

92 A composable IPython exception hook function. 

93 """ 

94 has_old_handler = old_custom_tb is not None 

95 

96 def hook( 

97 self: Any, 

98 exc_type: type[BaseException], 

99 exc_value: BaseException, 

100 exc_traceback: Optional[TracebackType], 

101 tb_offset: None = None, 

102 ) -> Optional[list[str]]: 

103 if issubclass(exc_type, exception_types): 

104 _pretty_print_exception(exc_type, exc_value) 

105 return [] 

106 elif has_old_handler and old_custom_tb is not None: 

107 return old_custom_tb(self, exc_type, exc_value, exc_traceback, tb_offset) 

108 else: 

109 return None 

110 

111 return hook 

112 

113 

114def _is_running_from_ipython() -> bool: 

115 """Checks whether running in IPython interactive console or not.""" 

116 try: 

117 from IPython import get_ipython # type: ignore[attr-defined] 

118 except ImportError: 

119 return False 

120 else: 

121 return get_ipython() is not None # type: ignore[no-untyped-call] 

122 

123 

124def _setup_suppressed_tracebacks( 

125 *exception_types: type[BaseException], 

126) -> None: 

127 """Set up exception hooks to hide tracebacks for specified exceptions. 

128 

129 This function is composable - multiple calls add to the existing hook 

130 rather than replacing it. Each package only needs to register its own 

131 exceptions. 

132 

133 Args: 

134 *exception_types: Exception types to hide tracebacks for. 

135 

136 Raises: 

137 TypeError: If any exception_type is not an exception class. 

138 

139 Examples: 

140 ```python 

141 # In package A 

142 _setup_suppressed_tracebacks(ErrorA) 

143 

144 # In package B - adds to existing hook 

145 _setup_suppressed_tracebacks(ErrorB, ErrorC) 

146 # Now ErrorA, ErrorB, and ErrorC will all have suppressed tracebacks 

147 ``` 

148 """ 

149 for exc_type in exception_types: 

150 if not (isinstance(exc_type, type) and issubclass(exc_type, BaseException)): 

151 raise TypeError(f"{exc_type!r} is not an exception class") 

152 

153 sys.excepthook = _create_suppressed_traceback_hook(exception_types, sys.excepthook) 

154 

155 if _is_running_from_ipython(): 

156 ip = get_ipython() # type: ignore # noqa: F821 

157 old_custom_tb: Optional[IPythonExceptionHandler] = getattr(ip, "CustomTB", None) 

158 ip.set_custom_exc( 

159 (Exception,), 

160 _create_suppressed_traceback_ipython_hook(exception_types, old_custom_tb), 

161 ) 

162 

163 

class DataPackageError(Exception):
    """Convert Data Package issues to an error and hide the traceback."""

    def __init__(
        self,
        issues: list[Issue],
    ) -> None:
        """Create the DataPackageError from issues."""
        message = explain(issues)
        super().__init__(message)

173 

174 

def explain(issues: list[Issue]) -> str:
    """Explain the issues in a human-readable format.

    The explanation of the issue is outputted as a string. To display the
    string in a easily readable format, the `pretty_print()` function from
    `check-datapackage` can be used.

    Args:
        issues: A list of `Issue` objects to explain.

    Returns:
        A human-readable explanation of the issues.

    Examples:
        ```{python}
        import check_datapackage as cdp

        issue = cdp.Issue(
            jsonpath="$.resources[2].title",
            type="required",
            message="The `title` field is required but missing at the given JSON path.",
        )

        issues = cdp.explain([issue])

        # On its own
        issues
        # Normal print
        print(issues)
        # Pretty print with rich
        cdp.pretty_print(issues)
        ```
    """
    explanations: list[str] = _map(issues, _create_explanation)
    count = len(explanations)
    # Grammar: "1 issue was" vs "2 issues were".
    verb = " was" if count == 1 else "s were"
    header = f"{count} issue{verb} found in your [u]datapackage.json[/u]:\n\n"
    return header + "\n".join(explanations)

218 

219 

220def _create_explanation(issue: Issue) -> str: 

221 """Create an informative explanation of what went wrong in each issue.""" 

222 # Remove suffix '$' to account for root path when `[]` is passed to `check()` 

223 property_name = issue.jsonpath.removesuffix("$").split(".")[-1] 

224 if not property_name: 

225 return ( 

226 "check() requires a dictionary with metadata," 

227 f" but received {issue.instance}." 

228 ) 

229 

230 number_of_carets = len(str(issue.instance)) 

231 return ( # noqa: F401 

232 f"At {issue.jsonpath.removeprefix('$.')}:\n" 

233 "|\n" 

234 f"| {property_name}{': ' if property_name else ' '}{issue.instance}\n" 

235 f"| {' ' * len(property_name)} [red]{'^' * number_of_carets}[/red]\n" 

236 f"{issue.message}\n" 

237 ) 

238 

239 

def check(
    properties: dict[str, Any], config: Optional[Config] = None, error: bool = False
) -> list[Issue]:
    """Checks a Data Package's properties against the Data Package standard.

    Args:
        properties: A Data Package's metadata from `datapackage.json` as a Python
            dictionary.
        config: Configuration for the checks to be done. See the `Config`
            class for more details, especially about the default values.
            Defaults to a fresh `Config()` when not given.
        error: Whether to treat any issues found as errors. Defaults
            to `False`, meaning that issues will be returned as a list of `Issue`
            objects. Will internally run `explain()` on the Issues
            if set to `True`.

    Returns:
        A list of `Issue` objects representing any issues found
        while checking the properties. If no issues are found, an empty list
        is returned.

    Raises:
        DataPackageError: If `error` is `True` and any issues are found.
    """
    # A fresh Config per call avoids the shared-mutable-default pitfall of
    # `config: Config = Config()`, where one instance (created once at import
    # time) would be reused across every call.
    if config is None:
        config = Config()

    schema = read_json(DATA_PACKAGE_SCHEMA_PATH)

    if config.strict:
        _set_should_fields_to_required(schema)

    issues = _check_object_against_json_schema(properties, schema)
    # Key checks skip resources already flagged by the schema check.
    issues += _check_keys(properties, issues)
    issues += apply_extensions(properties, config.extensions)
    issues = exclude(issues, config.exclusions)
    # De-duplicate and order deterministically.
    issues = sorted(set(issues))

    # Use by doing `CDP_DEBUG=true uv run ...`
    if os.getenv("CDP_DEBUG"):
        rprint("", properties)
        for issue in issues:
            rprint(issue)
            rprint(explain([issue]))

    if error and issues:
        raise DataPackageError(issues)

    return issues

282 

283 

def _check_keys(properties: dict[str, Any], issues: list[Issue]) -> list[Issue]:
    """Check that primary and foreign keys exist."""
    # Primary keys: resources with a non-empty primaryKey, or one set to ''.
    pk_resources = _get_fields_at_jsonpath(
        "$.resources[?(length(@.schema.primaryKey) > 0 || @.schema.primaryKey == '')]",
        properties,
    )
    pk_resources = _keep_resources_with_no_issue_at_property(
        pk_resources, issues, "schema.primaryKey"
    )
    key_issues = _flat_map(pk_resources, _check_primary_key)

    # Foreign keys: resources with at least one foreign key defined.
    fk_resources = _get_fields_at_jsonpath(
        "$.resources[?(length(@.schema.foreignKeys) > 0)]",
        properties,
    )
    fk_resources = _keep_resources_with_no_issue_at_property(
        fk_resources, issues, "schema.foreignKeys"
    )
    key_issues += _flat_map(
        fk_resources,
        lambda resource: _check_foreign_keys(resource, properties),
    )
    return key_issues

309 

310 

def _issues_at_property(
    resource: PropertyField, issues: list[Issue], jsonpath: str
) -> list[Issue]:
    """Return the issues located at or under `jsonpath` within `resource`."""
    target = f"{resource.jsonpath}.{jsonpath}"
    return _filter(issues, lambda issue: target in issue.jsonpath)

318 

319 

def _keep_resources_with_no_issue_at_property(
    resources: list[PropertyField], issues: list[Issue], jsonpath: str
) -> list[PropertyField]:
    """Filter out resources that have an issue at or under the given `jsonpath`."""

    def has_no_issue(resource: PropertyField) -> bool:
        return not _issues_at_property(resource, issues, jsonpath)

    return _filter(resources, has_no_issue)

328 

329 

def _check_primary_key(resource: PropertyField) -> list[Issue]:
    """Check that primary key fields exist in the resource."""
    pk_fields = resolve("/schema/primaryKey", resource.value)
    unknown_fields = _get_unknown_key_fields(
        _key_fields_as_str_list(pk_fields), resource.value
    )
    if unknown_fields:
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.primaryKey",
                type="primary-key",
                message=(
                    f"No fields found in resource for primary key fields: {unknown_fields}."
                ),
                instance=pk_fields,
            )
        ]
    return []

349 

350 

def _check_foreign_keys(
    resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
    """Check that foreign key source and destination fields exist."""
    # Safe, as only FKs of the correct type here
    foreign_keys = cast(
        list[dict[str, Any]], resolve("/schema/foreignKeys", resource.value)
    )

    def points_to_other_resource(fk: dict[str, Any]) -> bool:
        reference = fk["reference"]
        return "resource" in reference and reference["resource"] != ""

    fks_diff_resource = _filter(foreign_keys, points_to_other_resource)
    fks_same_resource = _filter(
        foreign_keys, lambda fk: fk not in fks_diff_resource
    )

    issues = _flat_map(foreign_keys, lambda fk: _check_fk_source_fields(fk, resource))
    issues += _flat_map(
        fks_same_resource,
        lambda fk: _check_fk_dest_fields_same_resource(fk, resource),
    )
    issues += _flat_map(
        fks_diff_resource,
        lambda fk: _check_fk_dest_fields_diff_resource(fk, resource, properties),
    )
    return issues

378 

379 

380def _key_fields_as_str_list(key_fields: Any) -> list[str]: 

381 """Returns the list representation of primary and foreign key fields. 

382 

383 Key fields can be represented either as a string (containing one field name) 

384 or a list of strings. 

385 

386 The input should contain a correctly typed `key_fields` object. 

387 """ 

388 if not isinstance(key_fields, list): 

389 key_fields = [key_fields] 

390 return cast(list[str], key_fields) 

391 

392 

def _get_unknown_key_fields(
    key_fields: list[str], properties: dict[str, Any], resource_path: str = ""
) -> str:
    """Return the key fields that don't exist on the specified resource."""
    known_fields = findall(f"{resource_path}schema.fields[*].name", properties)
    missing = _filter(key_fields, lambda field: field not in known_fields)
    # Each missing name is quoted (repr) and joined into one string.
    return ", ".join(_map(missing, lambda field: f"{field!r}"))

401 

402 

def _check_fk_source_fields(
    foreign_key: dict[str, Any], resource: PropertyField
) -> list[Issue]:
    """Check that foreign key source fields exist and have the correct number."""
    source_fields = resolve("/fields", foreign_key)
    source_field_list = _key_fields_as_str_list(source_fields)
    dest_fields = _key_fields_as_str_list(resolve("/reference/fields", foreign_key))

    issues: list[Issue] = []
    unknown_fields = _get_unknown_key_fields(source_field_list, resource.value)
    if unknown_fields:
        issues.append(
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
                type="foreign-key-source-fields",
                message=(
                    "No fields found in resource for foreign key source fields: "
                    f"{unknown_fields}."
                ),
                instance=source_fields,
            )
        )

    # Source and destination fields must pair up one-to-one.
    if len(source_field_list) != len(dest_fields):
        issues.append(
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
                type="foreign-key-source-fields",
                message=(
                    "The number of foreign key source fields must be the same as "
                    "the number of foreign key destination fields."
                ),
                instance=source_fields,
            )
        )
    return issues

438 

439 

def _check_fk_dest_fields_same_resource(
    foreign_key: dict[str, Any],
    resource: PropertyField,
) -> list[Issue]:
    """Check that foreign key destination fields exist on the same resource."""
    dest_fields = resolve("/reference/fields", foreign_key)
    unknown_fields = _get_unknown_key_fields(
        _key_fields_as_str_list(dest_fields), resource.value
    )
    if unknown_fields:
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
                type="foreign-key-destination-fields",
                message=(
                    "No fields found in resource for foreign key "
                    f"destination fields: {unknown_fields}."
                ),
                instance=dest_fields,
            )
        ]
    return []

462 

463 

def _check_fk_dest_fields_diff_resource(
    foreign_key: dict[str, Any], resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
    """Check that foreign key destination fields exist on the destination resource."""
    dest_fields = resolve("/reference/fields", foreign_key)
    # Safe, as only keys of the correct type here
    dest_resource_name = cast(str, resolve("/reference/resource", foreign_key))
    dest_resource_path = f"resources[?(@.name == '{dest_resource_name}')]"

    # The referenced resource must exist before its fields can be checked.
    if not findall(dest_resource_path, properties):
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.resource",
                type="foreign-key-destination-resource",
                message=(
                    f"The destination resource {dest_resource_name!r} of this foreign "
                    "key doesn't exist in the package."
                ),
                instance=dest_resource_name,
            )
        ]

    unknown_fields = _get_unknown_key_fields(
        _key_fields_as_str_list(dest_fields), properties, f"{dest_resource_path}."
    )
    if unknown_fields:
        return [
            Issue(
                jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
                type="foreign-key-destination-fields",
                message=(
                    f"No fields found in destination resource {dest_resource_name!r} "
                    f"for foreign key destination fields: {unknown_fields}."
                ),
                instance=dest_fields,
            )
        ]
    return []

504 

505 

506def _set_should_fields_to_required(schema: dict[str, Any]) -> dict[str, Any]: 

507 """Set 'SHOULD' fields to 'REQUIRED' in the schema.""" 

508 should_fields = ("name", "id", "licenses") 

509 name_pattern = r"^[a-z0-9._-]+$" 

510 

511 # From https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string 

512 semver_pattern = ( 

513 r"^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)" 

514 r"(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0" 

515 r"|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>" 

516 r"[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$" 

517 ) 

518 

519 # Convert to required 

520 schema["required"].extend(should_fields) 

521 schema["properties"]["name"]["pattern"] = name_pattern 

522 schema["properties"]["version"]["pattern"] = semver_pattern 

523 schema["properties"]["contributors"]["items"]["required"] = ["title"] 

524 schema["properties"]["sources"]["items"]["required"] = ["title"] 

525 schema["properties"]["resources"]["items"]["properties"]["name"]["pattern"] = ( 

526 name_pattern 

527 ) 

528 return schema 

529 

530 

def _check_object_against_json_schema(
    json_object: dict[str, Any], schema: dict[str, Any]
) -> list[Issue]:
    """Checks that `json_object` matches the given JSON schema.

    Structural, type and format constraints are all checked. All schema
    violations are collected before issues are returned.

    Args:
        json_object: The JSON object to check.
        schema: The JSON schema to check against.

    Returns:
        A list of issues. An empty list, if no issues are found.

    Raises:
        jsonschema.exceptions.SchemaError: If the given schema is invalid.
    """
    # Fail fast on a broken schema before validating the object against it.
    Draft7Validator.check_schema(schema)
    errors = Draft7Validator(schema, format_checker=FormatChecker()).iter_errors(
        json_object
    )
    return _validation_errors_to_issues(errors)

552 

553 

@dataclass(frozen=True)
class SchemaError:
    """A simpler representation of `ValidationError` for easier processing.

    Frozen, so instances are hashable and comparable by value.

    Attributes:
        message (str): The error message generated by `jsonschema`.
        type (str): The type of the error (e.g., a JSON schema type such as
            "required", "type", "pattern", or "format", or a custom type).
        schema_path (str): The path to the violated check in the JSON schema,
            with path components separated by '/'.
        jsonpath (str): The JSON path to the field that violates the check.
        instance (Any): The part of the object that failed the check.
        schema_value (Optional[Any]): The expected value that is checked
            against, which is part of the schema violated by this error.
        parent (Optional[SchemaError]): The error group the error belongs to,
            if any.
    """

    message: str
    type: str
    schema_path: str
    jsonpath: str
    instance: Any
    schema_value: Optional[Any] = None
    parent: Optional["SchemaError"] = None

578 

579 

def _validation_errors_to_issues(
    validation_errors: Iterator[ValidationError],
) -> list[Issue]:
    """Transforms `jsonschema.ValidationError`s to more compact `Issue`s.

    Args:
        validation_errors: The `jsonschema.ValidationError`s to transform.

    Returns:
        A list of `Issue`s.
    """
    schema_errors = _flat_map(validation_errors, _validation_error_to_schema_errors)
    # Fold each grouping parent error's special handling into the error list.
    parent_errors = _filter(schema_errors, lambda error: error.type in GROUP_ERRORS)
    for parent_error in parent_errors:
        schema_errors = _handle_grouped_error(schema_errors, parent_error)
    return _map(schema_errors, _create_issue)

596 

597 

@dataclass(frozen=True)
class SchemaErrorEdits:
    """Expresses which errors to add to or remove from schema errors."""

    # `default_factory` gives each instance its own (mutable) lists.
    add: list[SchemaError] = field(default_factory=list)
    remove: list[SchemaError] = field(default_factory=list)

604 

605 

def _handle_S_resources_x(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Do not flag missing `path` and `data` separately."""
    edits = SchemaErrorEdits()
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)
    if errors_in_group:
        # The parent error is caused by other errors, so it is redundant.
        edits.remove.append(parent_error)

    required_errors = _filter(errors_in_group, _path_or_data_required_error)
    if len(required_errors) > 1:
        # Both `path` and `data` are missing: report that as a single, more
        # informative error instead of two separate ones.
        edits.add.append(
            SchemaError(
                message=(
                    "This resource has no `path` or `data` field. "
                    "One of them must be provided."
                ),
                type="required",
                jsonpath=parent_error.jsonpath,
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )

    # The individual required errors on `path`/`data` are always dropped.
    edits.remove.extend(required_errors)
    return edits

638 

639 

def _handle_S_resources_x_path(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type.

    If `path` is a string, flag errors for the string-based schema.
    If `path` is an array, flag errors for the array-based schema.
    """
    edits = SchemaErrorEdits()
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)
    type_errors = _filter(errors_in_group, _is_path_type_error)
    only_type_errors = len(type_errors) == len(errors_in_group)

    if type_errors:
        edits.remove.append(parent_error)

    if only_type_errors:
        # Every error in the group is a type error, i.e. `path` matches
        # neither sub-schema; replace the noise with one clear error.
        edits.add.append(
            SchemaError(
                message="The `path` property must be either a string or an array.",
                type="type",
                jsonpath=type_errors[0].jsonpath,
                schema_path=type_errors[0].schema_path,
                instance=parent_error.instance,
            )
        )

    # Remove all original type errors on $.resources[x].path
    edits.remove.extend(type_errors)
    return edits

673 

674 

def _handle_S_resources_x_schema_fields_x(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant field type.

    E.g., if the field type is `string`, flag errors for the string-based schema only.
    """
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    field_properties = parent_error.instance
    if not isinstance(field_properties, dict):
        return edits

    # Fields without an explicit type fall back to "string".
    field_type: str = field_properties.get("type", "string")

    if field_type not in FIELD_TYPES:
        # The field's type is unknown: replace the whole group with a single
        # error listing the allowed types.
        edits.add.append(
            SchemaError(
                message=(
                    "The type property in this resource schema field is incorrect. "
                    f"The value can only be one of these types: {', '.join(FIELD_TYPES)}."
                ),
                type="enum",
                jsonpath=f"{parent_error.jsonpath}.type",
                schema_path=parent_error.schema_path,
                instance=field_properties,
            )
        )
        edits.remove.extend(errors_in_group)
        return edits

    # The field's type is known; keep only errors for this field type.
    schema_index = FIELD_TYPES.index(field_type)
    relevant_part = f"fields/items/oneOf/{schema_index}/"
    edits.remove.extend(
        _filter(
            errors_in_group,
            lambda error: relevant_part not in error.schema_path,
        )
    )
    return edits

718 

719 

def _handle_S_resources_x_schema_fields_x_constraints_enum(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant field type and simplify errors."""
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    if _not_field_type_error(parent_error):
        # The group belongs to another field type; drop it wholesale.
        edits.remove.extend(errors_in_group)
        return edits

    value_errors = _filter(
        errors_in_group,
        lambda error: not error.jsonpath.endswith("enum"),
    )

    if value_errors == errors_in_group:
        # Only individual enum values are wrong; collapse them into one error.
        edits.add.append(_get_enum_values_error(parent_error, value_errors))

    # Otherwise, keep only the top-level enum errors.
    edits.remove.extend(value_errors)
    return edits

745 

746 

def _get_enum_values_error(
    parent_error: SchemaError,
    value_errors: list[SchemaError],
) -> SchemaError:
    """Collapse per-value enum errors into a single summary `SchemaError`.

    Args:
        parent_error: The group's parent error; its instance is the enum list.
        value_errors: The per-value errors to collapse.

    Returns:
        One error describing the enum problem.
    """
    message = "All enum values must be the same type."
    same_type = len(set(_map(parent_error.instance, lambda value: type(value)))) == 1
    if same_type:
        # Sorted so the message is deterministic: iterating a set directly
        # produced a different type ordering from run to run.
        allowed_types = sorted(
            set(_map(value_errors, lambda error: str(error.schema_value)))
        )
        message = (
            "The enum value type is not correct. Enum values should be "
            f"one of {', '.join(allowed_types)}."
        )
    return SchemaError(
        message=message,
        type="type",
        schema_path=value_errors[0].schema_path,
        jsonpath=_strip_index(value_errors[0].jsonpath),
        instance=value_errors[0].instance,
    )

766 

767 

768def _not_field_type_error(parent_error: SchemaError) -> bool: 

769 if not parent_error.parent: 

770 return True 

771 field_type: str = parent_error.parent.instance.get("type", "string") 

772 if field_type not in FIELD_TYPES: 

773 return True 

774 schema_index = FIELD_TYPES.index(field_type) 

775 return f"fields/items/oneOf/{schema_index}/" not in parent_error.schema_path 

776 

777 

def _handle_S_resources_x_schema_primary_key(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type and simplify errors."""
    PRIMARY_KEY_TYPES: tuple[type[Any], ...] = (list, str)
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    key_type = type(parent_error.instance)
    if key_type not in PRIMARY_KEY_TYPES:
        # The key is neither a string nor an array: replace the whole group
        # with one informative type error.
        edits.remove.extend(errors_in_group)
        edits.add.append(
            SchemaError(
                message="The `primaryKey` property must be a string or an array.",
                type="type",
                jsonpath=parent_error.jsonpath,
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )
        return edits

    # Keep only the errors for the sub-schema matching the key's actual type.
    schema_for_type = f"primaryKey/oneOf/{PRIMARY_KEY_TYPES.index(key_type)}/"
    edits.remove.extend(
        _filter(
            errors_in_group,
            lambda error: schema_for_type not in error.schema_path,
        )
    )
    return edits

810 

811 

def _handle_S_resources_x_schema_foreign_keys(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only flag errors for the relevant type and simplify errors.

    The sub-schema to use is determined based on the type of the top-level foreign
    key fields property.
    """
    FOREIGN_KEY_TYPES: tuple[type[Any], ...] = (list, str)
    edits = SchemaErrorEdits(remove=[parent_error])
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)

    parent_instance = parent_error.instance
    # `fields` must be present (and be a dict member) before its type can
    # be inspected at all.
    key_exists = isinstance(parent_instance, dict) and "fields" in parent_instance

    # If the key type is correct, use that schema
    if (
        key_exists
        and (key_type := type(parent_instance["fields"])) in FOREIGN_KEY_TYPES
    ):
        # Keep only errors from the sub-schema matching the key's actual type;
        # the index into FOREIGN_KEY_TYPES selects the oneOf branch.
        schema_part = f"foreignKeys/items/oneOf/{FOREIGN_KEY_TYPES.index(key_type)}/"
        edits.remove.extend(
            _filter(
                errors_in_group,
                lambda error: schema_part not in error.schema_path,
            )
        )
        return edits

    # If the key type is incorrect, remove all errors that depend on it
    key_type_errors = _filter(
        errors_in_group,
        lambda error: (
            error.schema_path.endswith("fields/type")
            or "reference/properties/fields" in error.schema_path
        ),
    )
    edits.remove.extend(key_type_errors)

    # If the key exists, flag incorrect type
    if key_exists:
        edits.add.append(
            SchemaError(
                message=(
                    "The `fields` property of a foreign key must be a string or "
                    "an array."
                ),
                type="type",
                jsonpath=f"{parent_error.jsonpath}.fields",
                schema_path=parent_error.schema_path,
                instance=parent_error.instance,
            )
        )

    return edits

868 

869 

def _handle_licenses(
    parent_error: SchemaError,
    schema_errors: list[SchemaError],
) -> SchemaErrorEdits:
    """Only include one error if both `name` and `path` are missing."""
    errors_in_group = _get_errors_in_group(schema_errors, parent_error)
    summary = SchemaError(
        message=(
            "Licenses must have at least one of the following properties: "
            "`name`, `path`."
        ),
        type="required",
        schema_path=parent_error.schema_path,
        jsonpath=parent_error.jsonpath,
        instance=parent_error.instance,
    )
    # Replace the parent and all of its children with the single summary error.
    return SchemaErrorEdits(remove=errors_in_group + [parent_error], add=[summary])

891 

892 

# Maps a schema-path suffix to the handler that post-processes the grouped
# errors produced under that part of the schema. A parent error is handled by
# every entry whose suffix matches its schema path; handlers are tried in this
# list's order (see `_handle_grouped_error`).
_schema_path_to_handler: list[
    tuple[str, Callable[[SchemaError, list[SchemaError]], SchemaErrorEdits]]
] = [
    ("resources/items/oneOf", _handle_S_resources_x),
    ("resources/items/properties/path/oneOf", _handle_S_resources_x_path),
    ("fields/items/oneOf", _handle_S_resources_x_schema_fields_x),
    (
        "constraints/properties/enum/oneOf",
        _handle_S_resources_x_schema_fields_x_constraints_enum,
    ),
    ("primaryKey/oneOf", _handle_S_resources_x_schema_primary_key),
    ("foreignKeys/items/oneOf", _handle_S_resources_x_schema_foreign_keys),
    ("licenses/items/anyOf", _handle_licenses),
]

907 

908 

def _handle_grouped_error(
    schema_errors: list[SchemaError], parent_error: SchemaError
) -> list[SchemaError]:
    """Handle grouped schema errors that need special treatment.

    Args:
        schema_errors: All remaining schema errors.
        parent_error: The parent error of a group.

    Returns:
        The schema errors after processing.
    """
    to_add: list[SchemaError] = []
    to_remove: list[SchemaError] = []
    # Collect the edits from every matching handler, in registration order;
    # each handler sees the same, unedited error list.
    for schema_path, handler in _schema_path_to_handler:
        if not parent_error.schema_path.endswith(schema_path):
            continue
        edits = handler(parent_error, schema_errors)
        to_add.extend(edits.add)
        to_remove.extend(edits.remove)

    return _filter(schema_errors, lambda error: error not in to_remove) + to_add

943 

944 

def _validation_error_to_schema_errors(error: ValidationError) -> list[SchemaError]:
    """Flatten a `ValidationError` and its sub-errors into `SchemaError`s."""
    errors = [_create_schema_error(error)]
    if error.context:
        # Recurse into the sub-errors attached to this error.
        errors += _flat_map(error.context, _validation_error_to_schema_errors)
    return errors

951 

952 

953def _get_full_json_path_from_error(error: ValidationError) -> str: 

954 """Returns the full `json_path` to the error. 

955 

956 For 'required' errors, the field name is extracted from the error message. 

957 

958 Args: 

959 error: The error to get the full `json_path` for. 

960 

961 Returns: 

962 The full `json_path` of the error. 

963 """ 

964 if str(error.validator) == "required": 

965 match = re.search("'(.*)' is a required property", error.message) 

966 if match: 

967 return f"{error.json_path}.{match.group(1)}" 

968 return error.json_path 

969 

970 

def _create_schema_error(error: ValidationError) -> SchemaError:
    """Convert a `jsonschema.ValidationError` into a `SchemaError`."""
    # Recursively convert the parent so group membership is preserved.
    parent = _create_schema_error(error.parent) if error.parent else None  # type: ignore[arg-type]
    return SchemaError(
        message=error.message,
        type=str(error.validator),
        jsonpath=_get_full_json_path_from_error(error),
        schema_path="/".join(_map(error.absolute_schema_path, str)),
        instance=error.instance,
        schema_value=error.validator_value,
        parent=parent,
    )

981 

982 

983def _path_or_data_required_error(error: SchemaError) -> bool: 

984 return error.jsonpath.endswith(("path", "data")) and error.type == "required" 

985 

986 

987def _is_path_type_error(error: SchemaError) -> bool: 

988 return error.type == "type" and error.jsonpath.endswith("path") 

989 

990 

def _create_issue(error: SchemaError) -> Issue:
    """Convert a `SchemaError` into a user-facing `Issue`."""
    return Issue(
        jsonpath=error.jsonpath,
        type=error.type,
        message=error.message,
        instance=error.instance,
    )

998 

999 

def _get_errors_in_group(
    schema_errors: list[SchemaError], parent_error: SchemaError
) -> list[SchemaError]:
    """Return the errors whose parent is `parent_error`."""

    def belongs_to_group(error: SchemaError) -> bool:
        return error.parent == parent_error

    return _filter(schema_errors, belongs_to_group)

1004 

1005 

1006def _strip_index(jsonpath: str) -> str: 

1007 return re.sub(r"\[\d+\]$", "", jsonpath) 

1008 

1009 

# Set up exception hooks at module load time so that a raised
# `DataPackageError` is shown without its traceback to users of this package.
_setup_suppressed_tracebacks(DataPackageError)