import base64
import json
import logging
import sys
from datetime import datetime, timedelta
from typing import List

import requests
from requests import JSONDecodeError

from arztapi.ArztPraxisDatas import ArztPraxisDatas, ArztPraxisData
from arztapi.DoctorInformation import DoctorInformation, PhoneTime
from arztapi.DoctorPhoneTime import DoctorPhoneTime

logger = logging.getLogger(__name__)


class APIHandler:
    """
    Class for accessing and handling the Arztsuche API -> Get necessary data and filter it for theraPy
    """

    # Authorization header value gathered by initial cURLing of the original website
    AUTHORIZATION_HEADER = "Basic YmRwczpma3I0OTNtdmdfZg=="
    ACCEPT_LANGUAGE_HEADER = "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7"
    # Upper bound (seconds) for a single HTTP request; without it a stalled connection blocks forever
    REQUEST_TIMEOUT_SECONDS = 30
    # Label the API uses for contact slots that are phone times (and not plain opening hours)
    PHONE_TIME_TYPE = "Telefonische Erreichbarkeit"

    def __init__(self, redis_client):
        """
        :param redis_client: Client object used as cache; only its ``get`` and ``set`` methods are called here
        """
        # Base URL as given by the base website
        self.base_api_url = "https://arztsuche.116117.de/api/"
        # Request body of the most recent doctor search, also used (stringified) as cache key
        self.json_data = {}

        # Containers for phone times, general doctor information and processed phone times of doctors
        # NOTE: phone_times starts as an empty list but holds an ArztPraxisDatas object after a search
        self.phone_times = []
        self.general_information = []
        self.processed_doctor_phone_times = []

        self.redis_client = redis_client

    def get_lat_lon_location_list(self, location):
        """
        Use a given location input string and search for the location with the Arztsuche API
        -> input string can contain more or less everything to search for, but should not contain spaces (since the
        original API is not able to handle spaces properly)

        :param location: given as input string/plz with more or less validation, directly given to the Arztsuche API
        :return: location matches in JSON format, or None if the response body is not valid JSON
        :raises requests.RequestException: on connection problems or when the request times out
        """
        # API path as given by original website
        api_path = self.base_api_url + "location"

        # Headers copied by manual cURL
        headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": self.ACCEPT_LANGUAGE_HEADER,
            "Authorization": self.AUTHORIZATION_HEADER,
            "Connection": "keep-alive",
        }
        params = {
            "loc": location,
        }

        response = requests.get(
            api_path,
            params=params,
            headers=headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )

        try:
            # Possibly multiple results, further processing by caller
            return response.json()
        except JSONDecodeError:
            # Empty/non-JSON response object -> None, no location match
            return None

    def get_list_of_doctors(self, lat, lon, req_val_base64, therapy_types, therapy_age, therapy_setting,
                            amount_of_weeks) -> ArztPraxisDatas:
        """
        Build the search request, load the matching doctors from the cache (or from the API on a cache miss or an
        outdated entry) and filter the result for the desired therapy types and for duplicates.

        :param lat: Latitude as given by location API
        :param lon: Longitude as given by location API
        :param req_val_base64: base64 value sent as ``req-val`` header on API access
        :param therapy_types: Therapy types of interest (codes "V", "T", "A", "S")
        :param therapy_age: Therapy age range of interest
        :param therapy_setting: Therapy setting of interest
        :param amount_of_weeks: Maximum age of a cache entry in weeks before the API is queried again
        :return: The filtered doctor/therapist data (also stored in ``self.phone_times``)
        """
        # Use the selected therapy types in case of "Verhaltenstherapie" -> most data used for it, might overload the
        # request, so other therapy types are missing
        if "V" in therapy_types:
            selected_codes = therapy_types
        # Collect all three other therapy types in case of missing V
        else:
            selected_codes = ["A", "S", "T"]

        # Data object as built by the original website, some fields might not be plausible or known (since there is
        # no API documentation itself available). Key order matters: the stringified dict is the cache key.
        self.json_data = {
            # TODO: Find out what r means
            "r": 900,
            "lat": lat,
            "lon": lon,
            "filterSelections": [
                {
                    "title": "Fachgebiet Kategorie",
                    "fieldName": "fgg",
                    "selectedCodes": [
                        "12",
                    ],
                },
                {
                    "title": "Psychotherapie: Verfahren",
                    "fieldName": "ptv",
                    "selectedCodes": selected_codes
                },
                {
                    "title": "Psychotherapie: Altersgruppe",
                    "fieldName": "pta",
                    "selectedCodes": [
                        therapy_age
                    ],
                },
                {
                    "title": "Psychotherapie: Setting",
                    "fieldName": "pts",
                    "selectedCodes": [
                        therapy_setting
                    ],
                },
            ],
            "locOrigin": "USER_INPUT",
            "initialSearch": False,
            "viaDeeplink": False,
        }

        amount_of_days = amount_of_weeks * 7
        cache_data = self.get_current_doctor_information_data_in_cache_with_time_check(amount_of_days)
        # Explicit None check: a cache hit is an ArztPraxisDatas object, a miss is None
        if cache_data is not None:
            self.phone_times = cache_data
        else:
            self.get_list_of_doctors_from_api(lat, lon, req_val_base64, therapy_types, therapy_age, therapy_setting)
            self.set_current_doctor_information_data_in_cache()

        # Filter for the relevant therapy times before processing
        self.filter_for_therapy_types(therapy_types)
        self.filter_for_duplicates()

        # Previously nothing was returned although the signature announces ArztPraxisDatas
        return self.phone_times

    def get_list_of_doctors_from_api(self, lat, lon, req_val_base64, therapy_types, therapy_age,
                                     therapy_setting) -> ArztPraxisDatas:
        """
        Query the API with the request body prepared in ``self.json_data``.

        Only ``req_val_base64`` is read directly; the remaining parameters are already part of ``self.json_data``
        and are kept in the signature for existing callers.

        :param lat: Latitude as given by location API
        :param lon: Longitude as given by location API
        :param req_val_base64: base64 value required for API access (is this a token?)
        :param therapy_types: Therapy types of interest
        :param therapy_age: Therapy age range of interest
        :param therapy_setting: Therapy setting of interest
        :return: Relevant doctor/therapist data
        :raises requests.HTTPError: if the API answers with an HTTP error status
        :raises requests.RequestException: on connection problems or when the request times out
        """
        # API path for doctor data
        api_path = self.base_api_url + "data"

        headers = {
            "Accept": "application/json",
            "Accept-Language": self.ACCEPT_LANGUAGE_HEADER,
            "Authorization": self.AUTHORIZATION_HEADER,
            "Connection": "keep-alive",
            # Calculated base64 value based on latitude and longitude
            "req-val": req_val_base64,
        }

        response = requests.post(
            api_path,
            headers=headers,
            json=self.json_data,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        # Check for HTTP errors
        response.raise_for_status()

        # Convert phone times to data format as input validation, save as class variable for further processing
        self.phone_times = ArztPraxisDatas(**response.json())
        # Return result for processing by caller
        return self.phone_times

    def get_current_doctor_information_data_in_cache_with_time_check(self, amount_of_days):
        """
        Look up the current request (``self.json_data``) in the cache.

        :param amount_of_days: Maximum accepted age of the cache entry in days
        :return: Cached data as ArztPraxisDatas, or None if there is no entry, the entry is too old or the entry
            cannot be read
        """
        cached_data = self.redis_client.get(str(self.json_data))
        if not cached_data:
            return None

        try:
            cached_data = json.loads(cached_data)
            cache_timestamp = datetime.fromisoformat(cached_data["timestamp"])
            if (datetime.now() - cache_timestamp).days > amount_of_days:
                # Entry is outdated -> caller fetches fresh data and overwrites it
                return None
            # Every item was stored as its own JSON string, see set_current_doctor_information_data_in_cache
            data = [ArztPraxisData.model_validate(json.loads(item)) for item in cached_data["data"]]
        except (ValueError, KeyError, TypeError) as error:
            # A broken entry would otherwise fail on every request for this key; treat it as a cache miss so that
            # the caller replaces it with fresh data
            logger.warning("Ignoring unreadable cache entry: %s", error)
            return None

        return ArztPraxisDatas(arztPraxisDatas=data)

    def set_current_doctor_information_data_in_cache(self):
        """
        Store the current doctor data (``self.phone_times``) together with a timestamp in the cache, using the
        stringified request body as key.

        :return: None
        """
        # model_dump_json is the Pydantic v2 counterpart of model_validate used when reading the cache
        serialized_data = [item.model_dump_json() for item in self.phone_times.arztPraxisDatas]
        cache_entry = {"timestamp": datetime.now().isoformat(), "data": serialized_data}
        self.redis_client.set(str(self.json_data), json.dumps(cache_entry))

    def filter_for_therapy_types(self, therapy_types):
        """
        The idea is to get as much data as possible from the API at once to minimize the number of API calls. For
        some cases, more data than actually necessary is cached and this is a filter for it.

        :param therapy_types: Desired therapy types as codes; unknown codes are ignored
        :return: None, class variable affected
        """
        # Mapping of the selected codes to the actual therapy type
        mapping = {
            "V": "Verhaltenstherapie",
            "T": "Tiefenpsychologisch fundierte Psychotherapie",
            "A": "Analytische Psychotherapie",
            "S": "Systemische Therapie"
        }
        desired_names = [mapping[code] for code in therapy_types if code in mapping]

        # Relevant therapy types are stored in psy (multiple entries possible). Every doctor is kept at most once,
        # no matter how many of its entries match.
        self.phone_times.arztPraxisDatas = [
            data for data in self.phone_times.arztPraxisDatas
            if any(name in setting for setting in (data.psy or ()) for name in desired_names)
        ]

    def filter_for_duplicates(self):
        """
        Remove entries which are equal to an earlier entry, keeping the first occurrence and the original order.

        :return: None, class variable affected
        """
        unique_entries = []
        for entry in self.phone_times.arztPraxisDatas:
            # Equality based check (the entries are not assumed to be hashable)
            if entry not in unique_entries:
                unique_entries.append(entry)
        self.phone_times.arztPraxisDatas = unique_entries

    def get_general_doctor_information(self) -> List[DoctorInformation]:
        """
        Transform and filter data to more usable format: Check for phone times and collect general doctor
        information data of interest

        Function should be called after initial API call, but doesn't create an error if not, just returning empty
        list in case of

        :return: General doctor information
        """
        # phone_times is still a plain list if no search happened yet -> nothing to iterate
        practice_entries = getattr(self.phone_times, "arztPraxisDatas", [])

        general_doctor_information = []
        for data in practice_entries:
            # Skip doctors without phone number (empty or missing field)
            if not data.tel:
                continue

            # Convert to actual DoctorInformation object -> input validation
            doctor_information = DoctorInformation(
                name=data.name,
                tel=data.tel,
                fax=data.fax,
                anrede=data.anrede,
                email=data.email,
                distance=data.distance,
                strasse=data.strasse,
                hausnummer=data.hausnummer,
                plz=data.plz,
                ort=data.ort,
                telefonzeiten=self._collect_phone_times(data),
            )
            general_doctor_information.append(doctor_information)

        # Save result to class for further processing and return for caller
        self.general_information = general_doctor_information
        return self.general_information

    def _collect_phone_times(self, data):
        """
        Collect all phone times of a single doctor entry as PhoneTime objects.

        :param data: Single doctor entry as returned by the API
        :return: List of PhoneTime objects (empty if no phone times are given)
        """
        phone_times = []
        # Times of the doctor per day (maybe tsz = tageszeit?)
        for day in data.tsz or []:
            # Nothing listed for this day
            if not day.tszDesTyps:
                continue
            for contact_times in day.tszDesTyps:
                # Check if phone time is actually phone time and not only opening time
                if contact_times.typ != self.PHONE_TIME_TYPE:
                    continue
                for phone_time_day in contact_times.sprechzeiten:
                    # The actual phone time is given as string such as 9:00-10:00
                    start_time_str, end_time_str = phone_time_day.zeit.split("-")
                    # PhoneTime object -> input validation
                    phone_times.append(PhoneTime(
                        start=self.parse_date_string(f"{day.d} {start_time_str}"),
                        end=self.parse_date_string(f"{day.d} {end_time_str}"),
                    ))
        return phone_times

    def filter_doctor_information_for_distance(self, distance):
        """
        Filter the given list of doctors based on the distance to them calculated by the given location as a result
        of the original API call

        :param distance: Distance to location given in meters for doctors within given radius
        :return: No return, only filter class variable
        """
        # Keep every doctor information within the given distance/radius
        self.general_information = [
            doctor_information for doctor_information in self.general_information
            if doctor_information.distance <= distance
        ]

    def get_doctor_phone_times_sorted(self, therapy_phone_weeks):
        """
        Build the doctor phone times from the general doctor information, filter them for the desired weeks and
        already passed times, sort them by start date and number the doctors.

        :param therapy_phone_weeks: Amount of weeks for showing phone times
        :return: Sorted list of DoctorPhoneTime objects
        """
        # Start from scratch so that repeated calls do not pile up duplicates of earlier results
        self.processed_doctor_phone_times = []

        # Process doctor information to desired format with relevant information for phone times
        for doctor_information in self.general_information:
            # Address in readable format
            doctor_address = (f"{doctor_information.plz} {doctor_information.ort} "
                              f"{doctor_information.strasse} {doctor_information.hausnummer}")
            # Perspective: phone time as data object of choice
            for phone_time in doctor_information.telefonzeiten:
                # DoctorPhoneTime object -> input validation
                self.processed_doctor_phone_times.append(DoctorPhoneTime(
                    phone_time=phone_time,
                    # workaround until properly assigned in assign_numbers_to_doctor_phone_times
                    doctor_nr=0,
                    doctor_name=doctor_information.name,
                    doctor_address=doctor_address,
                    doctor_phone_number=doctor_information.tel,
                ))

        # Apply filter for desired amount of weeks for phone times
        self.filter_for_relevant_weeks(therapy_phone_weeks)
        # Sort the times by the starting time
        self.processed_doctor_phone_times.sort(key=lambda dpt: dpt.phone_time.start)
        # Throw out already passed times
        self.filter_for_already_passed_times_today()
        # Assign the numbers for showing them properly in the web interface
        self.assign_numbers_to_doctor_phone_times()

        return self.processed_doctor_phone_times

    def filter_for_relevant_weeks(self, therapy_phone_weeks):
        """
        Get the desired amount of weeks and filter for them

        :param therapy_phone_weeks: Desired amount of weeks for showing phone times
        :return: None, class variable affected
        """
        # Get the current date for calculation and determining the next weeks
        current_datetime = datetime.now()
        today = current_datetime.date()
        # Calculate the end date by a timedelta based on the given amount of weeks
        end_date = current_datetime + timedelta(weeks=therapy_phone_weeks)

        # The lower bound is the current day, not the current time: a phone time that started earlier today may
        # still be running, which is decided by filter_for_already_passed_times_today based on its end time
        self.processed_doctor_phone_times = [
            dpt for dpt in self.processed_doctor_phone_times
            if today <= dpt.phone_time.start.date() and dpt.phone_time.start <= end_date
        ]

    def filter_for_already_passed_times_today(self):
        """
        Get the phone times of the current day and filter already passed phone times since they are considered not
        relevant anymore

        :return: None, class variable affected
        """
        # Get the current date with time for calculations
        current_datetime = datetime.now()
        today = current_datetime.date()

        # Keep phone times of other days, and phone times of today which have not ended yet
        self.processed_doctor_phone_times = [
            dpt for dpt in self.processed_doctor_phone_times
            if dpt.phone_time.start.date() != today or dpt.phone_time.end > current_datetime
        ]

    def assign_numbers_to_doctor_phone_times(self):
        """
        Assign numbers to the doctors in phone time objects to distinct them from each other visually, since a
        doctor can have multiple phone times in the next week(s). Doctors are identified by their name and numbered
        in order of first appearance.

        :return: None, the phone time objects are updated in place
        """
        # Known doctors and their number
        known_doctor_names_with_nr = {}

        for doctor_phone_time in self.processed_doctor_phone_times:
            # New doctor -> next free number (users prefer starting with 1 instead of 0), known doctor -> its number
            doctor_phone_time.doctor_nr = known_doctor_names_with_nr.setdefault(
                doctor_phone_time.doctor_name, len(known_doctor_names_with_nr) + 1
            )

    @staticmethod
    def calculate_req_value_base64(lat, lon):
        """
        This function is based on the initial Javascript code found in app.js. It is rewritten in Python to
        calculate the HTTP header req_val for proper requests with the correct location.

        :param lat: Latitude given by the Arztsuche API
        :param lon: Longitude given by the Arztsuche API
        :return: base64 encoded string built from digits of the shifted coordinates and the current timestamp
        """
        def edge_digits(value):
            # Last digit before and first digit after the decimal point ("0" if there is no fraction part)
            integer_part, _, fraction_part = str(value).partition(".")
            return integer_part[-1], fraction_part[0] if fraction_part else "0"

        # Adjust lat and lon values slightly
        lat_last_digit, lat_first_fraction_digit = edge_digits(lat + 1.1)
        lon_last_digit, lon_first_fraction_digit = edge_digits(lon + 2.3)

        # Current time in milliseconds since epoch
        timestamp_str = str(int(datetime.now().timestamp() * 1000))

        # Create the final string by interleaving coordinate digits with the last three timestamp digits
        combined_string = (
            lat_last_digit + timestamp_str[-1] +
            lon_last_digit + timestamp_str[-2] +
            lat_first_fraction_digit + timestamp_str[-3] +
            lon_first_fraction_digit
        )

        # Encode the combined string in Base64
        return base64.b64encode(combined_string.encode()).decode()

    @staticmethod
    def parse_date_string(date_string):
        """
        Parse a date string given by the Arztsuche API and return an actual datetime object

        The API sends day, month and time but no year. The current year is assumed; if that day lies before today
        (turn of the year) or does not exist in the current year (29th of February), the next year is used.

        :param date_string: Date string to parse, such as "24.12. 9:00"
        :return: parsed datetime as actual object
        :raises ValueError: if the string cannot be parsed (we like input validation)
        """
        # String as given as result by the API (based on known values), extended by the year. The year has to be
        # part of the parsed string: without it strptime assumes 1900 and rejects the 29th of February.
        format_string = "%d.%m. %H:%M %Y"

        # Sometimes the API returns 24:00 as time - even though it doesn't make lots of sense - so it is replaced
        # with a minute less to work with proper input
        date_string = date_string.replace("24:00", "23:59")

        current_datetime = datetime.now()
        today = current_datetime.date()
        parse_error = None
        for year in (current_datetime.year, current_datetime.year + 1):
            try:
                parsed_date = datetime.strptime(f"{date_string} {year}", format_string)
            except ValueError as e:
                parse_error = e
                continue
            # Compare days, not points in time: a time earlier today still belongs to the current year
            if parsed_date.date() >= today:
                return parsed_date

        raise ValueError(f"Error parsing date string: '{date_string}'. Details: {parse_error}") from parse_error