255 lines
10 KiB
Python
255 lines
10 KiB
Python
from typing import List
|
|
|
|
import requests
|
|
from requests import JSONDecodeError
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
from arztapi.ArztPraxisDatas import ArztPraxisDatas
|
|
from arztapi.DoctorInformation import DoctorInformation, PhoneTime
|
|
from arztapi.DoctorPhoneTime import DoctorPhoneTime
|
|
|
|
|
|
class APIHandler:
    """Client for the arztsuche.116117.de API plus post-processing of its results.

    Typical call order:

    1. ``get_lat_lon_location_list`` to resolve a location string,
    2. ``calculate_req_value_base64`` to build the ``req-val`` header value,
    3. ``get_list_of_doctors`` to fetch the raw practice data,
    4. ``get_general_doctor_information`` to condense it,
    5. optionally ``filter_doctor_information_for_distance``,
    6. ``get_doctor_phone_times_sorted`` to get a chronological call list.
    """

    # Seconds to wait for the remote API. Without a timeout ``requests`` waits
    # forever on a stalled connection.
    REQUEST_TIMEOUT_SECONDS = 30

    # Headers sent with every request; each endpoint adds its own "Accept".
    _COMMON_HEADERS = {
        "Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
        "Authorization": "Basic YmRwczpma3I0OTNtdmdfZg==",
        "Connection": "keep-alive",
    }

    # Value of ``typ`` that marks a contact-time entry as phone availability.
    _PHONE_AVAILABILITY_TYPE = "Telefonische Erreichbarkeit"

    def __init__(self):
        self.base_api_url = "https://arztsuche.116117.de/api/"
        # Raw result of the last ``get_list_of_doctors`` call (empty list until then).
        self.phone_times = []
        # Result of the last ``get_general_doctor_information`` call.
        self.general_information = []
        # Result of the last ``get_doctor_phone_times_sorted`` call.
        self.processed_doctor_phone_times = []

    def get_lat_lon_location_list(self, location):
        """Query the ``location`` endpoint for the given search string.

        :param location: free-text location passed as the ``loc`` query parameter
        :return: the decoded JSON body, or ``None`` if the body is not valid JSON
        """
        api_path = self.base_api_url + "location"
        headers = {
            **self._COMMON_HEADERS,
            "Accept": "application/json, text/plain, */*",
        }
        params = {
            "loc": location,
        }

        response = requests.get(
            api_path,
            params=params,
            headers=headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )

        try:
            return response.json()
        except JSONDecodeError:
            # Best effort: show what the server sent instead and signal the
            # failure to the caller with an explicit None.
            print(f"Could not decode location response as JSON (HTTP {response.status_code}):")
            print(response.text)
            return None

    def get_list_of_doctors(self, lat, lon, req_val_base64, therapy_types, therapy_age,
                            therapy_setting) -> "ArztPraxisDatas":
        """Fetch practices around a coordinate and store them on the instance.

        :param lat: latitude of the search origin
        :param lon: longitude of the search origin
        :param req_val_base64: value for the ``req-val`` header, see
            ``calculate_req_value_base64``
        :param therapy_types: list of codes for the ``ptv`` filter
        :param therapy_age: single code for the ``pta`` filter
        :param therapy_setting: single code for the ``pts`` filter
        :return: the parsed response, also kept in ``self.phone_times``
        :raises requests.HTTPError: if the API answers with an error status
        """
        api_path = self.base_api_url + "data"
        headers = {
            **self._COMMON_HEADERS,
            "Accept": "application/json",
            "req-val": req_val_base64,
        }

        json_data = {
            # TODO: Find out what r means
            "r": 900,
            "lat": lat,
            "lon": lon,
            "filterSelections": [
                {
                    "title": "Fachgebiet Kategorie",
                    "fieldName": "fgg",
                    "selectedCodes": [
                        "12",
                    ],
                },
                {
                    "title": "Psychotherapie: Verfahren",
                    "fieldName": "ptv",
                    "selectedCodes": therapy_types,
                },
                {
                    "title": "Psychotherapie: Altersgruppe",
                    "fieldName": "pta",
                    "selectedCodes": [
                        therapy_age,
                    ],
                },
                {
                    "title": "Psychotherapie: Setting",
                    "fieldName": "pts",
                    "selectedCodes": [
                        therapy_setting,
                    ],
                },
            ],
            "locOrigin": "USER_INPUT",
            "initialSearch": False,
            "viaDeeplink": False,
        }

        response = requests.post(
            api_path,
            headers=headers,
            json=json_data,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        self.phone_times = ArztPraxisDatas(**response.json())
        return self.phone_times

    def _collect_phone_times(self, practice):
        """Return every phone-availability slot of one practice as ``PhoneTime`` objects."""
        phone_times = []
        for day in practice.tsz or []:
            for contact_times in day.tszDesTyps or []:
                if contact_times.typ != self._PHONE_AVAILABILITY_TYPE:
                    continue
                for phone_time_day in contact_times.sprechzeiten:
                    # ``zeit`` is a "start-end" range; ``day.d`` supplies the date part.
                    start_time_str, end_time_str = phone_time_day.zeit.split("-")
                    phone_times.append(PhoneTime(
                        start=self.parse_date_string(f"{day.d} {start_time_str}"),
                        end=self.parse_date_string(f"{day.d} {end_time_str}"),
                    ))
        return phone_times

    def get_general_doctor_information(self) -> List["DoctorInformation"]:
        """Condense the fetched practice data into ``DoctorInformation`` objects.

        Reads ``self.phone_times`` (filled by ``get_list_of_doctors``) and
        stores the result in ``self.general_information``. If nothing has been
        fetched yet the result is an empty list.

        :return: one entry per practice, including its phone-availability slots
        :raises ValueError: if a date/time of a slot cannot be parsed
        """
        # ``self.phone_times`` is a plain list until the first successful fetch.
        practices = getattr(self.phone_times, "arztPraxisDatas", None) or []

        self.general_information = [
            DoctorInformation(
                name=data.name,
                tel=data.tel,
                fax=data.fax,
                anrede=data.anrede,
                email=data.email,
                distance=data.distance,
                strasse=data.strasse,
                hausnummer=data.hausnummer,
                plz=data.plz,
                ort=data.ort,
                telefonzeiten=self._collect_phone_times(data),
            )
            for data in practices
        ]
        return self.general_information

    def filter_doctor_information_for_distance(self, distance):
        """Keep only doctors whose ``distance`` does not exceed the given limit.

        Replaces ``self.general_information`` with the filtered list.

        :param distance: in meters
        :return: None
        """
        self.general_information = [doctor_information for doctor_information in self.general_information
                                    if doctor_information.distance <= distance]

    def get_doctor_phone_times_sorted(self, therapy_phone_weeks):
        """Build the chronological list of upcoming phone slots of all doctors.

        The list is rebuilt from ``self.general_information`` on every call, so
        calling this method repeatedly does not duplicate entries.

        :param therapy_phone_weeks: how many weeks ahead slots are included
        :return: ``DoctorPhoneTime`` objects sorted by start time, numbered per doctor
        """
        self.processed_doctor_phone_times = [
            DoctorPhoneTime(
                phone_time=phone_time,
                # workaround until properly assigned after sorting
                doctor_nr=0,
                doctor_name=doctor_information.name,
                doctor_address=f"{doctor_information.plz} {doctor_information.ort} "
                               f"{doctor_information.strasse} {doctor_information.hausnummer}",
                doctor_phone_number=doctor_information.tel,
            )
            for doctor_information in self.general_information
            for phone_time in doctor_information.telefonzeiten
        ]

        self.filter_for_relevant_weeks(therapy_phone_weeks)
        self.processed_doctor_phone_times.sort(key=lambda dpt: dpt.phone_time.start)
        self.filter_for_already_passed_times_today()
        self.assign_numbers_to_doctor_phone_times()

        return self.processed_doctor_phone_times

    def filter_for_relevant_weeks(self, therapy_phone_weeks):
        """Keep slots that start between today and ``therapy_phone_weeks`` weeks from now.

        The lower bound is compared by calendar day, so slots that started
        earlier today are kept here; ``filter_for_already_passed_times_today``
        decides whether they are already over.

        :param therapy_phone_weeks: size of the look-ahead window in weeks
        :return: None
        """
        current_date = datetime.now()
        end_date = current_date + timedelta(weeks=therapy_phone_weeks)
        today = current_date.date()
        self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
                                             if today <= dpt.phone_time.start.date()
                                             and dpt.phone_time.start <= end_date]

    def filter_for_already_passed_times_today(self):
        """Drop slots that start today and have already ended; keep everything else."""
        current_datetime = datetime.now()
        today = current_datetime.date()
        self.processed_doctor_phone_times = [dpt for dpt in self.processed_doctor_phone_times
                                             if dpt.phone_time.start.date() != today
                                             or dpt.phone_time.end > current_datetime]

    def assign_numbers_to_doctor_phone_times(self):
        """Give every slot a ``doctor_nr``, counting doctors in order of first appearance.

        Doctors are told apart by ``doctor_name`` only, so two different
        doctors with the same name share one number.
        """
        known_doctor_names_with_nr = {}
        for doctor_phone_time in self.processed_doctor_phone_times:
            # setdefault hands out the next free number on first sight of a name.
            doctor_phone_time.doctor_nr = known_doctor_names_with_nr.setdefault(
                doctor_phone_time.doctor_name, len(known_doctor_names_with_nr) + 1
            )

    @staticmethod
    def calculate_req_value_base64(lat, lon):
        """Calculate the value of the ``req-val`` HTTP header for a location.

        This function is based on the initial Javascript code found in app.js.
        It is rewritten in Python so that requests carry a header matching the
        requested location.

        :param lat: latitude (numeric)
        :param lon: longitude (numeric)
        :return: Base64 string of seven characters: digits of the shifted
            coordinates interleaved with the last three digits of the current
            millisecond timestamp
        """
        def edge_digits(value):
            # Last digit before and first digit after the decimal point
            # ("0" if the textual form has no fraction).
            integer_part, _, fraction_part = str(value).partition(".")
            return integer_part[-1], (fraction_part[0] if fraction_part else "0")

        # The coordinates are shifted by fixed offsets before digits are taken.
        lat_last_digit, lat_first_fraction_digit = edge_digits(lat + 1.1)
        lon_last_digit, lon_first_fraction_digit = edge_digits(lon + 2.3)

        # Current time in milliseconds since epoch
        timestamp_str = str(int(datetime.now().timestamp() * 1000))

        combined_string = "".join((
            lat_last_digit,
            timestamp_str[-1],
            lon_last_digit,
            timestamp_str[-2],
            lat_first_fraction_digit,
            timestamp_str[-3],
            lon_first_fraction_digit,
        ))

        return base64.b64encode(combined_string.encode()).decode()

    @staticmethod
    def parse_date_string(date_string):
        """Parse a ``"DD.MM. HH:MM"`` string from the API into a ``datetime``.

        The API sends no year. The current year is assumed unless that would
        place the date before today, in which case the next year is used (turn
        of the year). Times earlier on the current day stay in the current year.

        :param date_string: e.g. ``"24.12. 08:30"``; ``24:00`` is accepted and
            treated as ``23:59``
        :return: the parsed naive ``datetime``
        :raises ValueError: if the string does not match the expected format
        """
        format_string = "%d.%m. %H:%M"
        now = datetime.now()

        # Sometimes the API returns 24:00 as time, which strptime rejects, so it
        # is replaced with one minute less.
        normalized = date_string.strip().replace("24:00", "23:59")

        last_error = None
        for year in (now.year, now.year + 1):
            try:
                # Parse together with the year: without one strptime assumes
                # 1900, which is no leap year and therefore rejects "29.02.".
                candidate = datetime.strptime(f"{year} {normalized}", f"%Y {format_string}")
            except ValueError as error:
                last_error = error
                continue
            # Compare calendar days only, so a time earlier today is not
            # mistaken for a date in the coming year.
            if candidate.date() >= now.date():
                return candidate

        raise ValueError(
            f"Error parsing date string: '{date_string}'. Details: {last_error}"
        ) from last_error
|