This commit is contained in:
darthsandmann
2022-11-29 19:10:30 +01:00
parent 9b39f1af50
commit c74c45998c
57 changed files with 3163 additions and 6 deletions

View File

@ -0,0 +1,7 @@
# flake8: noqa
from .eq3btsmart import Mode, TemperatureException, Thermostat
from .structures import *
class BackendException(Exception):
"""Exception to wrap backend exceptions."""

View File

@ -0,0 +1,123 @@
"""
Bleak connection backend.
This creates a new event loop that is used to integrate bleak's
asyncio functions to synchronous architecture of python-eq3bt.
"""
import asyncio
import codecs
import contextlib
import logging
from typing import Optional
from bleak import BleakClient, BleakError
from . import BackendException
DEFAULT_TIMEOUT = 1
# bleak backends are very loud on debug, this reduces the log spam when using --debug
logging.getLogger("bleak.backends").setLevel(logging.WARNING)
_LOGGER = logging.getLogger(__name__)
class BleakConnection:
"""Representation of a BTLE Connection."""
def __init__(self, mac, iface):
"""Initialize the connection."""
self._conn: Optional[BleakClient] = None
self._mac = mac
self._iface = iface
self._callbacks = {}
self._notifyevent = asyncio.Event()
self._notification_handle = None
self._loop = asyncio.new_event_loop()
def __enter__(self):
"""
Context manager __enter__ for connecting the device
:rtype: BTLEConnection
:return:
"""
_LOGGER.debug("Trying to connect to %s", self._mac)
kwargs = {}
if self._iface is not None:
kwargs["adapter"] = self._iface
self._conn = BleakClient(self._mac, **kwargs)
try:
self._loop.run_until_complete(self._conn.connect())
except BleakError as ex:
_LOGGER.debug(
"Unable to connect to the device %s, retrying: %s", self._mac, ex
)
try:
self._loop.run_until_complete(self._conn.connect())
except Exception as ex2:
_LOGGER.debug("Second connection try to %s failed: %s", self._mac, ex2)
raise BackendException(
"unable to connect to device using bleak"
) from ex2
# The notification handles are off-by-one compared to gattlib and bluepy
self._loop.run_until_complete(
self._conn.start_notify(self._notification_handle - 1, self.on_notification)
)
_LOGGER.debug("Connected to %s", self._mac)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._loop.run_until_complete(self._conn.disconnect())
self._conn = None
async def on_notification(self, handle, data):
"""Handle Callback from a Bluetooth (GATT) request."""
# The notification handles are off-by-one compared to gattlib and bluepy
handle = handle + 1
_LOGGER.debug(
"Got notification from %s: %s", handle, codecs.encode(data, "hex")
)
self._notifyevent.set()
if handle in self._callbacks:
self._callbacks[handle](data)
@property
def mac(self):
"""Return the MAC address of the connected device."""
return self._mac
def set_callback(self, handle, function):
"""Set the callback for a Notification handle. It will be called with the parameter data, which is binary."""
self._notification_handle = handle
self._callbacks[handle] = function
async def wait_for_response(self, timeout):
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._notifyevent.wait(), timeout)
def make_request(self, handle, value, timeout=DEFAULT_TIMEOUT, with_response=True):
"""Write a GATT Command without callback - not utf-8."""
try:
with self:
_LOGGER.debug(
"Writing %s to %s",
codecs.encode(value, "hex"),
handle,
)
self._notifyevent.clear()
self._loop.run_until_complete(
self._conn.write_gatt_char(handle - 1, value)
)
if timeout:
_LOGGER.debug("Waiting for notifications for %s", timeout)
self._loop.run_until_complete(self.wait_for_response(timeout))
except BleakError as ex:
_LOGGER.debug("Got exception from bleak while making a request: %s", ex)
raise BackendException("Exception on write using bleak") from ex

View File

