upgrade homeassistant

This commit is contained in:
2026-02-10 20:02:37 +01:00
parent 44d0750dd7
commit 2dacbb9a3e
9 changed files with 1130 additions and 977 deletions

View File

@@ -54,11 +54,10 @@ google_assistant:
- BRIGHT_LIGHTS
- ENTRY_LIGHTS
# auth_header:
# allow_bypass_login: true
# # username_header: X-Forwarded-Preferred-Username
# username_header: X-Homeassistant-User
# debug: true
auth_header:
allow_bypass_login: true
username_header: X-Homeassistant-User
debug: true
openid:
client_id: !secret openid_client_id

View File

@@ -2,128 +2,83 @@
from __future__ import annotations
import asyncio
from datetime import timedelta
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.helpers.entity_registry import async_migrate_entries
from .const import (
CONF_SCAN_INTERVAL,
CONF_STATION_ID,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
GOODWE_SPELLING,
PLATFORMS,
)
from .sems_api import SemsApi
_LOGGER: logging.Logger = logging.getLogger(__package__)
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
@dataclass(slots=True)
class SemsRuntimeData:
"""Runtime data stored on the config entry."""
api: SemsApi
coordinator: SemsDataUpdateCoordinator
type SemsConfigEntry = ConfigEntry[SemsRuntimeData]
@dataclass(slots=True)
class SemsData:
"""Runtime SEMS data returned by the coordinator."""
inverters: dict[str, dict[str, Any]]
homekit: dict[str, Any] | None = None
currency: str | None = None
async def async_setup(hass: HomeAssistant, config: dict):
"""Set up the sems component."""
# Ensure our name space for storing objects is a known type. A dict is
# common/preferred as it allows a separate instance of your class for each
# instance that has been created in the UI.
hass.data.setdefault(DOMAIN, {})
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: SemsConfigEntry) -> bool:
"""Set up sems from a config entry."""
semsApi = SemsApi(hass, entry.data[CONF_USERNAME], entry.data[CONF_PASSWORD])
coordinator = SemsDataUpdateCoordinator(hass, semsApi, entry)
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id] = coordinator
sems_api = SemsApi(hass, entry.data[CONF_USERNAME], entry.data[CONF_PASSWORD])
coordinator = SemsDataUpdateCoordinator(hass, sems_api, entry)
entry.runtime_data = SemsRuntimeData(api=sems_api, coordinator=coordinator)
await coordinator.async_config_entry_first_refresh()
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
# async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
# """Set up this integration using UI."""
# if hass.data.get(DOMAIN) is None:
# hass.data.setdefault(DOMAIN, {})
# _LOGGER.info(STARTUP_MESSAGE)
# username = entry.data.get(CONF_USERNAME)
# password = entry.data.get(CONF_PASSWORD)
# conn = Connection(username, password)
# client = MyenergiClient(conn)
# coordinator = MyenergiDataUpdateCoordinator(hass, client=client, entry=entry)
# await coordinator.async_config_entry_first_refresh()
# hass.data[DOMAIN][entry.entry_id] = coordinator
# for platform in PLATFORMS:
# if entry.options.get(platform, True):
# coordinator.platforms.append(platform)
# hass.async_add_job(
# hass.config_entries.async_forward_entry_setup(entry, platform)
# )
# entry.add_update_listener(async_reload_entry)
# return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: SemsConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = all(
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(entry, platform)
for platform in PLATFORMS
]
)
)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
# async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
# """Handle removal of an entry."""
# coordinator = hass.data[DOMAIN][entry.entry_id]
# unloaded = all(
# await asyncio.gather(
# *[
# hass.config_entries.async_forward_entry_unload(entry, platform)
# for platform in PLATFORMS
# if platform in coordinator.platforms
# ]
# )
# )
# if unloaded:
# hass.data[DOMAIN].pop(entry.entry_id)
# return unloaded
# async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
# """Reload config entry."""
# await async_unload_entry(hass, entry)
# await async_setup_entry(hass, entry)
class SemsDataUpdateCoordinator(DataUpdateCoordinator):
class SemsDataUpdateCoordinator(DataUpdateCoordinator[SemsData]):
"""Class to manage fetching data from the API."""
def __init__(self, hass: HomeAssistant, semsApi: SemsApi, entry) -> None:
def __init__(
self, hass: HomeAssistant, sems_api: SemsApi, entry: ConfigEntry
) -> None:
"""Initialize."""
self.semsApi = semsApi
self.platforms = []
self.stationId = entry.data[CONF_STATION_ID]
self.hass = hass
self.sems_api = sems_api
self.station_id = entry.data[CONF_STATION_ID]
update_interval = timedelta(
seconds=entry.data.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
@@ -136,70 +91,82 @@ class SemsDataUpdateCoordinator(DataUpdateCoordinator):
update_interval=update_interval,
)
async def _async_update_data(self):
async def _async_update_data(self) -> SemsData:
"""Fetch data from API endpoint.
This is the place to pre-process the data to lookup tables
so entities can quickly look up their data.
"""
try:
# Note: asyncio.TimeoutError and aiohttp.ClientError are already
# handled by the data update coordinator.
# async with async_timeout.timeout(10):
try:
result = await self.hass.async_add_executor_job(
self.semsApi.getData, self.stationId
self.sems_api.getData, self.station_id
)
except Exception as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
else:
_LOGGER.debug("semsApi.getData result: %s", result)
# _LOGGER.warning("SEMS - Try get getPowerStationIds")
# powerStationIds = await self.hass.async_add_executor_job(
# self.semsApi.getPowerStationIds
# )
# _LOGGER.warning(
# "SEMS - getPowerStationIds: Found power station IDs: %s",
# powerStationIds,
# )
inverters = result["inverter"]
# found = []
# _LOGGER.debug("Found inverters: %s", inverters)
data = {}
if inverters is None:
# something went wrong, probably token could not be fetched
inverters = result.get("inverter")
inverters_by_sn: dict[str, dict[str, Any]] = {}
if not inverters or not isinstance(inverters, list):
raise UpdateFailed(
"Error communicating with API, probably token could not be fetched, see debug logs"
"Error communicating with API: invalid or missing inverter data. See debug logs."
)
# Get Inverter Data
for inverter in inverters:
name = inverter["invert_full"]["name"]
# powerstation_id = inverter["invert_full"]["powerstation_id"]
sn = inverter["invert_full"]["sn"]
inverter_full = inverter.get("invert_full")
if not isinstance(inverter_full, dict):
continue
name = inverter_full.get("name")
sn = inverter_full.get("sn")
if not isinstance(sn, str):
continue
_LOGGER.debug("Found inverter attribute %s %s", name, sn)
data[sn] = inverter["invert_full"]
inverters_by_sn[sn] = inverter_full
hasPowerflow = result["hasPowerflow"]
hasEnergeStatisticsCharts = result["hasEnergeStatisticsCharts"]
# Add currency
kpi = result.get("kpi")
if not isinstance(kpi, dict):
kpi = {}
currency = kpi.get("currency")
if hasPowerflow:
has_powerflow = bool(result.get("hasPowerflow"))
has_energy_statistics_charts = bool(
result.get(GOODWE_SPELLING.hasEnergyStatisticsCharts)
)
homekit: dict[str, Any] | None = None
if has_powerflow:
_LOGGER.debug("Found powerflow data")
if hasEnergeStatisticsCharts:
StatisticsCharts = {
f"Charts_{key}": val
for key, val in result["energeStatisticsCharts"].items()
}
StatisticsTotals = {
f"Totals_{key}": val
for key, val in result["energeStatisticsTotals"].items()
}
powerflow = {
**result["powerflow"],
**StatisticsCharts,
**StatisticsTotals,
}
else:
powerflow = result["powerflow"]
powerflow = result.get("powerflow")
if not isinstance(powerflow, dict):
powerflow = {}
powerflow["sn"] = result["homKit"]["sn"]
if has_energy_statistics_charts:
charts = result.get(GOODWE_SPELLING.energyStatisticsCharts)
if not isinstance(charts, dict):
charts = {}
totals = result.get(GOODWE_SPELLING.energyStatisticsTotals)
if not isinstance(totals, dict):
totals = {}
powerflow = {
**powerflow,
**{f"Charts_{key}": val for key, val in charts.items()},
**{f"Totals_{key}": val for key, val in totals.items()},
}
homekit_data = result.get(GOODWE_SPELLING.homeKit)
if not isinstance(homekit_data, dict):
homekit_data = {}
powerflow["sn"] = homekit_data.get("sn")
# Goodwe 'Power Meter' (not HomeKit) doesn't have a sn
# Let's put something in, otherwise we can't see the data.
@@ -208,72 +175,16 @@ class SemsDataUpdateCoordinator(DataUpdateCoordinator):
# _LOGGER.debug("homeKit sn: %s", result["homKit"]["sn"])
# This seems more accurate than the Chart_sum
powerflow["all_time_generation"] = result["kpi"]["total_power"]
powerflow["all_time_generation"] = kpi.get("total_power")
data["homeKit"] = powerflow
homekit = powerflow
data = SemsData(
inverters=inverters_by_sn, homekit=homekit, currency=currency
)
_LOGGER.debug("Resulting data: %s", data)
return data
# except ApiError as err:
except Exception as err:
# logging.exception("Something awful happened!")
raise UpdateFailed(f"Error communicating with API: {err}") from err
# # migrate to _power ids for inverter entry
# async def async_migrate_entry(hass, config_entry):
# """Migrate old entry."""
# _LOGGER.debug(
# "Migrating configuration from version %s.%s",
# config_entry.version,
# config_entry.minor_version,
# )
# if config_entry.version < 7:
# # get existing entities for device
# semsApi = SemsApi(
# hass, config_entry.data[CONF_USERNAME], config_entry.data[CONF_PASSWORD]
# )
# coordinator = SemsDataUpdateCoordinator(hass, semsApi, config_entry)
# await coordinator.async_config_entry_first_refresh()
# _LOGGER.debug(f"found inverter {coordinator.data}")
# for idx, ent in enumerate(coordinator.data):
# _LOGGER.debug("Found inverter: %s", ent)
# old_unique_id = f"{ent}"
# new_unique_id = f"{old_unique_id}-XXX"
# _LOGGER.debug(
# "Old unique id: %s; new unique id: %s", old_unique_id, new_unique_id
# )
# @callback
# def update_unique_id(entity_entry):
# """Update unique ID of entity entry."""
# return {
# "new_unique_id": entity_entry.unique_id.replace(
# old_unique_id, new_unique_id
# )
# }
# if old_unique_id != new_unique_id:
# await async_migrate_entries(
# hass, config_entry.entry_id, update_unique_id
# )
# hass.config_entries.async_update_entry(
# config_entry, unique_id=new_unique_id
# )
# # version = 7
# _LOGGER.info(
# "Migrated unique id from %s to %s", old_unique_id, new_unique_id
# )
# _LOGGER.info(
# "Migration from version %s.%s successful",
# config_entry.version,
# config_entry.minor_version,
# )
# return True
# Type alias to make type inference working for pylance
type SemsCoordinator = SemsDataUpdateCoordinator

