Skip to content

Commit cc133ab

Browse files
committed
[#2768] Avoid context switching on id generation
This commit: * Stop connection leaks in BlockingIdentifierGenerator * Assert that the connection is used in the expected context
1 parent cc709f3 commit cc133ab

File tree

4 files changed

+135
-63
lines changed

4 files changed

+135
-63
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414

1515
import io.vertx.core.Context;
1616
import io.vertx.core.Vertx;
17+
import io.vertx.core.internal.ContextInternal;
1718
import io.vertx.core.internal.pool.CombinerExecutor;
1819
import io.vertx.core.internal.pool.Executor;
1920
import io.vertx.core.internal.pool.Task;
2021

2122
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
23+
import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage;
2224

2325
/**
2426
* A {@link ReactiveIdentifierGenerator} which uses the database to allocate
@@ -93,39 +95,34 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSuppl
9395
}
9496

9597
final CompletableFuture<Long> resultForThisEventLoop = new CompletableFuture<>();
96-
final CompletableFuture<Long> result = new CompletableFuture<>();
97-
final Context context = Vertx.currentContext();
98-
executor.submit( new GenerateIdAction( connectionSupplier, result ) );
99-
result.whenComplete( (id, t) -> {
100-
final Context newContext = Vertx.currentContext();
101-
//Need to be careful in resuming processing on the same context as the original
102-
//request, potentially having to switch back if we're no longer executing on the same:
103-
if ( newContext != context ) {
104-
if ( t != null ) {
105-
context.runOnContext( v -> resultForThisEventLoop.completeExceptionally( t ) );
98+
// We use supplyStage so that, no matter if there's an exception, we always return something that will complete
99+
return supplyStage( () -> {
100+
final CompletableFuture<Long> result = new CompletableFuture<>();
101+
final Context context = Vertx.currentContext();
102+
executor.submit( new GenerateIdAction( connectionSupplier, result ) );
103+
result.whenComplete( (id, t) -> {
104+
final Context newContext = Vertx.currentContext();
105+
//Need to be careful in resuming processing on the same context as the original
106+
//request, potentially having to switch back if we're no longer executing on the same:
107+
if ( newContext != context ) {
108+
context.runOnContext( v -> complete( resultForThisEventLoop, id, t ) );
106109
}
107110
else {
108-
context.runOnContext( v -> resultForThisEventLoop.complete( id ) );
111+
complete( resultForThisEventLoop, id, t );
109112
}
110-
}
111-
else {
112-
if ( t != null ) {
113-
resultForThisEventLoop.completeExceptionally( t );
114-
}
115-
else {
116-
resultForThisEventLoop.complete( id );
117-
}
118-
}
113+
} );
114+
return resultForThisEventLoop;
119115
} );
120-
return resultForThisEventLoop;
121116
}
122117

123118
private final class GenerateIdAction implements Executor.Action<GeneratorState> {
124119

125120
private final ReactiveConnectionSupplier connectionSupplier;
126121
private final CompletableFuture<Long> result;
122+
private final ContextInternal creationContext;
127123

128124
public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture<Long> result) {
125+
this.creationContext = ContextInternal.current();
129126
this.connectionSupplier = Objects.requireNonNull( connectionSupplier );
130127
this.result = Objects.requireNonNull( result );
131128
}
@@ -137,9 +134,16 @@ public Task execute(GeneratorState state) {
137134
// We don't need to update or initialize the hi
138135
// value in the table, so just increment the lo
139136
// value and return the next id in the block
140-
completedFuture( local ).whenComplete( this::acceptAsReturnValue );
137+
result.complete( local );
141138
}
142139
else {
140+
creationContext.runOnContext( this::generateNewHiValue );
141+
}
142+
return null;
143+
}
144+
145+
private void generateNewHiValue(Void v) {
146+
try {
143147
nextHiValue( connectionSupplier )
144148
.whenComplete( (newlyGeneratedHi, throwable) -> {
145149
if ( throwable != null ) {
@@ -155,17 +159,19 @@ public Task execute(GeneratorState state) {
155159
}
156160
} );
157161
}
158-
return null;
159-
}
160-
161-
private void acceptAsReturnValue(final Long aLong, final Throwable throwable) {
162-
if ( throwable != null ) {
163-
result.completeExceptionally( throwable );
164-
}
165-
else {
166-
result.complete( aLong );
162+
catch ( Throwable e ) {
163+
// nextHivalue() could throw an exception before returning a completion stage
164+
result.completeExceptionally( e );
167165
}
168166
}
169167
}
170168

169+
private static <T> void complete(CompletableFuture<T> future, final T result, final Throwable throwable) {
170+
if ( throwable != null ) {
171+
future.completeExceptionally( throwable );
172+
}
173+
else {
174+
future.complete( result );
175+
}
176+
}
171177
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,9 @@ public interface Log extends BasicLogger {
280280
@Message(id = 88, value = "Expected to use the object %1$s on context %2$s but was %3$s")
281281
HibernateException unexpectedContextDetected(Object obj, ContextInternal expectedContext, ContextInternal currentContext);
282282

283+
@Message(id = 89, value = "Connection is closed")
284+
IllegalStateException connectionIsClosed();
285+
283286
// Same method that exists in CoreMessageLogger
284287
@LogMessage(level = WARN)
285288
@Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" )

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
1919
import org.hibernate.reactive.adaptor.impl.JdbcNull;
2020
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
21+
import org.hibernate.reactive.common.InternalStateAssertions;
2122
import org.hibernate.reactive.logging.impl.Log;
2223
import org.hibernate.reactive.logging.impl.LoggerFactory;
2324
import org.hibernate.reactive.pool.BatchingConnection;
2425
import org.hibernate.reactive.pool.ReactiveConnection;
2526
import org.hibernate.reactive.util.impl.CompletionStages;
2627

28+
import io.vertx.core.internal.ContextInternal;
2729
import io.vertx.core.json.JsonArray;
2830
import io.vertx.core.json.JsonObject;
2931
import io.vertx.sqlclient.DatabaseException;
@@ -59,14 +61,22 @@ public class SqlClientConnection implements ReactiveConnection {
5961

6062
private final Pool pool;
6163
private final SqlConnection connection;
64+
// The context associated to the connection. We expect the connection to be executed in this context.
65+
private final ContextInternal connectionContext;
6266
private Transaction transaction;
6367

64-
SqlClientConnection(SqlConnection connection, Pool pool, SqlStatementLogger sqlStatementLogger, SqlExceptionHelper sqlExceptionHelper) {
68+
SqlClientConnection(
69+
SqlConnection connection,
70+
Pool pool,
71+
SqlStatementLogger sqlStatementLogger,
72+
SqlExceptionHelper sqlExceptionHelper,
73+
ContextInternal connectionContext) {
74+
this.connectionContext = connectionContext;
6575
this.pool = pool;
6676
this.sqlStatementLogger = sqlStatementLogger;
6777
this.connection = connection;
6878
this.sqlExceptionHelper = sqlExceptionHelper;
69-
LOG.tracef( "Connection created: %s", connection );
79+
LOG.tracef( "Connection created for %1$s associated to context %2$s: ", connection, connectionContext );
7080
}
7181

7282
@Override
@@ -338,6 +348,7 @@ public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql)
338348
}
339349

340350
private void feedback(String sql) {
351+
InternalStateAssertions.assertCurrentContextMatches( this, connectionContext );
341352
Objects.requireNonNull( sql, "SQL query cannot be null" );
342353
// DDL already gets formatted by the client, so don't reformat it
343354
FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() )

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java

Lines changed: 82 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package org.hibernate.reactive.pool.impl;
77

8+
import java.lang.invoke.MethodHandles;
9+
import java.lang.invoke.VarHandle;
810
import java.sql.ResultSet;
911
import java.sql.SQLException;
1012
import java.util.List;
@@ -18,10 +20,13 @@
1820
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
1921
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
2022
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
23+
import org.hibernate.reactive.logging.impl.Log;
24+
import org.hibernate.reactive.logging.impl.LoggerFactory;
2125
import org.hibernate.reactive.pool.ReactiveConnection;
2226
import org.hibernate.reactive.pool.ReactiveConnectionPool;
2327

2428
import io.vertx.core.Future;
29+
import io.vertx.core.internal.ContextInternal;
2530
import io.vertx.sqlclient.DatabaseException;
2631
import io.vertx.sqlclient.Pool;
2732
import io.vertx.sqlclient.Row;
@@ -30,7 +35,8 @@
3035
import io.vertx.sqlclient.Tuple;
3136
import io.vertx.sqlclient.spi.DatabaseMetadata;
3237

33-
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
38+
import static java.lang.invoke.MethodHandles.lookup;
39+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
3440
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
3541
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
3642

@@ -123,12 +129,16 @@ public CompletionStage<ReactiveConnection> getConnection(String tenantId, SqlExc
123129
}
124130

125131
private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool) {
126-
return completionStage( pool.getConnection().map( this::newConnection ), ReactiveConnection::close );
132+
return completeFuture(
133+
pool.getConnection().map( this::newConnection ),
134+
ReactiveConnection::close
135+
);
127136
}
128137

129138
private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool, SqlExceptionHelper sqlExceptionHelper) {
130-
return completionStage(
131-
pool.getConnection().map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ),
139+
return completeFuture(
140+
pool.getConnection()
141+
.map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ),
132142
ReactiveConnection::close
133143
);
134144
}
@@ -189,8 +199,8 @@ private void feedback(String sql) {
189199
/**
190200
* @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation.
191201
*/
192-
private <T> CompletionStage<T> completionStage(Future<T> future, Consumer<T> onCancellation) {
193-
CompletableFuture<T> completableFuture = new CompletableFuture<>();
202+
private <T> CompletionStage<T> completeFuture(Future<T> future, Consumer<T> onCancellation) {
203+
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
194204
future.onComplete( ar -> {
195205
if ( ar.succeeded() ) {
196206
if ( completableFuture.isCancelled() ) {
@@ -210,13 +220,35 @@ private SqlClientConnection newConnection(SqlConnection connection) {
210220
}
211221

212222
private SqlClientConnection newConnection(SqlConnection connection, SqlExceptionHelper sqlExceptionHelper) {
213-
return new SqlClientConnection( connection, getPool(), getSqlStatementLogger(), sqlExceptionHelper );
223+
return new SqlClientConnection(
224+
connection,
225+
getPool(),
226+
getSqlStatementLogger(),
227+
sqlExceptionHelper,
228+
ContextInternal.current()
229+
);
214230
}
215231

216232
private static class ProxyConnection implements ReactiveConnection {
233+
234+
private static final Log LOG = LoggerFactory.make( Log.class, lookup() );
235+
236+
private static final VarHandle OPENED_HANDLE;
237+
238+
static {
239+
try {
240+
MethodHandles.Lookup lookup = lookup();
241+
OPENED_HANDLE = lookup.findVarHandle( ProxyConnection.class, "opened", boolean.class );
242+
}
243+
catch (ReflectiveOperationException e) {
244+
throw new ExceptionInInitializerError( e );
245+
}
246+
}
247+
217248
private final Supplier<CompletionStage<ReactiveConnection>> connectionSupplier;
218-
private Integer batchSize;
219-
private ReactiveConnection connection;
249+
private final CompletableFuture<ReactiveConnection> connectionFuture = new CompletableFuture<>();
250+
private volatile boolean opened = false;
251+
private volatile boolean closed = false;
220252

221253
public ProxyConnection(Supplier<CompletionStage<ReactiveConnection>> connectionSupplier) {
222254
this.connectionSupplier = connectionSupplier;
@@ -225,29 +257,41 @@ public ProxyConnection(Supplier<CompletionStage<ReactiveConnection>> connectionS
225257
/**
226258
* @return the existing {@link ReactiveConnection}, or open a new one
227259
*/
228-
CompletionStage<ReactiveConnection> connection() {
229-
if ( connection == null ) {
230-
return connectionSupplier.get()
231-
.thenApply( conn -> {
232-
if ( batchSize != null ) {
233-
conn.withBatchSize( batchSize );
234-
}
235-
connection = conn;
236-
return connection;
237-
} );
260+
private CompletionStage<ReactiveConnection> connection() {
261+
if ( closed ) {
262+
return failedFuture( LOG.connectionIsClosed() );
263+
}
264+
if ( opened ) {
265+
return connectionFuture;
238266
}
239-
return completedFuture( connection );
267+
if ( OPENED_HANDLE.compareAndSet( this, false, true ) ) {
268+
connectionSupplier.get().whenComplete( (connection, throwable) -> {
269+
if ( throwable != null ) {
270+
connectionFuture.completeExceptionally( throwable );
271+
}
272+
else {
273+
connectionFuture.complete( connection );
274+
}
275+
} );
276+
}
277+
return connectionFuture;
240278
}
241279

242280
@Override
243281
public boolean isTransactionInProgress() {
244-
return connection != null && connection.isTransactionInProgress();
282+
ReactiveConnection reactiveConnection = connectionFuture.getNow( null );
283+
return reactiveConnection != null && reactiveConnection.isTransactionInProgress();
245284
}
246285

247286
@Override
248287
public DatabaseMetadata getDatabaseMetadata() {
249-
Objects.requireNonNull( connection, "Database metadata not available until the connection is opened" );
250-
return connection.getDatabaseMetadata();
288+
if ( closed ) {
289+
throw LOG.connectionIsClosed();
290+
}
291+
292+
return Objects
293+
.requireNonNull( connectionFuture.getNow( null ), "Database metadata not available until a connection has been created" )
294+
.getDatabaseMetadata();
251295
}
252296

253297
@Override
@@ -355,15 +399,22 @@ public CompletionStage<Void> rollbackTransaction() {
355399
return connection().thenCompose( ReactiveConnection::rollbackTransaction );
356400
}
357401

358-
@Override
359402
public ReactiveConnection withBatchSize(int batchSize) {
360-
if ( connection == null ) {
361-
this.batchSize = batchSize;
403+
if ( closed ) {
404+
throw LOG.connectionIsClosed();
405+
}
406+
407+
if ( connectionFuture.isDone() ) {
408+
// connection exists, we can let callers use the delegate and forget about the proxy.
409+
return connectionFuture.getNow( null ).withBatchSize( batchSize );
362410
}
363411
else {
364-
connection = connection.withBatchSize( batchSize );
412+
return new ProxyConnection( () -> opened
413+
// Connection has been requested but not created yet
414+
? connectionFuture.thenApply( c -> c.withBatchSize( batchSize ) )
415+
// Connection has not been requested
416+
: connectionSupplier.get().thenApply( c -> c.withBatchSize( batchSize ) ) );
365417
}
366-
return this;
367418
}
368419

369420
@Override
@@ -373,8 +424,9 @@ public CompletionStage<Void> executeBatch() {
373424

374425
@Override
375426
public CompletionStage<Void> close() {
376-
return connection != null
377-
? connection.close().thenAccept( v -> connection = null )
427+
closed = true;
428+
return opened
429+
? connectionFuture.thenCompose( ReactiveConnection::close )
378430
: voidFuture();
379431
}
380432
}

0 commit comments

Comments
 (0)