Skip to content

Simulator checkpointing

Simulator Integration for Checkpointing

This module provides the integration layer between the JUNE simulator and the checkpointing system, offering methods for checkpoint creation and restoration during simulation runs.

SimulatorCheckpointing

Interface for checkpoint operations during simulation.

This class provides methods for creating and restoring checkpoints from within the simulation loop.

Source code in june/checkpointing/simulator_checkpointing.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
class SimulatorCheckpointing:
    """Checkpoint facade for use inside the simulation loop.

    Bundles a ``CheckpointManager`` (scheduling, creation and discovery of
    checkpoints) and a ``CheckpointRestorer`` (restoration), and tracks two
    directories: one that new checkpoints are written to and one that
    existing checkpoints are read from.

    """

    def __init__(self, simulator, checkpoint_dir: str = "checkpoints",
                 checkpoint_read_dir: str = None,
                 auto_checkpoint_interval: float = None, checkpoint_dates: list = None):
        """Set up the manager, the restorer and the checkpoint directories.

        Args:
            simulator (Simulator): The JUNE simulator instance
            checkpoint_dir (str, optional): Directory new checkpoints are written to (Default value = "checkpoints")
            checkpoint_read_dir (str, optional): Directory existing checkpoints are read from; a falsy value means "same as checkpoint_dir" (Default value = None)
            auto_checkpoint_interval (float, optional): Automatic checkpoint interval in simulation days (Default value = None)
            checkpoint_dates (list, optional): Specific simulation days when checkpoints should be created (Default value = None)

        """
        self.simulator = simulator

        write_dir = Path(checkpoint_dir)
        # Fall back to the write directory when no read directory is given.
        read_dir = Path(checkpoint_read_dir) if checkpoint_read_dir else write_dir
        self.checkpoint_dir = write_dir
        self.checkpoint_read_dir = read_dir
        self.auto_checkpoint_enabled = True

        self.checkpoint_manager = CheckpointManager(
            simulator,
            checkpoint_interval_days=auto_checkpoint_interval,
            checkpoint_dates=checkpoint_dates,
        )
        self.restorer = CheckpointRestorer(simulator)

        # Only the process whose mpi_rank is 0 creates the write directory.
        if mpi_rank == 0:
            write_dir.mkdir(parents=True, exist_ok=True)

        if read_dir != write_dir:
            logger.info(f"Initialised with read dir: {self.checkpoint_read_dir}, write dir: {self.checkpoint_dir}")
        else:
            logger.info(f"Initialised with directory: {self.checkpoint_dir}")

    def should_create_checkpoint(self) -> bool:
        """Report whether an automatic checkpoint is due.

        Returns:
            bool: False while auto-checkpointing is disabled, otherwise
            whatever the checkpoint manager's ``should_checkpoint()`` says

        """
        if self.auto_checkpoint_enabled:
            return self.checkpoint_manager.should_checkpoint()
        return False

    def create_checkpoint(self, checkpoint_name: Optional[str] = None) -> bool:
        """Write a checkpoint into the write directory.

        Args:
            checkpoint_name (Optional[str], optional): Name of the checkpoint directory. When None, a name is built from the simulator timer's current day and date. (Default value = None)

        Returns:
            bool: Result reported by the checkpoint manager

        """
        if checkpoint_name is None:
            # No name supplied: derive one from the simulation clock.
            current_time = self.simulator.timer.now
            current_date = self.simulator.timer.date
            checkpoint_name = f"checkpoint_day_{current_time:.1f}_{current_date.strftime('%Y%m%d')}"

        checkpoint_path = self.checkpoint_dir / checkpoint_name
        logger.info(f"Creating checkpoint: {checkpoint_name}")
        success = self.checkpoint_manager.create_checkpoint(checkpoint_path)

        if not success:
            logger.error(f"Failed to create checkpoint: {checkpoint_path}")
            return success

        logger.info(f"Checkpoint created successfully: {checkpoint_path}")

        # Registration with a RunManager is best-effort: a failure is logged
        # as a warning and does not change the returned result.
        run_manager = getattr(self.simulator, '_run_manager', None)
        if run_manager:
            from june.run_manager import logger as rm_logger
            try:
                run_manager.add_checkpoint(checkpoint_name, str(checkpoint_path))
                rm_logger.info(f"Registered checkpoint {checkpoint_name}")
            except Exception as e:
                rm_logger.warning(f"Failed to register checkpoint: {e}")

        return success

    def restore_from_checkpoint(self, checkpoint_name: str) -> bool:
        """Restore the simulation from a named checkpoint in the read directory.

        Args:
            checkpoint_name (str): Name of the checkpoint directory to restore from

        Returns:
            bool: False when the checkpoint path does not exist, otherwise
            the result reported by the restorer

        """
        checkpoint_path = self.checkpoint_read_dir / checkpoint_name
        if not checkpoint_path.exists():
            logger.error(f"Checkpoint not found: {checkpoint_path}")
            return False

        logger.info(f"Restoring from checkpoint: {checkpoint_name} (from {self.checkpoint_read_dir})")
        success = self.restorer.restore_from_checkpoint(checkpoint_path)

        if not success:
            logger.error(f"Failed to restore from checkpoint: {checkpoint_path}")
            return success

        logger.info(f"Restoration completed successfully from: {checkpoint_path}")

        # Let the manager reset its completed checkpoint dates relative to
        # the restored simulation time so later dates can be used again.
        restored_time = self.simulator.timer.now
        self.checkpoint_manager.reset_completed_checkpoint_dates_after_restoration(restored_time)
        return success

    def restore_from_latest_checkpoint(self) -> bool:
        """Restore from the latest checkpoint found in the read directory.

        Returns:
            bool: False when the manager finds no checkpoint, otherwise the
            result of ``restore_from_checkpoint``

        """
        newest = self.checkpoint_manager.get_latest_checkpoint(self.checkpoint_read_dir)
        if newest is not None:
            # Only the final path component is handed on as the name.
            return self.restore_from_checkpoint(Path(newest['path']).name)

        logger.warning(f"No checkpoints found for restoration in: {self.checkpoint_read_dir}")
        return False

    def list_available_checkpoints(self) -> list:
        """Ask the checkpoint manager what exists in the read directory.

        Returns:
            list: List of available checkpoint information

        """
        read_dir = self.checkpoint_read_dir
        return self.checkpoint_manager.list_available_checkpoints(read_dir)

    def enable_auto_checkpoints(self, interval_days: float = 7.0):
        """Turn automatic checkpointing on and set its interval.

        Args:
            interval_days (float, optional): Interval between automatic checkpoints in simulation days (Default value = 7.0)

        """
        self.auto_checkpoint_enabled = True
        manager = self.checkpoint_manager
        manager.checkpoint_interval = interval_days
        logger.info(f"Auto-checkpointing enabled with {interval_days} day interval")

    def disable_auto_checkpoints(self):
        """Turn automatic checkpointing off."""
        self.auto_checkpoint_enabled = False
        logger.info("Auto-checkpointing disabled")

