From 600fe297a185a47b9e591acca1284e5f03994d78 Mon Sep 17 00:00:00 2001 From: Marcio Fernandes Date: Sat, 22 Jan 2022 10:11:59 +0000 Subject: [PATCH] feature : mqtt --- config.example.yaml | 7 +++- onlyone/__init__.py | 12 ++++++- onlyone/mqtt/__init__.py | 72 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 onlyone/mqtt/__init__.py diff --git a/config.example.yaml b/config.example.yaml index 8580e04..432f30c 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -3,4 +3,9 @@ processes: - key: cat cmd: cat - key: nano - cmd: nano + cmd: nano +mqtt: # only add mqtt if mqtt required + brokerAddress: "192.168.1.251" + topic: "devices/$CLIENTID/audio/input" + brokerPort: 8883 #optional default value 1883 + clientId: DeviceName # optional default value os.uname() diff --git a/onlyone/__init__.py b/onlyone/__init__.py index 9ad219b..284cb6f 100644 --- a/onlyone/__init__.py +++ b/onlyone/__init__.py @@ -1,10 +1,10 @@ -from distutils.command.config import config import os import psutil import logging import yaml from typing import List from onlyone import namedpipes +from onlyone import mqtt DEFAULT_CONFIG_PATH = "/etc/onlyone/config.yaml" @@ -134,5 +134,15 @@ class Server: if(configPath != None and configPath!=""): configDict = config_file_to_dict(configPath) self.manager.load(configDict, True) + self.npserver = namedpipes.Server(self.manager) + + if "mqtt" in configDict: + try: + self.mqtt = mqtt.MqttClient(self.manager) + self.mqtt.load(configDict["mqtt"]) + self.mqtt.start() + except: + log.error("error initializing mqtt") + \ No newline at end of file diff --git a/onlyone/mqtt/__init__.py b/onlyone/mqtt/__init__.py new file mode 100644 index 0000000..9b5b57f --- /dev/null +++ b/onlyone/mqtt/__init__.py @@ -0,0 +1,72 @@ +from aifc import Error +import paho.mqtt.client as pahomqtt +import os +import logging +import sys + +log = logging.getLogger(__name__) +class MqttClient(): + + def __init__(self, manager): + self.mqttClient=None + self.connected=False + self.brokerAddress=""; + self.brokerPort=1883 + self.manager = manager + self.clientid = os.uname().nodename.lower().replace(" ", "_") + self.mqttTopic= "devices/$CLIENTID/onlyone" + + def _mqtt_on_connect(self, client, userdata, flags, reasonCode, properties): + self.connected=True + options = pahomqtt.SubscribeOptions() + options.noLocal=True + if self.mqttTopic== None: + error = "missing required configuration mqttTopic" + log.error(error) + raise error + log.info("subscribing to :" + self.mqttTopic.replace("$CLIENTID", self.clientid) + "/set") + self.mqttClient.subscribe(self.mqttTopic.replace("$CLIENTID", self.clientid) + "/set" ,0, options) + + + def _mqtt_on_message(self, client, userdata, msg): + data = msg.payload.decode("utf-8") + log.info("mqtt server received message:" + data) + self.manager.current(data) + + + def load(self, dict): + if dict == None: return + if("brokerAddress" in dict): self.brokerAddress = dict["brokerAddress"] + if("brokerPort" in dict): self.brokerPort = dict["brokerPort"] + if("clientId" in dict): self.clientId = dict["clientId"] + if("topic" in dict): self.mqttTopic = dict["topic"] + log.info("load finished:") + log.info("brokerAddress:" + self.brokerAddress) + log.info("brokerPort:" + str(self.brokerPort)) + log.info("clientid:" + self.clientid) + log.info("topic:" + self.mqttTopic) + + def start(self): + + if self.brokerAddress == None: + error = "missing required configuration brokerAddress" + log.error(error) + raise error + if self.brokerPort == None: + error = "missing required configuration brokerPort" + log.error(error) + raise error + + self.mqttClient = pahomqtt.Client(client_id=self.clientid, protocol=pahomqtt.MQTTv5) + self.mqttClient.on_connect = self._mqtt_on_connect + self.mqttClient.on_message = self._mqtt_on_message + log.info("starting server -> address:" + self.brokerAddress + ", port:" + str(self.brokerPort)) + try: + self.mqttClient.connect(self.brokerAddress, self.brokerPort) + self.mqttClient.loop_start() + except BaseException as ex: + log.error(str(ex)) + raise "Error connecting to mqtt broker" + + + \ No newline at end of file -- 2.49.1