Generierung der Views für das Statistikportal

Dr. Harald Grohganz, Bundesinstitut für Arzneimittel und Medizinprodukte, harald.grohganz@bfarm.de

Dieses jupyter-Notebook beschreibt alle Aggregationsschritte, mit denen die Abrechnungsdaten der Krankenkassen (DaTraV-Daten) am Forschungsdatenzentrum Gesundheit für das Statistikportal aufbereitet werden.

Hier werden die View-Definitionen auf Basis des öffentlich verfügten Public Use File (PUF) beschrieben, damit auch Außenstehende die Schritte selbst nachvollziehen können. Wir haben dieses Skript für die Entwicklungsversion des Statistikportals verwendet. Aufgrund von einigen Abweichungen in den Datenbankstrukturen unterscheiden sich diese Views in etlichen Details von denen für den Produktiveinsatz, der Aufbau der maßgeblichen Tabellen ist jedoch identisch.

Hinweis: Dieses Skript beinhaltet nicht den R-Code für die Verhinderung von Rückrechenmöglichkeiten. Dieser Algorithmus wird ebenfalls in Kürze zur Verfügung gestellt.

Vorbereitungen

Speichere Datenmodelle 1-3 in eine Postgres-Datenbank "PUF" mit Schemata "DM1", "DM2" und "DM3". Achte auf die verschiedenen Namenskonventionen – Satzarten "SA" mit 3-stelligen Nummern für DM1+DM2, sprechende Namen ab DM3.

Teil 1: Vorbereitung der PUF-Daten

Verbindung zur Datenbank aufbauen

In [1]:
# Database connection

from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy import text

url = URL.create(
    drivername="postgresql",
    username="postgres",
    password="dev",
    host="localhost",
    port=5432,
    database="PUF"
)

engine = create_engine(url)
In [2]:
def sql(my_string):
    with engine.connect() as conn:
        result = conn.execute(text(my_string))
        return result
In [3]:
result = sql('SELECT SUBSTRING("SA651_ICD_CODE", 0, 4) icd3, COUNT(*) num FROM "DM2"."SA651" GROUP BY icd3 ORDER BY num DESC LIMIT 5')

result.fetchall()
Out[3]:
[('I10', 894103),
 ('M54', 630118),
 ('H52', 578401),
 ('E78', 561327),
 ('E11', 423228)]

ICD-Informationen ergänzen

Auf dem produktiven System gibt eine ICD-Datentabelle, die beim PUF nicht vorliegt. In der Datenbank wird daher eine stark reduzierte ICD-Nachschlagetabelle mit anderen Spaltennamen und abweichenden Datentypen angelegt. Die importierte json-Datei besteht aus einer Liste für jeden dreistelligen ICD-Code in der Form:

[{"Code":"B30","Gruppe":"B25-B34","Anzahl":6.0,"Key":"B30","Name":"Viruskonjunktivitis","Ebene":"3","Parent":"B25-B34","Gruppe_Name":"Sonstige Viruskrankheiten","Kapitel":"01","Kapitel_Name":"Bestimmte infektiöse und parasitäre Krankheiten"}]

Hinweis: Das Statistikportal verwendet für die Darstellung die jahresaktuellen ICD-Codes, die auf der Website des BfArM heruntergeladen werden können. Wenn Sie Interesse an der Nachimplemetierung haben, verwenden Sie die jahresaktuellen ICD-Codes und erstellen eine Nachschlagetabelle, welche die höheren ICD-Ebenen auf Basis des Tupels (icd3, berichtsjahr) bestimmt. So erhalten Sie eine genauere Zählung von Diagnosen auf den Ebenen ICD-Gruppen und Kapitel. Ein Beispiel hierfür finden Sie z.B. auf der Website der Visionsberatung.

In [ ]:
import pandas as pd

df_icd = pd.read_json("../website/icd10.json")
df_icd[['Kapitel']] = df_icd[['Kapitel']].astype(str)
df_icd['Kapitel'] = df_icd['Kapitel'].str.zfill(2)

with engine.connect() as conn:
    df_icd.to_sql("ICD_CODES", conn, schema="StatPort", if_exists='replace')

Diagnosen: Benennungen anpassen

Da die Spalten in den beiden Tabellen SA551 (stationäre Diagnosen) und SA651 (ambulante Diagnosen) durch die Prefixes verschieden heißen, müssen erst die Namen standardisiert werden. Dies realisieren wir über Views.

