Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5ce3fda
feat: orchestrator switches to FDv1 fallback on directive
beekld May 23, 2026
77c972f
chore: add orchestration logging for FDv2 data system
beekld Jun 2, 2026
5a2a9bc
fix: stop emitting kOff status from FDv2DataSystem destructor
beekld Jun 4, 2026
86b2e3d
feat: add FDv1AdapterSynchronizer wrapping IDataSynchronizer as IFDv2…
beekld May 28, 2026
f99f1ad
chore: distinguish engaged vs. unconfigured FDv1 fallback in logs
beekld Jun 8, 2026
82d2730
fix: ignore FDv1 fallback directive when FDv1 source is active
beekld Jun 8, 2026
c164913
test: cover initializer ChangeSet+directive basis preservation
beekld Jun 8, 2026
44f47f4
feat: parse X-LD-FD-Fallback-TTL header into FDv2SourceResult
beekld Jun 8, 2026
6ca1a49
feat: parse protocolFallbackTTL and retryAfter from goodbye
beekld Jun 8, 2026
8f19a0e
feat: schedule FDv2 retry after FDv1 fallback TTL
beekld Jun 8, 2026
5b9e51e
merge: bring in FDv1 fallback TTL changes from #539
beekld Jun 8, 2026
303b3a8
chore: remove unused retry_after field from goodbye
beekld Jun 9, 2026
7f1e999
merge: pick up retry_after removal from #539
beekld Jun 9, 2026
b1157e1
feat: translate FDv1 status changes to FDv2 results in FDv1AdapterSyn…
beekld Jun 9, 2026
9a93aab
docs: explain got_basis reuse in FDv1 fallback branch
beekld Jun 11, 2026
67126a9
fix: reset FDv1 fallback retry source between schedules
beekld Jun 11, 2026
81b1c7f
merge: pick up OpenSSL bump from main
beekld Jun 11, 2026
373150d
merge: pick up OpenSSL bump from #539
beekld Jun 11, 2026
292c779
merge: pick up #539 squash from main
beekld Jun 11, 2026
27f5c1e
fix: serialize StartAsync/ShutdownAsync on wrapped FDv1 source
beekld Jun 11, 2026
6c428c5
fix: give FDv1 adapter a private status manager
beekld Jun 11, 2026
55bd262
merge: pick up #549 from main
beekld Jun 11, 2026
2f68933
fix: hold FDv1 source via shared_ptr and forward init-time errors
beekld Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libs/server-sdk/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ target_sources(${LIBNAME}
data_systems/fdv2/source_manager.cpp
data_systems/fdv2/fdv2_data_system.hpp
data_systems/fdv2/fdv2_data_system.cpp
data_systems/fdv2/fdv1_adapter_synchronizer.hpp
data_systems/fdv2/fdv1_adapter_synchronizer.cpp
data_systems/background_sync/sources/streaming/streaming_data_source.hpp
data_systems/background_sync/sources/streaming/streaming_data_source.cpp
data_systems/background_sync/sources/streaming/event_handler.hpp
Expand Down
208 changes: 208 additions & 0 deletions libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
#include "fdv1_adapter_synchronizer.hpp"

#include <utility>

namespace launchdarkly::server_side::data_systems {

using data_interfaces::FDv2SourceResult;
using DataSourceState = DataSourceStatus::DataSourceState;

// ----- State -----

FDv1AdapterSynchronizer::State::State(
async::Future<std::monostate> closed_future)
: closed_future_(std::move(closed_future)) {}

async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::State::GetNext() {
std::lock_guard lock(mutex_);
if (!result_queue_.empty()) {
auto result = std::move(result_queue_.front());
result_queue_.pop_front();
return async::MakeFuture(std::move(result));
}
return pending_promise_.emplace().GetFuture();
}

void FDv1AdapterSynchronizer::State::ResolvePendingAsShutdown() {
std::optional<async::Promise<FDv2SourceResult>> promise;
{
std::lock_guard lock(mutex_);
if (pending_promise_) {
promise = std::move(pending_promise_);
pending_promise_.reset();
}
}
if (promise) {
promise->Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}});
}
}

void FDv1AdapterSynchronizer::State::Notify(FDv2SourceResult result) {
std::optional<async::Promise<FDv2SourceResult>> promise;
{
std::lock_guard lock(mutex_);
if (closed_future_.IsFinished()) {
return;
}
if (pending_promise_) {
promise = std::move(pending_promise_);
pending_promise_.reset();
} else {
result_queue_.push_back(std::move(result));
return;
}
}
// Resolve outside the lock — Promise::Resolve may invoke inline
// continuations that could call back into Notify or GetNext.
promise->Resolve(std::move(result));
}

// ----- ConvertingDestination -----

