...
 
Commits (11)
......@@ -34,6 +34,7 @@
roles:
- nginx
- wireguard-concentrator
- mqtt-broker
- hosts:
- client
- concentrator1
......
......@@ -50,6 +50,15 @@
export ETCDCTL_CERT=/etc/ssl/etcd/etcd-client.cert.pem
export ETCDCTL_KEY=/etc/ssl/etcd/etcd-client.key.pem
export ETCDCTL_ENDPOINTS={{etcd_endpoints}}
- name: Make etcd v3 API the default (environment file)
copy:
dest: "/etc/profile.d/etcd.env"
content: |
ETCDCTL_API=3
ETCDCTL_CACERT=/etc/ssl/etcd/etcd-ca.pem
ETCDCTL_CERT=/etc/ssl/etcd/etcd-client.cert.pem
ETCDCTL_KEY=/etc/ssl/etcd/etcd-client.key.pem
ETCDCTL_ENDPOINTS={{etcd_endpoints}}
- name: Create etcd client configuration settings
copy:
dest: "/etc/etcd-client.json"
......
#!/usr/bin/env python3
import etcd
import random
import json
import time
import ssl
import subprocess
import sys
import signal
class EqmttClusterManager():
def whoAmI(self):
# find out which node this is
self.node = False
try:
self.node = subprocess.check_output(['/usr/sbin/emqttd_ctl','status']).decode().split('\'')[1]
except subprocess.CalledProcessError as e:
print("ERROR: emqtt_ctl returned with non zero: %s"% (e,))
sys.exit(-1)
except FileNotFoundError as e:
print("ERROR: emqtt_ctl is no present")
sys.exit(-1)
def parseOutput(self, output):
#output = "Cluster status: [{running_nodes,['emq@172.16.1.2','emq@172.16.1.1']},\n {stopped_nodes,['emq@172.16.1.3']}]"
output = output.decode().split(':')[1]
sBracketBal = 0
cBracketBal = 0
sBracketCount = 0
cBracketCount = 0
val = ''
tList = []
tdict = {}
running_nodes = []
stopped_nodes = []
state = None
success = False
code = ""
for c in output:
if c == '[':
sBracketBal += 1
sBracketCount += 1
elif c == '{':
cBracketBal += 1
cBracketCount += 1
elif c == ']':
sBracketBal -= 1
if state == 'running_nodes':
running_nodes.append(val.strip('\''))
val = ''
if state == 'stopped_nodes':
stopped_nodes.append(val.strip('\''))
val = ''
state = None
elif c == '}':
cBracketBal -= 1
state = None
elif c == ',':
if state == None:
if val == 'running_nodes':
state = 'running_nodes'
val = ''
success = True
if val == 'stopped_nodes':
state = 'stopped_nodes'
val = ''
success = True
if val == 'node_down':
code = val
val = ''
success = False
if val == 'already_in_cluster':
code = val
val = ''
success = True
if state == 'running_nodes':
running_nodes.append(val.strip('\''))
val = ''
if state == 'stopped_nodes':
stopped_nodes.append(val.strip('\''))
val = ''
else:
val += c
return (success, code, running_nodes, stopped_nodes)
def announceMe(self,state):
if self.etcdNodeKey == None:
self.etcNodeKey = self.client.write(self.nodePrefix+self.node, state ,self.nodeTTL)
else:
self.etcNodeKey.ttl = self.nodeTTL
self.etcNodeKey = self.client.update(self.etcNodeKey)
print("announced me to etcd in %s"% (self.etcNodeKey.key))
def getAnnouncedNodes(self):
result = self.client.read(self.nodePrefix, recursive = True)
nodes = []
for clusterNode in result.children:
print("%s: %s" %(clusterNode.key,clusterNode.value))
if clusterNode.value != self.node:
nodes.append(clusterNode.value)
return nodes
def joinCluster(self,node):
try:
output = subprocess.check_output(['/usr/sbin/emqttd_ctl','cluster','join',node])
success, code, self.running_nodes, self.stoped_nodes = self.parseOutput(output)
if not success:
print(code)
return success
except subprocess.CalledProcessError as e:
print("ERROR: emqtt_ctl returned with non zero: %s"% (e,))
return False
except FileNotFoundError as e:
print("ERROR: emqtt_ctl is no present")
sys.exit(-1)
def leaveCluster(self):
try:
output = subprocess.check_output(['/usr/sbin/emqttd_ctl','cluster','leave'])
success, code, self.running_nodes, self.stoped_nodes = self.parseOutput(output)
if not success:
print(code)
return success
except subprocess.CalledProcessError as e:
print("ERROR: emqtt_ctl returned with non zero: %s"% (e,))
return False
except FileNotFoundError as e:
print("ERROR: emqtt_ctl is no present")
sys.exit(-1)
def getCluster(self):
try:
output = subprocess.check_output(['/usr/sbin/emqttd_ctl','cluster','leave'])
success, code, self.running_nodes, self.stoped_nodes = self.parseOutput(output)
return success
except subprocess.CalledProcessError as e:
print("ERROR: emqtt_ctl returned with non zero: %s"% (e,))
return False
except FileNotFoundError as e:
print("ERROR: emqtt_ctl is no present")
sys.exit(-1)
def exit(self,signum, frame):
self.running = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit)
signal.signal(signal.SIGTERM, self.exit)
self.running = True
self.whoAmI()
self.etcdEndpoint = False
self.etcdConf = json.load(open('/etc/etcd-client.json'))
#figure out wether this machiene is an etcd endpoint and connect to it
for endpoint in self.etcdConf['ENDPOINTS'].split(','):
if self.node.split('@')[1] == endpoint.split(':')[1][2:]:
self.etcdEndpoint = endpoint
if not self.etcdEndpoint:
self.etcdEndpoint = random.choice(self.etcdConf['ENDPOINTS'].split(',')) #choose arandom etcd endpoint
self.etcdEndpoint = self.etcdEndpoint.split(':')
self.etcdPort = int(self.etcdEndpoint[2])
self.etcdProtocol = self.etcdEndpoint[0]
self.etcdEndpoint = self.etcdEndpoint[1][2:] # remove slashes
self.etcdCert = None
self.etcdCaCert = None
self.etcdKey = None
self.etcdNodeKey = None
if self.etcdProtocol == 'https':
self.etcdCert = self.etcdConf['CERT']
self.etcdKey = self.etcdConf['KEY']
self.etcdCaCert = self.etcdConf['CACERT']
self.client = etcd.Client(host=self.etcdEndpoint, port=self.etcdPort, read_timeout=60, protocol=self.etcdProtocol, ca_cert=self.etcdCaCert, cert=(self.etcdCert,self.etcdKey))
self.client.http.connection_pool_kw['ssl_version'] = ssl.PROTOCOL_TLSv1_2 #forcing TLS version 1.2
#put this node into the etcd
self.nodePrefix = '/emqttd/cluster/nodes/'
self.nodeTTL = 60
lastUpdate = time.time() - self.nodeTTL
while self.running:
if lastUpdate + self.nodeTTL/2 < time.time():
self.announceMe(self.node)
#join cluster with all announced nodes
for n in self.getAnnouncedNodes():
self.joinCluster(n)
lastUpdate = time.time()
time.sleep(1)
self.leaveCluster()
if __name__ == '__main__':
clusterM = EqmttClusterManager()
sys.exit(0)
[Unit]
Description=emqtt daemon
After=network.target
[Service]
Type=forking
Environment=HOME=/root
EnvironmentFile=/etc/profile.d/etcd.env
ExecStart=/bin/sh /usr/sbin/emqttd start
ExecStop=/bin/sh /usr/sbin/emqttd stop
LimitNOFILE=1048576
[Install]
WantedBy=multi-user.target
\ No newline at end of file
[Unit]
Description=emqttd cluster manager
After=emqttd.service
[Service]
Type=simple
Environment=HOME=/root
Restart=always
ExecStart=/usr/bin/emqtt_cluster_manager
[Install]
WantedBy=multi-user.target
\ No newline at end of file
---
- name: Restart emqttd
become: true
systemd: name=emqttd state=restarted
- name: Restart emqttd_cluster_manager
become: true
systemd: name=emqttd_cluster_manager state=restarted
- name: install emqttd
apt:
deb: http://emqtt.io/downloads/latest/debian9-deb
- name: install python3-etcd
apt:
name: python3-etcd
state: present
- name: copy emqttd cluster manager
copy:
src: emqtt_cluster_manager.py
dest: /usr/bin/emqtt_cluster_manager
mode: 0777
notify:
- Restart emqttd_cluster_manager
- name: configure emqttd
template:
src: emq.j2
dest: /etc/emqttd/emq.conf
owner: emqtt
group: emqtt
mode: 0644
- name: stop emqtt service
service:
name: emqttd
state: stopped
- name: disable emqtt service
service:
name: emqttd
enabled: no
#- name: remove emqttd from init
# file:
# path: /etc/init.d/emqttd
# state: absent
- name: copy emqtt service file
copy:
src: emqttd.service
dest: /etc/systemd/system/emqttd.service
- name: copy emqttd cluster manager service file
copy:
src: emqttd_cluster_manager.service
dest: /etc/systemd/system/emqttd_cluster_manager.service
notify:
- Restart emqttd_cluster_manager
- name: reload systemd service files
systemd:
name: emqttd
daemon_reload: yes
- name: reload systemd service files
systemd:
name: emqttd_cluster_manager
daemon_reload: yes
- name: configure emqttd cluster manager service (systemd)
systemd:
name: emqttd_cluster_manager
enabled: yes
notify:
- Restart emqttd
- name: configure emqtt service (systemd)
systemd:
name: emqttd
enabled: yes
notify:
- Restart emqttd
This diff is collapsed.
......@@ -2,3 +2,6 @@
- name: Activate exit interface
become: true
command: ifup {{ wg_exit_ifname }} -v
- name: Reconfigure firewall
shell: iptables-restore < /etc/iptables.rules
......@@ -15,3 +15,10 @@
dest: '/etc/network/interfaces.d/{{ wg_exit_ifname }}'
notify:
- Activate exit interface
- name: Configure SNAT
template:
src: 'iptables.j2'
dest: '/etc/iptables.rules'
notify:
- Reconfigure firewall
......@@ -7,10 +7,7 @@ iface {{ ifname }} inet static
up ip -6 r replace default via {{ wg_exit_gateway_v6 }} vrf freifunk proto freifunk
down ip -6 a del {{ wg_exit_address_v6 }} dev {{ ifname }}
down ip -6 r del default via {{ wg_exit_gateway_v6 }} vrf freifunk proto freifunk
pre-up iptables -t nat -F POSTROUTING # FIXME use a real firewall service
pre-up iptables -t nat -A POSTROUTING -o wg-azire -j SNAT --to {{ wg_exit_address_v4.split('/')[0] }}
pre-up ip link add {{ ifname }} type wireguard
pre-up wg setconf {{ ifname }} /etc/wireguard/{{ ifname }}.conf
pre-up ip link set {{ ifname }} vrf freifunk
post-down ip link del {{ ifname }}
post-down iptables -t nat -D POSTROUTING -o wg-azire -j SNAT --to {{ wg_exit_address_v4.split('/')[0] }}
*nat
:PREROUTING ACCEPT [0:0]
:INPUT ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
:POSTROUTING ACCEPT [0:0]
-A POSTROUTING -o wg-azire -j SNAT --to-source {{ wg_exit_address_v4.split('/')[0] }}
COMMIT
testinfra_hosts = ['concentrator1', 'concentrator2', 'concentrator3']
def test_installed(host):
assert host.package("emqttd").is_installed
assert host.service("emqttd").is_enabled
def test_health(host):
assert host.service("emqttd").is_running
assert host.socket("tcp://0.0.0.0:1883").is_listening
assert host.socket("tcp://127.0.0.1:8081").is_listening
assert not host.socket("tcp://0.0.0.0:8081").is_listening