theraPy/arztapi/APIHandler.py

463 lines
21 KiB
Python
Raw Normal View History

2024-11-07 17:03:19 +01:00
import json
2024-11-08 09:49:19 +01:00
import sys
2024-08-26 21:05:34 +02:00
from typing import List
import requests
from requests import JSONDecodeError
import base64
from datetime import datetime, timedelta
2024-11-07 17:03:19 +01:00
2024-11-08 09:49:19 +01:00
from arztapi.ArztPraxisDatas import ArztPraxisDatas, ArztPraxisData
2024-08-26 21:05:34 +02:00
from arztapi.DoctorInformation import DoctorInformation, PhoneTime
from arztapi.DoctorPhoneTime import DoctorPhoneTime
class APIHandler:
    """
    Class for accessing and handling the Arztsuche API -> Get necessary data and filter it for theraPy
    """

    # Headers shared by all requests, copied from a manual cURL against the original website
    _COMMON_HEADERS = {
        "Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
        # Authorization header gathered by initial cURLing
        "Authorization": "Basic YmRwczpma3I0OTNtdmdfZg==",
        "Connection": "keep-alive",
    }

    # Mapping of the selectable therapy type codes to the therapy type names used in the practice data
    _THERAPY_TYPE_NAMES = {
        "V": "Verhaltenstherapie",
        "T": "Tiefenpsychologisch fundierte Psychotherapie",
        "A": "Analytische Psychotherapie",
        "S": "Systemische Therapie",
    }

    # Type label the API uses for phone availability (in contrast to e.g. opening times)
    _PHONE_TIME_TYPE = "Telefonische Erreichbarkeit"

    def __init__(self, redis_client):
        """
        :param redis_client: Client used as cache; only its ``get(key)`` and ``set(key, value)`` methods are used
        """
        # Base URL as given by the base website
        self.base_api_url = "https://arztsuche.116117.de/api/"
        # Request body of the most recent doctor search, also used to derive the cache key
        self.json_data = {}
        # Containers for phone times, general doctor information and processed phone times of doctors
        self.phone_times = []
        self.general_information = []
        self.processed_doctor_phone_times = []
        self.redis_client = redis_client

    def get_lat_lon_location_list(self, location):
        """
        Use a given location input string and search for the location with the Arztsuche API -> input string can
        contain more or less everything to search for, but should not contain spaces (since the original API is not
        able to handle spaces properly)
        :param location: given as input string/plz with more or less validation, directly given to the Arztsuche API
        :return: location matches in JSON format, or None if the response body is not valid JSON
        """
        # API path as given by original website
        api_path = self.base_api_url + "location"
        headers = {"Accept": "application/json, text/plain, */*", **self._COMMON_HEADERS}
        params = {"loc": location}
        # NOTE(review): no timeout is passed, so a stalled server blocks this call indefinitely
        response = requests.get(api_path, params=params, headers=headers)
        try:
            # Possibly multiple results, further processing by caller
            return response.json()
        except JSONDecodeError:
            # JSONDecodeError for empty response object -> None, no location match
            return None

    def get_list_of_doctors(self, lat, lon, req_val_base64, therapy_types, therapy_age, therapy_setting,
                            amount_of_weeks) -> "ArztPraxisDatas":
        """
        Build the search request, load the matching practices (from the cache if a recent enough entry exists,
        otherwise from the API, which is then cached) and filter them for the desired therapy types and duplicates
        :param lat: Latitude as given by location API
        :param lon: Longitude as given by location API
        :param req_val_base64: base64 value required for API access
        :param therapy_types: Therapy type codes of interest
        :param therapy_age: Therapy age range of interest
        :param therapy_setting: Therapy setting of interest
        :param amount_of_weeks: Amount of weeks; multiplied by 7 it is the maximum accepted age of a cache entry
        :return: The filtered practice data (also stored in self.phone_times)
        """
        # Use the selected therapy types in case of "Verhaltenstherapie" -> most data used for it, might overload the
        # request, so other therapy types are missing
        if "V" in therapy_types:
            selected_codes = therapy_types
        # Collect all three other therapy types in case of missing V
        else:
            selected_codes = ["A", "S", "T"]
        # Data object as built by the original website, some fields might not be plausible or known (since there is
        # no API documentation itself available)
        self.json_data = {
            # TODO: Find out what r means
            "r": 900,
            "lat": lat,
            "lon": lon,
            "filterSelections": [
                {
                    "title": "Fachgebiet Kategorie",
                    "fieldName": "fgg",
                    "selectedCodes": [
                        "12",
                    ],
                },
                {
                    "title": "Psychotherapie: Verfahren",
                    "fieldName": "ptv",
                    "selectedCodes": selected_codes,
                },
                {
                    "title": "Psychotherapie: Altersgruppe",
                    "fieldName": "pta",
                    "selectedCodes": [
                        therapy_age,
                    ],
                },
                {
                    "title": "Psychotherapie: Setting",
                    "fieldName": "pts",
                    "selectedCodes": [
                        therapy_setting,
                    ],
                },
            ],
            "locOrigin": "USER_INPUT",
            "initialSearch": False,
            "viaDeeplink": False,
        }
        amount_of_days = amount_of_weeks * 7
        cache_data = self.get_current_doctor_information_data_in_cache_with_time_check(amount_of_days)
        if cache_data:
            self.phone_times = cache_data
        else:
            self.get_list_of_doctors_from_api(lat, lon, req_val_base64, therapy_types, therapy_age, therapy_setting)
            # Cache the unfiltered API result so it can serve later searches with other therapy types as well
            self.set_current_doctor_information_data_in_cache()
        # Filter for the relevant therapy types before processing
        self.filter_for_therapy_types(therapy_types)
        self.filter_for_duplicates()
        return self.phone_times

    def get_list_of_doctors_from_api(self, lat, lon, req_val_base64, therapy_types, therapy_age,
                                     therapy_setting) -> "ArztPraxisDatas":
        """
        Request the doctor data from the API. The request body is taken from self.json_data, so lat, lon,
        therapy_types, therapy_age and therapy_setting are not read here (kept for interface compatibility)
        :param lat: Latitude as given by location API
        :param lon: Longitude as given by location API
        :param req_val_base64: base64 value required for API access (is this a token?)
        :param therapy_types: Therapy types of interest
        :param therapy_age: Therapy age range of interest
        :param therapy_setting: Therapy setting of interest
        :return: Relevant doctor/therapist data
        :raises requests.HTTPError: if the API answers with an HTTP error status
        """
        # API path for doctor data
        api_path = self.base_api_url + "data"
        headers = {
            "Accept": "application/json",
            **self._COMMON_HEADERS,
            # Calculated base64 value based on latitude and longitude
            "req-val": req_val_base64,
        }
        # NOTE(review): no timeout is passed, so a stalled server blocks this call indefinitely
        response = requests.post(api_path, headers=headers, json=self.json_data)
        # Check for HTTP errors
        response.raise_for_status()
        # Convert phone times to data format as input validation, save as class variable for further processing
        self.phone_times = ArztPraxisDatas(**response.json())
        # Return result for processing by caller
        return self.phone_times

    def _cache_key(self):
        """Return the cache key of the current search: the string representation of the request body."""
        return str(self.json_data)

    def _practice_entries(self):
        """Return the loaded practice entries, or an empty list if no practice data has been loaded yet."""
        return getattr(self.phone_times, "arztPraxisDatas", [])

    def get_current_doctor_information_data_in_cache_with_time_check(self, amount_of_days):
        """
        Look up the current search (self.json_data) in the cache and return the cached practices if the entry is not
        older than the given amount of days
        :param amount_of_days: Maximum accepted age of the cache entry in (whole) days
        :return: Cached practice data or None if there is no entry or the entry is too old
        """
        cached_data = self.redis_client.get(self._cache_key())
        if not cached_data:
            return None
        cached_data = json.loads(cached_data)
        cache_timestamp = datetime.fromisoformat(cached_data["timestamp"])
        time_difference = datetime.now() - cache_timestamp
        if time_difference.days > amount_of_days:
            return None
        # Every entry was stored as its own JSON string, so it is decoded once more before validation
        data = [ArztPraxisData.model_validate(json.loads(item)) for item in cached_data["data"]]
        return ArztPraxisDatas(arztPraxisDatas=data)

    def set_current_doctor_information_data_in_cache(self):
        """
        Store the currently loaded practices together with the current timestamp in the cache, using the current
        search (self.json_data) as key
        :return: None
        """
        # model_dump_json replaces the deprecated .json() of the model API already used by model_validate
        serialized_data = [item.model_dump_json() for item in self.phone_times.arztPraxisDatas]
        payload = {"timestamp": datetime.now().isoformat(), "data": serialized_data}
        self.redis_client.set(self._cache_key(), json.dumps(payload))

    def filter_for_therapy_types(self, therapy_types):
        """
        The idea is to get as much data as possible from the API at once to minimize the number of API calls.
        For some cases, more data than actually necessary is cached and this is a filter for it.
        Every practice is kept at most once, even if several of its entries match.
        :param therapy_types: Desired therapy types (codes, see _THERAPY_TYPE_NAMES)
        :return: None, class variable affected
        """
        # Nothing to filter as long as no practice data has been loaded
        if not hasattr(self.phone_times, "arztPraxisDatas"):
            return
        names = self._THERAPY_TYPE_NAMES
        # Relevant therapy types are stored in psy, multiple therapy types might be available per practice
        self.phone_times.arztPraxisDatas = [
            data for data in self.phone_times.arztPraxisDatas
            if any(names[code] in setting for setting in data.psy for code in therapy_types)
        ]

    def filter_for_duplicates(self):
        """
        Remove practices that are equal to an earlier one, keeping the first occurrence and the original order
        :return: None, class variable affected
        """
        # Nothing to filter as long as no practice data has been loaded
        if not hasattr(self.phone_times, "arztPraxisDatas"):
            return
        unique_entries = []
        for entry in self.phone_times.arztPraxisDatas:
            # Equality based comparison instead of a set, which would require hashable entries
            if entry not in unique_entries:
                unique_entries.append(entry)
        self.phone_times.arztPraxisDatas = unique_entries

    def _collect_phone_times(self, data):
        """Return the phone times (PhoneTime objects) found in the day entries of a single practice."""
        phone_times = []
        # Get times of doctors in a day (maybe tsz = tageszeit?)
        for day in data.tsz:
            # Days without any contact times have nothing to collect
            for contact_times in day.tszDesTyps or []:
                # Check if phone time is actually phone time and not only opening time
                if contact_times.typ != self._PHONE_TIME_TYPE:
                    continue
                for phone_time_day in contact_times.sprechzeiten:
                    # String magic since the actual phone time is given as string such as 9:00-10:00
                    start_time_str, end_time_str = phone_time_day.zeit.split("-")
                    # Parse both to datetime objects and convert to actual PhoneTime object -> input validation
                    phone_times.append(PhoneTime(
                        start=self.parse_date_string(f"{day.d} {start_time_str}"),
                        end=self.parse_date_string(f"{day.d} {end_time_str}"),
                    ))
        return phone_times

    def get_general_doctor_information(self) -> List["DoctorInformation"]:
        """
        Transform and filter data to more usable format: Check for phone times and collect general doctor information
        data of interest
        Function should be called after initial API call, but doesn't create an error if not, just returning empty
        list in that case
        :return: General doctor information
        """
        general_doctor_information = []
        # Iterate over the loaded practices -> API should have been accessed before
        for data in self._practice_entries():
            # Remove empty phone number fields
            if data.tel == "":
                continue
            # Convert to actual DoctorInformation object -> input validation
            general_doctor_information.append(DoctorInformation(
                name=data.name,
                tel=data.tel,
                fax=data.fax,
                anrede=data.anrede,
                email=data.email,
                distance=data.distance,
                strasse=data.strasse,
                hausnummer=data.hausnummer,
                plz=data.plz,
                ort=data.ort,
                telefonzeiten=self._collect_phone_times(data),
            ))
        # Save result to class for further processing and return for caller
        self.general_information = general_doctor_information
        return self.general_information

    def filter_doctor_information_for_distance(self, distance):
        """
        Filter the given list of doctors based on the distance to them calculated by the given location as a result
        of the original API call
        :param distance: Distance to location given in meters for doctors within given radius
        :return: No return, only filter class variable
        """
        # Keep every doctor information within the given distance/radius
        self.general_information = [doctor_information for doctor_information in self.general_information
                                    if doctor_information.distance <= distance]

    def get_doctor_phone_times_sorted(self, therapy_phone_weeks):
        """
        Build the phone times of all doctors in self.general_information, filter them for the desired weeks and for
        already passed times, sort them by start date and number the doctors
        The result is rebuilt on every call, so repeated calls do not accumulate entries
        :param therapy_phone_weeks: Amount of weeks for showing phone times
        :return: Sorted list of doctor phone times
        """
        # Perspective: phone time as data object of choice, converted to DoctorPhoneTime -> input validation
        self.processed_doctor_phone_times = [
            DoctorPhoneTime(
                phone_time=phone_time,
                # workaround until properly assigned in assign_numbers_to_doctor_phone_times
                doctor_nr=0,
                doctor_name=doctor_information.name,
                # Address in readable format
                doctor_address=f"{doctor_information.plz} {doctor_information.ort} "
                               f"{doctor_information.strasse} {doctor_information.hausnummer}",
                doctor_phone_number=doctor_information.tel,
            )
            for doctor_information in self.general_information
            for phone_time in doctor_information.telefonzeiten
        ]
        # Apply filter for desired amount of weeks for phone times
        self.filter_for_relevant_weeks(therapy_phone_weeks)
        # Sort the times by the starting time
        self.processed_doctor_phone_times.sort(key=lambda dpt: dpt.phone_time.start)
        # Throw out already passed times
        self.filter_for_already_passed_times_today()
        # Assign the numbers for showing them properly in the web interface
        self.assign_numbers_to_doctor_phone_times()
        return self.processed_doctor_phone_times

    def filter_for_relevant_weeks(self, therapy_phone_weeks):
        """
        Get the desired amount of weeks and filter for them: keep phone times starting today or later and not later
        than the given amount of weeks from now
        :param therapy_phone_weeks: Desired amount of weeks for showing phone times
        :return: None, class variable affected
        """
        # Get the current date for calculation and determining the next weeks
        current_date = datetime.now()
        today = current_date.date()
        # Calculate the end date by a timedelta based on the given amount of weeks
        end_date = current_date + timedelta(weeks=therapy_phone_weeks)
        # The lower bound is the start of today (not "now"), so phone times that are currently in progress survive;
        # times of today that are already over are removed by filter_for_already_passed_times_today
        self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
                                             if today <= dpt.phone_time.start.date()
                                             and dpt.phone_time.start <= end_date]

    def filter_for_already_passed_times_today(self):
        """
        Get the phone times of the current day and filter already passed phone times since they are considered not
        relevant anymore
        :return: None, class variable affected
        """
        # Get the current date with time for calculations
        current_datetime = datetime.now()
        today = current_datetime.date()
        # Keep phone times on other dates and phone times of today which have not ended yet
        self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
                                             if dpt.phone_time.start.date() != today
                                             or dpt.phone_time.end > current_datetime]

    def assign_numbers_to_doctor_phone_times(self):
        """
        Assign numbers to the doctors in phone time objects to distinct them from each other visually, since a doctor
        can have multiple phone times in the next week(s)
        :return: None, the phone time objects are updated in place
        """
        # Store known doctors and their number
        # NOTE(review): doctors are identified by name only, so different doctors with equal names share a number
        known_doctor_names_with_nr = {}
        for doctor_phone_time in self.processed_doctor_phone_times:
            # New doctors get the next free number (users prefer starting with 1 instead of 0), known doctors keep
            # the number assigned at their first phone time
            doctor_phone_time.doctor_nr = known_doctor_names_with_nr.setdefault(
                doctor_phone_time.doctor_name, len(known_doctor_names_with_nr) + 1)

    @staticmethod
    def calculate_req_value_base64(lat, lon):
        """
        This function is based on the initial Javascript code found in app.js.
        It is rewritten in Python to calculate the HTTP header req_val for proper requests with the correct location.
        :param lat: Latitude given by the Arztsuche API
        :param lon: Longitude given by the Arztsuche API
        :return: Base64 encoded string of seven characters: digits of the adjusted coordinates interleaved with the
                 last three digits of the current timestamp in milliseconds
        """
        # Adjust lat and lon values slightly
        adjusted_lat = lat + 1.1
        adjusted_lon = lon + 2.3
        # Get the current time in milliseconds since epoch
        timestamp_str = str(int(datetime.now().timestamp() * 1000))

        def relevant_digits(value):
            # Last character of the integer part and first character of the fraction ("0" without a fraction)
            integer_part, _, fraction_part = str(value).partition(".")
            return integer_part[-1], fraction_part[0] if fraction_part else "0"

        lat_last_digit, lat_first_fraction_digit = relevant_digits(adjusted_lat)
        lon_last_digit, lon_first_fraction_digit = relevant_digits(adjusted_lon)
        # Create the final string by combining digits
        combined_string = "".join((
            lat_last_digit,
            timestamp_str[-1],
            lon_last_digit,
            timestamp_str[-2],
            lat_first_fraction_digit,
            timestamp_str[-3],
            lon_first_fraction_digit,
        ))
        # Encode the combined string in Base64
        return base64.b64encode(combined_string.encode()).decode()

    @staticmethod
    def parse_date_string(date_string):
        """
        Parse a date string given by the Arztsuche API (such as "26.08. 9:00") and return an actual datetime object.
        The year is not part of the string: the current year is used, unless the resulting day lies before today,
        in which case the date is considered part of the next year (turn of the year).
        :param date_string: Date string to parse
        :return: parsed datetime as actual object
        :raises ValueError: for failed parsing (we like input validation)
        """
        today = datetime.now().date()
        # Sometimes the API returns 24:00 as time - even though it doesn't make lots of sense - so those cases are
        # replaced with a minute less to work with proper input
        date_string = date_string.replace("24:00", "23:59").strip()
        parsing_error = None
        for year in (today.year, today.year + 1):
            try:
                # The year is parsed together with the date: without it the parser falls back to 1900 and rejects
                # the 29th of February
                parsed_date = datetime.strptime(f"{year} {date_string}", "%Y %d.%m. %H:%M")
            except ValueError as e:
                parsing_error = e
                continue
            # Compare days, not points in time: a phone time that already started today still belongs to today
            if parsed_date.date() >= today:
                return parsed_date
        raise ValueError(f"Error parsing date string: '{date_string}'. Details: {parsing_error}") from parsing_error