Compare commits

..

1 Commits

Author SHA1 Message Date
600fe297a1 feature : mqtt 2022-01-22 10:11:59 +00:00
3 changed files with 89 additions and 2 deletions

View File

@@ -4,3 +4,8 @@ processes:
cmd: cat
- key: nano
cmd: nano
mqtt: # only add mqtt if mqtt required
brokerAddress: "192.168.1.251"
topic: "devices/$CLIENTID/audio/input"
brokerPort: 8883 #optional default value 1883
clientId: DeviceName # optional default value os.uname()

View File

@@ -1,10 +1,10 @@
from distutils.command.config import config
import os
import psutil
import logging
import yaml
from typing import List
from onlyone import namedpipes
from onlyone import mqtt
DEFAULT_CONFIG_PATH = "/etc/onlyone/config.yaml"
@@ -134,5 +134,15 @@ class Server:
if(configPath != None and configPath!=""):
configDict = config_file_to_dict(configPath)
self.manager.load(configDict, True)
self.npserver = namedpipes.Server(self.manager)
if "mqtt" in configDict:
try:
self.mqtt = mqtt.MqttClient(self.manager)
self.mqtt.load(configDict["mqtt"])
self.mqtt.start()
except:
log.error("error initializing mqtt")

72
onlyone/mqtt/__init__.py Normal file
View File

@@ -0,0 +1,72 @@
from aifc import Error
import paho.mqtt.client as pahomqtt
import os
import logging
import sys
log = logging.getLogger(__name__)
class MqttClient():
def __init__(self, manager):
self.mqttClient=None
self.connected=False
self.brokerAddress="";
self.brokerPort=1883
self.manager = manager
self.clientid = os.uname().nodename.lower().replace(" ", "_")
self.mqttTopic= "devices/$CLIENTID/onlyone"
def _mqtt_on_connect(self, client, userdata, flags, reasonCode, properties):
self.connected=True
options = pahomqtt.SubscribeOptions()
options.noLocal=True
if self.mqttTopic== None:
error = "missing required configuration mqttTopic"
log.error(error)
raise error
log.info("subscribing to :" + self.mqttTopic.replace("$CLIENTID", self.clientid) + "/set")
self.mqttClient.subscribe(self.mqttTopic.replace("$CLIENTID", self.clientid) + "/set" ,0, options)
def _mqtt_on_message(self, client, userdata, msg):
data = msg.payload.decode("utf-8")
log.info("mqtt server received message:" + data)
self.manager.current(data)
def load(self, dict):
if dict == None: return
if("brokerAddress" in dict): self.brokerAddress = dict["brokerAddress"]
if("brokerPort" in dict): self.brokerPort = dict["brokerPort"]
if("clientId" in dict): self.clientId = dict["clientId"]
if("topic" in dict): self.mqttTopic = dict["topic"]
log.info("load finished:")
log.info("brokerAddress:" + self.brokerAddress)
log.info("brokerPort:" + str(self.brokerPort))
log.info("clientid:" + self.clientid)
log.info("topic:" + self.mqttTopic)
def start(self):
if self.brokerAddress == None:
error = "missing required configuration brokerAddress"
log.error(error)
raise error
if self.brokerPort == None:
error = "missing required configuration brokerPort"
log.error(error)
raise error
self.mqttClient = pahomqtt.Client(client_id=self.clientid, protocol=pahomqtt.MQTTv5)
self.mqttClient.on_connect = self._mqtt_on_connect
self.mqttClient.on_message = self._mqtt_on_message
log.info("starting server -> address:" + self.brokerAddress + ", port:" + str(self.brokerPort))
try:
self.mqttClient.connect(self.brokerAddress, self.brokerPort)
self.mqttClient.loop_start()
except BaseException as ex:
log.error(str(ex))
raise "Error connecting to mqtt broker"