Skip to content

Test trace event recording

TTEvent

Class to represent Test and Trace events.

Source code in june/records/test_trace_event_recording.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class TTEvent:
    """Class to represent Test and Trace events."""
    def __init__(
        self, 
        event_type: str, 
        person_id: int, 
        timestamp: float,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Initialise a Test and Trace event.

        Parameters
        ----------
        event_type : str
            Type of event (e.g., 'test', 'trace', 'quarantine', 'isolation')
        person_id : int
            ID of the person involved in the event
        timestamp : float
            Time when the event occurred (days from simulation start)
        metadata : Dict[str, Any], optional
            Additional event-specific data
        """
        self.event_type = event_type
        self.person_id = person_id
        # Simulation time in days (float); consumers convert with int()/float().
        self.timestamp = timestamp
        # `metadata or {}` also gives each event its own fresh dict, so no
        # mutable mapping is ever shared between events.
        self.metadata = metadata or {}
        # Wall-clock creation time (naive local time) — distinct from sim time.
        self.creation_time = datetime.datetime.now()

    def __repr__(self):
        """Return a debug-friendly summary of the event."""
        return (f"TTEvent(type={self.event_type}, "
                f"person_id={self.person_id}, "
                f"timestamp={self.timestamp}, "
                f"metadata={self.metadata})")

__init__(event_type, person_id, timestamp, metadata=None)

Initialise a Test and Trace event.

Parameters

event_type : str — Type of event (e.g., 'test', 'trace', 'quarantine', 'isolation'); person_id : int — ID of the person involved in the event; timestamp : float — Time when the event occurred (days from simulation start); metadata : Dict[str, Any], optional — Additional event-specific data

Source code in june/records/test_trace_event_recording.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(
    self, 
    event_type: str, 
    person_id: int, 
    timestamp: float,
    metadata: Optional[Dict[str, Any]] = None
):
    """
    Initialise a Test and Trace event.

    Parameters
    ----------
    event_type : str
        Type of event (e.g., 'test', 'trace', 'quarantine', 'isolation')
    person_id : int
        ID of the person involved in the event
    timestamp : float
        Time when the event occurred (days from simulation start)
    metadata : Dict[str, Any], optional
        Additional event-specific data
    """
    self.event_type = event_type
    self.person_id = person_id
    # Simulation time in days (float); consumers convert with int()/float().
    self.timestamp = timestamp
    # `metadata or {}` also gives each event its own fresh dict, so no
    # mutable mapping is ever shared between events.
    self.metadata = metadata or {}
    # Wall-clock creation time (naive local time) — distinct from sim time.
    self.creation_time = datetime.datetime.now()

TTEventRecorder

Records and aggregates Test and Trace events, writing to HDF5 incrementally.

Source code in june/records/test_trace_event_recording.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
class TTEventRecorder:
    """Records and aggregates Test and Trace events, writing to HDF5 incrementally."""
    def __init__(self, output_dir="./results/h5_test_trace"):
        """Set up counters, the in-memory event buffer and the per-rank HDF5 file.

        Args:
            output_dir (str, optional): Directory where the HDF5 file is written
                (Default value = "./results/h5_test_trace")
        """
        from june.mpi_wrapper import mpi_rank, mpi_available, mpi_size

        # Create output directory if it doesn't exist
        self.output_dir = Path(output_dir)
        os.makedirs(self.output_dir, exist_ok=True)

        # Define the HDF5 filename with timestamp and rank to avoid conflicts
        # when several MPI ranks write simultaneously.
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        if mpi_available and mpi_size > 1:
            self.filename = self.output_dir / f"tt_events_{timestamp}_rank_{mpi_rank}.h5"
        else:
            self.filename = self.output_dir / f"tt_events_{timestamp}.h5"

        # Total counters for different event types (counts all occurrences)
        self.total_counters = {
            'tested': 0,
            'test_positive': 0,
            'test_negative': 0,
            'traced': 0,
            'quarantined': 0,
            'isolated': 0,
        }

        # Detailed counters by day: day -> event_type -> count
        self.daily_counters = defaultdict(lambda: defaultdict(int))

        # Set of unique IDs for each category (counts unique people)
        self.unique_ids = {
            'tested': set(),
            'test_positive': set(),
            'test_negative': set(),
            'traced': set(),
            'quarantined': set(),
            'isolated': set(),
        }

        # Current status
        self.currently = {
            'quarantined': set(),  # People currently in quarantine
            'isolated': set(),     # People currently in isolation
        }

        # Deltas for the current time step (reset when the day advances)
        self.deltas = {
            'tested': 0,
            'test_positive': 0,
            'test_negative': 0,
            'traced': 0,
            'unique_quarantined': 0,
            'total_quarantined': 0,
            'unique_isolated': 0,
            'total_isolated': 0,
        }

        # Day number at which the deltas were last reset
        self.last_delta_reset = 0

        # Event buffer for batch processing (flushed to HDF5 when full)
        self._event_buffer = []
        self._buffer_size = 100  # Adjust based on your needs

        # Initialise HDF5 file and tables
        self._initialise_tables()

        logger.info(f"TTEventRecorder initialised with HDF5 file: {self.filename}")

    def _initialise_tables(self):
        """Initialise the HDF5 tables for storing Test and Trace events.

        Idempotent: each table is only created if it does not already exist.
        """
        with tables.open_file(str(self.filename), mode="a") as file:
            # Check if tables already exist before creating them
            if 'test_and_trace_events' not in file.root:
                # Create table for events with all fields we'll need
                event_description = {
                    'timestamp': tables.StringCol(itemsize=10, pos=0),  # YYYY-MM-DD format
                    'event_type': tables.StringCol(itemsize=20, pos=1), 
                    'person_id': tables.Int32Col(pos=2),
                    'sim_time': tables.Float32Col(pos=3),  # Simulation time in days
                    'infected': tables.Int8Col(pos=4),     # Boolean as int8
                    'hospitalised': tables.Int8Col(pos=5), # Boolean as int8
                    'age': tables.Int32Col(pos=6),         # Person's age
                    'sex': tables.StringCol(itemsize=10, pos=7),  # Person's sex
                    'tracer_id': tables.Int32Col(pos=8),   # ID of person who caused tracing (-1 if none)
                    'contact_reason': tables.StringCol(itemsize=20, pos=9)  # Reason for contact
                }

                # Create the events table
                file.create_table(
                    file.root, 
                    'test_and_trace_events', 
                    event_description, 
                    "Test and Trace Events"
                )

            if 'daily_counters' not in file.root:
                # Create a table for daily counters
                counter_description = {
                    'day': tables.Int32Col(pos=0),
                    'date': tables.StringCol(itemsize=10, pos=1),
                    'event_type': tables.StringCol(itemsize=20, pos=2),
                    'count': tables.Int32Col(pos=3)
                }

                # Create the daily counters table
                file.create_table(
                    file.root,
                    'daily_counters',
                    counter_description,
                    "Daily Test and Trace Counters"
                )

            if 'current_status' not in file.root:
                # Create a table for current status (snapshots)
                status_description = {
                    'timestamp': tables.StringCol(itemsize=10, pos=0),
                    'sim_time': tables.Float32Col(pos=1),
                    'status_type': tables.StringCol(itemsize=20, pos=2),
                    'person_id': tables.Int32Col(pos=3)
                }

                # Create the status table
                file.create_table(
                    file.root,
                    'current_status',
                    status_description,
                    "Current Quarantine/Isolation Status"
                )

    def _check_tables_exist(self):
        """Check if all required tables exist in the HDF5 file.

        Returns:
            bool: True only if the file exists and contains all three tables.
        """
        if not os.path.exists(self.filename):
            return False

        with tables.open_file(str(self.filename), mode="r") as file:
            required_tables = ['test_and_trace_events', 'daily_counters', 'current_status']
            for table_name in required_tables:
                if table_name not in file.root:
                    return False
            return True

    def record_event(self, event: TTEvent):
        """Record a Test and Trace event.
        Updates counters and adds event to buffer for batch processing.

        Args:
            event (TTEvent): The event to record

        """

        # Add event to buffer
        self._event_buffer.append(event)

        # Reset deltas if we're in a new time step
        current_day = int(event.timestamp)
        if current_day > self.last_delta_reset:
            self.deltas = {k: 0 for k in self.deltas}
            self.last_delta_reset = current_day

        # Simple events update the same four structures under one counter key;
        # dispatch through a lookup table instead of four copy-pasted branches.
        simple_event_keys = {
            'test': 'tested',
            'test_positive': 'test_positive',
            'test_negative': 'test_negative',
            'trace': 'traced',
        }
        # quarantine/isolation starts share identical logic under these keys.
        start_event_keys = {
            'quarantine_start': 'quarantined',
            'isolation_start': 'isolated',
        }
        end_event_keys = {
            'quarantine_end': 'quarantined',
            'isolation_end': 'isolated',
        }

        if event.event_type in simple_event_keys:
            key = simple_event_keys[event.event_type]
            self.total_counters[key] += 1
            self.deltas[key] += 1
            self.unique_ids[key].add(event.person_id)
            self.daily_counters[current_day][key] += 1

        elif event.event_type in start_event_keys:
            key = start_event_keys[event.event_type]
            # Total events of this kind (all occurrences)
            self.total_counters[key] += 1
            self.deltas[f'total_{key}'] += 1
            self.daily_counters[current_day][key] += 1

            # Unique people: only bump the unique delta on first occurrence
            if event.person_id not in self.unique_ids[key]:
                self.unique_ids[key].add(event.person_id)
                self.deltas[f'unique_{key}'] += 1

            # Add to currently-active set
            self.currently[key].add(event.person_id)

        elif event.event_type in end_event_keys:
            key = end_event_keys[event.event_type]
            # Remove from currently-active set (tolerate unmatched ends)
            if event.person_id in self.currently[key]:
                self.currently[key].remove(event.person_id)

        # Process buffer if it reaches the threshold
        if len(self._event_buffer) >= self._buffer_size:
            self._process_event_buffer()

        logger.debug(f"Recorded event: {event}")


    def _process_event_buffer(self):
        """Process and write the current event buffer to HDF5 file.

        Recreates the file/tables if they have gone missing (e.g. deleted
        externally between flushes), then appends all buffered events in one
        batched write and clears the buffer.
        """
        if not self._event_buffer:
            return

        logger.debug(f"TTEventRecorder processing buffer with file: {self.filename}")
        logger.debug(f"File exists: {os.path.exists(self.filename)}")

        # Reinitialise file and/or tables if anything required is missing.
        # (This replaces an earlier read-and-log pass that detected missing
        # tables but never repaired them.)
        if not os.path.exists(self.filename) or not self._check_tables_exist():
            logger.warning(f"Reinitialising tables for file: {self.filename}")
            self._initialise_tables()

        # Resolve the simulation timer once: it is identical for every
        # buffered event, so hoist it out of the per-event loop.
        simulator = GlobalContext.get_simulator()
        sim_timer = simulator.timer if simulator else None

        with tables.open_file(str(self.filename), mode="a") as file:
            # Get the events table
            events_table = file.root.test_and_trace_events

            # Build one row tuple per buffered event
            event_data = []
            for event in self._event_buffer:
                # Convert simulation time (days) to a calendar date string
                if sim_timer:
                    days_whole = int(event.timestamp)
                    sim_date = sim_timer.initial_date + datetime.timedelta(days=days_whole)
                    date_str = sim_date.strftime("%Y-%m-%d")
                else:
                    # Fallback if simulator not available
                    date_str = datetime.datetime.now().strftime("%Y-%m-%d")

                # Get metadata values with defaults
                infected = event.metadata.get('infected', False)
                hospitalised = event.metadata.get('hospitalised', False)
                age = event.metadata.get('age', -1)  # Default to -1 if age not available
                sex = event.metadata.get('sex', 'unknown')  # Default to 'unknown' if sex not available
                tracer_id = event.metadata.get('tracer_id', -1)  # Default to -1 if no tracer
                contact_reason = event.metadata.get('contact_reason', 'unknown')  # Default reason

                # Add to data array (strings encoded to bytes for HDF5)
                event_data.append((
                    date_str.encode('utf-8'),
                    event.event_type.encode('utf-8'),
                    event.person_id,
                    float(event.timestamp),
                    1 if infected else 0,
                    1 if hospitalised else 0,
                    age,
                    sex.encode('utf-8'),
                    tracer_id,
                    contact_reason.encode('utf-8')
                ))

            # Convert to numpy record array and append to table in one write
            if event_data:
                data = np.array(
                    event_data,
                    dtype=[
                        ('timestamp', 'S10'),
                        ('event_type', 'S20'),
                        ('person_id', np.int32),
                        ('sim_time', np.float32),
                        ('infected', np.int8),
                        ('hospitalised', np.int8),
                        ('age', np.int32),
                        ('sex', 'S10'),
                        ('tracer_id', np.int32),
                        ('contact_reason', 'S20')
                    ]
                )

                events_table.append(data)
                events_table.flush()

            # Clear the buffer after successful write
            self._event_buffer = []


    def time_step(self, timestamp: float):
        """Process events for the current time step.
        Writes daily counters and current status to HDF5.

        Args:
            timestamp (float): Current simulation timestamp (days from start)

        """

        # First process any events in the buffer
        self._process_event_buffer()

        # Get simulator and format timestamp
        simulator = GlobalContext.get_simulator()
        sim_timer = simulator.timer if simulator else None

        current_day = int(timestamp)
        if sim_timer:
            sim_date = sim_timer.initial_date + datetime.timedelta(days=current_day)
            date_str = sim_date.strftime("%Y-%m-%d")
        else:
            # Fallback mirrors _process_event_buffer: without a timer there is
            # no calendar mapping, so use today's date. (Previously date_str
            # was left undefined here, raising NameError below.)
            date_str = datetime.datetime.now().strftime("%Y-%m-%d")

        with tables.open_file(str(self.filename), mode="a") as file:
            # Update daily counters table
            daily_table = file.root.daily_counters
            daily_data = []

            # Get counts from daily_counters for current day
            if current_day in self.daily_counters:
                # Find rows to delete (matching current day) so repeated calls
                # within one day overwrite rather than duplicate
                condition = f'(day == {current_day})'

                # Get the indices of rows to delete
                indices = daily_table.get_where_list(condition)
                if len(indices) > 0:
                    # Remove rows using start and stop indices
                    daily_table.remove_rows(indices[0], indices[-1]+1)
                    daily_table.flush()

                # Prepare new data
                for event_type, count in self.daily_counters[current_day].items():
                    daily_data.append((
                        current_day,
                        date_str.encode('utf-8'),
                        event_type.encode('utf-8'),
                        count
                    ))

                # Convert to numpy array and append
                if daily_data:
                    data = np.array(
                        daily_data,
                        dtype=[
                            ('day', np.int32),
                            ('date', 'S10'),
                            ('event_type', 'S20'),
                            ('count', np.int32)
                        ]
                    )

                    # Add new data
                    daily_table.append(data)
                    daily_table.flush()

            # Update current status table (people in quarantine/isolation)
            status_table = file.root.current_status
            status_data = []

            # Delete existing status data for this timestamp (overwrite semantics)
            condition = f'(sim_time == {float(timestamp)})'

            # Get the indices of rows to delete
            indices = status_table.get_where_list(condition)
            if len(indices) > 0:
                # Remove rows using start and stop indices
                status_table.remove_rows(indices[0], indices[-1]+1)
                status_table.flush()

            # Add current quarantine status
            for person_id in self.currently['quarantined']:
                status_data.append((
                    date_str.encode('utf-8'),
                    float(timestamp),
                    'quarantined'.encode('utf-8'),
                    person_id
                ))

            # Add current isolation status
            for person_id in self.currently['isolated']:
                status_data.append((
                    date_str.encode('utf-8'),
                    float(timestamp),
                    'isolated'.encode('utf-8'),
                    person_id
                ))

            # Convert to numpy array and append
            if status_data:
                data = np.array(
                    status_data,
                    dtype=[
                        ('timestamp', 'S10'),
                        ('sim_time', np.float32),
                        ('status_type', 'S20'),
                        ('person_id', np.int32)
                    ]
                )

                status_table.append(data)
                status_table.flush()

        logger.info(f"Processed time step data for day {current_day}")

    def get_stats(self):
        """Return current statistics.


        Returns:
            Dict: Dictionary with various statistics

        """
        # Process any pending events first so the file reflects all events
        self._process_event_buffer()

        return {
            'total_events': sum(len(self._get_events_for_type(event_type)) for event_type in 
                             ['test', 'test_positive', 'test_negative', 'trace', 
                              'quarantine_start', 'quarantine_end', 'isolation_start', 'isolation_end']),
            'total_counters': self.total_counters,
            'unique_counts': {k: len(v) for k, v in self.unique_ids.items()},
            'daily_counters': dict(self.daily_counters),
            'currently': {k: len(v) for k, v in self.currently.items()},
            'deltas': self.deltas
        }

    def _get_events_for_type(self, event_type):
        """Retrieve events of a specific type from HDF5.

        Args:
            event_type (str): Type of events to retrieve

        Returns:
            List: List of matching events

        """

        results = []
        with tables.open_file(str(self.filename), mode="r") as file:
            if hasattr(file.root, 'test_and_trace_events'):
                # Query events table for this event type (stored as bytes)
                for row in file.root.test_and_trace_events.where(f'event_type == b"{event_type}"'):
                    results.append(row)
        return results


    def get_daily_stats(self, day: int):
        """Return statistics for a specific day.

        Args:
            day (int): Day from simulation start

        Returns:
            Dict: Dictionary with statistics for the specified day

        """
        return dict(self.daily_counters[day])

    def export_data(self, output_dir="./temp_tt_output", export_csv=True, export_hdf5=True):
        """Export the recorded data to CSV and/or HDF5 files.

        Args:
            output_dir (str, optional): Directory where output files will be saved (Default value = "./temp_tt_output")
            export_csv (bool, optional): Whether to export CSV files (Default value = True)
            export_hdf5 (bool, optional): Whether to export HDF5 file (Default value = True)

        Returns:
            dict: Paths to the exported files

        """
        # Process any remaining events in the buffer
        self._process_event_buffer()

        from june.records.test_trace_event_recording import export_tt_data
        return export_tt_data(
            recorder=self,
            output_dir=output_dir,
            export_csv=export_csv,
            export_hdf5=export_hdf5
        )

export_data(output_dir='./temp_tt_output', export_csv=True, export_hdf5=True)

Export the recorded data to CSV and/or HDF5 files.

Parameters:

Name Type Description Default
output_dir str

Directory where output files will be saved (Default value = "./temp_tt_output")

'./temp_tt_output'
export_csv bool

Whether to export CSV files (Default value = True)

True
export_hdf5 bool

Whether to export HDF5 file (Default value = True)

True

Returns:

Name Type Description
dict

Paths to the exported files

Source code in june/records/test_trace_event_recording.py
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
def export_data(self, output_dir="./temp_tt_output", export_csv=True, export_hdf5=True):
    """Export the recorded data to CSV and/or HDF5 files.

    Args:
        output_dir (str, optional): Directory where output files will be saved (Default value = "./temp_tt_output")
        export_csv (bool, optional): Whether to export CSV files (Default value = True)
        export_hdf5 (bool, optional): Whether to export HDF5 file (Default value = True)

    Returns:
        dict: Paths to the exported files

    """
    # Flush anything still sitting in the event buffer so the export is complete
    self._process_event_buffer()

    # Imported lazily to avoid a circular import at module load time
    from june.records.test_trace_event_recording import export_tt_data

    return export_tt_data(
        recorder=self,
        output_dir=output_dir,
        export_csv=export_csv,
        export_hdf5=export_hdf5,
    )

get_daily_stats(day)

Return statistics for a specific day.

Parameters:

Name Type Description Default
day int

Day from simulation start

required

Returns:

Name Type Description
Dict

Dictionary with statistics for the specified day

Source code in june/records/test_trace_event_recording.py
531
532
533
534
535
536
537
538
539
540
541
def get_daily_stats(self, day: int):
    """Return statistics for a specific day.

    Args:
        day (int): Day from simulation start

    Returns:
        Dict: Dictionary with statistics for the specified day

    """
    # Copy the per-day counts into a plain dict so callers cannot mutate
    # the recorder's internal defaultdict through the returned mapping.
    day_counts = self.daily_counters[day]
    return {event_type: count for event_type, count in day_counts.items()}

get_stats()

Return current statistics.

Returns:

Name Type Description
Dict

Dictionary with various statistics

Source code in june/records/test_trace_event_recording.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
def get_stats(self):
    """Return current statistics.


    Returns:
        Dict: Dictionary with various statistics

    """
    # Flush buffered events so the HDF5 file reflects everything recorded
    self._process_event_buffer()

    # Count every event row written to the file, across all event types
    event_types = (
        'test', 'test_positive', 'test_negative', 'trace',
        'quarantine_start', 'quarantine_end', 'isolation_start', 'isolation_end',
    )
    total_events = 0
    for event_type in event_types:
        total_events += len(self._get_events_for_type(event_type))

    return {
        'total_events': total_events,
        'total_counters': self.total_counters,
        'unique_counts': {key: len(ids) for key, ids in self.unique_ids.items()},
        'daily_counters': dict(self.daily_counters),
        'currently': {key: len(people) for key, people in self.currently.items()},
        'deltas': self.deltas,
    }

record_event(event)

Record a Test and Trace event. Updates counters and adds event to buffer for batch processing.

Parameters:

Name Type Description Default
event TTEvent

The event to record

required
Source code in june/records/test_trace_event_recording.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def record_event(self, event: TTEvent):
    """Record a Test and Trace event.

    Updates the running counters and queues the event for batched
    processing.

    Args:
        event (TTEvent): The event to record

    """
    # Queue the event for later batch processing
    self._event_buffer.append(event)

    # Deltas are per-time-step; clear them when the day advances
    current_day = int(event.timestamp)
    if current_day > self.last_delta_reset:
        self.deltas = {key: 0 for key in self.deltas}
        self.last_delta_reset = current_day

    # Simple event types share one update pattern; note the counter
    # name differs from the event type for 'test' and 'trace'.
    simple_counter_names = {
        'test': 'tested',
        'test_positive': 'test_positive',
        'test_negative': 'test_negative',
        'trace': 'traced',
    }
    kind = event.event_type

    if kind in simple_counter_names:
        counter = simple_counter_names[kind]
        self.total_counters[counter] += 1
        self.deltas[counter] += 1
        self.unique_ids[counter].add(event.person_id)
        self.daily_counters[current_day][counter] += 1

    elif kind in ('quarantine_start', 'isolation_start'):
        status = 'quarantined' if kind == 'quarantine_start' else 'isolated'

        # Count every start event (all occurrences, not just first-time)
        self.total_counters[status] += 1
        self.deltas[f'total_{status}'] += 1
        self.daily_counters[current_day][status] += 1

        # Unique people are tracked under a separate delta key
        if event.person_id not in self.unique_ids[status]:
            self.unique_ids[status].add(event.person_id)
            self.deltas[f'unique_{status}'] += 1

        # Person is now actively quarantined/isolated
        self.currently[status].add(event.person_id)

    elif kind in ('quarantine_end', 'isolation_end'):
        status = 'quarantined' if kind == 'quarantine_end' else 'isolated'
        # discard() is a no-op when the person is not in the set,
        # matching the original membership-checked remove
        self.currently[status].discard(event.person_id)

    # Flush once the buffer reaches its threshold
    if len(self._event_buffer) >= self._buffer_size:
        self._process_event_buffer()

    logger.debug(f"Recorded event: {event}")

time_step(timestamp)

Process events for the current time step. Writes daily counters and current status to HDF5.

Parameters:

Name Type Description Default
timestamp float

Current simulation timestamp (days from start)

required
Source code in june/records/test_trace_event_recording.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def time_step(self, timestamp: float):
    """Process events for the current time step.
    Writes daily counters and current status to HDF5.

    Args:
        timestamp (float): Current simulation timestamp (days from start)

    """

    # First process any events in the buffer
    self._process_event_buffer()

    # The day index comes from the timestamp itself, so it is defined
    # even when no simulator/timer is registered. Previously
    # `current_day` and `date_str` were only assigned inside the
    # `if sim_timer:` branch and raised NameError below otherwise.
    current_day = int(timestamp)

    # Get simulator and format the calendar date when a timer exists
    simulator = GlobalContext.get_simulator()
    sim_timer = simulator.timer if simulator else None

    if sim_timer:
        sim_date = sim_timer.initial_date + datetime.timedelta(days=current_day)
        date_str = sim_date.strftime("%Y-%m-%d")
    else:
        # Fallback label when no timer is available; short enough to
        # fit the 10-byte 'S10' date columns used below
        date_str = f"day_{current_day}"

    with tables.open_file(str(self.filename), mode="a") as file:
        # Update daily counters table
        daily_table = file.root.daily_counters
        daily_data = []

        # Get counts from daily_counters for current day
        if current_day in self.daily_counters:
            # Find rows to delete (matching current day)
            condition = f'(day == {current_day})'

            # Get the indices of rows to delete
            indices = daily_table.get_where_list(condition)
            if len(indices) > 0:
                # NOTE(review): removal assumes matching rows are
                # contiguous — confirm this holds for all writers
                daily_table.remove_rows(indices[0], indices[-1]+1)
                daily_table.flush()

            # Prepare new data
            for event_type, count in self.daily_counters[current_day].items():
                daily_data.append((
                    current_day,
                    date_str.encode('utf-8'),
                    event_type.encode('utf-8'),
                    count
                ))

            # Convert to numpy array and append
            if daily_data:
                data = np.array(
                    daily_data,
                    dtype=[
                        ('day', np.int32),
                        ('date', 'S10'),
                        ('event_type', 'S20'),
                        ('count', np.int32)
                    ]
                )

                # Add new data
                daily_table.append(data)
                daily_table.flush()

        # Update current status table (people in quarantine/isolation)
        status_table = file.root.current_status
        status_data = []

        # Delete existing status data for this timestamp
        condition = f'(sim_time == {float(timestamp)})'

        # Get the indices of rows to delete
        indices = status_table.get_where_list(condition)
        if len(indices) > 0:
            # NOTE(review): same contiguity assumption as above
            status_table.remove_rows(indices[0], indices[-1]+1)
            status_table.flush()

        # Add current quarantine status
        for person_id in self.currently['quarantined']:
            status_data.append((
                date_str.encode('utf-8'),
                float(timestamp),
                'quarantined'.encode('utf-8'),
                person_id
            ))

        # Add current isolation status
        for person_id in self.currently['isolated']:
            status_data.append((
                date_str.encode('utf-8'),
                float(timestamp),
                'isolated'.encode('utf-8'),
                person_id
            ))

        # Convert to numpy array and append
        if status_data:
            data = np.array(
                status_data,
                dtype=[
                    ('timestamp', 'S10'),
                    ('sim_time', np.float32),
                    ('status_type', 'S20'),
                    ('person_id', np.int32)
                ]
            )

            status_table.append(data)
            status_table.flush()

    logger.info(f"Processed time step data for day {current_day}")

are_test_and_trace_policies_active()

Check if test and trace policies are active based on both: 1. Configuration setting 2. Active policies in the current date

Returns:

bool True if test and trace is enabled and active, False otherwise

Source code in june/records/test_trace_event_recording.py
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
def are_test_and_trace_policies_active():
    """Check whether test and trace is both enabled and currently active.

    Requires the simulator's configuration flag AND at least one Testing
    or Tracing policy active on the current simulation date.

    Returns:
    --------
    bool
        True if test and trace is enabled and active, False otherwise


    """

    # Disabled (or missing entirely) in the simulator configuration?
    from june.global_context import GlobalContext
    simulator = GlobalContext.get_simulator()
    if not getattr(simulator, 'test_and_trace_enabled', False):
        return False

    # Enabled in config — look for policies configured on today's date
    current_date = simulator.timer.date
    medical_policies = simulator.activity_manager.policies.medical_care_policies
    active_policies = medical_policies.get_active(current_date)

    # Active only if a concrete Testing or Tracing policy is present
    from june.policy.medical_care_policies import Testing, Tracing

    return any(
        isinstance(policy, (Testing, Tracing))
        for policy in active_policies.policies
    )

emit_isolation_event(person, timestamp, is_start=True)

Emit an isolation event.

Parameters:

Name Type Description Default
person Person

The person isolating

required
timestamp float

The time of the event

required
is_start bool

Whether this is the start or end of isolation (Default value = True)

True
Source code in june/records/test_trace_event_recording.py
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
def emit_isolation_event(person, timestamp, is_start=True):
    """Emit an isolation event.

    Args:
        person (Person): The person isolating
        timestamp (float): The time of the event
        is_start (bool, optional): Whether this is the start or end of isolation (Default value = True)

    """
    # Capture basic demographics alongside the event
    isolation_metadata = {
        'age': person.age,
        'sex': person.sex,
        'infected': getattr(person, 'infected', False),
    }

    kind = 'isolation_start' if is_start else 'isolation_end'

    tt_event = TTEvent(kind, person.id, timestamp, isolation_metadata)
    recorder = GlobalContext.get_tt_event_recorder()
    if recorder is None:
        # No TTEventRecorder registered; nothing to record
        return
    recorder.record_event(tt_event)

emit_quarantine_event(person, timestamp, is_start=True)

Emit a quarantine event.

Parameters:

Name Type Description Default
person Person

The person quarantining

required
timestamp float

The time of the event

required
is_start bool

Whether this is the start or end of quarantine (Default value = True)

True
Source code in june/records/test_trace_event_recording.py
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
def emit_quarantine_event(person, timestamp, is_start=True):
    """Emit a quarantine event.

    Args:
        person (Person): The person quarantining
        timestamp (float): The time of the event
        is_start (bool, optional): Whether this is the start or end of quarantine (Default value = True)

    """

    # Capture basic demographics alongside the event
    quarantine_metadata = {
        'age': person.age,
        'sex': person.sex,
        'infected': getattr(person, 'infected', False),
    }

    kind = 'quarantine_start' if is_start else 'quarantine_end'

    tt_event = TTEvent(kind, person.id, timestamp, quarantine_metadata)
    recorder = GlobalContext.get_tt_event_recorder()
    if recorder is None:
        # No TTEventRecorder registered; nothing to record
        return
    recorder.record_event(tt_event)

emit_test_event(person, timestamp, result=None)

Emit a test event.

Parameters:

Name Type Description Default
person Person

The person being tested

required
timestamp float

The time of the event

required
result (str, optional)

Test result if available (Default value = None)

None
Source code in june/records/test_trace_event_recording.py
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
def emit_test_event(person, timestamp, result=None):
    """Emit a test event.

    Args:
        person (Person): The person being tested
        timestamp (float): The time of the event
        result (str, optional): Test result if available (Default value = None)

    """
    test_metadata = {
        'age': person.age,
        'sex': person.sex,
        'infected': getattr(person, 'infected', False),
        'hospitalised': getattr(person, 'hospitalised', False),
    }

    # Record who traced this person into testing, when that is known
    tt_state = getattr(person, 'test_and_trace', None)
    if tt_state is not None and getattr(tt_state, 'tracer_id', None) is not None:
        test_metadata['tracer_id'] = tt_state.tracer_id
        test_metadata['contact_reason'] = getattr(tt_state, 'contact_reason', 'unknown')

    recorder = GlobalContext.get_tt_event_recorder()
    if recorder is None:
        # No TTEventRecorder registered, skip recording
        return

    if result is None:
        # Basic test event without a result
        recorder.record_event(TTEvent('test', person.id, timestamp, test_metadata))
    else:
        # Result-specific event, e.g. 'test_positive' / 'test_negative'
        test_metadata['result'] = result
        result_kind = result.lower()  # lowercase for consistent event names
        recorder.record_event(
            TTEvent(f'test_{result_kind}', person.id, timestamp, test_metadata)
        )

emit_trace_event(person_id, total_mates, timestamp)

Emit a trace event.

Parameters:

Name Type Description Default
person_id int

The person whose contacts are being traced (the tracer)

required
total_mates int

Number of contacts being traced

required
timestamp float

The time of the event

required
Source code in june/records/test_trace_event_recording.py
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
def emit_trace_event(person_id, total_mates, timestamp):
    """Emit a trace event.

    Args:
        person_id (int): The person whose contacts are being traced (the tracer)
        total_mates (int): Number of contacts being traced
        timestamp (float): The time of the event

    """
    # Lookup person demographics using the same fallback pattern as contact manager
    from june.demography.person import Person
    from june.global_context import GlobalContext

    person = None
    try:
        # First try to get from local population
        simulator = GlobalContext.get_simulator()
        if simulator and hasattr(simulator, 'world') and hasattr(simulator.world, 'people'):
            person = simulator.world.people.get_from_id(person_id)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # propagate; a failed local lookup is handled by the fallback below.
        person = None

    if person is None:
        # Fall back to the global registry. Previously this only ran when
        # the local lookup *raised*, so a lookup that returned None left
        # the event without demographics despite the registry having them.
        person = Person.find_by_id(person_id)

    metadata = {
        'contact_count': total_mates,
    }

    # Add demographic data if person is found
    if person:
        metadata.update({
            'age': person.age,
            'sex': person.sex,
            'infected': getattr(person, 'infected', False),
            'hospitalised': getattr(person, 'hospitalised', False),
        })

    event = TTEvent('trace', person_id, timestamp, metadata)
    recorder = GlobalContext.get_tt_event_recorder()
    if recorder is None:
        # No TTEventRecorder registered, skip recording
        return
    recorder.record_event(event)

export_simulation_results(output_dir='./results')

Export all simulation results at the end of a run and clean up temporary files. In MPI mode, each rank writes to its own files within a test_and_trace folder.

Parameters:

Name Type Description Default
output_dir str

Directory where output files will be saved (Default value = "./results")

'./results'
Source code in june/records/test_trace_event_recording.py
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
def export_simulation_results(output_dir="./results"):
    """Export all simulation results at the end of a run and cleanup temporary files.
    In MPI mode, each rank writes to its own files within a test_and_trace folder.

    Args:
        output_dir (str, optional): Directory where output files will be saved (Default value = "./results")

    """
    from june.mpi_wrapper import mpi_rank, mpi_available, mpi_size
    import shutil

    recorder = GlobalContext.get_tt_event_recorder()

    # Create test_and_trace directory structure
    base_output_dir = os.path.join(output_dir, "test_and_trace")

    # Create rank-specific output directory path within test_and_trace
    if mpi_available and mpi_size > 1:
        rank_specific_dir = os.path.join(base_output_dir, f"rank_{mpi_rank}")
    else:
        rank_specific_dir = base_output_dir

    # Create the directory
    os.makedirs(rank_specific_dir, exist_ok=True)

    # Get the current timestamp for final processing
    current_timestamp = GlobalContext.get_simulator().timer.now

    # Ensure all events are processed and saved before the final export
    recorder.time_step(current_timestamp)

    # Store the temp directory path before export (in case it gets modified)
    temp_output_dir = str(recorder.output_dir)

    # Export data to files (each rank writes to its own directory)
    exported_files = recorder.export_data(
        output_dir=rank_specific_dir,
        export_csv=True,
        export_hdf5=True
    )

    # MPI Barrier - wait for all ranks to complete export before any cleanup
    if mpi_available and mpi_size > 1:
        try:
            from mpi4py import MPI
            MPI.COMM_WORLD.Barrier()
            logger.info(f"Rank {mpi_rank}: Export completed, synchronised with other ranks")
        except ImportError:
            logger.warning("mpi4py not available, proceeding without synchronization")
        except Exception as e:
            logger.warning(f"MPI barrier failed: {e}, proceeding with cleanup")

    # Verify export was successful before cleanup
    # (None entries mean that output type was skipped, so they are ignored)
    export_successful = all(
        file_path and os.path.exists(file_path) 
        for file_path in exported_files.values() 
        if file_path is not None
    )

    # Now cleanup - only after all ranks have finished
    if export_successful:
        # Only rank 0 does the cleanup to avoid race conditions
        if not mpi_available or mpi_rank == 0:
            if os.path.exists(temp_output_dir):
                try:
                    # NOTE(review): the rmtree call below is commented out, so
                    # the temp directory is NOT actually removed even though the
                    # log/print report success (and the `shutil` import above is
                    # unused as a result) — confirm whether cleanup should be
                    # re-enabled or the messages softened.
                    #shutil.rmtree(temp_output_dir)
                    logger.info(f"Cleaned up temporary directory: {temp_output_dir}")
                    print(f"✓ Cleaned up temporary directory: {temp_output_dir}")
                except Exception as e:
                    logger.warning(f"Failed to cleanup {temp_output_dir}: {e}")
                    print(f"⚠ Warning: Failed to cleanup {temp_output_dir}: {e}")
        else:
            logger.info(f"Rank {mpi_rank}: Cleanup delegated to rank 0")
    else:
        if not mpi_available or mpi_rank == 0:
            logger.warning("Export verification failed - keeping temporary files for debugging")
            print("⚠ Warning: Export verification failed - keeping temporary files for debugging")

    # Print export information (optionally only on rank 0 for cleaner output)
    if not mpi_available or mpi_rank == 0:
        print(f"\nExported test and trace data:")
        if mpi_available and mpi_size > 1:
            print(f"- Base directory: {base_output_dir}")
            print(f"- Rank-specific folders: rank_0, rank_1, ..., rank_{mpi_size-1}")
        else:
            print(f"- Directory: {rank_specific_dir}")

        print(f"\nFiles created for rank {mpi_rank}:")
        for file_type, file_path in exported_files.items():
            if file_path:
                print(f"- {file_type}: {file_path}")

export_tt_data(recorder, output_dir='./results', export_csv=True, export_hdf5=True)

Export Test and Trace data to CSV and/or HDF5 files. Optimised to avoid recreating HDF5 data during export.

Parameters:

Name Type Description Default
recorder
required
output_dir

(Default value = "./results")

'./results'
export_csv

(Default value = True)

True
export_hdf5

(Default value = True)

True
Source code in june/records/test_trace_event_recording.py
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
def export_tt_data(recorder, output_dir="./results", export_csv=True, export_hdf5=True):
    """Export Test and Trace data to CSV and/or HDF5 files.
    Optimised to avoid recreating HDF5 data during export: the HDF5 output
    is a metadata-annotated copy of the recorder's existing working file.

    Args:
        recorder: TTEventRecorder whose statistics and HDF5 file are exported
        output_dir: Directory for the output files (Default value = "./results")
        export_csv: Whether to write the daily/cumulative CSVs (Default value = True)
        export_hdf5: Whether to copy the HDF5 file (Default value = True)

    Returns:
        Dict with 'daily_file', 'cumulative_file' and 'hdf5_file' paths;
        entries are None for outputs that were not produced.

    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    import shutil
    from june.mpi_wrapper import mpi_rank, mpi_available, mpi_size

    # Rank-tagged id keeps per-rank output files distinct under MPI
    if mpi_available and mpi_size > 1:
        simulation_id = f"sim_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_rank{mpi_rank}"
    else:
        simulation_id = f"sim_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Process any remaining events in the buffer before export
    if hasattr(recorder, '_process_event_buffer') and callable(getattr(recorder, '_process_event_buffer')):
        recorder._process_event_buffer()

    # Get statistics from recorder
    stats = recorder.get_stats()
    total_counters = stats['total_counters']
    unique_counts = stats['unique_counts']
    daily_data = stats['daily_counters']
    currently = stats['currently']

    # Get simulator and timer for metadata
    simulator = GlobalContext.get_simulator()
    timer = simulator.timer
    days_simulated = timer.total_days
    time_now = timer.now

    # Helper function to format date as string
    def format_date(date_obj):
        """Format a date or datetime as a string (datetimes keep the time part).

        Args:
            date_obj: datetime.date or datetime.datetime to format

        """
        if isinstance(date_obj, datetime.datetime):
            return date_obj.strftime('%Y-%m-%d %H:%M:%S')
        else:
            return date_obj.strftime('%Y-%m-%d')

    # Prepare summary data (stored as HDF5 root attributes below)
    summary_data = {
        'simulation_start_date': format_date(timer.initial_date),
        'days_simulated': days_simulated,
        'simulation_time': str(time_now),
        'total_tests': total_counters['tested'],
        'unique_tested': unique_counts['tested'],
        'positive_tests': total_counters.get('test_positive', 0),
        'negative_tests': total_counters.get('test_negative', 0),
        'people_traced': unique_counts['traced'],
        'total_quarantined': total_counters['quarantined'],
        'unique_quarantined': unique_counts['quarantined'],
        'total_isolated': total_counters['isolated'],
        'unique_isolated': unique_counts['isolated'],
        'currently_quarantined': currently['quarantined'],
        'currently_isolated': currently['isolated'],
    }

    # Calculate positive rate over tests that actually have a result
    positive_tests = total_counters.get('test_positive', 0)
    negative_tests = total_counters.get('test_negative', 0)
    total_tests_with_results = positive_tests + negative_tests
    summary_data['positive_rate'] = positive_tests / max(total_tests_with_results, 1) if total_tests_with_results > 0 else 0

    # Prepare daily data for CSV
    daily_records = []
    for day in range(int(days_simulated)):
        # Get simulation date for this day (just the date part)
        sim_date = timer.initial_date + datetime.timedelta(days=day)
        sim_date_str = format_date(sim_date)

        day_data = daily_data.get(day, {})
        record = {
            'date': sim_date_str,  # Use simulation date, not wall-clock
            'positive_tests': day_data.get('test_positive', 0),
            'negative_tests': day_data.get('test_negative', 0),
            'quarantining': day_data.get('quarantined', 0),
            'isolating': day_data.get('isolated', 0),
        }
        daily_records.append(record)

    # Prepare cumulative data (running totals over the simulated days).
    # Only the four quantities written to the CSV are accumulated; the
    # previous cum_tests/cum_traced accumulators were never emitted.
    cumulative_records = []
    cum_pos_tests = 0
    cum_neg_tests = 0
    cum_quarantined = 0
    cum_isolated = 0

    for day in range(int(days_simulated)):
        sim_date = timer.initial_date + datetime.timedelta(days=day)
        sim_date_str = format_date(sim_date)

        day_data = daily_data.get(day, {})

        # Update cumulative counts
        cum_pos_tests += day_data.get('test_positive', 0)
        cum_neg_tests += day_data.get('test_negative', 0)
        cum_quarantined += day_data.get('quarantined', 0)
        cum_isolated += day_data.get('isolated', 0)

        record = {
            'date': sim_date_str,
            'cumulative_positive_tests': cum_pos_tests,
            'cumulative_negative_tests': cum_neg_tests,
            'cumulative_quarantined': cum_quarantined,
            'cumulative_isolated': cum_isolated,
        }
        cumulative_records.append(record)

    # Export files
    daily_file = None
    cumulative_file = None
    hdf5_file = None

    # Export CSV files if requested
    if export_csv:
        # Save daily data
        daily_file = os.path.join(output_dir, f"{simulation_id}_daily.csv")
        with open(daily_file, 'w', newline='') as f:
            if daily_records:
                writer = csv.DictWriter(f, fieldnames=daily_records[0].keys())
                writer.writeheader()
                writer.writerows(daily_records)

        # Save cumulative data
        cumulative_file = os.path.join(output_dir, f"{simulation_id}_cumulative.csv")
        with open(cumulative_file, 'w', newline='') as f:
            if cumulative_records:
                writer = csv.DictWriter(f, fieldnames=cumulative_records[0].keys())
                writer.writeheader()
                writer.writerows(cumulative_records)

        print(f"CSV data exported to {output_dir}")

    # Export HDF5 file if requested (using optimised approach - just copy the existing one)
    if export_hdf5 and hasattr(recorder, 'filename') and os.path.exists(recorder.filename):
        hdf5_file = os.path.join(output_dir, f"{simulation_id}.h5")

        # Update metadata in the original file before copying
        with tables.open_file(str(recorder.filename), mode="a") as source_file:
            # Create the metadata group if missing (created for its side effect)
            if not hasattr(source_file.root, 'metadata'):
                source_file.create_group(source_file.root, 'metadata', 'Simulation Metadata')

            # Add scalar simulation metadata as attributes on the root
            for key, value in summary_data.items():
                if isinstance(value, (int, float, str)):
                    source_file.root._v_attrs[key] = value

            # Record when the export itself happened
            source_file.root._v_attrs['export_timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        shutil.copy2(recorder.filename, hdf5_file)
        print(f"HDF5 data exported to {hdf5_file} (optimised copy)")

    return {
        'daily_file': daily_file,
        'cumulative_file': cumulative_file,
        'hdf5_file': hdf5_file
    }

print_tt_simulation_report(days_simulated=20)

Print a comprehensive report of Test and Trace statistics.

Parameters:

Name Type Description Default
days_simulated int

Number of days the simulation ran (Default value = 20)

20
Source code in june/records/test_trace_event_recording.py
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
def _count_false_results(recorder):
    """Count false positive/negative test events from the recorder's HDF5 file.

    A false positive is a ``test_positive`` event for a person who was not
    infected; a false negative is a ``test_negative`` event for a person who
    was infected. Returns ``(0, 0)`` when the file or the
    ``test_and_trace_events`` table is absent.

    Args:
        recorder: TT event recorder; expected to expose a ``filename``
            attribute pointing at a PyTables HDF5 file.

    Returns:
        tuple: ``(false_positives, false_negatives)`` as ints.
    """
    false_positives = 0
    false_negatives = 0
    if hasattr(recorder, 'filename') and os.path.exists(recorder.filename):
        with tables.open_file(str(recorder.filename), mode="r") as file:
            if hasattr(file.root, 'test_and_trace_events'):
                # Positive result on a non-infected person -> false positive
                for row in file.root.test_and_trace_events.where('event_type == b"test_positive"'):
                    if row['infected'] == 0:
                        false_positives += 1
                # Negative result on an infected person -> false negative
                for row in file.root.test_and_trace_events.where('event_type == b"test_negative"'):
                    if row['infected'] == 1:
                        false_negatives += 1
    return false_positives, false_negatives


def print_tt_simulation_report(days_simulated=20):
    """Print a comprehensive report of Test and Trace statistics.

    Pulls aggregate counters from the global TT event recorder, derives
    false positive/negative counts from the recorder's HDF5 event table,
    and prints a colourised console report (all-time summary, quarantine
    and isolation details, current status, and — when test results exist —
    a test results analysis section).

    Args:
        days_simulated (int, optional): Number of days the simulation ran
            (Default value = 20)
    """
    import datetime
    from colorama import init, Fore, Style, Back

    # Initialise colorama
    init()

    # Get the recorder
    recorder = GlobalContext.get_tt_event_recorder()

    # Get overall statistics
    stats = recorder.get_stats()
    total_counters = stats['total_counters']
    unique_counts = stats['unique_counts']
    currently = stats['currently']
    deltas = stats['deltas']

    # Get simulator for time information
    simulator = GlobalContext.get_simulator()
    time = simulator.timer.now

    # Calculate derived statistics with error checking
    avg_tests_per_day = total_counters['tested'] / max(days_simulated, 1) if days_simulated > 0 else 0
    avg_tests_per_person = total_counters['tested'] / max(unique_counts['tested'], 1) if unique_counts['tested'] > 0 else 0

    # Calculate positive rate based on actual positive tests with error checking
    positive_tests = total_counters.get('test_positive', 0)
    negative_tests = total_counters.get('test_negative', 0)
    total_tests_with_results = (positive_tests + negative_tests)
    positive_rate = positive_tests / max(total_tests_with_results, 1) if total_tests_with_results > 0 else 0

    # Calculate isolation rate with error checking
    isolation_rate = 0
    if unique_counts['tested'] > 0:
        isolation_rate = (unique_counts['isolated'] / unique_counts['tested']) * 100

    # Calculate trace efficiency with error checking
    trace_efficiency = 0
    if unique_counts['tested'] > 0:
        trace_efficiency = unique_counts['traced'] / unique_counts['tested']

    # Calculate isolations per day with error checking
    isolations_per_day = 0
    if days_simulated > 0:
        isolations_per_day = total_counters['isolated'] / days_simulated

    # Scan the HDF5 event table ONCE for false positives/negatives; the
    # values are reused in the TEST RESULTS ANALYSIS section below instead
    # of re-opening and re-scanning the file a second time.
    false_positives, false_negatives = _count_false_results(recorder)

    # Calculate rates with error checking
    fp_rate = false_positives / max(total_counters['test_positive'], 1) if total_counters['test_positive'] > 0 else 0
    fn_rate = false_negatives / max(total_counters['test_negative'], 1) if total_counters['test_negative'] > 0 else 0

    # Print header
    print("\n" + "=" * 80)
    print(f"{Back.BLUE}{Fore.WHITE} TEST AND TRACE SIMULATION REPORT {Style.RESET_ALL}")
    print(f"{Fore.BLUE}Simulation duration: {days_simulated} days{Style.RESET_ALL}")
    print(f"{Fore.BLUE}Simulation time: {time} {Style.RESET_ALL}")
    print(f"{Fore.BLUE}Report generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Style.RESET_ALL}")
    print("=" * 80)

    print(f"\n{Fore.GREEN}▓▓▓ ALL-TIME SUMMARY ▓▓▓{Style.RESET_ALL}")
    # Testing statistics
    print(f"{Fore.CYAN}┌───────────────────────┬───────────────────────┐{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Testing{Fore.CYAN}{Fore.WHITE}Tracing & Isolation{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}├───────────────────────┼───────────────────────┤{Style.RESET_ALL}")

    # Helper function to format values with deltas
    def format_with_delta(value, delta, width=8):
        """Format a value with its delta in parentheses.

        Falls back to K/M abbreviations when the combined string would
        overflow the table column.

        Args:
            value: current cumulative count
            delta: change since the previous report (only shown if > 0)
            width: nominal column width budget (Default value = 8)
        """
        if delta > 0:
            delta_str = f" (+{delta})"
            # If the formatted string gets too long, use shorter formats
            if len(f"{value:,}{delta_str}") > width + 10:
                if value >= 1000000:  # Millions
                    return f"{value/1000000:.1f}M{delta_str}"
                elif value >= 1000:  # Thousands
                    return f"{value/1000:.1f}K{delta_str}"
                else:
                    return f"{value}{delta_str}"
            return f"{value:,}{delta_str}"
        return f"{value:,}"

    print(f"{Fore.CYAN}{Fore.WHITE}Total tests:{Fore.YELLOW} {format_with_delta(total_counters['tested'], deltas['tested'])}{Fore.CYAN}{Fore.WHITE}People traced:{Fore.YELLOW} {format_with_delta(unique_counts['traced'], deltas['traced'])}{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Unique people:{Fore.YELLOW} {unique_counts['tested']:,}{Fore.CYAN}{Fore.WHITE}                  {Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Positive tests:{Fore.YELLOW} {format_with_delta(total_counters['test_positive'], deltas['test_positive'])}{Fore.CYAN}{Fore.WHITE}Contacts per case:{Fore.YELLOW} {trace_efficiency:.2f}{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Negative tests:{Fore.YELLOW} {format_with_delta(total_counters['test_negative'], deltas['test_negative'])}{Fore.CYAN}{Fore.WHITE}Isolation rate:{Fore.YELLOW} {isolation_rate:.1f}%{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Positive rate:{Fore.YELLOW} {positive_rate*100:.1f}%{Fore.CYAN}{Fore.WHITE}                  {Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Avg tests/day:{Fore.YELLOW} {avg_tests_per_day:.1f}{Fore.CYAN}{Fore.WHITE}                  {Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Avg tests/person:{Fore.YELLOW} {avg_tests_per_person:.2f}{Fore.CYAN}{Fore.WHITE}                  {Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}└───────────────────────┴───────────────────────┘{Style.RESET_ALL}")

    # Quarantine and Isolation details
    print(f"\n{Fore.GREEN}▓▓▓ QUARANTINE & ISOLATION DETAILS ▓▓▓{Style.RESET_ALL}")
    print(f"{Fore.CYAN}┌────────────────────────────┬───────────┬───────────┐{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Metric{Fore.CYAN}{Fore.WHITE}Total{Fore.CYAN}{Fore.WHITE}Unique{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}├────────────────────────────┼───────────┼───────────┤{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Quarantined{Fore.CYAN}{Fore.YELLOW}{format_with_delta(total_counters['quarantined'], deltas['total_quarantined'])}{Fore.CYAN}{Fore.YELLOW}{format_with_delta(unique_counts['quarantined'], deltas['unique_quarantined'])}{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Isolated{Fore.CYAN}{Fore.YELLOW}{format_with_delta(total_counters['isolated'], deltas['total_isolated'])}{Fore.CYAN}{Fore.YELLOW}{format_with_delta(unique_counts['isolated'], deltas['unique_isolated'])}{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}└────────────────────────────┴───────────┴───────────┘{Style.RESET_ALL}")

    # Current status
    print(f"\n{Fore.GREEN}▓▓▓ CURRENT STATUS ▓▓▓{Style.RESET_ALL}")
    print(f"{Fore.CYAN}┌─────────────────────────────┬───────────┐{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Metric{Fore.CYAN}{Fore.WHITE}Count{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}├─────────────────────────────┼───────────┤{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Currently quarantining{Fore.CYAN}{Fore.YELLOW}{currently['quarantined']:,}{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Currently isolating{Fore.CYAN}{Fore.YELLOW}{currently['isolated']:,}{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{Fore.WHITE}Total restricted{Fore.CYAN}{Fore.YELLOW}{currently['quarantined'] + currently['isolated']:,}{Fore.CYAN}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}└─────────────────────────────┴───────────┘{Style.RESET_ALL}")

    # Only print test results analysis if we have test results
    if total_tests_with_results > 0:
        # Print extended test result analysis — reuses the false
        # positive/negative counts and rates computed above (previously this
        # section re-opened the HDF5 file and re-scanned the event table).
        print(f"\n{Fore.GREEN}▓▓▓ TEST RESULTS ANALYSIS ▓▓▓{Style.RESET_ALL}")

        print(f"{Fore.CYAN}┌─────────────────────────────┬───────────┐{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{Fore.WHITE}Metric{Fore.CYAN}{Fore.WHITE}Value{Fore.CYAN}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}├─────────────────────────────┼───────────┤{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{Fore.WHITE}Total positive tests{Fore.CYAN}{Fore.YELLOW}{total_counters['test_positive']:,}{Fore.CYAN}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{Fore.WHITE}Total negative tests{Fore.CYAN}{Fore.YELLOW}{total_counters['test_negative']:,}{Fore.CYAN}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{Fore.WHITE}Positive test rate{Fore.CYAN}{Fore.YELLOW}{positive_rate*100:.1f}%{Fore.CYAN}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{Fore.WHITE}False positive count{Fore.CYAN}{Fore.YELLOW}{false_positives:,}{Fore.CYAN}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{Fore.WHITE}False positive rate{Fore.CYAN}{Fore.YELLOW}{fp_rate*100:.1f}%{Fore.CYAN}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{Fore.WHITE}False negative count{Fore.CYAN}{Fore.YELLOW}{false_negatives:,}{Fore.CYAN}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{Fore.WHITE}False negative rate{Fore.CYAN}{Fore.YELLOW}{fn_rate*100:.1f}%{Fore.CYAN}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}└─────────────────────────────┴───────────┘{Style.RESET_ALL}")

    print("\n" + "=" * 80)