Commit eef056ec authored by chrissi^'s avatar chrissi^
Browse files

rgbControl: Move out of main.py


Signed-off-by: chrissi^'s avatarChris Fiege <chris@tinyhost.de>
parent dddbb441
......@@ -7,6 +7,7 @@ import time
import micropython
import config
import json
from rgb import RgbChannel
class Uptime:
......@@ -187,106 +188,6 @@ class OnBoardLed:
def pub(self):
self._mqtt_client.publish(self._topic_get, str(self._state))
class RgbChannel:
def __init__(self, mqtt_client, mqtt_topic, Pin_R, Pin_G, Pin_B):
self._rgb =(0,0,0)
self._on = False
self._freq = 1000
self._map = {
0: 0,
1: 1,
2: 2,
}
self._mapfile = mqtt_topic.replace("/","_")
try:
with open(self._mapfile) as fh:
self._map = json.load(fh)
print("{}: loaded mapfile: {}".format(mqtt_topic, self._map))
except:
print("{}: using default map {}".format(mqtt_topic, self._map))
pass
self._pwms = [
machine.PWM(machine.Pin(Pin_R)),
machine.PWM(machine.Pin(Pin_G)),
machine.PWM(machine.Pin(Pin_B)),
]
self._apply()
self._topic_on_get = mqtt_topic+"/on/get"
self._topic_on_set = mqtt_topic+"/on/set"
self._topic_rgb_get = mqtt_topic+"/rgb/get"
self._topic_rgb_set = mqtt_topic+"/rgb/set"
self._topic_freq_get = mqtt_topic+"/freq/get"
self._topic_freq_set = mqtt_topic+"/freq/set"
self._topic_map_get = mqtt_topic+"/map/get"
self._topic_map_set = mqtt_topic+"/map/set"
self._mqtt_client = mqtt_client
self._mqtt_client.add_cb(self._cb)
self._mqtt_client.subscribe(self._topic_on_set)
self._mqtt_client.subscribe(self._topic_rgb_set)
self._mqtt_client.subscribe(self._topic_freq_set)
self._mqtt_client.subscribe(self._topic_map_set)
def _apply(self):
if self._on:
rgb = [x*4 for x in self._rgb]
else:
rgb = (0,0,0)
self._pwms[0].freq(self._freq)
for inp, outp in self._map.items():
self._pwms[outp].duty(rgb[inp])
def _cb(self, topic, msg):
if topic == self._topic_on_set:
self._on = "True" if msg == "True" else "False"
self._mqtt_client.publish(self._topic_on_get, str(self._on))
self._apply()
return True
elif topic == self._topic_rgb_set:
try:
self._rgb = [int(x) for x in msg.split(",")]
self._mqtt_client.publish(self._topic_rgb_get, str(self._rgb))
self._apply()
return True
except Exception as e:
print("RgbChannel: Failed to parse value '{}' for topic '{}' with: {}".format(msg, topic, e))
elif topic == self._topic_freq_set:
try:
self._freq = int(msg)
self._mqtt_client.publish(self._topic_freq_get, str(self._freq))
self._apply()
return True
except Exception as e:
print("RgbChannel: Failed to parse value '{}' for topic '{}' with: {}".format(msg, topic, e))
elif topic == self._topic_map_set:
try:
newmap = json.loads(msg)
for k,v in newmap.items():
if not (k >= 0 and k <= 2 and v >= 0 and v <= 2):
print("New Map is no allowed. Will not apply map!")
break
else:
with open(self._mapfile, "w") as fh:
json.dump(newmap, fh)
self._map = newmap
self._apply()
self._mqtt_client.publish(self._topic_map_get, json.dumps(self._map))
return True
except Exception as e:
print("RgbChannel: Failed to process new map: {}".format(e))
self._mqtt_client.publish(self._topic_map_get, json.dumps(self._map))
return False
def pub(self):
self._mqtt_client.publish(self._topic_rgb_get, str(self._rgb))
self._mqtt_client.publish(self._topic_on_get, str(self._on))
self._mqtt_client.publish(self._topic_freq_get, str(self._freq))
self._mqtt_client.publish(self._topic_map_get, json.dumps(self._map))
s = scheduler.Scheduler()
......
import machine
import json
class RgbChannel:
def __init__(self, mqtt_client, mqtt_topic, Pin_R, Pin_G, Pin_B):
self._rgb =(0,0,0)
self._on = False
self._freq = 1000
self._map = {
0: 0,
1: 1,
2: 2,
}
self._mapfile = mqtt_topic.replace("/","_")
try:
with open(self._mapfile) as fh:
self._map = json.load(fh)
print("{}: loaded mapfile: {}".format(mqtt_topic, self._map))
except:
print("{}: using default map {}".format(mqtt_topic, self._map))
pass
self._pwms = [
machine.PWM(machine.Pin(Pin_R)),
machine.PWM(machine.Pin(Pin_G)),
machine.PWM(machine.Pin(Pin_B)),
]
self._apply()
self._topic_on_get = mqtt_topic+"/on/get"
self._topic_on_set = mqtt_topic+"/on/set"
self._topic_rgb_get = mqtt_topic+"/rgb/get"
self._topic_rgb_set = mqtt_topic+"/rgb/set"
self._topic_freq_get = mqtt_topic+"/freq/get"
self._topic_freq_set = mqtt_topic+"/freq/set"
self._topic_map_get = mqtt_topic+"/map/get"
self._topic_map_set = mqtt_topic+"/map/set"
self._mqtt_client = mqtt_client
self._mqtt_client.add_cb(self._cb)
self._mqtt_client.subscribe(self._topic_on_set)
self._mqtt_client.subscribe(self._topic_rgb_set)
self._mqtt_client.subscribe(self._topic_freq_set)
self._mqtt_client.subscribe(self._topic_map_set)
def _apply(self):
if self._on:
rgb = [x*4 for x in self._rgb]
else:
rgb = (0,0,0)
self._pwms[0].freq(self._freq)
for inp, outp in self._map.items():
self._pwms[outp].duty(rgb[inp])
def _cb(self, topic, msg):
if topic == self._topic_on_set:
self._on = "True" if msg == "True" else "False"
self._mqtt_client.publish(self._topic_on_get, str(self._on))
self._apply()
return True
elif topic == self._topic_rgb_set:
try:
self._rgb = [int(x) for x in msg.split(",")]
self._mqtt_client.publish(self._topic_rgb_get, str(",".join([str(x) for x in self._rgb])))
self._apply()
return True
except Exception as e:
print("RgbChannel: Failed to parse value '{}' for topic '{}' with: {}".format(msg, topic, e))
elif topic == self._topic_freq_set:
try:
self._freq = int(msg)
self._mqtt_client.publish(self._topic_freq_get, str(self._freq))
self._apply()
return True
except Exception as e:
print("RgbChannel: Failed to parse value '{}' for topic '{}' with: {}".format(msg, topic, e))
elif topic == self._topic_map_set:
try:
newmap = json.loads(msg)
for k,v in newmap.items():
if not (k >= 0 and k <= 2 and v >= 0 and v <= 2):
print("New Map is no allowed. Will not apply map!")
break
else:
with open(self._mapfile, "w") as fh:
json.dump(newmap, fh)
self._map = newmap
self._apply()
self._mqtt_client.publish(self._topic_map_get, json.dumps(self._map))
return True
except Exception as e:
print("RgbChannel: Failed to process new map: {}".format(e))
self._mqtt_client.publish(self._topic_map_get, json.dumps(self._map))
return False
def pub(self):
self._mqtt_client.publish(self._topic_rgb_get, str(",".join([str(x) for x in self._rgb])))
self._mqtt_client.publish(self._topic_on_get, str(self._on))
self._mqtt_client.publish(self._topic_freq_get, str(self._freq))
self._mqtt_client.publish(self._topic_map_get, json.dumps(self._map))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment