api/src/main/java/io/kafbat/ui/config/ClustersProperties.java (13 additions & 1 deletion)

@@ -243,11 +243,23 @@ public static class NgramProperties {
   @NoArgsConstructor
   @AllArgsConstructor
   public static class ClusterFtsProperties {
-    boolean enabled = false;
+    boolean enabled = true;
+    boolean defaultEnabled = false;
     NgramProperties schemas = new NgramProperties(1, 4);
     NgramProperties consumers = new NgramProperties(1, 4);
     NgramProperties connect = new NgramProperties(1, 4);
     NgramProperties acl = new NgramProperties(1, 4);
+
+    public boolean use(Boolean request) {
+      if (enabled) {
+        if (Boolean.TRUE.equals(request)) {
+          return true;
+        } else if (request == null && defaultEnabled) {
+          return true;
+        }
+      }
+      return false;
+    }
   }

   @PostConstruct
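
How the new toggle resolves, as a minimal sketch: the all-args constructor comes from the Lombok annotations above, and the NgramProperties values are placeholders.

    ClustersProperties.NgramProperties ngram = new ClustersProperties.NgramProperties(1, 4);
    ClustersProperties.ClusterFtsProperties fts =
        new ClustersProperties.ClusterFtsProperties(true, false, ngram, ngram, ngram, ngram);

    fts.use(Boolean.TRUE);   // true:  FTS is available and the request opts in
    fts.use(null);           // false: no explicit opt-in and defaultEnabled is false
    fts.use(Boolean.FALSE);  // false: the request opts out
    // With enabled = false, use(...) returns false regardless of the request.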

AclsController.java

@@ -25,7 +25,7 @@

 @RestController
 @RequiredArgsConstructor
 public class AclsController extends AbstractController implements AclsApi, McpTool {

   private final AclsService aclsService;

@@ -69,6 +69,7 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
                                             String resourceName,
                                             KafkaAclNamePatternTypeDTO namePatternTypeDto,
                                             String search,
+                                            Boolean fts,
                                             ServerWebExchange exchange) {
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)

@@ -89,7 +90,7 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
     return validateAccess(context).then(
         Mono.just(
             ResponseEntity.ok(
-                aclsService.listAcls(getCluster(clusterName), filter, search)
+                aclsService.listAcls(getCluster(clusterName), filter, search, fts)
                     .map(ClusterMapper::toKafkaAclDto)))
     ).doOnEach(sig -> audit(context, sig));
   }

ConsumerGroupsController.java

@@ -128,6 +128,7 @@ public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage(
                                             String search,
                                             ConsumerGroupOrderingDTO orderBy,
                                             SortOrderDTO sortOrderDto,
+                                            Boolean fts,
                                             ServerWebExchange exchange) {

     var context = AccessContext.builder()

KafkaConnectController.java

@@ -129,6 +129,7 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
       String search,
       ConnectorColumnsToSortDTO orderBy,
       SortOrderDTO sortOrder,
+      Boolean fts,
       ServerWebExchange exchange
   ) {
     var context = AccessContext.builder()

@@ -140,7 +141,7 @@
         ? getConnectorsComparator(orderBy)
         : getConnectorsComparator(orderBy).reversed();

-    Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
+    Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
         .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
         .sort(comparator);

SchemasController.java

@@ -217,13 +217,15 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String clusterName,
                                                       @Valid String search,
                                                       SchemaColumnsToSortDTO orderBy,
                                                       SortOrderDTO sortOrder,
+                                                      Boolean fts,
                                                       ServerWebExchange serverWebExchange) {
     var context = AccessContext.builder()
         .cluster(clusterName)
         .operationName("getSchemas")
         .build();

-    ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
+    ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
+    boolean useFts = ftsProperties.use(fts);

     return schemaRegistryService
         .getAllSubjectNames(getCluster(clusterName))

@@ -234,7 +236,7 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String clusterName,
           int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
           int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;

-          SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas());
+          SchemasFilter filter = new SchemasFilter(subjects, useFts, ftsProperties.getSchemas());
           List<String> filteredSubjects = new ArrayList<>(filter.find(search));

           var totalPages = (filteredSubjects.size() / pageSize)

TopicsController.java

@@ -174,20 +174,22 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
                                             @Valid String search,
                                             @Valid TopicColumnsToSortDTO orderBy,
                                             @Valid SortOrderDTO sortOrder,
+                                            Boolean fts,
                                             ServerWebExchange exchange) {

     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
         .operationName("getTopics")
         .build();

-    return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
+    return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal, fts)
         .flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
         .flatMap(topics -> {
           int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
           var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
-          ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
-          Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
+          ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
+          boolean useFts = ftsProperties.use(fts);
+          Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, useFts);
           var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
               ? comparatorForTopic : comparatorForTopic.reversed();
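
At the HTTP level the new flag rides along as a query parameter. A hedged request sketch — the path follows the UI's existing REST layout, and a configured WebTestClient is assumed:

    webTestClient.get()
        .uri("/api/clusters/{clusterName}/topics?search=payments&fts=true", "local")
        .exchange()
        .expectStatus().isOk();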

api/src/main/java/io/kafbat/ui/model/ClusterFeature.java (3 additions & 1 deletion)

@@ -8,5 +8,7 @@ public enum ClusterFeature {
   KAFKA_ACL_VIEW,
   KAFKA_ACL_EDIT,
   CLIENT_QUOTA_MANAGEMENT,
-  GRAPHS_ENABLED
+  GRAPHS_ENABLED,
+  FTS_ENABLED,
+  FTS_DEFAULT_ENABLED
 }

api/src/main/java/io/kafbat/ui/service/FeatureService.java (14 additions & 0 deletions)

@@ -2,6 +2,7 @@

 import static io.kafbat.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT;

+import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.model.ClusterFeature;
 import io.kafbat.ui.model.KafkaCluster;
 import io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription;

@@ -11,6 +12,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Predicate;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.acl.AclOperation;
 import org.springframework.stereotype.Service;

@@ -19,7 +21,9 @@

 @Service
 @Slf4j
+@RequiredArgsConstructor
 public class FeatureService {
+  private final ClustersProperties clustersProperties;

   public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
                                                          KafkaCluster cluster,

@@ -49,6 +53,16 @@ public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
     features.add(aclEdit(adminClient, clusterDescription));
     features.add(quotaManagement(adminClient));

+    if (clustersProperties.getFts() != null) {
+      ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
+      if (fts.isEnabled()) {
+        features.add(Mono.just(ClusterFeature.FTS_ENABLED));
+        if (fts.isDefaultEnabled()) {
+          features.add(Mono.just(ClusterFeature.FTS_DEFAULT_ENABLED));
+        }
+      }
+    }
+
     return Flux.fromIterable(features).flatMap(m -> m).collectList();
   }
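
Taken together, the two booleans map onto the advertised flags. A sketch, assuming Lombok-generated setters on ClustersProperties:

    ClustersProperties props = new ClustersProperties();
    ClustersProperties.NgramProperties ngram = new ClustersProperties.NgramProperties(1, 4);
    props.setFts(new ClustersProperties.ClusterFtsProperties(true, false, ngram, ngram, ngram, ngram));

    FeatureService featureService = new FeatureService(props);
    // getAvailableFeatures(...) now emits FTS_ENABLED (enabled = true) but not
    // FTS_DEFAULT_ENABLED (defaultEnabled = false); with enabled = false,
    // neither flag is advertised.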

KafkaConnectService.java

@@ -1,7 +1,5 @@
 package io.kafbat.ui.service;

-import static org.apache.commons.lang3.Strings.CI;
-
 import com.github.benmanes.caffeine.cache.AsyncCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import io.kafbat.ui.config.ClustersProperties;

@@ -134,7 +132,7 @@ private Flux<InternalConnectorInfo> getConnectConnectors(
   }

   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
-                                                     @Nullable final String search) {
+                                                     @Nullable final String search, Boolean fts) {
     return getConnects(cluster, false)
         .flatMap(connect ->
             getConnectorNamesWithErrorsSuppress(cluster, connect.getName())

@@ -153,14 +151,17 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                 .build())))
         .map(kafkaConnectMapper::fullConnectorInfo)
         .collectList()
-        .map(lst -> filterConnectors(lst, search))
+        .map(lst -> filterConnectors(lst, search, fts))
         .flatMapMany(Flux::fromIterable);
   }

-  private List<FullConnectorInfoDTO> filterConnectors(List<FullConnectorInfoDTO> connectors, String search) {
-    ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
+  private List<FullConnectorInfoDTO> filterConnectors(
+      List<FullConnectorInfoDTO> connectors,
+      String search,
+      Boolean fts) {
+    boolean useFts = clustersProperties.getFts().use(fts);
     KafkaConnectNgramFilter filter =
-        new KafkaConnectNgramFilter(connectors, fts.isEnabled(), fts.getConnect());
+        new KafkaConnectNgramFilter(connectors, useFts, clustersProperties.getFts().getConnect());
     return filter.find(search);
   }

api/src/main/java/io/kafbat/ui/service/TopicsService.java (4 additions & 5 deletions)

@@ -26,7 +26,6 @@
 import io.kafbat.ui.model.TopicUpdateDTO;
 import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
 import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState.TopicState;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;

@@ -49,7 +48,6 @@
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;

@@ -467,13 +465,14 @@ public Mono<InternalTopic> cloneTopic(
     );
   }

-  public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster, String search, Boolean showInternal) {
+  public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster, String search, Boolean showInternal,
+                                                          Boolean fts) {
     Statistics stats = statisticsCache.get(cluster);
     ScrapedClusterState clusterState = stats.getClusterState();

+    boolean useFts = clustersProperties.getFts().use(fts);
     try {
       return Mono.just(
-          clusterState.getTopicIndex().find(search, showInternal, null)
+          clusterState.getTopicIndex().find(search, showInternal, useFts, null)
       ).flatMap(lst -> filterExisting(cluster, lst)).map(lst ->
           lst.stream().map(t -> t.withMetrics(stats.getMetrics())).toList()
       );

api/src/main/java/io/kafbat/ui/service/acl/AclsService.java (6 additions & 5 deletions)

@@ -71,20 +71,21 @@ public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
         .doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString));
   }

-  public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter filter, String principalSearch) {
+  public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter filter, String principalSearch,
+                                   Boolean fts) {
     return adminClientService.get(cluster)
         .flatMap(c -> c.listAcls(filter))
         .flatMapIterable(acls -> acls)
         .filter(acl -> principalSearch == null || acl.entry().principal().contains(principalSearch))
         .collectList()
-        .map(lst -> filter(lst, principalSearch))
+        .map(lst -> filter(lst, principalSearch, fts))
         .flatMapMany(Flux::fromIterable)
         .sort(Comparator.comparing(AclBinding::toString)); //sorting to keep stable order on different calls
   }

-  private List<AclBinding> filter(List<AclBinding> acls, String principalSearch) {
-    ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
-    AclBindingNgramFilter filter = new AclBindingNgramFilter(acls, fts.isEnabled(), fts.getAcl());
+  private List<AclBinding> filter(List<AclBinding> acls, String principalSearch, Boolean fts) {
+    boolean useFts = clustersProperties.getFts().use(fts);
+    AclBindingNgramFilter filter = new AclBindingNgramFilter(acls, useFts, clustersProperties.getFts().getAcl());
     return filter.find(principalSearch);
   }
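
The same request-level override applies here. A brief usage sketch — the cluster variable is a placeholder, and ResourcePatternFilter.ANY comes from the Kafka client API:

    // null -> fall back to defaultEnabled; TRUE/FALSE -> explicit per-request override.
    aclsService.listAcls(cluster, ResourcePatternFilter.ANY, "User:alice", null);
    aclsService.listAcls(cluster, ResourcePatternFilter.ANY, "User:alice", Boolean.TRUE);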

FilterTopicIndex.java

@@ -4,18 +4,20 @@

 import io.kafbat.ui.model.InternalTopic;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Stream;

 public class FilterTopicIndex implements TopicsIndex {
-  private List<InternalTopic> topics;
+  private Collection<InternalTopic> topics;

-  public FilterTopicIndex(List<InternalTopic> topics) {
+  public FilterTopicIndex(Collection<InternalTopic> topics) {
     this.topics = topics;
   }

   @Override
-  public List<InternalTopic> find(String search, Boolean showInternal, String sort, Integer count) {
+  public List<InternalTopic> find(String search, Boolean showInternal, String sort,
+                                  boolean fts, Integer count) {
     if (search == null || search.isBlank()) {
       return new ArrayList<>(this.topics);
     }

Lucene-backed TopicsIndex implementation (io.kafbat.ui.service.index)

@@ -1,6 +1,5 @@
 package io.kafbat.ui.service.index;

-import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.model.InternalTopic;
 import io.kafbat.ui.model.InternalTopicConfig;
 import java.io.IOException;

@@ -98,7 +97,15 @@ public void close() throws Exception {
     }
   }

-  public List<InternalTopic> find(String search, Boolean showInternal, String sort, Integer count) {
+  public List<InternalTopic> find(String search, Boolean showInternal, String sort,
+                                  boolean fts, Integer count) {
+    if (!fts) {
+      try (FilterTopicIndex filter = new FilterTopicIndex(this.topicMap.values())) {
+        return filter.find(search, showInternal, sort, fts, count);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
     return find(search, showInternal, sort, count, 0.0f);
   }

TopicsIndex.java

@@ -28,9 +28,9 @@ enum FieldType {
       FIELD_CONFIG_PREFIX, FieldType.STRING
   );

-  default List<InternalTopic> find(String search, Boolean showInternal, Integer count) {
-    return this.find(search, showInternal, FIELD_NAME, count);
+  default List<InternalTopic> find(String search, Boolean showInternal, boolean fts, Integer count) {
+    return this.find(search, showInternal, FIELD_NAME, fts, count);
   }

-  List<InternalTopic> find(String search, Boolean showInternal, String sort, Integer count);
+  List<InternalTopic> find(String search, Boolean showInternal, String sort, boolean fts, Integer count);
 }
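
A sketch of the updated contract from the caller's side (the index instance is a placeholder):

    // fts = true  -> n-gram/Lucene full-text matching;
    // fts = false -> plain filtering via FilterTopicIndex.
    List<InternalTopic> hits = topicsIndex.find("payments", false, true, 50);

    // Sort-aware variant; FIELD_NAME is the default sort field.
    List<InternalTopic> byName = topicsIndex.find("payments", false, TopicsIndex.FIELD_NAME, true, 50);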

SchemasController pagination tests

@@ -79,7 +79,7 @@ void shouldListFirst25andThen10Schemas() {
             .toList()
     );
     var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        null, null, null, null, null, null).block();
+        null, null, null, null, null, null, null).block();
     assertThat(schemasFirst25).isNotNull();
     assertThat(schemasFirst25.getBody()).isNotNull();
     assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4);

@@ -88,7 +88,7 @@
         .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));

     var schemasFirst10 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        null, 10, null, null, null, null).block();
+        null, 10, null, null, null, null, null).block();

     assertThat(schemasFirst10).isNotNull();
     assertThat(schemasFirst10.getBody()).isNotNull();

@@ -107,7 +107,7 @@ void shouldListSchemasContaining_1() {
             .toList()
     );
     var schemasSearch7 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        null, null, "1", null, null, null).block();
+        null, null, "1", null, null, null, null).block();
     assertThat(schemasSearch7).isNotNull();
     assertThat(schemasSearch7.getBody()).isNotNull();
     assertThat(schemasSearch7.getBody().getPageCount()).isEqualTo(1);

@@ -123,7 +123,7 @@ void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
             .toList()
     );
     var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        0, -1, null, null, null, null).block();
+        0, -1, null, null, null, null, null).block();

     assertThat(schemas).isNotNull();
     assertThat(schemas.getBody()).isNotNull();

@@ -142,7 +142,7 @@ void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
     );

     var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        4, 33, null, null, null, null).block();
+        4, 33, null, null, null, null, null).block();

     assertThat(schemas).isNotNull();
     assertThat(schemas.getBody()).isNotNull();

@@ -177,7 +177,7 @@ void shouldOrderByAndPaginate() {

     var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
         null, null, null,
-        SchemaColumnsToSortDTO.ID, SortOrderDTO.DESC, null
+        SchemaColumnsToSortDTO.ID, SortOrderDTO.DESC, null, null
     ).block();

     List<String> last25OrderedById = schemas.stream()