This repository has been archived on 2025-05-11. You can view files and clone it, but cannot push or open issues or pull requests.
ute/script.py
2023-08-16 23:14:35 -03:00

185 lines
5.6 KiB
Python
Executable File

import os
from datetime import datetime, timedelta
from time import sleep
from typing import Optional
import requests
from dotenv import load_dotenv
# Populate the process environment (python-dotenv) before the lookups below.
load_dotenv()

# Account settings read from the environment; each one is None when its
# variable is not set.
DEVICE_ID = os.getenv("DEVICE_ID")
EMAIL = os.getenv("EMAIL")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
def make_request(
    method: str,
    url: str,
    authorization: Optional[str] = None,
    data: Optional[dict] = None,
    headers: Optional[dict] = None,
) -> Optional[requests.Response]:
    """
    Make request

    Only "GET" and "POST" are handled; the comparison on ``method`` is exact
    (case-sensitive).

    :param method: Method, either "GET" or "POST"
    :param url: URL
    :param authorization: Bearer token; used to build the default headers only
        when no ``headers`` are passed
    :param data: Data sent as the JSON body (POST only)
    :param headers: Headers to send as-is; when given, ``authorization`` is ignored
    :return: Response, or None when ``method`` is neither "GET" nor "POST"
    """
    # Explicit headers win; otherwise build the default set around the token.
    if not headers and authorization:
        headers = {
            "Authorization": f"Bearer {authorization}",
            "X-Client-Type": "Android",
            "User-Agent": "okhttp/3.8.1",
            "Content-Type": "application/json; charset=utf-8",
            "Connection": "Keep-Alive",
        }

    if method == "GET":
        return requests.get(url, headers=headers)
    if method == "POST":
        return requests.post(url, headers=headers, json=data)

    # Unsupported method: kept as a None return for backward compatibility.
    return None
def login(email: str, phone_number: str) -> str:
    """
    Login

    Posts the credentials to the token endpoint and returns the response body
    as text. The HTTP status is not checked here.

    :param email: Email
    :param phone_number: Phone number
    :return: Authorization token (raw text of the response body)
    """
    url = "https://rocme.ute.com.uy/api/v1/token"

    # No bearer token exists yet, so the headers are passed explicitly instead
    # of relying on make_request's authorization-based defaults.
    headers = {
        "X-Client-Type": "Android",
        "Content-Type": "application/json; charset=utf-8",
        "Connection": "Keep-Alive",
        "User-Agent": "okhttp/3.8.1",
    }
    data = {
        "Email": email,
        "PhoneNumber": phone_number,
    }

    return make_request("POST", url, data=data, headers=headers).text
def generate_ute_info(
    device_id: str,
    authorization: str,
    cost_per_kwh: float,
    date_start: Optional[str] = None,
    date_end: Optional[str] = None,
) -> dict:
    """
    Generate UTE info from device id and date range

    The result maps each day ("dd/mm/YYYY") to its consumption, approximated
    cost and weekday name, plus a "total" entry with the summed kWh, the
    approximated total cost and the daily average cost.

    :param device_id: Device id
    :param authorization: Authorization token
    :param cost_per_kwh: Cost per kwh, used for both the per-day and the total cost
    :param date_start: Date start as "YYYY-MM-DD"; defaults to yesterday
    :param date_end: Date end as "YYYY-MM-DD"; defaults to yesterday
    :return: UTE info
    """
    if date_start is None or date_end is None:
        yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        if date_start is None:
            date_start = yesterday
        if date_end is None:
            date_end = yesterday

    url = f"https://rocme.ute.com.uy/api/v2/device/{device_id}/curvefromtodate/D/{date_start}/{date_end}"
    response = make_request("GET", url, authorization=authorization).json()

    active_energy = {"total": {"sum_in_kwh": 0}}
    total = active_energy["total"]

    for item in response["data"]:
        # Only imported active energy rows contribute to the report.
        if item["magnitudeVO"] != "IMPORT_ACTIVE_ENERGY":
            continue

        date = datetime.strptime(item["date"], "%Y-%m-%dT%H:%M:%S%z")
        value = round(float(item["value"]), 3)

        active_energy[date.strftime("%d/%m/%Y")] = {
            "kwh": value,
            "aproximated_cost_in_uyu": round(value * cost_per_kwh, 3),
            "day_in_week": date.strftime("%A"),
        }
        total["sum_in_kwh"] = total["sum_in_kwh"] + value

    # Use the cost_per_kwh argument here; the function must not depend on a
    # module-level global that only exists when the file is run as a script.
    total["aproximated_cost_in_uyu"] = round(total["sum_in_kwh"] * cost_per_kwh, 3)

    # Every key except "total" is one day; avoid dividing by zero when the
    # response contained no matching rows.
    day_count = len(active_energy) - 1
    if day_count:
        total["daily_average_cost"] = round(total["aproximated_cost_in_uyu"] / day_count, 3)
    else:
        total["daily_average_cost"] = 0.0

    return active_energy