@ -0,0 +1,95 @@
"""
A simple wrapper for bluepy's btle.Connection.
Handles Connection duties (reconnecting etc.) transparently.
"""
import codecs
import logging
from bluepy import btle
from . import BackendException
DEFAULT_TIMEOUT = 1
_LOGGER = logging.getLogger(__name__)
class BTLEConnection(btle.DefaultDelegate):
"""Representation of a BTLE Connection."""
def __init__(self, mac, iface):
"""Initialize the connection."""
btle.DefaultDelegate.__init__(self)
self._conn = None
self._mac = mac
self._iface = iface
self._callbacks = {}
def __enter__(self):
"""
Context manager __enter__ for connecting the device
:rtype: btle.Peripheral
:return:
"""
self._conn = btle.Peripheral()
self._conn.withDelegate(self)
_LOGGER.debug("Trying to connect to %s", self._mac)
try:
self._conn.connect(self._mac, iface=self._iface)
except btle.BTLEException as ex:
_LOGGER.debug(
"Unable to connect to the device %s, retrying: %s", self._mac, ex
)
try:
self._conn.connect(self._mac, iface=self._iface)
except Exception as ex2:
_LOGGER.debug("Second connection try to %s failed: %s", self._mac, ex2)
raise BackendException(
"Unable to connect to device using bluepy"
) from ex2
_LOGGER.debug("Connected to %s", self._mac)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.disconnect()
self._conn = None
def handleNotification(self, handle, data):
"""Handle Callback from a Bluetooth (GATT) request."""
_LOGGER.debug(
"Got notification from %s: %s", handle, codecs.encode(data, "hex")
)
if handle in self._callbacks:
self._callbacks[handle](data)
@property
def mac(self):
"""Return the MAC address of the connected device."""
return self._mac
def set_callback(self, handle, function):
"""Set the callback for a Notification handle. It will be called with the parameter data, which is binary."""
self._callbacks[handle] = function
def make_request(self, handle, value, timeout=DEFAULT_TIMEOUT, with_response=True):
"""Write a GATT Command without callback - not utf-8."""
try:
with self:
_LOGGER.debug(
"Writing %s to %s with with_response=%s",
codecs.encode(value, "hex"),
handle,
with_response,
)
self._conn.writeCharacteristic(
handle, value, withResponse=with_response
)
if timeout:
_LOGGER.debug("Waiting for notifications for %s", timeout)
self._conn.waitForNotifications(timeout)
except btle.BTLEException as ex:
_LOGGER.debug("Got exception from bluepy while making a request: %s", ex)
raise BackendException("Exception on write using bluepy") from ex

View File

