From 7f7d9b8b16630ffabeca2e9e1467bd6f97698047 Mon Sep 17 00:00:00 2001 From: Ken Hu <106191785+kenhuuu@users.noreply.github.com> Date: Mon, 1 Dec 2025 12:16:13 -0800 Subject: [PATCH] TINKERPOP-3217 Add server option to close Session automatically. The added destroySessionPostGraphOp setting enables re-using the same underlying connection for a different subsequent Session. This should increase performance for cases where many short-lived Transactions are sent to the server. --- CHANGELOG.asciidoc | 2 + .../reference/gremlin-applications.asciidoc | 1 + .../tinkerpop/gremlin/server/Settings.java | 8 + .../server/handler/AbstractSession.java | 4 + .../server/op/AbstractOpProcessor.java | 8 +- .../server/op/session/SessionOpProcessor.java | 27 +- .../AbstractSessionTxIntegrateTest.java | 407 ++++++++++++++++++ .../GremlinSessionReuseTxIntegrateTest.java | 46 ++ .../server/GremlinSessionTxIntegrateTest.java | 378 +--------------- 9 files changed, 498 insertions(+), 383 deletions(-) create mode 100644 gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractSessionTxIntegrateTest.java create mode 100644 gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b292f8939aa..271bc6d921d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -22,7 +22,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima [[release-3-7-6]] === TinkerPop 3.7.6 (NOT OFFICIALLY RELEASED YET) + * Integrated Python driver examples into automated build process to ensure examples remain functional. +* Added `closeSessionPostGraphOp` to the Gremlin Server settings to indicate that the `Session` should be closed on either a successful commit or rollback. [[release-3-7-5]] === TinkerPop 3.7.5 (Release Date: November 12, 2025) diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc index 0eee376fab7..ed6edbf14d8 100644 --- a/docs/src/reference/gremlin-applications.asciidoc +++ b/docs/src/reference/gremlin-applications.asciidoc @@ -1015,6 +1015,7 @@ The following table describes the various YAML configuration options that Gremli |authorization.authorizer |The fully qualified classname of an `Authorizer` implementation to use. |_none_ |authorization.config |A `Map` of configuration settings to be passed to the `Authorizer` when it is constructed. The settings available are dependent on the implementation. |_none_ |channelizer |The fully qualified classname of the `Channelizer` implementation to use. A `Channelizer` is a "channel initializer" which Gremlin Server uses to define the type of processing pipeline to use. By allowing different `Channelizer` implementations, Gremlin Server can support different communication protocols (e.g. WebSocket). |`WebSocketChannelizer` +|closeSessionPostGraphOp |Controls whether a `Session` will be closed by the server after a successful TX_COMMIT or TX_ROLLBACK bytecode request. |_false_ |enableAuditLog |The `AuthenticationHandler`, `AuthorizationHandler` and processors can issue audit logging messages with the authenticated user, remote socket address and requests with a gremlin query. For privacy reasons, the default value of this setting is false. The audit logging messages are logged at the INFO level via the `audit.org.apache.tinkerpop.gremlin.server` logger, which can be configured using the `logback.xml` file. |_false_ |graphManager |The fully qualified classname of the `GraphManager` implementation to use. A `GraphManager` is a class that adheres to the TinkerPop `GraphManager` interface, allowing custom implementations for storing and managing graph references, as well as defining custom methods to open and close graphs instantiations. To prevent Gremlin Server from starting when all graphs fails, the `CheckedGraphManager` can be used.|`DefaultGraphManager` |graphs |A `Map` of `Graph` configuration files where the key of the `Map` becomes the name to which the `Graph` will be bound and the value is the file name of a `Graph` configuration file. |_none_ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java index 50e7444b2f6..5323f5d787f 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java @@ -189,6 +189,14 @@ public Settings() { */ public boolean strictTransactionManagement = false; + /** + * If set to {@code true} the Gremlin Server will close the session when a GraphOp (commit or rollback) is + * successfully completed on that session. + * + * NOTE: Defaults to false in 3.7.x/3.8.x to prevent breaking change. + */ + public boolean closeSessionPostGraphOp = false; + /** * The full class name of the {@link Channelizer} to use in Gremlin Server. */ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractSession.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractSession.java index 6680127d513..c7519ced5dd 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractSession.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractSession.java @@ -699,6 +699,10 @@ protected void handleGraphOperation(final SessionTask sessionTask, final Bytecod .code(ResponseStatusCode.NO_CONTENT) .statusAttributes(attributes) .create()); + + if (sessionTask.getSettings().closeSessionPostGraphOp) { + close(); + } } else { throw new IllegalStateException(String.format( "Bytecode in request is not a recognized graph operation: %s", bytecode.toString())); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java index c0717802841..1c1c6347134 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java @@ -111,8 +111,7 @@ protected void handleIterator(final Context context, final Iterator itty) throws int warnCounter = 0; // sessionless requests are always transaction managed, but in-session requests are configurable. - final boolean managedTransactionsForRequest = manageTransactions ? - true : (Boolean) msg.getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false); + final boolean managedTransactionsForRequest = shouldManageTransactionsForRequest(context); // we have an empty iterator - happens on stuff like: g.V().iterate() if (!itty.hasNext()) { @@ -371,4 +370,9 @@ protected static void attemptRollback(final RequestMessage msg, final GraphManag graphManager.rollbackAll(); } } + + protected boolean shouldManageTransactionsForRequest(final Context ctx) { + return manageTransactions ? + true : (Boolean) ctx.getRequestMessage().getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false); + } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java index e10a0f1dd04..fc96c1fdf99 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java @@ -141,6 +141,9 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor { }}; } + // Determines whether to close the session after a successful COMMIT/ROLLBACK. Set during init(). + private boolean closeSessionPostGraphOp; + public SessionOpProcessor() { super(false); } @@ -154,6 +157,7 @@ public String getName() { public void init(final Settings settings) { this.maxParameters = (int) settings.optionalProcessor(SessionOpProcessor.class).orElse(DEFAULT_SETTINGS).config. getOrDefault(CONFIG_MAX_PARAMETERS, DEFAULT_MAX_PARAMETERS); + this.closeSessionPostGraphOp = settings.closeSessionPostGraphOp; } /** @@ -546,6 +550,13 @@ protected void handleGraphOperation(final Bytecode bytecode, final Graph graph, .statusAttributes(attributes) .create()); + if (closeSessionPostGraphOp) { + // Setting force to true prevents deadlock when this thread attempts to destroy the session. + // This should be safe since either a commit or rollback just finished so the transaction + // shouldn't be open. + session.manualKill(true); + } + } catch (Throwable t) { onError(graph, context); // if any exception in the chain is TemporaryException or Failure then we should respond with the @@ -571,6 +582,13 @@ protected void handleGraphOperation(final Bytecode bytecode, final Graph graph, .statusMessage(t.getMessage()) .statusAttributeException(t).create()); } + + if (closeSessionPostGraphOp && shouldManageTransactionsForRequest(context)) { + // Destroy the session after a successful rollback due to error. Placed here rather than + // in a finally block since we don't want to end the session if no commit/rollback succeeded. + session.manualKill(true); + } + if (t instanceof Error) { //Re-throw any errors to be handled by and set as the result the FutureTask throw t; @@ -589,20 +607,17 @@ protected void handleGraphOperation(final Bytecode bytecode, final Graph graph, } protected void beforeProcessing(final Graph graph, final Context ctx) { - final boolean managedTransactionsForRequest = manageTransactions ? - true : (Boolean) ctx.getRequestMessage().getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false); + final boolean managedTransactionsForRequest = shouldManageTransactionsForRequest(ctx); if (managedTransactionsForRequest && graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback(); } protected void onError(final Graph graph, final Context ctx) { - final boolean managedTransactionsForRequest = manageTransactions ? - true : (Boolean) ctx.getRequestMessage().getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false); + final boolean managedTransactionsForRequest = shouldManageTransactionsForRequest(ctx); if (managedTransactionsForRequest && graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback(); } protected void onTraversalSuccess(final Graph graph, final Context ctx) { - final boolean managedTransactionsForRequest = manageTransactions ? - true : (Boolean) ctx.getRequestMessage().getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false); + final boolean managedTransactionsForRequest = shouldManageTransactionsForRequest(ctx); if (managedTransactionsForRequest && graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().commit(); } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractSessionTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractSessionTxIntegrateTest.java new file mode 100644 index 00000000000..4f3ef2fc433 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractSessionTxIntegrateTest.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor; +import org.apache.tinkerpop.gremlin.util.ExceptionHelper; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.HashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Abstract test class that holds tests for the gremlin-driver and bytecode sessions. + * + * @author Stephen Mallette (http://stephen.genoprime.com) + */ +public abstract class AbstractSessionTxIntegrateTest extends AbstractGremlinServerIntegrationTest { + + public static Long DEFAULT_TEST_SESSION_TIMEOUT = 3000L; + + abstract protected Cluster createCluster(); + + /** + * Configure specific Gremlin Server settings for specific tests. + */ + @Override + public Settings overrideSettings(final Settings settings) { + final String nameOfTest = name.getMethodName(); + + settings.graphs.put("graph", "conf/tinkertransactiongraph-empty.properties"); + + switch (nameOfTest) { + case "shouldTimeoutTxBytecode": + settings.processors.clear(); + + // OpProcessor setting + final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings(); + processorSettings.className = SessionOpProcessor.class.getCanonicalName(); + processorSettings.config = new HashMap<>(); + processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, DEFAULT_TEST_SESSION_TIMEOUT); + settings.processors.add(processorSettings); + + // Unified setting + settings.sessionLifetimeTimeout = DEFAULT_TEST_SESSION_TIMEOUT; + break; + } + + return settings; + } + + @Test + @Ignore("TINKERPOP-2832") + public void shouldTimeoutTxBytecode() throws Exception { + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("person").addE("link").iterate(); + gtx.tx().commit(); + + assertEquals(1L, g.V().count().next().longValue()); + assertEquals(1L, g.E().count().next().longValue()); + + try { + gtx = g.tx().begin(); + + assertEquals(1L, gtx.V().count().next().longValue()); + assertEquals(1L, gtx.E().count().next().longValue()); + + // wait long enough for the session to die + Thread.sleep(4000); + + // the following should fail with a dead session + gtx.V().count().iterate(); + fail("Session is dead - a new one should not reopen to serve this request"); + } catch (Exception ex) { + ex.printStackTrace(); + } + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSession() throws Exception { + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + + // outside the session we should be at zero + assertEquals(0, (long) g.V().count().next()); + + gtx.tx().commit(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSessionWithExplicitTransactionObject() throws Exception { + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + final Transaction tx = g.tx(); + assertThat(tx.isOpen(), is(true)); + + final GraphTraversalSource gtx = tx.begin(); + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + tx.commit(); + assertThat(tx.isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + cluster.close(); + } + + @Test + public void shouldRollbackTxBytecodeInSession() throws Exception { + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + gtx.tx().rollback(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(0, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSessionOnCloseOfGtx() throws Exception { + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + gtx.close(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSessionOnCloseTx() throws Exception { + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + gtx.close(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldOpenAndCloseObsceneAmountOfSessions() throws Exception { + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + // need to open significantly more sessions that we have threads in gremlinPool. if we go too obscene on + // OpProcessor this test will take too long + final int numberOfSessions = isUsingUnifiedChannelizer() ? 1000 : 100; + for (int ix = 0; ix < numberOfSessions; ix ++) { + final Transaction tx = g.tx(); + final GraphTraversalSource gtx = tx.begin(); + try { + final Vertex v1 = gtx.addV("person").property("pid", ix + "a").next(); + final Vertex v2 = gtx.addV("person").property("pid", ix + "b").next(); + gtx.addE("knows").from(v1).to(v2).iterate(); + tx.commit(); + } catch (Exception ex) { + tx.rollback(); + fail("Should not expect any failures"); + } finally { + assertThat(tx.isOpen(), is(false)); + } + } + + // sessionless connections should still be good - close() should not affect that + assertEquals(numberOfSessions * 2, (long) g.V().count().next()); + assertEquals(numberOfSessions, (long) g.E().count().next()); + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSessionReusingGtxAcrossThreads() throws Exception { + + final ExecutorService service = Executors.newFixedThreadPool(2); + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + final int verticesToAdd = 64; + for (int ix = 0; ix < verticesToAdd; ix++) { + service.submit(() -> gtx.addV("person").iterate()); + } + + service.shutdown(); + service.awaitTermination(90000, TimeUnit.MILLISECONDS); + + // outside the session we should be at zero + assertEquals(0, (long) g.V().count().next()); + + assertEquals(verticesToAdd, (long) gtx.V().count().next()); + gtx.tx().commit(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(verticesToAdd, (long) g.V().count().next()); + + cluster.close(); + } + + @Test + public void shouldSpawnMultipleTraversalSourceInSameTransaction() throws Exception { + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final Transaction tx1 = g.tx(); + final GraphTraversalSource gtx1a = tx1.begin(); + final GraphTraversalSource gtx1b = tx1.begin(); + final Transaction tx2 = g.tx(); + final GraphTraversalSource gtx2 = tx2.begin(); + + gtx1a.addV("person").iterate(); + assertEquals(1, (long) gtx1a.V().count().next()); + assertEquals(1, (long) gtx1b.V().count().next()); + + // outside the session we should be at zero + assertEquals(0, (long) g.V().count().next()); + assertEquals(0, (long) gtx2.V().count().next()); + + // either can commit to end the transaction + gtx1b.tx().commit(); + assertThat(gtx1a.tx().isOpen(), is(false)); + assertThat(gtx1b.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx1a.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + try { + gtx1b.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldCommitRollbackInScriptUsingGremlinLang() throws Exception { + final Cluster cluster = TestClientFactory.open(); + final Client.SessionSettings sessionSettings = Client.SessionSettings.build(). + sessionId(name.getMethodName()). + manageTransactions(false). + maintainStateAfterException(false). + create(); + final Client.Settings clientSettings = Client.Settings.build().useSession(sessionSettings).create(); + final Client client = cluster.connect(); + final Client session = cluster.connect(clientSettings); + + // this test mixes calls across scriptengines - probably not a use case but interesting + try { + session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); + session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); + + // outside of session graph should be empty still but in session we should have 2 + assertEquals(0, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + + // commit whats there using gremlin-language and test again + session.submit("g.tx().commit()", RequestOptions.build().language("gremlin-lang").create()).all().get(10, TimeUnit.SECONDS); + assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + + // add one more in session and test + session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); + assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + assertEquals(3, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + + // rollback the additional one and test + session.submit("g.tx().rollback()", RequestOptions.build().language("gremlin-lang").create()).all().get(10, TimeUnit.SECONDS); + assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + } finally { + cluster.close(); + } + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java new file mode 100644 index 00000000000..e49349dee6a --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.driver.Cluster; + +/** + * Integration tests for gremlin-driver and bytecode sessions where the underlying connection can be re-used for + * multiple sessions. The server is configured with "closeSessionPostGraphOp" set to True. + */ +public class GremlinSessionReuseTxIntegrateTest extends AbstractSessionTxIntegrateTest { + + @Override + protected Cluster createCluster() { + return TestClientFactory.build().create(); + } + + /** + * Configure specific Gremlin Server settings for specific tests. + */ + @Override + public Settings overrideSettings(final Settings settings) { + super.overrideSettings(settings); + + // This setting allows connection re-use on the server side. + settings.closeSessionPostGraphOp = true; + + return settings; + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java index cf22cf91568..bc3873b01df 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java @@ -18,389 +18,17 @@ */ package org.apache.tinkerpop.gremlin.server; -import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor; -import org.apache.tinkerpop.gremlin.util.ExceptionHelper; -import org.apache.tinkerpop.gremlin.driver.Client; import org.apache.tinkerpop.gremlin.driver.Cluster; -import org.apache.tinkerpop.gremlin.driver.RequestOptions; -import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; -import org.apache.tinkerpop.gremlin.structure.Transaction; -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.junit.Ignore; -import org.junit.Test; - -import java.io.File; -import java.util.HashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; /** * Integration tests for gremlin-driver and bytecode sessions. * * @author Stephen Mallette (http://stephen.genoprime.com) */ -public class GremlinSessionTxIntegrateTest extends AbstractGremlinServerIntegrationTest { +public class GremlinSessionTxIntegrateTest extends AbstractSessionTxIntegrateTest { - /** - * Configure specific Gremlin Server settings for specific tests. - */ @Override - public Settings overrideSettings(final Settings settings) { - final String nameOfTest = name.getMethodName(); - - settings.graphs.put("graph", "conf/tinkertransactiongraph-empty.properties"); - - switch (nameOfTest) { - case "shouldExecuteBytecodeInSession": - break; - case "shouldTimeoutTxBytecode": - settings.processors.clear(); - - // OpProcessor setting - final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings(); - processorSettings.className = SessionOpProcessor.class.getCanonicalName(); - processorSettings.config = new HashMap<>(); - processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L); - settings.processors.add(processorSettings); - - // Unified setting - settings.sessionLifetimeTimeout = 3000L; - break; - } - - return settings; - } - - @Test - @Ignore("TINKERPOP-2832") - public void shouldTimeoutTxBytecode() throws Exception { - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - - GraphTraversalSource gtx = g.tx().begin(); - gtx.addV("person").addE("link").iterate(); - gtx.tx().commit(); - - assertEquals(1L, g.V().count().next().longValue()); - assertEquals(1L, g.E().count().next().longValue()); - - try { - gtx = g.tx().begin(); - - assertEquals(1L, gtx.V().count().next().longValue()); - assertEquals(1L, gtx.E().count().next().longValue()); - - // wait long enough for the session to die - Thread.sleep(4000); - - // the following should fail with a dead session - gtx.V().count().iterate(); - fail("Session is dead - a new one should not reopen to server this request"); - } catch (Exception ex) { - ex.printStackTrace(); - } - - cluster.close(); - } - - @Test - public void shouldCommitTxBytecodeInSession() throws Exception { - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - - final GraphTraversalSource gtx = g.tx().begin(); - assertThat(gtx.tx().isOpen(), is(true)); - - gtx.addV("person").iterate(); - assertEquals(1, (long) gtx.V().count().next()); - - // outside the session we should be at zero - assertEquals(0, (long) g.V().count().next()); - - gtx.tx().commit(); - assertThat(gtx.tx().isOpen(), is(false)); - - // sessionless connections should still be good - close() should not affect that - assertEquals(1, (long) g.V().count().next()); - - // but the spawned gtx should be dead - try { - gtx.addV("software").iterate(); - fail("Should have failed since we committed the transaction"); - } catch (Exception ex) { - final Throwable root = ExceptionHelper.getRootCause(ex); - assertEquals("Client is closed", root.getMessage()); - } - - cluster.close(); - } - - @Test - public void shouldCommitTxBytecodeInSessionWithExplicitTransactionObject() throws Exception { - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - final Transaction tx = g.tx(); - assertThat(tx.isOpen(), is(true)); - - final GraphTraversalSource gtx = tx.begin(); - gtx.addV("person").iterate(); - assertEquals(1, (long) gtx.V().count().next()); - tx.commit(); - assertThat(tx.isOpen(), is(false)); - - // sessionless connections should still be good - close() should not affect that - assertEquals(1, (long) g.V().count().next()); - - cluster.close(); - } - - @Test - public void shouldRollbackTxBytecodeInSession() throws Exception { - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - - final GraphTraversalSource gtx = g.tx().begin(); - assertThat(gtx.tx().isOpen(), is(true)); - - gtx.addV("person").iterate(); - assertEquals(1, (long) gtx.V().count().next()); - gtx.tx().rollback(); - assertThat(gtx.tx().isOpen(), is(false)); - - // sessionless connections should still be good - close() should not affect that - assertEquals(0, (long) g.V().count().next()); - - // but the spawned gtx should be dead - try { - gtx.addV("software").iterate(); - fail("Should have failed since we committed the transaction"); - } catch (Exception ex) { - final Throwable root = ExceptionHelper.getRootCause(ex); - assertEquals("Client is closed", root.getMessage()); - } - - cluster.close(); - } - - @Test - public void shouldCommitTxBytecodeInSessionOnCloseOfGtx() throws Exception { - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - - final GraphTraversalSource gtx = g.tx().begin(); - assertThat(gtx.tx().isOpen(), is(true)); - - gtx.addV("person").iterate(); - assertEquals(1, (long) gtx.V().count().next()); - gtx.close(); - assertThat(gtx.tx().isOpen(), is(false)); - - // sessionless connections should still be good - close() should not affect that - assertEquals(1, (long) g.V().count().next()); - - // but the spawned gtx should be dead - try { - gtx.addV("software").iterate(); - fail("Should have failed since we committed the transaction"); - } catch (Exception ex) { - final Throwable root = ExceptionHelper.getRootCause(ex); - assertEquals("Client is closed", root.getMessage()); - } - - cluster.close(); - } - - @Test - public void shouldCommitTxBytecodeInSessionOnCloseTx() throws Exception { - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - - final GraphTraversalSource gtx = g.tx().begin(); - assertThat(gtx.tx().isOpen(), is(true)); - - gtx.addV("person").iterate(); - assertEquals(1, (long) gtx.V().count().next()); - gtx.close(); - assertThat(gtx.tx().isOpen(), is(false)); - - // sessionless connections should still be good - close() should not affect that - assertEquals(1, (long) g.V().count().next()); - - // but the spawned gtx should be dead - try { - gtx.addV("software").iterate(); - fail("Should have failed since we committed the transaction"); - } catch (Exception ex) { - final Throwable root = ExceptionHelper.getRootCause(ex); - assertEquals("Client is closed", root.getMessage()); - } - - cluster.close(); - } - - @Test - public void shouldOpenAndCloseObsceneAmountOfSessions() throws Exception { - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - - // need to open significantly more sessions that we have threads in gremlinPool. if we go too obscene on - // OpProcessor this test will take too long - final int numberOfSessions = isUsingUnifiedChannelizer() ? 1000 : 100; - for (int ix = 0; ix < numberOfSessions; ix ++) { - final Transaction tx = g.tx(); - final GraphTraversalSource gtx = tx.begin(); - try { - final Vertex v1 = gtx.addV("person").property("pid", ix + "a").next(); - final Vertex v2 = gtx.addV("person").property("pid", ix + "b").next(); - gtx.addE("knows").from(v1).to(v2).iterate(); - tx.commit(); - } catch (Exception ex) { - tx.rollback(); - fail("Should not expect any failures"); - } finally { - assertThat(tx.isOpen(), is(false)); - } - } - - // sessionless connections should still be good - close() should not affect that - assertEquals(numberOfSessions * 2, (long) g.V().count().next()); - assertEquals(numberOfSessions, (long) g.E().count().next()); - - cluster.close(); - } - - @Test - public void shouldCommitTxBytecodeInSessionReusingGtxAcrossThreads() throws Exception { - - final ExecutorService service = Executors.newFixedThreadPool(2); - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - - final GraphTraversalSource gtx = g.tx().begin(); - assertThat(gtx.tx().isOpen(), is(true)); - - final int verticesToAdd = 64; - for (int ix = 0; ix < verticesToAdd; ix++) { - service.submit(() -> gtx.addV("person").iterate()); - } - - service.shutdown(); - service.awaitTermination(90000, TimeUnit.MILLISECONDS); - - // outside the session we should be at zero - assertEquals(0, (long) g.V().count().next()); - - assertEquals(verticesToAdd, (long) gtx.V().count().next()); - gtx.tx().commit(); - assertThat(gtx.tx().isOpen(), is(false)); - - // sessionless connections should still be good - close() should not affect that - assertEquals(verticesToAdd, (long) g.V().count().next()); - - cluster.close(); - } - - @Test - public void shouldSpawnMultipleTraversalSourceInSameTransaction() throws Exception { - - final Cluster cluster = TestClientFactory.build().create(); - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - - final Transaction tx1 = g.tx(); - final GraphTraversalSource gtx1a = tx1.begin(); - final GraphTraversalSource gtx1b = tx1.begin(); - final Transaction tx2 = g.tx(); - final GraphTraversalSource gtx2 = tx2.begin(); - - gtx1a.addV("person").iterate(); - assertEquals(1, (long) gtx1a.V().count().next()); - assertEquals(1, (long) gtx1b.V().count().next()); - - // outside the session we should be at zero - assertEquals(0, (long) g.V().count().next()); - assertEquals(0, (long) gtx2.V().count().next()); - - // either can commit to end the transaction - gtx1b.tx().commit(); - assertThat(gtx1a.tx().isOpen(), is(false)); - assertThat(gtx1b.tx().isOpen(), is(false)); - - // sessionless connections should still be good - close() should not affect that - assertEquals(1, (long) g.V().count().next()); - - // but the spawned gtx should be dead - try { - gtx1a.addV("software").iterate(); - fail("Should have failed since we committed the transaction"); - } catch (Exception ex) { - final Throwable root = ExceptionHelper.getRootCause(ex); - assertEquals("Client is closed", root.getMessage()); - } - - try { - gtx1b.addV("software").iterate(); - fail("Should have failed since we committed the transaction"); - } catch (Exception ex) { - final Throwable root = ExceptionHelper.getRootCause(ex); - assertEquals("Client is closed", root.getMessage()); - } - - cluster.close(); - } - - @Test - public void shouldCommitRollbackInScriptUsingGremlinLang() throws Exception { - final Cluster cluster = TestClientFactory.open(); - final Client.SessionSettings sessionSettings = Client.SessionSettings.build(). - sessionId(name.getMethodName()). - manageTransactions(false). - maintainStateAfterException(false). - create(); - final Client.Settings clientSettings = Client.Settings.build().useSession(sessionSettings).create(); - final Client client = cluster.connect(); - final Client session = cluster.connect(clientSettings); - - // this test mixes calls across scriptengines - probably not a use case but interesting - try { - session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); - session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); - - // outside of session graph should be empty still but in session we should have 2 - assertEquals(0, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); - assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); - - // commit whats there using gremlin-language and test again - session.submit("g.tx().commit()", RequestOptions.build().language("gremlin-lang").create()).all().get(10, TimeUnit.SECONDS); - assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); - assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); - - // add one more in session and test - session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); - assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); - assertEquals(3, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); - - // rollback the additional one and test - session.submit("g.tx().rollback()", RequestOptions.build().language("gremlin-lang").create()).all().get(10, TimeUnit.SECONDS); - assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); - assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); - } finally { - cluster.close(); - } + protected Cluster createCluster() { + return TestClientFactory.build().create(); } }