def get_current_usage(device_id: str, authorization: str) -> dict:
    """
    Get current usage

    Asks the API for a fresh reading, polls the last-reading endpoint every
    5 seconds until it reports success, and adds the computed power to the
    returned payload under ``["data"]["power_in_watts"]``.

    :param device_id: Device id
    :param authorization: Authorization token
    :raises Exception: Error getting reading request
    :return: Current usage
    """
    reading_request_url = "https://rocme.ute.com.uy/api/v1/device/readingRequest"
    reading_url = f"https://rocme.ute.com.uy/api/v1/device/{device_id}/lastReading/30"

    data = {"AccountServicePointId": device_id}
    reading_request = make_request("POST", reading_request_url, authorization=authorization, data=data)
    if reading_request.status_code != 200:
        raise Exception("Error getting reading request")

    # NOTE(review): this polls without an upper bound; if the API never
    # reports success the call does not return.
    response = make_request("GET", reading_url, authorization=authorization).json()
    while not response["success"]:
        sleep(5)
        response = make_request("GET", reading_url, authorization=authorization).json()

    # Start every expected reading at 0.0 so a reading type missing from the
    # response contributes nothing instead of raising UnboundLocalError.
    # The V*/I* pairs are presumably per-phase voltage and current.
    values = {key: 0.0 for key in ("I1", "I2", "I3", "V1", "V2", "V3")}
    for reading in response["data"]["readings"]:
        reading_type = reading["tipoLecturaMGMI"]
        if reading_type in values:
            values[reading_type] = float(reading["valor"])

    power_1_in_watts = values["V1"] * values["I1"]
    power_2_in_watts = values["V2"] * values["I2"]
    power_3_in_watts = values["V3"] * values["I3"]
    power_in_watts = round(power_1_in_watts + power_2_in_watts + power_3_in_watts, 3)

    # Shallow copy: the nested "data" dict is shared with ``response``.
    return_dict = {**response}
    return_dict["data"]["power_in_watts"] = power_in_watts

    return return_dict
if __name__ == "__main__":
    # Date range to report on.
    date_start, date_end = "2023-08-01", "2023-08-31"

    # (price, share of the day) pairs; the weighted sum is the average price.
    tariff_bands = (
        (10.680, 0.1667),
        (2.223, 0.2917),
        (4.875, 0.5416),
    )
    average_price = 0.0
    for band_price, band_share in tariff_bands:
        average_price += band_price * band_share

    # Authenticate once and reuse the token for both queries.
    authorization = login(EMAIL, PHONE_NUMBER)

    ute_historic_usage = generate_ute_info(
        DEVICE_ID,
        authorization,
        average_price,
        date_start,
        date_end,
    )
    ute_current_usage = get_current_usage(DEVICE_ID, authorization)