@ -0,0 +1,491 @@
"""
Support for eq3 Bluetooth Smart thermostats.
All temperatures in Celsius.
To get the current state, update() has to be called for powersaving reasons.
Schedule needs to be requested with query_schedule() before accessing for similar reasons.
"""
import codecs
import logging
import struct
from datetime import datetime, timedelta
from enum import IntEnum
from construct import Byte
from .structures import AwayDataAdapter, DeviceId, Schedule, Status
_LOGGER = logging.getLogger(__name__)
PROP_WRITE_HANDLE = 0x411
PROP_NTFY_HANDLE = 0x421
PROP_ID_QUERY = 0
PROP_ID_RETURN = 1
PROP_INFO_QUERY = 3
PROP_INFO_RETURN = 2
PROP_COMFORT_ECO_CONFIG = 0x11
PROP_OFFSET = 0x13
PROP_WINDOW_OPEN_CONFIG = 0x14
PROP_SCHEDULE_QUERY = 0x20
PROP_SCHEDULE_RETURN = 0x21
PROP_MODE_WRITE = 0x40
PROP_TEMPERATURE_WRITE = 0x41
PROP_COMFORT = 0x43
PROP_ECO = 0x44
PROP_BOOST = 0x45
PROP_LOCK = 0x80
EQ3BT_AWAY_TEMP = 12.0
EQ3BT_MIN_TEMP = 5.0
EQ3BT_MAX_TEMP = 29.5
EQ3BT_OFF_TEMP = 4.5
EQ3BT_ON_TEMP = 30.0
class Mode(IntEnum):
"""Thermostat modes."""
Unknown = -1
Closed = 0
Open = 1
Auto = 2
Manual = 3
Away = 4
Boost = 5
MODE_NOT_TEMP = [Mode.Unknown, Mode.Closed, Mode.Open]
class TemperatureException(Exception):
"""Temperature out of range error."""
pass
# pylint: disable=too-many-instance-attributes
class Thermostat:
"""Representation of a EQ3 Bluetooth Smart thermostat."""
def __init__(self, _mac, _iface=None, connection_cls=None):
"""Initialize the thermostat."""
self._target_temperature = Mode.Unknown
self._mode = Mode.Unknown
self._valve_state = Mode.Unknown
self._raw_mode = None
self._schedule = {}
self._window_open_temperature = None
self._window_open_time = None
self._comfort_temperature = None
self._eco_temperature = None
self._temperature_offset = None
self._away_temp = EQ3BT_AWAY_TEMP
self._away_duration = timedelta(days=30)
self._away_end = None
self._firmware_version = None
self._device_serial = None
if connection_cls is None:
from .bleakconnection import BleakConnection as connection_cls
self._conn = connection_cls(_mac, _iface)
self._conn.set_callback(PROP_NTFY_HANDLE, self.handle_notification)
def __str__(self):
away_end = "no"
if self.away_end:
away_end = "end: %s" % self._away_end
return "[{}] Target {} (mode: {}, away: {})".format(
self._conn.mac, self.target_temperature, self.mode_readable, away_end
)
def _verify_temperature(self, temp):
"""Verifies that the temperature is valid.
:raises TemperatureException: On invalid temperature.
"""
if temp < self.min_temp or temp > self.max_temp:
raise TemperatureException(
"Temperature {} out of range [{}, {}]".format(
temp, self.min_temp, self.max_temp
)
)
def parse_schedule(self, data):
"""Parses the device sent schedule."""
sched = Schedule.parse(data)
_LOGGER.debug("Got schedule data for day '%s'", sched.day)
return sched
def handle_notification(self, data):
"""Handle Callback from a Bluetooth (GATT) request."""
_LOGGER.debug("Received notification from the device..")
if data[0] == PROP_INFO_RETURN and data[1] == 1:
_LOGGER.debug("Got status: %s" % codecs.encode(data, "hex"))
status = Status.parse(data)
_LOGGER.debug("Parsed status: %s", status)
self._raw_mode = status.mode
self._valve_state = status.valve
self._target_temperature = status.target_temp
if status.mode.BOOST:
self._mode = Mode.Boost
elif status.mode.AWAY:
self._mode = Mode.Away
self._away_end = status.away
elif status.mode.MANUAL:
if status.target_temp == EQ3BT_OFF_TEMP:
self._mode = Mode.Closed
elif status.target_temp == EQ3BT_ON_TEMP:
self._mode = Mode.Open
else:
self._mode = Mode.Manual
else:
self._mode = Mode.Auto
presets = status.presets
if presets:
self._window_open_temperature = presets.window_open_temp
self._window_open_time = presets.window_open_time
self._comfort_temperature = presets.comfort_temp
self._eco_temperature = presets.eco_temp
self._temperature_offset = presets.offset
else:
self._window_open_temperature = None
self._window_open_time = None
self._comfort_temperature = None
self._eco_temperature = None
self._temperature_offset = None
_LOGGER.debug("Valve state: %s", self._valve_state)
_LOGGER.debug("Mode: %s", self.mode_readable)
_LOGGER.debug("Target temp: %s", self._target_temperature)
_LOGGER.debug("Away end: %s", self._away_end)
_LOGGER.debug("Window open temp: %s", self._window_open_temperature)
_LOGGER.debug("Window open time: %s", self._window_open_time)
_LOGGER.debug("Comfort temp: %s", self._comfort_temperature)
_LOGGER.debug("Eco temp: %s", self._eco_temperature)
_LOGGER.debug("Temp offset: %s", self._temperature_offset)
elif data[0] == PROP_SCHEDULE_RETURN:
parsed = self.parse_schedule(data)
self._schedule[parsed.day] = parsed
elif data[0] == PROP_ID_RETURN:
parsed = DeviceId.parse(data)
_LOGGER.debug("Parsed device data: %s", parsed)
self._firmware_version = parsed.version
self._device_serial = parsed.serial
else:
_LOGGER.debug(
"Unknown notification %s (%s)", data[0], codecs.encode(data, "hex")
)
def query_id(self):
"""Query device identification information, e.g. the serial number."""
_LOGGER.debug("Querying id..")
value = struct.pack("B", PROP_ID_QUERY)
self._conn.make_request(PROP_WRITE_HANDLE, value)
def update(self):
"""Update the data from the thermostat. Always sets the current time."""
_LOGGER.debug("Querying the device..")
time = datetime.now()
value = struct.pack(
"BBBBBBB",
PROP_INFO_QUERY,
time.year % 100,
time.month,
time.day,
time.hour,
time.minute,
time.second,
)
self._conn.make_request(PROP_WRITE_HANDLE, value)
def query_schedule(self, day):
_LOGGER.debug("Querying schedule..")
if day < 0 or day > 6:
_LOGGER.error("Invalid day: %s", day)
value = struct.pack("BB", PROP_SCHEDULE_QUERY, day)
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def schedule(self):
"""Returns previously fetched schedule.
:return: Schedule structure or None if not fetched.
"""
return self._schedule
def set_schedule(self, data):
"""Sets the schedule for the given day."""
value = Schedule.build(data)
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def target_temperature(self):
"""Return the temperature we try to reach."""
return self._target_temperature
@target_temperature.setter
def target_temperature(self, temperature):
"""Set new target temperature."""
dev_temp = int(temperature * 2)
if temperature == EQ3BT_OFF_TEMP or temperature == EQ3BT_ON_TEMP:
dev_temp |= 0x40
value = struct.pack("BB", PROP_MODE_WRITE, dev_temp)
else:
self._verify_temperature(temperature)
value = struct.pack("BB", PROP_TEMPERATURE_WRITE, dev_temp)
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def mode(self):
"""Return the current operation mode"""
return self._mode
@mode.setter
def mode(self, mode):
"""Set the operation mode."""
_LOGGER.debug("Setting new mode: %s", mode)
if self.mode == Mode.Boost and mode != Mode.Boost:
self.boost = False
if mode == Mode.Boost:
self.boost = True
return
elif mode == Mode.Away:
end = datetime.now() + self._away_duration
return self.set_away(end, self._away_temp)
elif mode == Mode.Closed:
return self.set_mode(0x40 | int(EQ3BT_OFF_TEMP * 2))
elif mode == Mode.Open:
return self.set_mode(0x40 | int(EQ3BT_ON_TEMP * 2))
if mode == Mode.Manual:
temperature = max(
min(self._target_temperature, self.max_temp), self.min_temp
)
return self.set_mode(0x40 | int(temperature * 2))
else:
return self.set_mode(0)
@property
def away_end(self):
return self._away_end
def set_away(self, away_end=None, temperature=EQ3BT_AWAY_TEMP):
"""Sets away mode with target temperature.
When called without parameters disables away mode."""
if not away_end:
_LOGGER.debug("Disabling away, going to auto mode.")
return self.set_mode(0x00)
_LOGGER.debug("Setting away until %s, temp %s", away_end, temperature)
adapter = AwayDataAdapter(Byte[4])
packed = adapter.build(away_end)
self.set_mode(0x80 | int(temperature * 2), packed)
def set_mode(self, mode, payload=None):
value = struct.pack("BB", PROP_MODE_WRITE, mode)
if payload:
value += payload
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def mode_readable(self):
"""Return a readable representation of the mode.."""
ret = ""
mode = self._raw_mode
if mode.MANUAL:
ret = "manual"
if self.target_temperature < self.min_temp:
ret += " off"
elif self.target_temperature >= self.max_temp:
ret += " on"
else:
ret += " (%sC)" % self.target_temperature
else:
ret = "auto"
if mode.AWAY:
ret += " holiday"
if mode.BOOST:
ret += " boost"
if mode.DST:
ret += " dst"
if mode.WINDOW:
ret += " window"
if mode.LOCKED:
ret += " locked"
if mode.LOW_BATTERY:
ret += " low battery"
return ret
@property
def boost(self):
"""Returns True if the thermostat is in boost mode."""
return self.mode == Mode.Boost
@boost.setter
def boost(self, boost):
"""Sets boost mode."""
_LOGGER.debug("Setting boost mode: %s", boost)
value = struct.pack("BB", PROP_BOOST, bool(boost))
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def valve_state(self):
"""Returns the valve state. Probably reported as percent open."""
return self._valve_state
@property
def window_open(self):
"""Returns True if the thermostat reports a open window
(detected by sudden drop of temperature)"""
return self._raw_mode and self._raw_mode.WINDOW
def window_open_config(self, temperature, duration):
"""Configures the window open behavior. The duration is specified in
5 minute increments."""
_LOGGER.debug(
"Window open config, temperature: %s duration: %s", temperature, duration
)
self._verify_temperature(temperature)
if duration.seconds < 0 and duration.seconds > 3600:
raise ValueError
value = struct.pack(
"BBB",
PROP_WINDOW_OPEN_CONFIG,
int(temperature * 2),
int(duration.seconds / 300),
)
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def window_open_temperature(self):
"""The temperature to set when an open window is detected."""
return self._window_open_temperature
@property
def window_open_time(self):
"""Timeout to reset the thermostat after an open window is detected."""
return self._window_open_time
@property
def locked(self):
"""Returns True if the thermostat is locked."""
return self._raw_mode and self._raw_mode.LOCKED
@locked.setter
def locked(self, lock):
"""Locks or unlocks the thermostat."""
_LOGGER.debug("Setting the lock: %s", lock)
value = struct.pack("BB", PROP_LOCK, bool(lock))
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def low_battery(self):
"""Returns True if the thermostat reports a low battery."""
return self._raw_mode and self._raw_mode.LOW_BATTERY
def temperature_presets(self, comfort, eco):
"""Set the thermostats preset temperatures comfort (sun) and
eco (moon)."""
_LOGGER.debug("Setting temperature presets, comfort: %s eco: %s", comfort, eco)
self._verify_temperature(comfort)
self._verify_temperature(eco)
value = struct.pack(
"BBB", PROP_COMFORT_ECO_CONFIG, int(comfort * 2), int(eco * 2)
)
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def comfort_temperature(self):
"""Returns the comfort temperature preset of the thermostat."""
return self._comfort_temperature
@property
def eco_temperature(self):
"""Returns the eco temperature preset of the thermostat."""
return self._eco_temperature
@property
def temperature_offset(self):
"""Returns the thermostat's temperature offset."""
return self._temperature_offset
@temperature_offset.setter
def temperature_offset(self, offset):
"""Sets the thermostat's temperature offset."""
_LOGGER.debug("Setting offset: %s", offset)
# [-3,5 .. 0 .. 3,5 ]
# [00 .. 07 .. 0e ]
if offset < -3.5 or offset > 3.5:
raise TemperatureException("Invalid value: %s" % offset)
current = -3.5
values = {}
for i in range(15):
values[current] = i
current += 0.5
value = struct.pack("BB", PROP_OFFSET, values[offset])
self._conn.make_request(PROP_WRITE_HANDLE, value)
def activate_comfort(self):
"""Activates the comfort temperature."""
value = struct.pack("B", PROP_COMFORT)
self._conn.make_request(PROP_WRITE_HANDLE, value)
def activate_eco(self):
"""Activates the comfort temperature."""
value = struct.pack("B", PROP_ECO)
self._conn.make_request(PROP_WRITE_HANDLE, value)
@property
def min_temp(self):
"""Return the minimum temperature."""
return EQ3BT_MIN_TEMP
@property
def max_temp(self):
"""Return the maximum temperature."""
return EQ3BT_MAX_TEMP
@property
def firmware_version(self):
"""Return the firmware version."""
return self._firmware_version
@property
def device_serial(self):
"""Return the device serial number."""
return self._device_serial
@property
def mac(self):
"""Return the mac address."""
return self._conn.mac

