Skip to content

Exact num infection seed

ExactNumClusteredInfectionSeed

Bases: ExactNumInfectionSeed

Source code in june/epidemiology/infection_seed/exact_num_infection_seed.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class ExactNumClusteredInfectionSeed(ExactNumInfectionSeed):
    """Exact-number infection seed that places cases household by household.

    Households are drawn at random with probability proportional to their
    score (see ``get_household_score``), and every susceptible resident of a
    drawn household is infected until the requested number of cases is
    reached.
    """

    def __init__(
        self,
        world: "World",
        infection_selector: InfectionSelector,
        daily_cases_per_capita_per_age_per_region: pd.DataFrame,
        seed_past_infections: bool = True,
        seed_strength=1.0,
    ):
        # Nothing is customised here: every argument is forwarded unchanged
        # to the parent constructor.
        super().__init__(
            world=world,
            infection_selector=infection_selector,
            daily_cases_per_capita_per_age_per_region=daily_cases_per_capita_per_age_per_region,
            seed_past_infections=seed_past_infections,
            seed_strength=seed_strength,
        )

    def get_household_score(self, household, age_distribution):
        """Return the sampling weight of a household.

        Each resident contributes the ``age_distribution`` value of the first
        age bin containing their age (lower bound inclusive, upper bound
        exclusive); residents matching no bin contribute nothing. The sum is
        divided by the square root of the household size.

        Args:
            household: object exposing ``residents``, each with an ``age``.
            age_distribution: series indexed by ``"min-max"`` age-bin labels.

        Returns:
            ``0`` for an empty household, otherwise the normalised score.
        """
        residents = household.residents
        n_residents = len(residents)
        if n_residents == 0:
            return 0

        # Turn the "min-max" labels into integer (lower, upper) bounds.
        bins = []
        for label in age_distribution.index:
            lower, upper = label.split("-")
            bins.append((int(lower), int(upper)))

        total = 0
        for member in residents:
            # Only the first bin containing the resident's age counts.
            position = next(
                (
                    k
                    for k, (lower, upper) in enumerate(bins)
                    if lower <= member.age < upper
                ),
                None,
            )
            if position is not None:
                total += age_distribution.iloc[position]
        return total / np.sqrt(n_residents)

    def infect_super_area(
        self, super_area, cases_per_capita_per_age, time, record=None
    ):
        """Seed infections in clusters of households inside a location.

        Args:
            super_area: a ``World``, ``Region``, ``SuperArea`` or ``Area``
                whose households are candidates for seeding.
            cases_per_capita_per_age: series indexed by ``"min-max"`` age-bin
                labels; its sum (truncated to an integer) is the number of
                people to infect and its normalised values weight households.
            time: infection time; when negative, each new infection is also
                brought up to date by ``-time``.
            record: forwarded to the infection helpers. (Default value = None)

        Raises:
            TypeError: if ``super_area`` is none of the supported types.
        """
        # Collect the areas covered by the location, whatever its level.
        if isinstance(super_area, World):
            areas = [
                area
                for region in super_area.regions
                for sa in region.super_areas
                for area in sa.areas
            ]
        elif isinstance(super_area, Region):
            areas = [area for sa in super_area.super_areas for area in sa.areas]
        elif isinstance(super_area, SuperArea):
            areas = list(super_area.areas)
        elif isinstance(super_area, Area):
            areas = [super_area]
        else:
            raise TypeError(
                "invalid seeding location type: " + type(super_area).__name__
            )

        households = []
        for area in areas:
            households.extend(area.households)

        # Nothing to do without households or without any cases to place.
        total_cases = cases_per_capita_per_age.sum()
        if len(households) == 0 or total_cases <= 0:
            return

        age_distribution = cases_per_capita_per_age / total_cases
        scores = [
            self.get_household_score(candidate, age_distribution)
            for candidate in households
        ]

        # With all-zero weights no household can be drawn.
        if sum(scores) == 0:
            return

        cum_scores = np.cumsum(scores)
        infection_id = self.infection_selector.infection_class.infection_id()

        remaining = int(total_cases)
        if remaining <= 0:
            return

        visited = set()
        # The number of draws is capped at ten per household so the sampling
        # always terminates, even if the quota cannot be met.
        for _ in range(len(households) * 10):
            draw = random.random() * cum_scores[-1]
            chosen = households[np.searchsorted(cum_scores, draw)]

            if chosen.id in visited:
                continue

            # A household is only ever processed once, whatever the outcome.
            visited.add(chosen.id)

            for resident in chosen.residents:
                if not (resident.immunity.get_susceptibility(infection_id) > 0):
                    continue
                self.infect_person(person=resident, time=time, record=record)
                if time < 0:
                    self.bring_infection_up_to_date(
                        person=resident,
                        time_from_infection=-time,
                        record=record,
                    )
                remaining -= 1
                if remaining <= 0:
                    return

            # Every household has been tried; nobody is left to infect.
            if len(visited) >= len(households):
                return

