Skript zur Anonymisierung für das Statistikportal
main.R
# main.R
# Pakete ------------------------------------------------------------------
library(here)
library(glue)
library(tidyverse)
library(data.table)
# Datenbankverbindung -----------------------------------------------------
# con = Database Connection
# Einstellungen -----------------------------------------------------------
sample_size_psid = 95 # Should be 95 for a real run
sample_size_diag = "" # Muss entweder ein "SAMPLE(1-99)" für Testen oder "" für echten durchlauf sein
# SQL Ausführen -----------------------------------------------------------
send_formatted_sqls_to_db <- function(year, path_to_sql, con, sample_size_psid, sample_size_diag){
readLines(path_to_sql) %>%
paste0(collapse = " ") %>%
glue() %>%
str_split(";") %>%
flatten_chr() -> sql_strings
for(i in seq_along(sql_strings) ){
print(i)
print(sql_strings[[i]])
dbSendUpdate(con, sql_strings[[i]])
}
}
for(year in c("2016", "2017", "2018")){
send_formatted_sqls_to_db(year, "dm2_create_count_tables.sql", con = con, sample_size_psid = sample_size_psid, sample_size_diag = sample_size_diag)
}
for(year in c("2019", "2020", "2021", "2022")){
send_formatted_sqls_to_db(year, "dm3_create_count_tables.sql", con = con, sample_size_psid = sample_size_psid, sample_size_diag = sample_size_diag)
}
# Hierarchien einlesen ----------------------------------------------------
hier_icd = execResultQuery("select * from hier_icd", con = con) %>% as.data.table()
hier_plz = execResultQuery("select * from hier_plz", con = con) %>% as.data.table()
hier_age = execResultQuery("select * from hier_age", con = con) %>% as.data.table()
hier_sex = execResultQuery("select * from hier_sex", con = con) %>% as.data.table() %>% mutate(CURRENT_LEVEL = c("1", "2","3", "4", "all"))
# Baseline für den Algorithmus erstellen ----------------------------------
create_algo_base <- function(year, con, kat_icd_gruppen = NULL){
tt_rollup <- execResultQuery(glue("select * from tt_rollup_{year}"), con = con) %>% as.data.table()
tt_sd <- execResultQuery(glue("select * from tt_sd_{year}"), con = con) %>% as.data.table()
tt_rollup %>%
mutate(
ICD1 = if_else(is.na(ICD1), "any_diagnosis", ICD1)
) %>%
bind_rows(tt_sd) %>%
filter(!is.na(JAHR), !(PLZ2 %in% c('43', '11', '05', '62') ) ) %>%
mutate(
ICD = case_when(
is.na(ICD3) & is.na(ICD2) & is.na(ICD1) ~ "all",
is.na(ICD3) & is.na(ICD2) & !is.na(ICD1) ~ ICD1,
is.na(ICD3) & !is.na(ICD2) & !is.na(ICD1) ~ ICD2,
!is.na(ICD3) & !is.na(ICD2) & !is.na(ICD1) ~ ICD3
),
PLZ = case_when(
is.na(PLZ2) & is.na(PLZ1) ~ NA_character_,
is.na(PLZ2) & !is.na(PLZ1) ~ PLZ1,
!is.na(PLZ2) & !is.na(PLZ1) ~ PLZ2
),
AGE = case_when(
is.na(AGE10) & is.na(AGE3) ~ NA_character_,
is.na(AGE10) & !is.na(AGE3) ~ AGE3,
!is.na(AGE10) & !is.na(AGE3) ~ as.character(AGE10)
)
) %>%
select(JAHR, ICD, PLZ, AGE, SEX, CNT)
}
kat_icd_gruppen <- execResultQuery("select * from kat_icd_gruppen", con = con)
algo_base_2016 = create_algo_base("2016", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2017 = create_algo_base("2017", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2018 = create_algo_base("2018", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2019 = create_algo_base("2019", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2020 = create_algo_base("2020", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2021 = create_algo_base("2021", con = con, kat_icd_gruppen = kat_icd_gruppen)
algo_base_2022 = create_algo_base("2022", con = con, kat_icd_gruppen = kat_icd_gruppen)
dbDisconnect(con)
# Ausschluss von zu geringen Fallzahlen / "Refinement" ---------------------
refine_all <- function(data){
data %>%
mutate(SEX = as.character(SEX)) %>%
replace_na(list("AGE" = "all", "SEX" = "all", "PLZ" = "all", "CNT" = 0, ICD = "all")) %>%
right_join(hier_age, by = c("AGE" = "CURRENT_LEVEL"), suffix = c("", "_AGE")) %>%
right_join(hier_plz, by = c("PLZ" = "CURRENT_LEVEL"), suffix = c("", "_PLZ")) %>%
# Bei SEX sollen die 0er-Bins nicht explizit hinzugefügt werden (Sex 3 und 4 sind ausgeschlossen)
left_join(hier_sex, by = c("SEX" = "CURRENT_LEVEL"), suffix = c("", "_SEX")) %>%
right_join(hier_icd, by = c("ICD" = "CURRENT_LEVEL"), suffix = c("", "_ICD")) %>%
group_by(HIGHER_LEVEL_AGE, HIGHER_LEVEL_PLZ, HIGHER_LEVEL_SEX, HIGHER_LEVEL_ICD) %>%
filter(
all(CNT >= 10) # Definition kleine Fallzahl mit BfDI
)
}
refine_16 <- refine_all(algo_base_2016)
refine_17 <- refine_all(algo_base_2017)
refine_18 <- refine_all(algo_base_2018)
refine_19 <- refine_all(algo_base_2019)
refine_20 <- refine_all(algo_base_2020)
refine_21 <- refine_all(algo_base_2021)
refine_22 <- refine_all(algo_base_2022)
# Ausschluss von Untergruppen, deren Parent fehlt / "Too much Refinement" --------
# In einer Altersgruppe wird ausdifferenziert, die wir nicht ausdifferenzieren wollen, weil in dieser Gruppe alle genug Fallzahlen haben.
# Eine der Gruppen auf einer Ebene höher schmeißt aber die Gruppe drüber raus.
kill_too_much_refinement <- function(data){
d <- data
continue = TRUE
while(continue == TRUE){
d_ok <- d %>%
ungroup() %>%
# AGE
group_by(ICD, PLZ, SEX) %>%
mutate(
ok = case_when(
HIGHER_LEVEL_AGE %in% c(AGE, "all+") ~ "TRUE",
TRUE ~ HIGHER_LEVEL_AGE
)
) %>%
# SEX
group_by(ICD, PLZ, AGE) %>%
mutate(
ok = case_when(
ok != "TRUE" ~ ok,
HIGHER_LEVEL_SEX %in% c(SEX, "all+") ~ "TRUE",
TRUE ~ HIGHER_LEVEL_SEX
)
) %>%
# PLZ
group_by(ICD, AGE, SEX) %>%
mutate(
ok = case_when(
ok != "TRUE" ~ ok,
HIGHER_LEVEL_PLZ %in% c(PLZ, "all+") ~ "TRUE",
TRUE ~ HIGHER_LEVEL_PLZ
)
) %>%
# ICD
group_by(AGE, PLZ, SEX) %>%
mutate(
ok = case_when(
ok != "TRUE" ~ ok,
HIGHER_LEVEL_ICD %in% c(ICD, "all") ~ "TRUE",
TRUE ~ HIGHER_LEVEL_ICD
)
) %>%
ungroup()
d <- d_ok %>% filter(ok == "TRUE")
print(c(nrow(d), "rows with no problem"))
print(c(nrow(d_ok), "rows total"))
continue = if_else(nrow(d) == nrow(d_ok), FALSE, TRUE)
}
return(d)
}
ok_2016 <- kill_too_much_refinement(refine_16)
ok_2017 <- kill_too_much_refinement(refine_17)
ok_2018 <- kill_too_much_refinement(refine_18)
ok_2019 <- kill_too_much_refinement(refine_19)
ok_2020 <- kill_too_much_refinement(refine_20)
ok_2021 <- kill_too_much_refinement(refine_21)
ok_2022 <- kill_too_much_refinement(refine_22)
# Export ------------------------------------------------------------------
export_statport_to_csv <- function(data){
jahr <- data %>% select(JAHR) %>% distinct() %>% pull()
data %>%
ungroup() %>%
select(JAHR, ICD, PLZ, AGE, SEX, CNT) %>%
write_csv(glue("StatPort_{jahr}.csv") )
}
list(ok_2016, ok_2017, ok_2018, ok_2019, ok_2020, ok_2021, ok_2022) %>%
walk(export_statport_to_csv)
dm2_create_count_tables.sql
CALL drop_if_exists('ttmg{year}_psid_sample');
create table ttmg{year}_psid_sample as
(
select SA151_PSID
from datrav.vbj{year}sa151 SAMPLE({sample_size_psid})
);
create index idx_ttmg{year}_psid_sample on ttmg{year}_psid_sample (SA151_PSID);
CALL drop_if_exists('ttmg{year}_sa651');
create table ttmg{year}_sa651 as
WITH
/* reduce variable and sample (while programming) */
smaller_651 as
(select sa651_psid, sa651_qualifizierung, sa651_icd_code
from datrav.vbj{year}sa651 {sample_size_diag} ),
/* filter to PSIDs in sample for publication */
psid_sample_table as (
select * from smaller_651
where sa651_qualifizierung = 'G'
and sa651_psid is not null
and sa651_psid in (select SA151_PSID from ttmg{year}_psid_sample)
)
/* create counts for final table */
SELECT
substr(sa651_icd_code, 1, 3) sa651_icd_code,
sa651_psid,
count(*) as cnt_icd
FROM psid_sample_table
group by sa651_psid, substr(sa651_icd_code, 1, 3) ;
CALL drop_if_exists('ttmg{year}_sa551');
create table ttmg{year}_sa551 as
WITH
/* reduce variable and sample (while programming) */
smaller as
(select sa551_psid, sa551_icd_code
from datrav.vbj{year}sa551 {sample_size_diag} ),
/* filter to PSIDs in sample for publication */
psid_sample_table as (
select * from smaller
where sa551_psid is not null
and sa551_psid in (select SA151_PSID from ttmg{year}_psid_sample)
)
/* create counts for final table*/
SELECT
sa551_psid, substr(sa551_icd_code,1,3) as sa551_icd_code , count(*) as cnt_icd
FROM psid_sample_table
group by sa551_psid, substr(sa551_icd_code,1,3) ;
/* create table that only uses the first occurence of any icd code*/
CALL drop_if_exists('tt{year}_sa_icd');
create table tt{year}_sa_icd as
select
coalesce(sa651.sa651_icd_code, sa551.sa551_icd_code) as icd_code,
coalesce(sa651.sa651_psid, sa551.sa551_psid) as psid
from ttmg{year}_sa651 sa651
full outer join ttmg{year}_sa551 sa551 on sa651.sa651_psid = sa551.sa551_psid
and sa651.sa651_icd_code = sa551.sa551_icd_code
;
/* format table so hierarchy is included */
CALL drop_if_exists('T_STATPORT_DIAG_{year}');
create table T_STATPORT_DIAG_{year} as select
diag.PSID,
{year} as JAHR,
substr(diag.ICD_CODE, 1,3) ICD3,
icd.icd2 ICD2, /* new */
icd.KAPNR ICD1
FROM tt{year}_sa_icd diag
JOIN (statport.KAT_ICD_NEU /* new */
) icd
ON (substr(diag.ICD_CODE, 1, 3) = icd.icd3 and /* new */
diag.jahr = icd.jahr); /* new */
/* free up space on db by deleting temporary tables */
CALL drop_if_exists('tt{year}_sa_icd');
CALL drop_if_exists('ttmg{year}_sa551');
CALL drop_if_exists('ttmg{year}_sa651');
CALL drop_if_exists('T_StatPort_Pers_{year}');
CREATE TABLE T_StatPort_Pers_{year} AS
with
/* filter to psid sample */
psid_sample_table_sa151 as (
select * from DATRAV.VBJ{year}SA151 WHERE SA151_PSID in (select SA151_PSID from ttmg{year}_psid_sample)
),
/* filter to psid sample */
psid_sample_table_sa131 as (
select * from DATRAV.V{year}SA131 WHERE SA131_PSID in (select SA151_PSID from ttmg{year}_psid_sample)
),
/* Preprocessing: cut plz to 1 and 2 places, recode sex > 2 to random one, calculate age buckets */
preprocessed_table as (
SELECT
SA151_PSID AS PSID,
SA151_BERICHTSJAHR AS JAHR,
substr(LPAD(SA131_PLZ, 5, '0'), 1, 2) AS PLZ2,
substr(LPAD(SA131_PLZ, 5, '0'), 1, 1) AS PLZ1,
case
when SA151_GESCHLECHT > 2 THEN ROUND(dbms_random.value(1, 2), 0)
ELSE SA151_GESCHLECHT
END AS SEX,
LEAST(80, FLOOR((SA151_BERICHTSJAHR - SA151_GEBURTSJAHR)/10)*10) AS AGE10,
CASE WHEN (SA151_BERICHTSJAHR - SA151_GEBURTSJAHR) < 20 THEN '0-19'
WHEN (SA151_BERICHTSJAHR - SA151_GEBURTSJAHR) >= 60 THEN '60+'
ELSE '20-59'
END AS AGE3
FROM psid_sample_table_sa151 sa151 JOIN psid_sample_table_sa131 sa131 ON (SA151_PSID = SA131_PSID) )
/* select final table, filtering to existing plz and removing duplicate rows */
SELECT PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3
FROM preprocessed_table
WHERE PLZ2 NOT IN ('05', '11', '43', '62')
GROUP BY PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3;
create index idx_total_pers_{year} on T_Statport_pers_{year} (jahr, age3, age10, plz1, plz2, sex);
create index idx_total_diag_{year} on T_StatPort_Diag_{year} (ICD1, ICD2, ICD3, JAHR);
CALL drop_if_exists('tt_rollup_{year}');
create table tt_rollup_{year} as
SELECT d.JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, icd3, icd2, icd1, COUNT( DISTINCT d.PSID) AS CNT
FROM T_StatPort_Pers_{year} p
LEFT JOIN STATPORT.t_statport_diag_{year} d
ON (p.psid = d.psid AND p.JAHR = d.JAHR)
GROUP BY d.JAHR, ROLLUP(PLZ1, PLZ2), ROLLUP(AGE3, AGE10), ROLLUP(SEX), ROLLUP(ICD1, ICD2, ICD3);
CALL drop_if_exists('tt_sd_{year}');
create table tt_sd_{year} as
SELECT JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, COUNT( DISTINCT PSID) AS CNT
FROM T_StatPort_Pers_{year}
GROUP BY JAHR, ROLLUP(PLZ1, PLZ2), ROLLUP(AGE3, AGE10), ROLLUP(SEX);
/*Remove preliminary tables*/
CALL drop_if_exists('ttmg{year}_psid_sample');
CALL drop_if_exists('T_STATPORT_DIAG_{year}');
CALL drop_if_exists('T_StatPort_Pers_{year}')
dm3_create_count_tables.sql
CALL drop_if_exists('ttmg{year}_psid_sample');
create table ttmg{year}_psid_sample as
(
select PSID
from datrav.vbj{year}vers SAMPLE({sample_size_psid})
);
create index idx_ttmg{year}_psid_sample on ttmg{year}_psid_sample (PSID);
CALL drop_if_exists('ttmg{year}_stat');
create table ttmg{year}_stat as
WITH
/*KHFALL und KHDIAG müssen gejoined werden, um die PSID zu erhalten*/
/* Sample is to reduce load while programming */
khfall as (
SELECT fall.*, l.kassensitzik
FROM DATRAV.VBJ{year}KHFALL {sample_size_diag} fall INNER JOIN DATRAV.VLOG_IMPORTEDFILES l
ON fall.IMPFILE_ID = l.IMPFILE_ID
WHERE
PSID is not null and PSID in (SELECT PSID FROM ttmg{year}_psid_sample)
),
khdiag as (
SELECT diag.*, l.kassensitzik
FROM DATRAV.VBJ{year}KHDIAG {sample_size_diag} diag INNER JOIN DATRAV.VLOG_IMPORTEDFILES l
ON diag.IMPFILE_ID = l.IMPFILE_ID
),
kh as (
SELECT khfall.PSID, khfall.FALLIDKH, khdiag.ICDKH
FROM khfall INNER JOIN khdiag
ON khdiag.FALLIDKH = khfall.FALLIDKH AND khdiag.kassensitzik = khfall.kassensitzik
)
SELECT
SUBSTR(ICDKH, 1, 3) as icd_code,
PSID
FROM
kh
GROUP BY
PSID, SUBSTR(ICDKH, 1, 3)
;
CALL drop_if_exists('ttmg{year}_amb');
create table ttmg{year}_amb as
WITH
ambfall AS (
SELECT fall.*, l.kassensitzik
FROM DATRAV.VBJ{year}AMBFALL {sample_size_diag} fall INNER JOIN DATRAV.VLOG_IMPORTEDFILES l
ON fall.IMPFILE_ID = l.IMPFILE_ID
WHERE
PSID is not null and PSID in (SELECT PSID FROM ttmg{year}_psid_sample)
),
ambdiag AS (
SELECT diag.*, l.kassensitzik
FROM DATRAV.VBJ{year}AMBDIAG {sample_size_diag} diag INNER JOIN DATRAV.VLOG_IMPORTEDFILES l
ON diag.IMPFILE_ID = l.IMPFILE_ID
WHERE
DIAGSICH = 'G'
),
amb as (
SELECT ambfall.PSID, ambfall.FALLIDAMB, ambdiag.ICDAMB
FROM ambfall INNER JOIN ambdiag
ON ambdiag.FALLIDAMB = ambfall.FALLIDAMB AND ambdiag.kassensitzik = ambfall.kassensitzik
)
select
substr(ICDAMB, 1, 3) icd_code,
PSID
from
amb
group by
PSID, substr(ICDAMB, 1, 3);
/* create table that only uses the first occurence of any icd code*/
CALL drop_if_exists('tt{year}_sa_icd');
create table tt{year}_sa_icd as
select
{year} as JAHR
coalesce(stat.icd_code, amb.icd_code) as icd_code,
coalesce(stat.psid, amb.psid) as psid
from ttmg{year}_stat stat
full outer join ttmg{year}_amb amb on stat.psid = amb.psid
and stat.icd_code = amb.icd_code
;
/* format table so hierarchy is included */
CALL drop_if_exists('T_STATPORT_DIAG_{year}');
create table T_STATPORT_DIAG_{year} as select
diag.PSID,
{year} as JAHR,
substr(diag.ICD_CODE, 1,3) ICD3,
icd.GRVON ICD2,
icd.KAPNR ICD1
FROM tt{year}_sa_icd diag
JOIN (statport.KAT_ICD_NEU
) icd
ON (substr(diag.ICD_CODE, 1, 3) = icd.icd3 and
diag.jahr = icd.jahr);
/* free up space on db by deleting temporary tables */
CALL drop_if_exists('tt{year}_sa_icd');
CALL drop_if_exists('ttmg{year}_stat');
CALL drop_if_exists('ttmg{year}_amb');
CALL drop_if_exists('T_StatPort_Pers_{year}');
CREATE TABLE T_StatPort_Pers_{year} AS
with
/* filter to psid sample */
psid_sample_table_vers as (
select * from DATRAV.VBJ{year}vers WHERE PSID in (select PSID from ttmg{year}_psid_sample)
),
/* filter to psid sample */
/* we are grouping by SEX -> Possibility to have 2 rows for 1 person changing their SEX, e.g. from 1 to 2*/
psid_sample_table_versq as (
select PSID, GESCHLECHT FROM datrav.vbj{year}versq where PSID in (select PSID from ttmg{year}_psid_sample) group by PSID, GESCHLECHT
),
/* Preprocessing: cut plz to 1 and 2 places, recode sex > 2 to random one, calculate age buckets */
preprocessed_table as (
SELECT
vers.PSID AS PSID,
{year} AS JAHR,
substr(LPAD(PLZ, 5, '0'), 1, 2) AS PLZ2,
substr(LPAD(PLZ, 5, '0'), 1, 1) AS PLZ1,
case
when GESCHLECHT > 2 THEN ROUND(dbms_random.value(1, 2), 0)
ELSE GESCHLECHT
END AS SEX,
LEAST(80, FLOOR(({year} - GEBJAHR)/10)*10) AS AGE10,
CASE WHEN ({year} - GEBJAHR) < 20 THEN '0-19'
WHEN {year} - GEBJAHR >= 60 THEN '60+'
ELSE '20-59'
END AS AGE3
FROM psid_sample_table_vers vers JOIN psid_sample_table_versq versq ON (vers.PSID = versq.PSID) )
/* select final table, filtering to existing plz and removing duplicate rows */
SELECT PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3
FROM preprocessed_table
WHERE PLZ2 NOT IN ('05', '11', '43', '62')
GROUP BY PSID, JAHR, PLZ2, PLZ1, SEX, AGE10, AGE3;
create index idx_total_pers_{year} on T_Statport_pers_{year} (jahr, age3, age10, plz1, plz2, sex);
create index idx_total_diag_{year} on T_StatPort_Diag_{year} (ICD1, ICD2, ICD3, JAHR);
CALL drop_if_exists('tt_rollup_{year}');
create table tt_rollup_{year} as
SELECT d.JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, icd3, icd2, icd1, COUNT( DISTINCT d.PSID) AS CNT
FROM T_StatPort_Pers_{year} p
LEFT JOIN STATPORT.t_statport_diag_{year} d
ON (p.psid = d.psid AND p.JAHR = d.JAHR)
GROUP BY d.JAHR, ROLLUP(PLZ1, PLZ2), ROLLUP(AGE3, AGE10), ROLLUP(SEX), ROLLUP(ICD1, ICD2, ICD3);
CALL drop_if_exists('tt_sd_{year}');
create table tt_sd_{year} as
SELECT JAHR, PLZ1, PLZ2, AGE3, AGE10, SEX, COUNT( DISTINCT PSID) AS CNT
FROM T_StatPort_Pers_{year}
GROUP BY JAHR, ROLLUP(PLZ1, PLZ2), ROLLUP(AGE3, AGE10), ROLLUP(SEX);
/*Remove preliminary tables*/
CALL drop_if_exists('ttmg{year}_psid_sample');
CALL drop_if_exists('T_StatPort_Pers_{year}');
CALL drop_if_exists('T_STATPORT_DIAG_{year}')