adjust polling during daytime, retries with backoff and authentication
This commit is contained in:
194
src/main.py
194
src/main.py
@@ -4,7 +4,10 @@ import socket
|
||||
import json
|
||||
import os
|
||||
import logging
|
||||
import datetime
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from zoneinfo import ZoneInfo
|
||||
import backoff
|
||||
from suntime import Sun
|
||||
import goodwe
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
@@ -16,106 +19,155 @@ MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
|
||||
MQTT_TOPIC = os.getenv("MQTT_TOPIC", "goodwe")
|
||||
MQTT_USER = os.getenv("MQTT_USER", "goodwe")
|
||||
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "goodwe")
|
||||
DISCOVERY_INTERVAL = int(os.getenv("DISCOVERY_INTERVAL", 60)) # seconds
|
||||
BROADCAST_SUBNET = os.getenv("BROADCAST_SUBNET", "192.168.1.255") # directed broadcast
|
||||
POLLING_INTERVAL = int(os.getenv("POLLING_INTERVAL", 20))
|
||||
DISCOVERY_INTERVAL = int(os.getenv("DISCOVERY_INTERVAL", 60))
|
||||
BROADCAST_SUBNET = os.getenv("BROADCAST_SUBNET", "192.168.1.255")
|
||||
|
||||
LATITUDE = float(os.getenv("LATITUDE", 52.3676))
|
||||
LONGITUDE = float(os.getenv("LONGITUDE", 4.9041))
|
||||
LOCAL_TZ = ZoneInfo(os.getenv("TIMEZONE", "Europe/Amsterdam"))
|
||||
|
||||
UDP_PORT = 48899
|
||||
UDP_MSG = b"WIFIKIT-214028-READ"
|
||||
|
||||
# -----------------------------
|
||||
# LOGGING SETUP
|
||||
# LOGGING HELPER
|
||||
# -----------------------------
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s"
|
||||
)
|
||||
logger = logging.getLogger("goodwe-daemon")
|
||||
class MqttLogger:
|
||||
"""Helper to log to both standard logger and MQTT subtopics."""
|
||||
def __init__(self, base_topic, mqtt_client):
|
||||
self.logger = logging.getLogger("goodwe-daemon")
|
||||
self.client = mqtt_client
|
||||
self.base_topic = f"{base_topic}/logs"
|
||||
|
||||
def _log(self, level, msg):
|
||||
# Standard Logging
|
||||
getattr(self.logger, level)(msg)
|
||||
# MQTT Publishing (if connected)
|
||||
if self.client.is_connected():
|
||||
self.client.publish(f"{self.base_topic}/{level}", msg)
|
||||
|
||||
def debug(self, msg): self._log("debug", msg)
|
||||
def info(self, msg): self._log("info", msg)
|
||||
def warning(self, msg): self._log("warning", msg)
|
||||
def error(self, msg): self._log("error", msg)
|
||||
|
||||
# Global Logger Instance (initialized after MQTT client)
|
||||
mqtt_log = None
|
||||
|
||||
# -----------------------------
|
||||
# MQTT CLIENT SETUP
|
||||
# LOGIC HELPERS
|
||||
# -----------------------------
|
||||
def is_operational_hours():
|
||||
sun = Sun(LATITUDE, LONGITUDE)
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
try:
|
||||
sr = sun.get_sunrise_time().replace(tzinfo=timezone.utc)
|
||||
ss = sun.get_sunset_time().replace(tzinfo=timezone.utc)
|
||||
|
||||
# Windows: 2 hours before sunrise until 2 hours after sunset
|
||||
start_window = sr - timedelta(hours=2)
|
||||
end_window = ss + timedelta(hours=2)
|
||||
|
||||
def on_connect(client, userdata, flags, reason_code, properties):
|
||||
if reason_code == "Success":
|
||||
logger.info("MQTT connected and authenticated")
|
||||
else:
|
||||
raise RuntimeError(f"MQTT connect/auth failed: {reason_code}")
|
||||
return start_window <= now_utc <= end_window
|
||||
except Exception as e:
|
||||
if mqtt_log: mqtt_log.error(f"Error calculating sun times: {e}")
|
||||
return True
|
||||
|
||||
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
||||
mqtt_client.username_pw_set(
|
||||
username=MQTT_USER,
|
||||
password=MQTT_PASSWORD,
|
||||
)
|
||||
mqtt_client.on_connect = on_connect
|
||||
mqtt_client.connect(MQTT_BROKER, MQTT_PORT)
|
||||
mqtt_client.loop_start()
|
||||
|
||||
# -----------------------------
|
||||
# HELPER FUNCTIONS
|
||||
# -----------------------------
|
||||
async def discover_inverters(timeout=3):
|
||||
"""Discover inverters using UDP broadcast (non-root)."""
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
sock.settimeout(timeout)
|
||||
inverters = []
|
||||
found_ips = []
|
||||
try:
|
||||
sock.sendto(UDP_MSG, (BROADCAST_SUBNET, UDP_PORT))
|
||||
while True:
|
||||
data, addr = sock.recvfrom(1024)
|
||||
inverters.append(addr[0])
|
||||
found_ips.append(addr[0])
|
||||
except socket.timeout:
|
||||
pass
|
||||
finally:
|
||||
sock.close()
|
||||
return list(set(inverters))
|
||||
|
||||
async def read_runtime(ip):
|
||||
"""Connect to a GoodWe inverter and read runtime data."""
|
||||
try:
|
||||
inverter = await goodwe.connect(ip)
|
||||
runtime = await inverter.read_runtime_data()
|
||||
print(runtime)
|
||||
return runtime
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to read {ip}: {e}")
|
||||
return None
|
||||
|
||||
def json_serializer(obj):
|
||||
if isinstance(obj, datetime.datetime):
|
||||
return obj.isoformat()
|
||||
raise TypeError(f"Type {type(obj)} not serializable")
|
||||
|
||||
async def publish_runtime(ip):
|
||||
runtime = await read_runtime(ip)
|
||||
if runtime:
|
||||
payload = {"ip": ip, "data": runtime}
|
||||
result = mqtt_client.publish(MQTT_TOPIC, json.dumps(payload, default=json_serializer))
|
||||
if result.rc != mqtt.MQTT_ERR_SUCCESS:
|
||||
raise logger.warning(f"Publish failed: {result.rc}")
|
||||
logger.info(f"Published runtime for {ip}")
|
||||
return list(set(found_ips))
|
||||
|
||||
# -----------------------------
|
||||
# MAIN LOOP
|
||||
# -----------------------------
|
||||
async def main_loop():
|
||||
while True:
|
||||
logger.info("Discovering inverters...")
|
||||
inverters = await discover_inverters()
|
||||
if not inverters:
|
||||
logger.warning("No inverters found")
|
||||
for ip in inverters:
|
||||
await publish_runtime(ip)
|
||||
await asyncio.sleep(DISCOVERY_INTERVAL)
|
||||
def on_connect(client, userdata, flags, reason_code, properties):
|
||||
if reason_code == "Success":
|
||||
logging.info("MQTT connected and authenticated")
|
||||
else:
|
||||
logging.error(f"MQTT connect/auth failed: {reason_code}")
|
||||
|
||||
async def main_loop():
|
||||
global mqtt_log
|
||||
|
||||
# Standard logging setup for initial boot
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
|
||||
|
||||
# MQTT Setup
|
||||
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
||||
client.username_pw_set(username=MQTT_USER, password=MQTT_PASSWORD)
|
||||
client.on_connect = on_connect
|
||||
client.connect(MQTT_BROKER, MQTT_PORT)
|
||||
client.loop_start()
|
||||
|
||||
# Initialize our helper
|
||||
mqtt_log = MqttLogger(MQTT_TOPIC, client)
|
||||
|
||||
active_inverter = None
|
||||
inverter_ip = None
|
||||
|
||||
# Redefine backoff here to use the mqtt_log
|
||||
@backoff.on_exception(backoff.expo, Exception, max_tries=5,
|
||||
on_backoff=lambda details: mqtt_log.warning(f"Retrying connection... {details['wait']:.1f}s"))
|
||||
async def connect_with_retry(ip):
|
||||
return await goodwe.connect(ip)
|
||||
|
||||
while True:
|
||||
if not is_operational_hours():
|
||||
mqtt_log.info("Outside operational hours (Night). Sleeping 10m.")
|
||||
active_inverter = None
|
||||
await asyncio.sleep(600)
|
||||
continue
|
||||
|
||||
if active_inverter is None:
|
||||
mqtt_log.info("Starting discovery...")
|
||||
ips = await discover_inverters()
|
||||
|
||||
if ips:
|
||||
inverter_ip = ips[0]
|
||||
try:
|
||||
active_inverter = await connect_with_retry(inverter_ip)
|
||||
mqtt_log.info(f"Connected to inverter at {inverter_ip}")
|
||||
except Exception as e:
|
||||
mqtt_log.error(f"Discovery failed to bind to {inverter_ip}: {e}")
|
||||
active_inverter = None
|
||||
else:
|
||||
mqtt_log.debug(f"No inverters found. Retrying in {DISCOVERY_INTERVAL}s")
|
||||
await asyncio.sleep(DISCOVERY_INTERVAL)
|
||||
continue
|
||||
|
||||
try:
|
||||
runtime_data = await active_inverter.read_runtime_data()
|
||||
payload = {
|
||||
"timestamp": datetime.now(LOCAL_TZ).isoformat(),
|
||||
"inverter_ip": inverter_ip,
|
||||
"sensors": {k: v for k, v in runtime_data.items()}
|
||||
}
|
||||
|
||||
client.publish(MQTT_TOPIC, json.dumps(payload, default=str))
|
||||
mqtt_log.debug(f"Data published for {inverter_ip}")
|
||||
|
||||
await asyncio.sleep(POLLING_INTERVAL)
|
||||
|
||||
except Exception as e:
|
||||
mqtt_log.warning(f"Inverter {inverter_ip} lost connection: {e}")
|
||||
active_inverter = None
|
||||
await asyncio.sleep(5)
|
||||
|
||||
# -----------------------------
|
||||
# ENTRY POINT
|
||||
# -----------------------------
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(main_loop())
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Daemon stopped")
|
||||
mqtt_client.loop_stop()
|
||||
mqtt_client.disconnect()
|
||||
if mqtt_log: mqtt_log.info("Daemon stopping via KeyboardInterrupt")
|
||||
|
||||
Reference in New Issue
Block a user