This repository has been archived on 2023-06-10. You can view files and clone it, but cannot push or open issues or pull requests.
homeassistant-config/deps/lib/python3.10/site-packages/eq3bt/gattlibconnection.py
darthsandmann c74c45998c updates
2022-11-29 19:10:30 +01:00

100 lines
3.2 KiB
Python

"""
A simple adapter to gattlib.
Handles Connection duties (reconnecting etc.) transparently.
"""
import codecs
import logging
import threading
import gattlib
from . import BackendException
DEFAULT_TIMEOUT = 1
_LOGGER = logging.getLogger(__name__)
class BTLEConnection:
"""Representation of a BTLE Connection."""
def __init__(self, mac, iface):
"""Initialize the connection."""
self._conn = None
self._mac = mac
self._iface = iface
self._callbacks = {}
self._notifyevent = None
def __enter__(self):
"""
Context manager __enter__ for connecting the device
:rtype: BTLEConnection
:return:
"""
_LOGGER.debug("Trying to connect to %s", self._mac)
if self._iface is None:
self._conn = gattlib.GATTRequester(self._mac, False)
else:
self._conn = gattlib.GATTRequester(self._mac, False, self._iface)
self._conn.on_notification = self.on_notification
try:
self._conn.connect()
except gattlib.BTBaseException as ex:
_LOGGER.debug(
"Unable to connect to the device %s, retrying: %s", self._mac, ex
)
try:
self._conn.connect()
except Exception as ex2:
_LOGGER.debug("Second connection try to %s failed: %s", self._mac, ex2)
raise BackendException(
"unable to connect to device using gattlib"
) from ex2
_LOGGER.debug("Connected to %s", self._mac)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.disconnect()
self._conn = None
def on_notification(self, handle, data):
"""Handle Callback from a Bluetooth (GATT) request."""
_LOGGER.debug(
"Got notification from %s: %s", handle, codecs.encode(data, "hex")
)
if handle in self._callbacks:
self._callbacks[handle](data[3:])
if self._notifyevent:
self._notifyevent.set()
@property
def mac(self):
"""Return the MAC address of the connected device."""
return self._mac
def set_callback(self, handle, function):
"""Set the callback for a Notification handle. It will be called with the parameter data, which is binary."""
self._callbacks[handle] = function
def make_request(self, handle, value, timeout=DEFAULT_TIMEOUT, with_response=True):
"""Write a GATT Command without callback - not utf-8."""
try:
with self:
_LOGGER.debug(
"Writing %s to %s",
codecs.encode(value, "hex"),
handle,
)
self._notifyevent = threading.Event()
self._conn.write_by_handle(handle, value)
if timeout:
_LOGGER.debug("Waiting for notifications for %s", timeout)
self._notifyevent.wait(timeout)
except gattlib.BTBaseException as ex:
_LOGGER.debug("Got exception from gattlib while making a request: %s", ex)
raise BackendException("Exception on write using gattlib") from ex