View File

@ -0,0 +1,212 @@
""" Cli tool for testing connectivity with EQ3 smart thermostats. """
import logging
import re
import click
from eq3bt import Thermostat
pass_dev = click.make_pass_decorator(Thermostat)
def validate_mac(ctx, param, mac):
if re.match("^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$", mac) is None:
raise click.BadParameter(mac + " is no valid mac address")
return mac
@click.group(invoke_without_command=True)
@click.option("--mac", envvar="EQ3_MAC", required=True, callback=validate_mac)
@click.option("--interface", default=None)
@click.option("--debug/--normal", default=False)
@click.option(
"--backend", type=click.Choice(["bleak", "bluepy", "gattlib"]), default="bleak"
)
@click.pass_context
def cli(ctx, mac, interface, debug, backend):
"""Tool to query and modify the state of EQ3 BT smart thermostat."""
if debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
if backend == "bluepy":
from .connection import BTLEConnection
connection_cls = BTLEConnection
elif backend == "gattlib":
from .gattlibconnection import BTLEConnection
connection_cls = BTLEConnection
else:
from .bleakconnection import BleakConnection
connection_cls = BleakConnection
thermostat = Thermostat(mac, interface, connection_cls)
thermostat.update()
ctx.obj = thermostat
if ctx.invoked_subcommand is None:
ctx.invoke(state)
@cli.command()
@click.option("--target", type=float, required=False)
@pass_dev
def temp(dev, target):
"""Gets or sets the target temperature."""
click.echo("Current target temp: %s" % dev.target_temperature)
if target:
click.echo("Setting target temp: %s" % target)
dev.target_temperature = target
@cli.command()
@click.option("--target", type=int, required=False)
@pass_dev
def mode(dev, target):
"""Gets or sets the active mode."""
click.echo("Current mode: %s" % dev.mode_readable)
if target:
click.echo("Setting mode: %s" % target)
dev.mode = target
@cli.command()
@click.option("--target", type=bool, required=False)
@pass_dev
def boost(dev, target):
"""Gets or sets the boost mode."""
click.echo("Boost: %s" % dev.boost)
if target is not None:
click.echo("Setting boost: %s" % target)
dev.boost = target
@cli.command()
@pass_dev
def valve_state(dev):
"""Gets the state of the valve."""
click.echo("Valve: %s" % dev.valve_state)
@cli.command()
@click.option("--target", type=bool, required=False)
@pass_dev
def locked(dev, target):
"""Gets or sets the lock."""
click.echo("Locked: %s" % dev.locked)
if target is not None:
click.echo("Setting lock: %s" % target)
dev.locked = target
@cli.command()
@pass_dev
def low_battery(dev):
"""Gets the low battery status."""
click.echo("Batter low: %s" % dev.low_battery)
@cli.command()
@click.option("--temp", type=float, required=False)
@click.option("--duration", type=float, required=False)
@pass_dev
def window_open(dev, temp, duration):
"""Gets and sets the window open settings."""
click.echo("Window open: %s" % dev.window_open)
if dev.window_open_temperature is not None:
click.echo("Window open temp: %s" % dev.window_open_temperature)
if dev.window_open_time is not None:
click.echo("Window open time: %s" % dev.window_open_time)
if temp and duration:
click.echo(f"Setting window open conf, temp: {temp} duration: {duration}")
dev.window_open_config(temp, duration)
@cli.command()
@click.option("--comfort", type=float, required=False)
@click.option("--eco", type=float, required=False)
@pass_dev
def presets(dev, comfort, eco):
"""Sets the preset temperatures for auto mode."""
if dev.comfort_temperature is not None:
click.echo("Current comfort temp: %s" % dev.comfort_temperature)
if dev.eco_temperature is not None:
click.echo("Current eco temp: %s" % dev.eco_temperature)
if comfort and eco:
click.echo(f"Setting presets: comfort {comfort}, eco {eco}")
dev.temperature_presets(comfort, eco)
@cli.command()
@pass_dev
def schedule(dev):
"""Gets the schedule from the thermostat."""
# TODO: expose setting the schedule somehow?
for d in range(7):
dev.query_schedule(d)
for day in dev.schedule.values():
click.echo(f"Day {day.day}, base temp: {day.base_temp}")
current_hour = day.next_change_at
for hour in day.hours:
if current_hour == 0:
continue
click.echo(f"\t[{current_hour}-{hour.next_change_at}] {hour.target_temp}")
current_hour = hour.next_change_at
@cli.command()
@click.argument("offset", type=float, required=False)
@pass_dev
def offset(dev, offset):
"""Sets the temperature offset [-3,5 3,5]"""
if dev.temperature_offset is not None:
click.echo("Current temp offset: %s" % dev.temperature_offset)
if offset is not None:
click.echo("Setting the offset to %s" % offset)
dev.temperature_offset = offset
@cli.command()
@click.argument("away_end", type=click.DateTime(), default=None, required=False)
@click.argument("temperature", type=float, default=None, required=False)
@pass_dev
def away(dev, away_end, temperature):
"""Enables or disables the away mode."""
if away_end:
click.echo(f"Setting away until {away_end}, temperature: {temperature}")
else:
click.echo("Disabling away mode")
dev.set_away(away_end, temperature)
@cli.command()
@pass_dev
def device(dev):
"""Displays basic device information."""
dev.query_id()
click.echo("Firmware version: %s" % dev.firmware_version)
click.echo("Device serial: %s" % dev.device_serial)
@cli.command()
@click.pass_context
def state(ctx):
"""Prints out all available information."""
dev = ctx.obj
click.echo(dev)
ctx.forward(locked)
ctx.forward(low_battery)
ctx.forward(window_open)
ctx.forward(boost)
ctx.forward(temp)
ctx.forward(presets)
ctx.forward(offset)
ctx.forward(mode)
ctx.forward(valve_state)
if __name__ == "__main__":
cli()

