From 9f10a219db79c8f5bda2a8f6d1fbab40ba58e192 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Sun, 28 Jun 2026 04:48:07 +0300 Subject: [PATCH] chore: collapse duplicated helpers in recovery-aware pipeline + retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three call sites independently re-implemented the same interrupt-aware failure wrapper — wrap a throwable in ResponseOutcome.Failure, restoring the thread interrupt flag first when it is an InterruptedException. Hoist it to a single module-internal failureOf() next to ResponseOutcome (the type it constructs), and route ExecutionPipeline, ResponsePipeline, and RetryStep.executeOnce through it. RetryStep.executeOnce folds its two catch arms into one as a result. RetryStep.attempt() is deliberately left untouched: it rethrows the wrapped throwable to its caller, so an InterruptedException propagates as itself and carries the cancellation directly — pre-restoring the flag there is unnecessary. failureOf() exists for the opposite path, where the failure is buried in an outcome that flows on through the pipeline and may be swallowed or recovered, so the flag must be restored to preserve cancellation. Also collapse three more local duplications in the retry package: - RetrySettings' totalTimeout/initialDelay/maxDelay setters shared the same non-negative + nanosecond-representability checks; extract a requireRepresentable(name, value) helper that names the offending field. Rejection messages are preserved verbatim. - RetryAfterParser's retry-after-ms and x-ms-retry-after-ms branches were verbatim copies; iterate the two header names instead. Array order keeps retry-after-ms ahead of x-ms-retry-after-ms, preserving precedence. - resolveScheduler() collapses to an elvis expression. All changes are behavior-preserving. failureOf and requireRepresentable are internal/private, so the public API surface is unchanged. --- .../sdk/core/pipeline/ExecutionPipeline.kt | 11 ------- .../sdk/core/pipeline/ResponseOutcome.kt | 14 +++++++++ .../sdk/core/pipeline/ResponsePipeline.kt | 15 ++------- .../pipeline/step/retry/RetryAfterParser.kt | 9 +++--- .../core/pipeline/step/retry/RetrySettings.kt | 31 ++++++++++++------- .../sdk/core/pipeline/step/retry/RetryStep.kt | 11 ++----- 6 files changed, 42 insertions(+), 49 deletions(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ExecutionPipeline.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ExecutionPipeline.kt index 482786c7..91bea356 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ExecutionPipeline.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ExecutionPipeline.kt @@ -109,15 +109,4 @@ public class ExecutionPipeline failureOf(t) } } - - /** - * Wraps [t] in a [ResponseOutcome.Failure]. [InterruptedException] preserves the interrupt - * flag on the current thread per the SDK's cancellation contract. - */ - private fun failureOf(t: Throwable): ResponseOutcome.Failure { - if (t is InterruptedException) { - Thread.currentThread().interrupt() - } - return ResponseOutcome.Failure(t) - } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponseOutcome.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponseOutcome.kt index db9829f3..2a140ebd 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponseOutcome.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponseOutcome.kt @@ -88,3 +88,17 @@ public sealed class ResponseOutcome { is Failure -> onFailure(error) } } + +/** + * Wraps [t] in a [ResponseOutcome.Failure]. When [t] is an [InterruptedException] the interrupt + * flag is restored on the current thread before wrapping, honouring the SDK's cancellation + * contract so a thread blocked on the surfaced outcome still observes the cancellation. Shared by + * [ExecutionPipeline], [ResponsePipeline], and [org.dexpace.sdk.core.pipeline.step.retry.RetryStep] + * so the interrupt-aware wrapper has exactly one definition. + */ +internal fun failureOf(t: Throwable): ResponseOutcome.Failure { + if (t is InterruptedException) { + Thread.currentThread().interrupt() + } + return ResponseOutcome.Failure(t) +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponsePipeline.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponsePipeline.kt index 6ebafd0e..c3b6b00b 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponsePipeline.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponsePipeline.kt @@ -112,7 +112,7 @@ public class ResponsePipeline ResponseOutcome.Success(step.execute(inResponse, context)) } catch (t: Throwable) { closeQuietly(inResponse, t) - handleStepThrowable(t) + failureOf(t) } } is ResponseOutcome.Failure -> return inbound @@ -137,20 +137,9 @@ public class ResponsePipeline if (outcome is ResponseOutcome.Success) { closeQuietly(outcome.response, t) } - handleStepThrowable(t) + failureOf(t) } - /** - * Converts a step-raised throwable into a [ResponseOutcome.Failure]. [InterruptedException] - * preserves the interrupt flag on the current thread per the SDK's cancellation contract. - */ - private fun handleStepThrowable(t: Throwable): ResponseOutcome.Failure { - if (t is InterruptedException) { - Thread.currentThread().interrupt() - } - return ResponseOutcome.Failure(t) - } - /** * Closes [response], swallowing any close error so it never masks [primary]. A failure to * close is attached to [primary] as a suppressed throwable for diagnostics. diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryAfterParser.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryAfterParser.kt index 6f94ec95..1148c4b3 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryAfterParser.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryAfterParser.kt @@ -152,11 +152,10 @@ public object RetryAfterParser { parseNumericSeconds(retryAfter)?.let { return it } parseHttpDate(retryAfter, now)?.let { return it } } - headers.get(HEADER_RETRY_AFTER_MS)?.trim()?.let { value -> - if (value.isNotEmpty()) parseMillis(value)?.let { return it } - } - headers.get(HEADER_X_MS_RETRY_AFTER_MS)?.trim()?.let { value -> - if (value.isNotEmpty()) parseMillis(value)?.let { return it } + for (header in arrayOf(HEADER_RETRY_AFTER_MS, HEADER_X_MS_RETRY_AFTER_MS)) { + headers.get(header)?.trim()?.let { value -> + if (value.isNotEmpty()) parseMillis(value)?.let { return it } + } } val rateLimitReset = headers.get(HEADER_X_RATELIMIT_RESET)?.trim() if (!rateLimitReset.isNullOrEmpty()) { diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetrySettings.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetrySettings.kt index 9d29be75..f936b0bb 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetrySettings.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetrySettings.kt @@ -128,23 +128,33 @@ public class RetrySettings this.attemptHeaderName = settings.attemptHeaderName } + /** + * Validates a [Duration] setting: it must be non-negative and small enough that the + * backoff math can convert it to nanoseconds without overflowing. Shared by the + * [totalTimeout], [initialDelay], and [maxDelay] setters; [name] names the offending + * field in the rejection message. + */ + private fun requireRepresentable( + name: String, + value: Duration, + ) { + require(!value.isNegative) { "$name must be non-negative" } + require(value <= MAX_NANO_REPRESENTABLE_DELAY) { + "$name must be representable in nanoseconds (≤ ~292 years); got $value" + } + } + /** Sets [RetrySettings.totalTimeout]. Must be non-negative. */ public fun totalTimeout(totalTimeout: Duration): RetrySettingsBuilder = apply { - require(!totalTimeout.isNegative) { "totalTimeout must be non-negative" } - require(totalTimeout <= MAX_NANO_REPRESENTABLE_DELAY) { - "totalTimeout must be representable in nanoseconds (≤ ~292 years); got $totalTimeout" - } + requireRepresentable("totalTimeout", totalTimeout) this.totalTimeout = totalTimeout } /** Sets [RetrySettings.initialDelay]. Must be non-negative. */ public fun initialDelay(initialDelay: Duration): RetrySettingsBuilder = apply { - require(!initialDelay.isNegative) { "initialDelay must be non-negative" } - require(initialDelay <= MAX_NANO_REPRESENTABLE_DELAY) { - "initialDelay must be representable in nanoseconds (≤ ~292 years); got $initialDelay" - } + requireRepresentable("initialDelay", initialDelay) this.initialDelay = initialDelay } @@ -158,10 +168,7 @@ public class RetrySettings /** Sets [RetrySettings.maxDelay]. Must be non-negative. */ public fun maxDelay(maxDelay: Duration): RetrySettingsBuilder = apply { - require(!maxDelay.isNegative) { "maxDelay must be non-negative" } - require(maxDelay <= MAX_NANO_REPRESENTABLE_DELAY) { - "maxDelay must be representable in nanoseconds (≤ ~292 years); got $maxDelay" - } + requireRepresentable("maxDelay", maxDelay) this.maxDelay = maxDelay } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryStep.kt index dd262950..4ccb3ba6 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryStep.kt @@ -14,6 +14,7 @@ import org.dexpace.sdk.core.http.response.exception.HttpException import org.dexpace.sdk.core.http.response.exception.NetworkException import org.dexpace.sdk.core.http.response.exception.Retryable import org.dexpace.sdk.core.pipeline.ResponseOutcome +import org.dexpace.sdk.core.pipeline.failureOf import org.dexpace.sdk.core.pipeline.step.ResponseRecoveryStep import java.io.InterruptedIOException import java.time.Clock @@ -289,11 +290,8 @@ public class RetryStep private fun executeOnce(attemptOrdinal: Int): ResponseOutcome = try { ResponseOutcome.Success(httpClient.execute(stampAttempt(request, attemptOrdinal))) - } catch (e: InterruptedException) { - Thread.currentThread().interrupt() - ResponseOutcome.Failure(e) } catch (t: Throwable) { - ResponseOutcome.Failure(t) + failureOf(t) } /** @@ -362,10 +360,7 @@ public class RetryStep * process-wide scheduler is a companion `by lazy` (SYNCHRONIZED), so it is initialised * at most once across the whole VM — no per-instance guard is involved. */ - private fun resolveScheduler(): ScheduledExecutorService { - settings.scheduler?.let { return it } - return DEFAULT_SCHEDULER - } + private fun resolveScheduler(): ScheduledExecutorService = settings.scheduler ?: DEFAULT_SCHEDULER /** * Returns true when [error] is an SDK-classified retryable condition. Classification