View File

@@ -6,17 +6,13 @@ import logging
from typing import Any
from homeassistant import config_entries
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from .const import DOMAIN, SEMS_CONFIG_SCHEMA, CONF_STATION_ID
from .const import CONF_STATION_ID, DOMAIN, SEMS_CONFIG_SCHEMA
from .sems_api import SemsApi
from homeassistant.const import (
CONF_PASSWORD,
CONF_USERNAME,
)
_LOGGER = logging.getLogger(__name__)
@@ -73,7 +69,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> dict[str, Any]:
) -> config_entries.ConfigFlowResult:
"""Handle the initial step."""
if user_input is None:
return self.async_show_form(step_id="user", data_schema=SEMS_CONFIG_SCHEMA)

View File

@@ -1,7 +1,6 @@
"""Constants for the SEMS integration."""
import voluptuous as vol
from homeassistant.const import CONF_PASSWORD, CONF_SCAN_INTERVAL, CONF_USERNAME
DOMAIN = "sems"
@@ -23,3 +22,24 @@ SEMS_CONFIG_SCHEMA = vol.Schema(
): int, # , default=DEFAULT_SCAN_INTERVAL
}
)
AC_EMPTY = 6553.5
AC_CURRENT_EMPTY = 6553.5
AC_FEQ_EMPTY = 655.35
STATUS_LABELS = {-1: "Offline", 0: "Waiting", 1: "Normal", 2: "Fault"}
class GOODWE_SPELLING:
"""Constants for correcting GoodWe API spelling errors."""
battery = "bettery"
batteryStatus = "betteryStatus"
homeKit = "homKit"
temperature = "tempperature"
hasEnergyStatisticsCharts = "hasEnergeStatisticsCharts"
energyStatisticsCharts = "energeStatisticsCharts"
energyStatisticsTotals = "energeStatisticsTotals"
thisMonthTotalE = "thismonthetotle"
lastMonthTotalE = "lastmonthetotle"