View File

@ -0,0 +1,99 @@
"""
A simple adapter to gattlib.
Handles Connection duties (reconnecting etc.) transparently.
"""
import codecs
import logging
import threading
import gattlib
from . import BackendException
DEFAULT_TIMEOUT = 1
_LOGGER = logging.getLogger(__name__)
class BTLEConnection:
"""Representation of a BTLE Connection."""
def __init__(self, mac, iface):
"""Initialize the connection."""
self._conn = None
self._mac = mac
self._iface = iface
self._callbacks = {}
self._notifyevent = None
def __enter__(self):
"""
Context manager __enter__ for connecting the device
:rtype: BTLEConnection
:return:
"""
_LOGGER.debug("Trying to connect to %s", self._mac)
if self._iface is None:
self._conn = gattlib.GATTRequester(self._mac, False)
else:
self._conn = gattlib.GATTRequester(self._mac, False, self._iface)
self._conn.on_notification = self.on_notification
try:
self._conn.connect()
except gattlib.BTBaseException as ex:
_LOGGER.debug(
"Unable to connect to the device %s, retrying: %s", self._mac, ex
)
try:
self._conn.connect()
except Exception as ex2:
_LOGGER.debug("Second connection try to %s failed: %s", self._mac, ex2)
raise BackendException(
"unable to connect to device using gattlib"
) from ex2
_LOGGER.debug("Connected to %s", self._mac)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.disconnect()
self._conn = None
def on_notification(self, handle, data):
"""Handle Callback from a Bluetooth (GATT) request."""
_LOGGER.debug(
"Got notification from %s: %s", handle, codecs.encode(data, "hex")
)
if handle in self._callbacks:
self._callbacks[handle](data[3:])
if self._notifyevent:
self._notifyevent.set()
@property
def mac(self):
"""Return the MAC address of the connected device."""
return self._mac
def set_callback(self, handle, function):
"""Set the callback for a Notification handle. It will be called with the parameter data, which is binary."""
self._callbacks[handle] = function
def make_request(self, handle, value, timeout=DEFAULT_TIMEOUT, with_response=True):
"""Write a GATT Command without callback - not utf-8."""
try:
with self:
_LOGGER.debug(
"Writing %s to %s",
codecs.encode(value, "hex"),
handle,
)
self._notifyevent = threading.Event()
self._conn.write_by_handle(handle, value)
if timeout:
_LOGGER.debug("Waiting for notifications for %s", timeout)
self._notifyevent.wait(timeout)
except gattlib.BTBaseException as ex:
_LOGGER.debug("Got exception from gattlib while making a request: %s", ex)
raise BackendException("Exception on write using gattlib") from ex