Wir nutzen die Gelegenheit, um direkt die dreistelligen ICD-Codes in diesen Tabellen zu hinterlegen.

In [22]:
sqlquery = """
CREATE OR REPLACE VIEW "StatPort"."DM1_SA651" AS
SELECT
	"SA651_BERICHTSJAHR" as BERICHTSJAHR,
    "SA651_AUSGLEICHSJAHR" as JAHR,
	"SA651_PSID" as PSID,
	"SA651_LEISTUNGSQUARTAL" as QUARTAL,
	"SA651_ICD_CODE" AS ICD_CODE,
    SUBSTR("SA651_ICD_CODE",0,4) AS ICD3,
	"SA651_QUALIFIZIERUNG" AS QUALIFIZIERUNG
FROM "DM1"."SA651";

CREATE OR REPLACE VIEW "StatPort"."DM2_SA651" AS
SELECT
	"SA651_BERICHTSJAHR" as BERICHTSJAHR,
	"SA651_PSID" as PSID,
	"SA651_LEISTUNGSQUARTAL" as QUARTAL,
	"SA651_ICD_CODE" AS ICD_CODE,
    SUBSTR("SA651_ICD_CODE",0,4) AS ICD3,
	"SA651_QUALIFIZIERUNG" AS QUALIFIZIERUNG
FROM "DM2"."SA651";

CREATE OR REPLACE VIEW "StatPort"."DM1_SA551" AS
SELECT
	"SA551_BERICHTSJAHR" as BERICHTSJAHR,
    "SA551_AUSGLEICHSJAHR" as JAHR,
	"SA551_PSID" as PSID,
	"SA551_ENTLASSUNGSMONAT" as MONAT,
	"SA551_ICD_CODE" AS ICD_CODE,
    SUBSTR("SA551_ICD_CODE",0,4) AS ICD3,
	"SA551_ARTDIAGNOSE" AS ARTDIAGNOSE
FROM "DM1"."SA551";

CREATE OR REPLACE VIEW "StatPort"."DM2_SA551" AS
SELECT
	"SA551_BERICHTSJAHR" as BERICHTSJAHR,
	"SA551_PSID" as PSID,
	"SA551_ENTLASSUNGSMONAT" as MONAT,
	"SA551_ICD_CODE" AS ICD_CODE,
    SUBSTR("SA551_ICD_CODE",0,4) AS ICD3,
	"SA551_ARTDIAGNOSE" AS ARTDIAGNOSE
FROM "DM2"."SA551";


-- Datenmodell 3

CREATE OR REPLACE VIEW "StatPort"."DM3_AMBDIAG" AS
SELECT
	"BJAHR" as BERICHTSJAHR,
	"PSID" as PSID,
	"ICDAMB_CODE" AS ICD_CODE,
    SUBSTR("ICDAMB_CODE",0,4) AS ICD3,
	"DIAGSICH" AS QUALIFIZIERUNG
FROM "DM3"."AMBDIAG";

CREATE OR REPLACE VIEW "StatPort"."DM3_KHDIAG" AS
SELECT
	"BJAHR" as BERICHTSJAHR,
	"PSID" as PSID,
	"ICDKH_CODE" AS ICD_CODE,
    SUBSTR("ICDKH_CODE",0,4) AS ICD3,
	"DIAGART" AS DIAGART
FROM "DM3"."KHDIAG";

"""
In [18]:
sql(sqlquery) # Das bringt nichts - sieht so aus, als ob mehrere Anweisungen nicht unterstützt würden. Oder postgres-Schemaänderungen klappen nicht so.
Out[18]:
<sqlalchemy.engine.cursor.CursorResult at 0x20a363d4360>
In [23]:
sql('SELECT * FROM "StatPort"."DM2_SA651" LIMIT 10;').fetchall()
Out[23]:
[(2017, 'SRxD4O6OG1QVL29owyz', 2, 'M5499', 'M54', 'G'),
 (2017, 's5e8SQSvmlaZoNFIJ7z', 2, 'N921', 'N92', 'G'),
 (2017, 'w0JFFV4OK44oyfdj9Tv', 1, 'E049', 'E04', 'Z'),
 (2017, 'cYvV6aUSgNKKLX6Qh4T', 3, 'H609', 'H60', 'G'),
 (2017, 'gJprOh61Kfvk77wv1eI', 3, 'F480', 'F48', 'G'),
 (2017, 'AP3Yak42cVfqTLkwS7e', 3, 'E041', 'E04', 'G'),
 (2017, 'r6BAJU6JXIkB1m4Ev7T', 3, 'H524', 'H52', 'G'),
 (2017, '2jnlqxgQri5bk64nOnb', 1, 'K4091', 'K40', 'G'),
 (2017, 'afdB1S2wHzHeho9oTwL', 3, 'K510', 'K51', 'G'),
 (2017, '98RhjoOUdg9AwQjIz2G', 4, 'F334', 'F33', 'G')]