__init__(simulator, checkpoint_dir='checkpoints', checkpoint_read_dir=None, auto_checkpoint_interval=None, checkpoint_dates=None)

Initialise simulator checkpointing.

Parameters

- simulator (Simulator): The JUNE simulator instance.
- checkpoint_dir (str): Base directory for storing/writing checkpoints.
- checkpoint_read_dir (str, optional): Directory for reading existing checkpoints from (defaults to checkpoint_dir).
- auto_checkpoint_interval (float): Automatic checkpoint interval in simulation days (only used in automatic mode).
- checkpoint_dates (list): Specific simulation days when checkpoints should be created.

Source code in june/checkpointing/simulator_checkpointing.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def __init__(self, simulator, checkpoint_dir: str = "checkpoints",
             checkpoint_read_dir: str = None,
             auto_checkpoint_interval: float = None, checkpoint_dates: list = None):
    """Set up the manager, the restorer and the checkpoint directories.

    Args:
        simulator (Simulator): The JUNE simulator instance
        checkpoint_dir (str, optional): Directory new checkpoints are written to (Default value = "checkpoints")
        checkpoint_read_dir (str, optional): Directory existing checkpoints are read from; a falsy value means "same as checkpoint_dir" (Default value = None)
        auto_checkpoint_interval (float, optional): Automatic checkpoint interval in simulation days (Default value = None)
        checkpoint_dates (list, optional): Specific simulation days when checkpoints should be created (Default value = None)

    """
    self.simulator = simulator

    write_dir = Path(checkpoint_dir)
    # Fall back to the write directory when no read directory is given.
    read_dir = Path(checkpoint_read_dir) if checkpoint_read_dir else write_dir
    self.checkpoint_dir = write_dir
    self.checkpoint_read_dir = read_dir
    self.auto_checkpoint_enabled = True

    self.checkpoint_manager = CheckpointManager(
        simulator,
        checkpoint_interval_days=auto_checkpoint_interval,
        checkpoint_dates=checkpoint_dates,
    )
    self.restorer = CheckpointRestorer(simulator)

    # Only the process whose mpi_rank is 0 creates the write directory.
    if mpi_rank == 0:
        write_dir.mkdir(parents=True, exist_ok=True)

    if read_dir != write_dir:
        logger.info(f"Initialised with read dir: {self.checkpoint_read_dir}, write dir: {self.checkpoint_dir}")
    else:
        logger.info(f"Initialised with directory: {self.checkpoint_dir}")

create_checkpoint(checkpoint_name=None)

Create a simulation checkpoint.

Parameters:

Name Type Description Default
checkpoint_name Optional[str]

Name for the checkpoint directory. If None, auto-generated. (Default value = None)

None

Returns:

Name Type Description
bool bool

True if checkpoint creation was successful

Source code in june/checkpointing/simulator_checkpointing.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def create_checkpoint(self, checkpoint_name: Optional[str] = None) -> bool:
    """Write a checkpoint into the write directory.

    Args:
        checkpoint_name (Optional[str], optional): Name of the checkpoint directory. When None, a name is built from the simulator timer's current day and date. (Default value = None)

    Returns:
        bool: Result reported by the checkpoint manager

    """
    if checkpoint_name is None:
        # No name supplied: derive one from the simulation clock.
        current_time = self.simulator.timer.now
        current_date = self.simulator.timer.date
        checkpoint_name = f"checkpoint_day_{current_time:.1f}_{current_date.strftime('%Y%m%d')}"

    checkpoint_path = self.checkpoint_dir / checkpoint_name
    logger.info(f"Creating checkpoint: {checkpoint_name}")
    success = self.checkpoint_manager.create_checkpoint(checkpoint_path)

    if not success:
        logger.error(f"Failed to create checkpoint: {checkpoint_path}")
        return success

    logger.info(f"Checkpoint created successfully: {checkpoint_path}")

    # Registration with a RunManager is best-effort: a failure is logged
    # as a warning and does not change the returned result.
    run_manager = getattr(self.simulator, '_run_manager', None)
    if run_manager:
        from june.run_manager import logger as rm_logger
        try:
            run_manager.add_checkpoint(checkpoint_name, str(checkpoint_path))
            rm_logger.info(f"Registered checkpoint {checkpoint_name}")
        except Exception as e:
            rm_logger.warning(f"Failed to register checkpoint: {e}")

    return success

disable_auto_checkpoints()

Disable automatic checkpoint creation.

Source code in june/checkpointing/simulator_checkpointing.py
335
336
337
338
def disable_auto_checkpoints(self):
    """Turn automatic checkpointing off."""
    self.auto_checkpoint_enabled = False
    logger.info("Auto-checkpointing disabled")

enable_auto_checkpoints(interval_days=7.0)

Enable automatic checkpoint creation.

Parameters:

Name Type Description Default
interval_days float

Interval between automatic checkpoints in simulation days (Default value = 7.0)

7.0
Source code in june/checkpointing/simulator_checkpointing.py
324
325
326
327
328
329
330
331
332
333
def enable_auto_checkpoints(self, interval_days: float = 7.0):
    """Turn automatic checkpointing on and set its interval.

    Args:
        interval_days (float, optional): Interval between automatic checkpoints in simulation days (Default value = 7.0)

    """
    self.auto_checkpoint_enabled = True
    manager = self.checkpoint_manager
    manager.checkpoint_interval = interval_days
    logger.info(f"Auto-checkpointing enabled with {interval_days} day interval")

list_available_checkpoints()

List all available checkpoints.

Returns:

Name Type Description
list list

List of available checkpoint information

Source code in june/checkpointing/simulator_checkpointing.py
314
315
316
317
318
319
320
321
322
def list_available_checkpoints(self) -> list:
    """Ask the checkpoint manager what exists in the read directory.

    Returns:
        list: List of available checkpoint information

    """
    read_dir = self.checkpoint_read_dir
    return self.checkpoint_manager.list_available_checkpoints(read_dir)

restore_from_checkpoint(checkpoint_name)

Restore simulation from a checkpoint.

Parameters:

Name Type Description Default
checkpoint_name str

Name of the checkpoint directory to restore from

required

Returns:

Name Type Description
bool bool

True if restoration was successful

Source code in june/checkpointing/simulator_checkpointing.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def restore_from_checkpoint(self, checkpoint_name: str) -> bool:
    """Restore the simulation from a named checkpoint in the read directory.

    Args:
        checkpoint_name (str): Name of the checkpoint directory to restore from

    Returns:
        bool: False when the checkpoint path does not exist, otherwise
        the result reported by the restorer

    """
    checkpoint_path = self.checkpoint_read_dir / checkpoint_name
    if not checkpoint_path.exists():
        logger.error(f"Checkpoint not found: {checkpoint_path}")
        return False

    logger.info(f"Restoring from checkpoint: {checkpoint_name} (from {self.checkpoint_read_dir})")
    success = self.restorer.restore_from_checkpoint(checkpoint_path)

    if not success:
        logger.error(f"Failed to restore from checkpoint: {checkpoint_path}")
        return success

    logger.info(f"Restoration completed successfully from: {checkpoint_path}")

    # Let the manager reset its completed checkpoint dates relative to
    # the restored simulation time so later dates can be used again.
    restored_time = self.simulator.timer.now
    self.checkpoint_manager.reset_completed_checkpoint_dates_after_restoration(restored_time)
    return success