get_household_score(household, age_distribution)

Parameters:

Name Type Description Default
household
required
age_distribution
required
Source code in june/epidemiology/infection_seed/exact_num_infection_seed.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def get_household_score(self, household, age_distribution):
    """Return the sampling weight of a household.

    Each resident contributes the ``age_distribution`` value of the first age
    bin containing their age (lower bound inclusive, upper bound exclusive);
    residents matching no bin contribute nothing. The sum is divided by the
    square root of the household size.

    Args:
        household: object exposing ``residents``, each with an ``age``.
        age_distribution: series indexed by ``"min-max"`` age-bin labels.

    Returns:
        ``0`` for an empty household, otherwise the normalised score.
    """
    residents = household.residents
    n_residents = len(residents)
    if n_residents == 0:
        return 0

    # Turn the "min-max" labels into integer (lower, upper) bounds.
    bins = []
    for label in age_distribution.index:
        lower, upper = label.split("-")
        bins.append((int(lower), int(upper)))

    total = 0
    for member in residents:
        # Only the first bin containing the resident's age counts.
        position = next(
            (
                k
                for k, (lower, upper) in enumerate(bins)
                if lower <= member.age < upper
            ),
            None,
        )
        if position is not None:
            total += age_distribution.iloc[position]
    return total / np.sqrt(n_residents)

infect_super_area(super_area, cases_per_capita_per_age, time, record=None)

Parameters:

Name Type Description Default
super_area
required
cases_per_capita_per_age
required
time
required
record

(Default value = None)

None
Source code in june/epidemiology/infection_seed/exact_num_infection_seed.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def infect_super_area(
    self, super_area, cases_per_capita_per_age, time, record=None
):
    """Seed infections in clusters of households inside a location.

    Households are drawn at random with probability proportional to their
    ``get_household_score`` and every susceptible resident of a drawn
    household is infected, until the requested number of cases is reached.

    Args:
        super_area: a ``World``, ``Region``, ``SuperArea`` or ``Area`` whose
            households are candidates for seeding.
        cases_per_capita_per_age: series indexed by ``"min-max"`` age-bin
            labels; its sum (truncated to an integer) is the number of people
            to infect and its normalised values weight the households.
        time: infection time; when negative, each new infection is also
            brought up to date by ``-time``.
        record: forwarded to the infection helpers. (Default value = None)

    Raises:
        TypeError: if ``super_area`` is none of the supported types.
    """
    # Collect the areas covered by the location, whatever its level.
    if isinstance(super_area, World):
        areas = [
            area
            for region in super_area.regions
            for sa in region.super_areas
            for area in sa.areas
        ]
    elif isinstance(super_area, Region):
        areas = [area for sa in super_area.super_areas for area in sa.areas]
    elif isinstance(super_area, SuperArea):
        areas = list(super_area.areas)
    elif isinstance(super_area, Area):
        areas = [super_area]
    else:
        raise TypeError(
            "invalid seeding location type: " + type(super_area).__name__
        )

    households = []
    for area in areas:
        households.extend(area.households)

    # Nothing to do without households or without any cases to place.
    total_cases = cases_per_capita_per_age.sum()
    if len(households) == 0 or total_cases <= 0:
        return

    age_distribution = cases_per_capita_per_age / total_cases
    scores = [
        self.get_household_score(candidate, age_distribution)
        for candidate in households
    ]

    # With all-zero weights no household can be drawn.
    if sum(scores) == 0:
        return

    cum_scores = np.cumsum(scores)
    infection_id = self.infection_selector.infection_class.infection_id()

    remaining = int(total_cases)
    if remaining <= 0:
        return

    visited = set()
    # The number of draws is capped at ten per household so the sampling
    # always terminates, even if the quota cannot be met.
    for _ in range(len(households) * 10):
        draw = random.random() * cum_scores[-1]
        chosen = households[np.searchsorted(cum_scores, draw)]

        if chosen.id in visited:
            continue

        # A household is only ever processed once, whatever the outcome.
        visited.add(chosen.id)

        for resident in chosen.residents:
            if not (resident.immunity.get_susceptibility(infection_id) > 0):
                continue
            self.infect_person(person=resident, time=time, record=record)
            if time < 0:
                self.bring_infection_up_to_date(
                    person=resident,
                    time_from_infection=-time,
                    record=record,
                )
            remaining -= 1
            if remaining <= 0:
                return

        # Every household has been tried; nobody is left to infect.
        if len(visited) >= len(households):
            return