Stammdaten: Anpassen und für Statistikportal aufbereiten

Alters- und Ortsinformationen vorbereiten

In [ ]:
sqlquery = """
CREATE OR REPLACE VIEW "StatPort"."DM1_SA151" AS
SELECT
	"SA151_BERICHTSJAHR" as BERICHTSJAHR,
	"SA151_PSID" as PSID,
    "SA151_GEBURTSJAHR" as GEBURTSJAHR,
    "SA151_GESCHLECHT" as GESCHLECHT
FROM "DM1"."SA151";

CREATE OR REPLACE VIEW "StatPort"."DM2_SA151" AS
SELECT
	"SA151_BERICHTSJAHR" as BERICHTSJAHR,
	"SA151_PSID" as PSID,
    "SA151_GEBURTSJAHR" as GEBURTSJAHR,
    "SA151_GESCHLECHT" as GESCHLECHT
FROM "DM2"."SA151";

CREATE OR REPLACE VIEW "StatPort"."DM2_SA131" AS
SELECT
	"SA131_BERICHTSJAHR" as BERICHTSJAHR,
	"SA131_PSID" as PSID,
    "SA131_PLZ" as PLZ
FROM "DM2"."SA131";


-- DM3

CREATE OR REPLACE VIEW "StatPort"."DM3_VERSQ" AS
SELECT
	"BJAHR" as BERICHTSJAHR,
	"PSID" as PSID,
    "GESCHLECHT" as GESCHLECHT
FROM "DM3"."VERSQ";

CREATE OR REPLACE VIEW "StatPort"."DM3_VERS" AS
SELECT
	"BJAHR" as BERICHTSJAHR,
	"PSID" as PSID,
    "GEBJAHR" as GEBURTSJAHR,
    "PLZ" as PLZ
FROM "DM3"."VERS";

"""
In [25]:
sql('SELECT * FROM "StatPort"."DM2_SA151" LIMIT 5;').fetchall()
Out[25]:
[(2017, 'd1fVfChqaNBlZDMzlxJ', 1970, 2),
 (2017, 'Q24mxraOPu84IAIcqi5', 1955, 1),
 (2017, 's1b58lDKkLIUZM290C4', 1977, 2),
 (2017, 'mwARKhxc0EaJO3mAqNG', 1981, 2),
 (2017, 'hJ2ALcF0Ez2IhZKqP41', 1986, 1)]

Erzeugung der Basis-Tabellen DIAG und PERS

Die Materialized View funktioniert wie CREATE TABLE AS, behält aber den Code für die Erzeugung gespeichert und kann daher innerhalb der Datenbank aktualisiert werden.

Diagnosetabelle DIAG

Die Tabelle DIAG für die Datenmodelle 1 und 2 wird mittels der Hilfsviews von weiter oben generiert.

Die Erzeugung hier benötigt grob 30 Sekunden.

Update: Mit den Tabellen aus DM3 sind es dann 1:30 min.

In [ ]:
sqlquery = """
CREATE MATERIALIZED VIEW "StatPort"."DIAG"
AS
SELECT 
    diag.JAHR,
    diag.psid AS PSID,
    diag.icd3 AS ICD3,
    icd."Gruppe" AS ICD2,
    icd."Kapitel" AS ICD1
FROM ( SELECT "DM1_SA551".jahr AS JAHR,
            "DM1_SA551".psid,
            "DM1_SA551".icd3
           FROM "StatPort"."DM1_SA551"
        UNION ALL
         SELECT "DM2_SA551".berichtsjahr AS JAHR,
            "DM2_SA551".psid,
            "DM2_SA551".icd3
           FROM "StatPort"."DM2_SA551"
        UNION ALL
         SELECT "DM1_SA651".jahr AS JAHR,
            "DM1_SA651".psid,
            "DM1_SA651".icd3
           FROM "StatPort"."DM1_SA651"
        UNION ALL
         SELECT "DM2_SA651".berichtsjahr AS JAHR,
            "DM2_SA651".psid,
            "DM2_SA651".icd3
           FROM "StatPort"."DM2_SA651"
        UNION ALL
         SELECT "DM3_KHDIAG".berichtsjahr AS JAHR,
            "DM3_KHDIAG".psid,
            "DM3_KHDIAG".icd3
           FROM "StatPort"."DM3_KHDIAG"
        UNION ALL
         SELECT "DM3_AMBDIAG".berichtsjahr AS JAHR,
            "DM3_AMBDIAG".psid,
            "DM3_AMBDIAG".icd3
           FROM "StatPort"."DM3_AMBDIAG"
         ) diag
    JOIN "StatPort"."ICD_CODES" icd ON diag.icd3 = icd."Code";
"""

Stammdatentabelle

Da im PUF die Tabelle SA131 fehlt, werden die Postleitzahlen zufällig generiert. Update 03.02.2025: Mittlerweile liegt die Tabelle SA131 vor, der untenstehende Code wurde angepasst.*

Achtung: Wegen der zufälligen Anordnung der Spalten kann es sein, dass Krankenkassenwechsel auf einmal mehrere Geburtsdaten haben. Per Konvention wird in solchen Fällen jetzt immer die größte Zahl genommen. Dasselbe gilt für den Geschlechtswert ab DM3, der hier quartalsweise geliefert wird - hier wird zufällig einer der vorkommenden Werte (postgresql: any_value) verwendet.

Laufzeit ca. 10 Sekunden (mit DM3: 13 Sekunden)

Zu Dokumentationszwecken:

JOIN (
    SELECT 
        PSID,
        BERICHTSJAHR,
        CASE WHEN BERICHTSJAHR > 2015 THEN floor(random() * 99999 + 1)::int
            ELSE NULL::int
        END AS PLZ 
    FROM (SELECT * FROM "StatPort"."DM1_SA151" UNION ALL SELECT * FROM "StatPort"."DM2_SA151")
) sa131

In [ ]:
sqlquery = """
CREATE MATERIALIZED VIEW "StatPort"."PERS" 
AS
SELECT PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3
FROM (
    SELECT
        sa151.PSID as PSID,
        sa151.BERICHTSJAHR as JAHR,
        SUBSTR(LPAD(MAX(sa131.PLZ)::text, 5, '0'), 1, 2) AS PLZ2, 
        SUBSTR(LPAD(MAX(sa131.PLZ)::text, 5, '0'), 1, 1) AS PLZ1, 
        any_value(sa151.GESCHLECHT) AS SEX, 
        LEAST(80, FLOOR(MAX(sa151.BERICHTSJAHR - sa151.GEBURTSJAHR)/10)*10) AS AGE10, 
        CASE WHEN MAX(sa151.BERICHTSJAHR - sa151.GEBURTSJAHR) < 20 THEN '0-19'
			WHEN MAX(sa151.BERICHTSJAHR - sa151.GEBURTSJAHR) >= 60 THEN '60+'
			ELSE '20-59'
		END AS AGE3 
    FROM (SELECT * FROM "StatPort"."DM1_SA151" UNION ALL SELECT * FROM "StatPort"."DM2_SA151") sa151
	LEFT JOIN "StatPort"."DM2_SA131" sa131
	ON (sa151.PSID = sa131.PSID AND sa151.BERICHTSJAHR = sa131.BERICHTSJAHR)
	GROUP BY sa151.PSID, sa151.BERICHTSJAHR
  UNION ALL
    SELECT
        vers.PSID as PSID,
        vers.BERICHTSJAHR as JAHR,
        SUBSTR(LPAD(MAX(vers.PLZ)::text, 5, '0'), 1, 2) AS PLZ2, 
        SUBSTR(LPAD(MAX(vers.PLZ)::text, 5, '0'), 1, 1) AS PLZ1, 
        any_value(versq.GESCHLECHT) AS SEX, 
        LEAST(80, FLOOR(MAX(vers.BERICHTSJAHR - vers.GEBURTSJAHR)/10)*10) AS AGE10, 
        CASE WHEN MAX(vers.BERICHTSJAHR - vers.GEBURTSJAHR) < 20 THEN '0-19'
			WHEN MAX(vers.BERICHTSJAHR - vers.GEBURTSJAHR) >= 60 THEN '60+'
			ELSE '20-59'
		END AS AGE3 
    FROM "StatPort"."DM3_VERS" vers
	LEFT JOIN "StatPort"."DM3_VERSQ" versq
	ON (vers.PSID = versq.PSID AND vers.BERICHTSJAHR = versq.BERICHTSJAHR)
	GROUP BY vers.PSID, vers.BERICHTSJAHR
) 
GROUP BY PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3;
"""