restore_from_latest_checkpoint()

Restore from the most recent checkpoint.

Returns:

Name Type Description
bool bool

True if restoration was successful

Source code in june/checkpointing/simulator_checkpointing.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def restore_from_latest_checkpoint(self) -> bool:
    """Restore from the latest checkpoint found in the read directory.

    Returns:
        bool: False when the manager finds no checkpoint, otherwise the
        result of ``restore_from_checkpoint``

    """
    newest = self.checkpoint_manager.get_latest_checkpoint(self.checkpoint_read_dir)
    if newest is not None:
        # Only the final path component is handed on as the name.
        return self.restore_from_checkpoint(Path(newest['path']).name)

    logger.warning(f"No checkpoints found for restoration in: {self.checkpoint_read_dir}")
    return False

should_create_checkpoint()

Check if an automatic checkpoint should be created now.

Returns:

Name Type Description
bool bool

True if a checkpoint should be created

Source code in june/checkpointing/simulator_checkpointing.py
209
210
211
212
213
214
215
216
217
218
219
220
221
def should_create_checkpoint(self) -> bool:
    """Report whether an automatic checkpoint is due.

    Returns:
        bool: False while auto-checkpointing is disabled, otherwise
        whatever the checkpoint manager's ``should_checkpoint()`` says

    """
    if self.auto_checkpoint_enabled:
        return self.checkpoint_manager.should_checkpoint()
    return False

add_checkpointing_from_config(simulator, config_path, checkpoint_dir='checkpoints')

Add checkpointing capabilities to simulator using configuration file.

Parameters:

Name Type Description Default
simulator Simulator

The JUNE simulator instance

required
config_path str

Path to the YAML configuration file

required
checkpoint_dir str

Directory for storing checkpoints (Default value = "checkpoints")

'checkpoints'

Returns:

Name Type Description
SimulatorCheckpointing SimulatorCheckpointing

The checkpointing interface object

Source code in june/checkpointing/simulator_checkpointing.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def add_checkpointing_from_config(simulator, config_path: str, checkpoint_dir: str = "checkpoints") -> SimulatorCheckpointing:
    """Attach checkpointing to a simulator based on a YAML configuration file.

    The configuration is parsed into a list of checkpoint dates and an
    interval. When dates are present they are used and no interval is
    passed on; otherwise the interval is used.

    Args:
        simulator (Simulator): The JUNE simulator instance
        config_path (str): Path to the YAML configuration file
        checkpoint_dir (str, optional): Directory for storing checkpoints (Default value = "checkpoints")

    Returns:
        SimulatorCheckpointing: The checkpointing interface object

    """
    checkpoint_dates, interval_days = parse_checkpoint_config_from_yaml(config_path, simulator)

    if checkpoint_dates is None:
        # Automatic interval mode: only the interval is meaningful.
        logger.info(f"Parsed checkpoint config - mode: automatic_interval, interval: {interval_days}")
        return add_checkpointing_to_simulator(
            simulator,
            checkpoint_dir=checkpoint_dir,
            auto_interval=interval_days,
            checkpoint_dates=None,
        )

    # Specific dates mode: the interval is deliberately left at its default.
    logger.info(f"Parsed checkpoint config - mode: specific_dates, dates: {checkpoint_dates}")
    return add_checkpointing_to_simulator(
        simulator,
        checkpoint_dir=checkpoint_dir,
        checkpoint_dates=checkpoint_dates,
    )

add_checkpointing_from_config_with_directories(simulator, config_path, checkpoint_read_dir='checkpoints', checkpoint_write_dir='checkpoints')

Add checkpointing capabilities to simulator with separate read/write directories.

This version supports checkpoint chaining by allowing child runs to read from parent checkpoints while writing new checkpoints to their own directory.

Parameters:

Name Type Description Default
simulator Simulator

The JUNE simulator instance

required
config_path str

Path to the YAML configuration file

required
checkpoint_read_dir str

