-
-
Notifications
You must be signed in to change notification settings - Fork 29.8k
/
__init__.py
188 lines (158 loc) · 6.56 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""The National Weather Service integration."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
import datetime
import logging
from typing import TYPE_CHECKING
from pynws import SimpleNWS
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY, CONF_LATITUDE, CONF_LONGITUDE, Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import debounce
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util.dt import utcnow
from .const import (
CONF_STATION,
COORDINATOR_FORECAST,
COORDINATOR_FORECAST_HOURLY,
COORDINATOR_OBSERVATION,
DOMAIN,
NWS_DATA,
UPDATE_TIME_PERIOD,
)
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.SENSOR, Platform.WEATHER]
DEFAULT_SCAN_INTERVAL = datetime.timedelta(minutes=10)
FAILED_SCAN_INTERVAL = datetime.timedelta(minutes=1)
DEBOUNCE_TIME = 60 # in seconds
def base_unique_id(latitude: float, longitude: float) -> str:
"""Return unique id for entries in configuration."""
return f"{latitude}_{longitude}"
class NwsDataUpdateCoordinator(DataUpdateCoordinator[None]):
"""NWS data update coordinator.
Implements faster data update intervals for failed updates and exposes a last successful update time.
"""
def __init__(
self,
hass: HomeAssistant,
logger: logging.Logger,
*,
name: str,
update_interval: datetime.timedelta,
failed_update_interval: datetime.timedelta,
update_method: Callable[[], Awaitable[None]] | None = None,
request_refresh_debouncer: debounce.Debouncer | None = None,
) -> None:
"""Initialize NWS coordinator."""
super().__init__(
hass,
logger,
name=name,
update_interval=update_interval,
update_method=update_method,
request_refresh_debouncer=request_refresh_debouncer,
)
self.failed_update_interval = failed_update_interval
self.last_update_success_time: datetime.datetime | None = None
@callback
def _schedule_refresh(self) -> None:
"""Schedule a refresh."""
if self._unsub_refresh:
self._unsub_refresh()
self._unsub_refresh = None
# We _floor_ utcnow to create a schedule on a rounded second,
# minimizing the time between the point and the real activation.
# That way we obtain a constant update frequency,
# as long as the update process takes less than a second
if self.last_update_success:
if TYPE_CHECKING:
# the base class allows None, but this one doesn't
assert self.update_interval is not None
update_interval = self.update_interval
self.last_update_success_time = utcnow()
else:
update_interval = self.failed_update_interval
self._unsub_refresh = async_track_point_in_utc_time(
self.hass,
self._handle_refresh_interval,
utcnow().replace(microsecond=0) update_interval,
)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a National Weather Service entry."""
latitude = entry.data[CONF_LATITUDE]
longitude = entry.data[CONF_LONGITUDE]
api_key = entry.data[CONF_API_KEY]
station = entry.data[CONF_STATION]
client_session = async_get_clientsession(hass)
# set_station only does IO when station is None
nws_data = SimpleNWS(latitude, longitude, api_key, client_session)
await nws_data.set_station(station)
async def update_observation() -> None:
"""Retrieve recent observations."""
await nws_data.update_observation(start_time=utcnow() - UPDATE_TIME_PERIOD)
coordinator_observation = NwsDataUpdateCoordinator(
hass,
_LOGGER,
name=f"NWS observation station {station}",
update_method=update_observation,
update_interval=DEFAULT_SCAN_INTERVAL,
failed_update_interval=FAILED_SCAN_INTERVAL,
request_refresh_debouncer=debounce.Debouncer(
hass, _LOGGER, cooldown=DEBOUNCE_TIME, immediate=True
),
)
coordinator_forecast = NwsDataUpdateCoordinator(
hass,
_LOGGER,
name=f"NWS forecast station {station}",
update_method=nws_data.update_forecast,
update_interval=DEFAULT_SCAN_INTERVAL,
failed_update_interval=FAILED_SCAN_INTERVAL,
request_refresh_debouncer=debounce.Debouncer(
hass, _LOGGER, cooldown=DEBOUNCE_TIME, immediate=True
),
)
coordinator_forecast_hourly = NwsDataUpdateCoordinator(
hass,
_LOGGER,
name=f"NWS forecast hourly station {station}",
update_method=nws_data.update_forecast_hourly,
update_interval=DEFAULT_SCAN_INTERVAL,
failed_update_interval=FAILED_SCAN_INTERVAL,
request_refresh_debouncer=debounce.Debouncer(
hass, _LOGGER, cooldown=DEBOUNCE_TIME, immediate=True
),
)
nws_hass_data = hass.data.setdefault(DOMAIN, {})
nws_hass_data[entry.entry_id] = {
NWS_DATA: nws_data,
COORDINATOR_OBSERVATION: coordinator_observation,
COORDINATOR_FORECAST: coordinator_forecast,
COORDINATOR_FORECAST_HOURLY: coordinator_forecast_hourly,
}
# Fetch initial data so we have data when entities subscribe
await coordinator_observation.async_refresh()
await coordinator_forecast.async_refresh()
await coordinator_forecast_hourly.async_refresh()
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
if len(hass.data[DOMAIN]) == 0:
hass.data.pop(DOMAIN)
return unload_ok
def device_info(latitude: float, longitude: float) -> DeviceInfo:
"""Return device registry information."""
return DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, base_unique_id(latitude, longitude))},
manufacturer="National Weather Service",
name=f"NWS: {latitude}, {longitude}",
)