theraPy/arztapi/APIHandler.py
2024-11-08 09:49:19 +01:00

463 lines
21 KiB
Python

import json
import sys
from typing import List
import requests
from requests import JSONDecodeError
import base64
from datetime import datetime, timedelta
from arztapi.ArztPraxisDatas import ArztPraxisDatas, ArztPraxisData
from arztapi.DoctorInformation import DoctorInformation, PhoneTime
from arztapi.DoctorPhoneTime import DoctorPhoneTime
class APIHandler:
    """
    Class for accessing and handling the Arztsuche API -> Get necessary data and filter it for theraPy

    The methods build on each other through instance state: ``get_list_of_doctors`` fills ``phone_times``,
    ``get_general_doctor_information`` turns that into ``general_information`` and
    ``get_doctor_phone_times_sorted`` turns that into ``processed_doctor_phone_times``.
    """

    # Authorization header gathered by initial cURLing of the original website
    _AUTHORIZATION = "Basic YmRwczpma3I0OTNtdmdfZg=="
    # Accept-Language header copied by manual cURL
    _ACCEPT_LANGUAGE = "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7"
    # Seconds to wait for the remote API; without a timeout `requests` may block forever
    _REQUEST_TIMEOUT_SECONDS = 30
    # Mapping of the selectable therapy codes to the therapy type name used in the doctor data
    _THERAPY_TYPE_NAMES = {
        "V": "Verhaltenstherapie",
        "T": "Tiefenpsychologisch fundierte Psychotherapie",
        "A": "Analytische Psychotherapie",
        "S": "Systemische Therapie",
    }
    # Contact time type that marks an actual phone time (and not only an opening time)
    _PHONE_TIME_TYPE = "Telefonische Erreichbarkeit"

    def __init__(self, redis_client):
        """
        :param redis_client: Client used as cache; only its ``get(key)`` and ``set(key, value)`` methods are used
        """
        # Base URL as given by the base website
        self.base_api_url = "https://arztsuche.116117.de/api/"
        self.json_data = {}
        # Containers for phone times, general doctor information and processed phone times of doctors
        self.phone_times = []
        self.general_information = []
        self.processed_doctor_phone_times = []
        self.redis_client = redis_client

    def _base_headers(self, accept):
        """Build the request headers shared by all API calls; only the Accept header differs per endpoint."""
        return {
            "Accept": accept,
            "Accept-Language": self._ACCEPT_LANGUAGE,
            "Authorization": self._AUTHORIZATION,
            "Connection": "keep-alive",
        }

    def _cache_key(self):
        """Return the cache key of the current search: the string form of the request body."""
        return str(self.json_data)

    def get_lat_lon_location_list(self, location):
        """
        Use a given location input string and search for the location with the Arztsuche API -> input string can
        contain more or less everything to search for, but should not contain spaces (since the original API is not
        able to handle spaces properly)
        :param location: given as input string/plz with more or less validation, directly given to the Arztsuche API
        :return: location matches in JSON format, or None if the response body is not valid JSON (no location match)
        :raises requests.RequestException: for connection problems or if the API does not answer in time
        """
        # API path as given by original website
        api_path = self.base_api_url + "location"
        response = requests.get(
            api_path,
            params={"loc": location},
            headers=self._base_headers("application/json, text/plain, */*"),
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        try:
            # Possibly multiple results, further processing by caller
            return response.json()
        # JSONDecodeError for empty response object -> None, no location match
        except JSONDecodeError:
            return None

    def get_list_of_doctors(self, lat, lon, req_val_base64, therapy_types, therapy_age, therapy_setting,
                            amount_of_weeks) -> ArztPraxisDatas:
        """
        Load the doctor data for a search (from the cache if a fresh entry exists, from the API otherwise), then
        filter it for the desired therapy types and remove duplicates.
        :param lat: Latitude as given by location API
        :param lon: Longitude as given by location API
        :param req_val_base64: base64 value required for API access (see calculate_req_value_base64)
        :param therapy_types: Therapy types of interest (codes "V", "T", "A", "S")
        :param therapy_age: Therapy age range of interest
        :param therapy_setting: Therapy setting of interest
        :param amount_of_weeks: Maximum age in weeks a cache entry may have to be used
        :return: The filtered doctor data (also stored in self.phone_times)
        """
        # Use the selected therapy types in case of "Verhaltenstherapie" -> most data used for it, might overload the
        # request, so other therapy types are missing
        if "V" in therapy_types:
            selected_codes = therapy_types
        # Collect all three other therapy types in case of missing V
        else:
            selected_codes = ["A", "S", "T"]
        # Data object as built by the original website, some fields might not be plausible or known (since there is
        # no API documentation itself available)
        self.json_data = {
            # TODO: Find out what r means
            "r": 900,
            "lat": lat,
            "lon": lon,
            "filterSelections": [
                {
                    "title": "Fachgebiet Kategorie",
                    "fieldName": "fgg",
                    "selectedCodes": [
                        "12",
                    ],
                },
                {
                    "title": "Psychotherapie: Verfahren",
                    "fieldName": "ptv",
                    "selectedCodes": selected_codes
                },
                {
                    "title": "Psychotherapie: Altersgruppe",
                    "fieldName": "pta",
                    "selectedCodes": [
                        therapy_age
                    ],
                },
                {
                    "title": "Psychotherapie: Setting",
                    "fieldName": "pts",
                    "selectedCodes": [
                        therapy_setting
                    ],
                },
            ],
            "locOrigin": "USER_INPUT",
            "initialSearch": False,
            "viaDeeplink": False,
        }
        amount_of_days = amount_of_weeks * 7
        cache_data = self.get_current_doctor_information_data_in_cache_with_time_check(amount_of_days)
        if cache_data:
            self.phone_times = cache_data
        else:
            self.get_list_of_doctors_from_api(lat, lon, req_val_base64, therapy_types, therapy_age, therapy_setting)
            # Cache the unfiltered result so it can serve later searches with the same request body
            self.set_current_doctor_information_data_in_cache()
        # Filter for the relevant therapy types before processing
        self.filter_for_therapy_types(therapy_types)
        self.filter_for_duplicates()
        return self.phone_times

    def get_list_of_doctors_from_api(self, lat, lon, req_val_base64, therapy_types, therapy_age,
                                     therapy_setting) -> ArztPraxisDatas:
        """
        Request the doctor data from the API. The request body is self.json_data as prepared by
        get_list_of_doctors; of the parameters only req_val_base64 is used here, the others are kept for
        interface compatibility.
        :param lat: Latitude as given by location API
        :param lon: Longitude as given by location API
        :param req_val_base64: base64 value required for API access (is this a token?)
        :param therapy_types: Therapy types of interest
        :param therapy_age: Therapy age range of interest
        :param therapy_setting: Therapy setting of interest
        :return: Relevant doctor/therapist data
        :raises requests.RequestException: for HTTP error status codes, connection problems or timeouts
        """
        # API path for doctor data
        api_path = self.base_api_url + "data"
        headers = self._base_headers("application/json")
        # Calculated base64 value based on latitude and longitude
        headers["req-val"] = req_val_base64
        response = requests.post(api_path, headers=headers, json=self.json_data,
                                 timeout=self._REQUEST_TIMEOUT_SECONDS)
        # Check for HTTP errors
        response.raise_for_status()
        # Convert phone times to data format as input validation, save as class variable for further processing
        self.phone_times = ArztPraxisDatas(**response.json())
        # Return result for processing by caller
        return self.phone_times

    def get_current_doctor_information_data_in_cache_with_time_check(self, amount_of_days):
        """
        Look up the cache entry of the current request body and return it if it is not too old.
        :param amount_of_days: Maximum age of the cache entry in days
        :return: Cached doctor data, or None if there is no entry or the entry is older than amount_of_days
        """
        cached_data = self.redis_client.get(self._cache_key())
        if not cached_data:
            return None
        cached_data = json.loads(cached_data)
        cache_timestamp = datetime.fromisoformat(cached_data["timestamp"])
        # Compare the full timedelta: comparing only `.days` would accept entries that are almost a day too old
        if datetime.now() - cache_timestamp > timedelta(days=amount_of_days):
            return None
        data = [ArztPraxisData.model_validate(json.loads(item)) for item in cached_data["data"]]
        return ArztPraxisDatas(arztPraxisDatas=data)

    def set_current_doctor_information_data_in_cache(self):
        """
        Store the current doctor data in the cache together with the current timestamp, using the request body as key
        :return: None
        """
        current_date = datetime.now()
        # model_dump_json is the pydantic v2 counterpart of the model_validate call used when reading the cache
        serialized_data = [item.model_dump_json() for item in self.phone_times.arztPraxisDatas]
        self.redis_client.set(self._cache_key(), json.dumps({"timestamp": current_date.isoformat(),
                                                             "data": serialized_data}))

    def filter_for_therapy_types(self, therapy_types):
        """
        The idea is to get as much data as possible from the API at once to minimize the number of API calls.
        For some cases, more data than actually necessary is cached and this is a filter for it.
        :param therapy_types: Desired therapy types (codes "V", "T", "A", "S")
        :return: None, class variable affected
        :raises KeyError: if therapy_types contains an unknown code
        """
        wanted_names = [self._THERAPY_TYPE_NAMES[code] for code in therapy_types]
        # Relevant therapy types are stored in psy, multiple might be available. A doctor is kept exactly once,
        # no matter how many of the entries match.
        self.phone_times.arztPraxisDatas = [
            data for data in self.phone_times.arztPraxisDatas
            if any(name in setting for setting in (data.psy or []) for name in wanted_names)
        ]

    def filter_for_duplicates(self):
        """
        Remove entries that are equal to an earlier entry, keeping the first occurrence and the original order
        :return: None, class variable affected
        """
        unique_entries = []
        for entry in self.phone_times.arztPraxisDatas:
            # Equality check instead of a set since the entries are not necessarily hashable
            if entry not in unique_entries:
                unique_entries.append(entry)
        self.phone_times.arztPraxisDatas = unique_entries

    def get_general_doctor_information(self) -> List[DoctorInformation]:
        """
        Transform and filter data to more usable format: Check for phone times and collect general doctor information
        data of interest
        Function should be called after initial API call, but doesn't create an error if not, just returning empty
        list in case of
        :return: General doctor information
        """
        general_doctor_information = []
        # phone_times is still the initial plain list if no data was loaded yet -> nothing to iterate over
        for data in getattr(self.phone_times, "arztPraxisDatas", []):
            # Skip doctors without phone number
            if not data.tel:
                continue
            # Convert to actual DoctorInformation object -> input validation
            doctor_information = DoctorInformation(
                name=data.name,
                tel=data.tel,
                fax=data.fax,
                anrede=data.anrede,
                email=data.email,
                distance=data.distance,
                strasse=data.strasse,
                hausnummer=data.hausnummer,
                plz=data.plz,
                ort=data.ort,
                # Times of doctors in a day (maybe tsz = tageszeit?)
                telefonzeiten=self._collect_phone_times(data.tsz),
            )
            general_doctor_information.append(doctor_information)
        # Save result to class for further processing and return for caller
        self.general_information = general_doctor_information
        return self.general_information

    def _collect_phone_times(self, doctor_day_times) -> List[PhoneTime]:
        """Collect all phone times (not mere opening times) of one doctor from the per-day contact times."""
        phone_times = []
        for day in doctor_day_times or []:
            for contact_times in day.tszDesTyps or []:
                # Check if contact time is actually phone time and not only opening time
                if contact_times.typ != self._PHONE_TIME_TYPE:
                    continue
                for phone_time_day in contact_times.sprechzeiten:
                    # String magic since the actual phone time is given as string such as 9:00-10:00
                    start_time_str, end_time_str = phone_time_day.zeit.split("-")
                    # Parse both to datetime objects, PhoneTime object -> input validation
                    phone_times.append(PhoneTime(
                        start=self.parse_date_string(f"{day.d} {start_time_str.strip()}"),
                        end=self.parse_date_string(f"{day.d} {end_time_str.strip()}"),
                    ))
        return phone_times

    def filter_doctor_information_for_distance(self, distance):
        """
        Filter the given list of doctors based on the distance to them calculated by the given location as a result
        of the original API call
        :param distance: Distance to location given in meters for doctors within given radius
        :return: No return, only filter class variable
        """
        # Keep every doctor information within the given distance/radius
        self.general_information = [doctor_information for doctor_information in self.general_information
                                    if doctor_information.distance <= distance]

    def get_doctor_phone_times_sorted(self, therapy_phone_weeks):
        """
        Build the list of doctor phone times from the general doctor information, restrict it to the desired weeks,
        sort it by start date, drop phone times that already ended today and number the doctors
        :param therapy_phone_weeks: Amount of weeks for showing phone times
        :return: Sorted list of doctor phone times
        """
        # Start from scratch so that repeated calls do not accumulate duplicates
        self.processed_doctor_phone_times = []
        # Process doctor information to desired format with relevant information for phone times
        for doctor_information in self.general_information:
            # Address in readable format
            doctor_address = (f"{doctor_information.plz} {doctor_information.ort} "
                              f"{doctor_information.strasse} {doctor_information.hausnummer}")
            # Perspective: phone time as data object of choice
            for phone_time in doctor_information.telefonzeiten:
                # Convert to actual DoctorPhoneTime object -> input validation
                doctor_phone_time = DoctorPhoneTime(
                    phone_time=phone_time,
                    # workaround until properly assigned after sorting
                    doctor_nr=0,
                    doctor_name=doctor_information.name,
                    doctor_address=doctor_address,
                    doctor_phone_number=doctor_information.tel,
                )
                self.processed_doctor_phone_times.append(doctor_phone_time)
        # Apply filter for desired amount of weeks for phone times
        self.filter_for_relevant_weeks(therapy_phone_weeks)
        # Sort the times by the starting time
        self.processed_doctor_phone_times.sort(key=lambda dpt: dpt.phone_time.start)
        # Throw out phone times of today which are already over
        self.filter_for_already_passed_times_today()
        # Assign the numbers for showing them properly in the web interface
        self.assign_numbers_to_doctor_phone_times()
        return self.processed_doctor_phone_times

    def filter_for_relevant_weeks(self, therapy_phone_weeks):
        """
        Get the desired amount of weeks and filter for them
        :param therapy_phone_weeks: Desired amount of weeks for showing phone times
        :return: None, class variable affected
        """
        # Get the current date for calculation and determining the next weeks
        current_date = datetime.now()
        # The window starts at the beginning of today so that phone times which already started but are still
        # running are kept; finished ones are removed by filter_for_already_passed_times_today
        start_of_today = current_date.replace(hour=0, minute=0, second=0, microsecond=0)
        # Calculate the end date by a timedelta based on the given amount of weeks
        end_date = current_date + timedelta(weeks=therapy_phone_weeks)
        # Filter for the relevant weeks
        self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
                                             if start_of_today <= dpt.phone_time.start <= end_date]

    def filter_for_already_passed_times_today(self):
        """
        Get the phone times of the current day and filter already passed phone times since they are considered not
        relevant anymore
        :return: None, class variable affected
        """
        # Get the current date with time for calculations
        current_datetime = datetime.now()
        today = current_datetime.date()
        # Keep phone times of other dates; of today keep only those whose end time has not passed yet
        self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
                                             if dpt.phone_time.start.date() != today
                                             or dpt.phone_time.end > current_datetime]

    def assign_numbers_to_doctor_phone_times(self):
        """
        Assign numbers to the doctors in phone time objects to distinct them from each other visually, since a doctor
        can have multiple phone times in the next week(s). Doctors are identified by their name and numbered in order
        of first appearance.
        :return: None, the doctor_nr of the phone time objects is set
        """
        # Store known doctors and their number
        known_doctor_names_with_nr = {}
        for doctor_phone_time in self.processed_doctor_phone_times:
            # Users prefer starting with 1 instead of 0 -> a new doctor gets the next free number
            doctor_phone_time.doctor_nr = known_doctor_names_with_nr.setdefault(
                doctor_phone_time.doctor_name, len(known_doctor_names_with_nr) + 1)

    @staticmethod
    def calculate_req_value_base64(lat, lon):
        """
        This function is based on the initial Javascript code found in app.js.
        It is rewritten in Python to calculate the HTTP header req_val for proper requests with the correct location.
        :param lat: Latitude given by the Arztsuche API (number)
        :param lon: Longitude given by the Arztsuche API (number)
        :return: Base64 encoded string of seven characters: digits of the shifted coordinates interleaved with the
                 last three digits of the current timestamp in milliseconds
        """
        # Get the current time in milliseconds since epoch
        timestamp_str = str(int(datetime.now().timestamp() * 1000))
        digits = []
        # Adjust lat and lon values slightly before extracting the digits
        for adjusted_value in (lat + 1.1, lon + 2.3):
            integer_part, _, fraction_part = str(adjusted_value).partition(".")
            # Last digit before and first digit after the decimal point ("0" without fraction part)
            digits.append((integer_part[-1], fraction_part[0] if fraction_part else "0"))
        (lat_last_digit, lat_first_fraction_digit), (lon_last_digit, lon_first_fraction_digit) = digits
        # Create the final string by combining digits
        combined_string = (
            lat_last_digit +
            timestamp_str[-1] +
            lon_last_digit +
            timestamp_str[-2] +
            lat_first_fraction_digit +
            timestamp_str[-3] +
            lon_first_fraction_digit
        )
        # Encode the combined string in Base64
        return base64.b64encode(combined_string.encode()).decode()

    @staticmethod
    def parse_date_string(date_string):
        """
        Parse a date string given by the Arztsuche API and return an actual datetime object
        :param date_string: Date string to parse in the format "day.month. hour:minute", e.g. "08.11. 9:00"
        :return: parsed datetime in the current year, or in the next year if the day lies before today
        :raises ValueError: if the date string cannot be parsed (we like input validation)
        """
        # Take the current time once so year and comparison cannot disagree around the turn of the year
        now = datetime.now()
        try:
            # Sometimes the API returns 24:00 as time - even though it doesn't make lots of sense - so replace it
            # with a minute less to work with proper input
            if "24:00" in date_string:
                date_string = date_string.replace("24:00", "23:59")
            # The year is not part of the date sent by the API. Parse it together with the current year since
            # strptime otherwise assumes 1900 and rejects the 29th of February.
            parsed_date = datetime.strptime(f"{now.year} {date_string}", "%Y %d.%m. %H:%M")
            # Handle turn of the year: if the day is in the past relative to today, consider it as part of the next
            # year. Only the day is compared so that times earlier today stay today.
            if parsed_date.date() < now.date():
                parsed_date = parsed_date.replace(year=now.year + 1)
            return parsed_date
        except ValueError as e:
            raise ValueError(f"Error parsing date string: '{date_string}'. Details: {e}") from e