theraPy/arztapi/APIHandler.py

257 lines
10 KiB
Python

from typing import List
import requests
from requests import JSONDecodeError
import base64
from datetime import datetime, timedelta
from arztapi.ArztPraxisDatas import ArztPraxisDatas
from arztapi.DoctorInformation import DoctorInformation, PhoneTime
from arztapi.DoctorPhoneTime import DoctorPhoneTime
class APIHandler:
def __init__(self):
self.base_api_url = "https://arztsuche.116117.de/api/"
self.phone_times = []
self.general_information = []
self.processed_doctor_phone_times = []
def get_lat_lon_location_list(self, location):
api_path = self.base_api_url + "location"
headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
"Authorization": "Basic YmRwczpma3I0OTNtdmdfZg==",
"Connection": "keep-alive",
}
params = {
"loc": location,
}
response = requests.get(api_path, params=params, headers=headers)
try:
return response.json()
except JSONDecodeError:
print(response.text)
def get_list_of_doctors(self, lat, lon, req_val_base64, therapy_types, therapy_age,
therapy_setting) -> ArztPraxisDatas:
api_path = self.base_api_url + "data"
headers = {
"Accept": "application/json",
"Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
"Authorization": "Basic YmRwczpma3I0OTNtdmdfZg==",
"Connection": "keep-alive",
"req-val": req_val_base64,
}
json_data = {
# TODO: Find out what r means
"r": 900,
"lat": lat,
"lon": lon,
"filterSelections": [
{
"title": "Fachgebiet Kategorie",
"fieldName": "fgg",
"selectedCodes": [
"12",
],
},
{
"title": "Psychotherapie: Verfahren",
"fieldName": "ptv",
"selectedCodes": therapy_types,
},
{
"title": "Psychotherapie: Altersgruppe",
"fieldName": "pta",
"selectedCodes": [
therapy_age
],
},
{
"title": "Psychotherapie: Setting",
"fieldName": "pts",
"selectedCodes": [
therapy_setting
],
},
],
"locOrigin": "USER_INPUT",
"initialSearch": False,
"viaDeeplink": False,
}
response = requests.post(api_path, headers=headers, json=json_data)
response.raise_for_status()
self.phone_times = ArztPraxisDatas(**response.json())
return self.phone_times
def get_general_doctor_information(self) -> List[DoctorInformation]:
general_doctor_information = []
for data in self.phone_times.arztPraxisDatas:
# Remove empty phone number fields
if data.tel == "":
continue
doctor_day_times = data.tsz
phone_times = []
for day in doctor_day_times:
if day.tszDesTyps:
for contact_times in day.tszDesTyps:
if contact_times.typ == "Telefonische Erreichbarkeit":
phone_times_day = contact_times.sprechzeiten
for phone_time_day in phone_times_day:
start_time_str, end_time_str = phone_time_day.zeit.split("-")
start_date_time = self.parse_date_string(f"{day.d} {start_time_str}")
end_date_time = self.parse_date_string(f"{day.d} {end_time_str}")
current_phone_time_dict = {
"start": start_date_time,
"end": end_date_time
}
current_phone_time = PhoneTime(**current_phone_time_dict)
phone_times.append(current_phone_time)
doctor_information_dict = {
"name": data.name,
"tel": data.tel,
"fax": data.fax,
"anrede": data.anrede,
"email": data.email,
"distance": data.distance,
"strasse": data.strasse,
"hausnummer": data.hausnummer,
"plz": data.plz,
"ort": data.ort,
"telefonzeiten": phone_times
}
doctor_information = DoctorInformation(**doctor_information_dict)
general_doctor_information.append(doctor_information)
self.general_information = general_doctor_information
return self.general_information
def filter_doctor_information_for_distance(self, distance):
"""
:param distance: in meters
:return:
"""
self.general_information = [doctor_information for doctor_information in self.general_information
if doctor_information.distance <= distance]
def get_doctor_phone_times_sorted(self, therapy_phone_weeks):
for doctor_information in self.general_information:
for phone_time in doctor_information.telefonzeiten:
doctor_phone_time_dict = {
"phone_time": phone_time,
# workaround until properly assigned in sort
"doctor_nr": 0,
"doctor_name": doctor_information.name,
"doctor_address": f"{doctor_information.plz} {doctor_information.ort} "
f"{doctor_information.strasse} {doctor_information.hausnummer}",
"doctor_phone_number": doctor_information.tel
}
doctor_phone_time = DoctorPhoneTime(**doctor_phone_time_dict)
self.processed_doctor_phone_times.append(doctor_phone_time)
self.filter_for_relevant_weeks(therapy_phone_weeks)
self.processed_doctor_phone_times.sort(key=lambda dpt: dpt.phone_time.start)
self.filter_for_already_passed_times_today()
self.assign_numbers_to_doctor_phone_times()
return self.processed_doctor_phone_times
def filter_for_relevant_weeks(self, therapy_phone_weeks):
current_date = datetime.now()
end_date = current_date + timedelta(weeks=therapy_phone_weeks)
self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
if current_date <= dpt.phone_time.start <= end_date]
def filter_for_already_passed_times_today(self):
current_datetime = datetime.now()
self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
if (dpt.phone_time.start.date() == current_datetime.date() and
dpt.phone_time.end > current_datetime)
or dpt.phone_time.start.date() != current_datetime.date()]
def assign_numbers_to_doctor_phone_times(self):
known_doctor_names_with_nr = {}
doctor_count = 1
for doctor_phone_time in self.processed_doctor_phone_times:
if doctor_phone_time.doctor_name not in known_doctor_names_with_nr:
doctor_phone_time.doctor_nr = doctor_count
known_doctor_names_with_nr[doctor_phone_time.doctor_name] = doctor_count
doctor_count += 1
else:
known_doctor_count = known_doctor_names_with_nr[doctor_phone_time.doctor_name]
doctor_phone_time.doctor_nr = known_doctor_count
@staticmethod
def calculate_req_value_base64(lat, lon):
"""
This function is based on the initial Javascript code found in app.js.
It is rewritten in Python to calculate the HTTP header req_val for proper requests with the correct location.
:param lat:
:param lon:
:return:
"""
# Adjust lat and lon values slightly
adjusted_lat = lat + 1.1
adjusted_lon = lon + 2.3
# Get the current time in milliseconds since epoch
current_time = datetime.now()
timestamp_str = str(int(current_time.timestamp() * 1000)) # Convert to milliseconds
# Extract digits from latitude
lat_integer_part = str(adjusted_lat).split(".")[0]
lat_last_digit = lat_integer_part[-1]
lat_first_fraction_digit = str(adjusted_lat).split(".")[1][0] if len(str(adjusted_lat).split(".")) > 1 else "0"
# Extract digits from longitude
lon_integer_part = str(adjusted_lon).split(".")[0]
lon_last_digit = lon_integer_part[-1]
lon_first_fraction_digit = str(adjusted_lon).split(".")[1][0] if len(str(adjusted_lon).split(".")) > 1 else "0"
# Create the final string by combining digits
combined_string = (
lat_last_digit +
timestamp_str[-1] +
lon_last_digit +
timestamp_str[-2] +
lat_first_fraction_digit +
timestamp_str[-3] +
lon_first_fraction_digit
)
# Encode the combined string in Base64
encoded_value = base64.b64encode(combined_string.encode()).decode()
return encoded_value
@staticmethod
def parse_date_string(date_string):
format_string = "%d.%m. %H:%M"
current_year = datetime.now().year
try:
if "24:00" in date_string:
# Sometimes the API returns 24:00 as time, so filtering for those cases and replacing it with a minute
# less to work with proper input
date_string = date_string.replace("24:00", "23:59")
# Add the current year since it is not part of the original date sent by the API
parsed_date = datetime.strptime(date_string, format_string).replace(year=datetime.now().year)
# Handle turn of the year: if date is in the past relative to today, consider it as part of the next year
if parsed_date < datetime.now():
parsed_date = parsed_date.replace(year=current_year + 1)
return parsed_date
except ValueError as e:
raise ValueError(f"Error parsing date string: '{date_string}'. Details: {e}")