from __future__ import annotations
import asyncio
import datetime
import inspect
from collections.abc import Awaitable, Callable, Sequence
from typing import TYPE_CHECKING, Any, Concatenate, Literal, ParamSpec, TypeVar
from astral.location import Location
from dateutil.parser import parse
from domovoy.applications.types import Interval
from domovoy.core.configuration import get_main_config
from domovoy.core.context import context_logger
from domovoy.core.errors import DomovoySchedulerError
from domovoy.core.logging import get_logger
from domovoy.core.utils import (
get_callback_class,
get_callback_name,
get_callback_true_name,
get_datetime_now_with_config_timezone,
is_datetime_aware,
set_callback_true_information,
)
from domovoy.plugins import hass
from domovoy.plugins.callbacks.entity_listener_callbacks import EntityListenerCallback
from domovoy.plugins.callbacks.event_listener_callbacks import EventListenerCallback
from domovoy.plugins.hass.domains import get_type_instance_for_entity_id
from domovoy.plugins.hass.types import EntityID, HassValue
from domovoy.plugins.plugins import AppPlugin
if TYPE_CHECKING:
from domovoy.core.app_infra import AppWrapper
from domovoy.core.services.callback_register import CallbackRegister
P = ParamSpec("P")
T = TypeVar("T")
SunEvents = Literal["dawn", "sunrise", "noon", "sunset", "dusk"]
_logcore = get_logger(__name__)
[docs]
class CallbacksPlugin(AppPlugin):
_wrapper: AppWrapper
__hass: hass.HassPlugin
__register: CallbackRegister
def __init__(
self,
name: str,
wrapper: AppWrapper,
register: CallbackRegister,
) -> None:
super().__init__(name, wrapper)
self.__register = register
[docs]
def prepare(self) -> None:
self.__hass = self._wrapper.get_pluginx(hass.HassPlugin)
[docs]
def listen_event(
self,
events: str | list[str],
callback: EventListenerCallback,
*,
oneshot: bool = False,
) -> str:
signature = inspect.signature(callback)
valid_params = set(signature.parameters.keys())
async def wrapper(event_name: str, data: dict[str, Any]) -> None:
call_args = {}
if "event_name" in valid_params:
call_args["event_name"] = event_name
if "data" in valid_params:
call_args["data"] = data
result = callback(**call_args)
if inspect.isawaitable(result):
await result
set_callback_true_information(wrapper, callback)
return self.listen_event_extended(events, wrapper, oneshot)
[docs]
def listen_event_extended(
self,
events: str | list[str],
callback: Callable[Concatenate[str, dict[str, Any], P], None | Awaitable[None]],
oneshot: bool = False, # noqa: FBT001, FBT002
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
context_logger.set(self._wrapper.logger)
instrumented_callback = self._wrapper.instrument_app_callback(callback)
@self._wrapper.handle_exception_and_logging(callback)
async def listen_event_callback(
callback_id: str,
event: str,
event_data: dict[str, Any],
) -> None:
self._wrapper.logger.trace(
"Calling Listen Event Callback: {cls_name}.{func_name} from callback_id: {callback_id}",
cls_name=callback.__self__.__class__.__name__ if inspect.ismethod(callback) else callback.__class__,
func_name=callback.__name__,
callback_id=callback_id,
)
if oneshot:
self.cancel_callback(callback_id)
await instrumented_callback(
callback_id,
event,
event_data,
*callback_args,
**callback_kwargs,
)
return self.__register.add_event_callback(
self._wrapper,
listen_event_callback,
events,
)
[docs]
def listen_state(
self,
entity_id: EntityID | Sequence[EntityID],
callback: EntityListenerCallback,
*,
immediate: bool = False,
oneshot: bool = False,
) -> list[str]:
return self.listen_attribute(entity_id, "state", callback, immediate=immediate, oneshot=oneshot)
[docs]
def listen_attribute(
self,
entity_id: EntityID | Sequence[EntityID],
attribute: str,
callback: EntityListenerCallback,
*,
immediate: bool = False,
oneshot: bool = False,
) -> list[str]:
signature = inspect.signature(callback)
valid_params = set(signature.parameters.keys())
async def wrapper(entity_id: EntityID, attribute: str, old: HassValue, new: HassValue) -> None:
call_args = {}
if "entity_id" in valid_params:
call_args["entity_id"] = entity_id
if "attribute" in valid_params:
call_args["attribute"] = attribute
if "old" in valid_params:
call_args["old"] = old
if "new" in valid_params:
call_args["new"] = new
result = callback(**call_args)
if inspect.isawaitable(result):
await result
set_callback_true_information(wrapper, callback)
return self.listen_attribute_extended(entity_id, attribute, wrapper, immediate, oneshot)
[docs]
def listen_state_extended(
self,
entity_id: EntityID | list[EntityID],
callback: Callable[
Concatenate[EntityID, str, HassValue, HassValue, P],
None | Awaitable[None],
],
immediate: bool = False, # noqa: FBT001, FBT002
oneshot: bool = False, # noqa: FBT001, FBT002
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> list[str]:
return self.listen_attribute_extended(
entity_id,
"state",
callback,
immediate,
oneshot,
*callback_args,
**callback_kwargs,
)
[docs]
def listen_attribute_extended(
self,
entity_id: EntityID | Sequence[EntityID],
attribute: str,
callback: Callable[
Concatenate[EntityID, str, HassValue, HassValue, P],
None | Awaitable[None],
],
immediate: bool = False, # noqa: FBT001, FBT002
oneshot: bool = False, # noqa: FBT001, FBT002
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> list[str]:
context_logger.set(self._wrapper.logger)
target_entity_id = entity_id
if not isinstance(target_entity_id, Sequence):
target_entity_id = [target_entity_id]
for eid in target_entity_id:
if isinstance(eid, str):
raise TypeError("Passed entity_id as str and not as EntityID")
self.__hass.warn_if_entity_doesnt_exists(target_entity_id)
target_entity_id = set(target_entity_id)
instrumented_callback = self._wrapper.instrument_app_callback(callback)
@self._wrapper.handle_exception_and_logging(callback)
async def listen_attribute_callback(
callback_id: str,
_event_name: str,
event_data: dict[str, Any],
) -> None:
context_logger.set(self._wrapper.logger)
event_entity_id = get_type_instance_for_entity_id(event_data["entity_id"])
if event_entity_id not in target_entity_id:
self._wrapper.logger.warning(
"Received callback for entity_id that should not be part of"
" callback. '{event_entity_id}' not in '{target_entity_id}'",
event_entity_id=event_entity_id,
target_entity_id=target_entity_id,
)
new_state = event_data.get("new_state", {}) or {}
old_state = event_data.get("old_state", {}) or {}
if attribute == "all":
old_value = old_state
new_value = new_state
elif attribute == "state":
old_value = old_state.get("state", None)
new_value = new_state.get("state", None)
if old_value == new_value:
return
else:
old_value = old_state.get("attributes", {}).get(attribute, None)
new_value = new_state.get("attributes", {}).get(attribute, None)
if old_value == new_value:
return
callback_cls_name = get_callback_class(callback)
self._wrapper.logger.trace(
"Calling Entity Callback: {cls_name}.{func_name}",
cls_name=callback_cls_name,
func_name=get_callback_true_name(callback),
)
if oneshot:
self.cancel_callback(callback_id)
await instrumented_callback(
callback_id,
event_entity_id,
attribute,
old_value,
new_value,
*callback_args,
**callback_kwargs,
)
callback_id = [
self.__register.add_event_callback(
self._wrapper,
listen_attribute_callback,
f"state_changed={eid}",
)
for eid in target_entity_id
]
if immediate:
@self._wrapper.handle_exception_and_logging(callback)
async def immediate_callback(callback_id: str) -> None:
all_eid = wrap_entity_id_as_list(entity_id)
all_callbacks: list[Awaitable[None]] = []
for eid in all_eid:
eid_state = self.__hass.get_full_state(eid)
if eid_state is not None:
all_callbacks.append(
listen_attribute_callback(
f"ephemeral_callback-{callback_id}-{entity_id}",
"immediate_state_notification",
{
"entity_id": eid,
"new_state": eid_state.to_dict(),
},
),
)
self.cancel_callback(callback_id)
await asyncio.gather(*all_callbacks)
current_date = get_datetime_now_with_config_timezone()
self.__register.add_scheduler_callback(
self._wrapper,
immediate_callback,
None,
current_date,
)
return callback_id or []
[docs]
def cancel_callback(self, callback_id: str) -> None:
context_logger.set(self._wrapper.logger)
self.__register.cancel_callback(self._wrapper, callback_id)
[docs]
def run_once(
self,
time: datetime.time,
callback: Callable[P, None | Awaitable[None]],
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
context_logger.set(self._wrapper.logger)
current_date = get_datetime_now_with_config_timezone()
target_date = current_date.replace(
hour=time.hour,
minute=time.minute,
second=time.second,
)
if target_date < current_date:
target_date = target_date + datetime.timedelta(days=1)
return self.run_at(callback, target_date, *callback_args, **callback_kwargs)
[docs]
def run_in(
self,
interval: Interval,
callback: Callable[P, None | Awaitable[None]],
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
dt = get_datetime_now_with_config_timezone() + datetime.timedelta(
days=interval.days,
hours=interval.hours,
minutes=interval.minutes,
seconds=interval.seconds,
)
return self.run_at(
callback=callback,
datetime=dt,
*callback_args, # noqa: B026
**callback_kwargs,
)
def __get_next_sun_event_date(
self,
sun_event: SunEvents,
astral_location: Location,
delta: Interval | None,
initial_date: datetime.date,
) -> datetime.datetime:
sun_locations = astral_location.sun(initial_date, local=True)
sun_event_datetime = sun_locations[sun_event]
if delta is not None:
sun_event_datetime = sun_event_datetime + delta.to_timedelta()
if sun_event_datetime < datetime.datetime.now(tz=datetime.UTC):
sun_locations = astral_location.sun(
initial_date + datetime.timedelta(days=1),
local=True,
)
sun_event_datetime = sun_locations[sun_event]
if delta is not None:
sun_event_datetime = sun_event_datetime + delta.to_timedelta()
return sun_event_datetime
[docs]
def run_daily_on_sun_event(
self,
callback: Callable[P, None | Awaitable[None]],
sun_event: SunEvents,
delta: Interval | None = None,
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
context_logger.set(self._wrapper.logger)
# calculate next event
astral_location = get_main_config().get_astral_location()
if astral_location is None:
raise ValueError(
"Domovoy config is missing Astral location information.",
)
sun_event_datetime = self.__get_next_sun_event_date(
sun_event,
astral_location,
delta,
datetime.datetime.now(tz=get_main_config().get_timezone()).date(),
)
self._wrapper.logger.trace(
"Datetime for next `{sun_event}` sun event: {sun_event_datetime}. Delta: {delta}",
sun_event=sun_event,
sun_event_datetime=sun_event_datetime,
delta=delta,
)
instrumented_callback = self._wrapper.instrument_app_callback(callback)
@self._wrapper.handle_exception_and_logging(callback)
async def scheduled_callback(callback_id: str) -> None:
self._wrapper.logger.trace(
"Calling Sun Event Callback: {cls_name}.{func_name}",
cls_name=get_callback_class(callback),
func_name=callback.__name__,
)
tomorrow = datetime.datetime.now(tz=get_main_config().get_timezone()).date() + datetime.timedelta(days=1)
# Calculate next sun event
new_sun_event_datetime = self.__get_next_sun_event_date(
sun_event,
astral_location,
delta,
tomorrow,
)
self._wrapper.logger.trace(
"Datetime for next `{sun_event}` sun event: {sun_event_datetime}. Delta: {delta}",
sun_event=sun_event,
sun_event_datetime=new_sun_event_datetime,
delta=delta,
)
try:
self.__register.add_scheduler_callback(
self._wrapper,
scheduled_callback,
new_sun_event_datetime,
new_sun_event_datetime,
)
except Exception as e:
self._wrapper.logger.exception(
"Failed to schedule next sun event for callback_id: {callback_id}",
e,
callback_id=callback_id,
)
await instrumented_callback(callback_id, *callback_args, **callback_kwargs)
return self.__register.add_scheduler_callback(
self._wrapper,
scheduled_callback,
sun_event_datetime,
sun_event_datetime,
)
[docs]
def run_at(
self,
callback: Callable[P, None | Awaitable[None]],
datetime: datetime.datetime,
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
context_logger.set(self._wrapper.logger)
instrumented_callback = self._wrapper.instrument_app_callback(callback)
@self._wrapper.handle_exception_and_logging(callback)
async def scheduled_callback(callback_id: str) -> None:
self._wrapper.logger.trace(
"Calling Timer Callback: {callback_name}",
callback_name=get_callback_name(callback),
)
await instrumented_callback(callback_id, *callback_args, **callback_kwargs)
current_date = get_datetime_now_with_config_timezone()
if datetime < current_date:
msg = f"Cannot schedule a callback in the past (datetime={datetime}, current_time={current_date})."
raise DomovoySchedulerError(msg)
return self.__register.add_scheduler_callback(
self._wrapper,
scheduled_callback,
datetime,
datetime,
)
[docs]
def run_daily(
self,
callback: Callable[P, None | Awaitable[None]],
time: datetime.time | str = "now",
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
if isinstance(time, datetime.time):
true_start = datetime.datetime.combine(
datetime.datetime.now(tz=get_main_config().get_timezone()).date(),
time,
)
true_start = get_main_config().get_timezone().localize(true_start)
current_time = datetime.datetime.now(tz=get_main_config().get_timezone())
_logcore.trace(
"DT check: true_start: {true_start} [isAware: {true_start_aware}]"
" -- current_time: {current_time} [isAware: {current_time_aware}] ",
true_start=true_start,
true_start_aware=is_datetime_aware(true_start),
current_time=current_time,
current_time_aware=is_datetime_aware(current_time),
)
if true_start < current_time:
true_start = true_start + datetime.timedelta(days=1)
else:
true_start = time
return self.run_every(
Interval(days=1),
callback,
true_start,
*callback_args,
**callback_kwargs,
)
[docs]
def run_hourly(
self,
callback: Callable[P, None | Awaitable[None]],
start: datetime.datetime | str = "now",
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
return self.run_every(
Interval(hours=1),
callback,
start,
*callback_args,
**callback_kwargs,
)
[docs]
def run_minutely(
self,
callback: Callable[P, None | Awaitable[None]],
start: datetime.datetime | str = "now",
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
return self.run_every(
Interval(minutes=1),
callback,
start,
*callback_args,
**callback_kwargs,
)
[docs]
def run_secondly(
self,
callback: Callable[P, None | Awaitable[None]],
start: datetime.datetime | str = "now",
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
return self.run_every(
Interval(seconds=1),
callback,
start,
*callback_args,
**callback_kwargs,
)
[docs]
def run_every(
self,
interval: Interval,
callback: Callable[P, None | Awaitable[None]],
start: datetime.datetime | str = "now",
*callback_args: P.args,
**callback_kwargs: P.kwargs,
) -> str:
context_logger.set(self._wrapper.logger)
instrumented_callback = self._wrapper.instrument_app_callback(callback)
if not interval.is_valid():
raise DomovoySchedulerError(
"Cannot schedule a callback with an empty interval",
)
if start == "now":
start = get_datetime_now_with_config_timezone()
elif isinstance(start, str):
start = parse(start)
self._wrapper.logger.trace(
"Configuring run_every callback with interval: `{interval}` starting at `{start}`",
interval=interval,
start=start,
)
@self._wrapper.handle_exception_and_logging(callback)
async def timer_callback(callback_id: str) -> None:
self._wrapper.logger.trace(
"Calling Timer Callback: {cls_name}.{func_name}",
cls_name=callback.__self__.__class__.__name__, # type: ignore
func_name=callback.__name__,
)
await instrumented_callback(callback_id, *callback_args, **callback_kwargs)
return self.__register.add_scheduler_callback(
self._wrapper,
timer_callback,
interval,
start,
)
[docs]
def wrap_entity_id_as_list(val: EntityID | Sequence[EntityID]) -> list[EntityID]:
if isinstance(val, Sequence):
return list(val)
return [val]