Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CHANGELOG
------

**ENHANCEMENTS**
- Add support for p6e-gb200 instances via capacity blocks.
- Echo chef-client log when a node fails to bootstrap. This helps with investigating bootstrap failures in cases CloudWatch logs are not available.

**CHANGES**
Expand Down
4 changes: 4 additions & 0 deletions cli/src/pcluster/aws/aws_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ def total_instance_count(self):
"""Return the total instance count, if present, 0 otherwise."""
return self.capacity_reservation_data.get("TotalInstanceCount", 0)

def available_instance_count(self):
"""Return the available instance count, if present, 0 otherwise."""
return self.capacity_reservation_data.get("AvailableInstanceCount", 0)

def get_tag(self, tag_key: str):
"""Get stack tag by tag key."""
return next(
Expand Down
74 changes: 74 additions & 0 deletions cli/src/pcluster/aws/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,3 +577,77 @@ def is_subnet_public(self, subnet_id):
return True

return False

@AWSExceptionHandler.handle_client_exception
def describe_capacity_block_status(
self, capacity_block_ids: List[str] = None, filters=None, max_results: int = None
):
"""
Describe the availability and health status of capacity blocks, particularly for ultraserver instances.

This method is primarily used to check the health status of ultraserver capacity blocks
(e.g., p6e-gb200) to ensure they are ready for cluster operations. It provides information
about interconnect status and available capacity.

:param capacity_block_ids: List of Capacity Block IDs to query (e.g., ['cr-123456']).
:param filters: Optional boto3-style filters to narrow results (e.g., interconnect-status).
:param max_results: Optional page size hint for pagination.
:return: Dict with key 'CapacityBlockStatuses' containing a flattened list of capacity block
status entries. Each entry includes fields like:
- CapacityBlockId: The capacity block identifier
- InterconnectStatus: Health status ('ok', 'impaired', 'insufficient-data')
- TotalCapacity: Total number of instances in the capacity block
- TotalUnavailableCapacity: Number of unavailable instances
"""
kwargs = {}
if capacity_block_ids:
kwargs["CapacityBlockIds"] = capacity_block_ids
if filters:
kwargs["Filters"] = filters
if max_results:
kwargs["MaxResults"] = max_results

paginator = self._client.get_paginator("describe_capacity_block_status")
page_iterator = paginator.paginate(**kwargs)

statuses = []
for page in page_iterator:
statuses.extend(page.get("CapacityBlockStatuses", []))

return statuses

@AWSExceptionHandler.handle_client_exception
def get_instance_type_and_reservation_type_from_capacity_reservation(
self, capacity_reservation_id: str
) -> tuple[str, str]:
"""
Retrieve instance type and reservation type from a capacity reservation ID.

This method queries AWS EC2 to get detailed information about a capacity reservation,
specifically extracting the instance type and reservation type. This information is
crucial for determining if special handling is needed (e.g., for ultraserver instances
with capacity blocks).

Args:
capacity_reservation_id: The AWS capacity reservation ID to query (e.g., 'cr-123456')

Returns:
tuple: A tuple containing (instance_type, reservation_type) where:
- instance_type: EC2 instance type (e.g., 'p6e-gb200.36xlarge')
- reservation_type: Type of reservation (e.g., 'capacity-block', 'ondemand')
Both values will be None if the reservation cannot be found or accessed.

Example:
('p6e-gb200.36xlarge', 'capacity-block')
"""
instance_type = None
reservation_type = None

if capacity_reservation_id:
capacity_reservations = self.describe_capacity_reservations([capacity_reservation_id])
if capacity_reservations:
reservation = capacity_reservations[0]
instance_type = reservation.instance_type()
reservation_type = reservation.reservation_type()

return instance_type, reservation_type
86 changes: 72 additions & 14 deletions cli/src/pcluster/config/cluster_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@
NODE_BOOTSTRAP_TIMEOUT,
ONTAP,
OPENZFS,
ULTRASERVER_INSTANCE_PREFIX_LIST,
Feature,
)
from pcluster.utils import get_partition, get_resource_name_from_resource_arn, to_snake_case
from pcluster.utils import (
get_partition,
get_resource_name_from_resource_arn,
to_snake_case,
)
from pcluster.validators.awsbatch_validators import (
AwsBatchComputeInstanceTypeValidator,
AwsBatchComputeResourceSizeValidator,
Expand Down Expand Up @@ -141,6 +146,7 @@
)
from pcluster.validators.ec2_validators import (
AmiOsCompatibleValidator,
CapacityBlockHealthStatusValidator,
CapacityReservationResourceGroupValidator,
CapacityReservationSizeValidator,
CapacityReservationValidator,
Expand Down Expand Up @@ -2409,7 +2415,16 @@ def instance_types(self) -> List[str]:
def instance_type(self):
"""Instance type of this compute resource."""
if not self._instance_type:
self._instance_type = Resource.init_param(self._instance_type_from_capacity_reservation())
capacity_reservation_id = (
self.capacity_reservation_target.capacity_reservation_id if self.capacity_reservation_target else None
)
(
instance_type_from_capacity_reservation,
_,
) = AWSApi.instance().ec2.get_instance_type_and_reservation_type_from_capacity_reservation(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the benefits of creating an entirely new function rather than just using/building upon the existing _instance_type_from_capacity_reservation() function. It seems like we are repeating pre-existing logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. To support ultraserver instance I need to also get the reservation_type to check if it's "capacity-block".
  2. I need this function in other places to handle the truth that the capacity_reservation_target section can under both queue level and compute resource level.
  3. Also I need this function when building stack, if keep it here, there's a circular dependencies issue.

capacity_reservation_id
)
self._instance_type = Resource.init_param(instance_type_from_capacity_reservation)
return self._instance_type

def _register_validators(self, context: ValidatorContext = None):
Expand Down Expand Up @@ -2453,18 +2468,6 @@ def disable_simultaneous_multithreading_manually(self) -> bool:
"""Return true if simultaneous multithreading must be disabled with a cookbook script."""
return self.disable_simultaneous_multithreading and self._instance_type_info.default_threads_per_core() > 1

def _instance_type_from_capacity_reservation(self):
"""Return the instance type from the configured CapacityReservationId, if any."""
instance_type = None
capacity_reservation_id = (
self.capacity_reservation_target.capacity_reservation_id if self.capacity_reservation_target else None
)
if capacity_reservation_id:
capacity_reservations = AWSApi.instance().ec2.describe_capacity_reservations([capacity_reservation_id])
if capacity_reservations:
instance_type = capacity_reservations[0].instance_type()
return instance_type


class _CommonQueue(BaseQueue):
"""Represent the Common Queue resource between Slurm and future scheduler implementation."""
Expand Down Expand Up @@ -2931,6 +2934,7 @@ def __init__(
pool.ssh.allowed_ips = self.head_node.ssh.allowed_ips

self.__image_dict = None
self.__ultraserver_capacity_block_dict = None
# Cache capacity reservations information together to reduce number of boto3 calls.
# Since this cache is only used for validation, if AWSClientError happens
# (e.g insufficient IAM permissions to describe the capacity reservations), we catch the exception to avoid
Expand Down Expand Up @@ -2986,6 +2990,53 @@ def login_nodes_subnet_ids(self):
subnet_ids_set.add(subnet_id)
return list(subnet_ids_set)

@property
def ultraserver_capacity_block_dict(self):
"""
Return a dictionary mapping ultraserver instance prefixes to their capacity block reservation IDs.

This property collects all capacity block reservations used by ultraserver instances
(e.g., p6e-gb200) across all queues and compute resources in the cluster configuration.

Returns:
dict: A dictionary where keys are ultraserver instance prefixes (e.g., 'p6e-gb200')
and values are lists of capacity reservation IDs for that instance type.

Example:
{
'p6e-gb200': ['cr-123456', 'cr-789012']
}
"""
if self.__ultraserver_capacity_block_dict:
return self.__ultraserver_capacity_block_dict

self.__ultraserver_capacity_block_dict = {}

# Initialize empty lists for each supported ultraserver instance prefix
for ultraserver_instance_prefix in ULTRASERVER_INSTANCE_PREFIX_LIST:
self.__ultraserver_capacity_block_dict[ultraserver_instance_prefix] = []

# Iterate through all queues and compute resources to find ultraserver capacity blocks
for queue in self.scheduling.queues:
for compute_resource in queue.compute_resources:
cr_target = compute_resource.capacity_reservation_target or queue.capacity_reservation_target
if cr_target and cr_target.capacity_reservation_id:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the config specifies a resource group ARN rather than a reservation id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only accept capacity block for ultraserver instance. ResourceGroupArn is not considered here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a limitation captured in our code, i.e. we fail if we specify a capacity block and CapacityReservationResourceGroupArn does the validation fail? At least in documentation it does not seems so: https://docs.aws.amazon.com/parallelcluster/latest/ug/Scheduling-v3.html#yaml-Scheduling-SlurmQueues-CapacityReservationTarget

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean specify CapacityBlock and and CapacityReservationResourceGroupArn at the same time?
I think they can not be specified at the same time because CapacityReservationResourceGroupArn needs Instances section. https://docs.aws.amazon.com/parallelcluster/latest/ug/launch-instances-odcr-v3.html

      ComputeResources:
        - ...
          Instances:
            - InstanceType: instance

But if it's CapacityBlock, you have to use InstanceType section or leave it empty(optional). https://docs.aws.amazon.com/parallelcluster/latest/ug/launch-instances-capacity-blocks.html

   ComputeResources:
   - ...
     InstanceType: String (EC2 Instance type of the CB)

But even if it doesn't fail, why it matters in our case? This function is to collect ultraserver_capacity_block_dict, ultraserver instance can not in a CapacityReservationResourceGroupArn, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use case we are talking about is: a compute resource with CAPACITY BLOCK, instance type specified and a resourcegroupArn rather than a reservationId.

I think we should support this scenario. If we do not support it, let's verify that that there exist a validator to prevent it. If it does not exist, the next steps are:
1.If the team agrees on supporting the use case, let's address it in a follow up PR
2. if the team agrees on not supporting it, let;s add a validator to prevent it in a follow up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a sync with Giacomo. We are not going to support it because: >Capacity Reservation Groups are designed to work with standard Capacity Reservations, not with Capacity Blocks. They allow you to group and manage multiple Capacity Reservations together, but this functionality does not extend to Capacity Blocks.

# Get instance type and reservation type from the capacity reservation
(
instance_type,
reservation_type,
) = AWSApi.instance().ec2.get_instance_type_and_reservation_type_from_capacity_reservation(
cr_target.capacity_reservation_id
)
# Extract instance prefix (e.g., 'p6e-gb200' from 'p6e-gb200.36xlarge')
instance_prefix = instance_type.split(".")[0]
# Only collect capacity blocks for ultraserver instances
if reservation_type == "capacity-block" and instance_prefix in ULTRASERVER_INSTANCE_PREFIX_LIST:
self.__ultraserver_capacity_block_dict.get(instance_prefix).append(
cr_target.capacity_reservation_id
)
return self.__ultraserver_capacity_block_dict

def _register_login_node_validators(self):
"""Register all login node validators to ensure that the resource parameters are valid."""
# Check if all subnets(head node, Login nodes, compute nodes) are in the same VPC and support DNS.
Expand Down Expand Up @@ -3223,6 +3274,13 @@ def _register_validators(self, context: ValidatorContext = None): # noqa: C901
num_of_instances=num_of_instances,
)

for ultraserver_instance_prefix in ULTRASERVER_INSTANCE_PREFIX_LIST:
if self.ultraserver_capacity_block_dict.get(ultraserver_instance_prefix):
self._register_validator(
CapacityBlockHealthStatusValidator,
capacity_reservation_ids=self.ultraserver_capacity_block_dict.get(ultraserver_instance_prefix),
)

@property
def image_dict(self):
"""Return image dict of queues, key is queue name, value is image id."""
Expand Down
9 changes: 9 additions & 0 deletions cli/src/pcluster/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,12 @@ class Operation(Enum):
# Tag key & expected revision (increment when policy widens)
PCLUSTER_BUILD_IMAGE_CLEANUP_ROLE_REVISION = 1
PCLUSTER_BUILD_IMAGE_CLEANUP_ROLE_BOOTSTRAP_TAG_KEY = "parallelcluster:build-image-cleanup-role-bootstrapped"

P6E_GB200 = "p6e-gb200"
ULTRASERVER_INSTANCE_PREFIX_LIST = [P6E_GB200]
# Dictionary mapping ultraserver instance prefixes to their allowed capacity block sizes
ULTRASERVER_CAPACITY_BLOCK_ALLOWED_SIZE_DICT = {
P6E_GB200: [9, 18], # Allowed sizes for p6e-gb200 ultraserver instances
}
# Capacity Block states that are considered inactive (cannot check health status)
CAPACITY_BLOCK_INACTIVE_STATES = ["scheduled", "payment-pending", "assessing", "delayed"]
66 changes: 66 additions & 0 deletions cli/src/pcluster/templates/cdk_builder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from aws_cdk.aws_iam import ManagedPolicy, PermissionsBoundary
from aws_cdk.core import Arn, ArnFormat, CfnDeletionPolicy, CfnTag, Construct, Fn, Stack

from pcluster.api.errors import BadRequestException
from pcluster.aws.aws_api import AWSApi
from pcluster.config.cluster_config import (
BaseClusterConfig,
BaseComputeResource,
Expand All @@ -42,6 +44,8 @@
PCLUSTER_CLUSTER_NAME_TAG,
PCLUSTER_DYNAMODB_PREFIX,
PCLUSTER_NODE_TYPE_TAG,
ULTRASERVER_CAPACITY_BLOCK_ALLOWED_SIZE_DICT,
ULTRASERVER_INSTANCE_PREFIX_LIST,
)
from pcluster.launch_template_utils import _LaunchTemplateBuilder
from pcluster.models.s3_bucket import S3Bucket, parse_bucket_url
Expand Down Expand Up @@ -369,6 +373,68 @@ def generate_launch_template_version_cfn_parameter_hash(queue, compute_resource)
return sha1((queue + compute_resource).encode()).hexdigest()[0:16].capitalize() # nosec nosemgrep


def process_ultraserver_capacity_block_sizes(cluster_ultraserver_capacity_block_dict):
"""
Process ultraserver capacity block sizes and validate them.

Returns:
Dictionary mapping ultraserver instance prefixes to comma-separated size strings
Raises:
BadRequestException: If any capacity block sizes are invalid
"""
cluster_ultraserver_capacity_block_sizes_dict = {}
invalid_capacity_blocks = []

for ultraserver_instance_prefix in ULTRASERVER_INSTANCE_PREFIX_LIST:
cluster_ultraserver_capacity_block_sizes_dict[ultraserver_instance_prefix] = []
allowed_sizes_list = ULTRASERVER_CAPACITY_BLOCK_ALLOWED_SIZE_DICT.get(ultraserver_instance_prefix)

capacity_reservation_ids = cluster_ultraserver_capacity_block_dict.get(ultraserver_instance_prefix)
if capacity_reservation_ids:
statuses = AWSApi.instance().ec2.describe_capacity_block_status(capacity_reservation_ids)

for status in statuses:
size = status.get("TotalCapacity")
if size is not None:
if size not in allowed_sizes_list:
invalid_capacity_blocks.append(
f"{status.get('CapacityBlockId')} (size: {size}, allowed: {allowed_sizes_list})"
)
else:
cluster_ultraserver_capacity_block_sizes_dict.get(ultraserver_instance_prefix).append(size)

unique_sizes = sorted(set(cluster_ultraserver_capacity_block_sizes_dict.get(ultraserver_instance_prefix)))

cluster_ultraserver_capacity_block_sizes_dict[ultraserver_instance_prefix] = ", ".join(
str(unique_size) for unique_size in unique_sizes
)

# Raise exception with all invalid capacity blocks if any found
if invalid_capacity_blocks:
raise BadRequestException(
f"The following capacity blocks have invalid block sizes: {'; '.join(invalid_capacity_blocks)}."
)

return cluster_ultraserver_capacity_block_sizes_dict


def has_ultraserver_instance(cr_target):
"""Check if the compute resource uses ultraserver instances with capacity blocks."""
_has_ultraserver_instance = False
if cr_target and cr_target.capacity_reservation_id:
(
instance_type,
reservation_type,
) = AWSApi.instance().ec2.get_instance_type_and_reservation_type_from_capacity_reservation(
cr_target.capacity_reservation_id
)
instance_prefix = instance_type.split(".")[0]
if reservation_type == "capacity-block" and instance_prefix in ULTRASERVER_INSTANCE_PREFIX_LIST:
_has_ultraserver_instance = True

return _has_ultraserver_instance


class NodeIamResourcesBase(Construct):
"""Abstract construct defining IAM resources for a cluster node."""

Expand Down
19 changes: 19 additions & 0 deletions cli/src/pcluster/templates/cluster_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@
NFS_PORT,
NODE_BOOTSTRAP_TIMEOUT,
OS_MAPPING,
P6E_GB200,
PCLUSTER_DYNAMODB_PREFIX,
PCLUSTER_S3_ARTIFACTS_DICT,
SLURM,
SLURM_PORTS_RANGE,
)
from pcluster.models.s3_bucket import S3Bucket
Expand All @@ -98,6 +100,7 @@
get_slurm_specific_dna_json_for_head_node,
get_source_ingress_rule,
get_user_data_content,
process_ultraserver_capacity_block_sizes,
to_comma_separated_string,
)
from pcluster.templates.compute_fleet_stack import ComputeFleetConstruct
Expand Down Expand Up @@ -1265,6 +1268,16 @@ def _add_head_node(self):
head_node_launch_template.add_metadata("Comment", "AWS ParallelCluster Head Node")
# CloudFormation::Init metadata

# Process ultraserver capacity block information for DNA JSON
# This section collects capacity block sizes for ultraserver instances (e.g., p6e-gb200)
# and validates that they conform to allowed size configurations for Slurm topology
cluster_ultraserver_capacity_block_sizes_dict = {}
if self.config.scheduling.scheduler == SLURM:
cluster_ultraserver_capacity_block_dict = self.config.ultraserver_capacity_block_dict
cluster_ultraserver_capacity_block_sizes_dict = process_ultraserver_capacity_block_sizes(
cluster_ultraserver_capacity_block_dict
)

dna_json = json.dumps(
{
"cluster": {
Expand Down Expand Up @@ -1358,6 +1371,12 @@ def _add_head_node(self):
else "false"
),
"launch_template_id": launch_template_id,
**(
{"p6egb200_block_sizes": cluster_ultraserver_capacity_block_sizes_dict[P6E_GB200]}
if P6E_GB200 in cluster_ultraserver_capacity_block_sizes_dict
and cluster_ultraserver_capacity_block_sizes_dict[P6E_GB200]
else {}
),
**(
get_slurm_specific_dna_json_for_head_node(self.config, self.scheduler_resources)
if self._condition_is_slurm()
Expand Down
Loading
Loading