View File

@@ -9,6 +9,7 @@
"documentation": "https://github.com/TimSoethout/goodwe-sems-home-assistant",
"iot_class": "cloud_polling",
"issue_tracker": "https://github.com/TimSoethout/goodwe-sems-home-assistant/issues",
"loggers": ["custom_components.sems"],
"requirements": [],
"version": "7.4.1"
"version": "8.2.0"
}

View File

@@ -1,9 +1,12 @@
from __future__ import annotations
import json
import logging
from typing import Any
import requests
from homeassistant import exceptions
from homeassistant.core import HomeAssistant
_LOGGER = logging.getLogger(__name__)
@@ -26,130 +29,206 @@ _DefaultHeaders = {
class SemsApi:
"""Interface to the SEMS API."""
def __init__(self, hass, username, password):
def __init__(self, hass: HomeAssistant, username: str, password: str) -> None:
"""Init dummy hub."""
self._hass = hass
self._username = username
self._password = password
self._token = None
self._token: dict[str, Any] | None = None
def test_authentication(self) -> bool:
"""Test if we can authenticate with the host."""
try:
self._token = self.getLoginToken(self._username, self._password)
return self._token is not None
except Exception as exception:
_LOGGER.exception("SEMS Authentication exception: %s", exception)
return False
else:
return self._token is not None
def getLoginToken(self, userName, password):
def _make_http_request(
self,
url: str,
headers: dict[str, str],
data: str | None = None,
json_data: dict[str, Any] | None = None,
operation_name: str = "HTTP request",
validate_code: bool = True,
) -> dict[str, Any] | None:
"""Make a generic HTTP request with error handling and optional code validation."""
try:
_LOGGER.debug("SEMS - Making %s to %s", operation_name, url)
response = requests.post(
url,
headers=headers,
data=data,
json=json_data,
timeout=_RequestTimeout,
)
_LOGGER.debug("%s Response: %s", operation_name, response)
# _LOGGER.debug("%s Response text: %s", operation_name, response.text)
response.raise_for_status()
jsonResponse: dict[str, Any] = response.json()
# Validate response code if requested
if validate_code:
if jsonResponse.get("code") not in (0, "0"):
_LOGGER.error(
"%s failed with code: %s, message: %s",
operation_name,
jsonResponse.get("code"),
jsonResponse.get("msg", "Unknown error"),
)
return None
if jsonResponse.get("data") is None:
_LOGGER.error("%s response missing data field", operation_name)
return None
return jsonResponse
except (requests.RequestException, ValueError, KeyError) as exception:
_LOGGER.error("Unable to complete %s: %s", operation_name, exception)
raise
def getLoginToken(self, userName: str, password: str) -> dict[str, Any] | None:
"""Get the login token for the SEMS API."""
try:
# Get our Authentication Token from SEMS Portal API
_LOGGER.debug("SEMS - Getting API token")
# Prepare Login Data to retrieve Authentication Token
# Dict won't work here somehow, so this magic string creation must do.
login_data = '{"account":"' + userName + '","pwd":"' + password + '"}'
# login_data = {"account": userName, "pwd": password}
# Make POST request to retrieve Authentication Token from SEMS API
login_response = requests.post(
jsonResponse = self._make_http_request(
_LoginURL,
headers=_DefaultHeaders,
_DefaultHeaders,
data=login_data,
timeout=_RequestTimeout,
operation_name="login API call",
validate_code=True,
)
_LOGGER.debug("Login Response: %s", login_response)
# _LOGGER.debug("Login Response text: %s", login_response.text)
login_response.raise_for_status()
if jsonResponse is None:
return None
# Process response as JSON
jsonResponse = login_response.json() # json.loads(login_response.text)
# _LOGGER.debug("Login JSON response %s", jsonResponse)
# Get all the details from our response, needed to make the next POST request (the one that really fetches the data)
# Also store the api url send with the authentication request for later use
tokenDict = jsonResponse["data"]
if not isinstance(tokenDict, dict):
_LOGGER.error("Login response data was not a dict")
return None
tokenDict["api"] = jsonResponse["api"]
_LOGGER.debug("SEMS - API Token received: %s", tokenDict)
return tokenDict
except Exception as exception:
_LOGGER.error("Unable to fetch login token from SEMS API. %s", exception)
except (requests.RequestException, ValueError, KeyError) as exception:
_LOGGER.error("Unable to fetch login token from SEMS API: %s", exception)
return None
def getPowerStationIds(self, renewToken=False, maxTokenRetries=2) -> str:
def _make_api_call(
self,
url_part: str,
data: str | None = None,
renewToken: bool = False,
maxTokenRetries: int = 2,
operation_name: str = "API call",
) -> dict[str, Any] | None:
"""Make a generic API call with token management and retry logic."""
_LOGGER.debug("SEMS - Making %s", operation_name)
if maxTokenRetries <= 0:
_LOGGER.info("SEMS - Maximum token fetch tries reached, aborting for now")
raise OutOfRetries
if self._token is None or renewToken:
_LOGGER.debug(
"API token not set (%s) or new token requested (%s), fetching",
self._token,
renewToken,
)
self._token = self.getLoginToken(self._username, self._password)
if self._token is None:
_LOGGER.error("Failed to obtain API token")
return None
# Prepare headers
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"token": json.dumps(self._token),
}
api_url = self._token["api"] + url_part
try:
jsonResponse: dict[str, Any] | None = self._make_http_request(
api_url,
headers,
data=data,
operation_name=operation_name,
validate_code=True,
)
# _make_http_request already validated the response, so if we get here, it's successful
if jsonResponse is None:
# Response validation failed in _make_http_request
_LOGGER.debug(
"%s not successful, retrying with new token, %s retries remaining",
operation_name,
maxTokenRetries,
)
return self._make_api_call(
url_part, data, True, maxTokenRetries - 1, operation_name
)
# Response is valid, return the data
return jsonResponse["data"]
except (requests.RequestException, ValueError, KeyError) as exception:
_LOGGER.error("Unable to complete %s: %s", operation_name, exception)
return None
def getPowerStationIds(
self, renewToken: bool = False, maxTokenRetries: int = 2
) -> dict[str, Any] | None:
"""Get the power station ids from the SEMS API."""
try:
# Get the status of our SEMS Power Station
_LOGGER.debug(
"SEMS - getPowerStationIds Making Power Station Status API Call"
)
if maxTokenRetries <= 0:
_LOGGER.info(
"SEMS - Maximum token fetch tries reached, aborting for now"
)
raise OutOfRetries
if self._token is None or renewToken:
_LOGGER.debug(
"API token not set (%s) or new token requested (%s), fetching",
self._token,
renewToken,
)
self._token = self.getLoginToken(self._username, self._password)
# Prepare Power Station status Headers
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"token": json.dumps(self._token),
}
getPowerStationIdUrl = self._token["api"] + _GetPowerStationIdByOwnerURLPart
_LOGGER.debug(
"Querying SEMS API (%s) for power station ids by owner",
getPowerStationIdUrl,
return self._make_api_call(
_GetPowerStationIdByOwnerURLPart,
data=None,
renewToken=renewToken,
maxTokenRetries=maxTokenRetries,
operation_name="getPowerStationIds API call",
)
response = requests.post(
getPowerStationIdUrl,
headers=headers,
# data=data,
timeout=_RequestTimeout,
)
jsonResponse = response.json()
_LOGGER.debug("Response: %s", jsonResponse)
# try again and renew token is unsuccessful
if (
jsonResponse["msg"] not in ["Successful", "操作成功"]
or jsonResponse["data"] is None
):
_LOGGER.debug(
"GetPowerStationIdByOwner Query not successful (%s), retrying with new token, %s retries remaining",
jsonResponse["msg"],
maxTokenRetries,
)
return self.getPowerStationIds(
True, maxTokenRetries=maxTokenRetries - 1
)
return jsonResponse["data"]
except Exception as exception:
_LOGGER.error(
"Unable to fetch power station Ids from SEMS Api. %s", exception
)
def getData(self, powerStationId, renewToken=False, maxTokenRetries=2) -> dict:
def getData(
self, powerStationId: str, renewToken: bool = False, maxTokenRetries: int = 2
) -> dict[str, Any]:
"""Get the latest data from the SEMS API and updates the state."""
try:
# Get the status of our SEMS Power Station
_LOGGER.debug("SEMS - Making Power Station Status API Call")
if maxTokenRetries <= 0:
_LOGGER.info(
"SEMS - Maximum token fetch tries reached, aborting for now"
data = '{"powerStationId":"' + powerStationId + '"}'
result = self._make_api_call(
_PowerStationURLPart,
data=data,
renewToken=renewToken,
maxTokenRetries=maxTokenRetries,
operation_name="getData API call",
)
return result if isinstance(result, dict) else {}
def _make_control_api_call(
self,
data: dict[str, Any],
renewToken: bool = False,
maxTokenRetries: int = 2,
operation_name: str = "Control API call",
) -> bool:
"""Make a control API call with different response handling."""
_LOGGER.debug("SEMS - Making %s", operation_name)
if maxTokenRetries <= 0:
_LOGGER.info("SEMS - Maximum token fetch tries reached, aborting for now")
raise OutOfRetries
if self._token is None or renewToken:
_LOGGER.debug(
"API token not set (%s) or new token requested (%s), fetching",
@@ -158,101 +237,72 @@ class SemsApi:
)
self._token = self.getLoginToken(self._username, self._password)
# Prepare Power Station status Headers
if self._token is None:
_LOGGER.error("Failed to obtain API token")
return False
# Prepare headers
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"token": json.dumps(self._token),
}
powerStationURL = self._token["api"] + _PowerStationURLPart
_LOGGER.debug(
"Querying SEMS API (%s) for power station id: %s",
powerStationURL,
powerStationId,
api_url = self._token["api"] + _PowerControlURLPart
try:
# Control API uses different validation (HTTP status code), so don't validate JSON response code
self._make_http_request(
api_url,
headers,
json_data=data,
operation_name=operation_name,
validate_code=False,
)
data = '{"powerStationId":"' + powerStationId + '"}'
# For control API, any successful HTTP response (status 200) means success
# The _make_http_request already validated HTTP status via raise_for_status()
return True
response = requests.post(
powerStationURL, headers=headers, data=data, timeout=_RequestTimeout
)
jsonResponse = response.json()
_LOGGER.debug("Response: %s", jsonResponse)
# try again and renew token is unsuccessful
if (
jsonResponse["msg"] not in ["success", "操作成功"]
or jsonResponse["data"] is None
):
_LOGGER.debug(
"Query not successful (%s), retrying with new token, %s retries remaining",
jsonResponse["msg"],
except requests.HTTPError as e:
if hasattr(e.response, "status_code") and e.response.status_code != 200:
_LOGGER.warning(
"%s not successful, retrying with new token, %s retries remaining",
operation_name,
maxTokenRetries,
)
return self.getData(
powerStationId, True, maxTokenRetries=maxTokenRetries - 1
)
return jsonResponse["data"]
except Exception as exception:
_LOGGER.error("Unable to fetch data from SEMS. %s", exception)
return {}
def change_status(self, inverterSn, status, renewToken=False, maxTokenRetries=2):
"""Schedule the downtime of the station"""
try:
# Get the status of our SEMS Power Station
_LOGGER.debug("SEMS - Making Power Station Status API Call")
if maxTokenRetries <= 0:
_LOGGER.info(
"SEMS - Maximum token fetch tries reached, aborting for now"
)
raise OutOfRetries
if self._token is None or renewToken:
_LOGGER.debug(
"API token not set (%s) or new token requested (%s), fetching",
self._token,
renewToken,
)
self._token = self.getLoginToken(self._username, self._password)
# Prepare Power Station status Headers
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"token": json.dumps(self._token),
}
powerControlURL = self._token["api"] + _PowerControlURLPart
# powerControlURL = _PowerControlURL
_LOGGER.debug(
"Sending power control command (%s) for power station id: %s",
powerControlURL,
inverterSn,
return self._make_control_api_call(
data, True, maxTokenRetries - 1, operation_name
)
_LOGGER.error("Unable to execute %s: %s", operation_name, e)
return False
except (requests.RequestException, ValueError, KeyError) as exception:
_LOGGER.error("Unable to execute %s: %s", operation_name, exception)
return False
def change_status(
self,
inverterSn: str,
status: str | int,
renewToken: bool = False,
maxTokenRetries: int = 2,
) -> None:
"""Schedule the downtime of the station."""
data = {
"InverterSN": inverterSn,
"InverterStatusSettingMark": "1",
"InverterStatus": str(status),
}
response = requests.post(
powerControlURL, headers=headers, json=data, timeout=_RequestTimeout
)
if response.status_code != 200:
# try again and renew token is unsuccessful
_LOGGER.warning(
"Power control command not successful, retrying with new token, %s retries remaining",
maxTokenRetries,
)
return self.change_status(
inverterSn, status, True, maxTokenRetries=maxTokenRetries - 1
success = self._make_control_api_call(
data,
renewToken=renewToken,
maxTokenRetries=maxTokenRetries,
operation_name=f"power control command for inverter {inverterSn}",
)
return
except Exception as exception:
_LOGGER.error("Unable to execute Power control command. %s", exception)
if not success:
_LOGGER.error("Power control command failed after all retries")
class OutOfRetries(exceptions.HomeAssistantError):

File diff suppressed because it is too large Load Diff

View File

@@ -1,52 +1,51 @@
"""Support for switch controlling an output of a GoodWe SEMS inverter.
"""Support for inverter control switches from the GoodWe SEMS API.
For more details about this platform, please refer to the documentation at
https://github.com/TimSoethout/goodwe-sems-home-assistant
"""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.components.switch import SwitchDeviceClass, SwitchEntity
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from . import SemsCoordinator
from .device import device_info_for_inverter
_LOGGER = logging.getLogger(__name__)
from homeassistant.core import HomeAssistant
_INVERTER_STATUS_ON = 1
_COMMAND_TURN_OFF = 2
_COMMAND_TURN_ON = 4
async def async_setup_entry(hass: HomeAssistant, config_entry, async_add_entities):
"""Add switches for passed config_entry in HA."""
coordinator = hass.data[DOMAIN][config_entry.entry_id]
# stationId = config_entry.data[CONF_STATION_ID]
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up SEMS switches from a config entry."""
coordinator = config_entry.runtime_data.coordinator
# coordinator.data should contain a dictionary of inverters, with as data `invert_full`
# Don't make switches for homekit, since it is not an inverter
async_add_entities(
SemsStatusSwitch(coordinator, ent)
for idx, ent in enumerate(coordinator.data)
if ent != "homeKit"
SemsStatusSwitch(coordinator, sn) for sn in coordinator.data.inverters
)
class SemsStatusSwitch(CoordinatorEntity, SwitchEntity):
"""SemsStatusSwitch using CoordinatorEntity.
The CoordinatorEntity class provides:
should_poll
async_update
async_added_to_hass
available
"""
class SemsStatusSwitch(CoordinatorEntity[SemsCoordinator], SwitchEntity):
"""Switch to control inverter status, backed by the SEMS coordinator."""
# Sensor has device name (e.g. Inverter 123456 Power)
_attr_has_entity_name = True
# _attr_name = None
_attr_device_class = SwitchDeviceClass.SWITCH
def __init__(self, coordinator, sn) -> None:
def __init__(self, coordinator: SemsCoordinator, sn: str) -> None:
"""Initialize the SemsStatusSwitch.
Args:
@@ -54,40 +53,38 @@ class SemsStatusSwitch(CoordinatorEntity, SwitchEntity):
sn: The serial number of the inverter.
"""
_LOGGER.debug("Try create SemsStatusSwitch for Inverter %s", sn)
super().__init__(coordinator, context=sn)
self.coordinator = coordinator
self.sn = sn
self._attr_device_info = DeviceInfo(
identifiers={
# Serial numbers are unique identifiers within a specific domain
(DOMAIN, self.sn)
},
# Commented out for now, since not all inverter entries have a name; could be related to creating too much switch devices, also for non-inverters such as homekit.
# name=f"Inverter {self.coordinator.data[self.sn]['name']}",
)
self._attr_unique_id = f"{self.sn}-switch"
_LOGGER.debug("Try create SemsStatusSwitch for inverter %s", sn)
super().__init__(coordinator)
self._sn = sn
inverter_data = coordinator.data.inverters.get(sn, {})
self._attr_device_info = device_info_for_inverter(sn, inverter_data)
self._attr_unique_id = f"{self._sn}-switch"
# somehow needed, no default naming
self._attr_name = "Switch"
self._attr_device_class = SwitchDeviceClass.OUTLET
_LOGGER.debug("Creating SemsStatusSwitch for Inverter %s", self.sn)
_LOGGER.debug("Creating SemsStatusSwitch for Inverter %s", self._sn)
@property
def is_on(self) -> bool:
"""Return entity status."""
_LOGGER.debug("coordinator.data[sn]: %s", self.coordinator.data[self.sn])
return self.coordinator.data[self.sn]["status"] == 1
status = self.coordinator.data.inverters.get(self._sn, {}).get("status")
return status == _INVERTER_STATUS_ON
async def async_turn_off(self, **kwargs):
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn off the inverter."""
_LOGGER.debug("Inverter %s set to Off", self.sn)
_LOGGER.debug("Inverter %s set to off", self._sn)
await self.hass.async_add_executor_job(
self.coordinator.semsApi.change_status, self.sn, 2
self.coordinator.sems_api.change_status,
self._sn,
_COMMAND_TURN_OFF,
)
await self.coordinator.async_request_refresh()
async def async_turn_on(self, **kwargs):
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn on the inverter."""
_LOGGER.debug("Inverter %s set to On", self.sn)
_LOGGER.debug("Inverter %s set to on", self._sn)
await self.hass.async_add_executor_job(
self.coordinator.semsApi.change_status, self.sn, 4
self.coordinator.sems_api.change_status,
self._sn,
_COMMAND_TURN_ON,
)
await self.coordinator.async_request_refresh()

View File

@@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIGgbmLKO9Z+njxoSQWoeLBRyGzWZK0FYYZzk/dM1pDJVoAoGCCqGSM49
AwEHoUQDQgAEhS5sHVP4YaWgVKAS0e/B/ObkA9lQ6Whx4Z+VYHhZtP3hQQLtVDGG
3e/2ncGTpyStQgLSi9Js3wr1GgoT/DNAsQ==
-----END EC PRIVATE KEY-----