-
-
Notifications
You must be signed in to change notification settings - Fork 30.8k
/
coordinator.py
63 lines (51 loc) · 1.85 KB
/
coordinator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""Define an update coordinator for OpenUV."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from typing import Any, cast
from pyopenuv.errors import InvalidApiKeyError, OpenUvError
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import LOGGER
# Cooldown, in seconds (15 minutes), handed to the Debouncer that the
# coordinator below uses as its request-refresh debouncer.
DEFAULT_DEBOUNCER_COOLDOWN_SECONDS = 15 * 60
class OpenUvCoordinator(DataUpdateCoordinator):
    """Data update coordinator for the OpenUV integration."""

    config_entry: ConfigEntry
    update_method: Callable[[], Awaitable[dict[str, Any]]]

    def __init__(
        self,
        hass: HomeAssistant,
        *,
        entry: ConfigEntry,
        name: str,
        latitude: str,
        longitude: str,
        update_method: Callable[[], Awaitable[dict[str, Any]]],
    ) -> None:
        """Set up the coordinator and remember the entry and coordinates.

        The base coordinator receives a debouncer for refresh requests whose
        cooldown is DEFAULT_DEBOUNCER_COOLDOWN_SECONDS.
        """
        refresh_debouncer = Debouncer(
            hass,
            LOGGER,
            cooldown=DEFAULT_DEBOUNCER_COOLDOWN_SECONDS,
            immediate=True,
        )
        super().__init__(
            hass,
            LOGGER,
            name=name,
            update_method=update_method,
            request_refresh_debouncer=refresh_debouncer,
        )

        self.latitude = latitude
        self.longitude = longitude
        self._entry = entry

    async def _async_update_data(self) -> dict[str, Any]:
        """Call the update method and return the "result" part of its response.

        Raises:
            ConfigEntryAuthFailed: the update method raised InvalidApiKeyError.
            UpdateFailed: the update method raised any other OpenUvError.
        """
        try:
            response = await self.update_method()
        except InvalidApiKeyError as exc:
            # Checked before the generic OpenUvError handler so that a bad key
            # is reported as an auth failure rather than a failed update.
            raise ConfigEntryAuthFailed("Invalid API key") from exc
        except OpenUvError as exc:
            raise UpdateFailed(str(exc)) from exc
        else:
            payload = response["result"]
            return cast(dict[str, Any], payload)