View File

@ -0,0 +1,174 @@
""" Contains construct adapters and structures. """
from datetime import datetime, time, timedelta
from construct import (
Adapter,
Bytes,
Const,
Enum,
FlagsEnum,
GreedyRange,
IfThenElse,
Int8ub,
Optional,
Struct,
)
PROP_ID_RETURN = 1
PROP_INFO_RETURN = 2
PROP_SCHEDULE_SET = 0x10
PROP_SCHEDULE_RETURN = 0x21
NAME_TO_DAY = {"sat": 0, "sun": 1, "mon": 2, "tue": 3, "wed": 4, "thu": 5, "fri": 6}
NAME_TO_CMD = {"write": PROP_SCHEDULE_SET, "response": PROP_SCHEDULE_RETURN}
HOUR_24_PLACEHOLDER = 1234
class TimeAdapter(Adapter):
"""Adapter to encode and decode schedule times."""
def _decode(self, obj, ctx, path):
h, m = divmod(obj * 10, 60)
if h == 24: # HACK, can we do better?
return HOUR_24_PLACEHOLDER
return time(hour=h, minute=m)
def _encode(self, obj, ctx, path):
# TODO: encode h == 24 hack
if obj == HOUR_24_PLACEHOLDER:
return int(24 * 60 / 10)
encoded = int((obj.hour * 60 + obj.minute) / 10)
return encoded
class TempAdapter(Adapter):
"""Adapter to encode and decode temperature."""
def _decode(self, obj, ctx, path):
return float(obj / 2.0)
def _encode(self, obj, ctx, path):
return int(obj * 2.0)
class WindowOpenTimeAdapter(Adapter):
"""Adapter to encode and decode window open times (5 min increments)."""
def _decode(self, obj, context, path):
return timedelta(minutes=float(obj * 5.0))
def _encode(self, obj, context, path):
if isinstance(obj, timedelta):
obj = obj.seconds
if 0 <= obj <= 3600.0:
return int(obj / 300.0)
raise ValueError(
"Window open time must be between 0 and 60 minutes "
"in intervals of 5 minutes."
)
class TempOffsetAdapter(Adapter):
"""Adapter to encode and decode the temperature offset."""
def _decode(self, obj, context, path):
return float((obj - 7) / 2.0)
def _encode(self, obj, context, path):
if -3.5 <= obj <= 3.5:
return int(obj * 2.0) + 7
raise ValueError(
"Temperature offset must be between -3.5 and 3.5 (in " "intervals of 0.5)."
)
ModeFlags = "ModeFlags" / FlagsEnum(
Int8ub,
AUTO=0x00, # always True, doesnt affect building
MANUAL=0x01,
AWAY=0x02,
BOOST=0x04,
DST=0x08,
WINDOW=0x10,
LOCKED=0x20,
UNKNOWN=0x40,
LOW_BATTERY=0x80,
)
class AwayDataAdapter(Adapter):
"""Adapter to encode and decode away data."""
def _decode(self, obj, ctx, path):
(day, year, hour_min, month) = obj
year += 2000
min = 0
if hour_min & 0x01:
min = 30
hour = int(hour_min / 2)
return datetime(year=year, month=month, day=day, hour=hour, minute=min)
def _encode(self, obj, ctx, path):
if obj.year < 2000 or obj.year > 2099:
raise Exception("Invalid year, possible [2000,2099]")
year = obj.year - 2000
hour = obj.hour * 2
if obj.minute: # we encode all minute values to h:30
hour |= 0x01
return (obj.day, year, hour, obj.month)
class DeviceSerialAdapter(Adapter):
"""Adapter to decode the device serial number."""
def _decode(self, obj, context, path):
return bytearray(n - 0x30 for n in obj).decode()
Status = "Status" / Struct(
"cmd" / Const(PROP_INFO_RETURN, Int8ub),
Const(0x01, Int8ub),
"mode" / ModeFlags,
"valve" / Int8ub,
Const(0x04, Int8ub),
"target_temp" / TempAdapter(Int8ub),
"away"
/ IfThenElse( # noqa: W503
lambda ctx: ctx.mode.AWAY, AwayDataAdapter(Bytes(4)), Optional(Bytes(4))
),
"presets"
/ Optional( # noqa: W503
Struct(
"window_open_temp" / TempAdapter(Int8ub),
"window_open_time" / WindowOpenTimeAdapter(Int8ub),
"comfort_temp" / TempAdapter(Int8ub),
"eco_temp" / TempAdapter(Int8ub),
"offset" / TempOffsetAdapter(Int8ub),
)
),
)
Schedule = "Schedule" / Struct(
"cmd" / Enum(Int8ub, **NAME_TO_CMD),
"day" / Enum(Int8ub, **NAME_TO_DAY),
"base_temp" / TempAdapter(Int8ub),
"next_change_at" / TimeAdapter(Int8ub),
"hours"
/ GreedyRange( # noqa: W503
Struct(
"target_temp" / TempAdapter(Int8ub),
"next_change_at" / TimeAdapter(Int8ub),
)
),
)
DeviceId = "DeviceId" / Struct(
"cmd" / Const(PROP_ID_RETURN, Int8ub),
"version" / Int8ub,
Int8ub,
Int8ub,
"serial" / DeviceSerialAdapter(Bytes(10)),
Int8ub,
)

