463 lines
21 KiB
Python
463 lines
21 KiB
Python
import json
|
|
import sys
|
|
from typing import List
|
|
|
|
import requests
|
|
from requests import JSONDecodeError
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
|
|
from arztapi.ArztPraxisDatas import ArztPraxisDatas, ArztPraxisData
|
|
from arztapi.DoctorInformation import DoctorInformation, PhoneTime
|
|
from arztapi.DoctorPhoneTime import DoctorPhoneTime
|
|
|
|
|
|
class APIHandler:
    """
    Class for accessing and handling the Arztsuche API -> Get necessary data and filter it for theraPy

    Typical call order as suggested by the methods themselves: resolve a location
    (get_lat_lon_location_list), fetch/cached-load the practices (get_list_of_doctors), transform them
    (get_general_doctor_information), optionally filter by distance and finally flatten everything into sorted
    phone time slots (get_doctor_phone_times_sorted).
    """

    # Authorization header gathered by initial cURLing, sent with every request to the API
    AUTHORIZATION_HEADER = "Basic YmRwczpma3I0OTNtdmdfZg=="
    # Accept-Language header copied from the manual cURL
    ACCEPT_LANGUAGE_HEADER = "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7"
    # Seconds to wait for the API before giving up; without a timeout requests may block forever
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, redis_client):
        """
        :param redis_client: Client object used as cache; only its get(key) and set(key, value) methods are used
        """

        # Base URL as given by the base website
        self.base_api_url = "https://arztsuche.116117.de/api/"
        # Request body of the last doctor search, also used (stringified) as cache key
        self.json_data = {}
        # Containers for phone times, general doctor information and processed phone times of doctors
        self.phone_times = []
        self.general_information = []
        self.processed_doctor_phone_times = []
        self.redis_client = redis_client

    def get_lat_lon_location_list(self, location):
        """
        Use a given location input string and search for the location with the Arztsuche API -> input string can
        contain more or less everything to search for, but should not contain spaces (since the original API is not
        able to handle spaces properly)

        :param location: given as input string/plz with more or less validation, directly given to the Arztsuche API
        :return: location matches as decoded JSON, or None if the response body cannot be decoded as JSON
        """

        # API path as given by original website
        api_path = self.base_api_url + "location"
        # Headers copied by manual cURL
        headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": self.ACCEPT_LANGUAGE_HEADER,
            "Authorization": self.AUTHORIZATION_HEADER,
            "Connection": "keep-alive",
        }
        params = {
            "loc": location,
        }

        response = requests.get(api_path, params=params, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS)

        try:
            # Possibly multiple results, further processing by caller
            return response.json()
        except JSONDecodeError:
            # JSONDecodeError for empty response object -> None, no location match
            return None

    def get_list_of_doctors(self, lat, lon, req_val_base64, therapy_types, therapy_age, therapy_setting,
                            amount_of_weeks) -> "ArztPraxisDatas":
        """
        Build the search request, load the matching practices from the cache or (if missing/outdated) from the API,
        then filter them for the desired therapy types and remove duplicates

        :param lat: Latitude as given by location API
        :param lon: Longitude as given by location API
        :param req_val_base64: base64 value sent as "req-val" header (see calculate_req_value_base64)
        :param therapy_types: Therapy type codes of interest (keys of the mapping in filter_for_therapy_types)
        :param therapy_age: Therapy age range code of interest
        :param therapy_setting: Therapy setting code of interest
        :param amount_of_weeks: Amount of weeks; multiplied by 7 it is the maximum accepted age of a cache entry
        :return: The filtered practice data (also stored in self.phone_times)
        """

        # Use the selected therapy types in case of "Verhaltenstherapie" -> most data used for it, might overload the
        # request, so other therapy types are missing
        if "V" in therapy_types:
            selected_codes = therapy_types
        # Collect all three other therapy types in case of missing V
        else:
            selected_codes = ["A", "S", "T"]

        # Data object as built by the original website, some fields might not be plausible or known (since there is
        # no API documentation itself available)
        self.json_data = {
            # TODO: Find out what r means
            "r": 900,
            "lat": lat,
            "lon": lon,
            "filterSelections": [
                {
                    "title": "Fachgebiet Kategorie",
                    "fieldName": "fgg",
                    "selectedCodes": [
                        "12",
                    ],
                },
                {
                    "title": "Psychotherapie: Verfahren",
                    "fieldName": "ptv",
                    "selectedCodes": selected_codes,
                },
                {
                    "title": "Psychotherapie: Altersgruppe",
                    "fieldName": "pta",
                    "selectedCodes": [
                        therapy_age,
                    ],
                },
                {
                    "title": "Psychotherapie: Setting",
                    "fieldName": "pts",
                    "selectedCodes": [
                        therapy_setting,
                    ],
                },
            ],
            "locOrigin": "USER_INPUT",
            "initialSearch": False,
            "viaDeeplink": False,
        }

        amount_of_days = amount_of_weeks * 7
        cache_data = self.get_current_doctor_information_data_in_cache_with_time_check(amount_of_days)

        if cache_data:
            self.phone_times = cache_data
        else:
            self.get_list_of_doctors_from_api(lat, lon, req_val_base64, therapy_types, therapy_age, therapy_setting)
            self.set_current_doctor_information_data_in_cache()

        # Filter for the relevant therapy times before processing
        self.filter_for_therapy_types(therapy_types)
        self.filter_for_duplicates()

        # Hand the result to the caller as promised by the return annotation
        return self.phone_times

    def get_list_of_doctors_from_api(self, lat, lon, req_val_base64, therapy_types, therapy_age,
                                     therapy_setting) -> "ArztPraxisDatas":
        """
        Post the prepared request body (self.json_data) to the API and store the validated answer

        Only req_val_base64 is read here; the remaining parameters are already part of self.json_data and are kept
        in the signature for callers.

        :param lat: Latitude as given by location API
        :param lon: Longitude as given by location API
        :param req_val_base64: base64 value required for API access (is this a token?)
        :param therapy_types: Therapy types of interest
        :param therapy_age: Therapy age range of interest
        :param therapy_setting: Therapy setting of interest
        :return: Relevant doctor/therapist data
        :raises requests.HTTPError: via raise_for_status() if the API answers with an HTTP error status
        """

        # API path for doctor data
        api_path = self.base_api_url + "data"
        headers = {
            "Accept": "application/json",
            "Accept-Language": self.ACCEPT_LANGUAGE_HEADER,
            "Authorization": self.AUTHORIZATION_HEADER,
            "Connection": "keep-alive",
            # Calculated base64 value based on latitude and longitude
            "req-val": req_val_base64,
        }

        response = requests.post(api_path, headers=headers, json=self.json_data,
                                 timeout=self.REQUEST_TIMEOUT_SECONDS)
        # Check for HTTP errors
        response.raise_for_status()
        # Convert phone times to data format as input validation, save as class variable for further processing
        self.phone_times = ArztPraxisDatas(**response.json())
        # Return result for processing by caller
        return self.phone_times

    def get_current_doctor_information_data_in_cache_with_time_check(self, amount_of_days):
        """
        Look up the cache entry belonging to the current request body and return it if it is recent enough

        :param amount_of_days: Maximum accepted age of the cache entry in whole days
        :return: Cached practice data, or None if nothing is cached or the entry is older than amount_of_days
        """

        # The stringified request body is the cache key (same key as used when writing the cache)
        cached_raw = self.redis_client.get(str(self.json_data))
        if not cached_raw:
            return None

        cached_entry = json.loads(cached_raw)
        entry_age = datetime.now() - datetime.fromisoformat(cached_entry["timestamp"])
        if entry_age.days > amount_of_days:
            return None

        # Every item was stored as its own JSON string, so it has to be decoded before validation
        entries = [ArztPraxisData.model_validate(json.loads(item)) for item in cached_entry["data"]]
        return ArztPraxisDatas(arztPraxisDatas=entries)

    def set_current_doctor_information_data_in_cache(self):
        """
        Store the current practice data together with the current timestamp under the stringified request body

        :return: None
        """

        payload = {
            "timestamp": datetime.now().isoformat(),
            # One JSON string per practice entry
            "data": [item.json() for item in self.phone_times.arztPraxisDatas],
        }
        self.redis_client.set(str(self.json_data), json.dumps(payload))

    def filter_for_therapy_types(self, therapy_types):
        """
        The idea is to get as much data as possible from the API at once to minimize the number of API calls.
        For some cases, more data than actually necessary is cached and this is a filter for it.

        Every entry is kept at most once, even if several of its settings match the desired therapy types.

        :param therapy_types: Desired therapy type codes ("V", "T", "A", "S")
        :return: None, self.phone_times.arztPraxisDatas is replaced by the filtered list
        :raises KeyError: if a code without mapping has to be compared against an entry
        """

        # Mapping of the selected codes to the actual therapy type
        mapping = {
            "V": "Verhaltenstherapie",
            "T": "Tiefenpsychologisch fundierte Psychotherapie",
            "A": "Analytische Psychotherapie",
            "S": "Systemische Therapie"
        }

        def offers_desired_type(entry):
            # Relevant therapy types are stored in psy, multiple therapy types might be available
            return any(mapping[code] in setting for setting in entry.psy for code in therapy_types)

        # Update with relevant phone times
        self.phone_times.arztPraxisDatas = [entry for entry in self.phone_times.arztPraxisDatas
                                            if offers_desired_type(entry)]

    def filter_for_duplicates(self):
        """
        Remove entries which compare equal to an earlier entry, keeping the first occurrence and the order

        :return: None, self.phone_times.arztPraxisDatas is replaced by the de-duplicated list
        """

        unique_entries = []
        for entry in self.phone_times.arztPraxisDatas:
            # Equality based check (entries are not assumed to be hashable)
            if entry not in unique_entries:
                unique_entries.append(entry)
        self.phone_times.arztPraxisDatas = unique_entries

    def get_general_doctor_information(self) -> "List[DoctorInformation]":
        """
        Transform and filter data to more usable format: Check for phone times and collect general doctor
        information data of interest
        Function should be called after initial API call, but doesn't create an error if not, just returning empty
        list in case of

        :return: General doctor information
        """

        # Before the first API/cache access phone_times is still a plain list without arztPraxisDatas -> no entries
        praxis_entries = getattr(self.phone_times, "arztPraxisDatas", [])

        # Create container for saving information
        general_doctor_information = []
        for data in praxis_entries:
            # Remove empty phone number fields
            if data.tel == "":
                continue

            # Collect relevant information of doctor/therapist, conversion to DoctorInformation -> input validation
            doctor_information = DoctorInformation(
                name=data.name,
                tel=data.tel,
                fax=data.fax,
                anrede=data.anrede,
                email=data.email,
                distance=data.distance,
                strasse=data.strasse,
                hausnummer=data.hausnummer,
                plz=data.plz,
                ort=data.ort,
                telefonzeiten=self._collect_phone_times(data),
            )
            general_doctor_information.append(doctor_information)

        # Save result to class for further processing and return for caller
        self.general_information = general_doctor_information
        return self.general_information

    def _collect_phone_times(self, data):
        """
        Collect every "Telefonische Erreichbarkeit" slot of one practice entry as PhoneTime object

        :param data: One practice entry; its tsz attribute is iterated day by day (maybe tsz = tageszeit?)
        :return: List of PhoneTime objects, empty if the entry has no phone times
        """

        phone_times = []
        for day in data.tsz:
            # Nothing listed for this day -> no phone time(s)
            if not day.tszDesTyps:
                continue
            for contact_times in day.tszDesTyps:
                # Check if phone time is actually phone time and not only opening time
                if contact_times.typ != "Telefonische Erreichbarkeit":
                    continue
                for phone_time_day in contact_times.sprechzeiten:
                    # String magic since the actual phone time is given as string such as 9:00-10:00
                    start_time_str, end_time_str = phone_time_day.zeit.split("-")
                    # Parse both to datetime objects, PhoneTime object -> input validation
                    phone_times.append(PhoneTime(
                        start=self.parse_date_string(f"{day.d} {start_time_str}"),
                        end=self.parse_date_string(f"{day.d} {end_time_str}"),
                    ))
        return phone_times

    def filter_doctor_information_for_distance(self, distance):
        """
        Filter the given list of doctors based on the distance to them calculated by the given location as a result
        of the original API call

        :param distance: Distance to location given in meters for doctors within given radius
        :return: No return, only filter class variable
        """

        # Keep every doctor information within the given distance/radius
        within_radius = []
        for doctor_information in self.general_information:
            if doctor_information.distance <= distance:
                within_radius.append(doctor_information)
        self.general_information = within_radius

    def get_doctor_phone_times_sorted(self, therapy_phone_weeks):
        """
        Flatten the current doctor information into one entry per phone time, filter for the desired weeks, sort by
        start date, drop phone times of today which are already over and number the doctors

        The result list is rebuilt on every call, so repeated calls do not accumulate entries of earlier calls.

        :param therapy_phone_weeks: Amount of weeks for showing phone times
        :return: Sorted list of DoctorPhoneTime objects (also stored in self.processed_doctor_phone_times)
        """

        # Start from scratch, otherwise a second call would add every phone time again
        self.processed_doctor_phone_times = []

        # Process doctor information to desired format with relevant information for phone times
        for doctor_information in self.general_information:
            # Address in readable format
            doctor_address = (f"{doctor_information.plz} {doctor_information.ort} "
                              f"{doctor_information.strasse} {doctor_information.hausnummer}")
            # Perspective: phone time as data object of choice
            for phone_time in doctor_information.telefonzeiten:
                # Convert to actual DoctorPhoneTime object -> input validation
                doctor_phone_time = DoctorPhoneTime(
                    phone_time=phone_time,
                    # workaround until properly assigned after sorting
                    doctor_nr=0,
                    doctor_name=doctor_information.name,
                    doctor_address=doctor_address,
                    doctor_phone_number=doctor_information.tel,
                )
                self.processed_doctor_phone_times.append(doctor_phone_time)

        # Apply filter for desired amount of weeks for phone times
        self.filter_for_relevant_weeks(therapy_phone_weeks)
        # Sort the times by the starting time
        self.processed_doctor_phone_times.sort(key=lambda dpt: dpt.phone_time.start)
        # Throw out already passed times
        self.filter_for_already_passed_times_today()
        # Assign the numbers for showing them properly in the web interface
        self.assign_numbers_to_doctor_phone_times()

        return self.processed_doctor_phone_times

    def filter_for_relevant_weeks(self, therapy_phone_weeks):
        """
        Get the desired amount of weeks and filter for them

        :param therapy_phone_weeks: Desired amount of weeks for showing phone times
        :return: None, class variable affected
        """

        # Window reaches from now until now + given amount of weeks (both borders included)
        window_start = datetime.now()
        window_end = window_start + timedelta(weeks=therapy_phone_weeks)

        self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
                                             if window_start <= dpt.phone_time.start <= window_end]

    def filter_for_already_passed_times_today(self):
        """
        Get the phone times of the current day and filter already passed phone times since they are considered not
        relevant anymore

        :return: None, class variable affected
        """

        current_datetime = datetime.now()
        today = current_datetime.date()

        # Phone times on another date are always kept, phone times of today only while their end is still ahead
        self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
                                             if dpt.phone_time.start.date() != today
                                             or dpt.phone_time.end > current_datetime]

    def assign_numbers_to_doctor_phone_times(self):
        """
        Assign numbers to the doctors in phone time objects to distinct them from each other visually, since a
        doctor can have multiple phone times in the next week(s)

        Doctors are told apart by their name only; numbers start with 1 in order of first appearance.

        :return: None, doctor_nr of every processed phone time is set
        """

        # Store known doctors and their number
        known_doctor_names_with_nr = {}
        for doctor_phone_time in self.processed_doctor_phone_times:
            # Unknown doctor -> next free number (users prefer starting with 1 instead of 0), known -> reuse number
            doctor_phone_time.doctor_nr = known_doctor_names_with_nr.setdefault(
                doctor_phone_time.doctor_name, len(known_doctor_names_with_nr) + 1)

    @staticmethod
    def calculate_req_value_base64(lat, lon):
        """
        This function is based on the initial Javascript code found in app.js.
        It is rewritten in Python to calculate the HTTP header req_val for proper requests with the correct
        location.

        The value is the base64 encoding of seven characters: digits taken from lat + 1.1 and lon + 2.3 interleaved
        with the last three digits of the current timestamp in milliseconds.

        :param lat: Latitude given by the Arztsuche API
        :param lon: Longitude given by the Arztsuche API
        :return: base64 encoded string
        """

        def last_integer_and_first_fraction_digit(value):
            # Work on the string representation: last character before the dot, first character after the dot
            integer_part, separator, fraction_part = str(value).partition(".")
            first_fraction_digit = fraction_part[0] if separator else "0"
            return integer_part[-1], first_fraction_digit

        # Adjust lat and lon values slightly
        lat_last_digit, lat_first_fraction_digit = last_integer_and_first_fraction_digit(lat + 1.1)
        lon_last_digit, lon_first_fraction_digit = last_integer_and_first_fraction_digit(lon + 2.3)

        # Get the current time in milliseconds since epoch
        timestamp_str = str(int(datetime.now().timestamp() * 1000))

        # Create the final string by combining digits
        combined_string = "".join((
            lat_last_digit,
            timestamp_str[-1],
            lon_last_digit,
            timestamp_str[-2],
            lat_first_fraction_digit,
            timestamp_str[-3],
            lon_first_fraction_digit,
        ))

        # Encode the combined string in Base64
        return base64.b64encode(combined_string.encode()).decode()

    @staticmethod
    def parse_date_string(date_string):
        """
        Parse a date string given by the Arztsuche API and return an actual datetime object

        The API sends day, month and time only (e.g. "24.12. 9:00"). The current year is added; if the resulting
        calendar day lies before today, the date is considered as part of the next year (turn of the year). Times
        earlier on the current day stay in the current year.

        :param date_string: Date string to parse
        :return: parsed datetime as actual object
        :raises ValueError: for failed parsing (we like input validation)
        """

        now = datetime.now()

        # Try block to prevent invalid dates
        try:
            # Sometimes the API returns 24:00 as time - even though it doesn't make lots of sense - so replacing it
            # with a minute less to work with proper input
            if "24:00" in date_string:
                date_string = date_string.replace("24:00", "23:59")

            # The year is parsed together with the rest: without it strptime falls back to 1900, which is no leap
            # year and therefore rejects the 29th of February
            parsed_date = datetime.strptime(f"{now.year} {date_string}", "%Y %d.%m. %H:%M")

            # Handle turn of the year: compare calendar days only, so that an earlier time of today is not moved
            # to the next year
            if parsed_date.date() < now.date():
                parsed_date = parsed_date.replace(year=now.year + 1)

            return parsed_date

        except ValueError as e:
            raise ValueError(f"Error parsing date string: '{date_string}'. Details: {e}") from e