"""
- ``get_*`` functions to check for empty files, misindented JSON files, and invalid JSON files.
See "pytest example" in each function's documentation for usage examples.
- ``validate_*`` methods to test JSON Schema.
Each method's behavior is customizable, and not all methods are relevant to all schema.
The typical usage is to first define a test method like so:
.. code-block:: python

    import pytest

    from jscc.schema import is_json_schema
    from jscc.testing.filesystem import walk_json_data
    from jscc.testing.util import http_get

    schemas = [(path, name, data) for path, name, _, data in walk_json_data() if is_json_schema(data)]
    metaschema = http_get('http://json-schema.org/draft-04/schema').json()


    @pytest.mark.parametrize('path,name,data', schemas)
    def test_schema_valid(path, name, data):
        validate_json_schema(path, name, data, metaschema)

You can edit ``metaschema`` to be stricter and/or to add new properties. Then, define the ``validate_json_schema``
method, which uses the ``validate_*`` methods. For example:
.. code-block:: python

    import jsonref
    from jscc.testing.checks import (
        validate_array_items,
        validate_codelist_enum,
        validate_deep_properties,
        validate_items_type,
        validate_letter_case,
        validate_merge_properties,
        validate_metadata_presence,
        validate_null_type,
        validate_object_id,
        validate_ref,
        validate_schema,
    )
    from jsonschema import FormatChecker
    from jsonschema.validators import Draft4Validator

    validator = Draft4Validator(Draft4Validator.META_SCHEMA, format_checker=FormatChecker())


    def validate_json_schema(path, name, data, schema):
        errors = 0

        errors += validate_schema(path, data, validator)
        errors += validate_array_items(path, data)
        errors += validate_items_type(path, data)
        errors += validate_codelist_enum(path, data)
        errors += validate_letter_case(path, data)
        errors += validate_merge_properties(path, data)
        errors += validate_ref(path, data)
        errors += validate_metadata_presence(path, data)
        errors += validate_object_id(path, jsonref.replace_refs(data))
        errors += validate_null_type(path, data)
        # Here, we don't add to `errors`, in order to not count these warnings as errors.
        validate_deep_properties(path, data)

        assert not errors, 'One or more JSON Schema files are invalid. See warnings below.'

You can monkeypatch ``warnings.formatwarning`` to customize and abbreviate the warning messages:
.. code-block:: python

    import os
    import warnings

    from jscc.exceptions import DeepPropertiesWarning

    cwd = os.getcwd()


    def formatwarning(message, category, filename, lineno, line=None):
        # Prefix warnings that count as errors with "ERROR: ".
        if category != DeepPropertiesWarning:
            message = f'ERROR: {message}'
        # Remove the path to the current working directory.
        return str(message).replace(cwd + os.sep, '')


    warnings.formatwarning = formatwarning
"""
# In this module, it would be simpler to use a tuple for each `pointer` keyword argument, especially when calculating
# the last component of the pointer (the "parent"). However, it's easier when *using* the module to write JSON Pointers
# as strings. Therefore, we use strings instead of tuples.
import json
import os
import re
from warnings import warn
import jsonref
from jscc.exceptions import (
CodelistEnumWarning,
DeepPropertiesWarning,
DuplicateKeyError,
ItemsTypeWarning,
LetterCaseWarning,
MergePropertiesWarning,
MetadataPresenceWarning,
NullTypeWarning,
ObjectIdWarning,
RefWarning,
SchemaCodelistsMatchWarning,
SchemaWarning,
)
from jscc.schema import get_types, is_array_of_objects, is_codelist, is_missing_property, rejecting_dict
from jscc.testing.filesystem import tracked, walk, walk_csv_data, walk_json_data
from jscc.testing.util import difference
def _true(*args):
"""Return ``True`` (used internally as a default method)."""
return True
def _false(*args):
"""Return ``False`` (used internally as a default method)."""
return False
def get_empty_files(include=_true, **kwargs):
"""
Yield the path (as a tuple) of any file that is empty.
JSON files are empty if their parsed contents are empty (empty array, empty object, empty string or ``null``).
Other files are empty if they contain whitespace only.
:param function include: a method that accepts a file path and file name, and returns whether to test the file
(default true)
    pytest example::

        from jscc.testing.checks import get_empty_files
        from jscc.testing.util import warn_and_assert


        def test_empty():
            warn_and_assert(get_empty_files(), '{0} is empty, run: rm {0}',
                            'Files are empty. See warnings below.')
    """
"""
for path, name in walk(**kwargs):
if tracked(path) and include(path, name) and name != '__init__.py':
try:
with open(path) as f:
text = f.read()
except UnicodeDecodeError:
continue # the file is non-empty, and might be binary
if not text.strip():
yield (path,)
elif name.endswith('.json'):
try:
value = json.loads(text)
if not value and not isinstance(value, (bool, int, float)):
yield (path,)
except json.decoder.JSONDecodeError:
continue # the file is non-empty
def get_misindented_files(include=_true, **kwargs):
r"""
Yield the path (as a tuple) of any JSON file that isn't formatted for humans.
JSON files must be indented with two spaces, mustn't escape non-ASCII characters (no ``\uXXXX`` sequences), and
must have a newline at end of file.
:param function include: a method that accepts a file path and file name, and returns whether to test the file
(default true)
    pytest example::

        from jscc.testing.checks import get_misindented_files
        from jscc.testing.util import warn_and_assert


        def test_indent():
            warn_and_assert(get_misindented_files(), '{0} is not indented as expected, run: ocdskit indent {0}',
                            'Files are not indented as expected. See warnings below, or run: ocdskit indent -r .')
    """
"""
for path, name, text, data in walk_json_data(**kwargs):
if tracked(path) and include(path, name):
expected = json.dumps(data, ensure_ascii=False, indent=2) + '\n'
if text != expected:
yield (path,)
def get_invalid_json_files(**kwargs):
"""
Yield the path and exception (as a tuple) of any JSON file that isn't valid.
JSON files must be parsed without error by the ``json`` module, and JSON objects mustn't have duplicate keys.
See https://tools.ietf.org/html/rfc7493#section-2.3
    pytest example::

        from jscc.testing.checks import get_invalid_json_files
        from jscc.testing.util import warn_and_assert


        def test_invalid_json():
            warn_and_assert(get_invalid_json_files(), '{0} is not valid JSON: {1}',
                            'JSON files are invalid. See warnings below.')
    """
"""
for path, _ in walk(**kwargs):
if path.endswith('.json'):
with open(path) as f:
text = f.read()
if text:
try:
json.loads(text, object_pairs_hook=rejecting_dict)
except (json.decoder.JSONDecodeError, DuplicateKeyError) as e:
yield path, e
def validate_schema(path, data, validator): # noqa: ARG001 # consistency
"""
Warn and return the number of errors relating to JSON Schema validation.
Uses the `jsonschema <https://python-jsonschema.readthedocs.io/>`__ module.
:param validator: The validator to use
:returns: the number of errors
:rtype: int
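    pytest example (a sketch, assuming ``pytest`` is imported and the ``schemas`` list is built as in the
    module docstring)::

        from jscc.testing.checks import validate_schema
        from jsonschema import FormatChecker
        from jsonschema.validators import Draft4Validator

        validator = Draft4Validator(Draft4Validator.META_SCHEMA, format_checker=FormatChecker())

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_schema(path, name, data):
            assert not validate_schema(path, data, validator)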
"""
errors = 0
for error in validator.iter_errors(data):
errors += 1
warn(f"{json.dumps(error.instance, indent=2)}\n{error.message} ({'/'.join(error.absolute_schema_path)})\n",
SchemaWarning)
return errors
def validate_letter_case(*args, property_exceptions=(), definition_exceptions=()):
"""
Warn and return the number of errors relating to the letter case of properties and definitions.
Property names must be lowerCamelCase. Definition names must be UpperCamelCase. All must be ASCII letters only.
:param property_exceptions: property names to ignore
:type property_exceptions: list, tuple or set
:param definition_exceptions: definition names to ignore
:type definition_exceptions: list, tuple or set
:returns: the number of errors
:rtype: int
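    pytest example (a sketch, assuming the ``schemas`` list from the module docstring; the exception
    value is illustrative only)::

        from jscc.testing.checks import validate_letter_case

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_letter_case(path, name, data):
            assert not validate_letter_case(path, data, property_exceptions={'former_value'})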
"""
def block(path, data, pointer):
errors = 0
parent = pointer.rsplit('/', 1)[-1]
if parent == 'properties':
for key in data:
if not re.search(r'^[a-z][A-Za-z]+$', key) and key not in property_exceptions:
errors += 1
warn(f"{path}: {pointer}/{key} field isn't lowerCamelCase ASCII letters", LetterCaseWarning)
elif parent in {'definitions', '$defs'}:
for key in data:
if not re.search(r'^[A-Z][A-Za-z]+$', key) and key not in definition_exceptions:
errors += 1
warn(f"{path}: {pointer}/{key} block isn't UpperCamelCase ASCII letters", LetterCaseWarning)
return errors
return _traverse(block)(*args)
def validate_null_type(path, data, pointer='', *, no_null=False, expect_null=True, allow_object_null=(),
allow_no_null=(), allow_null=()):
"""
Warn and return the number of errors relating to non-nullable optional fields and nullable required fields.
If :code:`no_null` is ``True``, then "type" properties mustn't include "null".
Otherwise, the "type" properties for objects and arrays of objects mustn't include "null". If a field is `required
<https://tools.ietf.org/html/draft-fge-json-schema-validation-00#section-5.4.3>`__, then its "type" property
mustn't include "null". If a field isn't required (and it isn't an object or array of objects), then its "type"
property must include "null".
:param bool no_null: whether the standard disallows "null" in the "type" property of any field
:param bool expect_null: whether the field, in context, is expected to have "null" in its "type" property
:param allow_object_null: JSON Pointers of fields whose "type" properties are allowed to include "object", "null"
:type allow_object_null: list, tuple or set
:param allow_no_null: JSON Pointers of fields whose "type" properties are allowed to exclude "null"
:type allow_no_null: list, tuple or set
:param allow_null: JSON Pointers of fields whose "type" properties are allowed to include "null"
:type allow_null: list, tuple or set
:returns: the number of errors
:rtype: int
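    pytest example (a sketch, assuming the ``schemas`` list from the module docstring; the JSON Pointer
    is illustrative only)::

        from jscc.testing.checks import validate_null_type

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_null_type(path, name, data):
            assert not validate_null_type(path, data, allow_no_null=['/properties/id'])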
"""
errors = 0
kwargs = {
'allow_object_null': allow_object_null,
'allow_no_null': allow_no_null,
'allow_null': allow_null,
}
if no_null:
expect_null = False
if isinstance(data, list):
for index, item in enumerate(data):
errors += validate_null_type(path, item, pointer=f'{pointer}/{index}', **kwargs, no_null=no_null)
elif isinstance(data, dict):
if 'type' in data and pointer:
null_in_type = 'null' in data['type']
null_not_allowed = 'object' in data['type'] or is_array_of_objects(data)
# Objects and arrays of objects mustn't be nullable.
if null_in_type and null_not_allowed and pointer not in allow_object_null:
errors += 1
warn(f'{path} includes "null" in "type" at {pointer}', NullTypeWarning)
elif expect_null:
if not null_in_type and not null_not_allowed and pointer not in allow_no_null:
errors += 1
warn(f'{path} is missing "null" in "type" at {pointer}', NullTypeWarning)
elif null_in_type and pointer not in allow_null:
errors += 1
warn(f'{path} includes "null" in "type" at {pointer}', NullTypeWarning)
required = data.get('required', [])
for key, value in data.items():
if key in {'properties', 'definitions', '$defs'}:
for k, v in data[key].items():
expect_null = key == 'properties' and k not in required
errors += validate_null_type(path, v, pointer=f'{pointer}/{key}/{k}', **kwargs, no_null=no_null,
expect_null=expect_null)
            else:
                errors += validate_null_type(path, value, pointer=f'{pointer}/{key}', **kwargs, no_null=no_null,
                                             expect_null=key != 'items')
return errors
def validate_codelist_enum(*args, fallback=None, allow_enum=_false, allow_missing=_false):
"""
Warn and return the number of errors relating to codelists in a JSON Schema.
If a field has a "codelist" property but no "type" property (e.g. if the "codelist" property is being overwritten),
then its "type" is assumed to be "array" unless a fallback "type" is provided via :code:`fallback`.
If the "codelist" property is set:
- If the "openCodelist" property is set to ``true``, then the "enum" property mustn't be set.
- If the "openCodelist" property is set to ``false``, then the "enum" property must be set, its value must include
``null`` if the "type" property includes "null", and its value must match the codes in the codelist.
If the "enum" property is set, then the "codelist" and "openCodelist" properties must be set.
:param dict fallback: a dict in which keys are JSON Pointers and values are lists of "type" values
:param function allow_enum: a method that accepts a JSON Pointer, and returns whether the field is allowed to set
the "enum" property without setting the "codelist" property
:param function allow_missing: a method that accepts a codelist name, and returns whether the codelist file
is allowed to be missing from the repository
:returns: the number of errors
:rtype: int
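    pytest example (a sketch, assuming the ``schemas`` list from the module docstring; the codelist name
    is illustrative only)::

        from jscc.testing.checks import validate_codelist_enum

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_codelist_enum(path, name, data):
            assert not validate_codelist_enum(path, data, allow_missing=lambda name: name == 'currency.csv')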
"""
if not fallback:
fallback = {}
def block(path, data, pointer):
errors = 0
parent = pointer.rsplit('/', 1)[-1]
if 'codelist' in data:
# `type` can be missing if changing an existing property.
types = get_types(data) if 'type' in data else fallback.get(pointer, ['array'])
if data['openCodelist']:
if (('string' in types and 'enum' in data) or ('array' in types and 'enum' in data['items'])):
errors += 1
warn(f'{path} sets "enum", though "openCodelist" is true, at {pointer}',
CodelistEnumWarning)
else:
if ('string' in types and 'enum' not in data) or ('array' in types and 'enum' not in data['items']):
errors += 1
warn(f'{path} is missing "enum", though "openCodelist" is false, at {pointer}',
CodelistEnumWarning)
actual = None
elif 'string' in types:
actual = set(data['enum'])
else:
actual = set(data['items']['enum'])
# It'd be faster to cache the CSVs, but most extensions have only one closed codelist.
for _, csvname, _, _, rows in walk_csv_data():
# The codelist's CSV file must exist.
if csvname == data['codelist']:
if actual:
expected = {row['Code'] for row in rows}
if 'string' in types and 'null' in types:
expected.add(None)
if actual != expected:
added, removed = difference(actual, expected)
errors += 1
warn(f"{path}: {pointer}/enum doesn't match codelists/{data['codelist']}"
f"{added}{removed}", CodelistEnumWarning)
break
else:
# When validating a patched schema, the above code will fail to find the core codelists in an
# extension, but that is not an error. This overlaps with `validate_schema_codelists_match`.
if not allow_missing(data['codelist']):
errors += 1
warn(f"{path} refers to missing file codelists/{data['codelist']} at {pointer}",
CodelistEnumWarning)
elif ('enum' in data and parent != 'items') or ('items' in data and 'enum' in data['items']):
if not allow_enum(pointer):
errors += 1
warn(f'{path} is missing "codelist" and "openCodelist" at {pointer}', CodelistEnumWarning)
return errors
return _traverse(block)(*args)
def validate_array_items(*args, allow_invalid=()):
"""
Warn and return the number of errors relating to array fields without an "items" property.
A field whose "type" property includes "array" must set the "items" property.
:param allow_invalid: JSON Pointers of fields whose "items" properties are allowed to be missing
:type allow_invalid: list, tuple or set
:returns: the number of errors
:rtype: int
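    pytest example (a sketch, assuming the ``schemas`` list from the module docstring)::

        from jscc.testing.checks import validate_array_items

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_array_items(path, name, data):
            assert not validate_array_items(path, data)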
"""
def block(path, data, pointer):
errors = 0
if 'type' in data and 'array' in data['type'] and 'items' not in data and pointer not in allow_invalid:
errors += 1
warn(f'{path} is missing "items" at {pointer}', DeepPropertiesWarning)
return errors
return _traverse(block)(*args)
def validate_items_type(*args, additional_valid_types=None, allow_invalid=()):
"""
Warn and return the number of errors relating to the "type" property under an "items" property.
The "type" property under an "items" property must only include "array" (e.g. for geometries), "number"
(e.g. for coordinates) and/or "string".
:param additional_valid_types: additional valid values of the "type" property under an "items" property
:type additional_valid_types: list, tuple or set
:param allow_invalid: JSON Pointers of fields whose "type" properties are allowed to include invalid values
:type allow_invalid: list, tuple or set
:returns: the number of errors
:rtype: int
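    pytest example (a sketch, assuming the ``schemas`` list from the module docstring; the additional
    type is illustrative only)::

        from jscc.testing.checks import validate_items_type

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_items_type(path, name, data):
            assert not validate_items_type(path, data, additional_valid_types=['integer'])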
"""
valid_types = {
'array',
'number',
'string',
}
if additional_valid_types:
valid_types.update(additional_valid_types)
def block(path, data, pointer):
errors = 0
parent = pointer.rsplit('/', 1)[-1]
if parent == 'items':
for _type in get_types(data):
if _type not in valid_types and pointer not in allow_invalid:
errors += 1
warn(f'{path} includes "{_type}" in "items/type" at {pointer}', ItemsTypeWarning)
return errors
return _traverse(block)(*args)
def validate_deep_properties(*args, allow_deep=()):
"""
Warn and return the number of errors relating to deep objects.
The schema must use "definitions" or "$defs" instead of nesting "properties".
:param allow_deep: JSON Pointers of fields to ignore
:type allow_deep: list, tuple or set
:returns: the number of errors
:rtype: int
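    pytest example (a sketch, assuming the ``schemas`` list from the module docstring; as in the module
    docstring, the return value can be ignored to treat these warnings as non-errors)::

        from jscc.testing.checks import validate_deep_properties

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_deep_properties(path, name, data):
            validate_deep_properties(path, data)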
"""
def block(path, data, pointer):
errors = 0
parts = pointer.rsplit('/', 2)
grandparent = parts[-2] if len(parts) == 3 else None
if (
pointer
and grandparent not in {'definitions', '$defs'}
and 'properties' in data
and pointer not in allow_deep
):
errors += 1
warn(f'{path} has "properties" within "properties" at {pointer}', DeepPropertiesWarning)
return errors
return _traverse(block)(*args)
def validate_object_id(*args, allow_missing=_false, allow_optional=()):
"""
Warn and return the number of errors relating to objects within arrays lacking "id" fields.
If an array field's "wholeListMerge" and "omitWhenMerged" properties aren't set or are set to ``false`` or
``null``, then the object fields under it must have an "id" field, and the "id" field must be required.
:param function allow_missing: a method that accepts a JSON Pointer, and returns whether the field is allowed to
not have an "id" field
:param allow_optional: JSON Pointers of fields whose "id" field is allowed to be optional
:type allow_optional: list, tuple or set
:returns: the number of errors
:rtype: int
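    pytest example (a sketch, assuming the ``schemas`` list from the module docstring; ``$ref``'erences
    are replaced so that referenced definitions are checked in context, as in the module docstring)::

        import jsonref

        from jscc.testing.checks import validate_object_id

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_object_id(path, name, data):
            assert not validate_object_id(path, jsonref.replace_refs(data))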
"""
def block(path, data, pointer):
errors = 0
if data.get('wholeListMerge') or data.get('omitWhenMerged') or allow_missing(pointer):
return errors
# If it's an array of objects.
if 'type' in data and 'array' in data['type'] and 'properties' in data.get('items', {}):
required = data['items'].get('required', [])
original = data['items'].__reference__['$ref'][1:] if hasattr(data['items'], '__reference__') else pointer
# See https://standard.open-contracting.org/latest/en/schema/merging/#whole-list-merge
if 'id' not in data['items']['properties']:
errors += 1
if original == pointer:
warn(f'{path} is missing "id" in "items/properties" at {pointer}', ObjectIdWarning)
else:
warn(f'{path} is missing "id" in "items/properties" at {original} (from {pointer})',
ObjectIdWarning)
elif 'id' not in required and original not in allow_optional:
errors += 1
if original == pointer:
warn(f'{path} is missing "id" in "items/required" at {pointer}', ObjectIdWarning)
else:
warn(f'{path} is missing "id" in "items/required" at {original} (from {pointer})',
ObjectIdWarning)
return errors
return _traverse(block)(*args)
def validate_merge_properties(*args):
"""
Warn and return the number of errors relating to missing or extra merge properties.
The "omitWhenMerged" and "wholeListMerge" properties mustn't both be set, and mustn't be set to ``false`` or
``null``. The "wholeListMerge" property must be set on non-nullable arrays of objects only.
See https://standard.open-contracting.org/1.1/en/schema/merging/#whole-list-merge
:returns: the number of errors
:rtype: int
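    pytest example (a sketch, assuming the ``schemas`` list from the module docstring)::

        from jscc.testing.checks import validate_merge_properties

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_merge_properties(path, name, data):
            assert not validate_merge_properties(path, data)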
"""
def block(path, data, pointer):
errors = 0
if 'omitWhenMerged' in data and not data['omitWhenMerged']:
errors += 1
warn(f'{path} sets "omitWhenMerged" to false or null at {pointer}', MergePropertiesWarning)
if 'wholeListMerge' in data and not data['wholeListMerge']:
errors += 1
warn(f'{path} sets "wholeListMerge" to false or null at {pointer}', MergePropertiesWarning)
elif 'wholeListMerge' in data:
if not is_array_of_objects(data):
errors += 1
warn(f'{path} sets "wholeListMerge", though the field is not an array of objects, at {pointer}',
MergePropertiesWarning)
if 'omitWhenMerged' in data:
errors += 1
warn(f'{path} sets both "omitWhenMerged" and "wholeListMerge" at {pointer}',
MergePropertiesWarning)
return errors
return _traverse(block)(*args)
def validate_ref(path, data, **kwargs):
"""
Warn and return ``1`` if not all ``$ref``'erences can be resolved.
Uses the `jsonref <https://jsonref.readthedocs.io/>`__ module.
:returns: ``0`` or ``1``
:rtype: int
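    pytest example (a sketch, assuming the ``schemas`` list from the module docstring)::

        from jscc.testing.checks import validate_ref

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_ref(path, name, data):
            assert not validate_ref(path, data)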
"""
try:
jsonref.replace_refs(data, lazy_load=False, **kwargs)
except jsonref.JsonRefError as e:
warn(f"{path} has {e.message} at {'/'.join(map(str, e.path))}", RefWarning)
return 1
return 0
def validate_schema_codelists_match(path, data, top, *, is_extension=False, is_profile=False, external_codelists=None):
"""
Warn and return the number of errors relating to mismatches between codelist files and codelist references from
JSON Schema.
All codelist filenames that don't start with ``+`` or ``-`` must match codelist references. All codelist references
must match codelist filenames or be in :code:`external_codelists`. All codelist filenames that start with ``+`` or
``-`` must be in :code:`external_codelists`.
:param str top: the file path of the directory tree
:param bool is_extension: whether the repository is an extension or a profile
:param bool is_profile: whether the repository is a profile (a collection of extensions)
:param external_codelists: names of codelists defined by the standard
:type external_codelists: list, tuple or set
:returns: the number of errors
:rtype: int
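    pytest example (a sketch, assuming the ``schemas`` list from the module docstring; the ``top``
    directory and the codelist name are illustrative only)::

        import os

        from jscc.testing.checks import validate_schema_codelists_match

        @pytest.mark.parametrize('path,name,data', schemas)
        def test_schema_codelists_match(path, name, data):
            assert not validate_schema_codelists_match(path, data, os.getcwd(),
                                                       external_codelists={'currency.csv'})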
"""
if not external_codelists:
external_codelists = set()
def collect_codelist_values(path, data, pointer=''):
"""Collect ``codelist`` values from JSON Schema."""
codelists = set()
if isinstance(data, list):
for index, item in enumerate(data):
codelists.update(collect_codelist_values(path, item, pointer=f'{pointer}/{index}'))
elif isinstance(data, dict):
if 'codelist' in data:
codelists.add(data['codelist'])
for key, value in data.items():
codelists.update(collect_codelist_values(path, value, pointer=f'{pointer}/{key}'))
return codelists
errors = 0
codelist_files = set()
for csvpath, csvname, _, fieldnames, _ in walk_csv_data(top=top):
parts = csvpath.replace(top, '').split(os.sep) # maybe inelegant way to isolate consolidated extension
# Take all codelists in extensions, all codelists in core, and non-core codelists in profiles.
if is_codelist(fieldnames) and ((is_extension and not is_profile) or 'patched' not in parts):
if csvname.startswith(('+', '-')):
if csvname[1:] not in external_codelists:
errors += 1
warn(f'{csvname} patches unknown codelist', SchemaCodelistsMatchWarning)
else:
codelist_files.add(csvname)
codelist_values = collect_codelist_values(path, data)
all_codelist_files = codelist_files | external_codelists if is_extension else codelist_files
unused_codelists = [codelist for codelist in codelist_files if codelist not in codelist_values]
missing_codelists = [codelist for codelist in codelist_values if codelist not in all_codelist_files]
if unused_codelists:
errors += 1
warn(f"unused codelists: {', '.join(sorted(unused_codelists))}", SchemaCodelistsMatchWarning)
if missing_codelists:
errors += 1
warn(f"missing codelists: {', '.join(sorted(missing_codelists))}", SchemaCodelistsMatchWarning)
return errors
def _traverse(block):
    """Return a function that recurses into the JSON data, applies ``block`` to each object, and sums the errors."""

    def method(path, data, pointer=''):
errors = 0
if isinstance(data, list):
for index, item in enumerate(data):
errors += method(path, item, pointer=f'{pointer}/{index}')
elif isinstance(data, dict):
errors += block(path, data, pointer)
for key, value in data.items():
errors += method(path, value, pointer=f'{pointer}/{key}')
return errors
return method