Skip to content

RunManager

Run Manager for organising simulation runs, results, and checkpoints.

This module provides a RunManager class that handles: unique run identification using UUIDs; directory structure creation and management; metadata tracking and persistence; status tracking throughout the simulation lifecycle; checkpoint association with runs; and run indexing and lookup capabilities.

RunManager

Manages simulation runs with unique identification, metadata, and organization.

This class provides a comprehensive system for organising simulation runs, tracking their status, managing outputs, and associating checkpoints.

Source code in june/run_manager.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
class RunManager:
    """Manages simulation runs with unique identification, metadata, and organization.

    This class provides a comprehensive system for organising simulation runs,
    tracking their status, managing outputs, and associating checkpoints.

    """

    def __init__(self, base_runs_dir: Union[str, Path] = "runs", auto_create: bool = True):
        """
        Initialise the RunManager.

        Args:
            base_runs_dir: Base directory where all runs will be stored
            auto_create: Whether to automatically create the base directory
        """
        self.base_runs_dir = Path(base_runs_dir)
        self.run_id = None
        self.run_dir = None
        self.metadata = {}
        self.index_file = self.base_runs_dir / ".run_index.json"

        if auto_create:
            self.base_runs_dir.mkdir(exist_ok=True)

    def create_run(self,
                   description: Optional[str] = None,
                   tags: Optional[List[str]] = None,
                   run_id: Optional[str] = None,
                   **kwargs) -> str:
        """Start a new run: choose its ID, build its directories and record metadata.

        The new run becomes the active run of this manager. Its metadata is
        written to the run directory, the central index is updated and the
        'current' symlink is repointed.

        Args:
            description (Optional[str], optional): Human-readable description of the run (Default value = None)
            tags (Optional[List[str]], optional): List of tags for categorising the run (Default value = None)
            run_id (Optional[str], optional): Optional specific run ID (generates UUID if not provided) (Default value = None)
            **kwargs: Additional metadata to store; entries override the
                standard fields of the same name

        Returns:
            str: The ID of the run that was created
        """
        # Fall back to a random UUID when the caller did not choose an ID
        self.run_id = str(uuid.uuid4()) if run_id is None else run_id

        # Lay out the run directory and its standard subdirectories
        self.run_dir = self.base_runs_dir / self.run_id
        self.run_dir.mkdir(exist_ok=True)
        for subdir in ("results", "checkpoints", "logs"):
            (self.run_dir / subdir).mkdir(exist_ok=True)

        # Standard fields first; caller-supplied extras are merged on top
        record = {
            "run_id": self.run_id,
            "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "description": description or f"Simulation run {self.run_id[:8]}",
            "status": RunStatus.INITIALising,
            "tags": tags or [],
            "hostname": socket.gethostname(),
            "user": os.getenv("USER", "unknown"),
            "git_commit": self._get_git_commit(),
            "parent_run_id": None,
            "checkpoints": [],
            "started_at": None,
            "completed_at": None,
        }
        record.update(kwargs)
        self.metadata = record

        # Persist, then refresh the index and the 'current' symlink
        self._save_metadata()
        self._update_run_index()
        self._update_current_symlink()

        logger.info(f"Created new run: {self.run_id}")
        logger.info(f"Run directory: {self.run_dir}")

        return self.run_id

    def create_child_run(self,
                        parent_run_id: str,
                        description: Optional[str] = None,
                        tags: Optional[List[str]] = None,
                        copy_checkpoints: bool = True,
                        **kwargs) -> str:
        """Create a new run that is a child of an existing run.

        This is useful for resumed runs where you want to maintain the relationship
        to the original run while creating a new run directory.

        Args:
            parent_run_id (str): The run ID of the parent run
            description (Optional[str], optional): Human-readable description of the child run (Default value = None)
            tags (Optional[List[str]], optional): List of tags for categorising the run (Default value = None)
            copy_checkpoints (bool, optional): Whether to copy checkpoints from parent run (Default value = True)
            **kwargs: Additional metadata to store

        """
        # Verify parent run exists
        parent_run_dir = self.base_runs_dir / parent_run_id
        if not parent_run_dir.exists():
            raise ValueError(f"Parent run not found: {parent_run_id}")

        # Load parent metadata
        parent_metadata_file = parent_run_dir / "metadata.json"
        if not parent_metadata_file.exists():
            raise ValueError(f"Parent run metadata not found: {parent_metadata_file}")

        with open(parent_metadata_file, 'r') as f:
            parent_metadata = json.load(f)

        # Generate child run ID
        child_run_id = str(uuid.uuid4())

        # Create child run with parent relationship
        description = description or f"Resumed from {parent_run_id[:8]}"
        if tags is None:
            tags = parent_metadata.get("tags", [])
            # Add a "resumed" tag if not already present
            if "resumed" not in tags:
                tags = tags + ["resumed"]

        # Create the child run directory structure
        self.run_id = child_run_id
        self.run_dir = self.base_runs_dir / child_run_id
        self.run_dir.mkdir(exist_ok=True)

        # Create subdirectories
        (self.run_dir / "results").mkdir(exist_ok=True)
        (self.run_dir / "checkpoints").mkdir(exist_ok=True)
        (self.run_dir / "logs").mkdir(exist_ok=True)

        # Reference parent checkpoints instead of copying
        parent_checkpoints = parent_metadata.get("checkpoints", []) if copy_checkpoints else []

        # Initialise child metadata with parent relationship
        self.metadata = {
            "run_id": child_run_id,
            "created_at": datetime.utcnow().isoformat() + "Z",
            "description": description,
            "status": RunStatus.INITIALising,
            "tags": tags,
            "hostname": socket.gethostname(),
            "user": os.getenv("USER", "unknown"),
            "git_commit": self._get_git_commit(),
            "parent_run_id": parent_run_id,
            "checkpoints": parent_checkpoints.copy() if copy_checkpoints else [],
            "started_at": None,
            "completed_at": None,
            "resumed_from": parent_run_id,
            **kwargs
        }

        # Keep checkpoint paths pointing to parent directory (no modification needed)
        if copy_checkpoints:
            logger.info(f"Child run {child_run_id[:8]} will reference checkpoints from parent run {parent_run_id[:8]}")

        # Save initial metadata
        self._save_metadata()

        # Update run index
        self._update_run_index()

        # Update current symlink
        self._update_current_symlink()

        logger.info(f"Created child run: {child_run_id} (parent: {parent_run_id[:8]})")
        logger.info(f"Child run directory: {self.run_dir}")

        return child_run_id

    def load_run(self, run_id: str) -> bool:
        """Make an existing run the active one by reading its stored metadata.

        Args:
            run_id (str): The run ID to load

        Returns:
            bool: True when the metadata was read and the run is now active;
                False when the run directory or metadata.json is missing or
                the metadata could not be read (the reason is logged)
        """
        run_dir = self.base_runs_dir / run_id
        metadata_file = run_dir / "metadata.json"

        if not run_dir.exists():
            logger.error(f"Run directory not found: {run_dir}")
            return False
        if not metadata_file.exists():
            logger.error(f"Metadata file not found: {metadata_file}")
            return False

        try:
            with open(metadata_file, 'r') as handle:
                loaded = json.load(handle)

            # Only switch the active run once the metadata parsed cleanly
            self.metadata = loaded
            self.run_id, self.run_dir = run_id, run_dir

            logger.info(f"Loaded existing run: {self.run_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to load run metadata: {e}")
            return False

    def update_status(self, status: str, **kwargs):
        """Update the run status and optional additional metadata.

        Does nothing (apart from logging a warning) when no run is active.
        The updated metadata is written to disk and the run index is refreshed.

        Args:
            status (str): New status from RunStatus enum
            **kwargs: Additional metadata to update

        """
        if not self.run_id:
            logger.warning("No active run to update status")
            return

        self.metadata["status"] = status

        # Timezone-aware clock, same format as create_run
        # (datetime.utcnow() is deprecated)
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        if status == RunStatus.RUNNING:
            # New runs are seeded with "started_at": None, so test the value
            # rather than key presence; an existing start time is preserved
            if self.metadata.get("started_at") is None:
                self.metadata["started_at"] = now
        elif status in (RunStatus.COMPLETED, RunStatus.FAILED):
            self.metadata["completed_at"] = now

        # Update additional metadata
        self.metadata.update(kwargs)

        # Save updated metadata
        self._save_metadata()
        self._update_run_index()

        logger.info(f"Updated run {self.run_id[:8]} status to: {status}")

    def add_checkpoint(self, checkpoint_name: str, checkpoint_path: Optional[str] = None):
        """Associate a checkpoint with this run.

        Appends a record (name, creation time, path) to the run metadata, then
        saves the metadata and refreshes the run index. Does nothing (apart
        from logging a warning) when no run is active.

        Args:
            checkpoint_name (str): Name/identifier of the checkpoint
            checkpoint_path (Optional[str], optional): Optional path to the checkpoint files (Default value = None)

        """
        if not self.run_id:
            logger.warning("No active run to add checkpoint")
            return

        checkpoint_info = {
            "name": checkpoint_name,
            # Timezone-aware clock, same format as create_run
            # (datetime.utcnow() is deprecated)
            "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "path": checkpoint_path
        }

        # Metadata loaded from disk may lack a "checkpoints" list
        self.metadata.setdefault("checkpoints", []).append(checkpoint_info)
        self._save_metadata()
        self._update_run_index()

        logger.info(f"Added checkpoint {checkpoint_name} to run {self.run_id[:8]}")

    def get_results_dir(self) -> Optional[Path]:
        """Get the results directory for the current run.

        """
        if self.run_dir:
            return self.run_dir / "results"
        return None

    def get_checkpoints_dir(self) -> Optional[Path]:
        """Get the checkpoints directory for the current run.

        For child runs (resumed runs), this returns the parent's checkpoint directory
        since checkpoints should remain in their original location.

        """
        if not self.run_dir:
            return None

        # Check if this is a child run (has parent_run_id)
        parent_run_id = self.metadata.get("parent_run_id")
        if parent_run_id:
            # For child runs, use parent's checkpoint directory
            parent_run_dir = self.base_runs_dir / parent_run_id
            if parent_run_dir.exists():
                logger.debug(f"Child run {self.run_id[:8]} using parent's checkpoint directory: {parent_run_dir / 'checkpoints'}")
                return parent_run_dir / "checkpoints"
            else:
                logger.warning(f"Parent run directory not found: {parent_run_dir}, falling back to child directory")

        # For regular runs or fallback, use own checkpoint directory
        return self.run_dir / "checkpoints"

    def get_logs_dir(self) -> Optional[Path]:
        """Get the logs directory for the current run.

        """
        if self.run_dir:
            return self.run_dir / "logs"
        return None

    def save_config(self, config_data: Any, filename: str = "config.yaml"):
        """Write configuration data into the active run's directory.

        Dicts and lists are serialised as indented JSON (whatever the file
        name's extension), strings are written verbatim, and any other object
        is written as ``str(config_data)``. Does nothing (apart from logging a
        warning) when no run is active.

        Args:
            config_data (Any): Configuration data to save
            filename (str, optional): Name of the config file (Default value = "config.yaml")

        """
        if not self.run_dir:
            logger.warning("No active run to save config")
            return

        config_file = self.run_dir / filename

        with open(config_file, 'w') as out:
            if isinstance(config_data, (dict, list)):
                json.dump(config_data, out, indent=2)
            elif isinstance(config_data, str):
                out.write(config_data)
            else:
                # Anything else is stored via its string representation
                out.write(str(config_data))

        logger.info(f"Saved config to {config_file}")

    def save_run_params(self, params: Dict[str, Any]):
        """Write the run parameters to run_params.json in the active run's directory.

        Does nothing (apart from logging a warning) when no run is active.

        Args:
            params (Dict[str, Any]): Run parameters; serialised as indented JSON

        """
        if not self.run_dir:
            logger.warning("No active run to save parameters")
            return

        params_file = self.run_dir / "run_params.json"
        with params_file.open('w') as out:
            json.dump(params, out, indent=2)

        logger.info(f"Saved run parameters to {params_file}")

    def list_runs(self, limit: Optional[int] = None, status_filter: Optional[str] = None) -> List[Dict]:
        """List recent runs with optional filtering.

        Args:
            limit (Optional[int], optional): Maximum number of runs to return (Default value = None)
            status_filter (Optional[str], optional): Filter by specific status (Default value = None)

        """
        try:
            if not self.index_file.exists():
                return []

            with open(self.index_file, 'r') as f:
                index_data = json.load(f)

            runs = index_data.get("runs", [])

            # Filter by status if specified
            if status_filter:
                runs = [run for run in runs if run.get("status") == status_filter]

            # Sort by creation time (newest first)
            runs.sort(key=lambda x: x.get("created_at", ""), reverse=True)

            # Apply limit if specified
            if limit:
                runs = runs[:limit]

            return runs

        except Exception as e:
            logger.error(f"Failed to list runs: {e}")
            return []

    def get_latest_run(self) -> Optional[str]:
        """Get the ID of the most recent run.

        """
        try:
            if not self.index_file.exists():
                return None

            with open(self.index_file, 'r') as f:
                index_data = json.load(f)

            return index_data.get("latest")

        except Exception as e:
            logger.error(f"Failed to get latest run: {e}")
            return None

    def cleanup_old_runs(self, keep_count: int = 10, older_than_days: Optional[int] = None):
        """Clean up old runs, keeping only the most recent ones.

        Args:
            keep_count (int, optional): Number of recent runs to keep (Default value = 10)
            older_than_days (Optional[int], optional): Only delete runs older than this many days (Default value = None)

        """
        runs = self.list_runs()

        if older_than_days:
            cutoff_date = datetime.utcnow().timestamp() - (older_than_days * 24 * 3600)
            runs_to_delete = []

            for run in runs[keep_count:]:
                try:
                    created_at = datetime.fromisoformat(run["created_at"].replace("Z", "+00:00"))
                    if created_at.timestamp() < cutoff_date:
                        runs_to_delete.append(run)
                except Exception:
                    continue
        else:
            runs_to_delete = runs[keep_count:]

        for run in runs_to_delete:
            run_dir = self.base_runs_dir / run["run_id"]
            if run_dir.exists():
                shutil.rmtree(run_dir)
                logger.info(f"Deleted old run: {run['run_id'][:8]}")

        # Update index
        self._update_run_index()

    def _save_metadata(self):
        """Save metadata to the run directory."""
        if not self.run_dir:
            return

        metadata_file = self.run_dir / "metadata.json"
        with open(metadata_file, 'w') as f:
            json.dump(self.metadata, f, indent=2)

    def _update_run_index(self):
        """Update the central run index."""
        try:
            # Load existing index
            if self.index_file.exists():
                with open(self.index_file, 'r') as f:
                    index_data = json.load(f)
            else:
                index_data = {"runs": [], "latest": None}

            # Update or add current run
            current_run_summary = {
                "run_id": self.metadata["run_id"],
                "description": self.metadata.get("description", ""),
                "created_at": self.metadata["created_at"],
                "status": self.metadata["status"],
                "tags": self.metadata.get("tags", [])
            }

            # Remove existing entry if present
            index_data["runs"] = [
                run for run in index_data["runs"] 
                if run["run_id"] != self.metadata["run_id"]
            ]

            # Add updated entry
            index_data["runs"].append(current_run_summary)

            # Update latest
            index_data["latest"] = self.metadata["run_id"]

            # Save index
            with open(self.index_file, 'w') as f:
                json.dump(index_data, f, indent=2)

        except Exception as e:
            logger.error(f"Failed to update run index: {e}")

    def _update_current_symlink(self):
        """Update the 'current' symlink to point to the latest run."""
        try:
            current_link = self.base_runs_dir / "current"

            # Remove existing symlink
            if current_link.is_symlink() or current_link.exists():
                current_link.unlink()

            # Create new symlink
            if self.run_dir:
                current_link.symlink_to(self.run_dir.name)

        except Exception as e:
            logger.warning(f"Failed to update current symlink: {e}")

    def _get_git_commit(self) -> Optional[str]:
        """Get the current git commit hash.

        """
        try:
            result = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                capture_output=True,
                text=True,
                cwd=Path(__file__).parent.parent
            )
            if result.returncode == 0:
                return result.stdout.strip()[:12]  # Short hash
        except Exception:
            pass
        return None

__init__(base_runs_dir='runs', auto_create=True)

Initialise the RunManager.

Parameters:

Name Type Description Default
base_runs_dir Union[str, Path]

Base directory where all runs will be stored

'runs'
auto_create bool

Whether to automatically create the base directory

True
Source code in june/run_manager.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(self, base_runs_dir: Union[str, Path] = "runs", auto_create: bool = True):
    """
    Initialise the RunManager.

    No run is active after construction; ``run_id`` and ``run_dir`` stay
    ``None`` and ``metadata`` stays empty until a run is created or loaded.

    Args:
        base_runs_dir: Base directory where all runs will be stored
        auto_create: Whether to automatically create the base directory
            (missing parent directories are created as well)
    """
    self.base_runs_dir = Path(base_runs_dir)
    self.index_file = self.base_runs_dir / ".run_index.json"

    # State of the currently active run
    self.run_id = None
    self.run_dir = None
    self.metadata = {}

    if auto_create:
        # parents=True so a nested location such as "output/runs" works
        # even when "output" does not exist yet
        self.base_runs_dir.mkdir(parents=True, exist_ok=True)

add_checkpoint(checkpoint_name, checkpoint_path=None)

Associate a checkpoint with this run.

Parameters:

Name Type Description Default
checkpoint_name str

Name/identifier of the checkpoint

required
checkpoint_path Optional[str]

Optional path to the checkpoint files (Default value = None)

None
Source code in june/run_manager.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def add_checkpoint(self, checkpoint_name: str, checkpoint_path: Optional[str] = None):
    """Associate a checkpoint with this run.

    Appends a record (name, creation time, path) to the run metadata, then
    saves the metadata and refreshes the run index. Does nothing (apart
    from logging a warning) when no run is active.

    Args:
        checkpoint_name (str): Name/identifier of the checkpoint
        checkpoint_path (Optional[str], optional): Optional path to the checkpoint files (Default value = None)

    """
    if not self.run_id:
        logger.warning("No active run to add checkpoint")
        return

    checkpoint_info = {
        "name": checkpoint_name,
        # Timezone-aware clock, same format as create_run
        # (datetime.utcnow() is deprecated)
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "path": checkpoint_path
    }

    # Metadata loaded from disk may lack a "checkpoints" list
    self.metadata.setdefault("checkpoints", []).append(checkpoint_info)
    self._save_metadata()
    self._update_run_index()

    logger.info(f"Added checkpoint {checkpoint_name} to run {self.run_id[:8]}")

cleanup_old_runs(keep_count=10, older_than_days=None)

Clean up old runs, keeping only the most recent ones.

Parameters:

Name Type Description Default
keep_count int

Number of recent runs to keep (Default value = 10)

10
older_than_days Optional[int]

Only delete runs older than this many days (Default value = None)

None
Source code in june/run_manager.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def cleanup_old_runs(self, keep_count: int = 10, older_than_days: Optional[int] = None):
    """Clean up old runs, keeping only the most recent ones.

    Runs are taken newest first from ``list_runs``; the first ``keep_count``
    are always kept. The remaining runs are deleted from disk and removed
    from the central index. If the index's "latest" entry pointed at a
    deleted run, it is reset to the newest remaining run (or None).

    Args:
        keep_count (int, optional): Number of recent runs to keep (Default value = 10)
        older_than_days (Optional[int], optional): Only delete runs older than this many days (Default value = None)

    """
    candidates = self.list_runs()[keep_count:]

    if older_than_days:
        # Use an aware "now": .timestamp() on a naive utcnow() is read as
        # local time, which skews the cutoff by the machine's UTC offset
        cutoff_date = datetime.now(timezone.utc).timestamp() - (older_than_days * 24 * 3600)
        runs_to_delete = []

        for run in candidates:
            try:
                created_at = datetime.fromisoformat(run["created_at"].replace("Z", "+00:00"))
                if created_at.timestamp() < cutoff_date:
                    runs_to_delete.append(run)
            except Exception:
                # Best effort: a run whose creation time cannot be read is kept
                continue
    else:
        runs_to_delete = candidates

    removed_ids = set()
    for run in runs_to_delete:
        run_dir = self.base_runs_dir / run["run_id"]
        if run_dir.exists():
            shutil.rmtree(run_dir)
            logger.info(f"Deleted old run: {run['run_id'][:8]}")
        removed_ids.add(run["run_id"])

    # Drop the deleted runs from the index so list_runs() stops reporting them
    if removed_ids:
        try:
            with open(self.index_file, 'r') as f:
                index_data = json.load(f)

            remaining = [
                entry for entry in index_data.get("runs", [])
                if entry.get("run_id") not in removed_ids
            ]
            index_data["runs"] = remaining

            if index_data.get("latest") in removed_ids:
                newest = max(remaining, key=lambda entry: entry.get("created_at", ""), default=None)
                index_data["latest"] = newest.get("run_id") if newest else None

            with open(self.index_file, 'w') as f:
                json.dump(index_data, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to prune run index: {e}")

    # Refresh the active run's entry only when there is an active run that
    # still exists; without one there is nothing to write
    active_id = self.metadata.get("run_id")
    if active_id and active_id not in removed_ids:
        self._update_run_index()

create_child_run(parent_run_id, description=None, tags=None, copy_checkpoints=True, **kwargs)

Create a new run that is a child of an existing run.

This is useful for resumed runs where you want to maintain the relationship to the original run while creating a new run directory.

Parameters:

Name Type Description Default
parent_run_id str

The run ID of the parent run

required
description Optional[str]

Human-readable description of the child run (Default value = None)

None
tags Optional[List[str]]

List of tags for categorising the run (Default value = None)

None
copy_checkpoints bool

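Whether to carry the parent run's checkpoint records over into the child run's metadata; the checkpoint files themselves are referenced in place, not copied (Default value = True)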

True
**kwargs

Additional metadata to store

{}
Source code in june/run_manager.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def create_child_run(self,
                    parent_run_id: str,
                    description: Optional[str] = None,
                    tags: Optional[List[str]] = None,
                    copy_checkpoints: bool = True,
                    **kwargs) -> str:
    """Create a new run that is a child of an existing run.

    This is useful for resumed runs where you want to maintain the relationship
    to the original run while creating a new run directory. The child becomes
    the active run of this manager.

    Args:
        parent_run_id (str): The run ID of the parent run
        description (Optional[str], optional): Human-readable description of the child run (Default value = None)
        tags (Optional[List[str]], optional): List of tags for categorising the run; when
            omitted, the parent's tags are reused with "resumed" added (Default value = None)
        copy_checkpoints (bool, optional): Whether to carry the parent's checkpoint records
            over into the child's metadata; checkpoint files are not copied (Default value = True)
        **kwargs: Additional metadata to store; entries override the
            standard fields of the same name

    Returns:
        str: The ID of the child run that was created

    Raises:
        ValueError: If the parent run directory or its metadata.json does not exist
    """
    # Verify parent run exists
    parent_run_dir = self.base_runs_dir / parent_run_id
    if not parent_run_dir.exists():
        raise ValueError(f"Parent run not found: {parent_run_id}")

    # Load parent metadata
    parent_metadata_file = parent_run_dir / "metadata.json"
    if not parent_metadata_file.exists():
        raise ValueError(f"Parent run metadata not found: {parent_metadata_file}")

    with open(parent_metadata_file, 'r') as f:
        parent_metadata = json.load(f)

    # Generate child run ID
    child_run_id = str(uuid.uuid4())

    description = description or f"Resumed from {parent_run_id[:8]}"
    if tags is None:
        # "or []" tolerates a parent whose tags field is null
        tags = list(parent_metadata.get("tags") or [])
        if "resumed" not in tags:
            tags.append("resumed")

    # Checkpoint records keep pointing at the parent's files; only the
    # metadata entries are duplicated, never the files themselves
    if copy_checkpoints:
        inherited_checkpoints = list(parent_metadata.get("checkpoints") or [])
    else:
        inherited_checkpoints = []

    # Create the child run directory structure
    self.run_id = child_run_id
    self.run_dir = self.base_runs_dir / child_run_id
    self.run_dir.mkdir(exist_ok=True)
    for subdir in ("results", "checkpoints", "logs"):
        (self.run_dir / subdir).mkdir(exist_ok=True)

    # Initialise child metadata with parent relationship
    self.metadata = {
        "run_id": child_run_id,
        # Timezone-aware clock, same format as create_run
        # (datetime.utcnow() is deprecated)
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "description": description,
        "status": RunStatus.INITIALising,
        "tags": tags,
        "hostname": socket.gethostname(),
        "user": os.getenv("USER", "unknown"),
        "git_commit": self._get_git_commit(),
        "parent_run_id": parent_run_id,
        "checkpoints": inherited_checkpoints,
        "started_at": None,
        "completed_at": None,
        "resumed_from": parent_run_id,
        **kwargs
    }

    if copy_checkpoints:
        logger.info(f"Child run {child_run_id[:8]} will reference checkpoints from parent run {parent_run_id[:8]}")

    # Persist, then refresh the index and the 'current' symlink
    self._save_metadata()
    self._update_run_index()
    self._update_current_symlink()

    logger.info(f"Created child run: {child_run_id} (parent: {parent_run_id[:8]})")
    logger.info(f"Child run directory: {self.run_dir}")

    return child_run_id

create_run(description=None, tags=None, run_id=None, **kwargs)

Create a new run with unique identifier and directory structure.

Parameters:

Name Type Description Default
description Optional[str]

Human-readable description of the run (Default value = None)

None
tags Optional[List[str]]

List of tags for categorising the run (Default value = None)

None
run_id Optional[str]

Optional specific run ID (generates UUID if not provided) (Default value = None)

None
**kwargs

Additional metadata to store

{}
Source code in june/run_manager.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def create_run(self,
               description: Optional[str] = None,
               tags: Optional[List[str]] = None,
               run_id: Optional[str] = None,
               **kwargs) -> str:
    """Start a new run: choose its ID, build its directories and record metadata.

    The new run becomes the active run of this manager. Its metadata is
    written to the run directory, the central index is updated and the
    'current' symlink is repointed.

    Args:
        description (Optional[str], optional): Human-readable description of the run (Default value = None)
        tags (Optional[List[str]], optional): List of tags for categorising the run (Default value = None)
        run_id (Optional[str], optional): Optional specific run ID (generates UUID if not provided) (Default value = None)
        **kwargs: Additional metadata to store; entries override the
            standard fields of the same name

    Returns:
        str: The ID of the run that was created
    """
    # Fall back to a random UUID when the caller did not choose an ID
    self.run_id = str(uuid.uuid4()) if run_id is None else run_id

    # Lay out the run directory and its standard subdirectories
    self.run_dir = self.base_runs_dir / self.run_id
    self.run_dir.mkdir(exist_ok=True)
    for subdir in ("results", "checkpoints", "logs"):
        (self.run_dir / subdir).mkdir(exist_ok=True)

    # Standard fields first; caller-supplied extras are merged on top
    record = {
        "run_id": self.run_id,
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "description": description or f"Simulation run {self.run_id[:8]}",
        "status": RunStatus.INITIALising,
        "tags": tags or [],
        "hostname": socket.gethostname(),
        "user": os.getenv("USER", "unknown"),
        "git_commit": self._get_git_commit(),
        "parent_run_id": None,
        "checkpoints": [],
        "started_at": None,
        "completed_at": None,
    }
    record.update(kwargs)
    self.metadata = record

    # Persist, then refresh the index and the 'current' symlink
    self._save_metadata()
    self._update_run_index()
    self._update_current_symlink()

    logger.info(f"Created new run: {self.run_id}")
    logger.info(f"Run directory: {self.run_dir}")

    return self.run_id

get_checkpoints_dir()

Get the checkpoints directory for the current run.

For child runs (resumed runs), this returns the parent's checkpoint directory since checkpoints should remain in their original location.

Source code in june/run_manager.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def get_checkpoints_dir(self) -> Optional[Path]:
    """Resolve the directory that holds checkpoints for the current run.

    A run whose metadata carries a ``parent_run_id`` (a resumed/child run)
    keeps using the parent's ``checkpoints`` directory, provided the parent
    run directory still exists under ``base_runs_dir``. Otherwise the run's
    own ``checkpoints`` directory is used.

    Returns:
        Optional[Path]: The checkpoints directory, or None when no run
            directory is set.

    """
    if not self.run_dir:
        return None

    own_checkpoints = self.run_dir / "checkpoints"

    parent_id = self.metadata.get("parent_run_id")
    if not parent_id:
        # Regular (non-child) run: checkpoints live in its own directory
        return own_checkpoints

    parent_dir = self.base_runs_dir / parent_id
    if not parent_dir.exists():
        # Parent has gone missing: fall back to the child's own directory
        logger.warning(f"Parent run directory not found: {parent_dir}, falling back to child directory")
        return own_checkpoints

    parent_checkpoints = parent_dir / "checkpoints"
    logger.debug(f"Child run {self.run_id[:8]} using parent's checkpoint directory: {parent_checkpoints}")
    return parent_checkpoints

get_latest_run()

Get the ID of the most recent run.

Source code in june/run_manager.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def get_latest_run(self) -> Optional[str]:
    """Look up the ID recorded under ``"latest"`` in the run index file.

    Returns:
        Optional[str]: The value of the index's ``"latest"`` entry, or None
            when the index file does not exist, has no such entry, or cannot
            be read or parsed (the failure is logged).

    """
    try:
        if self.index_file.exists():
            with open(self.index_file, 'r') as index_handle:
                return json.load(index_handle).get("latest")
        return None
    except Exception as exc:
        # Best-effort lookup: report the problem and behave as "no latest run"
        logger.error(f"Failed to get latest run: {exc}")
        return None

get_logs_dir()

Get the logs directory for the current run.

Source code in june/run_manager.py
332
333
334
335
336
337
338
def get_logs_dir(self) -> Optional[Path]:
    """Return the ``logs`` subdirectory of the current run.

    Returns:
        Optional[Path]: ``run_dir / "logs"``, or None when no run directory
            is set.

    """
    return self.run_dir / "logs" if self.run_dir else None

get_results_dir()

Get the results directory for the current run.

Source code in june/run_manager.py
300
301
302
303
304
305
306
def get_results_dir(self) -> Optional[Path]:
    """Return the ``results`` subdirectory of the current run.

    Returns:
        Optional[Path]: ``run_dir / "results"``, or None when no run
            directory is set.

    """
    return self.run_dir / "results" if self.run_dir else None

list_runs(limit=None, status_filter=None)

List recent runs with optional filtering.

Parameters:

Name Type Description Default
limit Optional[int]

Maximum number of runs to return (Default value = None)

None
status_filter Optional[str]

Filter by specific status (Default value = None)

None
Source code in june/run_manager.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def list_runs(self, limit: Optional[int] = None, status_filter: Optional[str] = None) -> List[Dict]:
    """List recent runs with optional filtering.

    Reads the ``"runs"`` list from the run index file, optionally keeps only
    entries whose ``"status"`` equals ``status_filter``, and returns them
    sorted by ``"created_at"`` with the newest first.

    Args:
        limit (Optional[int], optional): Maximum number of runs to return. A falsy
            value (None or 0) means no limit (Default value = None)
        status_filter (Optional[str], optional): Filter by specific status (Default value = None)

    Returns:
        List[Dict]: The matching index entries, newest first. An empty list is
            returned when the index file is missing or cannot be read or
            parsed (the failure is logged).

    """
    try:
        if not self.index_file.exists():
            return []

        with open(self.index_file, 'r') as f:
            index_data = json.load(f)

        runs = index_data.get("runs", [])

        # Filter by status if specified
        if status_filter:
            runs = [run for run in runs if run.get("status") == status_filter]

        # Sort by creation time (newest first). Entries whose "created_at" is
        # missing or null are normalised to "" so they sort last instead of
        # raising a TypeError that would discard the whole listing.
        runs.sort(key=lambda run: str(run.get("created_at") or ""), reverse=True)

        # Apply limit if specified
        if limit:
            runs = runs[:limit]

        return runs

    except Exception as e:
        logger.error(f"Failed to list runs: {e}")
        return []

load_run(run_id)

Load an existing run by ID.

Parameters:

Name Type Description Default
run_id str

The run ID to load

required
Source code in june/run_manager.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def load_run(self, run_id: str) -> bool:
    """Make an existing run the active one by reading its stored metadata.

    On success the manager's ``metadata``, ``run_id`` and ``run_dir`` are
    replaced with those of the loaded run. On failure they are left as they
    were and the problem is logged.

    Args:
        run_id (str): The run ID to load

    Returns:
        bool: True when the run's ``metadata.json`` was found and parsed,
            False otherwise.

    """
    target_dir = self.base_runs_dir / run_id
    metadata_path = target_dir / "metadata.json"

    # Check the directory first so the logged error names the missing piece
    if not target_dir.exists():
        logger.error(f"Run directory not found: {target_dir}")
        return False
    if not metadata_path.exists():
        logger.error(f"Metadata file not found: {metadata_path}")
        return False

    try:
        with open(metadata_path, 'r') as metadata_handle:
            self.metadata = json.load(metadata_handle)

        self.run_id, self.run_dir = run_id, target_dir
        logger.info(f"Loaded existing run: {self.run_id}")
    except Exception as exc:
        logger.error(f"Failed to load run metadata: {exc}")
        return False

    return True

save_config(config_data, filename='config.yaml')

Save configuration data to the run directory.

Parameters:

Name Type Description Default
config_data Any

Configuration data to save

required
filename str

Name of the config file (Default value = "config.yaml")

'config.yaml'
Source code in june/run_manager.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def save_config(self, config_data: Any, filename: str = "config.yaml"):
    """Write configuration data into the current run directory.

    Dicts and lists are serialised as indented JSON, strings are written
    verbatim, and any other value is written as its ``str()`` form. Nothing
    is written (a warning is logged) when no run directory is set.

    Args:
        config_data (Any): Configuration data to save
        filename (str, optional): Name of the config file (Default value = "config.yaml")

    """
    if not self.run_dir:
        logger.warning("No active run to save config")
        return

    target_path = self.run_dir / filename

    with open(target_path, 'w') as out:
        if isinstance(config_data, (dict, list)):
            json.dump(config_data, out, indent=2)
        elif isinstance(config_data, str):
            out.write(config_data)
        else:
            # Anything else is stored as its string representation
            out.write(str(config_data))

    logger.info(f"Saved config to {target_path}")

save_run_params(params)

Save run parameters to run_params.json.

Parameters:

Name Type Description Default
params Dict[str, Any]

Run parameters to write to run_params.json as JSON

required
Source code in june/run_manager.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def save_run_params(self, params: Dict[str, Any]):
    """Write the run parameters to ``run_params.json`` in the run directory.

    The parameters are serialised as indented JSON. Nothing is written (a
    warning is logged) when no run directory is set.

    Args:
        params (Dict[str, Any]): Run parameters to serialise as JSON

    """
    if not self.run_dir:
        logger.warning("No active run to save parameters")
        return

    destination = self.run_dir / "run_params.json"
    with open(destination, 'w') as out:
        json.dump(params, out, indent=2)

    logger.info(f"Saved run parameters to {destination}")

update_status(status, **kwargs)

Update the run status and optional additional metadata.

Parameters:

Name Type Description Default
status str

New status from RunStatus enum

required
**kwargs

Additional metadata to update

{}
Source code in june/run_manager.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def update_status(self, status: str, **kwargs):
    """Update the run status and optional additional metadata.

    Stores the new status, stamps ``started_at`` the first time the run
    becomes RUNNING and ``completed_at`` whenever it becomes COMPLETED or
    FAILED, merges any extra metadata, then persists the metadata and
    refreshes the run index. Does nothing (a warning is logged) when no run
    is active.

    Args:
        status (str): New status from RunStatus enum
        **kwargs: Additional metadata to update; applied after the status and
            timestamps, so matching keys override them

    """
    if not self.run_id:
        logger.warning("No active run to update status")
        return

    self.metadata["status"] = status

    # Timezone-aware UTC timestamp, formatted like "created_at" (suffix "Z")
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    # Set timestamps for specific status changes
    if status == RunStatus.RUNNING:
        # New runs are created with "started_at" set to None, so check the
        # value rather than key presence; otherwise the start time would
        # never be recorded. An existing start time is left untouched.
        if not self.metadata.get("started_at"):
            self.metadata["started_at"] = timestamp
    elif status in (RunStatus.COMPLETED, RunStatus.FAILED):
        self.metadata["completed_at"] = timestamp

    # Update additional metadata
    self.metadata.update(kwargs)

    # Save updated metadata
    self._save_metadata()
    self._update_run_index()

    logger.info(f"Updated run {self.run_id[:8]} status to: {status}")

RunStatus

Enumeration of possible run statuses.

Source code in june/run_manager.py
26
27
28
29
30
31
32
33
class RunStatus:
    """Enumeration of possible run statuses.

    The members are plain string constants, so they can be stored directly
    in JSON metadata and compared against raw strings.
    """
    INITIALISING = "initialising"
    # Backward-compatible alias: existing code refers to the mixed-case name
    INITIALising = INITIALISING
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CHECKPOINTED = "checkpointed"
    RESUMED = "resumed"