import os
from datetime import datetime, timedelta
from time import sleep
from typing import Optional

import requests
from dotenv import load_dotenv

load_dotenv()

DEVICE_ID = os.environ.get("DEVICE_ID")
AUTHORIZATION = os.environ.get("AUTHORIZATION")

# Reading types consumed by the power calculation: one current (I) and one
# voltage (V) entry per phase index 1..3.
_PHASE_READING_TYPES = ("I1", "I2", "I3", "V1", "V2", "V3")


def make_request(
    method: str, url: str, data: Optional[dict] = None, timeout: Optional[float] = 30
) -> requests.Response:
    """
    Make an authenticated HTTP request

    :param method: HTTP method, either "GET" or "POST"
    :param url: URL
    :param data: JSON body, only sent for "POST" requests
    :param timeout: Seconds to wait for the server before giving up (None waits forever)
    :raises ValueError: If the method is neither "GET" nor "POST"
    :return: Response
    """
    headers = {
        "Authorization": f"Bearer {AUTHORIZATION}",
        "X-Client-Type": "Android",
        "User-Agent": "okhttp/3.8.1",
        "Content-Type": "application/json; charset=utf-8",
        "Connection": "Keep-Alive",
    }

    if method == "GET":
        return requests.get(url, headers=headers, timeout=timeout)

    if method == "POST":
        return requests.post(url, headers=headers, json=data, timeout=timeout)

    # Fail loudly here instead of returning None, which would only surface
    # later as an AttributeError in the caller.
    raise ValueError(f"Unsupported HTTP method: {method!r}")


def _summarize_energy(items, cost_per_kwh: float) -> dict:
    """
    Build the per-day and total summary from the items of a curve response

    :param items: Iterable of dicts with "magnitudeVO", "date" and "value" keys
    :param cost_per_kwh: Cost per kwh
    :return: Dict keyed by "dd/mm/YYYY" plus a "total" entry
    """
    active_energy = {"total": {"sum_in_kwh": 0}}

    for item in items:
        if item["magnitudeVO"] != "IMPORT_ACTIVE_ENERGY":
            continue

        date = datetime.strptime(item["date"], "%Y-%m-%dT%H:%M:%S%z")
        value = round(float(item["value"]), 3)

        active_energy[date.strftime("%d/%m/%Y")] = {
            "kwh": value,
            "aproximated_cost_in_uyu": round(value * cost_per_kwh, 3),
            "day_in_week": date.strftime("%A"),
        }
        active_energy["total"]["sum_in_kwh"] += value

    total = active_energy["total"]
    total["aproximated_cost_in_uyu"] = round(total["sum_in_kwh"] * cost_per_kwh, 3)

    # Every key except "total" is one day; guard against an empty range so the
    # average does not divide by zero.
    day_count = len(active_energy) - 1
    if day_count:
        total["daily_average_cost"] = round(total["aproximated_cost_in_uyu"] / day_count, 3)
    else:
        total["daily_average_cost"] = 0.0

    return active_energy


def generate_ute_info(
    device_id: str, cost_per_kwh: float, date_start: Optional[str] = None, date_end: Optional[str] = None
) -> dict:
    """
    Generate UTE info from device id and date range

    :param device_id: Device id
    :param cost_per_kwh: Cost per kwh
    :param date_start: Date start as "YYYY-MM-DD", defaults to yesterday
    :param date_end: Date end as "YYYY-MM-DD", defaults to yesterday
    :return: UTE info
    """
    if date_start is None or date_end is None:
        yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        if date_start is None:
            date_start = yesterday
        if date_end is None:
            date_end = yesterday

    url = f"https://rocme.ute.com.uy/api/v2/device/{device_id}/curvefromtodate/D/{date_start}/{date_end}"
    response = make_request("GET", url).json()

    return _summarize_energy(response["data"], cost_per_kwh)


def _power_from_readings(readings) -> float:
    """
    Sum voltage * current over the three phases of a reading list

    :param readings: Iterable of dicts with "tipoLecturaMGMI" and "valor" keys
    :return: Total of V1*I1 + V2*I2 + V3*I3 rounded to 3 decimals
    """
    values = {
        reading["tipoLecturaMGMI"]: float(reading["valor"])
        for reading in readings
        if reading["tipoLecturaMGMI"] in _PHASE_READING_TYPES
    }

    # A phase that is not reported contributes nothing instead of raising.
    total = sum(values.get(f"V{phase}", 0.0) * values.get(f"I{phase}", 0.0) for phase in (1, 2, 3))
    return round(total, 3)


def get_current_usage(device_id: str, poll_interval: float = 5, max_attempts: Optional[int] = None) -> dict:
    """
    Get current usage

    :param device_id: Device id
    :param poll_interval: Seconds to sleep between polls of the last reading
    :param max_attempts: Maximum number of polls, None polls until success
    :raises Exception: Error getting reading request
    :raises TimeoutError: If max_attempts polls finish without a successful reading
    :return: Last reading response with "power_in_watts" added under "data"
    """
    reading_request_url = "https://rocme.ute.com.uy/api/v1/device/readingRequest"
    reading_url = f"https://rocme.ute.com.uy/api/v1/device/{device_id}/lastReading/30"

    data = {"AccountServicePointId": device_id}

    reading_request = make_request("POST", reading_request_url, data)
    if reading_request.status_code != 200:
        raise Exception("Error getting reading request")

    # Poll the last-reading endpoint until it reports success.
    attempts = 1
    response = make_request("GET", reading_url).json()
    while not response["success"]:
        if max_attempts is not None and attempts >= max_attempts:
            raise TimeoutError(f"No successful reading after {attempts} attempts")
        sleep(poll_interval)
        response = make_request("GET", reading_url).json()
        attempts += 1

    return_dict = {**response}
    return_dict["data"]["power_in_watts"] = _power_from_readings(response["data"]["readings"])

    return return_dict


if __name__ == "__main__":
    date_start = "2023-08-01"
    date_end = "2023-08-31"
    average_price = (10.680 * 0.1667) + (2.223 * 0.2917) + (4.875 * 0.5416)  # Each price * percentage of time on the day
    ute_historic_usage = generate_ute_info(DEVICE_ID, average_price, date_start, date_end)
    ute_current_usage = get_current_usage(DEVICE_ID)