FDv1AdapterSynchronizer::ConvertingDestination::ConvertingDestination(
std::weak_ptr<State> state)
: state_(std::move(state)) {}

void FDv1AdapterSynchronizer::ConvertingDestination::Init(
data_model::SDKDataSet data_set) {
auto state = state_.lock();
if (!state) {
return;
}
data_interfaces::ChangeSetData changes;
changes.reserve(data_set.flags.size() + data_set.segments.size());
for (auto& [key, flag] : data_set.flags) {
changes.push_back({key, std::move(flag)});
}
for (auto& [key, segment] : data_set.segments) {
changes.push_back({key, std::move(segment)});
}
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
data_model::ChangeSet<data_interfaces::ChangeSetData>{
data_model::ChangeSetType::kFull, std::move(changes),
data_model::Selector{}}}});
}

void FDv1AdapterSynchronizer::ConvertingDestination::Upsert(
std::string const& key,
data_model::FlagDescriptor flag) {
auto state = state_.lock();
if (!state) {
return;
}
data_interfaces::ChangeSetData changes;
changes.push_back({key, std::move(flag)});
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
data_model::ChangeSet<data_interfaces::ChangeSetData>{
data_model::ChangeSetType::kPartial, std::move(changes),
data_model::Selector{}}}});
}

void FDv1AdapterSynchronizer::ConvertingDestination::Upsert(
std::string const& key,
data_model::SegmentDescriptor segment) {
auto state = state_.lock();
if (!state) {
return;
}
data_interfaces::ChangeSetData changes;
changes.push_back({key, std::move(segment)});
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
data_model::ChangeSet<data_interfaces::ChangeSetData>{
data_model::ChangeSetType::kPartial, std::move(changes),
data_model::Selector{}}}});
}

std::string const& FDv1AdapterSynchronizer::ConvertingDestination::Identity()
const {
static std::string const identity = "FDv1 adapter destination";
return identity;
}

// ----- FDv1AdapterSynchronizer -----

FDv1AdapterSynchronizer::FDv1AdapterSynchronizer(SourceBuilder source_builder)
: state_(std::make_shared<State>(close_promise_.GetFuture())),
destination_(std::make_unique<ConvertingDestination>(state_)),
status_manager_(
std::make_unique<data_components::DataSourceStatusManager>()),
status_subscription_(status_manager_->OnDataSourceStatusChange(
[state = state_](DataSourceStatus status) {
auto error = status.LastError();
if (!error) {
return;
}
switch (status.State()) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to be concerned with error only status changes? Sometimes we update the error but leave the state the same. So it wouldn't affect the flow of fallback/recovery, but the granularity of the reported status.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

case DataSourceState::kInterrupted:
case DataSourceState::kInitializing:
// Recoverable error.
state->Notify(FDv2SourceResult{
FDv2SourceResult::Interrupted{*error}});
break;
case DataSourceState::kOff:
// Terminal error.
state->Notify(FDv2SourceResult{
FDv2SourceResult::TerminalError{*error}});
break;
case DataSourceState::kValid:
// Recovery; no FDv2 result.
break;
Comment thread
cursor[bot] marked this conversation as resolved.
}
Comment thread
beekld marked this conversation as resolved.
})),
Comment thread
cursor[bot] marked this conversation as resolved.
fdv1_source_(source_builder(*status_manager_)) {}

FDv1AdapterSynchronizer::~FDv1AdapterSynchronizer() {
Close();
}

async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::Next(
data_model::Selector /*selector*/) {
auto closed = close_promise_.GetFuture();
if (closed.IsFinished()) {
return async::MakeFuture(
FDv2SourceResult{FDv2SourceResult::Shutdown{}});
}
{
std::lock_guard lock(lifecycle_mutex_);
if (!started_) {
started_ = true;
fdv1_source_->StartAsync(destination_.get(),
/*bootstrap_data=*/nullptr);
}
}
auto result_future = state_->GetNext();
if (result_future.IsFinished()) {
return result_future;
Comment thread
beekld marked this conversation as resolved.
}
return async::WhenAny(closed, result_future)
.Then(
[state = state_, result_future](std::size_t const& idx) mutable
-> async::Future<FDv2SourceResult> {
if (idx == 0) {
state->ResolvePendingAsShutdown();
return async::MakeFuture(
FDv2SourceResult{FDv2SourceResult::Shutdown{}});
}
return result_future;
},
async::kInlineExecutor);
}

void FDv1AdapterSynchronizer::Close() {
if (!close_promise_.Resolve(std::monostate{})) {
return;
}
std::lock_guard lock(lifecycle_mutex_);
bool const was_started = started_;
started_ = true;
if (was_started) {
fdv1_source_->ShutdownAsync([] {});
}
}

