Compare commits

...

1 Commits

Author SHA1 Message Date
600fe297a1 feature : mqtt 2022-01-22 10:11:59 +00:00
3 changed files with 89 additions and 2 deletions

View File

@@ -4,3 +4,8 @@ processes:
cmd: cat cmd: cat
- key: nano - key: nano
cmd: nano cmd: nano
mqtt: # only add mqtt if mqtt required
brokerAddress: "192.168.1.251"
topic: "devices/$CLIENTID/audio/input"
brokerPort: 8883 #optional default value 1883
clientId: DeviceName # optional default value os.uname()

View File

@@ -1,10 +1,10 @@
from distutils.command.config import config
import os import os
import psutil import psutil
import logging import logging
import yaml import yaml
from typing import List from typing import List
from onlyone import namedpipes from onlyone import namedpipes
from onlyone import mqtt
DEFAULT_CONFIG_PATH = "/etc/onlyone/config.yaml" DEFAULT_CONFIG_PATH = "/etc/onlyone/config.yaml"
@@ -134,5 +134,15 @@ class Server:
if(configPath != None and configPath!=""): if(configPath != None and configPath!=""):
configDict = config_file_to_dict(configPath) configDict = config_file_to_dict(configPath)
self.manager.load(configDict, True) self.manager.load(configDict, True)
self.npserver = namedpipes.Server(self.manager) self.npserver = namedpipes.Server(self.manager)
if "mqtt" in configDict:
try:
self.mqtt = mqtt.MqttClient(self.manager)
self.mqtt.load(configDict["mqtt"])
self.mqtt.start()
except:
log.error("error initializing mqtt")

72
onlyone/mqtt/__init__.py Normal file
View File

@@ -0,0 +1,72 @@
from aifc import Error
import paho.mqtt.client as pahomqtt
import os
import logging
import sys
log = logging.getLogger(__name__)
class MqttClient():
def __init__(self, manager):
self.mqttClient=None
self.connected=False
self.brokerAddress="";
self.brokerPort=1883
self.manager = manager
self.clientid = os.uname().nodename.lower().replace(" ", "_")
self.mqttTopic= "devices/$CLIENTID/onlyone"
def _mqtt_on_connect(self, client, userdata, flags, reasonCode, properties):
self.connected=True
options = pahomqtt.SubscribeOptions()
options.noLocal=True
if self.mqttTopic== None:
error = "missing required configuration mqttTopic"
log.error(error)
raise error
log.info("subscribing to :" + self.mqttTopic.replace("$CLIENTID", self.clientid) + "/set")
self.mqttClient.subscribe(self.mqttTopic.replace("$CLIENTID", self.clientid) + "/set" ,0, options)
def _mqtt_on_message(self, client, userdata, msg):
data = msg.payload.decode("utf-8")
log.info("mqtt server received message:" + data)
self.manager.current(data)
def load(self, dict):
if dict == None: return
if("brokerAddress" in dict): self.brokerAddress = dict["brokerAddress"]
if("brokerPort" in dict): self.brokerPort = dict["brokerPort"]
if("clientId" in dict): self.clientId = dict["clientId"]
if("topic" in dict): self.mqttTopic = dict["topic"]
log.info("load finished:")
log.info("brokerAddress:" + self.brokerAddress)
log.info("brokerPort:" + str(self.brokerPort))
log.info("clientid:" + self.clientid)
log.info("topic:" + self.mqttTopic)
def start(self):
if self.brokerAddress == None:
error = "missing required configuration brokerAddress"
log.error(error)
raise error
if self.brokerPort == None:
error = "missing required configuration brokerPort"
log.error(error)
raise error
self.mqttClient = pahomqtt.Client(client_id=self.clientid, protocol=pahomqtt.MQTTv5)
self.mqttClient.on_connect = self._mqtt_on_connect
self.mqttClient.on_message = self._mqtt_on_message
log.info("starting server -> address:" + self.brokerAddress + ", port:" + str(self.brokerPort))
try:
self.mqttClient.connect(self.brokerAddress, self.brokerPort)
self.mqttClient.loop_start()
except BaseException as ex:
log.error(str(ex))
raise "Error connecting to mqtt broker"