
Checkpoint saver

combine_checkpoints_for_ranks(hdf5_file_root)

After running a parallel simulation with checkpoints, the checkpoint data will be scattered across files, with each process saving a checkpoint_date.{rank}.hdf5 file. This function can be used to unify all the data in a single checkpoint, so that it can be loaded later with any arbitrary number of cores.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| hdf5_file_root | str | the root of the checkpoint file paths, e.g. "checkpoint_2020-01-01". The checkpoint files are expected to have names like "checkpoint_2020-01-01.{rank}.hdf5", where rank = 0, 1, 2, etc. | required |

Source code in june/hdf5_savers/checkpoint_saver.py
def combine_checkpoints_for_ranks(hdf5_file_root: str):
    """After running a parallel simulation with checkpoints, the
    checkpoint data will be scattered accross, with each process
    saving a checkpoint_date.0.hdf5 file. This function can be used
    to unify all data in one single checkpoint, so that we can load it
    later with any arbitray number of cores.

    Args:
        hdf5_file_root (str): the str root of the pasts like "checkpoint_2020-01-01". The checkpoint files
    will be expected to have names like "checkpoint_2020-01-01.{rank}.hdf5 where
    rank = 0, 1, 2, etc.

    """
    checkpoint_files = glob(hdf5_file_root + ".[0-9]*.hdf5")
    try:
        cp_date = hdf5_file_root.split("_")[-1]
    except Exception:
        cp_date = hdf5_file_root
    logger.info(f"found {len(checkpoint_files)} {cp_date} checkpoint files")
    ret = load_checkpoint_from_hdf5(checkpoint_files[0])
    for i in range(1, len(checkpoint_files)):
        file = checkpoint_files[i]
        ret2 = load_checkpoint_from_hdf5(file, load_date=False)
        for key, value in ret2.items():
            ret[key] = np.concatenate((ret[key], value))

    unified_checkpoint_path = hdf5_file_root + ".hdf5"
    with h5py.File(unified_checkpoint_path, "w") as f:
        f.create_group("time")
        f["time"].attrs["date"] = ret["date"]
        f.create_group("people_data")
        for name in ["people_id", "infected_id", "dead_id"]:
            write_dataset(
                group=f["people_data"],
                dataset_name=name,
                data=np.array(ret[name], dtype=np.int64),
            )
    save_infections_to_hdf5(
        hdf5_file_path=unified_checkpoint_path,
        infections=ret["infection_list"],
        chunk_size=1000000,
    )
    save_immunities_to_hdf5(
        hdf5_file_path=unified_checkpoint_path, immunities=ret["immunity_list"]
    )
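
For reference, a minimal usage sketch (assuming the function is importable from the module shown in the source path, and that a parallel run has already produced the per-rank files):

```python
# Minimal usage sketch; assumes a parallel run has already written per-rank
# files such as checkpoint_2020-01-01.0.hdf5, checkpoint_2020-01-01.1.hdf5, ...
from june.hdf5_savers.checkpoint_saver import combine_checkpoints_for_ranks

# Writes a unified checkpoint_2020-01-01.hdf5 next to the per-rank files.
combine_checkpoints_for_ranks("checkpoint_2020-01-01")
```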

generate_simulator_from_checkpoint(world, checkpoint_path, interaction, chunk_size=50000, epidemiology=None, tracker=None, policies=None, leisure=None, travel=None, events=None, config_filename=default_config_filename, record=None, reset_infections=False)

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| world | World | | required |
| checkpoint_path | str | | required |
| interaction | Interaction | | required |
| chunk_size | Optional[int] | | 50000 |
| epidemiology | Optional[Epidemiology] | | None |
| tracker | Optional[Tracker] | | None |
| policies | Optional[Policies] | | None |
| leisure | Optional[Leisure] | | None |
| travel | Optional[Travel] | | None |
| events | Optional[Events] | | None |
| config_filename | str | | default_config_filename |
| record | Record | | None |
| reset_infections | | | False |
Source code in june/hdf5_savers/checkpoint_saver.py
def generate_simulator_from_checkpoint(
    world: World,
    checkpoint_path: str,
    interaction: Interaction,
    chunk_size: Optional[int] = 50000,
    epidemiology: Optional[Epidemiology] = None,
    tracker: Optional[Tracker] = None,
    policies: Optional[Policies] = None,
    leisure: Optional[Leisure] = None,
    travel: Optional[Travel] = None,
    events: Optional[Events] = None,
    config_filename: str = default_config_filename,
    record: "Record" = None,
    reset_infections=False,
):
    """

    Args:
        world (World): 
        checkpoint_path (str): 
        interaction (Interaction): 
        chunk_size (Optional[int], optional): (Default value = 50000)
        epidemiology (Optional[Epidemiology], optional): (Default value = None)
        tracker (Optional[Tracker], optional): (Default value = None)
        policies (Optional[Policies], optional): (Default value = None)
        leisure (Optional[Leisure], optional): (Default value = None)
        travel (Optional[Travel], optional): (Default value = None)
        events (Optional[Events], optional): (Default value = None)
        config_filename (str, optional): (Default value = default_config_filename)
        record ("Record", optional): (Default value = None)
        reset_infections: (Default value = False)

    """
    simulator = Simulator.from_file(
        world=world,
        interaction=interaction,
        epidemiology=epidemiology,
        tracker=tracker,
        policies=policies,
        leisure=leisure,
        travel=travel,
        events=events,
        config_filename=config_filename,
        record=record,
    )
    return restore_simulator_to_checkpoint(
        world=world,
        checkpoint_path=checkpoint_path,
        chunk_size=chunk_size,
        simulator=simulator,
        reset_infections=reset_infections,
    )
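
A sketch of resuming a run from a unified checkpoint; here `world` and `interaction` are assumed to have been constructed elsewhere with the project's usual loaders, and the checkpoint path is illustrative:

```python
# Sketch only: `world` and `interaction` are assumed to be constructed
# beforehand with the project's usual loaders; the path is illustrative.
from june.hdf5_savers.checkpoint_saver import generate_simulator_from_checkpoint

simulator = generate_simulator_from_checkpoint(
    world=world,
    checkpoint_path="checkpoint_2020-01-01.hdf5",
    interaction=interaction,
)
# The returned simulator's timer starts the day after the checkpoint date.
```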

load_checkpoint_from_hdf5(hdf5_file_path, chunk_size=50000, load_date=True)

Loads checkpoint data from hdf5.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| hdf5_file_path | str | hdf5 path to load from | required |
| chunk_size | | number of hdf5 chunks to use while loading | 50000 |
| load_date | | whether to also load the checkpoint date | True |
Source code in june/hdf5_savers/checkpoint_saver.py
def load_checkpoint_from_hdf5(hdf5_file_path: str, chunk_size=50000, load_date=True):
    """Loads checkpoint data from hdf5.

    Args:
        hdf5_file_path (str): hdf5 path to load from
        chunk_size: number of hdf5 chunks to use while loading (Default value = 50000)
        load_date: whether to also load the checkpoint date (Default value = True)

    """
    ret = {}
    ret["infection_list"] = load_infections_from_hdf5(
        hdf5_file_path, chunk_size=chunk_size
    )
    ret["immunity_list"] = load_immunities_from_hdf5(
        hdf5_file_path, chunk_size=chunk_size
    )
    with h5py.File(hdf5_file_path, "r") as f:
        people_group = f["people_data"]
        ret["infected_id"] = people_group["infected_id"][:]
        ret["dead_id"] = people_group["dead_id"][:]
        ret["people_id"] = people_group["people_id"][:]
        if load_date:
            ret["date"] = f["time"].attrs["date"]
    return ret
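
A short sketch of inspecting a checkpoint's contents via the returned dictionary (the path is illustrative):

```python
# Sketch: inspect the contents of a checkpoint file; the path is illustrative.
from june.hdf5_savers.checkpoint_saver import load_checkpoint_from_hdf5

data = load_checkpoint_from_hdf5("checkpoint_2020-01-01.hdf5")
print(data["date"])                 # checkpoint date, e.g. "2020-01-01"
print(len(data["people_id"]))       # ids of all people in the checkpoint
print(len(data["infected_id"]))     # ids of people infected at the checkpoint
print(len(data["infection_list"]))  # one infection object per infected person
```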

restore_simulator_to_checkpoint(simulator, world, checkpoint_path, chunk_size=50000, reset_infections=False)

Initialises the simulator from a saved checkpoint. The arguments are the same as the standard .from_file() initialisation, plus the path to the hdf5 file where the checkpoint is stored. The checkpoint saves information about the infection status of all the people in the world, as well as the timings. Note, however, that all past infections and deaths will carry the checkpoint date as their date.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| simulator | | an instance of the Simulator class | required |
| world | World | | required |
| checkpoint_path | str | path to the hdf5 file containing the checkpoint data | required |
| chunk_size | Optional[int] | chunk load size of the hdf5 | 50000 |
| reset_infections | | if True, active infections are not restored from the checkpoint | False |
Source code in june/hdf5_savers/checkpoint_saver.py
def restore_simulator_to_checkpoint(
    simulator,
    world: World,
    checkpoint_path: str,
    chunk_size: Optional[int] = 50000,
    reset_infections=False,
):
    """Initialises the simulator from a saved checkpoint. The arguments are the same as the standard .from_file()
    initialisation but with the additional path to where the checkpoint pickle file is located.
    The checkpoint saves information about the infection status of all the people in the world as well as the timings.
    Note, nonetheless, that all the past infections / deaths will have the checkpoint date as date.

    Args:
        simulator: an instance of the Simulator class
        world (World): the world the simulator acts on
        checkpoint_path (str): path to the hdf5 file containing the checkpoint data
        chunk_size (Optional[int], optional): chunk load size of the hdf5 (Default value = 50000)
        reset_infections: if True, active infections are not restored from the checkpoint (Default value = False)

    """
    people_ids = set(world.people.people_ids)
    checkpoint_data = load_checkpoint_from_hdf5(checkpoint_path, chunk_size=chunk_size)
    for dead_id in checkpoint_data["dead_id"]:
        if dead_id not in people_ids:
            continue
        person = simulator.world.people.get_from_id(dead_id)
        person.dead = True
        cemetery = world.cemeteries.get_nearest(person)
        cemetery.add(person)
        person.subgroups = Activities(None, None, None, None, None, None)
    if not reset_infections:
        for infected_id, infection in zip(
            checkpoint_data["infected_id"], checkpoint_data["infection_list"]
        ):
            if infected_id not in people_ids:
                continue
            person = simulator.world.people.get_from_id(infected_id)
            person.infection = infection
    # restore immunities
    for person_id, immunity in zip(
        checkpoint_data["people_id"], checkpoint_data["immunity_list"]
    ):
        if person_id not in people_ids:
            continue
        person = world.people.get_from_id(person_id)
        person.immunity = immunity
    # restore timer
    checkpoint_date = datetime.strptime(checkpoint_data["date"], "%Y-%m-%d")
    # we need to start the next day
    checkpoint_date += timedelta(days=1)
    simulator.timer.reset_to_new_date(checkpoint_date)
    logger.info(f"Restored checkpoint at date {checkpoint_date.date()}")
    return simulator
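
As a sketch, an already-built simulator can be restored while discarding active infections (deaths and immunities are still applied) via `reset_infections=True`; `simulator` and `world` are assumed to exist already:

```python
# Sketch only: `simulator` and `world` are assumed to exist already;
# the checkpoint path is illustrative.
from june.hdf5_savers.checkpoint_saver import restore_simulator_to_checkpoint

simulator = restore_simulator_to_checkpoint(
    simulator=simulator,
    world=world,
    checkpoint_path="checkpoint_2020-01-01.hdf5",
    reset_infections=True,  # keep deaths and immunities, drop active infections
)
```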

save_checkpoint_to_hdf5(population, date, hdf5_file_path, chunk_size=50000)

Saves a checkpoint at the given date by saving the infection information of the world.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| population | Population | world's population | required |
| date | str | date of the checkpoint | required |
| hdf5_file_path | str | path where to save the hdf5 checkpoint | required |
| chunk_size | int | hdf5 chunk size used when writing data | 50000 |
Source code in june/hdf5_savers/checkpoint_saver.py
def save_checkpoint_to_hdf5(
    population: Population, date: str, hdf5_file_path: str, chunk_size: int = 50000
):
    """Saves a checkpoint at the given date by saving the infection information of the world.

    Args:
        population (Population): world's population
        date (str): date of the checkpoint
        hdf5_file_path (str): path where to save the hdf5 checkpoint
        chunk_size (int, optional): hdf5 chunk_size to write data (Default value = 50000)

    """
    dead_people_ids = [person.id for person in population if person.dead]
    people_ids = []
    infected_people_ids = []
    infection_list = []
    for person in population:
        people_ids.append(person.id)
        if person.infected:
            infected_people_ids.append(person.id)
            infection_list.append(person.infection)
    with h5py.File(hdf5_file_path, "w") as f:
        f.create_group("time")
        f["time"].attrs["date"] = date
        f.create_group("people_data")
        for name, data in zip(
            ["people_id", "infected_id", "dead_id"],
            [people_ids, infected_people_ids, dead_people_ids],
        ):
            write_dataset(
                group=f["people_data"],
                dataset_name=name,
                data=np.array(data, dtype=np.int64),
            )
    save_infections_to_hdf5(
        hdf5_file_path=hdf5_file_path, infections=infection_list, chunk_size=chunk_size
    )
    immunities = [person.immunity for person in population]
    save_immunities_to_hdf5(hdf5_file_path=hdf5_file_path, immunities=immunities)
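
A minimal sketch of writing a checkpoint for a world's population at a given date (assuming `world` is already constructed and `world.people` is the Population; the paths are illustrative):

```python
# Sketch: save a checkpoint for the whole population at a given date.
# Assumes `world` is already constructed and `world.people` is a Population.
from june.hdf5_savers.checkpoint_saver import save_checkpoint_to_hdf5

save_checkpoint_to_hdf5(
    population=world.people,
    date="2020-01-01",
    hdf5_file_path="checkpoint_2020-01-01.hdf5",
)
```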