Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
89e9753
test: add concurrency hazard test suite
hongwei1 Jun 11, 2026
9209722
docs: add concurrency test suite summary
hongwei1 Jun 11, 2026
cd8adee
docs: merge concurrency hazard docs into single CONCURRENCY_HAZARDS.md
hongwei1 Jun 11, 2026
5fdc6b1
docs: add ScalaTest simulation plan for concurrent race condition haz…
hongwei1 Jun 11, 2026
b501028
Merge branch 'develop' of https://github.com/OpenBankProject/OBP-API …
hongwei1 Jun 18, 2026
8bc031a
refactor: migrate JSON serialization imports from net.liftweb to org.…
hongwei1 Jun 18, 2026
5de7ff7
feat: implement atomic bank account balance updates using Doobie row …
hongwei1 Jun 18, 2026
26941ab
docs: Update outdated V6/V7 comments in RequestScopeConnection to ref…
hongwei1 Jun 21, 2026
ed1242b
fix(transaction): prevent Doobie escaping and MFA double-spend race c…
hongwei1 Jun 22, 2026
9c3c893
fix(security): prevent authentication counter lost-updates via row-le…
hongwei1 Jun 22, 2026
acdcda6
fix(consent): prevent scheduler stale-save from resurrecting revoked …
hongwei1 Jun 22, 2026
a64d931
fix(concurrency): prevent duplicate-creation races in six getOrCreate…
hongwei1 Jun 22, 2026
5a04b20
fix(concurrency): prevent view-permission races and orphaned AccountA…
hongwei1 Jun 22, 2026
92dc1d8
fix(concurrency): make incrementFutureCounter and decrementFutureCoun…
hongwei1 Jun 22, 2026
ff19daf
docs(concurrency): update CONCURRENCY_HAZARDS.md to reflect all 19 sc…
hongwei1 Jun 22, 2026
5509e50
fix(concurrency): guard getOrCreateSystemView and migrateViewPermissi…
hongwei1 Jun 22, 2026
7e68cc1
fix(concurrency): address five post-review correctness findings
hongwei1 Jun 22, 2026
8144e64
fix(concurrency): guard getOrCreate in mapping providers and bad-logi…
hongwei1 Jun 22, 2026
3bde4fd
fix(concurrency): correct type mismatch and missing import in mapping…
hongwei1 Jun 23, 2026
1141c0d
refactor: clarify variable names and add comments in future counters
hongwei1 Jun 23, 2026
26aec2c
refactor(views): retire ViewDefinition boolean permission columns
hongwei1 Jun 23, 2026
5a444c3
feat(migration): auto-dedup before Schemifier creates UniqueIndexes
hongwei1 Jun 23, 2026
c2fc5e6
refactor(views): remove retired canXxx boolean columns from ViewDefin…
hongwei1 Jun 23, 2026
05b11f4
docs(doobie): update stale Lift-era terminology in DoobieTransactor
hongwei1 Jun 24, 2026
1ec75ed
docs(concurrency): consolidate hazard docs into a single reference
hongwei1 Jun 24, 2026
e9bb033
Merge remote-tracking branch 'OBP/develop' into feature/concurrency-h…
hongwei1 Jun 24, 2026
ba035c4
docs(migration): explain why pre-Schemifier dedup bypasses the migrat…
hongwei1 Jun 24, 2026
7e466f3
chore(migration): remove obsolete manual SQL migration scripts
hongwei1 Jun 24, 2026
d15e3ff
docs(migration): correct the stale "before Schemifier" wording in the…
hongwei1 Jun 24, 2026
a92da66
fix(v6 docs): escape XML-unsafe angle-bracket placeholders in resourc…
hongwei1 Jun 24, 2026
b8c0790
refactor(migration): make pre-Schemifier dedup DELETE portable across…
hongwei1 Jun 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions obp-api/src/main/scala/bootstrap/liftweb/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,33 @@ class Boot extends MdcLoggable {
*/
MapperRules.createForeignKeys_? = (_) => APIUtil.getPropsAsBoolValue("mapper_rules.create_foreign_keys", false)

// Pre-Schemifier dedup: drop natural-key duplicate rows in mapperaccountholder /
// mappedentitlement BEFORE schemifyAll() issues their CREATE UNIQUE INDEX (declared in
// MapperAccountHolders / MappedEntitlement dbIndexes). On a long-lived DB that still holds
// duplicates the index DDL would otherwise abort boot.
//
// This MUST stay here and must NOT be moved into Migration.database.executeScripts:
// - both executeScripts passes below run AFTER schemifyAll() (the index is already created
// by then — the "executed before Schemifier" comment on the true-pass is historical), and
// - executeScripts is gated by migration_scripts.* props (off in tests), whereas Schemifier —
// and therefore this dedup — must run ungated in every environment, incl. the H2 test DB.
// The method self-guards (skips when the table is absent or has no duplicates), so running it
// on every boot is a cheap no-op on fresh/clean/test databases.
Migration.database.deduplicateBeforeUniqueIndexSchemify()
schemifyAll()

logger.info("Mapper database info: " + Migration.DbFunction.mapperDatabaseInfo)

// NOTE: both executeScripts passes below run AFTER schemifyAll() above. The
// `startedBeforeSchemifier = true` argument does NOT mean this pass runs before Schemifier — it
// marks the existing-DB pass, in which migrations that require post-Schemifier schema skip
// themselves (see Migration.executeScripts). The "before Schemifier" wording is historical: the
// call once sat before schemifyAll() but was moved ahead of it in 2021 (commit ea4537029).
DbFunction.tableExists(ResourceUser) match {
case true => // DB already exist
// Migration Scripts are used to update the model of OBP-API DB to a latest version.
// Please note that migration scripts are executed before Lift Mapper Schemifier
case true => // DB already exists
Migration.database.executeScripts(startedBeforeSchemifier = true)
logger.info("The Mapper database already exits. The scripts are executed BEFORE Lift Mapper Schemifier.")
case false => // DB is still not created. The scripts will be executed after Lift Mapper Schemifier
logger.info("The Mapper database already exists. Running the existing-DB migration pass (post-Schemifier; migrations needing fresh schema skip themselves).")
case false => // Fresh DB — its migrations run in the catch-all pass below (after Schemifier)
logger.info("The Mapper database is still not created. The scripts are going to be executed AFTER Lift Mapper Schemifier.")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.openbankproject.commons.model.{AccountId, BankId, BankIdAccountId, Us
import net.liftweb.common._
import net.liftweb.mapper._
import net.liftweb.common.Box
import net.liftweb.util.Helpers.tryo


/**
Expand All @@ -31,7 +32,7 @@ object MapperAccountHolders extends MapperAccountHolders with AccountHolders wit

// NOTE: !!! Uses a DIFFERENT TABLE NAME PREFIX TO ALL OTHERS i.e. MAPPER not MAPPED !!!!!

override def dbIndexes = Index(accountBankPermalink, accountPermalink) :: Nil
override def dbIndexes = UniqueIndex(user, accountBankPermalink, accountPermalink) :: Nil

//Note, this method, will not check the existing of bankAccount, any value of BankIdAccountId
//Can create the MapperAccountHolders.
Expand All @@ -51,16 +52,25 @@ object MapperAccountHolders extends MapperAccountHolders with AccountHolders wit
mapperAccountHolder
}
case Empty => {
val holder: MapperAccountHolders = MapperAccountHolders.create
.accountBankPermalink(bankIdAccountId.bankId.value)
.accountPermalink(bankIdAccountId.accountId.value)
.user(user.userPrimaryKey.value)
.source(source.getOrElse(null))
.saveMe
logger.debug(
s"getOrCreateAccountHolder--> create account holder: $holder"
)
Full(holder)
tryo {
MapperAccountHolders.create
.accountBankPermalink(bankIdAccountId.bankId.value)
.accountPermalink(bankIdAccountId.accountId.value)
.user(user.userPrimaryKey.value)
.source(source.getOrElse(null))
.saveMe
} match {
case Full(holder) =>
logger.debug(s"getOrCreateAccountHolder--> create account holder: $holder")
Full(holder)
case Failure(_, _, _) =>
MapperAccountHolders.find(
By(MapperAccountHolders.user, user.userPrimaryKey.value),
By(MapperAccountHolders.accountBankPermalink, bankIdAccountId.bankId.value),
By(MapperAccountHolders.accountPermalink, bankIdAccountId.accountId.value)
)
case other => other
}
}
case Failure(msg, t, c) => Failure(msg, t, c)
case ParamFailure(x,y,z,q) => ParamFailure(x,y,z,q)
Expand Down
27 changes: 20 additions & 7 deletions obp-api/src/main/scala/code/api/util/APIUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4834,20 +4834,33 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
}

def incrementFutureCounter(serviceName:String) = {
val (serviceNameCounter, serviceNameOpenFuturesCounter) = serviceNameCountersMap.getOrDefault(serviceName,(0,0))
serviceNameCountersMap.put(serviceName,(serviceNameCounter + 1,serviceNameOpenFuturesCounter+1))
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) = serviceNameCountersMap.getOrDefault(serviceName,(0,0))

// Atomically increment both the total-call counter and the open-futures counter for this
// service. ConcurrentHashMap.compute holds the segment lock for the entire lambda, so the
// read-modify-write is a single atomic step — no lost updates under concurrent callers.
// totalCallCount : ever-increasing; used by canOpenFuture for back-off modulo arithmetic.
// openFuturesCount: tracks how many futures are currently in-flight for this service.
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) =
serviceNameCountersMap.compute(serviceName, (_, old) => {
val (totalCallCount, openFuturesCount) = if (old == null) (0, 0) else old
(totalCallCount + 1, openFuturesCount + 1)
})

if(serviceNameOpenFuturesCounterLatest>=expectedOpenFuturesPerService) {
logger.warn(s"WARNING! incrementFutureCounter says: serviceName is $serviceName, serviceNameOpenFuturesCounterLatest is ${serviceNameOpenFuturesCounterLatest}, which is over expectedOpenFuturesPerService($expectedOpenFuturesPerService)")
}
logger.debug(s"For your information: incrementFutureCounter says: serviceName is $serviceName, serviceNameCounterLatest is ${serviceNameCounterLatest}, serviceNameOpenFuturesCounterLatest is ${serviceNameOpenFuturesCounterLatest}")
}

def decrementFutureCounter(serviceName:String) = {
val (serviceNameCounter, serviceNameOpenFuturesCounter) = serviceNameCountersMap.getOrDefault(serviceName, (0, 1))
serviceNameCountersMap.put(serviceName, (serviceNameCounter, serviceNameOpenFuturesCounter - 1))
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) = serviceNameCountersMap.getOrDefault(serviceName, (0, 1))
// Atomically decrement only the open-futures counter; totalCallCount is left unchanged
// because it reflects the cumulative number of calls ever started, not the current load.
// The null-guard initialises to (0, 1) and subtracts 1 → (0, 0), which is the safe
// no-op fallback if decrement is somehow called before any increment.
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) =
serviceNameCountersMap.compute(serviceName, (_, old) => {
val (totalCallCount, openFuturesCount) = if (old == null) (0, 1) else old
(totalCallCount, openFuturesCount - 1)
})
logger.debug(s"decrementFutureCounter says: serviceName is $serviceName, serviceNameCounterLatest is $serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest is ${serviceNameOpenFuturesCounterLatest}")
}

Expand Down
115 changes: 75 additions & 40 deletions obp-api/src/main/scala/code/api/util/DoobieTransactor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ import scala.concurrent.{ExecutionContext, Future}
*
* Provides a type-safe, functional JDBC layer for raw SQL queries.
* This handles all JDBC types correctly, including SQL Server's NVARCHAR (type -9)
* which Lift's DB.runQuery doesn't handle.
* which Lift Mapper's DB.runQuery doesn't handle.
*
* TRANSACTION UNIFICATION:
* When called within a Lift HTTP request context, Doobie uses the SAME Connection
* that Lift is holding for the current request transaction (via Transactor.fromConnection).
* This means Doobie queries participate in Lift's transaction boundary:
* When called within an http4s request scope, Doobie uses the SAME Connection that
* RequestScopeConnection holds for the current request transaction (via
* Transactor.fromConnection). This means Doobie queries participate in the request
* transaction boundary:
* - Same connection, same transaction, same commit/rollback
* - Doobie can see uncommitted Lift writes (same session)
* - If Lift rolls back, Doobie's operations are also rolled back
* - Doobie can see uncommitted writes made earlier in the same request (same session)
* - If the request transaction rolls back, Doobie's operations are also rolled back
*
* When called outside a Lift request context (e.g., background tasks, schedulers),
* falls back to Lift's shared HikariCP connection pool via Transactor.fromDataSource.
* When called outside an http4s request scope (e.g., background tasks, schedulers),
* falls back to the shared HikariCP connection pool via Transactor.fromDataSource.
*
* Benefits over DBUtil.runQuery:
* - Type-safe query results via case classes
Expand Down Expand Up @@ -56,77 +57,85 @@ import scala.concurrent.{ExecutionContext, Future}
object DoobieUtil extends MdcLoggable {

/**
* Fallback transactor that shares Lift's HikariCP connection pool.
* Used when no Lift request context is available (background tasks, schedulers).
* Fallback transactor that shares the application HikariCP connection pool.
* Used when no http4s request scope is available (background tasks, schedulers).
* Strategy.void: Doobie will not call setAutoCommit/commit/rollback.
*/
private lazy val fallbackTransactor: Transactor[IO] = {
val liftDataSource = APIUtil.vendor.HikariDatasource.ds
logger.info("DoobieUtil: Initialized fallback transactor sharing Lift's HikariCP pool")
val sharedDataSource = APIUtil.vendor.HikariDatasource.ds
logger.info("DoobieUtil: Initialized fallback transactor sharing the application HikariCP pool")
val xa = Transactor.fromDataSource[IO].apply(
liftDataSource,
sharedDataSource,
ExecutionContext.global
)
xa.copy(strategy0 = Strategy.void)
}

/**
* Create a transactor that wraps an existing JDBC Connection.
* Strategy.void ensures Doobie does not interfere with Lift's transaction management.
* Strategy.void ensures Doobie does not interfere with the request-scoped transaction
* management owned by RequestScopeConnection.withBusinessDBTransaction.
*/
private def transactorFromConnection(conn: java.sql.Connection): Transactor[IO] = {
val xa = Transactor.fromConnection[IO].apply(conn, None)
xa.copy(strategy0 = Strategy.void)
}

/**
* Try to get the current Lift request's Connection.
* Uses DB.currentConnection which peeks at the DynoVar without
* triggering reference counting or creating a new connection.
* Returns Some(connection) if inside a Lift HTTP request context,
* None otherwise (background tasks, schedulers, tests without request context).
* Try to get the current request's Connection.
*
* Primary path is the http4s RequestScopeConnection proxy (set per request via TTL).
* As a secondary fallback it reads Lift Mapper's DB.currentConnection — this only
* resolves when the call happens to run inside an open Mapper DB.use scope (the proxy
* is also on Mapper's connection stack there); it peeks at the DynaVar without triggering
* reference counting or creating a new connection.
*
* Returns Some(connection) when a request-scoped connection is available, None otherwise
* (background tasks, schedulers, tests without a request scope).
*/
private def liftCurrentConnection: Option[java.sql.Connection] = {
// DB.currentConnection returns Box[SuperConnection]
// SuperConnection has implicit conversion to java.sql.Connection
DB.currentConnection match {
case Full(superConn) =>
val conn: java.sql.Connection = superConn.connection
if (!conn.isClosed) Some(conn) else None
case _ => None
private def currentRequestConnection: Option[java.sql.Connection] = {
// 1. Primary: the http4s RequestScopeConnection proxy from Alibaba TTL
Option(code.api.util.http4s.RequestScopeConnection.currentProxy.get()).orElse {
// 2. Fallback: Lift Mapper's DB.currentConnection (only Full inside an open DB.use scope)
DB.currentConnection match {
case Full(superConn) =>
val conn: java.sql.Connection = superConn.connection
if (!conn.isClosed) Some(conn) else None
case _ => None
}
}
}

/**
* Run a Doobie query synchronously, sharing Lift's transaction when available.
* Run a Doobie query synchronously, sharing the request-scoped transaction when available.
*
* When called within a Lift HTTP request context:
* - Uses the SAME Connection that Lift holds for the current request
* - Doobie query participates in Lift's transaction (same commit/rollback)
* - Can see uncommitted Lift writes (same database session)
* When called within an http4s request scope:
* - Uses the SAME Connection that RequestScopeConnection holds for the current request
* - Doobie query participates in the request transaction (same commit/rollback)
* - Can see uncommitted writes made earlier in the same request (same database session)
*
* When called outside a Lift request context (background tasks, schedulers):
* - Falls back to Lift's shared HikariCP pool (separate connection)
* When called outside an http4s request scope (background tasks, schedulers):
* - Falls back to the shared HikariCP pool (separate connection)
*
* @param query The Doobie ConnectionIO query to execute
* @return The query result
*/
def runQuery[A](query: ConnectionIO[A]): A = {
liftCurrentConnection match {
currentRequestConnection match {
case Some(conn) =>
// Inside Lift request: use the same connection for transaction unification
// Inside a request scope: use the same connection for transaction unification
query.transact(transactorFromConnection(conn)).unsafeRunSync()
case None =>
// Outside Lift request: fallback to shared pool
logger.debug("DoobieUtil.runQuery: No Lift request context, using fallback pool transactor")
// Outside a request scope: fallback to shared pool
logger.debug("DoobieUtil.runQuery: No request scope, using fallback pool transactor")
query.transact(fallbackTransactor).unsafeRunSync()
}
}

/**
* Run a Doobie query asynchronously, returning a Future.
* Note: async queries always use the fallback pool transactor because
* Lift's request connection may not be available on a different thread.
* the request connection may not be available on a different thread.
*
* @param query The Doobie ConnectionIO query to execute
* @param ec ExecutionContext for the Future
Expand All @@ -139,7 +148,7 @@ object DoobieUtil extends MdcLoggable {
/**
* Run a Doobie query and return an IO.
* Note: IO queries always use the fallback pool transactor because
* the IO may be evaluated outside the Lift request context.
* the IO may be evaluated outside the http4s request scope.
*
* @param query The Doobie ConnectionIO query to execute
* @return IO containing the query result
Expand All @@ -148,6 +157,32 @@ object DoobieUtil extends MdcLoggable {
query.transact(fallbackTransactor)
}

/**
* Fallback transactor that commits. Used for updates outside an http4s request scope
* (background tasks, schedulers).
*/
private lazy val fallbackUpdateTransactor: Transactor[IO] = {
val sharedDataSource = APIUtil.vendor.HikariDatasource.ds
Transactor.fromDataSource[IO].apply(
sharedDataSource,
ExecutionContext.global
) // Strategy.default includes commit/rollback
}

/**
* Run a Doobie update synchronously, sharing the request-scoped transaction when available.
* Outside an http4s request scope, uses a transactor that COMMITs the connection.
*/
def runUpdate[A](query: ConnectionIO[A]): A = {
currentRequestConnection match {
case Some(conn) =>
query.transact(transactorFromConnection(conn)).unsafeRunSync()
case None =>
logger.debug("DoobieUtil.runUpdate: No request scope, using fallback update transactor")
query.transact(fallbackUpdateTransactor).unsafeRunSync()
}
}

/**
* Check if the database is SQL Server (for syntax differences like TOP vs LIMIT)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import java.sql.Connection
import scala.concurrent.Future

/**
* Request-scoped transaction support for v7 http4s endpoints.
* Request-scoped transaction support for Http4s native endpoints.
*
* PROBLEM: Lift Mapper uses a plain ThreadLocal for connection tracking, while
* cats-effect IO switches compute threads across flatMap / IO.fromFuture boundaries.
Expand Down Expand Up @@ -63,11 +63,11 @@ import scala.concurrent.Future
* METRIC WRITES: recordMetric runs in IO.blocking (blocking pool, no TTL from compute
* thread). currentProxy.get() returns null there, so RequestAwareConnectionManager
* falls back to the pool — metric writes use a separate connection and commit
* independently, matching v6 behaviour.
* independently, matching traditional Lift behaviour.
*
* NON-V7 PATHS (v6 via bridge, background tasks): requestProxyLocal is not set,
* currentProxy is null — RequestAwareConnectionManager delegates to APIUtil.vendor
* as before. DB.buildLoanWrapper (v6) continues to manage its own transaction.
* BACKGROUND TASKS / NON-HTTP PATHS: requestProxyLocal is not set, currentProxy is null
* — RequestAwareConnectionManager delegates to APIUtil.vendor. Any Lift Mapper operations
* outside of a Http4s request scope will auto-commit unless wrapped in a Lift LoanWrapper.
*/
object RequestScopeConnection extends MdcLoggable {

Expand Down Expand Up @@ -186,7 +186,7 @@ object RequestScopeConnection extends MdcLoggable {
* If no DB call was made: nothing to commit or close (pool unaffected).
*
* GET/HEAD must NOT be wrapped (they run on auto-commit vendor connections). Used by
* ResourceDocMiddleware (v6/v7) and by services that build their own request scope
* ResourceDocMiddleware and by services that build their own request scope
* without the middleware (e.g. Http4sDynamicEntity).
*/
def withBusinessDBTransaction(io: IO[Response[IO]]): IO[Response[IO]] =
Expand Down Expand Up @@ -250,8 +250,8 @@ object RequestScopeConnection extends MdcLoggable {
* DB.defineConnectionManager(..., new RequestAwareConnectionManager(APIUtil.vendor))
*
* Used by:
* - v7 native endpoints (gets proxy from TTL, set right before Future submission)
* - v6 via bridge / background tasks (TTL is null → delegates to vendor as before)
* - Http4s native endpoints (gets proxy from TTL, set right before Future submission)
* - Background tasks / Non-HTTP paths (TTL is null → delegates to vendor as before)
*/
class RequestAwareConnectionManager(delegate: ConnectionManager) extends ConnectionManager with MdcLoggable {

Expand Down
Loading
Loading