Directory for reading existing checkpoints from (parent's directory for child runs) (Default value = "checkpoints")

'checkpoints'
checkpoint_write_dir str

Directory for writing new checkpoints to (child's own directory) (Default value = "checkpoints")

'checkpoints'

Returns:

Name Type Description
SimulatorCheckpointing SimulatorCheckpointing

The checkpointing interface object

Source code in june/checkpointing/simulator_checkpointing.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
def add_checkpointing_from_config_with_directories(simulator, config_path: str,
                                                  checkpoint_read_dir: str = "checkpoints",
                                                  checkpoint_write_dir: str = "checkpoints") -> SimulatorCheckpointing:
    """Attach checkpointing from a YAML config, with separate read/write directories.

    Supports checkpoint chaining: a child run can read a parent's
    checkpoints while writing its own checkpoints elsewhere. When the
    parsed configuration has checkpoint dates they are used and no
    interval is passed on; otherwise the interval is used.

    Args:
        simulator (Simulator): The JUNE simulator instance
        config_path (str): Path to the YAML configuration file
        checkpoint_read_dir (str, optional): Directory for reading existing checkpoints from (parent's directory for child runs) (Default value = "checkpoints")
        checkpoint_write_dir (str, optional): Directory for writing new checkpoints to (child's own directory) (Default value = "checkpoints")

    Returns:
        SimulatorCheckpointing: The checkpointing interface object

    """
    checkpoint_dates, interval_days = parse_checkpoint_config_from_yaml(config_path, simulator)

    if checkpoint_dates is None:
        # Automatic interval mode: only the interval is meaningful.
        logger.info(f"Parsed checkpoint config - mode: automatic_interval, interval: {interval_days}")
        return add_checkpointing_to_simulator_with_directories(
            simulator,
            checkpoint_read_dir=checkpoint_read_dir,
            checkpoint_write_dir=checkpoint_write_dir,
            auto_interval=interval_days,
            checkpoint_dates=None,
        )

    # Specific dates mode: the interval is deliberately left at its default.
    logger.info(f"Parsed checkpoint config - mode: specific_dates, dates: {checkpoint_dates}")
    return add_checkpointing_to_simulator_with_directories(
        simulator,
        checkpoint_read_dir=checkpoint_read_dir,
        checkpoint_write_dir=checkpoint_write_dir,
        checkpoint_dates=checkpoint_dates,
    )

add_checkpointing_to_simulator(simulator, checkpoint_dir='checkpoints', auto_interval=None, checkpoint_dates=None)

Add checkpointing capabilities to an existing simulator.

Parameters:

Name Type Description Default
simulator Simulator

The JUNE simulator instance

required
checkpoint_dir str

Directory for storing checkpoints (Default value = "checkpoints")

'checkpoints'
auto_interval float

Automatic checkpoint interval in simulation days (Default value = None)

None
checkpoint_dates list

Specific simulation days when checkpoints should be created (Default value = None)

None

Returns:

Name Type Description
SimulatorCheckpointing SimulatorCheckpointing

The checkpointing interface object

Source code in june/checkpointing/simulator_checkpointing.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def add_checkpointing_to_simulator(simulator, checkpoint_dir: str = "checkpoints",
                                  auto_interval: float = None, checkpoint_dates: list = None) -> SimulatorCheckpointing:
    """Build a checkpointing interface and attach it to the simulator.

    The new object is stored on the simulator as ``simulator.checkpointing``
    and also returned.

    Args:
        simulator (Simulator): The JUNE simulator instance
        checkpoint_dir (str, optional): Directory for storing checkpoints (Default value = "checkpoints")
        auto_interval (float, optional): Automatic checkpoint interval in simulation days (Default value = None)
        checkpoint_dates (list, optional): Specific simulation days when checkpoints should be created (Default value = None)

    Returns:
        SimulatorCheckpointing: The checkpointing interface object

    """
    settings = {
        "checkpoint_dir": checkpoint_dir,
        "auto_checkpoint_interval": auto_interval,
        "checkpoint_dates": checkpoint_dates,
    }
    interface = SimulatorCheckpointing(simulator, **settings)

    # Expose the interface on the simulator itself.
    simulator.checkpointing = interface
    return interface

add_checkpointing_to_simulator_with_directories(simulator, checkpoint_read_dir='checkpoints', checkpoint_write_dir='checkpoints', auto_interval=None, checkpoint_dates=None)

Add checkpointing capabilities with separate read/write directories for checkpoint chaining.

Parameters:

Name Type Description Default
simulator Simulator

The JUNE simulator instance

required
checkpoint_read_dir str

Directory for reading existing checkpoints from (Default value = "checkpoints")

'checkpoints'
checkpoint_write_dir str

Directory for writing new checkpoints to (Default value = "checkpoints")

'checkpoints'
auto_interval float

Automatic checkpoint interval in simulation days (Default value = None)

None
checkpoint_dates list

Specific simulation days when checkpoints should be created (Default value = None)

None

Returns:

Name Type Description
SimulatorCheckpointing SimulatorCheckpointing

The checkpointing interface object

Source code in june/checkpointing/simulator_checkpointing.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
def add_checkpointing_to_simulator_with_directories(simulator,
                                                   checkpoint_read_dir: str = "checkpoints",
                                                   checkpoint_write_dir: str = "checkpoints",
                                                   auto_interval: float = None,
                                                   checkpoint_dates: list = None) -> SimulatorCheckpointing:
    """Build a checkpointing interface with separate read/write directories.

    Intended for checkpoint chaining. The new object is stored on the
    simulator as ``simulator.checkpointing`` and also returned.

    Args:
        simulator (Simulator): The JUNE simulator instance
        checkpoint_read_dir (str, optional): Directory for reading existing checkpoints from (Default value = "checkpoints")
        checkpoint_write_dir (str, optional): Directory for writing new checkpoints to (Default value = "checkpoints")
        auto_interval (float, optional): Automatic checkpoint interval in simulation days (Default value = None)
        checkpoint_dates (list, optional): Specific simulation days when checkpoints should be created (Default value = None)

    Returns:
        SimulatorCheckpointing: The checkpointing interface object

    """
    settings = {
        # The write directory is the interface's primary checkpoint_dir.
        "checkpoint_dir": checkpoint_write_dir,
        "checkpoint_read_dir": checkpoint_read_dir,
        "auto_checkpoint_interval": auto_interval,
        "checkpoint_dates": checkpoint_dates,
    }
    interface = SimulatorCheckpointing(simulator, **settings)

    # Expose the interface on the simulator itself.
    simulator.checkpointing = interface
    return interface

integrate_checkpointing_in_simulation_loop(simulator_checkpointing, force_checkpoint=False)

Check and potentially create a checkpoint during simulation loop.

This function should be called within the main simulation time step loop.

Parameters:

Name Type Description Default
simulator_checkpointing SimulatorCheckpointing

The checkpointing interface

required
force_checkpoint bool

Force creation of a checkpoint regardless of interval (Default value = False)

False

Returns:

Name Type Description
bool

True if a checkpoint was created

Source code in june/checkpointing/simulator_checkpointing.py
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
def integrate_checkpointing_in_simulation_loop(simulator_checkpointing, force_checkpoint: bool = False) -> bool:
    """Check and potentially create a checkpoint during simulation loop.

    This function should be called within the main simulation time step loop.

    Args:
        simulator_checkpointing (SimulatorCheckpointing): The checkpointing interface
        force_checkpoint (bool, optional): Force creation of a checkpoint regardless of interval (Default value = False)

    Returns:
        bool: True if a checkpoint was created

    """
    # A forced checkpoint skips the schedule check entirely.
    if not (force_checkpoint or simulator_checkpointing.should_create_checkpoint()):
        return False

    # The previous version logged "now writing infection debug information..."
    # after a successful checkpoint although nothing was written at that
    # point; that misleading message has been removed.
    return simulator_checkpointing.create_checkpoint()

parse_checkpoint_config_from_yaml(config_path, simulator)

Parse checkpoint configuration from YAML file.

Parameters:

Name Type Description Default
config_path str

Path to the YAML configuration file

required
simulator Simulator

The JUNE simulator instance to get initial date

required

Returns:

Name Type Description
tuple tuple

(checkpoint_dates: List[float] or None, interval_days: float)

tuple

checkpoint_dates is None for automatic interval mode, empty list for no checkpoints

Source code in june/checkpointing/simulator_checkpointing.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def parse_checkpoint_config_from_yaml(config_path: str, simulator) -> tuple:
    """Parse checkpoint configuration from YAML file.

    The ``checkpointing`` section of the file selects one of two modes:

    * ``automatic_interval`` (default): checkpoints every
      ``automatic_interval_days`` days (default 7.0).
    * ``specific_dates``: checkpoints at the listed entries. Each entry may
      be a number (simulation day), a numeric string, a date string or a
      ``datetime.date`` (both converted to a simulation day relative to the
      simulator). Entries of any other type are skipped with a warning.

    Args:
        config_path (str): Path to the YAML configuration file
        simulator (Simulator): The JUNE simulator instance to get initial date

    Returns:
        tuple: ``(checkpoint_dates, interval_days)``.
        ``(None, interval_days)`` for automatic interval mode (also used for
        an unknown mode, and ``(None, 7.0)`` if the file cannot be read or
        parsed). ``(sorted_days, None)`` for specific_dates mode, where an
        empty list means no checkpoints; an invalid date disables
        checkpointing by returning ``([], None)``.

    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)

        # An empty YAML document loads as None, and so does a key that is
        # present without a value; treat both as "nothing configured".
        config = config or {}
        checkpoint_config = config.get('checkpointing') or {}

        # Default values
        mode = checkpoint_config.get('mode', 'automatic_interval')
        interval_days = checkpoint_config.get('automatic_interval_days', 7.0)

        if mode == "automatic_interval":
            logger.info(f"Checkpoint mode: automatic_interval with {interval_days} day interval")
            return None, interval_days
        elif mode == "specific_dates":
            specific_dates = checkpoint_config.get('specific_dates')
            if specific_dates is None:
                # Key missing or explicitly null: no checkpoints requested
                specific_dates = []
            elif isinstance(specific_dates, (str, int, float, date)):
                # A single scalar entry: wrap it so a string is not iterated
                # character by character and a number/date is not rejected
                specific_dates = [specific_dates]
            checkpoint_days = []

            logger.info(f"Processing specific_dates: {specific_dates}")

            for i, item in enumerate(specific_dates):
                logger.info(f"Processing item {i}: {item} (type: {type(item)})")

                if isinstance(item, (int, float)):
                    checkpoint_days.append(float(item))
                    logger.info(f"Added numeric checkpoint day: {float(item)}")
                elif isinstance(item, str):
                    try:
                        # Try to parse as float first
                        numeric_day = float(item)
                        checkpoint_days.append(numeric_day)
                        logger.info(f"Added string-numeric checkpoint day: {numeric_day}")
                    except ValueError:
                        # It's a date string, convert to simulation day
                        logger.info(f"Attempting to convert date string: '{item}'")
                        try:
                            simulation_day = _convert_date_to_simulation_day(item, simulator)
                            checkpoint_days.append(simulation_day)
                            logger.info(f"Successfully converted date '{item}' to simulation day {simulation_day}")
                        except ValueError as e:
                            logger.error(f"Failed to parse checkpoint date '{item}': {e}")
                            logger.error("Checkpoint mode will be disabled due to invalid date")
                            return [], None  # Return empty list to disable checkpointing
                elif isinstance(item, date):
                    # YAML automatically converts dates to datetime.date objects
                    logger.info(f"Processing datetime.date object: {item}")
                    try:
                        simulation_day = _convert_date_object_to_simulation_day(item, simulator)
                        checkpoint_days.append(simulation_day)
                        logger.info(f"Successfully converted date object {item} to simulation day {simulation_day}")
                    except ValueError as e:
                        logger.error(f"Failed to parse checkpoint date object '{item}': {e}")
                        logger.error("Checkpoint mode will be disabled due to invalid date")
                        return [], None  # Return empty list to disable checkpointing
                else:
                    logger.warning(f"Unsupported checkpoint date type: {type(item)} for item {item}")

            # Sort once and reuse the result for both the log line and the return
            checkpoint_days.sort()
            logger.info(f"Checkpoint mode: specific_dates with dates {checkpoint_days}")
            return checkpoint_days, None  # Don't return interval for specific_dates mode
        else:
            logger.warning(f"Unknown checkpoint mode '{mode}', defaulting to automatic_interval")
            return None, interval_days

    except Exception as e:
        # Deliberate best-effort: any read/parse problem falls back to the
        # default weekly automatic interval rather than aborting the run.
        logger.warning(f"Failed to parse checkpoint config from {config_path}: {e}")
        return None, 7.0

print_validation_summary(simulator, stage='FINAL')

Print key numerical metrics for validation comparison.

This function prints standardised metrics that can be easily compared between continuous runs and checkpoint/restore runs.

Parameters:

Name Type Description Default
simulator Simulator

The JUNE simulator instance

required
stage str

Stage identifier (e.g., "AT_CHECKPOINT", "AFTER_RESTORE", "FINAL") (Default value = "FINAL")

'FINAL'
Source code in june/checkpointing/simulator_checkpointing.py
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
def print_validation_summary(simulator, stage: str = "FINAL"):
    """Print key numerical metrics for validation comparison.

    Emits a fixed-format block of counts and short state hashes to stdout so
    that a continuous run and a checkpoint/restore run can be compared line
    by line.

    Args:
        simulator (Simulator): The JUNE simulator instance
        stage (str, optional): Stage identifier (e.g., "AT_CHECKPOINT", "AFTER_RESTORE", "FINAL") (Default value = "FINAL")

    """
    from june.mpi_wrapper import mpi_rank
    import hashlib
    import random
    import numpy as np

    def _digest(text):
        # First 16 hex characters of the SHA-256 of the given text
        return hashlib.sha256(text.encode()).hexdigest()[:16]

    population = simulator.world.people.members

    def _tally(flag_name):
        # Number of people whose given attribute is truthy
        return sum(1 for member in population if getattr(member, flag_name))

    # --- Population -------------------------------------------------------
    n_people = len(population)
    n_infected = _tally("infected")
    n_hospitalised = _tally("hospitalised")
    n_intensive_care = _tally("intensive_care")
    n_dead = _tally("dead")

    # --- Timer ------------------------------------------------------------
    sim_time = simulator.timer.now
    sim_date = simulator.timer.date.isoformat()

    # --- Test and trace / contact manager ---------------------------------
    tracing_on = getattr(simulator, 'test_and_trace_enabled', False)
    if tracing_on:
        n_traced = sum(1 for member in population if member.test_and_trace is not None)
    else:
        n_traced = 0

    leisure_links = 0
    pending_tests = 0
    if tracing_on and hasattr(simulator, 'contact_manager'):
        contacts = simulator.contact_manager
        leisure_links = sum(map(len, contacts.leisure_companions.values()))
        pending_tests = len(contacts.tests_ids_pending)

    # --- Interaction ------------------------------------------------------
    n_timesteps = 0
    n_seeded = 0
    n_secondary = 0
    if hasattr(simulator, 'interaction'):
        engine = simulator.interaction
        n_timesteps = engine.timestep_count
        n_seeded = len(engine._initial_infected_ids)
        n_secondary = sum(engine.initial_infected_transmission_counts.values())

    # --- Random generator state (consistency check) -----------------------
    rng_text = "python:" + str(random.getstate()) + "|numpy:" + str(np.random.get_state())
    rng_digest = _digest(rng_text)

    # --- Rat dynamics -----------------------------------------------------
    rats_on = getattr(simulator, 'ratty_dynamics_enabled', False)
    n_rats = 0
    rats_infected = 0
    rats_recovered = 0
    rats_susceptible = 0
    rats_high_immunity = 0
    immunity_digest = ""
    positions_digest = ""
    states_digest = ""

    rodents = getattr(simulator, 'rat_manager', None) if rats_on else None
    if rodents is not None:
        n_rats = rodents.num_rats

        states = rodents.states
        if states is not None and len(states) > 0:
            rats_infected = int(np.sum(states == 1))
            rats_recovered = int(np.sum(states == 2))
            rats_susceptible = int(np.sum(states == 0))
            # Hash over the first 100 states only
            states_digest = _digest(",".join(str(value) for value in states[:100]))

        immunity = rodents.immunity
        if immunity is not None and len(immunity) > 0:
            rats_high_immunity = int(np.sum(immunity > 0.8))
            # Hash over the first 100 immunity values only
            immunity_digest = _digest(
                ",".join(format(value, ".10f") for value in immunity[:100])
            )

        positions = rodents.positions
        if positions is not None and len(positions) > 0:
            # Hash over the first 50 positions only
            positions_digest = _digest(
                ",".join(f"{px:.10f},{py:.10f}" for px, py in positions[:50])
            )

    # --- Per-person infection state ---------------------------------------
    def _describe(person):
        # One colon-separated record per person; optional parts are appended
        # only when the person carries an infection / test-and-trace record.
        record = f"{person.id}:{person.infected}:{person.hospitalised}:{person.dead}"
        infection = person.infection
        if infection:
            record = f"{record}:{infection.start_time}:{infection.tag}"
        tracing = person.test_and_trace
        if tracing:
            record = f"{record}:{tracing.notification_time}:{tracing.test_result}"
        return record

    ordered_people = sorted(population, key=lambda member: member.id)
    infection_digest = _digest("|".join(_describe(member) for member in ordered_people))

    # --- Report -----------------------------------------------------------
    rule = "=" * 80
    report = [
        f"\n{rule}",
        f"VALIDATION_SUMMARY_{stage}_RANK_{mpi_rank}",
        rule,
        f"TIMESTAMP: {sim_date}",
        f"SIMULATION_TIME: {sim_time:.10f}",
        f"TOTAL_DAYS: {simulator.timer.total_days}",
        "",
        "POPULATION_METRICS:",
        f"  TOTAL_PEOPLE: {n_people}",
        f"  INFECTED: {n_infected}",
        f"  HOSPITALISED: {n_hospitalised}",
        f"  INTENSIVE_CARE: {n_intensive_care}",
        f"  DEAD: {n_dead}",
        "",
        "TEST_AND_TRACE_METRICS:",
        f"  TT_ENABLED: {tracing_on}",
        f"  WITH_TEST_TRACE: {n_traced}",
        f"  LEISURE_CONNECTIONS: {leisure_links}",
        f"  PENDING_TESTS: {pending_tests}",
        "",
        "INTERACTION_METRICS:",
        f"  TIMESTEPS: {n_timesteps}",
        f"  INITIAL_INFECTED: {n_seeded}",
        f"  SECONDARY_INFECTIONS: {n_secondary}",
        "",
        "RAT_DYNAMICS_METRICS:",
        f"  RAT_ENABLED: {rats_on}",
        f"  RAT_TOTAL: {n_rats}",
        f"  RAT_INFECTED: {rats_infected}",
        f"  RAT_RECOVERED: {rats_recovered}",
        f"  RAT_SUSCEPTIBLE: {rats_susceptible}",
        f"  RAT_HIGH_IMMUNITY: {rats_high_immunity}",
        "",
        "STATE_HASHES:",
        f"  INFECTION_HASH: {infection_digest}",
        f"  RANDOM_HASH: {rng_digest}",
        f"  RAT_IMMUNITY_HASH: {immunity_digest}",
        f"  RAT_POSITIONS_HASH: {positions_digest}",
        f"  RAT_STATES_HASH: {states_digest}",
        rule,
        f"END_VALIDATION_SUMMARY_{stage}_RANK_{mpi_rank}",
        f"{rule}\n",
    ]
    for line in report:
        print(line)