Skip to content
14 changes: 12 additions & 2 deletions kafka/admin/acl_resource.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import absolute_import
from kafka.errors import IllegalArgumentError

# enum in stdlib as of py3.4
try:
Expand All @@ -8,6 +7,8 @@
# vendored backport module
from kafka.vendor.enum34 import IntEnum

from kafka.errors import IllegalArgumentError


class ResourceType(IntEnum):
"""Type of kafka resource to set ACL for
Expand All @@ -30,6 +31,7 @@ class ACLOperation(IntEnum):
The ANY value is only valid in a filter context
"""

UNKNOWN = 0,
ANY = 1,
ALL = 2,
READ = 3,
Expand All @@ -41,7 +43,9 @@ class ACLOperation(IntEnum):
CLUSTER_ACTION = 9,
DESCRIBE_CONFIGS = 10,
ALTER_CONFIGS = 11,
IDEMPOTENT_WRITE = 12
IDEMPOTENT_WRITE = 12,
CREATE_TOKENS = 13,
DESCRIBE_TOKENS = 14


class ACLPermissionType(IntEnum):
Expand All @@ -50,6 +54,7 @@ class ACLPermissionType(IntEnum):
The ANY value is only valid in a filter context
"""

UNKNOWN = 0,
ANY = 1,
DENY = 2,
ALLOW = 3
Expand All @@ -63,6 +68,7 @@ class ACLResourcePatternType(IntEnum):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs
"""

UNKNOWN = 0,
ANY = 1,
MATCH = 2,
LITERAL = 3,
Expand Down Expand Up @@ -242,3 +248,7 @@ def validate(self):
raise IllegalArgumentError(
"pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
)


def valid_acl_operations(int_vals):
    """Convert raw Kafka ACL operation codes into concrete ACLOperation members.

    Codes 0 (UNKNOWN), 1 (ANY) and 2 (ALL) are skipped: they are not
    concrete operations (ANY/ALL are only meaningful in a filter context).

    Arguments:
        int_vals: iterable of integer operation codes, e.g. the bit
            positions decoded from an ``authorized_operations`` bitfield.

    Returns:
        set of ACLOperation values, excluding UNKNOWN/ANY/ALL.
    """
    # set comprehension instead of set([...]) — avoids building a throwaway list
    return {ACLOperation(v) for v in int_vals if v not in (0, 1, 2)}
189 changes: 98 additions & 91 deletions kafka/admin/client.py

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,15 +978,17 @@ def _maybe_refresh_metadata(self, wakeup=False):
if not topics and self.cluster.is_bootstrap(node_id):
topics = list(self.config['bootstrap_topics_filter'])

api_version = self.api_version(MetadataRequest, max_version=7)
api_version = self.api_version(MetadataRequest, max_version=8)
if self.cluster.need_all_topic_metadata:
topics = MetadataRequest[api_version].ALL_TOPICS
elif not topics:
topics = MetadataRequest[api_version].NO_TOPICS
if api_version >= 4:
if api_version <= 3:
request = MetadataRequest[api_version](topics)
elif api_version <= 7:
request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics'])
else:
request = MetadataRequest[api_version](topics)
request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics'], False, False)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request, wakeup=wakeup)
future.add_callback(self.cluster.update_metadata)
Expand Down
4 changes: 3 additions & 1 deletion kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,10 @@ def update_metadata(self, metadata):
if metadata.API_VERSION == 0:
error_code, topic, partitions = topic_data
is_internal = False
else:
elif metadata.API_VERSION <= 7:
error_code, topic, is_internal, partitions = topic_data
else:
error_code, topic, is_internal, partitions, _authorized_operations = topic_data
if is_internal:
_new_internal_topics.add(topic)
error_type = Errors.for_code(error_code)
Expand Down
8 changes: 4 additions & 4 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kafka.vendor.enum34 import IntEnum

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields, BitField


class CreateTopicsResponse_v0(Response):
Expand Down Expand Up @@ -337,8 +337,8 @@ class DescribeGroupsResponse_v3(Response):
('client_id', String('utf-8')),
('client_host', String('utf-8')),
('member_metadata', Bytes),
('member_assignment', Bytes)))),
('authorized_operations', Int32))
('member_assignment', Bytes))),
('authorized_operations', BitField)))
)


Expand Down Expand Up @@ -368,7 +368,7 @@ class DescribeGroupsRequest_v2(Request):
class DescribeGroupsRequest_v3(Request):
API_KEY = 15
API_VERSION = 3
RESPONSE_TYPE = DescribeGroupsResponse_v2
RESPONSE_TYPE = DescribeGroupsResponse_v3
SCHEMA = Schema(
('groups', Array(String('utf-8'))),
('include_authorized_operations', Boolean)
Expand Down
50 changes: 47 additions & 3 deletions kafka/protocol/metadata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String
from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String, BitField


class MetadataResponse_v0(Response):
Expand Down Expand Up @@ -164,6 +164,36 @@ class MetadataResponse_v7(Response):
)


