67 changes: 67 additions & 0 deletions Sanitize.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Sanitize

The SNDS sources used for the extraction are raw data from the CNAM. Thus, they need to be sanitized before any event extraction.


We apply some classical data quality rules for the SNDS. Details on the variable names and on these rules can be found (in French) on the [SNDS collaborative documentation website](https://documentation-snds.health-data-hub.fr/). The source code corresponding to this filtering can be found in the [etl/sources/](/src/main/scala/fr/polytechnique/cmap/cnam/etl/sources/) folder:

* **DCIR**:
  * `DPN_QLF` ≠ "71", remove the lines transmitted [for information only](https://documentation-snds.health-data-hub.fr/ressources/documents%20Cnam/faq/faq_dcir.html#dcir): these lines correspond to services carried out during a hospital stay or an outpatient consultation and sent for information purposes,
  * `BSE_PRS_NAT` ≠ "0", remove the lines without a proper *Nature de la prestation*.
* **PMSI**:
  * For all PMSI products ([see details here](https://documentation-snds.health-data-hub.fr/fiches/depenses_hopital_public.html#valorisation-des-sejours-a-l-hopital-public)):
    * `ETA_NUM` $`\notin`$ `finessDoublons` (cf. end of document), remove information reported under the geographic FINESS of APHP, HCL and APHM (this information is also reported under their legal FINESS, which would create duplicates).
  * **MCO**:
    * `SEJ_TYP` = $`\emptyset`$ OR !(`ENT_MOD` == 1 AND `SOR_MOD` == 1) OR (`GRG_GHM` like "28%" AND `GRG_GHM` $`\notin`$ {"28Z14Z", "28Z15Z", "28Z16Z"}), remove stays shared between care sites, except radiotherapy and dialysis sessions,
    * `GRG_GHM` ≠ "14Z08Z", remove induced abortion (IVG) hospital stays: IVG can be performed anonymously, so these stays cannot be linked with certainty to a given individual,
    * `GHS_NUM` ≠ "9999", remove hospital stays in error,
    * {`SEJ_RET`, `FHO_RET`, `PMS_RET`, `DAT_RET`, `NIR_RET`, `SEX_RET`} = "0", remove corrupted patient id or sex,
    * `GRG_GHM` not like "90%", remove corrupted patient grouping codes.
  * **MCO_CE**:
    * {`NAI_RET`, `IAS_RET`, `ENT_DAT_RET`, `NIR_RET`, `SEX_RET`} = "0", remove corrupted patient id or sex.
  * **SSR**:
    * !(`ENT_MOD` == 1 AND `SOR_MOD` == 1) OR (`GRG_GME` like "28%" AND `GRG_GME` $`\notin`$ {"28Z14Z", "28Z15Z", "28Z16Z"}), remove stays shared between care sites, except radiotherapy and dialysis sessions,
    * {`NIR_RET`, `SEJ_RET`, `FHO_RET`, `PMS_RET`, `DAT_RET`} = "0", remove corrupted hospital stay, date or patient id,
    * `GRG_GME` not like "90%", remove corrupted patient grouping codes.
  * **SSR_CE**:
    * {`NIR_RET`, `NAI_RET`, `SEX_RET`, `IAS_RET`, `ENT_DAT_RET`} = "0", remove corrupted patient id, sex or entry date.
  * **HAD**:
    * !(`ENT_MOD` == 1 AND `SOR_MOD` == 1), remove stays shared between care sites,
    * {`NIR_RET`, `SEJ_RET`, `FHO_RET`, `PMS_RET`, `DAT_RET`} = "0", remove corrupted hospital stay, date or patient id.
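Taken together, the MCO rules above can be sketched as a single Spark predicate. This is an illustrative sketch only, not the project's code: it assumes bare column names matching the SNDS variable names (the actual sources alias them, e.g. `MCO_B__GRG_GHM`), and the `sanitizeMco` entry point is hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

object McoSanitizeSketch {
  // Radiotherapy & dialysis GHM codes kept despite the "28%" exclusion.
  val grgGhmExceptions: Seq[String] = Seq("28Z14Z", "28Z15Z", "28Z16Z")

  // One Column predicate per rule; a row is kept only if it satisfies all of them.
  val sharedStays: Column = col("SEJ_TYP").isNull or
    !(col("ENT_MOD") === 1 and col("SOR_MOD") === 1) or
    (col("GRG_GHM").like("28%") and !col("GRG_GHM").isin(grgGhmExceptions: _*))
  val noIvg: Column = col("GRG_GHM") =!= "14Z08Z"       // anonymous IVG stays
  val noErrorStays: Column = col("GHS_NUM") =!= "9999"  // stays in error
  val noCorruptedGhm: Column = !col("GRG_GHM").like("90%")

  def sanitizeMco(rawMco: DataFrame): DataFrame =
    rawMco.filter(sharedStays and noIvg and noErrorStays and noCorruptedGhm)
}
```

The `*_RET` = "0" return-code checks follow the same pattern, chaining `===` comparisons with `and`.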


## Details of the FINESS doublons

```scala
val finessDoublons = List(
//APHP
"600100093","600100101","620100016","640790150","640797098","750100018","750806226",
"750100356","750802845","750801524","750100067","750100075","750100042","750805228",
"750018939","750018988","750100091","750100083","750100109","750833345","750019069",
"750803306","750019028","750100125","750801441","750019119","750100166","750100141",
"750100182","750100315","750019648","750830945","750008344","750803199","750803447",
"750100216","750100208","750833337","750000358","750019168","750809576","750100299",
"750041543","750100232","750802258","750803058","750803454","750100273","750801797",
"750803371","830100012","830009809","910100015","910100031","910100023","910005529",
"920100013","920008059","920100021","920008109","920100039","920100047","920812930",
"920008158","920100054","920008208","920100062","920712551","920000122","930100052",
"930100037","930018684","930812334","930811294","930100045","930011408","930811237",
"930100011","940018021","940100027","940100019","940170087","940005739","940100076",
"940100035","940802291","940100043","940019144","940005788","940100050","940802317",
"940100068","940005838","950100024","950100016",
//APHM
"130808231","130809775","130782931",
"130806003","130783293","130804305","130790330","130804297","130783236","130796873",
"130808520","130799695","130802085","130808256","130806052","130808538","130802101",
"130796550","130014558","130784234","130035884","130784259","130796279","130792856",
"130017239","130792534","130793698","130792898","130808546","130789175","130780521",
"130033996","130018229",
//HCL
"90787460","690007422","690007539","690784186","690787429",
"690783063","690007364","690787452","690007406","690787486","690784210","690799416",
"690784137","690007281","690799366","690784202","690023072","690787577","690784194",
"690007380","690784129","690029194","690806054","690029210","690787767","690784178",
"690783154","690799358","690787817","690787742","690784152","690784145","690783121",
"690787478","690007455","690787494","830100558","830213484"
)
```
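Downstream, this list can be used to exclude the geographic FINESS rows with a negated `isin`. A minimal sketch, assuming a bare `ETA_NUM` column name (the real sources expose it through their own aliases):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Keep only rows whose establishment id is not a duplicated
// geographic FINESS code (APHP / APHM / HCL).
def removeFinessDoublons(df: DataFrame, doublons: Seq[String]): DataFrame =
  df.where(!col("ETA_NUM").isin(doublons: _*))
```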
@@ -4,6 +4,23 @@ import fr.polytechnique.cmap.cnam.etl.sources.data.DoublonFinessPmsi.specialHosp
import org.apache.spark.sql.{Column, DataFrame}

private[data] class HadFilters(rawHad: DataFrame) {
/** Remove geographic finess doublons from APHP, APHM and HCL.
*
* @return dataframe without finess doublons
*/
def filterSpecialHospitals: DataFrame = {
rawHad.where(!HadSource.ETA_NUM_EPMSI.isin(specialHospitalCodes: _*))
}

/** Filter out shared stays (between hospitals).
*
* @return dataframe without shared hospital stays
*/
def filterSharedHospitalStays: DataFrame = {
val duplicateHospitalsFilter: Column = !(HadSource.ENT_MOD === 1 and HadSource.SOR_MOD === 1)
rawHad.filter(duplicateHospitalsFilter)
}

/** Filter out Had corrupted stays as returned by the ATIH.
*
* @return dataframe cleaned of HAD corrupted stays
@@ -13,16 +30,7 @@ private[data] class HadFilters(rawHad: DataFrame) {
.NIR_RET === "0" and HadSource.SEJ_RET === "0" and HadSource
.FHO_RET === "0" and HadSource.PMS_RET === "0" and HadSource
.DAT_RET === "0"

rawHad.filter(fictionalAndFalseHospitalStaysFilter)
}

/** Remove geographic finess doublons from APHP, APHM and HCL.
*
* @return dataframe without finess doublons
*/
def filterSpecialHospitals: DataFrame = {
rawHad.where(!HadSource.ETA_NUM_EPMSI.isin(specialHospitalCodes: _*))
}
}

@@ -11,6 +11,9 @@ import org.apache.spark.sql.{Column, DataFrame, SQLContext}
object HadSource extends DataSourceManager with HadSourceSanitizer {

val ETA_NUM_EPMSI: Column = col("ETA_NUM_EPMSI")

val ENT_MOD: Column = col("HAD_B__ENT_MOD")
val SOR_MOD: Column = col("HAD_B__SOR_MOD")
val NIR_RET: Column = col("NIR_RET")
val SEJ_RET: Column = col("SEJ_RET")
val FHO_RET: Column = col("FHO_RET")
@@ -24,7 +27,8 @@ object HadSource extends DataSourceManager with HadSourceSanitizer {
* https://datainitiative.atlassian.net/wiki/pages/viewpage.action?pageId=40304642
*/
rawHad
.filterHadCorruptedHospitalStays
.filterSpecialHospitals
.filterSharedHospitalStays
.filterHadCorruptedHospitalStays
}
}
@@ -19,8 +19,8 @@ private[data] class McoFilters(rawMco: DataFrame) {
* @return
*/
def filterSharedHospitalStays: DataFrame = {
val duplicateHospitalsFilter: Column = McoSource.SEJ_TYP.isNull or McoSource
.SEJ_TYP =!= "B" or (McoSource.GRG_GHM.like("28%") and !McoSource.GRG_GHM
val duplicateHospitalsFilter: Column = McoSource.SEJ_TYP.isNull or
!(McoSource.ENT_MOD === 1 and McoSource.SOR_MOD === 1) or (McoSource.GRG_GHM.like("28%") and !McoSource.GRG_GHM
.isin(McoFilters.GRG_GHMExceptions: _*))
rawMco.filter(duplicateHospitalsFilter)
}
@@ -69,5 +69,4 @@
private[data] object McoFilters {
// radiotherapie & dialyse exceptions
val GRG_GHMExceptions = List("28Z14Z", "28Z15Z", "28Z16Z")

}
@@ -19,6 +19,8 @@ object McoSource extends DataSourceManager with McoSourceSanitizer {

// Exclusive columns
val SEJ_TYP: Column = col("MCO_B__SEJ_TYP")
val ENT_MOD: Column = col("MCO_B__ENT_MOD")
val SOR_MOD: Column = col("MCO_B__SOR_MOD")
val GRG_GHM: Column = col("MCO_B__GRG_GHM")
val GHS_NUM: Column = col("MCO_B__GHS_NUM")
val SEJ_RET: Column = col("SEJ_RET")
@@ -4,6 +4,25 @@ import fr.polytechnique.cmap.cnam.etl.sources.data.DoublonFinessPmsi.specialHosp
import org.apache.spark.sql.{Column, DataFrame}

private[data] class SsrFilters(rawSsr: DataFrame) {
/** Filter out Finess doublons.
*
* @return dataframe without finess doublons
*/
def filterSpecialHospitals: DataFrame = {
rawSsr.where(!SsrSource.ETA_NUM.isin(specialHospitalCodes: _*))
}

/** Filter out shared stays (between hospitals).
*
* @return dataframe without shared hospital stays
*/
def filterSharedHospitalStays: DataFrame = {
val duplicateHospitalsFilter: Column = !(SsrSource.ENT_MOD === 1 and SsrSource.SOR_MOD === 1) or
(SsrSource.GRG_GME.like("28%") and !SsrSource.GRG_GME
.isin(SsrFilters.GRG_GHMExceptions: _*))
rawSsr.filter(duplicateHospitalsFilter)
}

/** Filter out Ssr corrupted stays as returned by the ATIH.
*
* @return
@@ -28,14 +47,9 @@ private[data] class SsrFilters(rawSsr: DataFrame) {

rawSsr.filter(fictionalAndFalseHospitalStaysFilter)
}

/** Filter out Finess doublons.
*
* @return
*/
def filterSpecialHospitals: DataFrame = {
rawSsr.where(!SsrSource.ETA_NUM.isin(specialHospitalCodes: _*))
}
}

private[data] object SsrFilters
private[data] object SsrFilters {
// radiotherapie & dialyse exceptions
val GRG_GHMExceptions = List("28Z14Z", "28Z15Z", "28Z16Z")
}
@@ -20,6 +20,9 @@ object SsrSource extends DataSourceManager with SsrSourceSanitizer {
val MOI_ANN_SOR_SEJ: Column = col("SSR_B__MOI_ANN_SOR_SEJ")
val RHS_ANT_SEJ_ENT: Column = col("SSR_B__RHS_ANT_SEJ_ENT")
val FP_PEC: Column = col("SSR_B__FP_PEC")

val ENT_MOD: Column = col("SSR_B__ENT_MOD")
val SOR_MOD: Column = col("SSR_B__SOR_MOD")
val GRG_GME: Column = col("SSR_B__GRG_GME")
val NIR_RET: Column = col("NIR_RET")
val SEJ_RET: Column = col("SEJ_RET")
@@ -41,6 +44,7 @@ object SsrSource extends DataSourceManager with SsrSourceSanitizer {
*/
rawSsr
.filterSpecialHospitals
.filterSharedHospitalStays
.filterSsrCorruptedHospitalStays
}
}
@@ -38,4 +38,33 @@ class HadFiltersSuite extends SharedContext {
assertDFs(result, expected)
}

"filterSharedHospitalStays" should "remove lines that indicate shared hospital stays" in {
val sqlCtx = sqlContext
import sqlCtx.implicits._

// Given
val colNames = List(
HadSource.ENT_MOD,
HadSource.SOR_MOD
).map(col => col.toString)

val input = Seq(
("2", "3"),
("1", "3"),
("1", "1")
).toDF(colNames: _*)


val expected = Seq(
("2", "3"),
("1", "3")
).toDF(colNames: _*)

// When
val instance = new HadFilters(input)
val result = instance.filterSharedHospitalStays

// Then
assertDFs(result, expected)
}
}
@@ -9,6 +9,8 @@ class HadSourceSuite extends SharedContext {

// Given
val colNames = List(
HadSource.ENT_MOD,
HadSource.SOR_MOD,
HadSource.NIR_RET,
HadSource.SEJ_RET,
HadSource.FHO_RET,
@@ -18,15 +20,16 @@
).map(col => col.toString)

val input = Seq(
("1", "0", "0", "0", "0", "100000000"),
("1", "1", "0", "0", "0", "100000001"),
("0", "0", "0", "0", "0", "100000001"),
("0", "0", "0", "0", "0", "910100015")
("2", "3", "1", "0", "0", "0", "0", "100000000"),
("2", "3", "1", "1", "0", "0", "0", "100000001"),
("2", "3", "0", "0", "0", "0", "0", "100000001"),
("2", "3", "0", "0", "0", "0", "0", "910100015"),
("1", "1", "0", "0", "0", "0", "0", "100000001")

).toDF(colNames: _*)

val expected = Seq(
("0", "0", "0", "0", "0", "100000001")
("2", "3", "0", "0", "0", "0", "0", "100000001")
).toDF(colNames: _*)

// When
@@ -51,26 +51,28 @@ class McoFiltersSuite extends SharedContext {
// Given
val colNames = List(
McoSource.SEJ_TYP,
McoSource.ENT_MOD,
McoSource.SOR_MOD,
McoSource.GRG_GHM
).map(col => col.toString)

val input = Seq(
(None, "28XXXX"),
(Some("A"), "15A94Z"),
(Some("B"), "15A94Z"),
(Some("A"), "28XXXX"),
(Some("A"), "28Z14Z"),
(Some("B"), "28XXXX"),
(Some("B"), "28Z14Z")
(None, "2", "3", "28XXXX"),
(Some("A"), "2", "3", "15A94Z"),
(Some("B"), "1", "1", "15A94Z"),
(Some("A"), "2", "3", "28XXXX"),
(Some("A"), "2", "3", "28Z14Z"),
(Some("B"), "1", "1", "28XXXX"),
(Some("B"), "1", "1", "28Z14Z")
).toDF(colNames: _*)


val expected = Seq(
(None, "28XXXX"),
(Some("A"), "15A94Z"),
(Some("A"), "28XXXX"),
(Some("A"), "28Z14Z"),
(Some("B"), "28XXXX")
(None, "2", "3", "28XXXX"),
(Some("A"), "2", "3", "15A94Z"),
(Some("A"), "2", "3", "28XXXX"),
(Some("A"), "2", "3", "28Z14Z"),
(Some("B"), "1", "1", "28XXXX")
).toDF(colNames: _*)

// When
@@ -19,28 +19,30 @@ class McoSourceSuite extends SharedContext {
McoSource.DAT_RET,
McoSource.ETA_NUM,
McoSource.GHS_NUM,
McoSource.SEJ_TYP
McoSource.SEJ_TYP,
McoSource.ENT_MOD,
McoSource.SOR_MOD
).map(col => col.toString)

val input = Seq(
("90XXXX", "1", "1", "1", "1", "1", "1", "3333", Some("A")),
("27XXXX", "1", "1", "1", "1", "1", "2", "3424", Some("A")),
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("A")),
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", None),
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("B")),
("28XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("B")),
("76XXXX", "0", "0", "0", "0", "0", "1", "9999", Some("A")),
("76XXXX", "0", "0", "0", "0", "0", "910100023", "1111", Some("B")),
("28XXXX", "0", "0", "0", "0", "0", "1", "2222", Some("A")),
("28XXXX", "0", "0", "0", "0", "0", "130784234", "1981", Some("A"))
("90XXXX", "1", "1", "1", "1", "1", "1", "3333", Some("A"), "2", "3"),
("27XXXX", "1", "1", "1", "1", "1", "2", "3424", Some("A"), "2", "3"),
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("A"), "2", "3"),
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", None, "2", "3"),
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("B"), "1", "1"),
("28XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("B"), "1", "1"),
("76XXXX", "0", "0", "0", "0", "0", "1", "9999", Some("A"), "2", "3"),
("76XXXX", "0", "0", "0", "0", "0", "910100023", "1111", Some("B"), "1", "1"),
("28XXXX", "0", "0", "0", "0", "0", "1", "2222", Some("A"), "2", "3"),
("28XXXX", "0", "0", "0", "0", "0", "130784234", "1981", Some("A"), "2", "3")
).toDF(colNames: _*)


val expected = Seq(
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("A")),
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", None),
("28XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("B")),
("28XXXX", "0", "0", "0", "0", "0", "1", "2222", Some("A"))
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("A"), "2", "3"),
("76XXXX", "0", "0", "0", "0", "0", "1", "8271", None, "2", "3"),
("28XXXX", "0", "0", "0", "0", "0", "1", "8271", Some("B"), "1", "1"),
("28XXXX", "0", "0", "0", "0", "0", "1", "2222", Some("A"), "2", "3")
).toDF(colNames: _*)

// When