Haus & Hof
Computer & Co.
DIY-Projekte
Haus & Hof
Computer & Co.
DIY-Projekte
Voraussetzung: Parallel zu FHEM läuft ein Homebridge-Server. Dieser existiert bei mir, um FHEM Geräte auch über die Apple Homekit App bzw. via Siri zu steuern. Die zweite Voraussetzung ist ein MQTT-Server. Diesen benötige ich zum Schalten meiner Shellys und zum Auslesen meiner 433 Mhz Sensoren und Aktoren. Damit dieser via Python kommuniziert muss zuerst
pip install pywizlight paho-mqtt
dieses Modul installiert werden. Das Python3 auf Deinem System installiert ist, davon gehe ich aus.
Beispiel: WiZ Lampe im Schlafzimmer mit dem namen TVLicht
Das Device wird in FHEM wie folgt angelegt:
define TVLicht MQTT_DEVICE
setuuid TVLicht 69c04ea9-f33f-034c-2c22-c6afa61bf5b672d4
attr TVLicht IODev myBroker
attr TVLicht alias TVLicht
attr TVLicht publishSet on off wiz/tvlicht/set
attr TVLicht publishSet_pct wiz/tvlicht/set
attr TVLicht room Schlafzimmer
attr TVLicht stateFormat { ReadingsVal($name,"state","?")." (".ReadingsVal($name,"pct","0")."%)" }
attr TVLicht subscribeReading_pct wiz/tvlicht/pct
attr TVLicht subscribeReading_state wiz/tvlicht/state
attr TVLicht subscribeReading_status wiz/tvlicht/status
attr TVLicht webCmd on:off:pct
attr TVLicht widgetOverride pct:slider,0,1,100
Zusätzlich gibt es noch
define TVLicht_CMD dummy setuuid TVLicht_CMD 69c04ea9-f33f-034c-ec74-74d6b05fd715a01f attr TVLicht_CMD room Schlafzimmer attr TVLicht_CMD setList pct:slider,0,1,100 on off
und
define n_TVLicht_CMD notify TVLicht_CMD:pct.* set myBroker publish wiz/tvlicht/set $EVTPART1 setuuid n_TVLicht_CMD 69c05086-f33f-034c-f1eb-20862676da8ddf2d
sowie
define n_TVLicht_CMD_onoff notify TVLicht_CMD:(on|off) set myBroker publish wiz/tvlicht/set $EVENT setuuid n_TVLicht_CMD_onoff 69c0509a-f33f-034c-7114-37424932b61b9577
Um zwischen WiZ und FHEM zu vermitteln kommt MQTT ins Spiel. Den Datenaustausch erledigt das Python-Skript wiz_tvlicht_mqtt.py, welches ich unter /opt/fhem/wiz/ abgelegt habe.
import asyncio
import json
import paho.mqtt.client as mqtt
from pywizlight import wizlight, PilotBuilder
WIZ_IP = "192.168.1.67"
MQTT_HOST = "192.168.1.15"
MQTT_PORT = 1883
TOPIC_SET = "wiz/tvlicht/set"
TOPIC_STATE = "wiz/tvlicht/state"
TOPIC_PCT = "wiz/tvlicht/pct"
TOPIC_STATUS = "wiz/tvlicht/status"
loop = None
light = None
mqtt_client = None
def clamp_pct(val):
val = int(val)
if val < 0:
return 0
if val > 100:
return 100
return val
def fhem_to_brightness(pct):
pct = clamp_pct(pct)
if pct == 0:
return 0
# Arbeitsversion: pywizlight reagiert bei dir auf brightness=...
# Kleine Werte strecken, damit 1..25 nicht so hart zusammenfallen
if pct < 26:
pct = 26
return pct
def raw_to_pct(raw):
if raw is None:
return 0
raw = int(raw)
# Bei dir kommen offenbar direkt Prozentwerte zurück
if raw <= 100:
pct = raw
else:
pct = round(raw * 100 / 255)
# kosmetisch
if pct >= 99:
pct = 100
return max(0, min(100, pct))
async def publish_state():
global light, mqtt_client
state = await light.updateState()
is_on = state.get_state()
raw = state.get_brightness()
pct = raw_to_pct(raw)
print(f"STATE -> on={is_on}, raw={raw}, pct={pct}")
mqtt_client.publish(TOPIC_STATE, "on" if is_on else "off", retain=True)
mqtt_client.publish(TOPIC_PCT, str(pct), retain=True)
mqtt_client.publish(
TOPIC_STATUS,
json.dumps({
"state": bool(is_on),
"raw": 0 if raw is None else int(raw),
"pct": int(pct)
}),
retain=True
)
async def set_pct(pct):
global light
pct = clamp_pct(pct)
print(f"SET -> fhem_pct={pct}")
if pct == 0:
await light.turn_off()
return
brightness = fhem_to_brightness(pct)
print(f"SET -> brightness={brightness}")
await light.turn_on(PilotBuilder(brightness=brightness))
async def handle_command(payload):
payload = payload.strip()
print(f"MQTT CMD -> {payload}")
if payload == "on":
await light.turn_on()
elif payload == "off":
await light.turn_off()
else:
try:
data = json.loads(payload)
if "brightness" in data:
await set_pct(data["brightness"])
elif "pct" in data:
await set_pct(data["pct"])
elif "state" in data:
if str(data["state"]).lower() == "on":
await light.turn_on()
elif str(data["state"]).lower() == "off":
await light.turn_off()
else:
return
else:
return
except Exception:
try:
await set_pct(int(payload))
except Exception as e:
print(f"PARSE ERROR -> {e}")
return
await asyncio.sleep(1)
await publish_state()
async def poll_loop():
global light
light = wizlight(WIZ_IP)
try:
await publish_state()
except Exception as e:
print(f"Initial publish error -> {e}")
while True:
try:
await publish_state()
except Exception as e:
print(f"Polling error -> {e}")
await asyncio.sleep(15)
def on_connect(client, userdata, flags, rc, properties=None):
print("MQTT connected")
client.subscribe(TOPIC_SET)
def on_message(client, userdata, msg):
payload = msg.payload.decode("utf-8")
asyncio.run_coroutine_threadsafe(handle_command(payload), loop)
def start_mqtt():
global mqtt_client
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(MQTT_HOST, MQTT_PORT, 60)
mqtt_client.loop_start()
def main():
global loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_mqtt()
try:
loop.run_until_complete(poll_loop())
finally:
if mqtt_client is not None:
mqtt_client.loop_stop()
mqtt_client.disconnect()
loop.close()
if __name__ == "__main__":
main()