class MetadataResponse_v8(Response):
    """v8 adds authorized_operations fields

    Both the per-topic and the cluster-level ``authorized_operations``
    fields arrive on the wire as INT32 bitfields of ACL operation codes;
    BitField decodes each into a set of ints (bit positions).
    """
    API_KEY = 3
    API_VERSION = 8
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('brokers', Array(
            ('node_id', Int32),
            ('host', String('utf-8')),
            ('port', Int32),
            ('rack', String('utf-8')))),
        ('cluster_id', String('utf-8')),
        ('controller_id', Int32),
        ('topics', Array(
            ('error_code', Int16),
            ('topic', String('utf-8')),
            ('is_internal', Boolean),
            ('partitions', Array(
                ('error_code', Int16),
                ('partition', Int32),
                ('leader', Int32),
                ('leader_epoch', Int32),
                ('replicas', Array(Int32)),
                ('isr', Array(Int32)),
                ('offline_replicas', Array(Int32)))),
            # per-topic ACL operations bitfield (new in v8)
            ('authorized_operations', BitField))),
        # cluster-level ACL operations bitfield (new in v8)
        ('authorized_operations', BitField)
    )


class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
Expand Down Expand Up @@ -245,13 +275,27 @@ class MetadataRequest_v7(Request):
NO_TOPICS = []


class MetadataRequest_v8(Request):
    """v8 adds the include_cluster_authorized_operations and
    include_topic_authorized_operations flags, which ask the broker to
    populate the corresponding authorized_operations bitfields in the
    response.
    """
    API_KEY = 3
    API_VERSION = 8
    RESPONSE_TYPE = MetadataResponse_v8
    SCHEMA = Schema(
        ('topics', Array(String('utf-8'))),
        ('allow_auto_topic_creation', Boolean),
        ('include_cluster_authorized_operations', Boolean),
        ('include_topic_authorized_operations', Boolean)
    )
    ALL_TOPICS = None   # sentinel: request metadata for all topics
    NO_TOPICS = []      # sentinel: request no topic metadata


MetadataRequest = [
MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5,
MetadataRequest_v6, MetadataRequest_v7,
MetadataRequest_v6, MetadataRequest_v7, MetadataRequest_v8,
]
MetadataResponse = [
MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5,
MetadataResponse_v6, MetadataResponse_v7,
MetadataResponse_v6, MetadataResponse_v7, MetadataResponse_v8,
]
31 changes: 31 additions & 0 deletions kafka/protocol/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,34 @@ def decode(self, data):
return None
return [self.array_of.decode(data) for _ in range(length)]


class BitField(AbstractType):
    """Wire codec for a set of small ints (bit positions 0..31) packed
    into a single 32-bit bitfield.

    Used for Kafka ``authorized_operations`` fields, where each set bit
    marks the ACL operation with that code as authorized.
    """

    # Pre-compiled packer, hoisted so encode() does not recompile the
    # format string on every call.
    _packer = struct.Struct('>I')

    @classmethod
    def decode(cls, data):
        # Wire type is a signed INT32; from_32_bit_field masks back to
        # unsigned, so a value with bit 31 set round-trips correctly.
        return cls.from_32_bit_field(Int32.decode(data))

    @classmethod
    def encode(cls, vals):
        # to_32_bit_field returns an unsigned val, so we need to
        # pack >I to avoid a struct.error if/when bit 31 is set
        # (note that decode as signed still works fine)
        return cls._packer.pack(cls.to_32_bit_field(vals))

    @classmethod
    def to_32_bit_field(cls, vals):
        """Fold an iterable of bit positions (0..31) into an unsigned int.

        Raises:
            ValueError: if any position is outside [0, 32). A real
                exception (not assert) so validation survives ``python -O``.
        """
        value = 0
        for b in vals:
            if not 0 <= b < 32:
                raise ValueError('bit position out of range [0, 32): %r' % (b,))
            value |= 1 << b
        return value

    @classmethod
    def from_32_bit_field(cls, value):
        """Expand a 32-bit int into the set of its set-bit positions."""
        result = set()
        count = 0
        while value != 0:
            if (value & 1) != 0:
                result.add(count)
            count += 1
            # mask to 32 bits each shift so a negative (sign-extended)
            # input from Int32.decode still terminates and drops the sign
            value = (value & 0xFFFFFFFF) >> 1
        return result
1 change: 1 addition & 0 deletions kafka/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,4 @@ def wrapper(self, *args, **kwargs):
return func(self, *args, **kwargs)
functools.update_wrapper(wrapper, func)
return wrapper

12 changes: 11 additions & 1 deletion test/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import io
import struct

import pytest

from kafka.protocol.api import RequestHeader
from kafka.protocol.fetch import FetchRequest, FetchResponse
from kafka.protocol.find_coordinator import FindCoordinatorRequest
from kafka.protocol.message import Message, MessageSet, PartialMessage
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes
from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes, BitField


def test_create_message():
Expand Down Expand Up @@ -332,3 +334,11 @@ def test_compact_data_structs():
assert CompactBytes.decode(io.BytesIO(b'\x01')) == b''
enc = CompactBytes.encode(b'foo')
assert CompactBytes.decode(io.BytesIO(enc)) == b'foo'


@pytest.mark.parametrize(('test_set',), [
    ({0, 1, 5, 10, 31},),
    (set(range(32)),),
])
def test_bit_field(test_set):
    # encode/decode must round-trip the exact set of bit positions
    encoded = BitField.encode(test_set)
    decoded = BitField.decode(io.BytesIO(encoded))
    assert decoded == test_set
1 change: 1 addition & 0 deletions test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@
def test_topic_name_validation(topic_name, expectation):
with expectation:
ensure_valid_topic_name(topic_name)

Loading