Skript zur Anonymisierung für das Statistikportal

des Forschungsdatenzentrums Gesundheit
beim Bundesinstitut für Arzneimittel und Medizinprodukte

main.R

# main.R

# Pakete ------------------------------------------------------------------

library(here)
library(glue)
library(tidyverse)
library(data.table)


# Datenbankverbindung -----------------------------------------------------

# con = Database Connection


# Einstellungen -----------------------------------------------------------

sample_size_psid = 95 # Should be 95 for a real run
sample_size_diag = "" # Muss entweder ein "SAMPLE(1-99)" für Testen oder "" für echten durchlauf sein


# SQL Ausführen -----------------------------------------------------------

send_formatted_sqls_to_db <- function(year, path_to_sql, con, sample_size_psid, sample_size_diag){
  readLines(path_to_sql) %>% 
    paste0(collapse = " ") %>% 
    glue() %>% 
    str_split(";") %>% 
    flatten_chr() -> sql_strings
  
  for(i in seq_along(sql_strings) ){
    print(i)
    print(sql_strings[[i]])
    dbSendUpdate(con, sql_strings[[i]])
  }
}

for(year in c("2016", "2017", "2018")){
  send_formatted_sqls_to_db(year, "dm2_create_count_tables.sql", con = con, sample_size_psid = sample_size_psid, sample_size_diag = sample_size_diag)
}

for(year in c("2019", "2020", "2021", "2022")){
  send_formatted_sqls_to_db(year, "dm3_create_count_tables.sql", con = con, sample_size_psid = sample_size_psid, sample_size_diag = sample_size_diag)
}


# Hierarchien einlesen ----------------------------------------------------

hier_icd = execResultQuery("select * from hier_icd", con = con) %>% as.data.table()
hier_plz = execResultQuery("select * from hier_plz", con = con) %>% as.data.table()
hier_age = execResultQuery("select * from hier_age", con = con) %>% as.data.table()
hier_sex = execResultQuery("select * from hier_sex", con = con) %>% as.data.table() %>% mutate(CURRENT_LEVEL = c("1", "2","3", "4", "all"))


# Baseline für den Algorithmus erstellen ----------------------------------

create_algo_base <- function(year, con, kat_icd_gruppen = NULL){
  tt_rollup <- execResultQuery(glue("select * from tt_rollup_{year}"), con = con) %>% as.data.table()
  tt_sd <- execResultQuery(glue("select * from tt_sd_{year}"), con = con) %>% as.data.table()
  
  tt_rollup %>% 
    mutate(
      ICD1 = if_else(is.na(ICD1), "any_diagnosis", ICD1)
    ) %>%
    bind_rows(tt_sd) %>% 
    filter(!is.na(JAHR), !(PLZ2 %in% c('43', '11', '05', '62') ) ) %>% 
    mutate(
      ICD = case_when(
        is.na(ICD3) & is.na(ICD2) & is.na(ICD1) ~ "all",
        is.na(ICD3) & is.na(ICD2) & !is.na(ICD1) ~ ICD1,
        is.na(ICD3) & !is.na(ICD2) & !is.na(ICD1) ~ ICD2,
        !is.na(ICD3) & !is.na(ICD2) & !is.na(ICD1) ~ ICD3
      ),
      PLZ = case_when(
        is.na(PLZ2) & is.na(PLZ1) ~ NA_character_,
        is.na(PLZ2) & !is.na(PLZ1) ~ PLZ1,
        !is.na(PLZ2) & !is.na(PLZ1) ~ PLZ2
      ),
      AGE = case_when(
        is.na(AGE10) & is.na(AGE3) ~ NA_character_,
        is.na(AGE10) & !is.na(AGE3) ~ AGE3,
        !is.na(AGE10) & !is.na(AGE3) ~ as.character(AGE10)
      )
    ) %>% 
    select(JAHR, ICD, PLZ, AGE, SEX, CNT)
}

kat_icd_gruppen <- execResultQuery("select * from kat_icd_gruppen", con = con)