std::string const& FDv1AdapterSynchronizer::Identity() const {
static std::string const identity = "FDv1 fallback adapter";
return identity;
}

} // namespace launchdarkly::server_side::data_systems
130 changes: 130 additions & 0 deletions libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#pragma once

#include "../../data_components/status_notifications/data_source_status_manager.hpp"
#include "../../data_interfaces/destination/idestination.hpp"
#include "../../data_interfaces/source/idata_synchronizer.hpp"
#include "../../data_interfaces/source/ifdv2_synchronizer.hpp"

#include <launchdarkly/async/promise.hpp>
#include <launchdarkly/connection.hpp>

#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <variant>

namespace launchdarkly::server_side::data_systems {

/**
* Adapts an FDv1 IDataSynchronizer to the IFDv2Synchronizer interface.
*
* FDv1 Init/Upsert callbacks delivered through an internal IDestination are
* translated into FDv2SourceResult::ChangeSet results, with empty selectors
* and fdv1_fallback = false (the directive does not re-fire from FDv1 data).
*
* Threading: Next() and Close() may be called from any thread; only one
* Next() may be outstanding at a time. Member declaration order ensures
* the wrapped FDv1 source destructs before destination_ and state_, so any
* in-flight FDv1 callbacks land on live objects during teardown. This
* relies on the wrapped IDataSynchronizer blocking on its in-flight work
* in its destructor.
*/
class FDv1AdapterSynchronizer final
: public data_interfaces::IFDv2Synchronizer {
public:
using SourceBuilder =
std::function<std::shared_ptr<data_interfaces::IDataSynchronizer>(
data_components::DataSourceStatusManager&)>;

/**
* @param source_builder Called once during construction with the
* adapter's status manager. Returns the wrapped
* FDv1 source, which must be constructed against
* the provided manager as its status sink.
*/
explicit FDv1AdapterSynchronizer(SourceBuilder source_builder);

~FDv1AdapterSynchronizer() override;

async::Future<data_interfaces::FDv2SourceResult> Next(
data_model::Selector selector) override;
void Close() override;
[[nodiscard]] std::string const& Identity() const override;

private:
/**
* Holds the result queue and pending Next() promise; shared with the
* FDv1 source's IDestination via the inner ConvertingDestination.
* All methods are thread-safe.
*/
class State {
public:
explicit State(async::Future<std::monostate> closed_future);

async::Future<data_interfaces::FDv2SourceResult> GetNext();

// Resolves any pending Next() promise with Shutdown and clears it.
// Called on the close path so the abandoned promise doesn't leave
// potential continuations dangling.
void ResolvePendingAsShutdown();

void Notify(data_interfaces::FDv2SourceResult result);

private:
// Finished once the owning FDv1AdapterSynchronizer's close_promise_
// is resolved. Read in Notify to drop late results.
async::Future<std::monostate> const closed_future_;

mutable std::mutex mutex_;
// Protected by mutex_.
std::optional<async::Promise<data_interfaces::FDv2SourceResult>>
pending_promise_;
std::deque<data_interfaces::FDv2SourceResult> result_queue_;
};

/**
* Translates FDv1 IDestination callbacks into FDv2 results queued on
* State. Thread-safe (delegates to State).
*/
class ConvertingDestination final : public data_interfaces::IDestination {
public:
explicit ConvertingDestination(std::weak_ptr<State> state);
void Init(data_model::SDKDataSet data_set) override;
void Upsert(std::string const& key,
data_model::FlagDescriptor flag) override;
void Upsert(std::string const& key,
data_model::SegmentDescriptor segment) override;
[[nodiscard]] std::string const& Identity() const override;

private:
std::weak_ptr<State> state_;
};

// Thread-safe primitive. Declared before state_ so state_'s constructor
// can take a future from it.
async::Promise<std::monostate> close_promise_;

// shared_ptr so async callbacks that may fire after this is destroyed
// can hold their own reference.
std::shared_ptr<State> const state_;
std::unique_ptr<ConvertingDestination> const destination_;

std::unique_ptr<data_components::DataSourceStatusManager> const
status_manager_;
std::unique_ptr<IConnection> const status_subscription_;

std::shared_ptr<data_interfaces::IDataSynchronizer> const fdv1_source_;

// Serializes StartAsync and ShutdownAsync on fdv1_source_ across
// concurrent Next() and Close() calls.
std::mutex lifecycle_mutex_;
// Protected by lifecycle_mutex_. Set when Next() calls StartAsync, or
// when Close() runs without a prior start (to gate any later Next()
// from calling StartAsync after Close).
bool started_ = false;
};

} // namespace launchdarkly::server_side::data_systems
Loading
Loading