Erstellung der aggregierten Views

Hinweis: Im PUF ist die Zuordnung zwischen Personen und Diagnosen zufällig. Daher wird hier kein COUNT DISTINCT für die PSIDs verwendet, sondern nur ein normaler COUNT, damit die Ergebnisse aussagekräftiger sind.

Rollup-Tabelle

Die Gruppierung mit mehreren ROLLUP-Bestandteilen liefert die aggregierten Zahlen auf allen Stufen der Hierarchie. Das erlaubt es uns nicht nur, im Statistikportal selbst keine Rechnungen mehr durchzuführen, sondern ermöglicht auch eine direkte Überprüfung der Re-Identifikationsmöglichkeiten durch Rückrechnungen.

Die Berechnung dieser Tabelle dauert (ohne Indizes) ca. 23 Minuten, mit DM3 steigt die Dauer auf 41 Minuten.

In [ ]:
sqlquery = """
CREATE MATERIALIZED VIEW "StatPort"."ROLLUP" AS
SELECT d.JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, ICD3, ICD2, ICD1, COUNT(d.PSID) AS CNT
FROM "StatPort"."PERS" p LEFT JOIN "StatPort"."DIAG" d ON (p.psid = d.psid AND p.JAHR = d.JAHR)
GROUP BY d.JAHR, ROLLUP(SEX), ROLLUP(AGE3, AGE10), ROLLUP(PLZ1, PLZ2), ROLLUP(ICD1, ICD2, ICD3);
"""

Mit den folgend definierten Indizes dauert die Berechnung ca. 21 Minuten.

Die Berechnung der Indizes selbst dauert ca. 2,5 Minuten. Lohnt sich also hierfür nicht wirklich...

In [ ]:
sqlquery = """
CREATE INDEX idx_join_pers on "StatPort"."PERS" (JAHR, PSID); 
CREATE INDEX idx_join_diag on "StatPort"."DIAG" (JAHR, PSID);

CREATE INDEX idx_icd1 on "StatPort"."DIAG" (ICD1);
CREATE INDEX idx_icd2 on "StatPort"."DIAG" (ICD1, ICD2);
CREATE INDEX idx_icd3 on "StatPort"."DIAG" (ICD1, ICD2, ICD3);
CREATE INDEX idx_age3 on "StatPort"."PERS" (AGE3);
CREATE INDEX idx_age10 on "StatPort"."PERS" (AGE3, AGE10);
"""

Die Basistabelle

Die folgende Tabelle ist die maximale Information, die im Statistikportal dargestellt werden kann. Sie enthält alle Zahlen für alle möglichen Hierarchiestufen. Nicht vorhandene Konstallationen (also Kombinationen aus PLZ, AGE, SEX und ICD mit 0 gezählten Patienten) sind nicht enthalten. (Laufzeit < 10 Sekunden)

Diese Tabelle ist der Ausgangspunkt für den Anonymisierungsalgorithmus, der sowohl die zu kleinen Fallzahlen selbst als auch alle Rückrechenmöglichkeiten entfernt.

In [ ]:
sqlquery = """
    CREATE MATERIALIZED VIEW "StatPort"."ALGO_BASE" AS
    SELECT
        JAHR,
        CASE
            WHEN ICD3 IS NULL AND d.ICD2 IS NULL AND ICD1 IS NULL THEN 'any_diagnosis'
            WHEN ICD3 is null and d.ICD2 IS NULL AND ICD1 IS NOT NULL then ICD1
            -- WHEN ICD3 is null and d.ICD2 is NOT NULL then icd.ICD2
            WHEN ICD3 is null and d.ICD2 IS NOT NULL then d.ICD2
            WHEN ICD3 is not null then icd3
        END as icd,
        CASE
            WHEN PLZ2 IS NULL AND PLZ1 is null then NULL
            WHEN PLZ2 IS NULL then PLZ1
            WHEN PLZ2 IS NOT NULL then PLZ2
        END as PLZ,
        CASE
            WHEN AGE10 IS NULL AND AGE3 is NULL then NULL
            WHEN AGE10 IS NULL then AGE3
            WHEN AGE10 is not null then AGE10::text
        END as age,
        SEX,
        CNT
    from "StatPort"."ROLLUP" d
        -- left join kat_icd_gruppen icd on (d.icd2 = icd.grvon)
    WHERE JAHR IS NOT NULL;
"""