Generierung der Views für das Statistikportal
Dr. Harald Grohganz, Bundesinstitut für Arzneimittel und Medizinprodukte, harald.grohganz@bfarm.de
Dieses jupyter-Notebook beschreibt alle Aggregationsschritte, mit denen die Abrechnungsdaten der Krankenkassen (DaTraV-Daten) am Forschungsdatenzentrum Gesundheit für das Statistikportal aufbereitet werden.
Hier werden die View-Definitionen auf Basis des öffentlich verfügten Public Use File (PUF) beschrieben, damit auch Außenstehende die Schritte selbst nachvollziehen können. Wir haben dieses Skript für die Entwicklungsversion des Statistikportals verwendet. Aufgrund von einigen Abweichungen in den Datenbankstrukturen unterscheiden sich diese Views in etlichen Details von denen für den Produktiveinsatz, der Aufbau der maßgeblichen Tabellen ist jedoch identisch.
Hinweis: Dieses Skript beinhaltet nicht den R-Code für die Verhinderung von Rückrechenmöglichkeiten. Dieser Algorithmus wird ebenfalls in Kürze zur Verfügung gestellt.
Vorbereitungen
Speichere Datenmodelle 1-3 in eine Postgres-Datenbank "PUF" mit Schemata "DM1", "DM2" und "DM3". Achte auf die verschiedenen Namenskonventionen – Satzarten "SA" mit 3-stelligen Nummern für DM1+DM2, sprechende Namen ab DM3.
Teil 1: Vorbereitung der PUF-Daten
Verbindung zur Datenbank aufbauen
# Database connection
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy import text
url = URL.create(
drivername="postgresql",
username="postgres",
password="dev",
host="localhost",
port=5432,
database="PUF"
)
engine = create_engine(url)
def sql(my_string):
with engine.connect() as conn:
result = conn.execute(text(my_string))
return result
result = sql('SELECT SUBSTRING("SA651_ICD_CODE", 0, 4) icd3, COUNT(*) num FROM "DM2"."SA651" GROUP BY icd3 ORDER BY num DESC LIMIT 5')
result.fetchall()
[('I10', 894103),
('M54', 630118),
('H52', 578401),
('E78', 561327),
('E11', 423228)]
ICD-Informationen ergänzen
Auf dem produktiven System gibt eine ICD-Datentabelle, die beim PUF nicht vorliegt. In der Datenbank wird daher eine stark reduzierte ICD-Nachschlagetabelle mit anderen Spaltennamen und abweichenden Datentypen angelegt. Die importierte json-Datei besteht aus einer Liste für jeden dreistelligen ICD-Code in der Form:
[{"Code":"B30","Gruppe":"B25-B34","Anzahl":6.0,"Key":"B30","Name":"Viruskonjunktivitis","Ebene":"3","Parent":"B25-B34","Gruppe_Name":"Sonstige Viruskrankheiten","Kapitel":"01","Kapitel_Name":"Bestimmte infektiöse und parasitäre Krankheiten"}]
Hinweis: Das Statistikportal verwendet für die Darstellung die jahresaktuellen ICD-Codes, die auf der
Website des BfArM
heruntergeladen werden können. Wenn Sie Interesse an der Nachimplemetierung haben, verwenden Sie die jahresaktuellen ICD-Codes und erstellen eine Nachschlagetabelle, welche die höheren ICD-Ebenen auf Basis des Tupels (icd3, berichtsjahr) bestimmt. So erhalten Sie eine genauere Zählung von Diagnosen auf den Ebenen ICD-Gruppen und Kapitel. Ein Beispiel hierfür finden Sie z.B. auf der Website der Visionsberatung.
import pandas as pd
df_icd = pd.read_json("../website/icd10.json")
df_icd[['Kapitel']] = df_icd[['Kapitel']].astype(str)
df_icd['Kapitel'] = df_icd['Kapitel'].str.zfill(2)
with engine.connect() as conn:
df_icd.to_sql("ICD_CODES", conn, schema="StatPort", if_exists='replace')
Diagnosen: Benennungen anpassen
Da die Spalten in den beiden Tabellen SA551 (stationäre Diagnosen) und SA651 (ambulante Diagnosen) durch die Prefixes verschieden heißen, müssen erst die Namen standardisiert werden. Dies realisieren wir über Views.
Wir nutzen die Gelegenheit, um direkt die dreistelligen ICD-Codes in diesen Tabellen zu hinterlegen.
sqlquery = """
CREATE OR REPLACE VIEW "StatPort"."DM1_SA651" AS
SELECT
"SA651_BERICHTSJAHR" as BERICHTSJAHR,
"SA651_AUSGLEICHSJAHR" as JAHR,
"SA651_PSID" as PSID,
"SA651_LEISTUNGSQUARTAL" as QUARTAL,
"SA651_ICD_CODE" AS ICD_CODE,
SUBSTR("SA651_ICD_CODE",0,4) AS ICD3,
"SA651_QUALIFIZIERUNG" AS QUALIFIZIERUNG
FROM "DM1"."SA651";
CREATE OR REPLACE VIEW "StatPort"."DM2_SA651" AS
SELECT
"SA651_BERICHTSJAHR" as BERICHTSJAHR,
"SA651_PSID" as PSID,
"SA651_LEISTUNGSQUARTAL" as QUARTAL,
"SA651_ICD_CODE" AS ICD_CODE,
SUBSTR("SA651_ICD_CODE",0,4) AS ICD3,
"SA651_QUALIFIZIERUNG" AS QUALIFIZIERUNG
FROM "DM2"."SA651";
CREATE OR REPLACE VIEW "StatPort"."DM1_SA551" AS
SELECT
"SA551_BERICHTSJAHR" as BERICHTSJAHR,
"SA551_AUSGLEICHSJAHR" as JAHR,
"SA551_PSID" as PSID,
"SA551_ENTLASSUNGSMONAT" as MONAT,
"SA551_ICD_CODE" AS ICD_CODE,
SUBSTR("SA551_ICD_CODE",0,4) AS ICD3,
"SA551_ARTDIAGNOSE" AS ARTDIAGNOSE
FROM "DM1"."SA551";
CREATE OR REPLACE VIEW "StatPort"."DM2_SA551" AS
SELECT
"SA551_BERICHTSJAHR" as BERICHTSJAHR,
"SA551_PSID" as PSID,
"SA551_ENTLASSUNGSMONAT" as MONAT,
"SA551_ICD_CODE" AS ICD_CODE,
SUBSTR("SA551_ICD_CODE",0,4) AS ICD3,
"SA551_ARTDIAGNOSE" AS ARTDIAGNOSE
FROM "DM2"."SA551";
-- Datenmodell 3
CREATE OR REPLACE VIEW "StatPort"."DM3_AMBDIAG" AS
SELECT
"BJAHR" as BERICHTSJAHR,
"PSID" as PSID,
"ICDAMB_CODE" AS ICD_CODE,
SUBSTR("ICDAMB_CODE",0,4) AS ICD3,
"DIAGSICH" AS QUALIFIZIERUNG
FROM "DM3"."AMBDIAG";
CREATE OR REPLACE VIEW "StatPort"."DM3_KHDIAG" AS
SELECT
"BJAHR" as BERICHTSJAHR,
"PSID" as PSID,
"ICDKH_CODE" AS ICD_CODE,
SUBSTR("ICDKH_CODE",0,4) AS ICD3,
"DIAGART" AS DIAGART
FROM "DM3"."KHDIAG";
"""
sql(sqlquery) # Das bringt nichts - sieht so aus, als ob mehrere Anweisungen nicht unterstützt würden. Oder postgres-Schemaänderungen klappen nicht so.
<sqlalchemy.engine.cursor.CursorResult at 0x20a363d4360>
sql('SELECT * FROM "StatPort"."DM2_SA651" LIMIT 10;').fetchall()
[(2017, 'SRxD4O6OG1QVL29owyz', 2, 'M5499', 'M54', 'G'), (2017, 's5e8SQSvmlaZoNFIJ7z', 2, 'N921', 'N92', 'G'), (2017, 'w0JFFV4OK44oyfdj9Tv', 1, 'E049', 'E04', 'Z'), (2017, 'cYvV6aUSgNKKLX6Qh4T', 3, 'H609', 'H60', 'G'), (2017, 'gJprOh61Kfvk77wv1eI', 3, 'F480', 'F48', 'G'), (2017, 'AP3Yak42cVfqTLkwS7e', 3, 'E041', 'E04', 'G'), (2017, 'r6BAJU6JXIkB1m4Ev7T', 3, 'H524', 'H52', 'G'), (2017, '2jnlqxgQri5bk64nOnb', 1, 'K4091', 'K40', 'G'), (2017, 'afdB1S2wHzHeho9oTwL', 3, 'K510', 'K51', 'G'), (2017, '98RhjoOUdg9AwQjIz2G', 4, 'F334', 'F33', 'G')]
Stammdaten: Anpassen und für Statistikportal aufbereiten
Alters- und Ortsinformationen vorbereiten
sqlquery = """
CREATE OR REPLACE VIEW "StatPort"."DM1_SA151" AS
SELECT
"SA151_BERICHTSJAHR" as BERICHTSJAHR,
"SA151_PSID" as PSID,
"SA151_GEBURTSJAHR" as GEBURTSJAHR,
"SA151_GESCHLECHT" as GESCHLECHT
FROM "DM1"."SA151";
CREATE OR REPLACE VIEW "StatPort"."DM2_SA151" AS
SELECT
"SA151_BERICHTSJAHR" as BERICHTSJAHR,
"SA151_PSID" as PSID,
"SA151_GEBURTSJAHR" as GEBURTSJAHR,
"SA151_GESCHLECHT" as GESCHLECHT
FROM "DM2"."SA151";
CREATE OR REPLACE VIEW "StatPort"."DM2_SA131" AS
SELECT
"SA131_BERICHTSJAHR" as BERICHTSJAHR,
"SA131_PSID" as PSID,
"SA131_PLZ" as PLZ
FROM "DM2"."SA131";
-- DM3
CREATE OR REPLACE VIEW "StatPort"."DM3_VERSQ" AS
SELECT
"BJAHR" as BERICHTSJAHR,
"PSID" as PSID,
"GESCHLECHT" as GESCHLECHT
FROM "DM3"."VERSQ";
CREATE OR REPLACE VIEW "StatPort"."DM3_VERS" AS
SELECT
"BJAHR" as BERICHTSJAHR,
"PSID" as PSID,
"GEBJAHR" as GEBURTSJAHR,
"PLZ" as PLZ
FROM "DM3"."VERS";
"""
sql('SELECT * FROM "StatPort"."DM2_SA151" LIMIT 5;').fetchall()
[(2017, 'd1fVfChqaNBlZDMzlxJ', 1970, 2), (2017, 'Q24mxraOPu84IAIcqi5', 1955, 1), (2017, 's1b58lDKkLIUZM290C4', 1977, 2), (2017, 'mwARKhxc0EaJO3mAqNG', 1981, 2), (2017, 'hJ2ALcF0Ez2IhZKqP41', 1986, 1)]
Erzeugung der Basis-Tabellen DIAG und PERS
Die Materialized View funktioniert wie CREATE TABLE AS, behält aber den Code für die Erzeugung gespeichert und kann daher innerhalb der Datenbank aktualisiert werden.
Diagnosetabelle DIAG
Die Tabelle DIAG für die Datenmodelle 1 und 2 wird mittels der Hilfsviews von weiter oben generiert.
Die Erzeugung hier benötigt grob 30 Sekunden.
Update: Mit den Tabellen aus DM3 sind es dann 1:30 min.
sqlquery = """
CREATE MATERIALIZED VIEW "StatPort"."DIAG"
AS
SELECT
diag.JAHR,
diag.psid AS PSID,
diag.icd3 AS ICD3,
icd."Gruppe" AS ICD2,
icd."Kapitel" AS ICD1
FROM ( SELECT "DM1_SA551".jahr AS JAHR,
"DM1_SA551".psid,
"DM1_SA551".icd3
FROM "StatPort"."DM1_SA551"
UNION ALL
SELECT "DM2_SA551".berichtsjahr AS JAHR,
"DM2_SA551".psid,
"DM2_SA551".icd3
FROM "StatPort"."DM2_SA551"
UNION ALL
SELECT "DM1_SA651".jahr AS JAHR,
"DM1_SA651".psid,
"DM1_SA651".icd3
FROM "StatPort"."DM1_SA651"
UNION ALL
SELECT "DM2_SA651".berichtsjahr AS JAHR,
"DM2_SA651".psid,
"DM2_SA651".icd3
FROM "StatPort"."DM2_SA651"
UNION ALL
SELECT "DM3_KHDIAG".berichtsjahr AS JAHR,
"DM3_KHDIAG".psid,
"DM3_KHDIAG".icd3
FROM "StatPort"."DM3_KHDIAG"
UNION ALL
SELECT "DM3_AMBDIAG".berichtsjahr AS JAHR,
"DM3_AMBDIAG".psid,
"DM3_AMBDIAG".icd3
FROM "StatPort"."DM3_AMBDIAG"
) diag
JOIN "StatPort"."ICD_CODES" icd ON diag.icd3 = icd."Code";
"""
Stammdatentabelle
Da im PUF die Tabelle SA131 fehlt, werden die Postleitzahlen zufällig generiert. Update 03.02.2025: Mittlerweile liegt die Tabelle SA131 vor, der untenstehende Code wurde angepasst.*
Achtung: Wegen der zufälligen Anordnung der Spalten kann es sein, dass Krankenkassenwechsel auf einmal mehrere Geburtsdaten haben. Per Konvention wird in solchen Fällen jetzt immer die größte Zahl genommen. Dasselbe gilt für den Geschlechtswert ab DM3, der hier quartalsweise geliefert wird - hier wird zufällig einer der vorkommenden Werte (postgresql: any_value) verwendet.
Laufzeit ca. 10 Sekunden (mit DM3: 13 Sekunden)
Zu Dokumentationszwecken:
JOIN (
SELECT
PSID,
BERICHTSJAHR,
CASE WHEN BERICHTSJAHR > 2015 THEN floor(random() * 99999 + 1)::int
ELSE NULL::int
END AS PLZ
FROM (SELECT * FROM "StatPort"."DM1_SA151" UNION ALL SELECT * FROM "StatPort"."DM2_SA151")
) sa131
sqlquery = """
CREATE MATERIALIZED VIEW "StatPort"."PERS"
AS
SELECT PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3
FROM (
SELECT
sa151.PSID as PSID,
sa151.BERICHTSJAHR as JAHR,
SUBSTR(LPAD(MAX(sa131.PLZ)::text, 5, '0'), 1, 2) AS PLZ2,
SUBSTR(LPAD(MAX(sa131.PLZ)::text, 5, '0'), 1, 1) AS PLZ1,
any_value(sa151.GESCHLECHT) AS SEX,
LEAST(80, FLOOR(MAX(sa151.BERICHTSJAHR - sa151.GEBURTSJAHR)/10)*10) AS AGE10,
CASE WHEN MAX(sa151.BERICHTSJAHR - sa151.GEBURTSJAHR) < 20 THEN '0-19'
WHEN MAX(sa151.BERICHTSJAHR - sa151.GEBURTSJAHR) >= 60 THEN '60+'
ELSE '20-59'
END AS AGE3
FROM (SELECT * FROM "StatPort"."DM1_SA151" UNION ALL SELECT * FROM "StatPort"."DM2_SA151") sa151
LEFT JOIN "StatPort"."DM2_SA131" sa131
ON (sa151.PSID = sa131.PSID AND sa151.BERICHTSJAHR = sa131.BERICHTSJAHR)
GROUP BY sa151.PSID, sa151.BERICHTSJAHR
UNION ALL
SELECT
vers.PSID as PSID,
vers.BERICHTSJAHR as JAHR,
SUBSTR(LPAD(MAX(vers.PLZ)::text, 5, '0'), 1, 2) AS PLZ2,
SUBSTR(LPAD(MAX(vers.PLZ)::text, 5, '0'), 1, 1) AS PLZ1,
any_value(versq.GESCHLECHT) AS SEX,
LEAST(80, FLOOR(MAX(vers.BERICHTSJAHR - vers.GEBURTSJAHR)/10)*10) AS AGE10,
CASE WHEN MAX(vers.BERICHTSJAHR - vers.GEBURTSJAHR) < 20 THEN '0-19'
WHEN MAX(vers.BERICHTSJAHR - vers.GEBURTSJAHR) >= 60 THEN '60+'
ELSE '20-59'
END AS AGE3
FROM "StatPort"."DM3_VERS" vers
LEFT JOIN "StatPort"."DM3_VERSQ" versq
ON (vers.PSID = versq.PSID AND vers.BERICHTSJAHR = versq.BERICHTSJAHR)
GROUP BY vers.PSID, vers.BERICHTSJAHR
)
GROUP BY PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3;
"""
Erstellung der aggregierten Views
Hinweis: Im PUF ist die Zuordnung zwischen Personen und Diagnosen zufällig. Daher wird hier kein COUNT DISTINCT für die PSIDs verwendet, sondern nur ein normaler COUNT, damit die Ergebnisse aussagekräftiger sind.
Rollup-Tabelle
Die Gruppierung mit mehreren ROLLUP-Bestandteilen liefert die aggregierten Zahlen auf allen Stufen der Hierarchie. Das erlaubt es uns nicht nur, im Statistikportal selbst keine Rechnungen mehr durchzuführen, sondern ermöglicht auch eine direkte Überprüfung der Re-Identifikationsmöglichkeiten durch Rückrechnungen.
Die Berechnung dieser Tabelle dauert (ohne Indizes) ca. 23 Minuten, mit DM3 steigt die Dauer auf 41 Minuten.
sqlquery = """
CREATE MATERIALIZED VIEW "StatPort"."ROLLUP" AS
SELECT d.JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, ICD3, ICD2, ICD1, COUNT(d.PSID) AS CNT
FROM "StatPort"."PERS" p LEFT JOIN "StatPort"."DIAG" d ON (p.psid = d.psid AND p.JAHR = d.JAHR)
GROUP BY d.JAHR, ROLLUP(SEX), ROLLUP(AGE3, AGE10), ROLLUP(PLZ1, PLZ2), ROLLUP(ICD1, ICD2, ICD3);
"""
Mit den folgend definierten Indizes dauert die Berechnung ca. 21 Minuten.
Die Berechnung der Indizes selbst dauert ca. 2,5 Minuten. Lohnt sich also hierfür nicht wirklich...
sqlquery = """
CREATE INDEX idx_join_pers on "StatPort"."PERS" (JAHR, PSID);
CREATE INDEX idx_join_diag on "StatPort"."DIAG" (JAHR, PSID);
CREATE INDEX idx_icd1 on "StatPort"."DIAG" (ICD1);
CREATE INDEX idx_icd2 on "StatPort"."DIAG" (ICD1, ICD2);
CREATE INDEX idx_icd3 on "StatPort"."DIAG" (ICD1, ICD2, ICD3);
CREATE INDEX idx_age3 on "StatPort"."PERS" (AGE3);
CREATE INDEX idx_age10 on "StatPort"."PERS" (AGE3, AGE10);
"""
Die Basistabelle
Die folgende Tabelle ist die maximale Information, die im Statistikportal dargestellt werden kann. Sie enthält alle Zahlen für alle möglichen Hierarchiestufen. Nicht vorhandene Konstallationen (also Kombinationen aus PLZ, AGE, SEX und ICD mit 0 gezählten Patienten) sind nicht enthalten. (Laufzeit < 10 Sekunden)
Diese Tabelle ist der Ausgangspunkt für den Anonymisierungsalgorithmus, der sowohl die zu kleinen Fallzahlen selbst als auch alle Rückrechenmöglichkeiten entfernt.
sqlquery = """
CREATE MATERIALIZED VIEW "StatPort"."ALGO_BASE" AS
SELECT
JAHR,
CASE
WHEN ICD3 IS NULL AND d.ICD2 IS NULL AND ICD1 IS NULL THEN 'any_diagnosis'
WHEN ICD3 is null and d.ICD2 IS NULL AND ICD1 IS NOT NULL then ICD1
-- WHEN ICD3 is null and d.ICD2 is NOT NULL then icd.ICD2
WHEN ICD3 is null and d.ICD2 IS NOT NULL then d.ICD2
WHEN ICD3 is not null then icd3
END as icd,
CASE
WHEN PLZ2 IS NULL AND PLZ1 is null then NULL
WHEN PLZ2 IS NULL then PLZ1
WHEN PLZ2 IS NOT NULL then PLZ2
END as PLZ,
CASE
WHEN AGE10 IS NULL AND AGE3 is NULL then NULL
WHEN AGE10 IS NULL then AGE3
WHEN AGE10 is not null then AGE10::text
END as age,
SEX,
CNT
from "StatPort"."ROLLUP" d
-- left join kat_icd_gruppen icd on (d.icd2 = icd.grvon)
WHERE JAHR IS NOT NULL;
"""