ExactNumInfectionSeed

Bases: InfectionSeed

Source code in june/epidemiology/infection_seed/exact_num_infection_seed.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class ExactNumInfectionSeed(InfectionSeed):
    """Infection seed that infects an exact number of people per age bin.

    The input table is indexed by ``"min-max"`` age-bin labels and has one
    column per seeding location (a region, super area or area name), or a
    single ``"all"`` column to seed over the whole world. Its values, scaled
    by ``seed_strength``, are truncated to integers and used as the number of
    people to infect in each age bin.
    """

    def __init__(
        self,
        world: "World",
        infection_selector: InfectionSelector,
        daily_cases_per_capita_per_age_per_region: pd.DataFrame,
        seed_past_infections: bool = True,
        seed_strength=1.0,
        # account_secondary_infections=False,
    ):
        """Set up the seed and work out which geography levels it targets.

        Args:
            world: world whose regions, super areas and areas may be seeded.
            infection_selector: forwarded to the parent constructor.
            daily_cases_per_capita_per_age_per_region: table of case numbers
                per age bin (rows) and location name or ``"all"`` (columns).
            seed_past_infections: forwarded to the parent constructor.
            seed_strength: multiplier applied to the whole table.
        """
        super().__init__(
            world=world,
            infection_selector=infection_selector,
            daily_cases_per_capita_per_age_per_region=daily_cases_per_capita_per_age_per_region,
            seed_past_infections=seed_past_infections,
            seed_strength=seed_strength,
            account_secondary_infections=False,
        )
        # Exact-number mode works directly on age bins, so the table is kept
        # per bin (only scaled) instead of being expanded to single ages.
        self.daily_cases_per_capita_per_age_per_region = (
            daily_cases_per_capita_per_age_per_region * seed_strength
        )

        # Names of the world collections ("regions", "super_areas", "areas")
        # that contain at least one of the requested seeding locations.
        self.iter_type_set = set()
        if "all" not in daily_cases_per_capita_per_age_per_region.columns:
            # Sets give constant-time membership tests for each column name.
            region_names = {region.name for region in self.world.regions}
            super_area_names = {
                super_area.name for super_area in self.world.super_areas
            }
            area_names = {area.name for area in self.world.areas}

            for loc_name in self.daily_cases_per_capita_per_age_per_region.columns:
                if loc_name in region_names:
                    self.iter_type_set.add("regions")
                elif loc_name in super_area_names:
                    self.iter_type_set.add("super_areas")
                elif loc_name in area_names:
                    self.iter_type_set.add("areas")
                # Columns that match no location in the current world are
                # deliberately ignored rather than treated as an error.

    def infect_super_area(
        self, super_area, cases_per_capita_per_age, time, record=None
    ):
        """Infect an exact number of susceptible people per age bin.

        People of the location are visited in random order. A person is
        infected when their age falls into a bin (lower bound inclusive, upper
        bound exclusive) whose quota is not yet filled and their
        susceptibility to the infection is positive.

        Args:
            super_area: location exposing ``people``.
            cases_per_capita_per_age: series indexed by ``"min-max"`` age-bin
                labels; each value is truncated to an integer quota.
            time: infection time; when negative, each new infection is also
                brought up to date by ``-time``.
            record: forwarded to the infection helpers. (Default value = None)
        """
        print("Infecting super area - exact num")
        people = super_area.people
        infection_id = self.infection_selector.infection_class.infection_id()

        age_ranges = []
        for age in cases_per_capita_per_age.index:
            agemin, agemax = age.split("-")
            age_ranges.append((int(agemin), int(agemax)))

        # Integer quota per bin, computed once. Comparing the counters with
        # the raw (possibly fractional) values would never signal completion.
        targets = np.array(
            [int(cases) for cases in cases_per_capita_per_age], dtype="int"
        )
        if not np.any(targets > 0):
            # No bin asks for a case: skip shuffling the whole population.
            return

        n_seeded = np.zeros(len(age_ranges), dtype="int")
        # NOTE(review): this re-seeds the process-wide RNG from system entropy
        # on every call, which looks like it prevents reproducible runs --
        # confirm it is intentional before removing.
        random.seed()
        for person in random.sample(list(people), len(people)):
            # First bin containing the person's age that still has room.
            matched_bin = None
            for j, (agemin, agemax) in enumerate(age_ranges):
                if agemin <= person.age < agemax and n_seeded[j] < targets[j]:
                    matched_bin = j
                    break
            if matched_bin is None:
                continue
            if not (person.immunity.get_susceptibility(infection_id) > 0):
                continue

            self.infect_person(person=person, time=time, record=record)
            self.current_seeded_cases[person.region.name] += 1
            if time < 0:
                self.bring_infection_up_to_date(
                    person=person, time_from_infection=-time, record=record
                )

            n_seeded[matched_bin] += 1
            # Stop as soon as every quota is filled.
            if np.all(n_seeded >= targets):
                break

    def infect_super_areas(
        self,
        cases_per_capita_per_age_per_region: pd.DataFrame,
        time: float,
        date: datetime.datetime,
        record: Optional[Record] = None,
    ):
        """Infect world/region/super_area/area with number of cases given by data frame.

        Not only super areas are handled, but the old function name is kept
        for now.

        Args:
            cases_per_capita_per_age_per_region (pd.DataFrame): case numbers
                per age bin (rows) and location name or ``"all"`` (columns).
            time (float): Time where infections start (could be negative if they started before the simulation)
            date (datetime.datetime): currently unused by this method.
            record (Optional[Record], optional): (Default value = None)

        """
        if "all" in cases_per_capita_per_age_per_region.columns:
            self.infect_super_area(
                super_area=self.world,
                cases_per_capita_per_age=cases_per_capita_per_age_per_region["all"],
                time=time,
                record=record,
            )
            return

        for geo_type_name in self.iter_type_set:
            for this_loc in getattr(self.world, geo_type_name):
                try:
                    cases_per_capita_per_age = cases_per_capita_per_age_per_region[
                        this_loc.name
                    ]
                except KeyError:
                    # No cases requested for this location.
                    continue

                # TODO: rewrite self._adjust_seed_accounting_secondary_infections
                # to work for super areas / areas, then adjust
                # cases_per_capita_per_age here when
                # self._need_to_seed_accounting_secondary_infections(date=date).
                self.infect_super_area(
                    super_area=this_loc,
                    cases_per_capita_per_age=cases_per_capita_per_age,
                    time=time,
                    record=record,
                )

