Skip to content

Commit

Permalink
Add Elgato Keylight support to diyHue (diyhue#914)
Browse files Browse the repository at this point in the history
* Update configHandler.py

add elgato config

* Update discover.py

add elgato to discovery

* Update __init__.py

add elgato to protocols

* Create elgato.py

add functionality for elgato keylights
  • Loading branch information
standardgateway authored Jun 4, 2023
1 parent 9b171d0 commit cdc6ec3
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 2 deletions.
3 changes: 3 additions & 0 deletions BridgeEmulator/configManager/configHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def load_config(self):
config["hyperion"] = {"enabled": True}
if "tpkasa" not in config:
config["tpkasa"] = {"enabled": True}
if "elgato" not in config:
config["elgato"] = {"enabled": True}

if int(config["swversion"]) < 1958077010:
config["swversion"] = "1958077010"
Expand Down Expand Up @@ -102,6 +104,7 @@ def load_config(self):
"esphome": {"enabled":True},
"hyperion": {"enabled":True},
"tpkasa": {"enabled":True},
"elgato": {"enabled":True},
}
# load lights
if os.path.exists(self.configDir + "/lights.yaml"):
Expand Down
8 changes: 8 additions & 0 deletions BridgeEmulator/lights/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ def scanForLights(): # scan for ESP8266 lights and strips
hyperion.discover(detectedLights)
if bridgeConfig["config"]["tpkasa"]["enabled"]:
tpkasa.discover(detectedLights)
if bridgeConfig["config"]["elgato"]["enabled"]:
# Scan with port 9123 before mDNS discovery
elgato_ips = find_hosts(9123)
logging.info(pretty_json(elgato_ips))
elgato.discover(detectedLights, elgato_ips)
bridgeConfig["temp"]["scanResult"]["lastscan"] = datetime.now().strftime(
"%Y-%m-%dT%H:%M:%S")
for light in detectedLights:
Expand Down Expand Up @@ -174,6 +179,9 @@ def scanForLights(): # scan for ESP8266 lights and strips
elif light["protocol"] == "homeassistant_ws":
if lightObj.protocol_cfg["entity_id"] == light["protocol_cfg"]["entity_id"]:
lightIsNew = False
elif light["protocol"] == "elgato":
if lightObj.protocol_cfg['mac'] == light["protocol_cfg"]['mac']:
lightIsNew = False
if lightIsNew:
logging.info("Add new light " + light["name"])
lightId = addNewLight(
Expand Down
4 changes: 2 additions & 2 deletions BridgeEmulator/lights/protocols/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from lights.protocols import wled, hyperion, yeelight, tasmota, shelly, mi_box, hue, deconz, domoticz, tradfri, native, native_single, native_multi, esphome, mqtt, wiz, milight, homeassistant_ws, tpkasa, hue_bl
from lights.protocols import wled, hyperion, yeelight, tasmota, shelly, mi_box, hue, deconz, domoticz, tradfri, native, native_single, native_multi, esphome, mqtt, wiz, milight, homeassistant_ws, tpkasa, hue_bl, elgato

protocols = [wled, hyperion, yeelight, tasmota, shelly, mi_box, hue, deconz, domoticz, tradfri, native, native_single, native_multi, esphome, mqtt, wiz, milight, homeassistant_ws, tpkasa, hue_bl]
protocols = [wled, hyperion, yeelight, tasmota, shelly, mi_box, hue, deconz, domoticz, tradfri, native, native_single, native_multi, esphome, mqtt, wiz, milight, homeassistant_ws, tpkasa, hue_bl, elgato]
111 changes: 111 additions & 0 deletions BridgeEmulator/lights/protocols/elgato.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import socket
import json
import logManager
import requests
from time import sleep
from zeroconf import IPVersion, ServiceBrowser, ServiceStateChange, Zeroconf, ZeroconfServiceTypes

logging = logManager.logger.get_logger(__name__)

discovered_lights = []

def on_mdns_discover(zeroconf, service_type, name, state_change):
if "Elgato Key Light" in name and state_change is ServiceStateChange.Added:
info = zeroconf.get_service_info(service_type, name)
if info:
addresses = ["%s" % (socket.inet_ntoa(addr))
for addr in info.addresses]
discovered_lights.append([addresses[0], name])
logging.debug('<Elgato> mDNS device discovered:'+ addresses[0])


def discover(detectedLights, elgato_ips):
mdns_string = "_elgo._tcp.local."
logging.info('<Elgato> mDNS discovery for ' + mdns_string + ' started')
ip_version = IPVersion.V4Only
zeroconf = Zeroconf(ip_version=ip_version)
ServiceBrowser(zeroconf, mdns_string, handlers=[on_mdns_discover])

sleep(2)

if len(discovered_lights) == 0:
# Didn't find anything using mdns, trying elgato_ips
logging.info("<Elgato> Nothing found using mDNS, trying to find lights by IP")
for ip in elgato_ips:
try:
response = requests.get(
"http://"+ ip +":9123/elgato/accessory-info", timeout=3)
if response.status_code == 200:
json_resp = json.loads(response.content)
if json_resp['productName'] in ["Elgato Key Light Mini", "Elgato Key Light Air", "Elgato Key Light"]:
discovered_lights.append([ip, json_resp['displayName']])
except Exception as e:
logging.warning("<Elgato> ip %s is unknown device", ip)

lights = []
for device in discovered_lights:
try:
response = requests.get("http://"+ device[0] +":9123/elgato/accessory-info", timeout=3)
if response.status_code == 200:
json_accessory_info = json.loads(response.content)

logging.info("<Elgato> Found device: %s at IP %s" % (device[1], device[0]))

lights.append({"protocol": "elgato",
"name": json_accessory_info["displayName"] ,
"modelid": "LTW001", #Colortemp Bulb
"protocol_cfg": {
"ip": device[0],
"mdns_name": device[1],
"mac": json_accessory_info["macAddress"],
}
})
except Exception as e:
logging.warning("<Elgato> EXCEPTION: " + str(e))
break

for light in lights:
detectedLights.append(light)

def translate_range(value, old_min, old_max, new_min, new_max):
old_range = old_max - old_min
new_range = new_max - new_min
scaled_value = (((value - old_min) * new_range) / old_range) + new_min
new_value = max(min(scaled_value, new_max), new_min)
return int(new_value)

def set_light(light, data):
light_state = {}

if 'on' in data:
light_state['on'] = 1 if data['on'] else 0

if 'bri' in data and data['bri'] > 0:
light_state['brightness'] = round((data['bri'] / 255) * 100)

if 'ct' in data:
light_state['temperature'] = translate_range(data['ct'], 153, 500, 143, 344)

# Ingore unsupported values (xy,hue,sat)

if light_state:
json_data = json.dumps({"lights": [light_state]})
response = requests.put("http://"+light.protocol_cfg["ip"]+":9123/elgato/lights", data = json_data, headers={'Content-type': 'application/json'}, timeout=3)
return response.text

def get_light_state(light):
response = requests.get("http://"+light.protocol_cfg["ip"]+":9123/elgato/lights", timeout=3)
state = response.json()
light_info = state['lights'][0]
if light_info['on'] == 1:
light_state_on = True
else:
light_state_on = False

converted_state = {
'bri': round((light_info['brightness']/100)*255),
'on': light_state_on,
'ct': translate_range(light_info['temperature'], 143, 344, 153, 500),
'colormode': 'ct'
}
return converted_state

0 comments on commit cdc6ec3

Please sign in to comment.