Commit b009dd9e authored by chrissi^'s avatar chrissi^
Browse files

Initial Commit

Signed-off-by: chrissi^'s avatarChris Fiege <>
import network
import time
class Connector(object):
def __init__(self, ssid, passphrase, ap_mode=False):
self._ssid = ssid
self._passphrase = passphrase
self._ap_mode = ap_mode
def do_connect(self):
ap = network.WLAN(network.AP_IF) # create access-point interface
if self._ap_mode: # activate the interface
ap.config(essid='ID-IoT') # set the ESSID of the access point
wlan = network.WLAN(network.STA_IF)
if not wlan.isconnected():
print('connecting to network {} ...'.format(self._ssid))
wlan.connect(self._ssid, self._passphrase)
for _ in range(20):
if wlan.isconnected():
print("Connected =)")
print('network config:', wlan.ifconfig())
print("Wifi still is connected")
import dht
import mac
class DhtMeasure:
"""This class measures and uploads temperature and humidity using DHT22 sensors"""
def __init__(self, humidityId, temperatueId, machinePin, http_uploader):
"""Creates a new DhtMeasure
@param humidityId: The source name of the humidity measurement
@param temperatueId: The source name of the temperature measurement
@param machinePin: The machine.Pin() where the sensor is connectedo
@param http_uploader: The http-uploader to use
self._machinePin = machinePin
self._hId = humidityId
self._tId = temperatueId
self._d = dht.DHT22(machinePin)
self._http = http_uploader
def measure(self):
"""Measures temperature and humidity and uploads it..."""
t = self._d.temperature()
h = self._d.humidity()
self._http.http_graph_put(t, self._tId)
self._http.http_graph_put(h, self._hId)
return (t, h)
G0 = 0
TxD = 1
G2 = 2
RxD = 3
G4 = 4
G5 = 5
SD3 = 10
G12 = 12
G13 = 13
G14 = 14
G15 = 15
G16 = 16
G17 = 17
G18 = 18
G19 = 19
G21 = 21
SDA = 21
SCL = 22
G22 = 22
G23 = 23
G25 = 25
G26 = 26
G27 = 27
G32 = 32
G33 = 33
import socket
import mac
def http_get(url):
"""Query an URL and return the result.
@param url: http://-url to query
@returns: (Status, Received Return)
_, _, host, path = url.split('/', 3)
addr = socket.getaddrinfo(host, 80)[0][-1]
s = socket.socket()
print(('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, host), 'utf8'))
s.send(bytes('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, host), 'utf8'))
ret = ""
while True:
data = s.recv(100)
if data:
ret += str(data, "utf8")
if "HTTP/1.1 200 OK" in ret:
return (True,ret)
return (False, ret)
except Exception as e:
if s:
return (False, e)
class HttpPut(object):
def __init__(self, url_format, secret):
self._url_format = url_format
self._secret = secret
def http_graph_put(self, value, sensorName):
"""Upload a measurement to the sink.
@param value: Value (int or float) to upload
@param sensorName: Name of the sensor to upload to
st, data = http_get(self._url_format.format(sensorName, float(value), mac.get(float(value), self._secret)))
if st is False or True:
return st
import uhashlib
sha = uhashlib.sha256
except Exception:
import sha256
sha = sha256.sha256
def get(value, secret):
"""Calculate an hmac for a value and a secret.
The hmac is calculaed as an sha256.
@param value: Value to create the hmac from
@param secret: the secret used to calculate the hmac
s = "{}|{}".format(value, secret)
h = sha()
return ".".join([str(x) for x in h.digest()])
import esp32pinmap
import connector
import dht11
import scheduler
import http
import machine
s = scheduler.Scheduler()
# setup Wifi
c = connector.Connector("SSID", "Passphrase")
s.register(c.do_connect, timeout=1)
# create http-uploader
h = http.HttpPut("{}/{}/{}", "yoursecrethere")
# create measurements
m1 = dht11.DhtMeasure("description-hum", "description-temp", machine.Pin(esp32pinmap.G32), h)
s.register(m1.measure, 30)
# start scheduler
import time
class Scheduler:
"""Simple second-base scheduler
A scheduler provides two types of tasks:
* idle-tasks: Is called always when there is no other tasks to run. It is not guaranteed to run, if the main tasks consume too much time.
* timed tasks: run every n seconds.
def __init__(self):
"""Setup a new scheduler"""
self._callbacks = []
self._counter = []
self._idleCallbacks = []
def register(self, callback, timeout):
"""Register a new task.
@param callback: function to call.
@param timeout: timeout in seconds or None for an idle task
if timeout is not None:
self._callbacks.append((callback, timeout))
def run(self):
"""Blocks forever and runs the tasks"""
next = time.time() + 1
while True:
# Run idle tasks
cnt = 0
while next > time.time():
for callback in self._idleCallbacks:
cnt += 1
#prepare next run
next = time.time() + 1
print("Running at time.time() => {}. Had {} idle loops.".format(next-1, cnt))
# run timed tasks
for i in range(len(self._callbacks)):
self._counter[i] +=1
if self._counter[i] == self._callbacks[i][1]:
self._counter[i] = 0
print("Running task #{}".format(i))
except Exception as e:
print("Callback #{} {} raised exception:".format(i, self._callbacks[i][0]))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment