Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise exception when using add_if_missing on ownerless Asset #554

2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 17,8 @@
### Changed

- The `from_dict` method on STACObjects will set the object's root link when a `root` parameter is present. An ItemCollection `from_dict` with a root parameter will set the root on each of it's Items. ([#549](https://github.com/stac-utils/pystac/pull/549))
- Calling `ExtensionManagementMixin.validate_has_extension` with `add_if_missing = True`
on an ownerless `Asset` will raise a `STACError` ([#554](https://github.com/stac-utils/pystac/pull/554))

### Fixed

Expand Down
35 changes: 26 additions & 9 deletions pystac/extensions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 126,32 @@ def has_extension(cls, obj: S) -> bool:
)

@classmethod
def validate_has_extension(cls, obj: Union[S, pystac.Asset]) -> None:
"""Given a :class:`~pystac.STACObject` or :class:`pystac.Asset` instance, checks
if the object (or its owner in the case of an Asset) has this extension's schema
URI in it's :attr:`~pystac.STACObject.stac_extensions` list."""
extensible = obj.owner if isinstance(obj, pystac.Asset) else obj
if (
extensible is not None
and cls.get_schema_uri() not in extensible.stac_extensions
):
def validate_has_extension(
cls, extensible: Optional[S], add_if_missing: bool
) -> None:
"""Given a :class:`~pystac.STACObject`, checks if the object has this
extension's schema URI in it's :attr:`~pystac.STACObject.stac_extensions` list.
If ``add_if_missing`` is ``True``, the schema URI will be added to the object.

Args:
extensible : The object to validate.
add_if_missing : Whether to add the schema URI to the object if the URI is
not already present.

Raises:
STACError : If ``add_if_missing`` is ``True`` and ``extensible`` is None.
"""
if add_if_missing:
if extensible is None:
raise pystac.STACError(
"Can only add schema URIs to Assets with an owner."
lossyrob marked this conversation as resolved.
Show resolved Hide resolved
)
cls.add_to(extensible)

if extensible is None:
return

if cls.get_schema_uri() not in extensible.stac_extensions:
raise pystac.ExtensionNotImplemented(
f"Could not find extension schema URI {cls.get_schema_uri()} in object."
)
12 changes: 3 additions & 9 deletions pystac/extensions/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,19 340,13 @@ def get_schema_uri(cls) -> str:
@classmethod
def ext(cls, obj: T, add_if_missing: bool = False) -> "DatacubeExtension[T]":
if isinstance(obj, pystac.Collection):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(DatacubeExtension[T], CollectionDatacubeExtension(obj))
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(DatacubeExtension[T], ItemDatacubeExtension(obj))
elif isinstance(obj, pystac.Asset):
if add_if_missing and obj.owner is not None:
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj.owner, add_if_missing)
return cast(DatacubeExtension[T], AssetDatacubeExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand Down
13 changes: 3 additions & 10 deletions pystac/extensions/eo.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,14 361,10 @@ def ext(cls, obj: T, add_if_missing: bool = False) -> "EOExtension[T]":
pystac.ExtensionTypeError : If an invalid object type is passed.
"""
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(EOExtension[T], ItemEOExtension(obj))
elif isinstance(obj, pystac.Asset):
if add_if_missing and isinstance(obj.owner, pystac.Item):
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj.owner, add_if_missing)
return cast(EOExtension[T], AssetEOExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand All @@ -380,10 376,7 @@ def summaries(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "SummariesEOExtension":
"""Returns the extended summaries object for the given collection."""
if not add_if_missing:
cls.validate_has_extension(obj)
else:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return SummariesEOExtension(obj)


Expand Down
10 changes: 5 additions & 5 deletions pystac/extensions/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 4,7 @@
"""

from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union

import pystac
from pystac.extensions.base import ExtensionManagementMixin, PropertiesExtension
Expand Down Expand Up @@ -85,7 85,9 @@ def summary(self, v: str) -> None:
self.properties["summary"] = v


class FileExtension(PropertiesExtension, ExtensionManagementMixin[pystac.Item]):
class FileExtension(
PropertiesExtension, ExtensionManagementMixin[Union[pystac.Item, pystac.Collection]]
):
"""A class that can be used to extend the properties of an :class:`~pystac.Asset`
with properties from the :stac-ext:`File Info Extension <file>`.

Expand Down Expand Up @@ -197,9 199,7 @@ def ext(cls, obj: pystac.Asset, add_if_missing: bool = False) -> "FileExtension"
This extension can be applied to instances of :class:`~pystac.Asset`.
"""
if isinstance(obj, pystac.Asset):
if add_if_missing and isinstance(obj.owner, pystac.Item):
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj.owner, add_if_missing)
return cls(obj)
else:
raise pystac.ExtensionTypeError(
Expand Down
3 changes: 1 addition & 2 deletions pystac/extensions/item_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 119,7 @@ def ext(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "ItemAssetsExtension":
if isinstance(obj, pystac.Collection):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return cls(obj)
else:
raise pystac.ExtensionTypeError(
Expand Down
9 changes: 2 additions & 7 deletions pystac/extensions/label.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 691,7 @@ def ext(cls, obj: pystac.Item, add_if_missing: bool = False) -> "LabelExtension"
This extension can be applied to instances of :class:`~pystac.Item`.
"""
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cls(obj)
else:
raise pystac.ExtensionTypeError(
Expand All @@ -705,10 703,7 @@ def summaries(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "SummariesLabelExtension":
"""Returns the extended summaries object for the given collection."""
if not add_if_missing:
cls.validate_has_extension(obj)
else:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return SummariesLabelExtension(obj)


Expand Down
13 changes: 7 additions & 6 deletions pystac/extensions/pointcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,14 525,15 @@ def get_schema_uri(cls) -> str:
@classmethod
def ext(cls, obj: T, add_if_missing: bool = False) -> "PointcloudExtension[T]":
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(PointcloudExtension[T], ItemPointcloudExtension(obj))
elif isinstance(obj, pystac.Asset):
if add_if_missing and isinstance(obj.owner, pystac.Item):
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
if obj.owner is not None and not isinstance(obj.owner, pystac.Item):
raise pystac.ExtensionTypeError(
"Pointcloud extension does not apply to Assets owned by anything "
"other than an Item."
)
cls.validate_has_extension(obj.owner, add_if_missing)
return cast(PointcloudExtension[T], AssetPointcloudExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand Down
13 changes: 3 additions & 10 deletions pystac/extensions/projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 272,10 @@ def ext(cls, obj: T, add_if_missing: bool = False) -> "ProjectionExtension[T]":
pystac.ExtensionTypeError : If an invalid object type is passed.
"""
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(ProjectionExtension[T], ItemProjectionExtension(obj))
elif isinstance(obj, pystac.Asset):
if add_if_missing and isinstance(obj.owner, pystac.Item):
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj.owner, add_if_missing)
return cast(ProjectionExtension[T], AssetProjectionExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand All @@ -291,10 287,7 @@ def summaries(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "SummariesProjectionExtension":
"""Returns the extended summaries object for the given collection."""
if not add_if_missing:
cls.validate_has_extension(obj)
else:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return SummariesProjectionExtension(obj)


Expand Down
9 changes: 2 additions & 7 deletions pystac/extensions/raster.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,9 704,7 @@ def ext(cls, obj: pystac.Asset, add_if_missing: bool = False) -> "RasterExtensio
pystac.ExtensionTypeError : If an invalid object type is passed.
"""
if isinstance(obj, pystac.Asset):
if add_if_missing and isinstance(obj.owner, pystac.Item):
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj.owner, add_if_missing)
return cls(obj)
else:
raise pystac.ExtensionTypeError(
Expand All @@ -717,10 715,7 @@ def ext(cls, obj: pystac.Asset, add_if_missing: bool = False) -> "RasterExtensio
def summaries(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "SummariesRasterExtension":
if not add_if_missing:
cls.validate_has_extension(obj)
else:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return SummariesRasterExtension(obj)


Expand Down
18 changes: 8 additions & 10 deletions pystac/extensions/sar.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 315,15 @@ def ext(cls, obj: T, add_if_missing: bool = False) -> "SarExtension[T]":
pystac.ExtensionTypeError : If an invalid object type is passed.
"""
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(SarExtension[T], ItemSarExtension(obj))
elif isinstance(obj, pystac.Asset):
if add_if_missing and isinstance(obj.owner, pystac.Item):
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
if obj.owner is not None and not isinstance(obj.owner, pystac.Item):
raise pystac.ExtensionTypeError(
"SAR extension does not apply to Assets owned by anything "
"other than an Item."
)
cls.validate_has_extension(obj.owner, add_if_missing)
return cast(SarExtension[T], AssetSarExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand All @@ -334,10 335,7 @@ def summaries(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "SummariesSarExtension":
"""Returns the extended summaries object for the given collection."""
if not add_if_missing:
cls.validate_has_extension(obj)
else:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return SummariesSarExtension(obj)


Expand Down
13 changes: 3 additions & 10 deletions pystac/extensions/sat.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 149,10 @@ def ext(cls, obj: T, add_if_missing: bool = False) -> "SatExtension[T]":
pystac.ExtensionTypeError : If an invalid object type is passed.
"""
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(SatExtension[T], ItemSatExtension(obj))
elif isinstance(obj, pystac.Asset):
if add_if_missing and isinstance(obj.owner, pystac.Item):
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj.owner, add_if_missing)
return cast(SatExtension[T], AssetSatExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand All @@ -168,10 164,7 @@ def summaries(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "SummariesSatExtension":
"""Returns the extended summaries object for the given collection."""
if not add_if_missing:
cls.validate_has_extension(obj)
else:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return SummariesSatExtension(obj)


Expand Down
13 changes: 3 additions & 10 deletions pystac/extensions/scientific.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 234,10 @@ def ext(cls, obj: T, add_if_missing: bool = False) -> "ScientificExtension[T]":
pystac.ExtensionTypeError : If an invalid object type is passed.
"""
if isinstance(obj, pystac.Collection):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(ScientificExtension[T], CollectionScientificExtension(obj))
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(ScientificExtension[T], ItemScientificExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand All @@ -253,10 249,7 @@ def summaries(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "SummariesScientificExtension":
"""Returns the extended summaries object for the given collection."""
if not add_if_missing:
cls.validate_has_extension(obj)
else:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return SummariesScientificExtension(obj)


Expand Down
13 changes: 3 additions & 10 deletions pystac/extensions/timestamps.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 129,10 @@ def ext(cls, obj: T, add_if_missing: bool = False) -> "TimestampsExtension[T]":
pystac.ExtensionTypeError : If an invalid object type is passed.
"""
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(TimestampsExtension[T], ItemTimestampsExtension(obj))
elif isinstance(obj, pystac.Asset):
if add_if_missing and isinstance(obj.owner, pystac.Item):
cls.add_to(obj.owner)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj.owner, add_if_missing)
return cast(TimestampsExtension[T], AssetTimestampsExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand All @@ -148,10 144,7 @@ def summaries(
cls, obj: pystac.Collection, add_if_missing: bool = False
) -> "SummariesTimestampsExtension":
"""Returns the extended summaries object for the given collection."""
if not add_if_missing:
cls.validate_has_extension(obj)
else:
cls.add_to(obj)
cls.validate_has_extension(obj, add_if_missing)
return SummariesTimestampsExtension(obj)


Expand Down
8 changes: 2 additions & 6 deletions pystac/extensions/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 205,10 @@ def ext(cls, obj: T, add_if_missing: bool = False) -> "VersionExtension[T]":
pystac.ExtensionTypeError : If an invalid object type is passed.
"""
if isinstance(obj, pystac.Collection):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(VersionExtension[T], CollectionVersionExtension(obj))
if isinstance(obj, pystac.Item):
if add_if_missing:
cls.add_to(obj)
cls.validate_has_extension(obj)
cls.validate_has_extension(obj, add_if_missing)
return cast(VersionExtension[T], ItemVersionExtension(obj))
else:
raise pystac.ExtensionTypeError(
Expand Down
Loading