diff --git a/macromodel/agents/firms/func/production.py b/macromodel/agents/firms/func/production.py index 10b7390a..7d5a1f26 100644 --- a/macromodel/agents/firms/func/production.py +++ b/macromodel/agents/firms/func/production.py @@ -945,13 +945,18 @@ def compute_intermediate_inputs_used( ) -> np.ndarray: """Calculate intermediate inputs used under bundled Leontief technology. - Input usage is proportional to production with fixed input-output coefficients, - adjusted by the substitution bundle matrix. + For singleton goods, usage follows the standard Leontief formula: + used = production / coefficient. + + Within multi-member substitution bundles, total usage is set so that + the effective input consumed (sum of weight * coefficient * used) + equals production, and is distributed proportionally to stock: + used[j] = production * stock[j] / bundle_capacity. Args: realised_production (np.ndarray): Actual production achieved intermediate_inputs_productivity_matrix (np.ndarray): Input-output - coefficients + coefficients (productivity: output per unit of input) intermediate_inputs_stock (np.ndarray): Available input stocks goods_criticality_matrix (np.ndarray): Input criticality levels substitution_bundle_matrix (np.ndarray): Matrix defining substitution @@ -960,7 +965,7 @@ def compute_intermediate_inputs_used( Returns: np.ndarray: Intermediate inputs used in production """ - # Calculate base input usage + # Base Leontief usage for singleton goods: used = production / coefficient used_inputs = np.divide( realised_production[:, None], intermediate_inputs_productivity_matrix, @@ -968,9 +973,31 @@ def compute_intermediate_inputs_used( where=intermediate_inputs_productivity_matrix != 0.0, ) - # Apply substitution bundles to determine actual input usage - # This is a simplified approach - in a full implementation, - # you might want to optimize input usage across bundles + # For multi-member bundles, distribute usage proportionally to stock + # such that the effective bundle contribution equals production + bundle_members_per_col = (substitution_bundle_matrix > 0).sum(axis=0) + multi_member_bundles = np.where(bundle_members_per_col > 1)[0] + + for b in multi_member_bundles: + member_indices = np.where(substitution_bundle_matrix[:, b] > 0)[0] + member_stock = intermediate_inputs_stock[:, member_indices] + member_coeff = intermediate_inputs_productivity_matrix[:, member_indices] + member_weights = substitution_bundle_matrix[member_indices, b] + + # Only include goods with finite positive coefficients in bundle capacity. + # inf coefficient means the firm doesn't use that input at all. + finite_mask = np.isfinite(member_coeff) & (member_coeff > 0) + effective = np.where(finite_mask, member_weights * member_coeff * member_stock, 0.0) + bundle_cap = effective.sum(axis=1, keepdims=True) + + # used[j] = production * stock[j] / bundle_capacity + # Only for goods the firm actually uses (finite coefficient) + used_inputs[:, member_indices] = np.where( + finite_mask & (bundle_cap > 0), + realised_production[:, None] * member_stock / np.maximum(bundle_cap, 1e-30), + 0.0, + ) + return used_inputs def compute_capital_inputs_used( @@ -984,7 +1011,9 @@ def compute_capital_inputs_used( """Calculate capital depreciation under bundled Leontief technology. Capital depreciation is proportional to production with fixed - depreciation rates, adjusted by the substitution bundle matrix. + depreciation rates. Bundle adjustment is not applied here because + depreciation (production * rate) does not have the same + overconsumption issue as intermediate inputs (production / coefficient). Args: realised_production (np.ndarray): Actual production achieved @@ -998,12 +1027,7 @@ def compute_capital_inputs_used( Returns: np.ndarray: Capital inputs depreciated in production """ - # Calculate base capital depreciation used_capital_inputs = realised_production[:, None] * capital_inputs_depreciation_matrix used_capital_inputs[used_capital_inputs == np.inf] = 0.0 used_capital_inputs[used_capital_inputs == -np.inf] = 0.0 - - # Apply substitution bundles to determine actual capital usage - # This is a simplified approach - in a full implementation, - # you might want to optimize capital usage across bundles return used_capital_inputs diff --git a/tests/test_macromodel/unit/test_agents/test_firms/func/test_production.py b/tests/test_macromodel/unit/test_agents/test_firms/func/test_production.py index 3f091647..fe2eaeac 100644 --- a/tests/test_macromodel/unit/test_agents/test_firms/func/test_production.py +++ b/tests/test_macromodel/unit/test_agents/test_firms/func/test_production.py @@ -144,3 +144,154 @@ def test_target_intermediate_inputs_bundle_empty(self): expected = np.full((n_industries, n_industries), 1) assert np.array_equal(result, expected) + + +class TestBundledLeontiefUsage: + """Test that BundledLeontief distributes input usage within bundles + proportionally to stock, not at fixed Leontief rates.""" + + def test_usage_distributed_by_stock_within_bundle(self): + """Within a bundle, usage is proportional to stock. The effective + input consumed (sum of weight * coeff * used) must equal production.""" + n_firms = 2 + n_goods = 3 # goods 0 and 1 in a bundle, good 2 is singleton + + # Equal coefficients: 1 unit of output per unit of input + productivity = np.full((n_firms, n_goods), 1.0) + + stock = np.array( + [ + [3.0, 1.0, 2.0], # firm 0: 3x more of good 0 than good 1 + [2.0, 2.0, 2.0], # firm 1: equal stock + ] + ) + + production = np.array([2.0, 2.0]) + + # Bundle matrix: goods 0,1 in bundle 0; good 2 in bundle 1 + bundle_matrix = np.array( + [ + [0.5, 0.0], + [0.5, 0.0], + [0.0, 1.0], + ] + ) + + criticality = np.ones((n_firms, n_goods)) + + bundled = BundledLeontief() + used = bundled.compute_intermediate_inputs_used( + realised_production=production, + intermediate_inputs_productivity_matrix=productivity, + intermediate_inputs_stock=stock, + goods_criticality_matrix=criticality, + substitution_bundle_matrix=bundle_matrix, + ) + + # Bundle cap firm 0: 0.5*(1*3 + 1*1) = 2.0 + # used[j] = production * stock[j] / cap = 2*stock[j]/2 = stock[j] + # Bundle cap firm 1: 0.5*(1*2 + 1*2) = 2.0, same result + # Good 2 (singleton): production / coeff = 2.0 + expected = np.array( + [ + [3.0, 1.0, 2.0], + [2.0, 2.0, 2.0], + ] + ) + assert np.allclose(used, expected) + + # Verify effective contribution = production for the bundle + bundle_contrib = (used[:, :2] * productivity[:, :2] * 0.5).sum(axis=1) + assert np.allclose(bundle_contrib, production) + + def test_usage_never_exceeds_stock(self): + """With varying coefficients, usage must never exceed available stock. + This was the overconsumption bug: summing production/coeff across all + bundle members produced usage exceeding total stock.""" + n_firms = 1 + n_goods = 2 # both in one bundle + + # Good 0 very productive, good 1 not + productivity = np.array([[2.0, 0.5]]) + stock = np.array([[3.0, 1.0]]) + + # Bundle capacity: 0.5*(2*3 + 0.5*1) = 3.25 + production = np.array([3.25]) # bundle is binding + + bundle_matrix = np.array([[0.5], [0.5]]) + criticality = np.ones((n_firms, n_goods)) + + bundled = BundledLeontief() + used = bundled.compute_intermediate_inputs_used( + realised_production=production, + intermediate_inputs_productivity_matrix=productivity, + intermediate_inputs_stock=stock, + goods_criticality_matrix=criticality, + substitution_bundle_matrix=bundle_matrix, + ) + + # Must not exceed stock + assert np.all(used <= stock + 1e-10) + + # Effective contribution must equal production + bundle_contrib = (used * productivity * 0.5).sum(axis=1) + assert np.allclose(bundle_contrib, production) + + def test_usage_effective_contribution_equals_production(self): + """The effective input contribution (sum of weight*coeff*used) + must equal production for each bundle, with varying coefficients.""" + n_firms = 3 + n_goods = 4 # goods 0,1,2 in a bundle, good 3 singleton + + productivity = np.array( + [ + [2.0, 4.0, 5.0, 3.0], + [2.0, 4.0, 5.0, 3.0], + [2.0, 4.0, 5.0, 3.0], + ] + ) + + stock = np.array( + [ + [10.0, 5.0, 5.0, 8.0], + [1.0, 1.0, 18.0, 8.0], + [0.0, 0.0, 0.0, 8.0], # no stock at all + ] + ) + + production = np.array([10.0, 10.0, 10.0]) + + bundle_matrix = np.array( + [ + [1 / 3, 0.0], + [1 / 3, 0.0], + [1 / 3, 0.0], + [0.0, 1.0], + ] + ) + + criticality = np.ones((n_firms, n_goods)) + + bundled = BundledLeontief() + used = bundled.compute_intermediate_inputs_used( + realised_production=production, + intermediate_inputs_productivity_matrix=productivity, + intermediate_inputs_stock=stock, + goods_criticality_matrix=criticality, + substitution_bundle_matrix=bundle_matrix, + ) + + # Effective bundle contribution must equal production for firms with stock + weights = np.array([1 / 3, 1 / 3, 1 / 3]) + for firm in [0, 1]: + bundle_contrib = (used[firm, :3] * productivity[firm, :3] * weights).sum() + assert np.allclose(bundle_contrib, production[firm]) + + # Singleton good 3 unchanged: 10/3 + assert np.allclose(used[:, 3], 10.0 / 3.0) + + # Firm 2 has zero stock -> zero usage + assert np.allclose(used[2, :3], 0.0) + + # Usage must never exceed stock + assert np.all(used <= stock + 1e-10)