Skip to content

Company distributor

logger = logging.getLogger('company_distributor') module-attribute

This file contains routines to attribute people with different characteristics according to census data.

CompanyDistributor

Distributes workers that are not yet working in key company sectors (e.g. schools and hospitals) to companies. This assumes that the WorkerDistributor has already been run to allocate workers in a super_area.

Source code in june/distributors/company_distributor.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class CompanyDistributor:
    """Distributes workers that are not yet working in key company sectors
    (e.g. schools and hospitals) to companies. This assumes that
    the WorkerDistributor has already been run to allocate workers in
    a super_area.

    ``company.sector`` is treated as an industry code and ``worker.sector``
    as a sector; the mapping file lists, for each industry code, the sectors
    whose workers may be placed in companies of that industry.
    """

    def __init__(self, company_mapping_file=None):
        """Load the industry-code to sector mapping and pre-compute lookups.

        Args:
            company_mapping_file: Path to a YAML file whose ``Scotland`` entry
                maps each industry code to one sector or a list of sectors.
                When None, ``defaults/groups/company_mapping.yaml`` under
                ``paths.configs_path`` is used.

        """
        if company_mapping_file is None:
            company_mapping_file = paths.configs_path / "defaults/groups/company_mapping.yaml"

        # Load the mapping from industry codes to sectors
        with open(company_mapping_file, 'r') as f:
            mapping_data = yaml.safe_load(f)

        # An empty file, or an empty 'Scotland' entry, loads as None; treat
        # both as "no mapping" instead of failing on .get()/.items().
        scotland_mapping = (mapping_data or {}).get('Scotland') or {}

        # Industry code (always a string key) -> list of compatible sectors;
        # a single scalar sector is wrapped into a one-element list.
        self.industry_to_sectors = {
            str(industry_code): sectors if isinstance(sectors, list) else [sectors]
            for industry_code, sectors in scotland_mapping.items()
        }

        # Pre-compute compatibility matrix for faster lookups
        self._build_compatibility_matrix()

    def _build_compatibility_matrix(self):
        """Derive the reverse mapping and the pairwise compatibility table.

        Sets ``self.sector_to_industries`` (sector -> list of industry codes)
        and ``self.compatibility_matrix`` ({(sector, industry): bool}) from
        ``self.industry_to_sectors``.
        """
        self.sector_to_industries = defaultdict(list)
        for industry, sectors in self.industry_to_sectors.items():
            for sector in sectors:
                industries = self.sector_to_industries[sector]
                # A sector listed twice for one industry must not duplicate
                # the industry, otherwise its companies would be offered
                # twice when building the pool of available companies.
                if industry not in industries:
                    industries.append(industry)

        # One entry for every (known sector, known industry) pair.
        self.compatibility_matrix = {
            (sector, industry): sector in sectors
            for sector in self.sector_to_industries
            for industry, sectors in self.industry_to_sectors.items()
        }

    def _can_worker_work_in_company(self, worker_sector, company_industry):
        """Tell whether a worker sector is mapped to a company industry code.

        Args:
            worker_sector: Sector of the worker.
            company_industry: Industry code of the company; converted with
                ``str`` before the lookup.

        Returns:
            The pre-computed compatibility flag, or False when the pair is
            not present in the matrix.

        """
        return self.compatibility_matrix.get((worker_sector, str(company_industry)), False)

    @staticmethod
    def _assign_worker(company, worker):
        """Add ``worker`` to ``company`` and register them under their subgroup index."""
        company.add(worker)
        subgroup = company.get_index_subgroup(worker)
        company.add_to_registered_members(worker.id, subgroup_type=subgroup)

    def distribute_adults_to_companies_in_super_areas(self, super_areas, debug=False):
        """Distribute the unplaced workers of every super area to its companies.

        Args:
            super_areas: Sized iterable of super areas to process.
            debug: When True, print a sample of the resulting assignments.
                (Default value = False)

        """
        logger.info("Distributing workers to companies")
        total_workers_assigned = 0

        for i, super_area in enumerate(super_areas):
            if i % 500 == 0:  # Reduced logging frequency
                logger.info(
                    f"Distributed workers to companies in {i} of {len(super_areas)} super areas."
                )
            total_workers_assigned += self._distribute_adults_to_companies_in_super_area_optimized(
                super_area
            )

        logger.info(f"Workers distributed to companies. Total assigned: {total_workers_assigned}")

        # Optional debug output
        if debug:
            self._debug_distribution_sample(super_areas)

    def _distribute_adults_to_companies_in_super_area_optimized(self, super_area):
        """Place the unplaced workers of one super area, sector by sector.

        Workers whose sector maps to at least one industry are spread over
        the companies of those industries that still have capacity. Everyone
        else (unknown sector, or no capacity left) is placed in a randomly
        chosen company of the super area, ignoring capacity.

        Args:
            super_area: Super area providing ``companies``, ``workers`` and
                ``name``.

        Returns:
            Number of workers assigned to a company; 0 when the super area
            has no companies.

        """
        if not super_area.companies:
            logger.warning(f"Super area {super_area.name} has no companies - skipping")
            return 0

        # Pre-group companies by industry for faster access
        companies_by_industry = defaultdict(list)
        for company in super_area.companies:
            companies_by_industry[str(company.sector)].append(company)

        # Group workers by sector for batch processing
        workers_by_sector = defaultdict(list)
        unassigned_workers = []
        for worker in super_area.workers:
            if worker.primary_activity is not None:
                continue  # already has a primary activity
            if worker.sector in self.sector_to_industries:
                workers_by_sector[worker.sector].append(worker)
            else:
                unassigned_workers.append(worker)

        workers_assigned = 0

        for sector, workers in workers_by_sector.items():
            # Companies of any compatible industry that still have room
            available_companies = [
                company
                for industry in self.sector_to_industries[sector]
                for company in companies_by_industry.get(industry, ())
                if company.n_workers < company.n_workers_max
            ]

            for worker in workers:
                if not available_companies:
                    unassigned_workers.append(worker)
                    continue

                # Choose company (round-robin for better distribution)
                company = available_companies[workers_assigned % len(available_companies)]
                self._assign_worker(company, worker)
                workers_assigned += 1

                # Remove company from available list if full
                if company.n_workers >= company.n_workers_max:
                    available_companies.remove(company)

        # Remaining workers go to uniformly random companies (with replacement).
        # super_area.companies is known to be non-empty at this point.
        if unassigned_workers:
            company_assignments = np.random.choice(
                super_area.companies, len(unassigned_workers)
            )
            for worker, company in zip(unassigned_workers, company_assignments):
                self._assign_worker(company, worker)
                workers_assigned += 1

        return workers_assigned

    def _debug_distribution_sample(self, super_areas, sample_size=10):
        """Print a small random sample of companies and their workers.

        Best-effort diagnostic: any failure is logged as a warning instead of
        being raised. Only called when ``debug=True``.

        Args:
            super_areas: Super areas to sample from (up to 5 are inspected,
                with up to 3 companies each and up to 2 workers per company).
            sample_size: Maximum number of rows printed per table.
                (Default value = 10)

        """
        try:
            all_companies = []
            worker_sample = []

            sample_super_areas = random.sample(list(super_areas), min(5, len(super_areas)))

            for super_area in sample_super_areas:
                if not hasattr(super_area, 'companies') or not super_area.companies:
                    continue

                sample_companies = random.sample(
                    list(super_area.companies), min(3, len(super_area.companies))
                )

                for company in sample_companies:
                    # random.sample needs a sequence, so work on a list copy
                    people = list(company.people)
                    all_companies.append({
                        "Super Area": super_area.name,
                        "Company ID": company.id,
                        "Sector": company.sector,
                        "Workers": len(people),
                        "Max Capacity": company.n_workers_max
                    })

                    # Sample workers from this company
                    if people:
                        for worker in random.sample(people, min(2, len(people))):
                            worker_sample.append({
                                "Worker ID": worker.id,
                                "Age": worker.age,
                                "Sector": getattr(worker, 'sector', 'Unknown'),
                                "Company ID": company.id,
                                "Company Sector": company.sector
                            })

            if all_companies:
                df_companies = pd.DataFrame(all_companies)
                print(f"\n===== Sample Companies ({len(all_companies)}) =====")
                print(df_companies.head(sample_size))

            if worker_sample:
                df_workers = pd.DataFrame(worker_sample)
                print(f"\n===== Sample Worker Assignments ({len(worker_sample)}) =====")
                print(df_workers.head(sample_size))

        except Exception as e:
            # Deliberately broad: debug output must never abort a distribution run.
            logger.warning(f"Debug sampling failed: {e}")

    def _distribute_adults_to_companies_in_super_area(self, super_area):
        """Looks for all workers and companies in the super area and matches
        them based on industry code to sector mapping.

        Each unplaced worker goes to the first industry (in the order the
        companies were grouped) that is compatible with the worker's sector.
        Within that industry, companies are filled in order; once all of them
        are at capacity a random one is used. Workers with no compatible
        industry are placed in random companies of the super area.

        Args:
            super_area: Super area providing ``companies``, ``workers`` and
                ``name``.

        Raises:
            ValueError: If workers remain unallocated and the super area has
                no companies at all.

        """
        # Group companies by industry code (company.sector holds the code)
        company_dict = defaultdict(list)
        for company in super_area.companies:
            company_dict[str(company.sector)].append(company)

        # Per industry: index of the first company not yet known to be full
        full_idx = defaultdict(int)
        unallocated_workers = []

        for worker in super_area.workers:
            if worker.primary_activity is not None:
                continue

            for industry_code, companies in company_dict.items():
                if not self._can_worker_work_in_company(worker.sector, industry_code):
                    continue

                # Advance past companies that have reached capacity so the
                # next one in line is filled instead of overfilling the first.
                idx = full_idx[industry_code]
                while idx < len(companies) and companies[idx].n_workers >= companies[idx].n_workers_max:
                    idx += 1
                full_idx[industry_code] = idx

                if idx >= len(companies):
                    # Every company of this industry is full: pick one at random
                    company = companies[randint(0, len(companies) - 1)]
                else:
                    company = companies[idx]

                self._assign_worker(company, worker)
                break
            else:
                # No compatible industry in this super area
                unallocated_workers.append(worker)

        if not unallocated_workers:
            return

        if len(super_area.companies) == 0:
            logger.error(f"PROBLEM FOUND: Super area {super_area.name} has {len(unallocated_workers)} unallocated workers but NO COMPANIES available!")
            logger.error(f"Super area details - Name: {super_area.name}, ID: {getattr(super_area, 'id', 'unknown')}")
            logger.error("Workers in this super area that need companies:")
            for i, worker in enumerate(unallocated_workers[:10]):  # Show first 10
                logger.error(f"  Worker {i+1}: ID={worker.id}, Age={worker.age}, Sector={worker.sector}, Area={worker.area.name}")
            if len(unallocated_workers) > 10:
                logger.error(f"  ... and {len(unallocated_workers) - 10} more workers")

            logger.error("This will cause the 'a' cannot be empty error in numpy.random.choice()")
            raise ValueError(f"Super area {super_area.name} has workers but no companies - this causes the distribution error")

        companies_for_unallocated = np.random.choice(
            super_area.companies, len(unallocated_workers)
        )
        # Register every randomly placed worker, not just the last one.
        for worker, company in zip(unallocated_workers, companies_for_unallocated):
            self._assign_worker(company, worker)

__init__(company_mapping_file=None)

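Load the industry-code-to-sector mapping (the `Scotland` entry of the company mapping YAML file) and pre-compute the sector/industry compatibility lookups.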

Source code in june/distributors/company_distributor.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, company_mapping_file=None):
    """Load the industry-code to sector mapping and pre-compute lookups.

    Args:
        company_mapping_file: Path to a YAML file whose ``Scotland`` entry
            maps each industry code to one sector or a list of sectors.
            When None, ``defaults/groups/company_mapping.yaml`` under
            ``paths.configs_path`` is used.

    """
    if company_mapping_file is None:
        company_mapping_file = paths.configs_path / "defaults/groups/company_mapping.yaml"

    # Load the mapping from industry codes to sectors
    with open(company_mapping_file, 'r') as f:
        mapping_data = yaml.safe_load(f)

    # An empty file, or an empty 'Scotland' entry, loads as None; treat both
    # as "no mapping" instead of failing on .get()/.items().
    scotland_mapping = (mapping_data or {}).get('Scotland') or {}

    # Industry code (always a string key) -> list of compatible sectors;
    # a single scalar sector is wrapped into a one-element list.
    self.industry_to_sectors = {
        str(industry_code): sectors if isinstance(sectors, list) else [sectors]
        for industry_code, sectors in scotland_mapping.items()
    }

    # Pre-compute compatibility matrix for faster lookups
    self._build_compatibility_matrix()

distribute_adults_to_companies_in_super_areas(super_areas, debug=False)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| super_areas | | | required |
| debug | | (Default value = False) | False |

Source code in june/distributors/company_distributor.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def distribute_adults_to_companies_in_super_areas(self, super_areas, debug=False):
    """Run the per-super-area company distribution over all super areas.

    Args:
        super_areas: Sized iterable of super areas to process.
        debug: When True, print a sample of the resulting assignments
            once every super area has been handled. (Default value = False)

    """
    logger.info("Distributing workers to companies")

    total_workers_assigned = 0
    for i, area in enumerate(super_areas):
        # Progress message once every 500 super areas (including the first)
        if not i % 500:
            logger.info(
                f"Distributed workers to companies in {i} of {len(super_areas)} super areas."
            )
        total_workers_assigned += self._distribute_adults_to_companies_in_super_area_optimized(area)

    logger.info(f"Workers distributed to companies. Total assigned: {total_workers_assigned}")

    # The sample output is opt-in
    if not debug:
        return
    self._debug_distribution_sample(super_areas)