
Records reader

RecordReader

Source code in june/records/records_reader.py
class RecordReader:
    """ """
    def __init__(self, results_path=Path("results"), record_name: str = None):
        self.results_path = Path(results_path)
        try:
            self.regional_summary = self.get_regional_summary(
                self.results_path / "summary.csv"
            )
        except Exception:
            self.regional_summary = None
            logger.warning("No summary available to read...")
        if self.regional_summary is not None:
            self.world_summary = self.get_world_summary()
        if record_name is None:
            self.record_name = "june_record.h5"
        else:
            self.record_name = record_name

    def decode_bytes_columns(self, df):
        """

        Args:
            df: 

        """
        str_df = df.select_dtypes([object])
        for col in str_df:
            df[col] = str_df[col].str.decode("utf-8")
        return df

    def get_regional_summary(self, summary_path):
        """

        Args:
            summary_path: 

        """
        df = pd.read_csv(summary_path)
        cols = [col for col in df.columns if col not in ["time_stamp", "region"]]
        self.aggregator = {col: np.mean if "current" in col else sum for col in cols}
        df = df.groupby(["region", "time_stamp"], as_index=False).agg(self.aggregator)
        df.set_index("time_stamp", inplace=True)
        df.index = pd.to_datetime(df.index)
        return df

    def get_world_summary(self):
        """ """
        return (
            self.regional_summary.drop(columns="region")
            .groupby("time_stamp")
            .agg(self.aggregator)
        )


    def table_to_df(
        self, table_name: str, index: str = "id", fields: Optional[Tuple] = None
    ) -> pd.DataFrame:
        """

        Args:
            table_name (str): 
            index (str, optional): (Default value = "id")
            fields (Optional[Tuple], optional): (Default value = None)

        """
        # TODO: include fields to read only certain columns
        with tables.open_file(self.results_path / self.record_name, mode="r") as f:
            table = getattr(f.root, table_name)
            df = pd.DataFrame.from_records(table.read(), index=index)
        df = self.decode_bytes_columns(df)
        return df

    def get_geography_df(
        self,
    ):
        """ """
        areas_df = self.table_to_df("areas")
        super_areas_df = self.table_to_df("super_areas")
        regions_df = self.table_to_df("regions")

        geography_df = areas_df[["super_area_id", "name"]].merge(
            super_areas_df[["region_id", "name"]],
            how="inner",
            left_on="super_area_id",
            right_index=True,
            suffixes=("_area", "_super_area"),
        )
        geography_df = geography_df.merge(
            regions_df, how="inner", left_on="region_id", right_index=True
        )
        return geography_df.rename(
            columns={geography_df.index.name: "area_id", "name": "name_region"}
        )

    def get_table_with_extras(
        self,
        table_name,
        index,
        with_people=True,
        with_geography=True,
        people_df=None,
        geography_df=None,
    ):
        """

        Args:
            table_name: 
            index: 
            with_people: (Default value = True)
            with_geography: (Default value = True)
            people_df: (Default value = None)
            geography_df: (Default value = None)

        """
        logger.info(f"Loading {table_name} table")
        df = self.table_to_df(table_name, index=index)
        if with_people:
            logger.info("Loading population table")
            if people_df is None:
                people_df = self.table_to_df("population", index="id")
            logger.info("Merging infection and population tables")
            df = df.merge(people_df, how="inner", left_index=True, right_index=True)
            if with_geography:
                logger.info("Loading geography table")
                if geography_df is None:
                    geography_df = self.get_geography_df()
                logger.info("Mergeing infection and geography tables")
                df = df.merge(
                    geography_df.drop_duplicates(),
                    left_on="area_id",
                    right_index=True,
                    how="inner",
                )
        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"])
        return df

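A minimal usage sketch, assuming a simulation has written results/summary.csv and results/june_record.h5 (the default record name); if the summary is missing, regional_summary is set to None and only the record tables are available:

from june.records.records_reader import RecordReader

reader = RecordReader(results_path="results")
regional = reader.regional_summary  # per-region daily summary, indexed by time_stamp
world = reader.world_summary        # the same quantities aggregated over all regions
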
decode_bytes_columns(df)

Parameters:

Name  Type  Description                                                 Default
df          DataFrame whose byte-string columns are decoded to UTF-8.  required
Source code in june/records/records_reader.py
def decode_bytes_columns(self, df):
    """

    Args:
        df: 

    """
    str_df = df.select_dtypes([object])
    for col in str_df:
        df[col] = str_df[col].str.decode("utf-8")
    return df

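A small illustration of the decoding, assuming reader is the RecordReader instance from the example above (the column values are made up):

import pandas as pd

df = pd.DataFrame({"name": [b"London", b"Durham"], "n": [1, 2]})
df = reader.decode_bytes_columns(df)
print(df["name"].iloc[0])  # "London" as a str rather than bytes; numeric columns are untouched
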
get_geography_df()

Source code in june/records/records_reader.py
def get_geography_df(
    self,
):
    """ """
    areas_df = self.table_to_df("areas")
    super_areas_df = self.table_to_df("super_areas")
    regions_df = self.table_to_df("regions")

    geography_df = areas_df[["super_area_id", "name"]].merge(
        super_areas_df[["region_id", "name"]],
        how="inner",
        left_on="super_area_id",
        right_index=True,
        suffixes=("_area", "_super_area"),
    )
    geography_df = geography_df.merge(
        regions_df, how="inner", left_on="region_id", right_index=True
    )
    return geography_df.rename(
        columns={geography_df.index.name: "area_id", "name": "name_region"}
    )

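For example, with the reader from above, the returned frame is indexed by area id and maps every area to its super area and region:

geography = reader.get_geography_df()
print(geography[["name_area", "name_super_area", "name_region"]].head())
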
get_regional_summary(summary_path)

Parameters:

Name          Type  Description                                               Default
summary_path        Path to the summary.csv file written by the simulation.  required
Source code in june/records/records_reader.py
def get_regional_summary(self, summary_path):
    """

    Args:
        summary_path: 

    """
    df = pd.read_csv(summary_path)
    cols = [col for col in df.columns if col not in ["time_stamp", "region"]]
    self.aggregator = {col: np.mean if "current" in col else sum for col in cols}
    df = df.groupby(["region", "time_stamp"], as_index=False).agg(self.aggregator)
    df.set_index("time_stamp", inplace=True)
    df.index = pd.to_datetime(df.index)
    return df

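This is called automatically in __init__, but it can also read the summary of another run; the path and region name below are purely illustrative:

df = reader.get_regional_summary("other_run/summary.csv")
london = df[df["region"] == "London"]  # one region's time series, indexed by date
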
get_table_with_extras(table_name, index, with_people=True, with_geography=True, people_df=None, geography_df=None)

Parameters:

Name            Type  Description                                              Default
table_name            Name of the table to load from the record.               required
index                 Column to use as the index of the loaded table.          required
with_people           If True, merge with the population table on the index.   True
with_geography        If True, also merge area, super area and region names.   True
people_df             Pre-loaded population table, to avoid re-reading it.     None
geography_df          Pre-loaded geography table, to avoid re-building it.     None
Source code in june/records/records_reader.py
def get_table_with_extras(
    self,
    table_name,
    index,
    with_people=True,
    with_geography=True,
    people_df=None,
    geography_df=None,
):
    """

    Args:
        table_name: 
        index: 
        with_people: (Default value = True)
        with_geography: (Default value = True)
        people_df: (Default value = None)
        geography_df: (Default value = None)

    """
    logger.info(f"Loading {table_name} table")
    df = self.table_to_df(table_name, index=index)
    if with_people:
        logger.info("Loading population table")
        if people_df is None:
            people_df = self.table_to_df("population", index="id")
        logger.info("Merging infection and population tables")
        df = df.merge(people_df, how="inner", left_index=True, right_index=True)
        if with_geography:
            logger.info("Loading geography table")
            if geography_df is None:
                geography_df = self.get_geography_df()
            logger.info("Mergeing infection and geography tables")
            df = df.merge(
                geography_df.drop_duplicates(),
                left_on="area_id",
                right_index=True,
                how="inner",
            )
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"])
    return df

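A usage sketch; the table name and index column below are illustrative and should match whatever tables your record file actually contains:

infections = reader.get_table_with_extras("infections", index="infected_ids")
# each row now carries the person's attributes plus name_area, name_super_area and name_region
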
get_world_summary()

Source code in june/records/records_reader.py
def get_world_summary(self):
    """ """
    return (
        self.regional_summary.drop(columns="region")
        .groupby("time_stamp")
        .agg(self.aggregator)
    )

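For example, with the reader from above:

world = reader.get_world_summary()
print(world.head())  # "current" columns averaged over regions, all others summed
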
table_to_df(table_name, index='id', fields=None)

Parameters:

Name        Type             Description                                               Default
table_name  str              Name of the table under the file root.                    required
index       str              Column to use as the DataFrame index.                     'id'
fields      Optional[Tuple]  Not yet used; will allow reading only selected columns.   None
Source code in june/records/records_reader.py
def table_to_df(
    self, table_name: str, index: str = "id", fields: Optional[Tuple] = None
) -> pd.DataFrame:
    """

    Args:
        table_name (str): 
        index (str, optional): (Default value = "id")
        fields (Optional[Tuple], optional): (Default value = None)

    """
    # TODO: include fields to read only certain columns
    with tables.open_file(self.results_path / self.record_name, mode="r") as f:
        table = getattr(f.root, table_name)
        df = pd.DataFrame.from_records(table.read(), index=index)
    df = self.decode_bytes_columns(df)
    return df
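
For example, with the reader from above, the population table (used internally by get_table_with_extras) can be loaded directly:

population = reader.table_to_df("population", index="id")
print(population.head())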