Source code for jscc.schema

"""Methods for interacting with or reasoning about JSON Schema and CSV codelists."""

from collections import UserDict
from copy import deepcopy

import json_merge_patch

from jscc.exceptions import DuplicateKeyError
from jscc.testing.util import http_get


def is_codelist(fieldnames):
    """
    Decide whether a CSV file is a codelist, based on its header row.

    :param list fieldnames: the fieldnames of the CSV
    :returns: whether the CSV is a codelist
    :rtype: bool
    """
    # OCDS spells the column in titlecase; BODS spells it in lowercase.
    return any(heading in fieldnames for heading in ("Code", "code"))
def is_json_schema(data):
    """
    Decide whether parsed JSON looks like a JSON Schema.

    The data qualifies if it has at least one of the top-level keys
    ``$schema``, ``definitions``, ``$defs`` or ``properties``.

    :param dict data: JSON data
    :returns: whether the JSON data is a JSON Schema
    :rtype: bool
    """
    schema_keys = ("$schema", "definitions", "$defs", "properties")
    return any(key in data for key in schema_keys)
def is_json_merge_patch(data):
    """
    Decide whether parsed JSON looks like a JSON Merge Patch for a schema.

    The data qualifies if it has no ``$schema`` key, but has at least one of
    the top-level keys ``definitions``, ``$defs`` or ``properties``.

    :param dict data: JSON data
    :returns: whether the JSON data is a JSON Merge Patch
    :rtype: bool
    """
    # A "$schema" declaration marks a complete schema, never a patch.
    if "$schema" in data:
        return False
    return any(key in data for key in ("definitions", "$defs", "properties"))
def is_array_of_objects(field):
    """
    Decide whether a field is an array whose items are objects.

    The field's "type" must include "array", and its "items" must contain
    either a ``$ref`` or a ``properties`` key.

    :param dict field: the field
    :returns: whether a field is an array of objects
    :rtype: bool
    """
    if "array" not in field.get("type", []):
        return False
    items = field.get("items", {})
    return "$ref" in items or "properties" in items
def is_missing_property(field, prop):
    """
    Decide whether a property of a field is effectively absent.

    Booleans and numbers always count as present, even when falsy
    (``False``, ``0``, ``0.0``).

    :param dict field: the field
    :param str prop: the property
    :returns: whether a field's property isn't set, is empty, or is whitespace
    :rtype: bool
    """
    if prop not in field:
        return True

    value = field[prop]
    if isinstance(value, str):
        # Covers both the empty string and whitespace-only strings.
        return not value.strip()
    if isinstance(value, (bool, int, float)):
        return False
    return not value
def get_types(field):
    """
    Return a field's "type" as a list.

    A string "type" is wrapped in a one-element list, a missing "type" yields
    an empty list, and any other value is returned unchanged.

    :param dict field: the field
    :returns: a field's "type"
    :rtype: list
    """
    if "type" in field:
        declared = field["type"]
        return [declared] if isinstance(declared, str) else declared
    return []
def extend_schema(basename, schema, metadata, codelists=None):
    """
    Patch a JSON Schema with an extension's dependencies, recursively.

    Each URL under ``dependencies`` and ``testDependencies`` in the metadata is fetched, along with the
    file named ``basename`` in the same remote directory. That file is merged into a deep copy of
    ``schema`` as a JSON Merge Patch, and then the dependency's own metadata is processed the same way.

    If :code:`codelists` is provided, it will be updated with the codelists from the dependencies.

    .. attention::

       No timeout is set. If a user can input malicious ``metadata`` with unresponsive ``dependencies`` or
       ``testDependencies`` URLs, the program can hang indefinitely.

    .. attention::

       This function is vulnerable to server-side request forgery (SSRF). A user can create an extension
       whose dependencies point to internal resources, which would receive a GET request.

    :param str basename: the JSON Schema file's basename
    :param dict schema: the JSON Schema file's parsed contents
    :param dict metadata: the extension metadata file's parsed contents
    :param set codelists: any set
    :returns: the patched schema
    :rtype: dict
    """
    # Work on a copy, so that the caller's schema is never modified.
    patched = deepcopy(schema)

    def apply_dependencies(extension_metadata):
        dependency_urls = extension_metadata.get("dependencies", []) + extension_metadata.get("testDependencies", [])
        for dependency_url in dependency_urls:
            # The schema patch is the sibling of the metadata file, in the same remote directory.
            directory_url = dependency_url.rsplit("/", 1)[0]
            dependency_metadata = http_get(dependency_url).json()
            dependency_patch = http_get(f"{directory_url}/{basename}").json()

            if codelists is not None:
                codelists.update(dependency_metadata.get("codelists", []))

            # The return value is discarded: ``patched`` is expected to be updated in place.
            json_merge_patch.merge(patched, dependency_patch)
            apply_dependencies(dependency_metadata)

    apply_dependencies(metadata)
    return patched
class RejectingDict(UserDict):
    """A ``dict`` that refuses to have the same key set more than once."""

    # Duplicate member names are disallowed by https://tools.ietf.org/html/rfc7493#section-2.3
    def __setitem__(self, k, v):
        """Store the value, or raise :class:`~jscc.exceptions.DuplicateKeyError` if the key is already set."""
        if k not in self:
            return super().__setitem__(k, v)
        raise DuplicateKeyError(k)
def rejecting_dict(pairs):
    """Build a plain ``dict`` from key-value pairs, in which a key can be set at most once.

    Use as an ``object_pairs_hook`` method.
    """
    guarded = RejectingDict(pairs)
    # Hand back the underlying dict instead of the wrapper, because jsonschema checks the type.
    return guarded.data