From 9fd5189bb42af278d01e922686d65fd383e53485 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 15:08:34 +0800 Subject: [PATCH] Complete AutoClusterFailover timing API --- include/pulsar/AutoClusterFailover.h | 75 +++++++++++++++++++++++----- tests/ServiceInfoProviderTest.cc | 34 +++++++++++++ 2 files changed, 96 insertions(+), 13 deletions(-) diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h index a9d74428..7bcee8cb 100644 --- a/include/pulsar/AutoClusterFailover.h +++ b/include/pulsar/AutoClusterFailover.h @@ -21,6 +21,7 @@ #include +#include #include #include #include @@ -37,9 +38,11 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { struct Config { const ServiceInfo primary; const std::vector secondary; - std::chrono::milliseconds checkInterval{5000}; // 5 seconds + std::chrono::milliseconds checkInterval{30000}; // 30 seconds + std::chrono::milliseconds failoverDelay{30000}; // 30 seconds + std::chrono::milliseconds switchBackDelay{60000}; // 60 seconds uint32_t failoverThreshold{1}; - uint32_t switchBackThreshold{1}; + uint32_t switchBackThreshold{2}; Config(ServiceInfo primary, std::vector secondary) : primary(std::move(primary)), secondary(std::move(secondary)) {} @@ -53,50 +56,96 @@ class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { * std::vector secondaries{...}; * AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries) * .withCheckInterval(std::chrono::seconds(5)) - * .withFailoverThreshold(3) - * .withSwitchBackThreshold(3) + * .withFailoverDelay(std::chrono::seconds(30)) + * .withSwitchBackDelay(std::chrono::seconds(60)) * .build(); * * Notes: * - primary: the preferred cluster to use when available. * - secondary: ordered list of fallback clusters. * - checkInterval: frequency of health probes. - * - failoverThreshold: the number of consecutive failed probes required before switching away from - * the current cluster. - * - switchBackThreshold: the number of consecutive successful probes to the primary required before - * switching back from a secondary while that secondary remains available. If the active secondary - * becomes unavailable and the primary is available, the implementation may switch back to the - * primary immediately, regardless of this threshold. + * - failoverDelay: how long the current cluster must remain unavailable before switching away. + * - switchBackDelay: how long the primary must remain healthy before switching back from a secondary. + * If the active secondary becomes unavailable and the primary is available, the implementation may + * switch back to the primary immediately, regardless of this delay. */ class Builder { public: Builder(ServiceInfo primary, std::vector secondary) : config_(std::move(primary), std::move(secondary)) {} - // Set how frequently probes run against the active cluster(s). Default: 5 seconds. + // Set how frequently probes run against the active cluster(s). Default: 30 seconds. Builder& withCheckInterval(std::chrono::milliseconds interval) { config_.checkInterval = interval; + updateThresholdsFromDelays(); return *this; } - // Set the number of consecutive failed probes required before attempting failover. Default: 1. + // Set how long the current cluster must remain unavailable before switching away. Default: 30 + // seconds. + Builder& withFailoverDelay(std::chrono::milliseconds delay) { + failoverThresholdConfigured_ = false; + config_.failoverDelay = delay; + config_.failoverThreshold = calculateThreshold(config_.failoverDelay, config_.checkInterval); + return *this; + } + + // Set how long the primary must remain healthy before switching back from a secondary. Default: 60 + // seconds. + Builder& withSwitchBackDelay(std::chrono::milliseconds delay) { + switchBackThresholdConfigured_ = false; + config_.switchBackDelay = delay; + config_.switchBackThreshold = calculateThreshold(config_.switchBackDelay, config_.checkInterval); + return *this; + } + + // Set the number of consecutive failed probes required before attempting failover. Builder& withFailoverThreshold(uint32_t threshold) { + failoverThresholdConfigured_ = true; config_.failoverThreshold = threshold; + config_.failoverDelay = config_.checkInterval * threshold; return *this; } // Set the number of consecutive successful primary probes required before switching back from a // healthy secondary. If the active secondary becomes unavailable and the primary is available, - // the implementation may switch back immediately regardless of this threshold. Default: 1. + // the implementation may switch back immediately regardless of this threshold. Builder& withSwitchBackThreshold(uint32_t threshold) { + switchBackThresholdConfigured_ = true; config_.switchBackThreshold = threshold; + config_.switchBackDelay = config_.checkInterval * threshold; return *this; } AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); } private: + static uint32_t calculateThreshold(std::chrono::milliseconds delay, + std::chrono::milliseconds interval) { + if (delay <= std::chrono::milliseconds::zero() || interval <= std::chrono::milliseconds::zero()) { + return 1; + } + return static_cast( + std::max(1, (delay.count() + interval.count() - 1) / interval.count())); + } + + void updateThresholdsFromDelays() { + if (failoverThresholdConfigured_) { + config_.failoverDelay = config_.checkInterval * config_.failoverThreshold; + } else { + config_.failoverThreshold = calculateThreshold(config_.failoverDelay, config_.checkInterval); + } + if (switchBackThresholdConfigured_) { + config_.switchBackDelay = config_.checkInterval * config_.switchBackThreshold; + } else { + config_.switchBackThreshold = + calculateThreshold(config_.switchBackDelay, config_.checkInterval); + } + } + Config config_; + bool failoverThresholdConfigured_{false}; + bool switchBackThresholdConfigured_{false}; }; explicit AutoClusterFailover(Config&& config); diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 175c5319..8fb6054e 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -171,6 +171,40 @@ class ServiceInfoHolder { mutable std::mutex mutex_; }; +TEST(AutoClusterFailoverTest, testDefaultTimingConfig) { + AutoClusterFailover::Config config{ServiceInfo("pulsar://primary:6650"), + {ServiceInfo("pulsar://secondary:6650")}}; + + ASSERT_EQ(30s, config.checkInterval); + ASSERT_EQ(30s, config.failoverDelay); + ASSERT_EQ(60s, config.switchBackDelay); + ASSERT_EQ(1u, config.failoverThreshold); + ASSERT_EQ(2u, config.switchBackThreshold); +} + +TEST(AutoClusterFailoverTest, testBuilderConvertsDelaysToProbeThresholds) { + ProbeTcpServer primary; + const auto primaryUrl = primary.getServiceUrl(); + primary.stop(); + + ProbeTcpServer secondary; + const auto secondaryUrl = secondary.getServiceUrl(); + + auto provider = AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(secondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverDelay(70ms) + .withSwitchBackDelay(90ms) + .build(); + + ServiceUrlObserver observer; + ASSERT_EQ(primaryUrl, provider.initialServiceInfo().serviceUrl()); + observer.onUpdate(provider.initialServiceInfo()); + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_FALSE(waitUntil(60ms, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); + ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); +} + class TestServiceInfoProvider : public ServiceInfoProvider { public: TestServiceInfoProvider(ServiceInfoHolder &serviceInfo) : serviceInfo_(serviceInfo) {}