-
Notifications
You must be signed in to change notification settings - Fork 4
feat: add FDv1AdapterSynchronizer wrapping IDataSynchronizer as IFDv2Synchronizer #540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
beekld
wants to merge
23
commits into
main
Choose a base branch
from
beeklimt/SDK-2379-4
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+632
−0
Open
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
5ce3fda
feat: orchestrator switches to FDv1 fallback on directive
beekld 77c972f
chore: add orchestration logging for FDv2 data system
beekld 5a2a9bc
fix: stop emitting kOff status from FDv2DataSystem destructor
beekld 86b2e3d
feat: add FDv1AdapterSynchronizer wrapping IDataSynchronizer as IFDv2…
beekld f99f1ad
chore: distinguish engaged vs. unconfigured FDv1 fallback in logs
beekld 82d2730
fix: ignore FDv1 fallback directive when FDv1 source is active
beekld c164913
test: cover initializer ChangeSet+directive basis preservation
beekld 44f47f4
feat: parse X-LD-FD-Fallback-TTL header into FDv2SourceResult
beekld 6ca1a49
feat: parse protocolFallbackTTL and retryAfter from goodbye
beekld 8f19a0e
feat: schedule FDv2 retry after FDv1 fallback TTL
beekld 5b9e51e
merge: bring in FDv1 fallback TTL changes from #539
beekld 303b3a8
chore: remove unused retry_after field from goodbye
beekld 7f1e999
merge: pick up retry_after removal from #539
beekld b1157e1
feat: translate FDv1 status changes to FDv2 results in FDv1AdapterSyn…
beekld 9a93aab
docs: explain got_basis reuse in FDv1 fallback branch
beekld 67126a9
fix: reset FDv1 fallback retry source between schedules
beekld 81b1c7f
merge: pick up OpenSSL bump from main
beekld 373150d
merge: pick up OpenSSL bump from #539
beekld 292c779
merge: pick up #539 squash from main
beekld 27f5c1e
fix: serialize StartAsync/ShutdownAsync on wrapped FDv1 source
beekld 6c428c5
fix: give FDv1 adapter a private status manager
beekld 55bd262
merge: pick up #549 from main
beekld 2f68933
fix: hold FDv1 source via shared_ptr and forward init-time errors
beekld File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
208 changes: 208 additions & 0 deletions
208
libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,208 @@ | ||
| #include "fdv1_adapter_synchronizer.hpp" | ||
|
|
||
| #include <utility> | ||
|
|
||
| namespace launchdarkly::server_side::data_systems { | ||
|
|
||
| using data_interfaces::FDv2SourceResult; | ||
| using DataSourceState = DataSourceStatus::DataSourceState; | ||
|
|
||
| // ----- State ----- | ||
|
|
||
| FDv1AdapterSynchronizer::State::State( | ||
| async::Future<std::monostate> closed_future) | ||
| : closed_future_(std::move(closed_future)) {} | ||
|
|
||
| async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::State::GetNext() { | ||
| std::lock_guard lock(mutex_); | ||
| if (!result_queue_.empty()) { | ||
| auto result = std::move(result_queue_.front()); | ||
| result_queue_.pop_front(); | ||
| return async::MakeFuture(std::move(result)); | ||
| } | ||
| return pending_promise_.emplace().GetFuture(); | ||
| } | ||
|
|
||
| void FDv1AdapterSynchronizer::State::ResolvePendingAsShutdown() { | ||
| std::optional<async::Promise<FDv2SourceResult>> promise; | ||
| { | ||
| std::lock_guard lock(mutex_); | ||
| if (pending_promise_) { | ||
| promise = std::move(pending_promise_); | ||
| pending_promise_.reset(); | ||
| } | ||
| } | ||
| if (promise) { | ||
| promise->Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}}); | ||
| } | ||
| } | ||
|
|
||
| void FDv1AdapterSynchronizer::State::Notify(FDv2SourceResult result) { | ||
| std::optional<async::Promise<FDv2SourceResult>> promise; | ||
| { | ||
| std::lock_guard lock(mutex_); | ||
| if (closed_future_.IsFinished()) { | ||
| return; | ||
| } | ||
| if (pending_promise_) { | ||
| promise = std::move(pending_promise_); | ||
| pending_promise_.reset(); | ||
| } else { | ||
| result_queue_.push_back(std::move(result)); | ||
| return; | ||
| } | ||
| } | ||
| // Resolve outside the lock — Promise::Resolve may invoke inline | ||
| // continuations that could call back into Notify or GetNext. | ||
| promise->Resolve(std::move(result)); | ||
| } | ||
|
|
||
| // ----- ConvertingDestination ----- | ||
|
|
||
| FDv1AdapterSynchronizer::ConvertingDestination::ConvertingDestination( | ||
| std::weak_ptr<State> state) | ||
| : state_(std::move(state)) {} | ||
|
|
||
| void FDv1AdapterSynchronizer::ConvertingDestination::Init( | ||
| data_model::SDKDataSet data_set) { | ||
| auto state = state_.lock(); | ||
| if (!state) { | ||
| return; | ||
| } | ||
| data_interfaces::ChangeSetData changes; | ||
| changes.reserve(data_set.flags.size() + data_set.segments.size()); | ||
| for (auto& [key, flag] : data_set.flags) { | ||
| changes.push_back({key, std::move(flag)}); | ||
| } | ||
| for (auto& [key, segment] : data_set.segments) { | ||
| changes.push_back({key, std::move(segment)}); | ||
| } | ||
| state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{ | ||
| data_model::ChangeSet<data_interfaces::ChangeSetData>{ | ||
| data_model::ChangeSetType::kFull, std::move(changes), | ||
| data_model::Selector{}}}}); | ||
| } | ||
|
|
||
| void FDv1AdapterSynchronizer::ConvertingDestination::Upsert( | ||
| std::string const& key, | ||
| data_model::FlagDescriptor flag) { | ||
| auto state = state_.lock(); | ||
| if (!state) { | ||
| return; | ||
| } | ||
| data_interfaces::ChangeSetData changes; | ||
| changes.push_back({key, std::move(flag)}); | ||
| state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{ | ||
| data_model::ChangeSet<data_interfaces::ChangeSetData>{ | ||
| data_model::ChangeSetType::kPartial, std::move(changes), | ||
| data_model::Selector{}}}}); | ||
| } | ||
|
|
||
| void FDv1AdapterSynchronizer::ConvertingDestination::Upsert( | ||
| std::string const& key, | ||
| data_model::SegmentDescriptor segment) { | ||
| auto state = state_.lock(); | ||
| if (!state) { | ||
| return; | ||
| } | ||
| data_interfaces::ChangeSetData changes; | ||
| changes.push_back({key, std::move(segment)}); | ||
| state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{ | ||
| data_model::ChangeSet<data_interfaces::ChangeSetData>{ | ||
| data_model::ChangeSetType::kPartial, std::move(changes), | ||
| data_model::Selector{}}}}); | ||
| } | ||
|
|
||
| std::string const& FDv1AdapterSynchronizer::ConvertingDestination::Identity() | ||
| const { | ||
| static std::string const identity = "FDv1 adapter destination"; | ||
| return identity; | ||
| } | ||
|
|
||
| // ----- FDv1AdapterSynchronizer ----- | ||
|
|
||
| FDv1AdapterSynchronizer::FDv1AdapterSynchronizer(SourceBuilder source_builder) | ||
| : state_(std::make_shared<State>(close_promise_.GetFuture())), | ||
| destination_(std::make_unique<ConvertingDestination>(state_)), | ||
| status_manager_( | ||
| std::make_unique<data_components::DataSourceStatusManager>()), | ||
| status_subscription_(status_manager_->OnDataSourceStatusChange( | ||
| [state = state_](DataSourceStatus status) { | ||
| auto error = status.LastError(); | ||
| if (!error) { | ||
| return; | ||
| } | ||
| switch (status.State()) { | ||
| case DataSourceState::kInterrupted: | ||
| case DataSourceState::kInitializing: | ||
| // Recoverable error. | ||
| state->Notify(FDv2SourceResult{ | ||
| FDv2SourceResult::Interrupted{*error}}); | ||
| break; | ||
| case DataSourceState::kOff: | ||
| // Terminal error. | ||
| state->Notify(FDv2SourceResult{ | ||
| FDv2SourceResult::TerminalError{*error}}); | ||
| break; | ||
| case DataSourceState::kValid: | ||
| // Recovery; no FDv2 result. | ||
| break; | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| } | ||
|
beekld marked this conversation as resolved.
|
||
| })), | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| fdv1_source_(source_builder(*status_manager_)) {} | ||
|
|
||
| FDv1AdapterSynchronizer::~FDv1AdapterSynchronizer() { | ||
| Close(); | ||
| } | ||
|
|
||
| async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::Next( | ||
| data_model::Selector /*selector*/) { | ||
| auto closed = close_promise_.GetFuture(); | ||
| if (closed.IsFinished()) { | ||
| return async::MakeFuture( | ||
| FDv2SourceResult{FDv2SourceResult::Shutdown{}}); | ||
| } | ||
| { | ||
| std::lock_guard lock(lifecycle_mutex_); | ||
| if (!started_) { | ||
| started_ = true; | ||
| fdv1_source_->StartAsync(destination_.get(), | ||
| /*bootstrap_data=*/nullptr); | ||
| } | ||
| } | ||
| auto result_future = state_->GetNext(); | ||
| if (result_future.IsFinished()) { | ||
| return result_future; | ||
|
beekld marked this conversation as resolved.
|
||
| } | ||
| return async::WhenAny(closed, result_future) | ||
| .Then( | ||
| [state = state_, result_future](std::size_t const& idx) mutable | ||
| -> async::Future<FDv2SourceResult> { | ||
| if (idx == 0) { | ||
| state->ResolvePendingAsShutdown(); | ||
| return async::MakeFuture( | ||
| FDv2SourceResult{FDv2SourceResult::Shutdown{}}); | ||
| } | ||
| return result_future; | ||
| }, | ||
| async::kInlineExecutor); | ||
| } | ||
|
|
||
| void FDv1AdapterSynchronizer::Close() { | ||
| if (!close_promise_.Resolve(std::monostate{})) { | ||
| return; | ||
| } | ||
| std::lock_guard lock(lifecycle_mutex_); | ||
| bool const was_started = started_; | ||
| started_ = true; | ||
| if (was_started) { | ||
| fdv1_source_->ShutdownAsync([] {}); | ||
| } | ||
| } | ||
|
|
||
| std::string const& FDv1AdapterSynchronizer::Identity() const { | ||
| static std::string const identity = "FDv1 fallback adapter"; | ||
| return identity; | ||
| } | ||
|
|
||
| } // namespace launchdarkly::server_side::data_systems | ||
130 changes: 130 additions & 0 deletions
130
libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| #pragma once | ||
|
|
||
| #include "../../data_components/status_notifications/data_source_status_manager.hpp" | ||
| #include "../../data_interfaces/destination/idestination.hpp" | ||
| #include "../../data_interfaces/source/idata_synchronizer.hpp" | ||
| #include "../../data_interfaces/source/ifdv2_synchronizer.hpp" | ||
|
|
||
| #include <launchdarkly/async/promise.hpp> | ||
| #include <launchdarkly/connection.hpp> | ||
|
|
||
| #include <deque> | ||
| #include <functional> | ||
| #include <memory> | ||
| #include <mutex> | ||
| #include <optional> | ||
| #include <string> | ||
| #include <variant> | ||
|
|
||
| namespace launchdarkly::server_side::data_systems { | ||
|
|
||
| /** | ||
| * Adapts an FDv1 IDataSynchronizer to the IFDv2Synchronizer interface. | ||
| * | ||
| * FDv1 Init/Upsert callbacks delivered through an internal IDestination are | ||
| * translated into FDv2SourceResult::ChangeSet results, with empty selectors | ||
| * and fdv1_fallback = false (the directive does not re-fire from FDv1 data). | ||
| * | ||
| * Threading: Next() and Close() may be called from any thread; only one | ||
| * Next() may be outstanding at a time. Member declaration order ensures | ||
| * the wrapped FDv1 source destructs before destination_ and state_, so any | ||
| * in-flight FDv1 callbacks land on live objects during teardown. This | ||
| * relies on the wrapped IDataSynchronizer blocking on its in-flight work | ||
| * in its destructor. | ||
| */ | ||
| class FDv1AdapterSynchronizer final | ||
| : public data_interfaces::IFDv2Synchronizer { | ||
| public: | ||
| using SourceBuilder = | ||
| std::function<std::shared_ptr<data_interfaces::IDataSynchronizer>( | ||
| data_components::DataSourceStatusManager&)>; | ||
|
|
||
| /** | ||
| * @param source_builder Called once during construction with the | ||
| * adapter's status manager. Returns the wrapped | ||
| * FDv1 source, which must be constructed against | ||
| * the provided manager as its status sink. | ||
| */ | ||
| explicit FDv1AdapterSynchronizer(SourceBuilder source_builder); | ||
|
|
||
| ~FDv1AdapterSynchronizer() override; | ||
|
|
||
| async::Future<data_interfaces::FDv2SourceResult> Next( | ||
| data_model::Selector selector) override; | ||
| void Close() override; | ||
| [[nodiscard]] std::string const& Identity() const override; | ||
|
|
||
| private: | ||
| /** | ||
| * Holds the result queue and pending Next() promise; shared with the | ||
| * FDv1 source's IDestination via the inner ConvertingDestination. | ||
| * All methods are thread-safe. | ||
| */ | ||
| class State { | ||
| public: | ||
| explicit State(async::Future<std::monostate> closed_future); | ||
|
|
||
| async::Future<data_interfaces::FDv2SourceResult> GetNext(); | ||
|
|
||
| // Resolves any pending Next() promise with Shutdown and clears it. | ||
| // Called on the close path so the abandoned promise doesn't leave | ||
| // potential continuations dangling. | ||
| void ResolvePendingAsShutdown(); | ||
|
|
||
| void Notify(data_interfaces::FDv2SourceResult result); | ||
|
|
||
| private: | ||
| // Finished once the owning FDv1AdapterSynchronizer's close_promise_ | ||
| // is resolved. Read in Notify to drop late results. | ||
| async::Future<std::monostate> const closed_future_; | ||
|
|
||
| mutable std::mutex mutex_; | ||
| // Protected by mutex_. | ||
| std::optional<async::Promise<data_interfaces::FDv2SourceResult>> | ||
| pending_promise_; | ||
| std::deque<data_interfaces::FDv2SourceResult> result_queue_; | ||
| }; | ||
|
|
||
| /** | ||
| * Translates FDv1 IDestination callbacks into FDv2 results queued on | ||
| * State. Thread-safe (delegates to State). | ||
| */ | ||
| class ConvertingDestination final : public data_interfaces::IDestination { | ||
| public: | ||
| explicit ConvertingDestination(std::weak_ptr<State> state); | ||
| void Init(data_model::SDKDataSet data_set) override; | ||
| void Upsert(std::string const& key, | ||
| data_model::FlagDescriptor flag) override; | ||
| void Upsert(std::string const& key, | ||
| data_model::SegmentDescriptor segment) override; | ||
| [[nodiscard]] std::string const& Identity() const override; | ||
|
|
||
| private: | ||
| std::weak_ptr<State> state_; | ||
| }; | ||
|
|
||
| // Thread-safe primitive. Declared before state_ so state_'s constructor | ||
| // can take a future from it. | ||
| async::Promise<std::monostate> close_promise_; | ||
|
|
||
| // shared_ptr so async callbacks that may fire after this is destroyed | ||
| // can hold their own reference. | ||
| std::shared_ptr<State> const state_; | ||
| std::unique_ptr<ConvertingDestination> const destination_; | ||
|
|
||
| std::unique_ptr<data_components::DataSourceStatusManager> const | ||
| status_manager_; | ||
| std::unique_ptr<IConnection> const status_subscription_; | ||
|
|
||
| std::shared_ptr<data_interfaces::IDataSynchronizer> const fdv1_source_; | ||
|
|
||
| // Serializes StartAsync and ShutdownAsync on fdv1_source_ across | ||
| // concurrent Next() and Close() calls. | ||
| std::mutex lifecycle_mutex_; | ||
| // Protected by lifecycle_mutex_. Set when Next() calls StartAsync, or | ||
| // when Close() runs without a prior start (to gate any later Next() | ||
| // from calling StartAsync after Close). | ||
| bool started_ = false; | ||
| }; | ||
|
|
||
| } // namespace launchdarkly::server_side::data_systems |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to be concerned with error only status changes? Sometimes we update the error but leave the state the same. So it wouldn't affect the flow of fallback/recovery, but the granularity of the reported status.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.