Skip to content

Commit 5edbc9c

Browse files
committed
Fix incorrect NPE message in StandaloneHaServices constructor
Replace checkNotNull(clusterRestEndpointAddress, clusterRestEndpointAddress) with checkNotNull(clusterRestEndpointAddress, "clusterRestEndpointAddress") to ensure the exception message clearly identifies the null parameter.
1 parent e5f53f2 commit 5edbc9c

File tree

118 files changed

+22998
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+22998
-1
lines changed
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.core.state;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.state.v2.StateFuture;
23+
import org.apache.flink.api.java.tuple.Tuple2;
24+
import org.apache.flink.util.function.BiFunctionWithException;
25+
import org.apache.flink.util.function.FunctionWithException;
26+
import org.apache.flink.util.function.ThrowingConsumer;
27+
28+
/** A {@link StateFuture} that has already been completed when it is created. */
29+
@Internal
30+
public class CompletedStateFuture<T> implements InternalStateFuture<T> {
31+
32+
T result;
33+
34+
// no public access
35+
CompletedStateFuture(T result) {
36+
this.result = result;
37+
}
38+
39+
@Override
40+
public boolean isDone() {
41+
return true;
42+
}
43+
44+
@Override
45+
public T get() {
46+
return result;
47+
}
48+
49+
@Override
50+
public <U> StateFuture<U> thenApply(
51+
FunctionWithException<? super T, ? extends U, ? extends Exception> fn) {
52+
return StateFutureUtils.completedFuture(FunctionWithException.unchecked(fn).apply(result));
53+
}
54+
55+
@Override
56+
public StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
57+
ThrowingConsumer.unchecked(action).accept(result);
58+
return StateFutureUtils.completedVoidFuture();
59+
}
60+
61+
@Override
62+
public <U> StateFuture<U> thenCompose(
63+
FunctionWithException<? super T, ? extends StateFuture<U>, ? extends Exception>
64+
action) {
65+
return FunctionWithException.unchecked(action).apply(result);
66+
}
67+
68+
@Override
69+
public <U, V> StateFuture<V> thenCombine(
70+
StateFuture<? extends U> other,
71+
BiFunctionWithException<? super T, ? super U, ? extends V, ? extends Exception> fn) {
72+
return other.thenCompose(
73+
(u) -> {
74+
V v = fn.apply(result, u);
75+
return StateFutureUtils.completedFuture(v);
76+
});
77+
}
78+
79+
@Override
80+
public <U, V> StateFuture<Tuple2<Boolean, Object>> thenConditionallyApply(
81+
FunctionWithException<? super T, Boolean, ? extends Exception> condition,
82+
FunctionWithException<? super T, ? extends U, ? extends Exception> actionIfTrue,
83+
FunctionWithException<? super T, ? extends V, ? extends Exception> actionIfFalse) {
84+
boolean test = FunctionWithException.unchecked(condition).apply(result);
85+
Object r =
86+
test
87+
? FunctionWithException.unchecked(actionIfTrue).apply(result)
88+
: FunctionWithException.unchecked(actionIfFalse).apply(result);
89+
return StateFutureUtils.completedFuture(Tuple2.of(test, r));
90+
}
91+
92+
@Override
93+
public <U> StateFuture<Tuple2<Boolean, U>> thenConditionallyApply(
94+
FunctionWithException<? super T, Boolean, ? extends Exception> condition,
95+
FunctionWithException<? super T, ? extends U, ? extends Exception> actionIfTrue) {
96+
boolean test = FunctionWithException.unchecked(condition).apply(result);
97+
U r = test ? FunctionWithException.unchecked(actionIfTrue).apply(result) : null;
98+
return StateFutureUtils.completedFuture(Tuple2.of(test, r));
99+
}
100+
101+
@Override
102+
public StateFuture<Boolean> thenConditionallyAccept(
103+
FunctionWithException<? super T, Boolean, ? extends Exception> condition,
104+
ThrowingConsumer<? super T, ? extends Exception> actionIfTrue,
105+
ThrowingConsumer<? super T, ? extends Exception> actionIfFalse) {
106+
boolean test = FunctionWithException.unchecked(condition).apply(result);
107+
if (test) {
108+
ThrowingConsumer.unchecked(actionIfTrue).accept(result);
109+
} else {
110+
ThrowingConsumer.unchecked(actionIfFalse).accept(result);
111+
}
112+
return StateFutureUtils.completedFuture(test);
113+
}
114+
115+
@Override
116+
public StateFuture<Boolean> thenConditionallyAccept(
117+
FunctionWithException<? super T, Boolean, ? extends Exception> condition,
118+
ThrowingConsumer<? super T, ? extends Exception> actionIfTrue) {
119+
boolean test = FunctionWithException.unchecked(condition).apply(result);
120+
if (test) {
121+
ThrowingConsumer.unchecked(actionIfTrue).accept(result);
122+
}
123+
return StateFutureUtils.completedFuture(test);
124+
}
125+
126+
@Override
127+
public <U, V> StateFuture<Tuple2<Boolean, Object>> thenConditionallyCompose(
128+
FunctionWithException<? super T, Boolean, ? extends Exception> condition,
129+
FunctionWithException<? super T, ? extends StateFuture<U>, ? extends Exception>
130+
actionIfTrue,
131+
FunctionWithException<? super T, ? extends StateFuture<V>, ? extends Exception>
132+
actionIfFalse) {
133+
boolean test = FunctionWithException.unchecked(condition).apply(result);
134+
StateFuture<?> actionResult;
135+
if (test) {
136+
actionResult = FunctionWithException.unchecked(actionIfTrue).apply(result);
137+
} else {
138+
actionResult = FunctionWithException.unchecked(actionIfFalse).apply(result);
139+
}
140+
return actionResult.thenApply((e) -> Tuple2.of(test, e));
141+
}
142+
143+
@Override
144+
public <U> StateFuture<Tuple2<Boolean, U>> thenConditionallyCompose(
145+
FunctionWithException<? super T, Boolean, ? extends Exception> condition,
146+
FunctionWithException<? super T, ? extends StateFuture<U>, ? extends Exception>
147+
actionIfTrue) {
148+
boolean test = FunctionWithException.unchecked(condition).apply(result);
149+
if (test) {
150+
StateFuture<U> actionResult =
151+
FunctionWithException.unchecked(actionIfTrue).apply(result);
152+
return actionResult.thenApply((e) -> Tuple2.of(true, e));
153+
} else {
154+
return StateFutureUtils.completedFuture(Tuple2.of(false, null));
155+
}
156+
}
157+
158+
@Override
159+
public void complete(T result) {
160+
throw new UnsupportedOperationException("This state future has already been completed.");
161+
}
162+
163+
@Override
164+
public void completeExceptionally(String message, Throwable ex) {
165+
throw new UnsupportedOperationException("This state future has already been completed.");
166+
}
167+
168+
@Override
169+
public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
170+
ThrowingConsumer.unchecked(action).accept(result);
171+
}
172+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.core.state;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.state.v2.StateFuture;
23+
import org.apache.flink.util.function.ThrowingConsumer;
24+
25+
/**
26+
* The Internal definition of {@link StateFuture}, add some method that will be used by framework.
27+
*/
28+
@Internal
29+
public interface InternalStateFuture<T> extends StateFuture<T> {
30+
/**
31+
* Returns {@code true} if completed in any fashion: normally, exceptionally, or via
32+
* cancellation.
33+
*
34+
* @return {@code true} if completed
35+
*/
36+
boolean isDone();
37+
38+
/** Waits if necessary for the computation to complete, and then retrieves its result. */
39+
T get();
40+
41+
/** Complete this future. */
42+
void complete(T result);
43+
44+
/**
45+
* Fail this future and pass the given exception to the runtime.
46+
*
47+
* @param message the description of this exception
48+
* @param ex the exception
49+
*/
50+
void completeExceptionally(String message, Throwable ex);
51+
52+
/**
53+
* Accept the action in the same thread with the one of complete (or current thread if it has
54+
* been completed).
55+
*
56+
* @param action the action to perform.
57+
*/
58+
void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action);
59+
}

0 commit comments

Comments
 (0)