Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class AsyncHttpPipeline internal constructor(
Futures.failed(e)
}
}
val state = AsyncPipelineCallState(this, request, httpClient)
val state = AsyncPipelineCallState(this, request)
return AsyncPipelineNext(state).processAsync()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class AsyncHttpPipelineBuilder(private val httpClient: AsyncHttpClient) {
/** Builds an immutable [AsyncHttpPipeline]. */
public fun build(): AsyncHttpPipeline {
val ordered = steps.flatten()
return AsyncHttpPipeline(httpClient, Array(ordered.size) { ordered[it] })
return AsyncHttpPipeline(httpClient, ordered.toTypedArray())
}

public companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

package org.dexpace.sdk.core.http.pipeline

import org.dexpace.sdk.core.client.AsyncHttpClient
import org.dexpace.sdk.core.client.asBlocking
import org.dexpace.sdk.core.http.request.Request
import org.dexpace.sdk.core.http.response.Response
import org.dexpace.sdk.core.util.Futures
import java.io.InterruptedIOException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicReference

Expand Down Expand Up @@ -120,25 +120,5 @@ private fun sendInterruptibly(
* The blocking wait honours `Thread.interrupt()`: interrupting the calling thread restores the
* interrupt flag, cancels the in-flight future, and throws an [InterruptedIOException].
*/
public fun AsyncHttpPipeline.toBlocking(): HttpPipeline {
val async = this
return HttpPipeline.of { request ->
val future = async.sendAsync(request)
try {
future.get()
} catch (ie: InterruptedException) {
// `get()` parks interruptibly (unlike `join()`). Restore the interrupt flag, abort
// the in-flight send, and surface an InterruptedIOException so the caller's I/O
// error handling terminates cleanly.
Thread.currentThread().interrupt()
future.cancel(true)
val ioe = InterruptedIOException("Interrupted while waiting for response")
ioe.initCause(ie)
throw ioe
} catch (ee: ExecutionException) {
// `get()` wraps exceptional completion in ExecutionException; unwrap so callers'
// `catch (IOException)` sees the original failure rather than the JDK wrapper.
throw Futures.unwrap(ee)
}
}
}
public fun AsyncHttpPipeline.toBlocking(): HttpPipeline =
HttpPipeline.of(AsyncHttpClient { sendAsync(it) }.asBlocking())
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@

package org.dexpace.sdk.core.http.pipeline

import org.dexpace.sdk.core.client.AsyncHttpClient
import org.dexpace.sdk.core.http.request.Request

/**
* Per-call mutable cursor over an [AsyncHttpPipeline]'s steps array. Async counterpart of
* [PipelineCallState]: holds the index of the next step to invoke, the in-flight [Request],
* and a reference to the [AsyncHttpClient] used when the cursor reaches the end.
* [PipelineCallState]: holds the index of the next step to invoke and the in-flight [Request].
*
* Cloned via [copy] (exposed to user code through [AsyncPipelineNext.copy]) so async retry /
* redirect steps can re-drive the downstream chain. Cloning copies the current index — the
Expand All @@ -24,7 +22,6 @@ import org.dexpace.sdk.core.http.request.Request
internal class AsyncPipelineCallState internal constructor(
val pipeline: AsyncHttpPipeline,
initialRequest: Request,
val httpClient: AsyncHttpClient,
private var index: Int = 0,
) {
/**
Expand All @@ -42,5 +39,5 @@ internal class AsyncPipelineCallState internal constructor(
}

/** Returns an independent state cloned at the current cursor position. */
fun copy(): AsyncPipelineCallState = AsyncPipelineCallState(pipeline, request, httpClient, index)
fun copy(): AsyncPipelineCallState = AsyncPipelineCallState(pipeline, request, index)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class AsyncPipelineNext internal constructor(private val state: AsyncPipe
val nextStep = state.advance()
return try {
if (nextStep == null) {
state.httpClient.executeAsync(state.request)
state.pipeline.httpClient.executeAsync(state.request)
} else {
nextStep.processAsync(state.request, this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class HttpPipeline internal constructor(
@Throws(IOException::class)
public fun send(request: Request): Response {
if (stepArray.isEmpty()) return httpClient.execute(request)
val state = PipelineCallState(this, request, httpClient)
val state = PipelineCallState(this, request)
return PipelineNext(state).process()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class HttpPipelineBuilder(private val httpClient: HttpClient) {
*/
public fun build(): HttpPipeline {
val ordered = steps.flatten()
return HttpPipeline(httpClient, Array(ordered.size) { ordered[it] })
return HttpPipeline(httpClient, ordered.toTypedArray())
}

public companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import org.dexpace.sdk.core.http.request.Request

/**
* Per-call mutable cursor over a [HttpPipeline]'s steps array. Holds the index of the
* next step to invoke, the originating [Request], and a reference to the [HttpClient] used
* when the cursor reaches the end.
* next step to invoke and the originating [Request].
*
* Cloned via [copy] (exposed to user code through [PipelineNext.copy]) so retry / redirect
* steps can re-drive the downstream chain. Cloning copies the current index — the new
Expand All @@ -27,7 +26,6 @@ import org.dexpace.sdk.core.http.request.Request
internal class PipelineCallState internal constructor(
val pipeline: HttpPipeline,
initialRequest: Request,
val httpClient: HttpClient,
private var index: Int = 0,
) {
/**
Expand All @@ -51,5 +49,5 @@ internal class PipelineCallState internal constructor(
}

/** Returns an independent state cloned at the current cursor position. */
fun copy(): PipelineCallState = PipelineCallState(pipeline, request, httpClient, index)
fun copy(): PipelineCallState = PipelineCallState(pipeline, request, index)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class PipelineNext internal constructor(private val state: PipelineCallSt
public fun process(): Response {
val nextStep = state.advance()
return if (nextStep == null) {
state.httpClient.execute(state.request)
state.pipeline.httpClient.execute(state.request)
} else {
nextStep.process(state.request, this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package org.dexpace.sdk.core.http.pipeline
* Sparse [order] values (100s apart) leave room to insert new stages later without
* renumbering existing ones.
*
* @property order Run-order key used by [HttpPipelineBuilder.build] to emit steps; lower
* values run first.
* @property order Stable numeric identity for the stage; ascends with declaration order.
* The builder emits steps in declaration order (`Stage.entries`), not by reading this
* value — it exists as a stable, sortable inspection key for callers.
* @property isPillar True if the stage admits at most one step (singleton). False for
* user-extensible stages backed by an ordered deque.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,7 @@ internal class StagedSteps<S : Any>(
fun reload(steps: List<S>) {
perStage.clear()
pillars.clear()
for (s in steps) {
val stage = stageOf(s)
if (stage.isPillar) {
installPillar(s, stage)
} else {
perStage.getOrPut(stage) { ArrayDeque() }.addLast(s)
}
}
steps.forEach(::append)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class HttpPipelineTest {
val order = mutableListOf<String>()
val pipeline =
HttpPipelineBuilder(client)
// Add in reverse-of-stage order to prove the builder sorts by Stage.order.
// Add in reverse declaration order to prove the builder emits in Stage declaration order.
.append(TaggingStep(Stage.PRE_SEND, "pre-send", order))
.append(TaggingStep(Stage.POST_AUTH, "post-auth", order))
.append(TaggingStep(Stage.PRE_AUTH, "pre-auth", order))
Expand Down
Loading