theraPy/arztapi/APIHandler.py

255 lines
10 KiB
Python
Raw Normal View History

2024-08-26 21:05:34 +02:00
from typing import List
import requests
from requests import JSONDecodeError
import base64
from datetime import datetime, timedelta
2024-08-26 21:05:34 +02:00
from arztapi.ArztPraxisDatas import ArztPraxisDatas
from arztapi.DoctorInformation import DoctorInformation, PhoneTime
from arztapi.DoctorPhoneTime import DoctorPhoneTime
class APIHandler:
def __init__(self):
self.base_api_url = "https://arztsuche.116117.de/api/"
self.phone_times = []
self.general_information = []
self.processed_doctor_phone_times = []
2024-08-26 21:05:34 +02:00
def get_lat_lon_location_list(self, location):
api_path = self.base_api_url + "location"
headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
"Authorization": "Basic YmRwczpma3I0OTNtdmdfZg==",
"Connection": "keep-alive",
}
params = {
"loc": location,
}
response = requests.get(api_path, params=params, headers=headers)
try:
return response.json()
except JSONDecodeError:
print("foo")
print(response.text)
2024-08-28 17:17:59 +02:00
def get_list_of_doctors(self, lat, lon, req_val_base64, therapy_types, therapy_age,
therapy_setting) -> ArztPraxisDatas:
2024-08-26 21:05:34 +02:00
api_path = self.base_api_url + "data"
headers = {
"Accept": "application/json",
"Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
"Authorization": "Basic YmRwczpma3I0OTNtdmdfZg==",
"Connection": "keep-alive",
"req-val": req_val_base64,
}
json_data = {
# TODO: Find out what r means
"r": 900,
"lat": lat,
"lon": lon,
"filterSelections": [
{
"title": "Fachgebiet Kategorie",
"fieldName": "fgg",
"selectedCodes": [
"12",
],
},
{
"title": "Psychotherapie: Verfahren",
"fieldName": "ptv",
"selectedCodes": therapy_types,
},
{
"title": "Psychotherapie: Altersgruppe",
"fieldName": "pta",
"selectedCodes": [
therapy_age
],
},
{
"title": "Psychotherapie: Setting",
"fieldName": "pts",
"selectedCodes": [
therapy_setting
],
},
],
"locOrigin": "USER_INPUT",
"initialSearch": False,
"viaDeeplink": False,
}
response = requests.post(api_path, headers=headers, json=json_data)
response.raise_for_status()
self.phone_times = ArztPraxisDatas(**response.json())
return self.phone_times
def get_general_doctor_information(self) -> List[DoctorInformation]:
general_doctor_information = []
for data in self.phone_times.arztPraxisDatas:
doctor_day_times = data.tsz
phone_times = []
for day in doctor_day_times:
if day.tszDesTyps:
for contact_times in day.tszDesTyps:
if contact_times.typ == "Telefonische Erreichbarkeit":
phone_times_day = contact_times.sprechzeiten
for phone_time_day in phone_times_day:
start_time_str, end_time_str = phone_time_day.zeit.split("-")
start_date_time = self.parse_date_string(f"{day.d} {start_time_str}")
end_date_time = self.parse_date_string(f"{day.d} {end_time_str}")
current_phone_time_dict = {
"start": start_date_time,
"end": end_date_time
}
current_phone_time = PhoneTime(**current_phone_time_dict)
phone_times.append(current_phone_time)
doctor_information_dict = {
"name": data.name,
"tel": data.tel,
"fax": data.fax,
"anrede": data.anrede,
"email": data.email,
"distance": data.distance,
"strasse": data.strasse,
"hausnummer": data.hausnummer,
"plz": data.plz,
"ort": data.ort,
"telefonzeiten": phone_times
}
doctor_information = DoctorInformation(**doctor_information_dict)
general_doctor_information.append(doctor_information)
self.general_information = general_doctor_information
return self.general_information
2024-08-27 23:09:19 +02:00
def filter_doctor_information_for_distance(self, distance):
"""
:param distance: in meters
:return:
"""
self.general_information = [doctor_information for doctor_information in self.general_information
if doctor_information.distance <= distance]
def get_doctor_phone_times_sorted(self, therapy_phone_weeks):
2024-08-26 21:05:34 +02:00
for doctor_information in self.general_information:
for phone_time in doctor_information.telefonzeiten:
doctor_phone_time_dict = {
"phone_time": phone_time,
2024-08-28 10:08:46 +02:00
# workaround until properly assigned in sort
"doctor_nr": 0,
2024-08-26 21:05:34 +02:00
"doctor_name": doctor_information.name,
"doctor_address": f"{doctor_information.plz} {doctor_information.ort} "
f"{doctor_information.strasse} {doctor_information.hausnummer}",
"doctor_phone_number": doctor_information.tel
}
doctor_phone_time = DoctorPhoneTime(**doctor_phone_time_dict)
self.processed_doctor_phone_times.append(doctor_phone_time)
self.filter_for_relevant_weeks(therapy_phone_weeks)
self.processed_doctor_phone_times.sort(key=lambda dpt: dpt.phone_time.start)
self.filter_for_already_passed_times_today()
self.assign_numbers_to_doctor_phone_times()
return self.processed_doctor_phone_times
def filter_for_relevant_weeks(self, therapy_phone_weeks):
current_date = datetime.now()
end_date = current_date + timedelta(weeks=therapy_phone_weeks)
self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
if current_date <= dpt.phone_time.start <= end_date]
def filter_for_already_passed_times_today(self):
current_datetime = datetime.now()
self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
if (dpt.phone_time.start.date() == current_datetime.date() and
dpt.phone_time.end > current_datetime)
or dpt.phone_time.start.date() != current_datetime.date()]
def assign_numbers_to_doctor_phone_times(self):
known_doctor_names_with_nr = {}
2024-08-28 10:08:46 +02:00
doctor_count = 1
for doctor_phone_time in self.processed_doctor_phone_times:
2024-08-28 10:08:46 +02:00
if doctor_phone_time.doctor_name not in known_doctor_names_with_nr:
doctor_phone_time.doctor_nr = doctor_count
known_doctor_names_with_nr[doctor_phone_time.doctor_name] = doctor_count
doctor_count += 1
else:
known_doctor_count = known_doctor_names_with_nr[doctor_phone_time.doctor_name]
doctor_phone_time.doctor_nr = known_doctor_count
2024-08-26 21:05:34 +02:00
@staticmethod
def calculate_req_value_base64(lat, lon):
"""
This function is based on the initial Javascript code found in app.js.
It is rewritten in Python to calculate the HTTP header req_val for proper requests with the correct location.
:param lat:
:param lon:
:return:
"""
# Adjust lat and lon values slightly
adjusted_lat = lat + 1.1
adjusted_lon = lon + 2.3
# Get the current time in milliseconds since epoch
current_time = datetime.now()
timestamp_str = str(int(current_time.timestamp() * 1000)) # Convert to milliseconds
# Extract digits from latitude
lat_integer_part = str(adjusted_lat).split(".")[0]
lat_last_digit = lat_integer_part[-1]
lat_first_fraction_digit = str(adjusted_lat).split(".")[1][0] if len(str(adjusted_lat).split(".")) > 1 else "0"
# Extract digits from longitude
lon_integer_part = str(adjusted_lon).split(".")[0]
lon_last_digit = lon_integer_part[-1]
lon_first_fraction_digit = str(adjusted_lon).split(".")[1][0] if len(str(adjusted_lon).split(".")) > 1 else "0"
# Create the final string by combining digits
combined_string = (
lat_last_digit +
timestamp_str[-1] +
lon_last_digit +
timestamp_str[-2] +
lat_first_fraction_digit +
timestamp_str[-3] +
lon_first_fraction_digit
)
# Encode the combined string in Base64
encoded_value = base64.b64encode(combined_string.encode()).decode()
return encoded_value
@staticmethod
def parse_date_string(date_string):
format_string = "%d.%m. %H:%M"
2024-08-28 17:17:59 +02:00
current_year = datetime.now().year
try:
if "24:00" in date_string:
# Sometimes the API returns 24:00 as time, so filtering for those cases and replacing it with a minute
# less to work with propery input
date_string.replace("24:00", "23:59")
# Add the current year since it is not part of the original date sent by the API
parsed_date = datetime.strptime(date_string, format_string).replace(year=datetime.now().year)
# Handle turn of the year: if date is in the past relative to today, consider it as part of the next year
if parsed_date < datetime.now():
parsed_date = parsed_date.replace(year=current_year + 1)
return parsed_date
except ValueError as e:
raise ValueError(f"Error parsing date string: '{date_string}'. Details: {e}")