infect_super_area(super_area, cases_per_capita_per_age, time, record=None)

Parameters:

Name Type Description Default
super_area
required
cases_per_capita_per_age
required
time
required
record

(Default value = None)

None
Source code in june/epidemiology/infection_seed/exact_num_infection_seed.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def infect_super_area(
    self, super_area, cases_per_capita_per_age, time, record=None
):
    """Infect an exact number of susceptible people per age bin.

    People of the location are visited in random order. A person is infected
    when their age falls into a bin (lower bound inclusive, upper bound
    exclusive) whose quota is not yet filled and their susceptibility to the
    infection is positive.

    Args:
        super_area: location exposing ``people``.
        cases_per_capita_per_age: series indexed by ``"min-max"`` age-bin
            labels; each value is truncated to an integer quota.
        time: infection time; when negative, each new infection is also
            brought up to date by ``-time``.
        record: forwarded to the infection helpers. (Default value = None)
    """
    print("Infecting super area - exact num")
    people = super_area.people
    infection_id = self.infection_selector.infection_class.infection_id()

    age_ranges = []
    for age in cases_per_capita_per_age.index:
        agemin, agemax = age.split("-")
        age_ranges.append((int(agemin), int(agemax)))

    # Integer quota per bin, computed once. Comparing the counters with the
    # raw (possibly fractional) values would never signal completion.
    targets = np.array(
        [int(cases) for cases in cases_per_capita_per_age], dtype="int"
    )
    if not np.any(targets > 0):
        # No bin asks for a case: skip shuffling the whole population.
        return

    n_seeded = np.zeros(len(age_ranges), dtype="int")
    # NOTE(review): this re-seeds the process-wide RNG from system entropy on
    # every call, which looks like it prevents reproducible runs -- confirm it
    # is intentional before removing.
    random.seed()
    for person in random.sample(list(people), len(people)):
        # First bin containing the person's age that still has room.
        matched_bin = None
        for j, (agemin, agemax) in enumerate(age_ranges):
            if agemin <= person.age < agemax and n_seeded[j] < targets[j]:
                matched_bin = j
                break
        if matched_bin is None:
            continue
        if not (person.immunity.get_susceptibility(infection_id) > 0):
            continue

        self.infect_person(person=person, time=time, record=record)
        self.current_seeded_cases[person.region.name] += 1
        if time < 0:
            self.bring_infection_up_to_date(
                person=person, time_from_infection=-time, record=record
            )

        n_seeded[matched_bin] += 1
        # Stop as soon as every quota is filled.
        if np.all(n_seeded >= targets):
            break

