Added login

This commit is contained in:
Roger Gonzalez 2023-08-16 23:14:35 -03:00
parent b8e1cb50d7
commit e14a5fcfa4
Signed by: rogs
GPG Key ID: C7ECE9C6C36EC2E6
2 changed files with 53 additions and 19 deletions

View File

@ -1,2 +1,3 @@
DEVICE_ID=12345
AUTHORIZATION=authorization_code
EMAIL=your@email.com
PHONE_NUMBER=598123456

View File

@ -9,10 +9,13 @@ from dotenv import load_dotenv
load_dotenv()
DEVICE_ID = os.environ.get("DEVICE_ID")
AUTHORIZATION = os.environ.get("AUTHORIZATION")
EMAIL = os.environ.get("EMAIL")
PHONE_NUMBER = os.environ.get("PHONE_NUMBER")
def make_request(method: str, url: str, data: Optional[dict] = None) -> requests.Response:
def make_request(
method: str, url: str, authorization: str = None, data: Optional[dict] = None, headers: dict = None
) -> requests.Response:
"""
Make request
@ -22,14 +25,15 @@ def make_request(method: str, url: str, data: Optional[dict] = None) -> requests
:return: Response
"""
headers = {
"Authorization": f"Bearer {AUTHORIZATION}",
"X-Client-Type": "Android",
"User-Agent": "okhttp/3.8.1",
"Content-Type": "application/json; charset=utf-8",
"Connection": "Keep-Alive",
"User-Agent": "okhttp/3.8.1",
}
if not headers and authorization:
headers = {
"Authorization": f"Bearer {authorization}",
"X-Client-Type": "Android",
"User-Agent": "okhttp/3.8.1",
"Content-Type": "application/json; charset=utf-8",
"Connection": "Keep-Alive",
"User-Agent": "okhttp/3.8.1",
}
if method == "GET":
return requests.get(url, headers=headers)
@ -40,8 +44,36 @@ def make_request(method: str, url: str, data: Optional[dict] = None) -> requests
return None
def login(email: str, phone_number: str) -> requests.Response:
"""
Login
:param email: Email
:param phone_number: Phone number
:return: Authorization token
"""
url = "https://rocme.ute.com.uy/api/v1/token"
headers = {
"X-Client-Type": "Android",
"Content-Type": "application/json; charset=utf-8",
"Connection": "Keep-Alive",
"User-Agent": "okhttp/3.8.1",
}
data = {
"Email": email,
"PhoneNumber": phone_number,
}
return make_request("POST", url, data=data, headers=headers).text
def generate_ute_info(
device_id: str, cost_per_kwh: float, date_start: Optional[str] = None, date_end: Optional[str] = None
device_id: str,
authorization: str,
cost_per_kwh: float,
date_start: Optional[str] = None,
date_end: Optional[str] = None,
) -> dict:
"""
Generate UTE info from device id and date range
@ -64,7 +96,7 @@ def generate_ute_info(
url = f"https://rocme.ute.com.uy/api/v2/device/{device_id}/curvefromtodate/D/{date_start}/{date_end}"
response = make_request("GET", url).json()
response = make_request("GET", url, authorization=authorization).json()
active_energy = {"total": {"sum_in_kwh": 0}}
@ -88,7 +120,7 @@ def generate_ute_info(
return active_energy
def get_current_usage(device_id: str) -> dict:
def get_current_usage(device_id: str, authorization: str) -> dict:
"""
Get current usage
@ -102,16 +134,16 @@ def get_current_usage(device_id: str) -> dict:
data = {"AccountServicePointId": device_id}
reading_request = make_request("POST", reading_request_url, data)
reading_request = make_request("POST", reading_request_url, authorization=authorization, data=data)
if reading_request.status_code != 200:
raise Exception("Error getting reading request")
response = make_request("GET", reading_url).json()
response = make_request("GET", reading_url, authorization=authorization).json()
while not response["success"]:
sleep(5)
response = make_request("GET", reading_url).json()
response = make_request("GET", reading_url, authorization=authorization).json()
readings = response["data"]["readings"]
@ -146,6 +178,7 @@ if __name__ == "__main__":
date_start = "2023-08-01"
date_end = "2023-08-31"
average_price = (10.680 * 0.1667) + (2.223 * 0.2917) + (4.875 * 0.5416) # Each price * percentage of time on the day
authorization = login(EMAIL, PHONE_NUMBER)
ute_historic_usage = generate_ute_info(DEVICE_ID, average_price, date_start, date_end)
ute_current_usage = get_current_usage(DEVICE_ID)
ute_historic_usage = generate_ute_info(DEVICE_ID, authorization, average_price, date_start, date_end)
ute_current_usage = get_current_usage(DEVICE_ID, authorization)