View File

@ -0,0 +1,198 @@
import codecs
from datetime import datetime, timedelta
from unittest import TestCase
import pytest
from eq3bt import TemperatureException, Thermostat
from eq3bt.eq3btsmart import PROP_ID_QUERY, PROP_INFO_QUERY, PROP_NTFY_HANDLE, Mode
ID_RESPONSE = b"01780000807581626163606067659e"
STATUS_RESPONSES = {
"auto": b"020100000428",
"manual": b"020101000428",
"window": b"020110000428",
"away": b"0201020004231d132e03",
"boost": b"020104000428",
"low_batt": b"020180000428",
"valve_at_22": b"020100160428",
"presets": b"020100000422000000001803282207",
}
class FakeConnection:
def __init__(self, _iface, mac):
self._callbacks = {}
self._res = "auto"
def set_callback(self, handle, cb):
self._callbacks[handle] = cb
def set_status(self, key):
if key in STATUS_RESPONSES:
self._res = key
else:
raise ValueError("Invalid key for status test response.")
def make_request(self, handle, value, timeout=1, with_response=True):
"""Write a GATT Command without callback - not utf-8."""
if with_response:
cb = self._callbacks.get(PROP_NTFY_HANDLE)
if value[0] == PROP_ID_QUERY:
data = ID_RESPONSE
elif value[0] == PROP_INFO_QUERY:
data = STATUS_RESPONSES[self._res]
else:
return
cb(codecs.decode(data, "hex"))
class TestThermostat(TestCase):
def setUp(self):
self.thermostat = Thermostat(
_mac=None, _iface=None, connection_cls=FakeConnection
)
def test__verify_temperature(self):
with self.assertRaises(TemperatureException):
self.thermostat._verify_temperature(-1)
with self.assertRaises(TemperatureException):
self.thermostat._verify_temperature(35)
self.thermostat._verify_temperature(8)
self.thermostat._verify_temperature(25)
@pytest.mark.skip()
def test_parse_schedule(self):
self.fail()
@pytest.mark.skip()
def test_handle_notification(self):
self.fail()
def test_query_id(self):
self.thermostat.query_id()
self.assertEqual(self.thermostat.firmware_version, 120)
self.assertEqual(self.thermostat.device_serial, "PEQ2130075")
def test_update(self):
th = self.thermostat
th._conn.set_status("auto")
th.update()
self.assertEqual(th.valve_state, 0)
self.assertEqual(th.mode, Mode.Auto)
self.assertEqual(th.target_temperature, 20.0)
self.assertFalse(th.locked)
self.assertFalse(th.low_battery)
self.assertFalse(th.boost)
self.assertFalse(th.window_open)
th._conn.set_status("manual")
th.update()
self.assertTrue(th.mode, Mode.Manual)
th._conn.set_status("away")
th.update()
self.assertEqual(th.mode, Mode.Away)
self.assertEqual(th.target_temperature, 17.5)
self.assertEqual(th.away_end, datetime(2019, 3, 29, 23, 00))
th._conn.set_status("boost")
th.update()
self.assertTrue(th.boost)
self.assertEqual(th.mode, Mode.Boost)
def test_presets(self):
th = self.thermostat
self.thermostat._conn.set_status("presets")
self.thermostat.update()
self.assertEqual(th.window_open_temperature, 12.0)
self.assertEqual(th.window_open_time, timedelta(minutes=15.0))
self.assertEqual(th.comfort_temperature, 20.0)
self.assertEqual(th.eco_temperature, 17.0)
self.assertEqual(th.temperature_offset, 0)
@pytest.mark.skip()
def test_query_schedule(self):
self.fail()
@pytest.mark.skip()
def test_schedule(self):
self.fail()
@pytest.mark.skip()
def test_set_schedule(self):
self.fail()
@pytest.mark.skip()
def test_target_temperature(self):
self.fail()
@pytest.mark.skip()
def test_mode(self):
self.fail()
@pytest.mark.skip()
def test_mode_readable(self):
self.fail()
@pytest.mark.skip()
def test_boost(self):
self.fail()
def test_valve_state(self):
th = self.thermostat
th._conn.set_status("valve_at_22")
th.update()
self.assertEqual(th.valve_state, 22)
def test_window_open(self):
th = self.thermostat
th._conn.set_status("window")
th.update()
self.assertTrue(th.window_open)
@pytest.mark.skip()
def test_window_open_config(self):
self.fail()
@pytest.mark.skip()
def test_locked(self):
self.fail()
@pytest.mark.skip()
def test_low_battery(self):
th = self.thermostat
th._conn.set_status("low_batt")
th.update()
self.assertTrue(th.low_battery)
@pytest.mark.skip()
def test_temperature_offset(self):
self.fail()
@pytest.mark.skip()
def test_activate_comfort(self):
self.fail()
@pytest.mark.skip()
def test_activate_eco(self):
self.fail()
@pytest.mark.skip()
def test_min_temp(self):
self.fail()
@pytest.mark.skip()
def test_max_temp(self):
self.fail()
@pytest.mark.skip()
def test_away_end(self):
self.fail()
@pytest.mark.skip()
def test_decode_mode(self):
self.fail()