Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,27 @@ public class App {
### Supported targets
This library supports all targets from the below list:
```
kubernetes:///service-name
kubernetes:///service-name:8080
kubernetes:///service-name:portname
kubernetes:///service-name.namespace:8080
kubernetes:///service-name.namespace.svc.cluster_name
kubernetes:///service-name.namespace.svc.cluster_name:8080

kubernetes://namespace/service-name:8080
kubernetes://service-name
kubernetes://service-name:8080/
kubernetes://service-name.namespace:8080/
kubernetes://service-name.namespace.svc.cluster_name
kubernetes://service-name.namespace.svc.cluster_name:8080
```

#### Namespace handling
If the namespace is not explicitly provided in the target URI, the resolver will first attempt to read the current pod's namespace from the mounted file at `/var/run/secrets/kubernetes.io/serviceaccount/namespace`. If this file is not found or cannot be read, it will default to using the `default` namespace.

#### Port handling
If the port is not specified in the URI, the resolver will use any of the ports defined in the Kubernetes `EndpointSlice`. Alternatively, a port can be specified by its name in the URI (e.g., `kubernetes:///myservice:grpc`), in which case the resolver will look for an `EndpointSlice` port with that name. If a numerical port is provided, that port will be used.

### Alternative scheme
You can use alternative schema (other than `kubernetes`) by using overloaded constructor:
```new KubernetesNameResolverProvider("my-custom-scheme")```.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,47 @@
import io.grpc.NameResolver;
import io.grpc.Status;

/**
* A gRPC {@link NameResolver} implementation that resolves Kubernetes services
* using EndpointSlices.
* <p>
* This resolver watches for changes in Kubernetes EndpointSlices and updates
* the gRPC client with the resolved addresses.
* </p>
* <p>
* The target URI for this resolver is parsed by {@link ResolverTarget}, which
* supports the following formats:
* <ul>
* <li>{@code kubernetes:///service-name}</li>
* <li>{@code kubernetes:///service-name:8080} (with port number)</li>
* <li>{@code kubernetes:///service-name:portname} (with port name)</li>
* <li>{@code kubernetes:///service-name.namespace:8080} (with namespace and port number)</li>
* <li>{@code kubernetes:///service-name.namespace.svc.cluster_name} (with namespace)</li>
* <li>{@code kubernetes:///service-name.namespace.svc.cluster_name:8080} (with namespace and port number)</li>
*
* <li>{@code kubernetes://namespace/service-name:8080}</li>
* <li>{@code kubernetes://service-name}</li>
* <li>{@code kubernetes://service-name:8080/}</li>
* <li>{@code kubernetes://service-name.namespace:8080/}</li>
* <li>{@code kubernetes://service-name.namespace.svc.cluster_name}</li>
* <li>{@code kubernetes://service-name.namespace.svc.cluster_name:8080}</li>
* </ul>
* </p>
* <p>
* If the namespace is not provided in the URI, the resolver will attempt to read
* the current pod's namespace from the mounted file at
* {@code /var/run/secrets/kubernetes.io/serviceaccount/namespace}. If this file
* is not found or cannot be read, the resolver will default to using the
* {@code default} namespace.
* </p>
* <p>
* If the port is not provided in the URI, the resolver will use any of the ports
* found in the EndpointSlice. If a port name is provided (e.g.,
* {@code kubernetes:///myservice:grpc}), the resolver will look for a port with
* that name in the EndpointSlice. If a numerical port is provided, that port
* will be used directly.
* </p>
*/
public final class KubernetesNameResolver extends NameResolver {

private static final Logger LOGGER = Logger.getLogger(KubernetesNameResolver.class.getName());
Expand All @@ -41,13 +82,26 @@ public final class KubernetesNameResolver extends NameResolver {
private boolean defaultExecutorUsed = false;
private Listener listener;

/**
* Creates a new {@link KubernetesNameResolver} with a default single-threaded
* executor.
*
* @param params the target parameters for the resolver
* @throws IOException if an error occurs while initializing the watcher
*/
public KubernetesNameResolver(ResolverTarget params) throws IOException {
this(Executors.newSingleThreadExecutor(), params);
this.defaultExecutorUsed = true;
}

public KubernetesNameResolver(Executor executor, ResolverTarget params)
throws IOException {
/**
* Creates a new {@link KubernetesNameResolver} with a custom executor.
*
* @param executor the executor to use for background tasks
* @param params the target parameters for the resolver
* @throws IOException if an error occurs while initializing the watcher
*/
public KubernetesNameResolver(Executor executor, ResolverTarget params) throws IOException {
this.executor = executor;
this.params = params;
if (params.namespace() != null) {
Expand All @@ -57,37 +111,54 @@ public KubernetesNameResolver(Executor executor, ResolverTarget params)
}
}

/**
* Starts the name resolution process.
*
* @param listener the listener to notify when addresses are resolved or errors
* occur
*/
@Override
public void start(Listener listener) {
this.listener = listener;
resolve();
}

/**
* Refreshes the name resolution process. This method is called when the gRPC
* client requests a refresh.
*/
@Override
public void refresh() {
if (semaphore.tryAcquire()) {
resolve();
}
}

/**
* Resolves the Kubernetes service by watching EndpointSlices.
*/
private void resolve() {
executor.execute(this::watch);
}

/**
* Watches for changes in EndpointSlices and updates the listener with resolved
* addresses.
*/
private void watch() {
watcher.watch(params.service(), new EndpointSliceWatcher.Subscriber() {
@Override
public void onEvent(Event event) {
// watch event occurred
if (!SUPPORTED_KUBERNETES_EVENTS.contains(event.type())) {
LOGGER.log(Level.FINER, "Unsupported Kubernetes event type {0}",
new Object[] { event.type().toString() });
new Object[]{event.type().toString()});
return;
}

if (event.type().equals(EventType.DELETED)) {
LOGGER.log(Level.FINE, "EndpointSlice {0} was deleted",
new Object[] { event.endpointSlice().metadata().name() });
new Object[]{event.endpointSlice().metadata().name()});
return;
}

Expand All @@ -96,10 +167,10 @@ public void onEvent(Event event) {
return;
}

LOGGER.log(Level.FINER, "Resolving addresses for service {0}", new Object[] { params.service() });
LOGGER.log(Level.FINER, "Resolving addresses for service {0}", new Object[]{params.service()});
buildAddresses(event.endpointSlice()).ifPresentOrElse(a -> listener.onAddresses(a, Attributes.EMPTY),
() -> LOGGER.log(Level.FINE, "No usable addresses found for Kubernetes service {0}",
new Object[] { params.service() }));
new Object[]{params.service()}));
}

@Override
Expand All @@ -120,18 +191,33 @@ public void onCompleted() {
});
}

/**
* Shuts down the resolver and releases resources.
*/
@Override
public void shutdown() {
if (defaultExecutorUsed && executor instanceof ExecutorService executor) {
executor.shutdownNow();
}
}

/**
* Returns the authority of the service being resolved.
*
* @return an empty string as this resolver does not use service authority
*/
@Override
public String getServiceAuthority() {
return "";
}

/**
* Builds a list of gRPC {@link EquivalentAddressGroup} from the given
* {@link EndpointSlice}.
*
* @param endpointSlice the EndpointSlice to process
* @return an optional list of resolved addresses
*/
private Optional<List<EquivalentAddressGroup>> buildAddresses(EndpointSlice endpointSlice) {
return findPort(endpointSlice.ports())
.map(port -> endpointSlice.endpoints().stream()
Expand All @@ -140,6 +226,14 @@ private Optional<List<EquivalentAddressGroup>> buildAddresses(EndpointSlice endp
.toList());
}

/**
* Finds the port to use for the service from the list of ports in the
* EndpointSlice. If the port is not provided in {@link ResolverTarget}
* then first port found in EndpointSlice is used.
*
* @param ports the list of ports in the EndpointSlice
* @return an optional port number
*/
private Optional<Integer> findPort(List<EndpointPort> ports) {
if (params.port() == null) {
return ports.stream().map(EndpointPort::port).findFirst();
Expand All @@ -155,6 +249,14 @@ private Optional<Integer> findPort(List<EndpointPort> ports) {
}
}

/**
* Builds a gRPC {@link EquivalentAddressGroup} from the given addresses and
* port.
*
* @param addresses the list of addresses
* @param port the port number
* @return an {@link EquivalentAddressGroup} containing the resolved addresses
*/
private EquivalentAddressGroup buildAddressGroup(List<String> addresses, int port) {
var socketAddresses = addresses.stream()
.map(address -> (SocketAddress) new InetSocketAddress(address, port))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,88 @@
import io.grpc.NameResolver.Args;
import io.grpc.NameResolverProvider;

/**
* A gRPC {@link NameResolverProvider} that resolves service names using Kubernetes.
* <p>
* This provider supports target URIs as defined by {@link ResolverTarget}, including:
* <ul>
* <li>{@code kubernetes:///service-name}</li>
* <li>{@code kubernetes:///service-name:8080} (with port number)</li>
* <li>{@code kubernetes:///service-name:portname} (with port name)</li>
* <li>{@code kubernetes:///service-name.namespace:8080} (with namespace and port number)</li>
* <li>{@code kubernetes:///service-name.namespace.svc.cluster_name} (with namespace)</li>
* <li>{@code kubernetes:///service-name.namespace.svc.cluster_name:8080} (with namespace and port number)</li>
*
* <li>{@code kubernetes://namespace/service-name:8080}</li>
* <li>{@code kubernetes://service-name}</li>
* <li>{@code kubernetes://service-name:8080/}</li>
* <li>{@code kubernetes://service-name.namespace:8080/}</li>
* <li>{@code kubernetes://service-name.namespace.svc.cluster_name}</li>
* <li>{@code kubernetes://service-name.namespace.svc.cluster_name:8080}</li>
* </ul>
* </p>
* <p>
* If the namespace is not explicitly provided in the URI, the underlying
* {@link KubernetesNameResolver} will first attempt to read the current pod's
* namespace from the mounted file at
* {@code /var/run/secrets/kubernetes.io/serviceaccount/namespace}. If this file
* is not found or cannot be read, it will default to using the {@code default}
* namespace.
* </p>
* <p>
* If the port is not specified in the URI, the resolver will use any of the ports
* defined in the Kubernetes EndpointSlice. Alternatively, a port can be specified
* by its name in the URI (e.g., {@code kubernetes:///myservice:grpc}), in which
* case the resolver will look for an EndpointSlice port with that name. If a
* numerical port is provided, that port will be used.
* </p>
*/
public class KubernetesNameResolverProvider extends NameResolverProvider {

private String scheme = "kubernetes";

/**
* Constructs a new provider with a custom scheme.
*
* @param schema the URI scheme this provider should support (e.g., "kubernetes")
*/
public KubernetesNameResolverProvider(String schema) {
this.scheme = schema;
}

/**
* Constructs a new provider with the default scheme ("kubernetes").
*/
public KubernetesNameResolverProvider() {
}

/**
* Indicates whether this provider is available for use.
*
* @return always returns {@code true}
*/
@Override
protected boolean isAvailable() {
return true;
}

/**
* Returns the priority of this provider.
*
* @return the priority value (5)
*/
@Override
protected int priority() {
return 5;
}

/**
* Returns a new {@link NameResolver} for the given target URI and arguments, if the URI scheme matches.
*
* @param targetUri the URI to resolve
* @param args the resolver arguments
* @return a new {@link KubernetesNameResolver}, or {@code null} if the scheme does not match
*/
@Override
public NameResolver newNameResolver(URI targetUri, Args args) {
if (targetUri.getScheme().equals(this.scheme)) {
Expand All @@ -38,6 +99,14 @@ public NameResolver newNameResolver(URI targetUri, Args args) {
return null;
}

/**
* Builds a {@link KubernetesNameResolver} using the provided executor and target parameters.
*
* @param executor the executor for offloading tasks
* @param params the parsed target parameters
* @return a new {@link KubernetesNameResolver}
* @throws RuntimeException if an I/O error occurs
*/
private NameResolver buildResolver(Executor executor, ResolverTarget params) {
try {
if (executor != null) {
Expand All @@ -49,6 +118,11 @@ private NameResolver buildResolver(Executor executor, ResolverTarget params) {
}
}

/**
* Returns the default scheme supported by this provider.
*
* @return the default scheme
*/
@Override
public String getDefaultScheme() {
return this.scheme;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,38 @@
import javax.annotation.Nullable;
import javax.annotation.Nonnull;

/**
* Represents a parsed Kubernetes target including service name, namespace, and optional port.
* Service name is always required but namespace and port are optional.
* Used by {@link KubernetesNameResolver} to extract target information from a URI.
*/
public record ResolverTarget(@Nullable String namespace,
@Nonnull String service,
@Nullable String port) {

/**
* Parses a {@link URI} into a {@link ResolverTarget}, extracting the service, namespace, and port.
* Supports formats like:
* <ul>
* <li>kubernetes:///service-name</li>
* <li>kubernetes:///service-name:8080</li>
* <li>kubernetes:///service-name:portname</li>
* <li>kubernetes:///service-name.namespace:8080</li>
* <li>kubernetes:///service-name.namespace.svc.cluster_name</li>
* <li>kubernetes:///service-name.namespace.svc.cluster_name:8080</li>
*
* <li>kubernetes://namespace/service-name:8080</li>
* <li>kubernetes://service-name</li>
* <li>kubernetes://service-name:8080/</li>
* <li>kubernetes://service-name.namespace:8080/</li>
* <li>kubernetes://service-name.namespace.svc.cluster_name</li>
* <li>kubernetes://service-name.namespace.svc.cluster_name:8080</li>
* </ul>
*
* @param uri the URI to parse
* @return the parsed {@link ResolverTarget}
* @throws IllegalArgumentException if the service name cannot be determined
*/
public static ResolverTarget parse(URI uri) throws IllegalArgumentException {
ResolverTarget params;
if (uri.getAuthority() == null || uri.getAuthority().isEmpty()) {
Expand Down
Loading
Loading