Skip to content

Commit f425884

Browse files
authored
fix: R2dbcTransaction parameters should be defined at beginTransaction() (#2679)
* fix: Move setting of transaction parameters to before beginTransaction()
1 parent 9a521f4 commit f425884

File tree

16 files changed

+215
-59
lines changed

16 files changed

+215
-59
lines changed

exposed-jdbc/src/main/kotlin/org/jetbrains/exposed/v1/jdbc/transactions/Transactions.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ fun <T> inTopLevelTransaction(
180180

181181
while (true) {
182182
val transaction = database.transactionManager.newTransaction(
183-
transactionIsolation ?: database.transactionManager.defaultIsolationLevel, readOnly ?: database.transactionManager.defaultReadOnly, outerTransaction
183+
transactionIsolation ?: database.transactionManager.defaultIsolationLevel,
184+
readOnly ?: database.transactionManager.defaultReadOnly,
185+
outerTransaction
184186
)
185187

186188
try {
@@ -324,7 +326,9 @@ suspend fun <T> inTopLevelSuspendTransaction(
324326

325327
while (true) {
326328
val transaction = database.transactionManager.newTransaction(
327-
transactionIsolation ?: database.transactionManager.defaultIsolationLevel, readOnly ?: database.transactionManager.defaultReadOnly, outerTransaction
329+
transactionIsolation ?: database.transactionManager.defaultIsolationLevel,
330+
readOnly ?: database.transactionManager.defaultReadOnly,
331+
outerTransaction
328332
)
329333

330334
try {

exposed-r2dbc-tests/src/test/kotlin/org/jetbrains/exposed/v1/r2dbc/sql/tests/shared/CoroutineTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,6 @@ class CoroutineTests : R2dbcDatabaseTestsBase() {
211211
}
212212
}
213213

214-
@Disabled("Until isolation level fixed")
215214
@RepeatedTest(10)
216215
@CoroutinesTimeout(60000)
217216
fun nestedSuspendTxTest() {
@@ -252,6 +251,7 @@ class CoroutineTests : R2dbcDatabaseTestsBase() {
252251
}
253252
}
254253

254+
@Disabled
255255
@RepeatedTest(10)
256256
@CoroutinesTimeout(60000)
257257
fun nestedSuspendAsyncTxTest() {

exposed-r2dbc-tests/src/test/kotlin/org/jetbrains/exposed/v1/r2dbc/sql/tests/shared/QueryTimeoutTest.kt

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,26 @@ package org.jetbrains.exposed.v1.r2dbc.sql.tests.shared
22

33
import io.r2dbc.spi.R2dbcNonTransientResourceException
44
import io.r2dbc.spi.R2dbcTimeoutException
5+
import kotlinx.coroutines.test.runTest
56
import nl.altindag.log.LogCaptor
67
import org.jetbrains.exposed.v1.core.exposedLogger
78
import org.jetbrains.exposed.v1.r2dbc.ExposedR2dbcException
89
import org.jetbrains.exposed.v1.r2dbc.tests.R2dbcDatabaseTestsBase
910
import org.jetbrains.exposed.v1.r2dbc.tests.TestDB
1011
import org.jetbrains.exposed.v1.r2dbc.tests.shared.assertTrue
1112
import org.jetbrains.exposed.v1.r2dbc.transactions.TransactionManager
13+
import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction
14+
import org.junit.jupiter.api.Assertions
15+
import org.junit.jupiter.api.Assumptions
1216
import org.junit.jupiter.api.Disabled
1317
import org.junit.jupiter.api.Test
14-
import kotlin.test.fail
18+
import kotlin.test.assertTrue
1519

16-
@Disabled
1720
class QueryTimeoutTest : R2dbcDatabaseTestsBase() {
1821

1922
private fun generateTimeoutStatements(db: TestDB, timeout: Int): String {
2023
return when (db) {
21-
in TestDB.ALL_MYSQL_MARIADB -> "SELECT SLEEP($timeout) = 0;"
24+
in TestDB.ALL_MYSQL_MARIADB -> "SELECT 1 = 0 WHERE SLEEP($timeout);"
2225
in TestDB.ALL_POSTGRES -> "SELECT pg_sleep($timeout);"
2326
TestDB.SQLSERVER -> "WAITFOR DELAY '00:00:$timeout';"
2427
else -> throw NotImplementedError()
@@ -28,19 +31,33 @@ class QueryTimeoutTest : R2dbcDatabaseTestsBase() {
2831
private val timeoutTestDBList = TestDB.ALL_MARIADB + TestDB.ALL_POSTGRES + TestDB.SQLSERVER + TestDB.MYSQL_V8
2932

3033
@Test
31-
fun timeoutStatements() {
32-
withDb(timeoutTestDBList) { testDB ->
33-
this.queryTimeout = 3
34+
fun timeoutStatements() = runTest {
35+
Assumptions.assumeTrue(dialect in timeoutTestDBList)
36+
37+
if (dialect == TestDB.POSTGRESQL) {
38+
val db = dialect.connect { defaultMaxAttempts = 1 }
3439
try {
35-
TransactionManager.current().exec(
36-
generateTimeoutStatements(testDB, 5)
37-
)
38-
fail("Should have thrown a timeout or cancelled statement exception")
40+
suspendTransaction(db = db) {
41+
this.queryTimeout = 3
42+
TransactionManager.current().exec(
43+
generateTimeoutStatements(dialect, 5)
44+
)
45+
Assertions.fail("Should have thrown a timeout or cancelled statement exception")
46+
}
3947
} catch (cause: ExposedR2dbcException) {
40-
when (testDB) {
41-
// PostgreSQL throws a regular PgSQLException with a cancelled statement message
42-
TestDB.POSTGRESQL -> assertTrue(cause.cause is R2dbcNonTransientResourceException)
43-
else -> assertTrue(cause.cause is R2dbcTimeoutException)
48+
assertTrue(cause.cause is R2dbcNonTransientResourceException)
49+
}
50+
TransactionManager.closeAndUnregister(db)
51+
} else {
52+
withDb { testDB ->
53+
this.queryTimeout = 3
54+
try {
55+
TransactionManager.current().exec(
56+
generateTimeoutStatements(testDB, 5)
57+
)
58+
Assertions.fail("Should have thrown a timeout or cancelled statement exception")
59+
} catch (cause: ExposedR2dbcException) {
60+
assertTrue(cause.cause is R2dbcTimeoutException)
4461
}
4562
}
4663
}
@@ -66,21 +83,7 @@ class QueryTimeoutTest : R2dbcDatabaseTestsBase() {
6683
}
6784
}
6885

69-
@Test
70-
fun timeoutMinusWithTimeoutStatement() {
71-
withDb(timeoutTestDBList) { testDB ->
72-
this.queryTimeout = -1
73-
try {
74-
TransactionManager.current().exec(
75-
generateTimeoutStatements(testDB, 1)
76-
)
77-
fail("Should have thrown a timeout or cancelled statement exception")
78-
} catch (cause: ExposedR2dbcException) {
79-
assertTrue(cause.cause is R2dbcTimeoutException)
80-
}
81-
}
82-
}
83-
86+
@Disabled("MariaDB v2 fails on TC with empty log list")
8487
@Test
8588
fun testLongQueryThrowsWarning() {
8689
val logCaptor = LogCaptor.forName(exposedLogger.name)

exposed-r2dbc-tests/src/test/kotlin/org/jetbrains/exposed/v1/r2dbc/sql/tests/shared/ReadOnlyTests.kt

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import org.jetbrains.exposed.v1.r2dbc.tests.TestDB
1010
import org.jetbrains.exposed.v1.r2dbc.tests.shared.expectException
1111
import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction
1212
import org.junit.jupiter.api.Assumptions
13-
import org.junit.jupiter.api.Disabled
1413
import org.junit.jupiter.api.Test
14+
import org.junit.jupiter.api.assertInstanceOf
15+
import kotlin.test.assertContains
1516

1617
// equivalent to exposed-tests/ThreadLocalManagerTest.kt
1718
class ReadOnlyTests : R2dbcDatabaseTestsBase() {
@@ -25,25 +26,40 @@ class ReadOnlyTests : R2dbcDatabaseTestsBase() {
2526
val value = varchar("value", 20)
2627
}
2728

28-
@Disabled
2929
@Test
3030
fun testReadOnly() = runTest {
3131
Assumptions.assumeFalse(dialect in readOnlyExcludedVendors)
3232

3333
val database = dialect.connect()
34-
suspendTransaction(db = database, readOnly = true) {
35-
expectException<ExposedR2dbcException> {
36-
SchemaUtils.create(RollbackTable)
34+
if (dialect == TestDB.POSTGRESQL) {
35+
try {
36+
suspendTransaction(db = database, readOnly = true) {
37+
SchemaUtils.create(RollbackTable)
38+
}
39+
} catch (cause: Exception) {
40+
assertInstanceOf<ExposedR2dbcException>(cause)
41+
assertContains(cause.message, "read-only transaction")
3742
}
3843
}
3944

4045
suspendTransaction(db = database) {
4146
SchemaUtils.create(RollbackTable)
4247
}
4348

44-
suspendTransaction(db = database, readOnly = true) {
45-
expectException<ExposedR2dbcException> {
46-
RollbackTable.insert { it[RollbackTable.value] = "random-something" }
49+
if (dialect == TestDB.POSTGRESQL) {
50+
try {
51+
suspendTransaction(db = database, readOnly = true) {
52+
RollbackTable.insert { it[RollbackTable.value] = "random-something" }
53+
}
54+
} catch (cause: Exception) {
55+
assertInstanceOf<ExposedR2dbcException>(cause)
56+
assertContains(cause.message, "read-only transaction")
57+
}
58+
} else {
59+
suspendTransaction(db = database, readOnly = true) {
60+
expectException<ExposedR2dbcException> {
61+
RollbackTable.insert { it[RollbackTable.value] = "random-something" }
62+
}
4763
}
4864
}
4965

exposed-r2dbc-tests/src/test/kotlin/org/jetbrains/exposed/v1/r2dbc/sql/tests/shared/TransactionIsolationTest.kt

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.jetbrains.exposed.v1.r2dbc.sql.tests.shared
22

33
import io.r2dbc.spi.IsolationLevel
4+
import io.r2dbc.spi.Option
5+
import io.r2dbc.spi.TransactionDefinition
46
import kotlinx.coroutines.flow.singleOrNull
57
import kotlinx.coroutines.test.runTest
68
import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabase
@@ -13,14 +15,12 @@ import org.jetbrains.exposed.v1.r2dbc.tests.shared.assertEquals
1315
import org.jetbrains.exposed.v1.r2dbc.transactions.inTopLevelSuspendTransaction
1416
import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction
1517
import org.junit.jupiter.api.Assumptions
16-
import org.junit.jupiter.api.Disabled
1718
import org.junit.jupiter.api.Test
1819
import kotlin.test.assertNotNull
1920

2021
class TransactionIsolationTest : R2dbcDatabaseTestsBase() {
2122
private val transactionIsolationSupportDb = TestDB.ALL_MARIADB + TestDB.MYSQL_V5 + TestDB.POSTGRESQL + TestDB.SQLSERVER
2223

23-
@Disabled
2424
@Test
2525
fun testWhatTransactionIsolationWasApplied() {
2626
withDb {
@@ -31,15 +31,13 @@ class TransactionIsolationTest : R2dbcDatabaseTestsBase() {
3131
}
3232
}
3333

34-
// MariaDB 'Transaction characteristics can't be changed while a transaction is in progress'
35-
@Disabled
3634
@Test
3735
fun testTransactionIsolationSetOnDatabaseConfig() = runTest {
3836
Assumptions.assumeTrue(transactionIsolationSupportDb.containsAll(TestDB.enabledDialects()))
3937

4038
val db = dialect.connect { defaultR2dbcIsolationLevel = IsolationLevel.READ_COMMITTED }
4139

42-
suspendTransaction {
40+
suspendTransaction(db = db) {
4341
// transaction manager should default to use DatabaseConfig level
4442
assertEquals(IsolationLevel.READ_COMMITTED, transactionManager.defaultIsolationLevel)
4543

@@ -74,6 +72,40 @@ class TransactionIsolationTest : R2dbcDatabaseTestsBase() {
7472
}
7573
}
7674

75+
private class CustomTestTransactionDefinition : TransactionDefinition {
76+
override fun <T : Any?> getAttribute(option: Option<T?>): T? {
77+
return when (option) {
78+
TransactionDefinition.ISOLATION_LEVEL -> IsolationLevel.REPEATABLE_READ as T
79+
else -> null
80+
}
81+
}
82+
}
83+
84+
@Test
85+
fun testTransactionIsolationSetByManualDefinition() = runTest {
86+
Assumptions.assumeTrue(transactionIsolationSupportDb.containsAll(TestDB.enabledDialects()))
87+
88+
val db = dialect.connect { defaultR2dbcIsolationLevel = IsolationLevel.READ_COMMITTED }
89+
90+
suspendTransaction(db = db) {
91+
// transaction manager should default to use DatabaseConfig level
92+
assertEquals(IsolationLevel.READ_COMMITTED, transactionManager.defaultIsolationLevel)
93+
94+
this.connection().setTransactionDefinition(CustomTestTransactionDefinition())
95+
96+
// database level should be set by the value in CustomTestTransactionDefinition
97+
assertTransactionIsolationLevel(dialect, IsolationLevel.REPEATABLE_READ)
98+
}
99+
100+
suspendTransaction(db = db) {
101+
// overrides any Exposed parameter setting & forces beginTransaction() to be called with no definition
102+
this.connection().setTransactionDefinition(null)
103+
104+
// database level should be set by the database's own defaults
105+
assertTransactionIsolationLevel(dialect, R2dbcDatabase.getDefaultIsolationLevel(db))
106+
}
107+
}
108+
77109
private suspend fun R2dbcTransaction.assertTransactionIsolationLevel(testDb: TestDB, expected: IsolationLevel) {
78110
val (sql, repeatable, committed) = when (testDb) {
79111
TestDB.POSTGRESQL -> Triple("SHOW TRANSACTION ISOLATION LEVEL", "repeatable read", "read committed")

exposed-r2dbc/api/exposed-r2dbc.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,7 @@ public final class org/jetbrains/exposed/v1/r2dbc/statements/R2dbcConnectionImpl
745745
public fun setCatalog (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
746746
public fun setReadOnly (ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
747747
public fun setSavepoint (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
748+
public fun setTransactionDefinition (Lio/r2dbc/spi/TransactionDefinition;)V
748749
public fun setTransactionIsolation (Lio/r2dbc/spi/IsolationLevel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
749750
}
750751

@@ -900,6 +901,7 @@ public abstract interface class org/jetbrains/exposed/v1/r2dbc/statements/api/R2
900901
public abstract fun setCatalog (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
901902
public abstract fun setReadOnly (ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
902903
public abstract fun setSavepoint (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
904+
public abstract fun setTransactionDefinition (Lio/r2dbc/spi/TransactionDefinition;)V
903905
public abstract fun setTransactionIsolation (Lio/r2dbc/spi/IsolationLevel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
904906
}
905907

exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/R2dbcConnectionImpl.kt

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import io.r2dbc.spi.IsolationLevel
55
import io.r2dbc.spi.Row
66
import io.r2dbc.spi.RowMetadata
77
import io.r2dbc.spi.Statement
8+
import io.r2dbc.spi.TransactionDefinition
89
import io.r2dbc.spi.ValidationDepth
910
import kotlinx.coroutines.flow.flow
1011
import kotlinx.coroutines.flow.toList
@@ -29,8 +30,11 @@ import org.jetbrains.exposed.v1.r2dbc.statements.api.R2dbcExposedDatabaseMetadat
2930
import org.jetbrains.exposed.v1.r2dbc.statements.api.R2dbcSavepoint
3031
import org.jetbrains.exposed.v1.r2dbc.statements.api.getBoolean
3132
import org.jetbrains.exposed.v1.r2dbc.statements.api.getString
33+
import org.jetbrains.exposed.v1.r2dbc.transactions.R2dbcTransactionDefinition
3234
import org.jetbrains.exposed.v1.r2dbc.transactions.TransactionManager
3335
import org.jetbrains.exposed.v1.r2dbc.vendors.metadata.MetadataProvider
36+
import org.jetbrains.exposed.v1.r2dbc.vendors.metadata.MySQLMetadata
37+
import org.jetbrains.exposed.v1.r2dbc.vendors.metadata.OracleMetadata
3438
import org.reactivestreams.Publisher
3539
import java.util.*
3640

@@ -83,6 +87,12 @@ class R2dbcConnectionImpl(
8387
withConnection { setTransactionIsolationLevel(value).awaitFirstOrNull() }
8488
}
8589

90+
private var transactionDefinition: TransactionDefinition? = null
91+
92+
override fun setTransactionDefinition(definition: TransactionDefinition?) {
93+
transactionDefinition = definition
94+
}
95+
8696
override suspend fun commit() {
8797
withConnection {
8898
// this has side effect of enabling auto-commit ON, which may cause unexpected rollback behavior
@@ -102,6 +112,7 @@ class R2dbcConnectionImpl(
102112
override suspend fun close() {
103113
withConnection { close().awaitFirstOrNull() }
104114
localConnection = null
115+
transactionDefinition = null
105116
}
106117

107118
override suspend fun prepareStatement(
@@ -217,10 +228,29 @@ class R2dbcConnectionImpl(
217228

218229
private suspend fun <T> withConnection(body: suspend Connection.() -> T): T {
219230
val acquiredConnection = localConnectionLock.withLock {
220-
localConnection ?: connection.awaitLast().also {
221-
// this starts an explicit transaction with autoCommit mode off
222-
it.beginTransaction().awaitFirstOrNull()
223-
localConnection = it
231+
localConnection ?: connection.awaitLast().also { cx ->
232+
// beginTransaction() starts an explicit transaction with autoCommit mode off
233+
transactionDefinition
234+
?.let { originalDefinition ->
235+
when (val definition = originalDefinition as? R2dbcTransactionDefinition) {
236+
is R2dbcTransactionDefinition if metadataProvider is OracleMetadata && definition.readOnly != null -> {
237+
// Oracle does not allow both isolation level + mutability to be set implicitly together;
238+
// instead it requires a specific order, with transaction isolation always set first.
239+
cx.beginTransaction(definition.toOracleDefinition()).awaitFirstOrNull()
240+
cx.executeSQL(metadataProvider.setReadOnlyMode(definition.readOnly))
241+
}
242+
is R2dbcTransactionDefinition if metadataProvider is MySQLMetadata && definition.isolationLevel != null -> {
243+
// MySQL/MariaDB driver would set level only on next-next transaction, not the 1 about to start
244+
cx.executeSQL(metadataProvider.setCurrentTransactionIsolation(definition.isolationLevel))
245+
cx.beginTransaction(definition).awaitFirstOrNull()
246+
}
247+
else -> cx.beginTransaction(originalDefinition).awaitFirstOrNull()
248+
}
249+
}
250+
?: cx.beginTransaction().awaitFirstOrNull()
251+
252+
localConnection = cx
253+
transactionDefinition = null
224254
}
225255
}
226256
return acquiredConnection.body()

exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.jetbrains.exposed.v1.r2dbc.statements.api
22

33
import io.r2dbc.spi.IsolationLevel
4+
import io.r2dbc.spi.TransactionDefinition
45
import org.jetbrains.exposed.v1.core.statements.api.ExposedSavepoint
56
import org.jetbrains.exposed.v1.r2dbc.statements.R2dbcPreparedStatementImpl
67

@@ -37,6 +38,15 @@ interface R2dbcExposedConnection<OriginalConnection : Any> {
3738
/** Sets the transaction isolation level of the connection. */
3839
suspend fun setTransactionIsolation(value: IsolationLevel)
3940

41+
/**
42+
* Sets specific transaction properties to be used together when starting a transaction explicitly.
43+
* Passing a `null` argument will force `beginTransaction()` to be invoked without any transaction definition.
44+
*
45+
* **Note:** This method should be called from within a transaction block at the start, before any operations
46+
* that trigger may trigger connection retrieval & usage.
47+
*/
48+
fun setTransactionDefinition(definition: TransactionDefinition?)
49+
4050
/** Saves all changes since the last commit or rollback operation. */
4151
suspend fun commit()
4252

0 commit comments

Comments
 (0)