"""
Creator : k0rventen
License : MIT
Source : https://github.com/k0rventen/avea
"""
import asyncio
import logging
import math
import re
import threading
from contextlib import suppress
from typing import Awaitable, Callable, Optional, Sequence
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError
from bleak.backends.device import BLEDevice
from bleak_retry_connector import establish_connection
__all__ = [
"Bulb",
"discover_avea_bulbs",
"compute_brightness",
"compute_transition_table",
"compute_color",
"check_bounds",
]
FIRMWARE_REVISION_UUID = "00002a26-0000-1000-8000-00805f9b34fb"
SERIAL_NUMBER_UUID = "00002a25-0000-1000-8000-00805f9b34fb"
HARDWARE_REVISION_UUID = "00002a27-0000-1000-8000-00805f9b34fb"
MANUFACTURER_NAME_UUID = "00002a29-0000-1000-8000-00805f9b34fb"
AVEA_SERVICE_UUID = "f815e810-456c-6761-746f-4d756e696368"
CONTROL_CHARACTERISTIC_UUID = "f815e811-456c-6761-746f-4d756e696368"
MAX_TRANSITION_FPS = 5
_LOGGER = logging.getLogger(__name__)
_FW_VERSION_RE = re.compile(r"^(\d+)\.(\d+)\.(\d+)\.(\d+)")
def _format_firmware_version(value: str) -> str:
match = _FW_VERSION_RE.match(value)
if not match:
return value
major, minor, patch, build = match.groups()
return f"{major}.{minor}.{patch} ({build})"
[docs]
class Bulb:
"""Represent and control a single Elgato Avea bulb.
The class exposes a synchronous public API while using ``asyncio`` and
``bleak`` internally for Bluetooth Low Energy communication. Methods that
read or write bulb state open a connection on demand when the bulb is not
already connected and close it again afterwards.
Args:
address: Bluetooth device address of the Avea bulb.
"""
def __init__(self, address: str):
self.addr = address
self.name = "Unknown"
self.fw_version = "Unknown"
self.serial_number = "Unknown"
self.hardware_revision = "Unknown"
self.manufacturer_name = "Unknown"
self.red = 0
self.blue = 0
self.green = 0
self.brightness = 0
self.white = 0
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_thread: Optional[threading.Thread] = None
self._loop_ready: Optional[threading.Event] = None
self._client: Optional[BleakClient] = None
self._notification_event: Optional[asyncio.Event] = None
self._expected_cmd: Optional[int] = None
self._op_lock = threading.RLock()
self._color_known = False
self._device: Optional[BLEDevice] = None
# ------------------------------------------------------------------
# Event loop helpers
# ------------------------------------------------------------------
def _start_loop(self, loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
if self._loop_ready:
self._loop_ready.set()
loop.run_forever()
def _ensure_loop(self) -> None:
if self._loop is not None:
return
loop = asyncio.new_event_loop()
self._loop = loop
self._loop_ready = threading.Event()
self._loop_thread = threading.Thread(
target=self._start_loop, args=(loop,), daemon=True
)
self._loop_thread.start()
self._loop_ready.wait()
def _submit(self, coro: Awaitable):
self._ensure_loop()
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
return future.result()
def _shutdown_loop(self) -> None:
loop = self._loop
thread = self._loop_thread
if loop is None:
return
loop.call_soon_threadsafe(loop.stop)
if thread and thread.is_alive():
thread.join(timeout=1)
loop.close()
self._loop = None
self._loop_thread = None
self._loop_ready = None
[docs]
def close(self) -> None:
"""Close the BLE connection and stop the internal event loop.
Calling this method is safe even when the bulb is already closed. Use it
when the object is no longer needed to release Bluetooth and thread
resources explicitly.
"""
with self._op_lock:
if self._loop is None:
return
with suppress(Exception):
if self._client:
self._submit(self._disconnect())
self._shutdown_loop()
def __del__(self):
with suppress(Exception):
self.close()
# ------------------------------------------------------------------
# Connection handling
# ------------------------------------------------------------------
def _notification_handler(self, _: int, data: bytearray) -> None:
payload = bytes(data)
self.process_notification(payload)
event = self._notification_event
expected = self._expected_cmd
if event is not None and (expected is None or (payload and payload[0] == expected)):
event.set()
async def _connect(self) -> bool:
if self._client and self._client.is_connected:
return True
client: Optional[BleakClient] = None
try:
device: Optional[BLEDevice]
if isinstance(self.addr, BLEDevice):
device = self.addr
elif self._device:
device = self._device
else:
device = await BleakScanner.find_device_by_address(
self.addr, timeout=5.0
)
if device is None:
raise BleakError(f"Device {self.addr} not found")
self._device = device
display_name = (
device.name
or (self.name if self.name != "Unknown" else self.addr)
)
client = await establish_connection(
BleakClient,
device,
display_name,
timeout=15.0,
use_services_cache=False,
)
await client.start_notify(
CONTROL_CHARACTERISTIC_UUID,
self._notification_handler,
)
except (BleakError, asyncio.TimeoutError, OSError):
_LOGGER.warning(
"Could not connect to the Bulb %s",
self.addr,
exc_info=True,
)
with suppress(Exception):
if client:
await client.disconnect()
return False
self._client = client
return True
async def _disconnect(self) -> None:
client = self._client
self._client = None
if not client:
return
with suppress(Exception):
await client.stop_notify(CONTROL_CHARACTERISTIC_UUID)
with suppress(Exception):
await client.disconnect()
[docs]
def subscribe_to_notification(self):
"""Return whether notification handling is enabled.
Notifications are subscribed automatically during connection setup, so
callers normally do not need to call this method directly.
Returns:
Always ``True``.
"""
return True
[docs]
def connect(self) -> bool:
"""Connect to the bulb over Bluetooth Low Energy.
Returns:
``True`` if a connection is available, otherwise ``False``.
"""
with self._op_lock:
return self._submit(self._connect())
[docs]
def disconnect(self) -> None:
"""Disconnect from the bulb if currently connected."""
with self._op_lock:
if not self._client:
return
self._submit(self._disconnect())
def _is_connected(self) -> bool:
return self._client is not None and self._client.is_connected
# ------------------------------------------------------------------
# Command helpers
# ------------------------------------------------------------------
async def _write_command(self, payload: bytes, *, with_response: bool = False) -> None:
if not self._client or not self._client.is_connected:
raise BleakError("Client is not connected")
await self._client.write_gatt_char(
CONTROL_CHARACTERISTIC_UUID,
payload,
response=with_response,
)
async def _request_notification(
self,
command: bytes,
expected_cmd: Optional[int],
delay: float = 0.0,
timeout: float = 1.0,
) -> None:
if not self._client or not self._client.is_connected:
raise BleakError("Client is not connected")
if delay:
await asyncio.sleep(delay)
event = asyncio.Event()
self._notification_event = event
self._expected_cmd = expected_cmd
try:
await self._client.write_gatt_char(CONTROL_CHARACTERISTIC_UUID, command, response=False)
await asyncio.wait_for(event.wait(), timeout)
except asyncio.TimeoutError:
pass
finally:
self._notification_event = None
self._expected_cmd = None
async def _read_firmware_version(self) -> str:
if not self._client:
return ""
try:
payload = await self._client.read_gatt_char(FIRMWARE_REVISION_UUID)
except (BleakError, AttributeError, OSError):
_LOGGER.warning(
"Could not read firmware version from bulb %s",
self.addr,
exc_info=True,
)
return ""
if isinstance(payload, bytearray):
payload = bytes(payload)
try:
return payload.decode("utf-8").rstrip("\x00")
except UnicodeDecodeError:
_LOGGER.warning(
"Could not decode firmware version from bulb %s",
self.addr,
exc_info=True,
)
return ""
async def _read_serial_number(self) -> str:
if not self._client:
return ""
try:
payload = await self._client.read_gatt_char(SERIAL_NUMBER_UUID)
except (BleakError, AttributeError, OSError):
_LOGGER.warning(
"Could not read serial number from bulb %s",
self.addr,
exc_info=True,
)
return ""
if isinstance(payload, bytearray):
payload = bytes(payload)
try:
return payload.decode("utf-8").split("\x00", 1)[0]
except UnicodeDecodeError:
_LOGGER.warning(
"Could not decode serial number from bulb %s",
self.addr,
exc_info=True,
)
return ""
async def _read_hardware_revision(self) -> str:
if not self._client:
return ""
try:
payload = await self._client.read_gatt_char(HARDWARE_REVISION_UUID)
except (BleakError, AttributeError, OSError):
_LOGGER.warning(
"Could not read hardware revision from bulb %s",
self.addr,
exc_info=True,
)
return ""
if isinstance(payload, bytearray):
payload = bytes(payload)
try:
return payload.decode("utf-8").rstrip("\x00")
except UnicodeDecodeError:
_LOGGER.warning(
"Could not decode hardware revision from bulb %s",
self.addr,
exc_info=True,
)
return ""
async def _read_manufacturer_name(self) -> str:
if not self._client:
return ""
try:
payload = await self._client.read_gatt_char(MANUFACTURER_NAME_UUID)
except (BleakError, AttributeError, OSError):
_LOGGER.warning(
"Could not read manufacturer name from bulb %s",
self.addr,
exc_info=True,
)
return ""
if isinstance(payload, bytearray):
payload = bytes(payload)
try:
return payload.decode("utf-8").rstrip("\x00")
except UnicodeDecodeError:
_LOGGER.warning(
"Could not decode manufacturer name from bulb %s",
self.addr,
exc_info=True,
)
return ""
async def _smooth_transition(
self,
red_table: Sequence[int],
green_table: Sequence[int],
blue_table: Sequence[int],
interval: float,
) -> None:
if not self._client or not self._client.is_connected:
raise BleakError("Client is not connected")
last_payload = None
for index, (r, g, b) in enumerate(zip(red_table, green_table, blue_table), start=1):
payload = compute_color(
check_bounds(0),
check_bounds(r),
check_bounds(g),
check_bounds(b),
)
try:
await self._write_command(payload, with_response=False)
except (BleakError, asyncio.TimeoutError, OSError):
_LOGGER.warning(
"Lost connection during smooth transition for bulb %s; reconnecting",
self.addr,
exc_info=True,
)
await self._disconnect()
if not await self._connect():
break
await self._write_command(payload, with_response=False)
if interval:
await asyncio.sleep(interval)
last_payload = payload
if last_payload is not None:
await self._write_command(last_payload, with_response=True)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
def get_fw_version(self) -> str:
"""Read the bulb firmware version.
Returns:
Firmware version as a formatted string, or an empty string when it
cannot be read.
"""
with self._op_lock:
already_connected = self._is_connected()
if not already_connected and not self.connect():
self.fw_version = ""
return ""
value = self._submit(self._read_firmware_version())
if not already_connected:
self.disconnect()
result = _format_firmware_version(value) if isinstance(value, str) else ""
self.fw_version = result
return result
[docs]
def get_serial_number(self) -> str:
"""Read the bulb serial number.
Returns:
Serial number reported by the bulb, or an empty string when it
cannot be read.
"""
with self._op_lock:
already_connected = self._is_connected()
if not already_connected and not self.connect():
self.serial_number = ""
return ""
try:
value = self._submit(self._read_serial_number())
finally:
if not already_connected:
self.disconnect()
result = value if isinstance(value, str) else ""
self.serial_number = result
return result
[docs]
def get_hardware_revision(self) -> str:
"""Read the bulb hardware revision.
Returns:
Hardware revision reported by the bulb, or an empty string when it
cannot be read.
"""
with self._op_lock:
already_connected = self._is_connected()
if not already_connected and not self.connect():
self.hardware_revision = ""
return ""
value = self._submit(self._read_hardware_revision())
if not already_connected:
self.disconnect()
result = value if isinstance(value, str) else ""
self.hardware_revision = result
return result
[docs]
def get_manufacturer_name(self) -> str:
"""Read the bulb manufacturer name.
Returns:
Manufacturer name reported by the bulb, or an empty string when it
cannot be read.
"""
with self._op_lock:
already_connected = self._is_connected()
if not already_connected and not self.connect():
self.manufacturer_name = ""
return ""
value = self._submit(self._read_manufacturer_name())
if not already_connected:
self.disconnect()
result = value if isinstance(value, str) else ""
self.manufacturer_name = result
return result
[docs]
def set_brightness(self, brightness):
"""Set the bulb brightness.
Args:
brightness: Brightness value from ``0`` to ``4095``. Values outside
this range are clamped.
"""
with self._op_lock:
payload = compute_brightness(check_bounds(brightness))
already_connected = self._is_connected()
if not already_connected and not self.connect():
return
self._submit(self._write_command(payload))
if not already_connected:
self.disconnect()
self.brightness = check_bounds(brightness)
[docs]
def get_brightness(self):
"""Read the current bulb brightness.
Returns:
Brightness value from ``0`` to ``4095``. If the bulb cannot be
reached, the last cached value is returned.
"""
with self._op_lock:
already_connected = self._is_connected()
if not already_connected and not self.connect():
return self.brightness
self._submit(
self._request_notification(b"\x57", expected_cmd=0x57, delay=0.5)
)
if not already_connected:
self.disconnect()
return self.brightness
[docs]
def set_color(self, white, red, green, blue):
"""Set the bulb color using 12-bit white, red, green and blue channels.
Args:
white: White channel value from ``0`` to ``4095``.
red: Red channel value from ``0`` to ``4095``.
green: Green channel value from ``0`` to ``4095``.
blue: Blue channel value from ``0`` to ``4095``.
Notes:
Values outside the accepted range are clamped before being sent.
"""
with self._op_lock:
payload = compute_color(
check_bounds(white),
check_bounds(red),
check_bounds(green),
check_bounds(blue),
)
already_connected = self._is_connected()
if not already_connected and not self.connect():
return
self._submit(self._write_command(payload))
if not already_connected:
self.disconnect()
self.white = check_bounds(white)
self.red = check_bounds(red)
self.green = check_bounds(green)
self.blue = check_bounds(blue)
self._color_known = True
[docs]
def set_rgb(self, red, green, blue):
"""Set the bulb color using standard RGB channel values.
Args:
red: Red channel value from ``0`` to ``255``.
green: Green channel value from ``0`` to ``255``.
blue: Blue channel value from ``0`` to ``255``.
Notes:
RGB values are converted to the internal 12-bit channel scale.
"""
with self._op_lock:
self.set_color(0, red * 16, green * 16, blue * 16)
[docs]
def set_smooth_transition(self, target_red, target_green, target_blue, duration=2, fps=60):
"""Fade smoothly from the current color to a target RGB color.
Args:
target_red: Target red channel value from ``0`` to ``255``.
target_green: Target green channel value from ``0`` to ``255``.
target_blue: Target blue channel value from ``0`` to ``255``.
duration: Transition duration in seconds.
fps: Requested updates per second. The effective value is capped by
the library to keep BLE communication stable.
"""
if self._color_known:
init_r = self.red
init_g = self.green
init_b = self.blue
else:
try:
init_w, init_r, init_g, init_b = self.get_color()
except (BleakError, asyncio.TimeoutError, OSError):
_LOGGER.warning(
"Could not read current color before smooth transition for bulb %s",
self.addr,
exc_info=True,
)
return
with self._op_lock:
clamped_fps = max(1, min(int(fps), MAX_TRANSITION_FPS))
iterations = max(1, int(duration * clamped_fps))
interval = 0 if clamped_fps <= 0 else 1 / clamped_fps
target_red_12 = check_bounds(target_red * 16)
target_green_12 = check_bounds(target_green * 16)
target_blue_12 = check_bounds(target_blue * 16)
red_table = compute_transition_table(init_r, target_red_12, iterations)
green_table = compute_transition_table(init_g, target_green_12, iterations)
blue_table = compute_transition_table(init_b, target_blue_12, iterations)
already_connected = self._is_connected()
if not already_connected and not self.connect():
return
self._submit(
self._smooth_transition(red_table, green_table, blue_table, interval)
)
self.red = target_red_12
self.green = target_green_12
self.blue = target_blue_12
self._color_known = True
if not already_connected:
self.disconnect()
[docs]
def get_color(self):
"""Read the current 12-bit white, red, green and blue channel values.
Returns:
Tuple ``(white, red, green, blue)`` with values from ``0`` to
``4095``. If the bulb cannot be reached, the last cached values are
returned.
"""
with self._op_lock:
already_connected = self._is_connected()
if not already_connected and not self.connect():
return self.white, self.red, self.green, self.blue
self._submit(
self._request_notification(b"\x35", expected_cmd=0x35, delay=0.5)
)
if not already_connected:
self.disconnect()
self._color_known = True
return self.white, self.red, self.green, self.blue
[docs]
def get_rgb(self):
"""Read the current color as standard RGB values.
Returns:
Tuple ``(red, green, blue)`` with channel values from ``0`` to
``255``. If the bulb cannot be reached, the last cached values are
returned.
"""
with self._op_lock:
already_connected = self._is_connected()
if not already_connected and not self.connect():
return int(self.red / 16), int(self.green / 16), int(self.blue / 16)
self._submit(
self._request_notification(b"\x35", expected_cmd=0x35, delay=0.5)
)
if not already_connected:
self.disconnect()
self._color_known = True
return int(self.red / 16), int(self.green / 16), int(self.blue / 16)
[docs]
def get_name(self):
"""Read the bulb name.
Returns:
Name reported by the bulb. If the bulb cannot be reached, the last
cached name is returned.
"""
with self._op_lock:
already_connected = self._is_connected()
if not already_connected and not self.connect():
return self.name
self._submit(
self._request_notification(b"\x58", expected_cmd=0x58, delay=0.5)
)
if not already_connected:
self.disconnect()
return self.name
[docs]
def set_name(self, name: str):
"""Set the bulb name.
Args:
name: New UTF-8 encoded name to send to the bulb.
"""
with self._op_lock:
byte_name = name.encode("utf-8")
command = b"\x58" + byte_name
already_connected = self._is_connected()
if not already_connected and not self.connect():
return
self._submit(self._write_command(command))
if not already_connected:
self.disconnect()
[docs]
def process_notification(self, data: bytes):
"""Update cached bulb state from a raw BLE notification payload.
Args:
data: Notification bytes received from the Avea control
characteristic.
"""
if not data:
return
cmd = data[0]
values = data[1:]
if cmd == 0x57:
self.brightness = int.from_bytes(values, "little")
elif cmd == 0x35:
hex_val = values.hex()
self.red = int.from_bytes(bytes.fromhex(hex_val[-4:]), "little") ^ int(0x3000)
self.green = int.from_bytes(bytes.fromhex(hex_val[-8:-4]), "little") ^ int(0x2000)
self.blue = int.from_bytes(bytes.fromhex(hex_val[-12:-8]), "little") ^ int(0x1000)
self.white = int.from_bytes(bytes.fromhex(hex_val[-16:-12]), "little")
self._color_known = True
elif cmd == 0x58:
try:
self.name = values.decode("utf-8").rstrip("\x00")
except UnicodeDecodeError:
_LOGGER.warning(
"Could not decode bulb name notification for bulb %s",
self.addr,
exc_info=True,
)
self.name = "Unknown"
# ----------------------------------------------------------------------
# Discovery helpers
# ----------------------------------------------------------------------
async def _discover(timeout: float) -> list:
devices = await BleakScanner.discover(timeout=timeout)
bulbs = []
for dev in devices:
if _is_avea_device(dev):
bulb = Bulb(dev.address)
bulb._device = dev
if getattr(dev, "name", None):
bulb.name = dev.name
bulbs.append(bulb)
return bulbs
def _is_avea_device(device) -> bool:
name_sources = [getattr(device, "name", None)]
metadata = getattr(device, "metadata", {}) or {}
name_sources.append(metadata.get("local_name"))
for name in name_sources:
if name and "Avea" in name:
return True
for value in metadata.get("manufacturer_data", {}).values():
try:
decoded = bytes(value).decode("utf-8", errors="ignore")
except (TypeError, ValueError):
continue
if "Avea" in decoded:
return True
for uuid in metadata.get("uuids", []) or []:
if isinstance(uuid, str) and "Avea" in uuid:
return True
return False
def _run_async(factory: Callable[[], Awaitable]):
try:
return asyncio.run(factory())
except RuntimeError:
result = {}
error = {}
def runner():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result["value"] = loop.run_until_complete(factory())
except Exception as exc:
error["exc"] = exc
finally:
loop.close()
thread = threading.Thread(target=runner)
thread.start()
thread.join()
if "exc" in error:
raise error["exc"]
return result.get("value")
[docs]
def discover_avea_bulbs(timeout: float = 4.0):
"""Discover nearby Elgato Avea bulbs.
Args:
timeout: Bluetooth scan duration in seconds.
Returns:
List of :class:`Bulb` objects for discovered Avea devices.
Notes:
Depending on the operating system, Bluetooth discovery may require
additional permissions.
"""
return _run_async(lambda: _discover(timeout))
# ----------------------------------------------------------------------
# Utility functions
# ----------------------------------------------------------------------
[docs]
def compute_brightness(brightness):
"""Build the BLE payload for a brightness command.
Args:
brightness: Brightness value from ``0`` to ``4095``. The value is
expected to be clamped before calling this helper.
Returns:
Command payload bytes starting with ``0x57``.
"""
value = hex(int(brightness))[2:]
value = value.zfill(4)
value = value[2:] + value[:2]
return bytes.fromhex("57" + value)
[docs]
def compute_color(w=2000, r=0, g=0, b=0):
"""Build the BLE payload for a color command.
Args:
w: White channel value from ``0`` to ``4095``.
r: Red channel value from ``0`` to ``4095``.
g: Green channel value from ``0`` to ``4095``.
b: Blue channel value from ``0`` to ``4095``.
Returns:
Command payload bytes starting with ``0x35``.
"""
color = "35"
fading = "1101"
unknown = "0a00"
white = (int(w) | int(0x8000)).to_bytes(2, byteorder="little").hex()
red = (int(r) | int(0x3000)).to_bytes(2, byteorder="little").hex()
green = (int(g) | int(0x2000)).to_bytes(2, byteorder="little").hex()
blue = (int(b) | int(0x1000)).to_bytes(2, byteorder="little").hex()
return bytes.fromhex(color + fading + unknown + white + red + green + blue)
[docs]
def compute_transition_table(init, target, iterations):
"""Compute eased intermediate values for a smooth color transition.
Args:
init: Initial channel value.
target: Target channel value.
iterations: Number of transition steps to generate.
Returns:
List of integer channel values, including the final target value.
"""
iterations = max(1, int(iterations))
if iterations == 1:
return [target]
delta = target - init
values = []
for n in range(1, iterations + 1):
frac = n / iterations
eased = (1 - math.cos(math.pi * frac)) / 2
values.append(round(init + delta * eased))
return values
[docs]
def check_bounds(value):
"""Clamp a numeric value to the Avea 12-bit channel range.
Args:
value: Value to convert to ``int`` and clamp.
Returns:
Integer between ``0`` and ``4095``. Non-numeric values return ``0``.
"""
try:
ivalue = int(value)
except (TypeError, ValueError):
_LOGGER.warning("Value %r was not a number; using default value 0", value)
return 0
if ivalue > 4095:
return 4095
if ivalue < 0:
return 0
return ivalue