infect_super_areas(cases_per_capita_per_age_per_region, time, date, record=None)

Infect the world, a region, a super area or an area with the number of cases given by the data frame. The method handles more than super areas, but the old function name is kept for now.

Parameters:

Name Type Description Default
cases_per_capita_per_age_per_region DataFrame
required
time float

Time where infections start (could be negative if they started before the simulation)

required
date datetime
required
record Optional[Record]

(Default value = None)

None
Source code in june/epidemiology/infection_seed/exact_num_infection_seed.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def infect_super_areas(
    self,
    cases_per_capita_per_age_per_region: pd.DataFrame,
    time: float,
    date: datetime.datetime,
    record: Optional[Record] = None,
):
    """Infect world/region/super_area/area with number of cases given by data frame.

    Not only super areas are handled, but the old function name is kept for
    now. With an ``"all"`` column the whole world is seeded at once; otherwise
    every location of the geography levels listed in ``self.iter_type_set``
    that has a column of its own is seeded.

    Args:
        cases_per_capita_per_age_per_region (pd.DataFrame): case numbers per
            age bin (rows) and location name or ``"all"`` (columns).
        time (float): Time where infections start (could be negative if they started before the simulation)
        date (datetime.datetime): currently unused by this method.
        record (Optional[Record], optional): (Default value = None)

    """
    if "all" in cases_per_capita_per_age_per_region.columns:
        self.infect_super_area(
            super_area=self.world,
            cases_per_capita_per_age=cases_per_capita_per_age_per_region["all"],
            time=time,
            record=record,
        )
        return

    for geo_type_name in self.iter_type_set:
        for this_loc in getattr(self.world, geo_type_name):
            try:
                cases_per_capita_per_age = cases_per_capita_per_age_per_region[
                    this_loc.name
                ]
            except KeyError:
                # No cases requested for this location.
                continue

            # TODO: rewrite self._adjust_seed_accounting_secondary_infections
            # to work for super areas / areas, then adjust
            # cases_per_capita_per_age here when
            # self._need_to_seed_accounting_secondary_infections(date=date).
            self.infect_super_area(
                super_area=this_loc,
                cases_per_capita_per_age=cases_per_capita_per_age,
                time=time,
                record=record,
            )