algo_base_2016 = create_algo_base("2016", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2017 = create_algo_base("2017", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2018 = create_algo_base("2018", con = con, kat_icd_gruppen = kat_icd_gruppen)

algo_base_2019 = create_algo_base("2019", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2020 = create_algo_base("2020", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2021 = create_algo_base("2021", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2022 = create_algo_base("2022", con = con, kat_icd_gruppen = kat_icd_gruppen)

dbDisconnect(con)


# Ausschluss von zu geringen Fallzahlen / "Refinement" ---------------------

refine_all <- function(data){
  data %>% 
    mutate(SEX = as.character(SEX)) %>% 
    replace_na(list("AGE" = "all", "SEX" = "all", "PLZ" = "all", "CNT" = 0, ICD = "all")) %>% 
    
    right_join(hier_age, by = c("AGE" = "CURRENT_LEVEL"), suffix = c("", "_AGE")) %>% 
    
    right_join(hier_plz, by = c("PLZ" = "CURRENT_LEVEL"), suffix = c("", "_PLZ")) %>% 
    
    # Bei SEX sollen die 0er-Bins nicht explizit hinzugefügt werden (Sex 3 und 4 sind ausgeschlossen)
    left_join(hier_sex, by = c("SEX" = "CURRENT_LEVEL"), suffix = c("", "_SEX")) %>% 
    
    right_join(hier_icd, by = c("ICD" = "CURRENT_LEVEL"), suffix = c("", "_ICD")) %>% 
    
    group_by(HIGHER_LEVEL_AGE, HIGHER_LEVEL_PLZ, HIGHER_LEVEL_SEX, HIGHER_LEVEL_ICD) %>% 
    filter(
      all(CNT >= 10)  # Definition kleine Fallzahl mit BfDI 
    )
}

refine_16 <- refine_all(algo_base_2016)
refine_17 <- refine_all(algo_base_2017)
refine_18 <- refine_all(algo_base_2018)

refine_19 <- refine_all(algo_base_2019)
refine_20 <- refine_all(algo_base_2020)
refine_21 <- refine_all(algo_base_2021)
refine_22 <- refine_all(algo_base_2022)


# Ausschluss von Untergruppen, deren Parent fehlt / "Too much Refinement" --------

# In einer Altersgruppe wird ausdifferenziert, die wir nicht ausdifferenzieren wollen, weil in dieser Gruppe alle genug Fallzahlen haben.
# Eine der Gruppen auf einer Ebene höher schmeißt aber die Gruppe drüber raus.

kill_too_much_refinement <- function(data){
  d <- data
  continue = TRUE
  while(continue == TRUE){
    d_ok <- d %>% 
      ungroup() %>% 
      
      # AGE
      group_by(ICD, PLZ, SEX) %>% 
      mutate(
        ok = case_when(
          HIGHER_LEVEL_AGE %in% c(AGE, "all+") ~ "TRUE",
          TRUE ~ HIGHER_LEVEL_AGE
        )
      ) %>% 
      
      # SEX
      group_by(ICD, PLZ, AGE) %>%
      mutate(
        ok = case_when(
          ok != "TRUE" ~ ok,
          HIGHER_LEVEL_SEX %in% c(SEX, "all+") ~ "TRUE",
          TRUE ~ HIGHER_LEVEL_SEX
        )
      ) %>%
      
      # PLZ
      group_by(ICD, AGE, SEX) %>%
      mutate(
        ok = case_when(
          ok != "TRUE" ~ ok,
          HIGHER_LEVEL_PLZ %in% c(PLZ, "all+") ~ "TRUE",
          TRUE ~ HIGHER_LEVEL_PLZ
        )
      ) %>%
      
      # ICD
      group_by(AGE, PLZ, SEX) %>%
      mutate(
        ok = case_when(
          ok != "TRUE" ~ ok,
          HIGHER_LEVEL_ICD %in% c(ICD, "all") ~ "TRUE",
          TRUE ~ HIGHER_LEVEL_ICD
        )
      ) %>%
      ungroup() 
    
    d <- d_ok %>% filter(ok == "TRUE")
    print(c(nrow(d), "rows with no problem"))
    print(c(nrow(d_ok), "rows total"))
    continue = if_else(nrow(d) == nrow(d_ok), FALSE, TRUE)
  }
  return(d)
}

ok_2016 <- kill_too_much_refinement(refine_16)
ok_2017 <- kill_too_much_refinement(refine_17)
ok_2018 <- kill_too_much_refinement(refine_18)

ok_2019 <- kill_too_much_refinement(refine_19)
ok_2020 <- kill_too_much_refinement(refine_20)
ok_2021 <- kill_too_much_refinement(refine_21)
ok_2022 <- kill_too_much_refinement(refine_22)


# Export ------------------------------------------------------------------

export_statport_to_csv <- function(data){
  jahr <- data %>% select(JAHR) %>% distinct() %>% pull()
  data %>%
    ungroup() %>%
    select(JAHR, ICD, PLZ, AGE, SEX, CNT) %>%
    write_csv(glue("StatPort_{jahr}.csv") )
}

list(ok_2016, ok_2017, ok_2018, ok_2019, ok_2020, ok_2021, ok_2022) %>% 
  walk(export_statport_to_csv)
    

dm2_create_count_tables.sql

CALL drop_if_exists('ttmg{year}_psid_sample');

create table ttmg{year}_psid_sample as
    (
    select SA151_PSID
    from datrav.vbj{year}sa151 SAMPLE({sample_size_psid})
);

create index idx_ttmg{year}_psid_sample on ttmg{year}_psid_sample (SA151_PSID);


CALL drop_if_exists('ttmg{year}_sa651');
create table ttmg{year}_sa651 as
  WITH 
    /* reduce variable and sample (while programming) */
    smaller_651 as 
        (select sa651_psid, sa651_qualifizierung, sa651_icd_code 
            from datrav.vbj{year}sa651  {sample_size_diag} ),
    /* filter to PSIDs in sample for publication        */
    psid_sample_table as (
        select * from smaller_651 
            where sa651_qualifizierung = 'G' 
                and sa651_psid is not null 
                and sa651_psid in (select SA151_PSID from ttmg{year}_psid_sample)
     )
     /* create counts for final table */
    SELECT      
        substr(sa651_icd_code, 1, 3) sa651_icd_code,  
        sa651_psid,
        count(*) as cnt_icd   
    FROM psid_sample_table  
        group by sa651_psid, substr(sa651_icd_code, 1, 3) ;

CALL drop_if_exists('ttmg{year}_sa551');

create table ttmg{year}_sa551 as
  WITH 
    /* reduce variable and sample (while programming) */
    smaller as 
        (select sa551_psid, sa551_icd_code 
            from datrav.vbj{year}sa551  {sample_size_diag} ),
    /* filter to PSIDs in sample for publication    */    
    psid_sample_table as (
        select * from smaller
            where sa551_psid is not null 
                and sa551_psid in (select SA151_PSID from ttmg{year}_psid_sample)
     )
     /* create counts for final table*/
    SELECT      
        sa551_psid, substr(sa551_icd_code,1,3) as sa551_icd_code , count(*) as cnt_icd 
    FROM psid_sample_table  
        group by sa551_psid, substr(sa551_icd_code,1,3) ;
        
/* create table that only uses the first occurence of any icd code*/
CALL drop_if_exists('tt{year}_sa_icd');
create table tt{year}_sa_icd as
    select 
        coalesce(sa651.sa651_icd_code, sa551.sa551_icd_code) as icd_code,
        coalesce(sa651.sa651_psid, sa551.sa551_psid) as psid
    from ttmg{year}_sa651 sa651
        full outer join ttmg{year}_sa551 sa551 on sa651.sa651_psid = sa551.sa551_psid 
                                                and sa651.sa651_icd_code = sa551.sa551_icd_code
;

/* format table so hierarchy is included */
CALL drop_if_exists('T_STATPORT_DIAG_{year}');
create table T_STATPORT_DIAG_{year} as select
        diag.PSID,
        {year} as JAHR,
        substr(diag.ICD_CODE, 1,3) ICD3,
        icd.icd2 ICD2, /* new */ 
        icd.KAPNR ICD1
    FROM tt{year}_sa_icd diag 
        JOIN (statport.KAT_ICD_NEU /* new */
                ) icd 
            ON (substr(diag.ICD_CODE, 1, 3) = icd.icd3 and /* new */
                diag.jahr = icd.jahr); /* new */

/* free up space on db by deleting temporary tables */
CALL drop_if_exists('tt{year}_sa_icd');
CALL drop_if_exists('ttmg{year}_sa551');
CALL drop_if_exists('ttmg{year}_sa651');


CALL drop_if_exists('T_StatPort_Pers_{year}');


CREATE TABLE T_StatPort_Pers_{year} AS
    with 
        /* filter to psid sample */
        psid_sample_table_sa151 as (
        select * from DATRAV.VBJ{year}SA151 WHERE SA151_PSID in (select SA151_PSID from ttmg{year}_psid_sample)
        ),
        /* filter to psid sample */
        psid_sample_table_sa131 as (
        select * from DATRAV.V{year}SA131 WHERE SA131_PSID in (select SA151_PSID from ttmg{year}_psid_sample)
        ),
        /* Preprocessing: cut plz to 1 and 2 places, recode sex > 2 to random one, calculate age buckets */
        preprocessed_table as (  
        SELECT 
            SA151_PSID AS PSID, 
            SA151_BERICHTSJAHR AS JAHR, 
            substr(LPAD(SA131_PLZ, 5, '0'), 1, 2) AS PLZ2, 
            substr(LPAD(SA131_PLZ, 5, '0'), 1, 1) AS PLZ1, 
            case 
                when SA151_GESCHLECHT > 2 THEN ROUND(dbms_random.value(1, 2), 0)
                ELSE SA151_GESCHLECHT 
            END AS SEX, 
            LEAST(80, FLOOR((SA151_BERICHTSJAHR - SA151_GEBURTSJAHR)/10)*10) AS AGE10, 
            CASE WHEN (SA151_BERICHTSJAHR - SA151_GEBURTSJAHR) < 20 THEN '0-19'
                            WHEN (SA151_BERICHTSJAHR - SA151_GEBURTSJAHR) >= 60 THEN '60+'
                            ELSE '20-59'
                        END AS AGE3 
        FROM psid_sample_table_sa151 sa151 JOIN psid_sample_table_sa131 sa131 ON (SA151_PSID = SA131_PSID) )
    /* select final table, filtering to existing plz and removing duplicate rows */
    SELECT PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3
    FROM preprocessed_table
        WHERE PLZ2 NOT IN ('05', '11', '43', '62')
        GROUP BY PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3;
 

     
create index idx_total_pers_{year} on T_Statport_pers_{year} (jahr, age3, age10, plz1, plz2, sex);

create index idx_total_diag_{year} on T_StatPort_Diag_{year} (ICD1, ICD2, ICD3, JAHR);   


CALL drop_if_exists('tt_rollup_{year}');
create table tt_rollup_{year} as
    SELECT d.JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, icd3, icd2, icd1, COUNT( DISTINCT d.PSID) AS CNT
    FROM T_StatPort_Pers_{year} p 
        LEFT JOIN STATPORT.t_statport_diag_{year} d 
        ON (p.psid = d.psid AND p.JAHR = d.JAHR)
    GROUP BY d.JAHR, ROLLUP(PLZ1, PLZ2), ROLLUP(AGE3, AGE10), ROLLUP(SEX), ROLLUP(ICD1, ICD2, ICD3);


CALL drop_if_exists('tt_sd_{year}');
create table tt_sd_{year} as
    SELECT JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, COUNT( DISTINCT PSID) AS CNT
    FROM T_StatPort_Pers_{year} 
    GROUP BY JAHR, ROLLUP(PLZ1, PLZ2), ROLLUP(AGE3, AGE10), ROLLUP(SEX);
    
/*Remove preliminary tables*/    
CALL drop_if_exists('ttmg{year}_psid_sample');
CALL drop_if_exists('T_STATPORT_DIAG_{year}');
CALL drop_if_exists('T_StatPort_Pers_{year}')
    

dm3_create_count_tables.sql

CALL drop_if_exists('ttmg{year}_psid_sample');

create table ttmg{year}_psid_sample as
    (
    select PSID
    from datrav.vbj{year}vers SAMPLE({sample_size_psid})
);

create index idx_ttmg{year}_psid_sample on ttmg{year}_psid_sample (PSID);


CALL drop_if_exists('ttmg{year}_stat');
create table ttmg{year}_stat as
  WITH 
 /*KHFALL und KHDIAG müssen gejoined werden, um die PSID zu erhalten*/ 
 /* Sample is to reduce load while programming */
            khfall as (
                SELECT fall.*, l.kassensitzik
                FROM DATRAV.VBJ{year}KHFALL {sample_size_diag} fall  INNER JOIN DATRAV.VLOG_IMPORTEDFILES l
                ON fall.IMPFILE_ID = l.IMPFILE_ID    
                WHERE 
                   PSID is not null and PSID in (SELECT PSID FROM ttmg{year}_psid_sample)
            ),
            khdiag as (
                SELECT diag.*, l.kassensitzik
                FROM DATRAV.VBJ{year}KHDIAG {sample_size_diag} diag  INNER JOIN DATRAV.VLOG_IMPORTEDFILES l
                ON diag.IMPFILE_ID = l.IMPFILE_ID
            ),
            kh as (
                SELECT khfall.PSID, khfall.FALLIDKH, khdiag.ICDKH
                FROM khfall INNER JOIN khdiag
                ON khdiag.FALLIDKH = khfall.FALLIDKH AND khdiag.kassensitzik = khfall.kassensitzik
            )
      SELECT
                SUBSTR(ICDKH, 1, 3) as icd_code,
                PSID
            FROM 
                kh
            GROUP BY
                PSID, SUBSTR(ICDKH, 1, 3)  
                ;
    
CALL drop_if_exists('ttmg{year}_amb');
create table ttmg{year}_amb as
  WITH 
            ambfall AS (
                SELECT fall.*, l.kassensitzik
                FROM DATRAV.VBJ{year}AMBFALL {sample_size_diag} fall  INNER JOIN DATRAV.VLOG_IMPORTEDFILES l
                ON fall.IMPFILE_ID = l.IMPFILE_ID
                 WHERE 
                   PSID is not null and PSID in (SELECT PSID FROM ttmg{year}_psid_sample)
            ),
            ambdiag AS (
                SELECT diag.*, l.kassensitzik
                FROM DATRAV.VBJ{year}AMBDIAG {sample_size_diag} diag  INNER JOIN DATRAV.VLOG_IMPORTEDFILES l
                ON diag.IMPFILE_ID = l.IMPFILE_ID
                 WHERE 
                   DIAGSICH = 'G'
            ),
            amb as (
                SELECT ambfall.PSID, ambfall.FALLIDAMB, ambdiag.ICDAMB
                FROM ambfall INNER JOIN ambdiag
                ON ambdiag.FALLIDAMB = ambfall.FALLIDAMB AND ambdiag.kassensitzik = ambfall.kassensitzik
                )
                 
                select 
                    substr(ICDAMB, 1, 3) icd_code,
                    PSID
                from
                    amb
                group by
                    PSID, substr(ICDAMB, 1, 3);
        
/* create table that only uses the first occurence of any icd code*/
CALL drop_if_exists('tt{year}_sa_icd');
create table tt{year}_sa_icd as
    select 
        {year} as JAHR
        coalesce(stat.icd_code, amb.icd_code) as icd_code,
        coalesce(stat.psid, amb.psid) as psid
    from ttmg{year}_stat stat
        full outer join ttmg{year}_amb amb on stat.psid = amb.psid 
                                                and stat.icd_code = amb.icd_code
;

/* format table so hierarchy is included */
CALL drop_if_exists('T_STATPORT_DIAG_{year}');
create table T_STATPORT_DIAG_{year} as select
        diag.PSID,
        {year} as JAHR,
        substr(diag.ICD_CODE, 1,3) ICD3,
        icd.GRVON ICD2,
        icd.KAPNR ICD1
    FROM tt{year}_sa_icd diag 
        JOIN (statport.KAT_ICD_NEU
                ) icd 
            ON (substr(diag.ICD_CODE, 1, 3) = icd.icd3 and
            diag.jahr = icd.jahr);

/* free up space on db by deleting temporary tables */
CALL drop_if_exists('tt{year}_sa_icd');
CALL drop_if_exists('ttmg{year}_stat');
CALL drop_if_exists('ttmg{year}_amb');


CALL drop_if_exists('T_StatPort_Pers_{year}');


CREATE TABLE T_StatPort_Pers_{year} AS
    with 
        /* filter to psid sample */
        psid_sample_table_vers as (
        select * from DATRAV.VBJ{year}vers WHERE PSID in (select PSID from ttmg{year}_psid_sample)
        ),
        /* filter to psid sample */
        /* we are grouping by SEX -> Possibility to have 2 rows for 1 person changing their SEX, e.g. from 1 to 2*/
        psid_sample_table_versq as (
        select PSID, GESCHLECHT FROM datrav.vbj{year}versq where PSID in (select PSID from ttmg{year}_psid_sample) group by PSID, GESCHLECHT
        ),
        /* Preprocessing: cut plz to 1 and 2 places, recode sex > 2 to random one, calculate age buckets */
        preprocessed_table as (  
        SELECT 
            vers.PSID AS PSID, 
            {year} AS JAHR, 
            substr(LPAD(PLZ, 5, '0'), 1, 2) AS PLZ2, 
            substr(LPAD(PLZ, 5, '0'), 1, 1) AS PLZ1, 
            case 
                when GESCHLECHT > 2 THEN ROUND(dbms_random.value(1, 2), 0)
                ELSE GESCHLECHT 
            END AS SEX, 
            LEAST(80, FLOOR(({year} - GEBJAHR)/10)*10) AS AGE10, 
            CASE WHEN ({year} - GEBJAHR) < 20 THEN '0-19'
                            WHEN {year} - GEBJAHR >= 60 THEN '60+'
                            ELSE '20-59'
                        END AS AGE3 
        FROM psid_sample_table_vers vers JOIN psid_sample_table_versq versq ON (vers.PSID = versq.PSID) )
    /* select final table, filtering to existing plz and removing duplicate rows */
    SELECT PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3
    FROM preprocessed_table
        WHERE PLZ2 NOT IN ('05', '11', '43', '62')
        GROUP BY PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3;
 

     
create index idx_total_pers_{year} on T_Statport_pers_{year} (jahr, age3, age10, plz1, plz2, sex);

create index idx_total_diag_{year} on T_StatPort_Diag_{year} (ICD1, ICD2, ICD3, JAHR);   


CALL drop_if_exists('tt_rollup_{year}');
create table tt_rollup_{year} as
    SELECT d.JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, icd3, icd2, icd1, COUNT( DISTINCT d.PSID) AS CNT
    FROM T_StatPort_Pers_{year} p 
        LEFT JOIN STATPORT.t_statport_diag_{year} d 
        ON (p.psid = d.psid AND p.JAHR = d.JAHR)
    GROUP BY d.JAHR, ROLLUP(PLZ1, PLZ2), ROLLUP(AGE3, AGE10), ROLLUP(SEX), ROLLUP(ICD1, ICD2, ICD3);


CALL drop_if_exists('tt_sd_{year}');
create table tt_sd_{year} as
    SELECT JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, COUNT( DISTINCT PSID) AS CNT
    FROM T_StatPort_Pers_{year} 
    GROUP BY JAHR, ROLLUP(PLZ1, PLZ2), ROLLUP(AGE3, AGE10), ROLLUP(SEX);

/*Remove preliminary tables*/    
CALL drop_if_exists('ttmg{year}_psid_sample');
CALL drop_if_exists('T_StatPort_Pers_{year}');
CALL drop_if_exists('T_STATPORT_DIAG_{year}')