Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ object MappedAccountAccessRequestProvider extends AccountAccessRequestProvider {
checkerComment: String
): Box[AccountAccessRequestTrait] = {
AccountAccessRequest.find(By(AccountAccessRequest.AccountAccessRequestId, accountAccessRequestId)).flatMap { request =>
tryo {
request
.Status(status)
.CheckerUserId(checkerUserId)
.CheckerComment(checkerComment)
.saveMe()
}
// Atomic guarded transition: an access request is actioned once, from INITIATED. The loser of a
// concurrent approve/decline gets 0 rows -> Failure, instead of silently overwriting the decision.
val rows = code.bankconnectors.DoobieBusinessStatusQueries.conditionalAccountAccessRequestStatus(
request.id.get, AccountAccessRequestStatus.INITIATED.toString, status, checkerUserId, checkerComment)
if (rows == 1) AccountAccessRequest.find(By(AccountAccessRequest.AccountAccessRequestId, accountAccessRequestId))
else net.liftweb.common.Failure("Account access request is no longer INITIATED; it was already actioned.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ object MappedAccountApplicationProvider extends AccountApplicationProvider {
match {
case Full(accountApplication) if(accountApplication.status == "ACCEPTED") =>
Failure(s"${ErrorMessages.AccountApplicationAlreadyAccepted} Current Account-Application-Id($accountApplicationId)")
case Full(accountApplication) => tryo{accountApplication.mStatus(status).saveMe()}
case Full(accountApplication) =>
// Optimistic CAS: transition only if the status hasn't changed since we loaded it. Two
// concurrent updates that both read the same status can no longer both write — the loser
// (0 rows) gets a Failure instead of silently overwriting the winner's decision.
val rows = code.bankconnectors.DoobieBusinessStatusQueries.conditionalAccountApplicationStatus(
accountApplication.id.get, accountApplication.status, status)
if (rows == 1) MappedAccountApplication.find(By(MappedAccountApplication.mAccountApplicationId, accountApplicationId))
else Failure(s"${ErrorMessages.AccountApplicationAlreadyAccepted} Status changed concurrently. Current Account-Application-Id($accountApplicationId)")
case Empty => Failure(s"${ErrorMessages.AccountApplicationNotFound} Current Account-Application-Id($accountApplicationId)")
case _ => Failure(ErrorMessages.UnknownError)
}
Expand Down
23 changes: 16 additions & 7 deletions obp-api/src/main/scala/code/actorsystem/ObpActorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import com.typesafe.config.ConfigFactory
object ObpActorSystem extends MdcLoggable {

val props_hostname = Helper.getHostname
var obpActorSystem: ActorSystem = _
var northSideAkkaConnectorActorSystem: ActorSystem = _
// @volatile so the single assignment of each actor system is visible to all reader threads
// (the JVM memory model does not guarantee visibility of a non-volatile write across threads).
@volatile var obpActorSystem: ActorSystem = _
@volatile var northSideAkkaConnectorActorSystem: ActorSystem = _

def startLocalActorSystem() = localActorSystem

Expand All @@ -22,12 +24,19 @@ object ObpActorSystem extends MdcLoggable {
obpActorSystem = ActorSystem.create(s"ObpActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(localConf)))
obpActorSystem
}


// synchronized double-checked init so concurrent callers start the connector system exactly once.
def startNorthSideAkkaConnectorActorSystem(): ActorSystem = {
logger.info("Starting North Side Akka Connector actor system")
val localConf = AkkaConnectorActorConfig.localConf
logger.info(localConf)
northSideAkkaConnectorActorSystem = ActorSystem.create(s"SouthSideAkkaConnector_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(localConf)))
if (northSideAkkaConnectorActorSystem == null) {
synchronized {
if (northSideAkkaConnectorActorSystem == null) {
logger.info("Starting North Side Akka Connector actor system")
val localConf = AkkaConnectorActorConfig.localConf
logger.info(localConf)
northSideAkkaConnectorActorSystem = ActorSystem.create(s"SouthSideAkkaConnector_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(localConf)))
}
}
}
northSideAkkaConnectorActorSystem
}
}
16 changes: 11 additions & 5 deletions obp-api/src/main/scala/code/actorsystem/ObpLookupSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ object ObpLookupSystem extends ObpLookupSystem {
}

trait ObpLookupSystem extends MdcLoggable {
var obpLookupSystem: ActorSystem = null
// @volatile + synchronized double-checked init: without it two threads can both see null,
// both build an ActorSystem (resource leak), and a reader can observe a stale null.
@volatile var obpLookupSystem: ActorSystem = null
val props_hostname = Helper.getHostname

def init (): ActorSystem = {
if (obpLookupSystem == null ) {
val system = ActorSystem("ObpLookupSystem", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.lookupConf)))
logger.info(ObpActorConfig.lookupConf)
obpLookupSystem = system
if (obpLookupSystem == null) {
synchronized {
if (obpLookupSystem == null) {
val system = ActorSystem("ObpLookupSystem", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.lookupConf)))
logger.info(ObpActorConfig.lookupConf)
obpLookupSystem = system
}
}
}
obpLookupSystem
}
Expand Down
49 changes: 49 additions & 0 deletions obp-api/src/main/scala/code/api/cache/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,55 @@ object Redis extends MdcLoggable {
}
}

/**
* Atomic `SET key value EX ttlSeconds NX` (Jedis 2.9.0 five-arg overload). Sets the key with a TTL
* only if it does not already exist, in a single command. Returns true iff this call set the key.
*
* Use for first-write-wins caching (idempotency response cache) and lock acquisition: there is no
* window between "set value" and "set TTL" (unlike setnx + expire), so a crash can never leave a
* key without an expiry, and a second writer can never clobber the first.
*/
def setNxEx(key: String, value: String, ttlSeconds: Int): Boolean = {
var jedisConnection: Option[Jedis] = None
try {
jedisConnection = Some(jedisPool.getResource())
jedisConnection.get.set(key, value, "NX", "EX", ttlSeconds) == "OK"
} catch {
case e: Throwable => throw new RuntimeException(e)
} finally {
if (jedisConnection.isDefined && jedisConnection.get != null)
jedisConnection.foreach(_.close())
}
}

/**
* Atomic increment-with-create-TTL via a single Lua script: INCR the key, and on first creation
* (value == 1) set its expiry. Returns the post-increment counter value. Because INCR and EXPIRE
* run atomically server-side, concurrent callers cannot lose increments or race the TTL set, and an
* increment-then-compare rate-limit check cannot be bypassed by interleaving.
*/
def incrementWithTtl(key: String, ttlSeconds: Int): Long = {
val script =
"""local c = redis.call('INCR', KEYS[1])
|if c == 1 then redis.call('EXPIRE', KEYS[1], ARGV[1]) end
|return c""".stripMargin
var jedisConnection: Option[Jedis] = None
try {
jedisConnection = Some(jedisPool.getResource())
val result = jedisConnection.get.eval(
script,
java.util.Collections.singletonList(key),
java.util.Collections.singletonList(ttlSeconds.toString)
)
result.asInstanceOf[java.lang.Long].longValue()
} catch {
case e: Throwable => throw new RuntimeException(e)
} finally {
if (jedisConnection.isDefined && jedisConnection.get != null)
jedisConnection.foreach(_.close())
}
}

/**
* Delete all Redis keys matching a pattern using KEYS command
* @param pattern Redis key pattern (e.g., "rl_active_CONSUMER123_*")
Expand Down
4 changes: 3 additions & 1 deletion obp-api/src/main/scala/code/api/util/APIUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4407,7 +4407,9 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
* value: dependent endpoint information list.
* this is used by MessageDoc
*/
val connectorToEndpoint = mutable.Map[String, List[EndpointInfo]]()
// Thread-safe concurrent map: populated during startup scanning and read on the resource-docs
// path. TrieMap keeps the mutable.Map API (getOrElse/put) while being safe under concurrent access.
val connectorToEndpoint = scala.collection.concurrent.TrieMap[String, List[EndpointInfo]]()

private def addEndpointInfos(connectorMethods: List[String], partialFunctionName: String, apiVersion: ScannedApiVersion) = {
val endpointInfo = EndpointInfo(partialFunctionName, apiVersion.fullyQualifiedVersion)
Expand Down
27 changes: 11 additions & 16 deletions obp-api/src/main/scala/code/api/util/RateLimitingUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -244,22 +244,17 @@ object RateLimitingUtil extends MdcLoggable {
* No gates, no key formatting — call sites pass the final key and supply their own enable flags.
* Returns (ttl_seconds, current_count); (-1, -1) when Redis is unreachable. */
private[util] def incrementCounter(key: String, period: LimitCallPeriod): (Long, Long) = {
val ttlOpt = Redis.use(JedisMethod.TTL, key).map(_.toInt)
ttlOpt match {
case Some(-2) => // Key does not exist, create it
val seconds = RateLimitingPeriod.toSeconds(period).toInt
Redis.use(JedisMethod.SET, key, Some(seconds), Some("1"))
(seconds, 1)
case Some(ttl) if ttl > 0 => // Key exists with TTL, increment it
val cnt = Redis.use(JedisMethod.INCR, key).map(_.toInt).getOrElse(1)
(ttl, cnt)
case Some(ttl) if ttl <= 0 => // Key expired or has no expiry (shouldn't happen)
logger.warn(s"Unexpected TTL state ($ttl) for key $key, period $period - recreating counter")
val seconds = RateLimitingPeriod.toSeconds(period).toInt
Redis.use(JedisMethod.SET, key, Some(seconds), Some("1"))
(seconds, 1)
case None => // Redis unavailable
logger.error(s"Redis unavailable when incrementing counter for key $key, period $period")
val seconds = RateLimitingPeriod.toSeconds(period).toInt
try {
// Atomic INCR + create-TTL in one Lua call. Replaces the former TTL-read-then-SET/INCR sequence,
// which could lose increments and race the expiry under concurrency. On first increment (cnt==1)
// the key gets its TTL atomically.
val cnt = Redis.incrementWithTtl(key, seconds)
val ttl = Redis.use(JedisMethod.TTL, key).map(_.toLong).getOrElse(seconds.toLong)
(ttl, cnt)
} catch {
case e: Throwable =>
logger.error(s"Redis unavailable when incrementing counter for key $key, period $period", e)
(-1, -1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,23 +294,21 @@ object IdempotencyMiddleware extends MdcLoggable {
Option(j.get(key)).flatMap(envelopeFromJson)
}

private def writeResponseKey(key: String, env: Envelope): Unit =
withJedis { j =>
j.setex(key, ResponseTtlSeconds, envelopeToJson(env))
()
}
// First-write-wins via atomic SET NX EX: once a response is cached for an idempotency key it is
// immutable for its TTL, so a second concurrent response cannot clobber the first (which a replay
// would then return). Plain setex overwrites unconditionally.
private def writeResponseKey(key: String, env: Envelope): Unit = {
Redis.setNxEx(key, envelopeToJson(env), ResponseTtlSeconds)
()
}

/**
* SETNX + EXPIRE. The brief window between the two has the lock without a
* TTL, but the key value is only ever a sentinel and the next overwrite (or
* crash recovery via the 60s TTL once expire lands) resolves it.
* Atomic SET NX EX: acquire the lock and set its TTL in one command. Unlike setnx + a separate
* expire, there is no window in which the key exists without a TTL, so a crash mid-acquire can
* never orphan the lock and permanently block retries of that idempotency key.
*/
private def tryAcquireLock(key: String): Boolean =
withJedis { j =>
val acquired = j.setnx(key, "1") == 1L
if (acquired) j.expire(key, LockTtlSeconds)
acquired
}
Redis.setNxEx(key, "1", LockTtlSeconds)

private def deleteKey(key: String): Unit =
withJedis { j =>
Expand Down
11 changes: 9 additions & 2 deletions obp-api/src/main/scala/code/api/v3_1_0/Http4s310.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4434,8 +4434,15 @@ object Http4s310 {
APIUtil.ConsumerIdPair(grantorConsumerId, granteeConsumerId))
_ <- if (shouldSkipConsentSca) {
Future {
MappedConsent.find(By(MappedConsent.mConsentId, createdConsent.consentId))
.map(_.mStatus(ConsentStatus.ACCEPTED.toString).saveMe()).head
// Atomic guarded auto-accept: only move INITIATED -> ACCEPTED. If the consent was
// concurrently revoked, the conditional UPDATE is a 0-row no-op and the revoke stands,
// instead of the skip-SCA write blindly resurrecting it to ACCEPTED.
// The consent was just created above, so find must succeed; fail visibly (rather than
// silently leaving it INITIATED) if it has vanished — that is an internal inconsistency.
val consent = MappedConsent.find(By(MappedConsent.mConsentId, createdConsent.consentId))
.openOrThrowException(s"Consent ${createdConsent.consentId} not found immediately after creation")
code.bankconnectors.DoobieConsentStatusQueries
.conditionalStatusTransition(consent.id.get, ConsentStatus.INITIATED.toString, ConsentStatus.ACCEPTED.toString)
}
} else {
val challengeText = s"Your consent challenge : $challengeAnswer, Application: $applicationText"
Expand Down
8 changes: 8 additions & 0 deletions obp-api/src/main/scala/code/api/v5_1_0/Http4s510.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3380,6 +3380,14 @@ object Http4s510 {
postedData <- NewStyle.function.tryons(s"$InvalidJsonFormat The Json body should be the $PostTransactionRequestStatusJsonV510", 400, Some(cc)) {
com.openbankproject.commons.util.JsonAliases.parse(cc.httpBody.getOrElse("")).extract[PostTransactionRequestStatusJsonV510]
}
// Lock the transaction-request row for the duration of this request transaction (the
// FOR UPDATE lock runs on the request connection via RequestScopeConnection, so it is held
// through the read + status write below). Without it this management update races the
// challenge-answer path (Http4s400, which already locks) and can overwrite a COMPLETED
// payment with a stale status.
_ <- code.util.Helper.booleanToFuture("Failed to acquire transaction request lock", cc = Some(cc)) {
code.bankconnectors.DoobieTransactionRequestQueries.lockTransactionRequest(requestId.value).isDefined
}
(existing, _) <- NewStyle.function.getTransactionRequestImpl(requestId, Some(cc))
_ <- NewStyle.function.hasAtLeastOneEntitlement(existing.from.bank_id, user.userId,
canUpdateTransactionRequestStatusAtOneBank :: canUpdateTransactionRequestStatusAtAnyBank :: Nil, Some(cc))
Expand Down
25 changes: 16 additions & 9 deletions obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3294,9 +3294,23 @@ object Http4s700 {
_ <- NewStyle.function.checkAuthorisationToCreateTransactionRequest(
view.viewId, BankIdAccountId(fromAccount.bankId, fromAccount.accountId), user, callCtx
)
// 3. Create the parent BULK TR row. toAccount = self (envelope only;
// the real destinations live in the per-payment side-table).
trId = APIUtil.generateUUID()
// 3. Claim the batch_reference for idempotency BEFORE creating the parent TR or
// fanning out any payment. The UniqueIndex(FromBankId, FromAccountId, BatchReference)
// makes this the single atomic point of idempotency: two concurrent submissions both
// pass the earlier isBatchReferenceUsed check, but only one wins the INSERT here. The
// loser's Box is a Failure — we must surface it (409) so it aborts before any payment,
// rather than dropping it and double-charging.
_ <- Future {
unboxFullOrFail(
BulkPayments.bulkPayment.vend.claimBatchReference(
fromAccount.bankId.value, fromAccount.accountId.value, body.batch_reference, trId
),
callCtx, BulkBatchReferenceAlreadyUsed, 409
)
}
// 4. Create the parent BULK TR row. toAccount = self (envelope only;
// the real destinations live in the per-payment side-table).
detailsPlain = prettyRender(Extraction.decompose(body))
parentTrBox = MappedTransactionRequestProvider.createTransactionRequestImpl210(
com.openbankproject.commons.model.TransactionRequestId(trId),
Expand All @@ -3317,13 +3331,6 @@ object Http4s700 {
_ <- Future {
unboxFullOrFail(parentTrBox, callCtx, BulkPaymentTransactionRequestError, 500)
}
// 4. Claim the batch_reference for idempotency. After this, a
// second submission with the same batch_reference fails fast.
_ <- Future {
BulkPayments.bulkPayment.vend.claimBatchReference(
fromAccount.bankId.value, fromAccount.accountId.value, body.batch_reference, trId
)
}
// 5. Fan-out — sequential per-payment execution. Returns one row
// per input item (SUCCEEDED / FAILED + reason).
itemRows <- BulkPaymentHandler.executeAllItems(body, fromAccount, trId, chargePolicy, callCtx)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package code.bankconnectors

import code.api.util.DoobieUtil
import doobie._
import doobie.implicits._

/**
* Atomic, guarded status transitions for business state-machine rows whose Lift updateStatus
* methods were check-then-write (load row, compare status in memory, saveMe). Each method is a
* single conditional UPDATE with a status guard, returning the affected-row count so the caller
* can tell whether it won the transition (1) or lost it to a concurrent request (0).
*/
object DoobieBusinessStatusQueries {

/** AccountAccessRequest: transition only from the guard status. Table has explicit dbTableName. */
def conditionalAccountAccessRequestStatus(
rowId: Long,
guardStatus: String,
newStatus: String,
checkerUserId: String,
checkerComment: String
): Int = DoobieUtil.runUpdate(
sql"""UPDATE AccountAccessRequest
SET status = $newStatus,
checkeruserid = $checkerUserId,
checkercomment = $checkerComment
WHERE id = $rowId
AND status = $guardStatus""".update.run
)

/** MappedAccountApplication: transition only from the guard status (a one-shot decision). */
def conditionalAccountApplicationStatus(rowId: Long, guardStatus: String, newStatus: String): Int =
DoobieUtil.runUpdate(
sql"""UPDATE mappedaccountapplication
SET mstatus = $newStatus
WHERE id = $rowId
AND mstatus = $guardStatus""".update.run
)

/** ExpectedChallengeAnswer: compare-and-set the success flag so only one correct answer wins.
* Booleans are bound as typed JDBC parameters (not SQL literals) so the driver maps them to the
* column type correctly across H2 and Postgres. */
def conditionalChallengeSuccess(challengeId: String, finalisedScaStatus: String): Int =
DoobieUtil.runUpdate(
// NB: Lift MappedBoolean maps the `Successful` field to column `successful_c` (it appends _c).
sql"""UPDATE ExpectedChallengeAnswer
SET successful_c = ${true},
scastatus = $finalisedScaStatus
WHERE challengeid = $challengeId
AND successful_c = ${false}""".update.run
)
}
Loading
Loading