Compare commits
2 Commits
803cf31aa2
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 39b08dd332 | |||
| 600fe297a1 |
@@ -4,3 +4,8 @@ processes:
|
||||
cmd: cat
|
||||
- key: nano
|
||||
cmd: nano
|
||||
mqtt: # only add mqtt if mqtt required
|
||||
brokerAddress: "192.168.1.251"
|
||||
topic: "devices/$CLIENTID/audio/input"
|
||||
brokerPort: 8883 #optional default value 1883
|
||||
clientId: DeviceName # optional default value os.uname()
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
from distutils.command.config import config
|
||||
import os
|
||||
import psutil
|
||||
import logging
|
||||
import yaml
|
||||
from typing import List
|
||||
from onlyone import namedpipes
|
||||
from onlyone import mqtt
|
||||
|
||||
DEFAULT_CONFIG_PATH = "/etc/onlyone/config.yaml"
|
||||
|
||||
@@ -134,5 +134,15 @@ class Server:
|
||||
if(configPath != None and configPath!=""):
|
||||
configDict = config_file_to_dict(configPath)
|
||||
self.manager.load(configDict, True)
|
||||
|
||||
self.npserver = namedpipes.Server(self.manager)
|
||||
|
||||
if "mqtt" in configDict:
|
||||
try:
|
||||
self.mqtt = mqtt.MqttClient(self.manager)
|
||||
self.mqtt.load(configDict["mqtt"])
|
||||
self.mqtt.start()
|
||||
except:
|
||||
log.error("error initializing mqtt")
|
||||
|
||||
|
||||
72
onlyone/mqtt/__init__.py
Normal file
72
onlyone/mqtt/__init__.py
Normal file
@@ -0,0 +1,72 @@
|
||||
from aifc import Error
|
||||
import paho.mqtt.client as pahomqtt
|
||||
import os
|
||||
import logging
|
||||
import sys
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
class MqttClient():
|
||||
|
||||
def __init__(self, manager):
|
||||
self.mqttClient=None
|
||||
self.connected=False
|
||||
self.brokerAddress="";
|
||||
self.brokerPort=1883
|
||||
self.manager = manager
|
||||
self.clientid = os.uname().nodename.lower().replace(" ", "_")
|
||||
self.mqttTopic= "devices/$CLIENTID/onlyone"
|
||||
|
||||
def _mqtt_on_connect(self, client, userdata, flags, reasonCode, properties):
|
||||
self.connected=True
|
||||
options = pahomqtt.SubscribeOptions()
|
||||
options.noLocal=True
|
||||
if self.mqttTopic== None:
|
||||
error = "missing required configuration mqttTopic"
|
||||
log.error(error)
|
||||
raise error
|
||||
log.info("subscribing to :" + self.mqttTopic.replace("$CLIENTID", self.clientid) + "/set")
|
||||
self.mqttClient.subscribe(self.mqttTopic.replace("$CLIENTID", self.clientid) + "/set" ,0, options)
|
||||
|
||||
|
||||
def _mqtt_on_message(self, client, userdata, msg):
|
||||
data = msg.payload.decode("utf-8")
|
||||
log.info("mqtt server received message:" + data)
|
||||
self.manager.current(data)
|
||||
|
||||
|
||||
def load(self, dict):
|
||||
if dict == None: return
|
||||
if("brokerAddress" in dict): self.brokerAddress = dict["brokerAddress"]
|
||||
if("brokerPort" in dict): self.brokerPort = dict["brokerPort"]
|
||||
if("clientId" in dict): self.clientId = dict["clientId"]
|
||||
if("topic" in dict): self.mqttTopic = dict["topic"]
|
||||
log.info("load finished:")
|
||||
log.info("brokerAddress:" + self.brokerAddress)
|
||||
log.info("brokerPort:" + str(self.brokerPort))
|
||||
log.info("clientid:" + self.clientid)
|
||||
log.info("topic:" + self.mqttTopic)
|
||||
|
||||
def start(self):
|
||||
|
||||
if self.brokerAddress == None:
|
||||
error = "missing required configuration brokerAddress"
|
||||
log.error(error)
|
||||
raise error
|
||||
if self.brokerPort == None:
|
||||
error = "missing required configuration brokerPort"
|
||||
log.error(error)
|
||||
raise error
|
||||
|
||||
self.mqttClient = pahomqtt.Client(client_id=self.clientid, protocol=pahomqtt.MQTTv5)
|
||||
self.mqttClient.on_connect = self._mqtt_on_connect
|
||||
self.mqttClient.on_message = self._mqtt_on_message
|
||||
log.info("starting server -> address:" + self.brokerAddress + ", port:" + str(self.brokerPort))
|
||||
try:
|
||||
self.mqttClient.connect(self.brokerAddress, self.brokerPort)
|
||||
self.mqttClient.loop_start()
|
||||
except BaseException as ex:
|
||||
log.error(str(ex))
|
||||
raise "Error connecting to mqtt broker"
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user