diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py index fd997a10a..8ae1e978d 100644 --- a/kafka/admin/acl_resource.py +++ b/kafka/admin/acl_resource.py @@ -1,5 +1,4 @@ from __future__ import absolute_import -from kafka.errors import IllegalArgumentError # enum in stdlib as of py3.4 try: @@ -8,6 +7,8 @@ # vendored backport module from kafka.vendor.enum34 import IntEnum +from kafka.errors import IllegalArgumentError + class ResourceType(IntEnum): """Type of kafka resource to set ACL for @@ -30,6 +31,7 @@ class ACLOperation(IntEnum): The ANY value is only valid in a filter context """ + UNKNOWN = 0, ANY = 1, ALL = 2, READ = 3, @@ -41,7 +43,9 @@ class ACLOperation(IntEnum): CLUSTER_ACTION = 9, DESCRIBE_CONFIGS = 10, ALTER_CONFIGS = 11, - IDEMPOTENT_WRITE = 12 + IDEMPOTENT_WRITE = 12, + CREATE_TOKENS = 13, + DESCRIBE_TOKENS = 14 class ACLPermissionType(IntEnum): @@ -50,6 +54,7 @@ class ACLPermissionType(IntEnum): The ANY value is only valid in a filter context """ + UNKNOWN = 0, ANY = 1, DENY = 2, ALLOW = 3 @@ -63,6 +68,7 @@ class ACLResourcePatternType(IntEnum): https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs """ + UNKNOWN = 0, ANY = 1, MATCH = 2, LITERAL = 3, @@ -242,3 +248,7 @@ def validate(self): raise IllegalArgumentError( "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name) ) + + +def valid_acl_operations(int_vals): + return set([ACLOperation(v) for v in int_vals if v not in (0, 1, 2)]) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 8266c7bfb..f21ac97f9 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -11,7 +11,7 @@ from kafka.vendor import six from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ - ACLResourcePatternType + ACLResourcePatternType, valid_acl_operations from kafka.client_async import KafkaClient, selectors from kafka.coordinator.protocol import 
ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0, ConsumerProtocol_v0 import kafka.errors as Errors @@ -252,30 +252,32 @@ def _validate_timeout(self, timeout_ms): def _refresh_controller_id(self, timeout_ms=30000): """Determine the Kafka cluster controller.""" - version = self._client.api_version(MetadataRequest, max_version=6) - if 1 <= version <= 6: - timeout_at = time.time() + timeout_ms / 1000 - while time.time() < timeout_at: - response = self.send_request(MetadataRequest[version]()) - controller_id = response.controller_id - if controller_id == -1: - log.warning("Controller ID not available, got -1") - time.sleep(1) - continue - # verify the controller is new enough to support our requests - controller_version = self._client.check_version(node_id=controller_id) - if controller_version < (0, 10, 0): - raise IncompatibleBrokerVersion( - "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." - .format(controller_version)) - self._controller_id = controller_id - return - else: - raise Errors.NodeNotReadyError('controller') - else: + version = self._client.api_version(MetadataRequest, max_version=8) + if version == 0: raise UnrecognizedBrokerVersion( "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}." .format(version)) + # use defaults for allow_auto_topic_creation / include_authorized_operations in v6+ + request = MetadataRequest[version]() + + timeout_at = time.time() + timeout_ms / 1000 + while time.time() < timeout_at: + response = self.send_request(request) + controller_id = response.controller_id + if controller_id == -1: + log.warning("Controller ID not available, got -1") + time.sleep(1) + continue + # verify the controller is new enough to support our requests + controller_version = self._client.check_version(node_id=controller_id) + if controller_version < (0, 10, 0): + raise IncompatibleBrokerVersion( + "The controller appears to be running Kafka {}. 
KafkaAdminClient requires brokers >= 0.10.0.0." + .format(controller_version)) + self._controller_id = controller_id + return + else: + raise Errors.NodeNotReadyError('controller') def _find_coordinator_id_request(self, group_id): """Send a FindCoordinatorRequest to a broker. @@ -540,11 +542,20 @@ def delete_topics(self, topics, timeout_ms=None): ) ) + def _process_metadata_response(self, metadata_response): + obj = metadata_response.to_object() + if 'authorized_operations' in obj: + obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) + for t in obj['topics']: + if 'authorized_operations' in t: + t['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(t['authorized_operations']))) + return obj + def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): """ topics == None means "get all topics" """ - version = self._client.api_version(MetadataRequest, max_version=5) + version = self._client.api_version(MetadataRequest, max_version=8) if version <= 3: if auto_topic_creation: raise IncompatibleBrokerVersion( @@ -553,13 +564,20 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): .format(self.config['api_version'])) request = MetadataRequest[version](topics=topics) - elif version <= 5: + elif version <= 7: request = MetadataRequest[version]( topics=topics, allow_auto_topic_creation=auto_topic_creation ) + else: + request = MetadataRequest[version]( + topics=topics, + allow_auto_topic_creation=auto_topic_creation, + include_cluster_authorized_operations=True, + include_topic_authorized_operations=True, + ) - return self.send_request(request) + return self._process_metadata_response(self.send_request(request)) def list_topics(self): """Retrieve a list of all topic names in the cluster. @@ -568,8 +586,7 @@ def list_topics(self): A list of topic name strings. 
""" metadata = self._get_cluster_metadata(topics=None) - obj = metadata.to_object() - return [t['topic'] for t in obj['topics']] + return [t['topic'] for t in metadata['topics']] def describe_topics(self, topics=None): """Fetch metadata for the specified topics or all topics if None. @@ -582,8 +599,7 @@ def describe_topics(self, topics=None): A list of dicts describing each topic (including partition info). """ metadata = self._get_cluster_metadata(topics=topics) - obj = metadata.to_object() - return obj['topics'] + return metadata['topics'] def describe_cluster(self): """ @@ -595,9 +611,8 @@ def describe_cluster(self): A dict with cluster-wide metadata, excluding topic details. """ metadata = self._get_cluster_metadata() - obj = metadata.to_object() - obj.pop('topics') # We have 'describe_topics' for this - return obj + metadata.pop('topics') # We have 'describe_topics' for this + return metadata @staticmethod def _convert_describe_acls_response_to_acls(describe_response): @@ -1094,11 +1109,11 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None): partitions = set(partitions) topics = set(tp.topic for tp in partitions) - response = self._get_cluster_metadata(topics=topics).to_object() + metadata = self._get_cluster_metadata(topics=topics) leader2partitions = defaultdict(list) valid_partitions = set() - for topic in response.get("topics", ()): + for topic in metadata.get("topics", ()): for partition in topic.get("partitions", ()): t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"]) if t2p in partitions: @@ -1199,7 +1214,7 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id # describe delegation_token protocol not yet implemented # Note: send the request to the least_loaded_node() - def _describe_consumer_groups_request(self, group_id, include_authorized_operations=False): + def _describe_consumer_groups_request(self, group_id): """Send a DescribeGroupsRequest to the group's coordinator. 
Arguments: @@ -1210,74 +1225,69 @@ def _describe_consumer_groups_request(self, group_id, include_authorized_operati """ version = self._client.api_version(DescribeGroupsRequest, max_version=3) if version <= 2: - if include_authorized_operations: - raise IncompatibleBrokerVersion( - "include_authorized_operations requests " - "DescribeGroupsRequest >= v3, which is not " - "supported by Kafka {}".format(version) - ) # Note: KAFKA-6788 A potential optimization is to group the # request per coordinator and send one request with a list of # all consumer groups. Java still hasn't implemented this # because the error checking is hard to get right when some # groups error and others don't. request = DescribeGroupsRequest[version](groups=(group_id,)) - elif version <= 3: + else: request = DescribeGroupsRequest[version]( groups=(group_id,), - include_authorized_operations=include_authorized_operations + include_authorized_operations=True ) return request def _describe_consumer_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" - if response.API_VERSION <= 3: - assert len(response.groups) == 1 - for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names): - if isinstance(response_field, Array): - described_groups_field_schema = response_field.array_of - described_group = getattr(response, response_name)[0] - described_group_information_list = [] - protocol_type_is_consumer = False - for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields): - if group_information_name == 'protocol_type': - protocol_type = described_group_information - protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type) - if isinstance(group_information_field, Array): - member_information_list = [] - member_schema = group_information_field.array_of - for 
members in described_group_information: - member_information = [] - for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names): - if protocol_type_is_consumer: - if member_name == 'member_metadata' and member: - member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member)) - elif member_name == 'member_assignment' and member: - member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member)) - else: - member_information.append(member) - member_info_tuple = MemberInformation._make(member_information) - member_information_list.append(member_info_tuple) - described_group_information_list.append(member_information_list) - else: - described_group_information_list.append(described_group_information) - # Version 3 of the DescribeGroups API introduced the "authorized_operations" field. - # This will cause the namedtuple to fail. - # Therefore, appending a placeholder of None in it. - if response.API_VERSION <=2: - described_group_information_list.append(None) - group_description = GroupInformation._make(described_group_information_list) - error_code = group_description.error_code - error_type = Errors.for_code(error_code) - # Java has the note: KAFKA-6789, we can retry based on the error code - if error_type is not Errors.NoError: - raise error_type( - "DescribeGroupsResponse failed with response '{}'." - .format(response)) - else: + if response.API_VERSION > 3: raise NotImplementedError( "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient." 
.format(response.API_VERSION)) + + assert len(response.groups) == 1 + for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names): + if isinstance(response_field, Array): + described_groups_field_schema = response_field.array_of + described_group = getattr(response, response_name)[0] + described_group_information_list = [] + protocol_type_is_consumer = False + for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields): + if group_information_name == 'protocol_type': + protocol_type = described_group_information + protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type) + if isinstance(group_information_field, Array): + member_information_list = [] + member_schema = group_information_field.array_of + for members in described_group_information: + member_information = [] + for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names): + if protocol_type_is_consumer: + if member_name == 'member_metadata' and member: + member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member)) + elif member_name == 'member_assignment' and member: + member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member)) + else: + member_information.append(member) + member_info_tuple = MemberInformation._make(member_information) + member_information_list.append(member_info_tuple) + described_group_information_list.append(member_information_list) + else: + described_group_information_list.append(described_group_information) + # Version 3 of the DescribeGroups API introduced the "authorized_operations" field. 
+ if response.API_VERSION >= 3: + described_group_information_list[-1] = list(map(lambda acl: acl.name, valid_acl_operations(described_group_information_list[-1]))) + else: + # TODO: Fix GroupInformation defaults + described_group_information_list.append([]) + group_description = GroupInformation._make(described_group_information_list) + error_code = group_description.error_code + error_type = Errors.for_code(error_code) + # Java has the note: KAFKA-6789, we can retry based on the error code + if error_type is not Errors.NoError: + raise error_type( + "DescribeGroupsResponse failed with response '{}'." + .format(response)) return group_description def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): @@ -1296,9 +1306,6 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include useful for avoiding extra network round trips if you already know the group coordinator. This is only useful when all the group_ids have the same coordinator, otherwise it will error. Default: None. - include_authorized_operations (bool, optional): Whether or not to include - information about the operations a group is allowed to perform. - Only supported on API version >= v3. Default: False. Returns: A list of group descriptions. 
For now the group descriptions @@ -1312,7 +1319,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include groups_coordinators = self._find_coordinator_ids(group_ids) requests = [ - (self._describe_consumer_groups_request(group_id, include_authorized_operations), coordinator_id) + (self._describe_consumer_groups_request(group_id), coordinator_id) for group_id, coordinator_id in groups_coordinators.items() ] return self.send_requests(requests, response_fn=self._describe_consumer_groups_process_response) diff --git a/kafka/client_async.py b/kafka/client_async.py index 7d466574f..de20c218d 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -978,15 +978,17 @@ def _maybe_refresh_metadata(self, wakeup=False): if not topics and self.cluster.is_bootstrap(node_id): topics = list(self.config['bootstrap_topics_filter']) - api_version = self.api_version(MetadataRequest, max_version=7) + api_version = self.api_version(MetadataRequest, max_version=8) if self.cluster.need_all_topic_metadata: topics = MetadataRequest[api_version].ALL_TOPICS elif not topics: topics = MetadataRequest[api_version].NO_TOPICS - if api_version >= 4: + if api_version <= 3: + request = MetadataRequest[api_version](topics) + elif api_version <= 7: request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics']) else: - request = MetadataRequest[api_version](topics) + request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics'], False, False) log.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request, wakeup=wakeup) future.add_callback(self.cluster.update_metadata) diff --git a/kafka/cluster.py b/kafka/cluster.py index ded8c6f96..9e819246e 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -279,8 +279,10 @@ def update_metadata(self, metadata): if metadata.API_VERSION == 0: error_code, topic, partitions = topic_data is_internal = False - else: + elif 
metadata.API_VERSION <= 7: error_code, topic, is_internal, partitions = topic_data + else: + error_code, topic, is_internal, partitions, _authorized_operations = topic_data if is_internal: _new_internal_topics.add(topic) error_type = Errors.for_code(error_code) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 255166801..32b75df4b 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -8,7 +8,7 @@ from kafka.vendor.enum34 import IntEnum from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields +from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields, BitField class CreateTopicsResponse_v0(Response): @@ -337,8 +337,8 @@ class DescribeGroupsResponse_v3(Response): ('client_id', String('utf-8')), ('client_host', String('utf-8')), ('member_metadata', Bytes), - ('member_assignment', Bytes)))), - ('authorized_operations', Int32)) + ('member_assignment', Bytes))), + ('authorized_operations', BitField))) ) @@ -368,7 +368,7 @@ class DescribeGroupsRequest_v2(Request): class DescribeGroupsRequest_v3(Request): API_KEY = 15 API_VERSION = 3 - RESPONSE_TYPE = DescribeGroupsResponse_v2 + RESPONSE_TYPE = DescribeGroupsResponse_v3 SCHEMA = Schema( ('groups', Array(String('utf-8'))), ('include_authorized_operations', Boolean) diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index bb22ba997..eb632371c 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String +from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String, BitField class MetadataResponse_v0(Response): @@ -164,6 +164,36 @@ 
class MetadataResponse_v7(Response): ) +class MetadataResponse_v8(Response): + """v8 adds authorized_operations fields""" + API_KEY = 3 + API_VERSION = 8 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('brokers', Array( + ('node_id', Int32), + ('host', String('utf-8')), + ('port', Int32), + ('rack', String('utf-8')))), + ('cluster_id', String('utf-8')), + ('controller_id', Int32), + ('topics', Array( + ('error_code', Int16), + ('topic', String('utf-8')), + ('is_internal', Boolean), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader', Int32), + ('leader_epoch', Int32), + ('replicas', Array(Int32)), + ('isr', Array(Int32)), + ('offline_replicas', Array(Int32)))), + ('authorized_operations', BitField))), + ('authorized_operations', BitField) + ) + + class MetadataRequest_v0(Request): API_KEY = 3 API_VERSION = 0 @@ -245,13 +275,27 @@ class MetadataRequest_v7(Request): NO_TOPICS = [] +class MetadataRequest_v8(Request): + API_KEY = 3 + API_VERSION = 8 + RESPONSE_TYPE = MetadataResponse_v8 + SCHEMA = Schema( + ('topics', Array(String('utf-8'))), + ('allow_auto_topic_creation', Boolean), + ('include_cluster_authorized_operations', Boolean), + ('include_topic_authorized_operations', Boolean) + ) + ALL_TOPICS = None + NO_TOPICS = [] + + MetadataRequest = [ MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2, MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5, - MetadataRequest_v6, MetadataRequest_v7, + MetadataRequest_v6, MetadataRequest_v7, MetadataRequest_v8, ] MetadataResponse = [ MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2, MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5, - MetadataResponse_v6, MetadataResponse_v7, + MetadataResponse_v6, MetadataResponse_v7, MetadataResponse_v8, ] diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py index 0e3685d73..8949ce471 100644 --- a/kafka/protocol/types.py +++ b/kafka/protocol/types.py @@ -363,3 +363,34 @@ def decode(self, data): return 
None return [self.array_of.decode(data) for _ in range(length)] + +class BitField(AbstractType): + @classmethod + def decode(cls, data): + return cls.from_32_bit_field(Int32.decode(data)) + + @classmethod + def encode(cls, vals): + # to_32_bit_field returns unsigned val, so we need to + # encode >I to avoid crash if/when byte 31 is set + # (note that decode as signed still works fine) + return struct.Struct('>I').pack(cls.to_32_bit_field(vals)) + + @classmethod + def to_32_bit_field(cls, vals): + value = 0 + for b in vals: + assert 0 <= b < 32 + value |= 1 << b + return value + + @classmethod + def from_32_bit_field(cls, value): + result = set() + count = 0 + while value != 0: + if (value & 1) != 0: + result.add(count) + count += 1 + value = (value & 0xFFFFFFFF) >> 1 + return result diff --git a/kafka/util.py b/kafka/util.py index 658c17d59..6bc4c7051 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -138,3 +138,4 @@ def wrapper(self, *args, **kwargs): return func(self, *args, **kwargs) functools.update_wrapper(wrapper, func) return wrapper + diff --git a/test/test_protocol.py b/test/test_protocol.py index d0cc7ed0a..35ca938e1 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -2,12 +2,14 @@ import io import struct +import pytest + from kafka.protocol.api import RequestHeader from kafka.protocol.fetch import FetchRequest, FetchResponse from kafka.protocol.find_coordinator import FindCoordinatorRequest from kafka.protocol.message import Message, MessageSet, PartialMessage from kafka.protocol.metadata import MetadataRequest -from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes +from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes, BitField def test_create_message(): @@ -332,3 +334,11 @@ def test_compact_data_structs(): assert CompactBytes.decode(io.BytesIO(b'\x01')) == b'' enc = CompactBytes.encode(b'foo') assert 
CompactBytes.decode(io.BytesIO(enc)) == b'foo' + + +@pytest.mark.parametrize(('test_set',), [ + (set([0, 1, 5, 10, 31]),), + (set(range(32)),), +]) +def test_bit_field(test_set): + assert BitField.decode(io.BytesIO(BitField.encode(test_set))) == test_set diff --git a/test/test_util.py b/test/test_util.py index 875b252aa..f9e8a2b51 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -22,3 +22,4 @@ def test_topic_name_validation(topic_name, expectation): with expectation: ensure_valid_topic_name(topic_name) +