From 85656fa5da373d734fbf2f240af0f3ad3b81c118 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 15:28:26 +0800 Subject: [PATCH] Add SameAuthParams lookup auto failover --- include/pulsar/ClientConfiguration.h | 1 + .../SameAuthParamsLookupAutoClusterFailover.h | 85 ++++ ...SameAuthParamsLookupAutoClusterFailover.cc | 385 ++++++++++++++++++ tests/ServiceInfoProviderTest.cc | 54 +++ 4 files changed, 525 insertions(+) create mode 100644 include/pulsar/SameAuthParamsLookupAutoClusterFailover.h create mode 100644 lib/SameAuthParamsLookupAutoClusterFailover.cc diff --git a/include/pulsar/ClientConfiguration.h b/include/pulsar/ClientConfiguration.h index b37b7c6a..a4d99374 100644 --- a/include/pulsar/ClientConfiguration.h +++ b/include/pulsar/ClientConfiguration.h @@ -367,6 +367,7 @@ class PULSAR_PUBLIC ClientConfiguration { friend class ClientImpl; friend class PulsarWrapper; + friend class SameAuthParamsLookupAutoClusterFailover; private: const AuthenticationPtr& getAuthPtr() const; diff --git a/include/pulsar/SameAuthParamsLookupAutoClusterFailover.h b/include/pulsar/SameAuthParamsLookupAutoClusterFailover.h new file mode 100644 index 00000000..3061a3e8 --- /dev/null +++ b/include/pulsar/SameAuthParamsLookupAutoClusterFailover.h @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_SAME_AUTH_PARAMS_LOOKUP_AUTO_CLUSTER_FAILOVER_H_ +#define PULSAR_SAME_AUTH_PARAMS_LOOKUP_AUTO_CLUSTER_FAILOVER_H_ + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace pulsar { + +class SameAuthParamsLookupAutoClusterFailoverImpl; + +class PULSAR_PUBLIC SameAuthParamsLookupAutoClusterFailover final : public ServiceInfoProvider { + public: + struct Config { + std::vector serviceUrls; + ClientConfiguration clientConfiguration; + uint32_t failoverThreshold{5}; + uint32_t recoverThreshold{5}; + std::chrono::milliseconds checkHealthyInterval{1000}; + bool markTopicNotFoundAsAvailable{true}; + std::string testTopic{"public/default/tp_test"}; + + Config(std::vector serviceUrls, ClientConfiguration clientConfiguration = {}); + }; + + class Builder { + public: + explicit Builder(std::vector serviceUrls, ClientConfiguration clientConfiguration = {}); + + Builder& withFailoverThreshold(uint32_t threshold); + Builder& withRecoverThreshold(uint32_t threshold); + Builder& withCheckHealthyInterval(std::chrono::milliseconds interval); + Builder& withMarkTopicNotFoundAsAvailable(bool enabled); + Builder& withTestTopic(std::string testTopic); + + SameAuthParamsLookupAutoClusterFailover build(); + + private: + Config config_; + }; + + explicit SameAuthParamsLookupAutoClusterFailover(Config&& config); + + ~SameAuthParamsLookupAutoClusterFailover() final; + + ServiceInfo initialServiceInfo() final; + + void initialize(std::function onServiceInfoUpdate) final; + + private: + friend class SameAuthParamsLookupAutoClusterFailoverImpl; + + static ServiceInfo toServiceInfo(const ClientConfiguration& clientConfiguration, + const std::string& serviceUrl); + + std::shared_ptr impl_; +}; + +} // namespace pulsar + +#endif diff --git a/lib/SameAuthParamsLookupAutoClusterFailover.cc b/lib/SameAuthParamsLookupAutoClusterFailover.cc new file mode 100644 index 00000000..390a1257 --- /dev/null +++ b/lib/SameAuthParamsLookupAutoClusterFailover.cc @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "AsioDefines.h" +#include "AsioTimer.h" +#include "AtomicSharedPtr.h" +#include "BinaryProtoLookupService.h" +#include "ClientConfigurationImpl.h" +#include "ConnectionPool.h" +#include "ExecutorService.h" +#include "LogUtils.h" +#include "ServiceURI.h" +#include "TopicName.h" + +#ifdef USE_ASIO +#include +#include +#include +#include +#else +#include +#include +#include +#include +#endif + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +namespace { + +bool isBlank(const std::string& value) { + return std::all_of(value.begin(), value.end(), [](unsigned char ch) { return std::isspace(ch) != 0; }); +} + +bool startsWithHttp(const std::string& value) { + return value.size() >= 4 && (value.compare(0, 4, "http") == 0 || value.compare(0, 4, "HTTP") == 0); +} + +} // namespace + +class SameAuthParamsLookupAutoClusterFailoverImpl + : public std::enable_shared_from_this { + public: + enum class PulsarServiceState + { + Healthy, + PreFail, + Failed, + PreRecover + }; + + explicit SameAuthParamsLookupAutoClusterFailoverImpl( + SameAuthParamsLookupAutoClusterFailover::Config&& config) + : config_(std::move(config)), + states_(config_.serviceUrls.size(), PulsarServiceState::Healthy), + counters_(config_.serviceUrls.size(), 0) { + for (const auto& serviceUrl : config_.serviceUrls) { + serviceInfos_.emplace_back(SameAuthParamsLookupAutoClusterFailover::toServiceInfo( + config_.clientConfiguration, serviceUrl)); + } + } + + ~SameAuthParamsLookupAutoClusterFailoverImpl() { + if (!thread_.joinable()) { + return; + } + + cancelTimer(*timer_); + workGuard_.reset(); + ioContext_.stop(); + thread_.join(); + } + + ServiceInfo initialServiceInfo() const { return serviceInfos_.front(); } + + void initialize(std::function&& onServiceInfoUpdate) { + if (thread_.joinable()) { + throw std::logic_error("ServiceInfoProvider has already been initialized"); + } + onServiceInfoUpdate_ = std::move(onServiceInfoUpdate); + workGuard_.emplace(ASIO::make_work_guard(ioContext_)); + timer_.emplace(ioContext_); + + auto weakSelf = weak_from_this(); + ASIO::post(ioContext_, [weakSelf] { + if (auto self = weakSelf.lock()) { + self->scheduleCheck(); + } + }); + + thread_ = std::thread([this] { ioContext_.run(); }); + } + + private: + SameAuthParamsLookupAutoClusterFailover::Config config_; + std::vector serviceInfos_; + std::vector states_; + std::vector counters_; + size_t currentServiceIndex_{0}; + + ASIO::io_context ioContext_; + std::optional> workGuard_; + std::optional timer_; + std::thread thread_; + std::function onServiceInfoUpdate_; + + void scheduleCheck() { + timer_->expires_after(config_.checkHealthyInterval); + auto weakSelf = weak_from_this(); + timer_->async_wait([weakSelf](ASIO_ERROR error) { + if (error) { + return; + } + if (auto self = weakSelf.lock()) { + self->executeCheck(); + self->scheduleCheck(); + } + }); + } + + int firstHealthyPulsarService() const { + for (size_t i = 0; i <= currentServiceIndex_; i++) { + if (states_[i] == PulsarServiceState::Healthy || states_[i] == PulsarServiceState::PreFail) { + return static_cast(i); + } + } + return -1; + } + + int findFailoverTo() { + for (size_t i = currentServiceIndex_ + 1; i < serviceInfos_.size(); i++) { + if (probeAvailable(i)) { + return static_cast(i); + } + + states_[i] = PulsarServiceState::Failed; + counters_[i] = 0; + } + return -1; + } + + void checkPulsarServices() { + for (size_t i = 0; i <= currentServiceIndex_; i++) { + if (probeAvailable(i)) { + handleProbeSuccess(i); + } else { + handleProbeFailure(i); + } + } + } + + void handleProbeSuccess(size_t index) { + switch (states_[index]) { + case PulsarServiceState::Healthy: + break; + case PulsarServiceState::PreFail: + states_[index] = PulsarServiceState::Healthy; + counters_[index] = 0; + break; + case PulsarServiceState::Failed: + states_[index] = PulsarServiceState::PreRecover; + counters_[index] = 1; + break; + case PulsarServiceState::PreRecover: + if (++counters_[index] >= config_.recoverThreshold) { + states_[index] = PulsarServiceState::Healthy; + counters_[index] = 0; + } + break; + } + } + + void handleProbeFailure(size_t index) { + switch (states_[index]) { + case PulsarServiceState::Healthy: + states_[index] = PulsarServiceState::PreFail; + counters_[index] = 1; + break; + case PulsarServiceState::PreFail: + if (++counters_[index] >= config_.failoverThreshold) { + states_[index] = PulsarServiceState::Failed; + counters_[index] = 0; + } + break; + case PulsarServiceState::Failed: + break; + case PulsarServiceState::PreRecover: + states_[index] = PulsarServiceState::Failed; + counters_[index] = 0; + break; + } + } + + bool probeAvailable(size_t index) { + ClientConfiguration probeConfiguration = config_.clientConfiguration; + probeConfiguration.setOperationTimeoutSeconds(3); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(serviceInfos_[index])); + auto executorProvider = std::make_shared(probeConfiguration.getIOThreads()); + ConnectionPool pool(serviceInfo, probeConfiguration, executorProvider, "Pulsar-CPP-lookup-failover"); + BinaryProtoLookupService lookupService(serviceInfos_[index], pool, probeConfiguration); + + auto topicName = TopicName::get(config_.testTopic); + if (!topicName) { + LOG_WARN("Invalid lookup probe topic: " << config_.testTopic); + executorProvider->close(); + pool.close(); + return false; + } + + LookupService::LookupResult lookupResult; + const auto result = lookupService.getBroker(*topicName).get(lookupResult); + pool.close(); + executorProvider->close(); + + if (result == ResultOk) { + LOG_DEBUG("Successfully probed service availability: " << serviceInfos_[index].serviceUrl()); + return true; + } + if (result == ResultTopicNotFound && config_.markTopicNotFoundAsAvailable) { + LOG_DEBUG("Successfully probed service availability with topic not found: " + << serviceInfos_[index].serviceUrl()); + return true; + } + LOG_WARN("Failed to probe service availability: " << serviceInfos_[index].serviceUrl() << " - " + << result); + return false; + } + + void executeCheck() { + checkPulsarServices(); + const int firstHealthy = firstHealthyPulsarService(); + if (firstHealthy == static_cast(currentServiceIndex_)) { + return; + } + if (firstHealthy < 0) { + const int failoverTo = findFailoverTo(); + if (failoverTo >= 0) { + updateServiceInfo(static_cast(failoverTo)); + } + return; + } + updateServiceInfo(static_cast(firstHealthy)); + } + + void updateServiceInfo(size_t targetIndex) { + LOG_INFO("Switch service URL from " << serviceInfos_[currentServiceIndex_].serviceUrl() << " to " + << serviceInfos_[targetIndex].serviceUrl()); + if (targetIndex < currentServiceIndex_) { + for (size_t i = targetIndex + 1; i < states_.size(); i++) { + states_[i] = PulsarServiceState::Healthy; + counters_[i] = 0; + } + } + currentServiceIndex_ = targetIndex; + onServiceInfoUpdate_(serviceInfos_[currentServiceIndex_]); + } +}; + +SameAuthParamsLookupAutoClusterFailover::Config::Config(std::vector serviceUrls, + ClientConfiguration clientConfiguration) + : serviceUrls(std::move(serviceUrls)), clientConfiguration(std::move(clientConfiguration)) { + if (this->serviceUrls.empty()) { + throw std::invalid_argument("serviceUrls cannot be empty"); + } + + std::unordered_set uniqueUrls; + for (const auto& serviceUrl : this->serviceUrls) { + if (isBlank(serviceUrl)) { + throw std::invalid_argument("serviceUrls contains a blank value"); + } + if (startsWithHttp(serviceUrl)) { + throw std::invalid_argument( + "SameAuthParamsLookupAutoClusterFailover does not support HTTP service URLs"); + } + if (!uniqueUrls.insert(serviceUrl).second) { + throw std::invalid_argument("serviceUrls contains duplicated value " + serviceUrl); + } + ServiceURI serviceUri(serviceUrl); + } +} + +SameAuthParamsLookupAutoClusterFailover::Builder::Builder(std::vector serviceUrls, + ClientConfiguration clientConfiguration) + : config_(std::move(serviceUrls), std::move(clientConfiguration)) {} + +SameAuthParamsLookupAutoClusterFailover::Builder& +SameAuthParamsLookupAutoClusterFailover::Builder::withFailoverThreshold(uint32_t threshold) { + if (threshold < 1) { + throw std::invalid_argument("failoverThreshold must be larger than 0"); + } + config_.failoverThreshold = threshold; + return *this; +} + +SameAuthParamsLookupAutoClusterFailover::Builder& +SameAuthParamsLookupAutoClusterFailover::Builder::withRecoverThreshold(uint32_t threshold) { + if (threshold < 1) { + throw std::invalid_argument("recoverThreshold must be larger than 0"); + } + config_.recoverThreshold = threshold; + return *this; +} + +SameAuthParamsLookupAutoClusterFailover::Builder& +SameAuthParamsLookupAutoClusterFailover::Builder::withCheckHealthyInterval( + std::chrono::milliseconds interval) { + if (interval < std::chrono::milliseconds{1}) { + throw std::invalid_argument("checkHealthyInterval must be larger than 0"); + } + config_.checkHealthyInterval = interval; + return *this; +} + +SameAuthParamsLookupAutoClusterFailover::Builder& +SameAuthParamsLookupAutoClusterFailover::Builder::withMarkTopicNotFoundAsAvailable(bool enabled) { + config_.markTopicNotFoundAsAvailable = enabled; + return *this; +} + +SameAuthParamsLookupAutoClusterFailover::Builder& +SameAuthParamsLookupAutoClusterFailover::Builder::withTestTopic(std::string testTopic) { + if (isBlank(testTopic) || !TopicName::get(testTopic)) { + throw std::invalid_argument("testTopic cannot be blank or invalid"); + } + config_.testTopic = std::move(testTopic); + return *this; +} + +SameAuthParamsLookupAutoClusterFailover SameAuthParamsLookupAutoClusterFailover::Builder::build() { + return SameAuthParamsLookupAutoClusterFailover(std::move(config_)); +} + +SameAuthParamsLookupAutoClusterFailover::SameAuthParamsLookupAutoClusterFailover(Config&& config) + : impl_(std::make_shared(std::move(config))) {} + +SameAuthParamsLookupAutoClusterFailover::~SameAuthParamsLookupAutoClusterFailover() {} + +ServiceInfo SameAuthParamsLookupAutoClusterFailover::initialServiceInfo() { + return impl_->initialServiceInfo(); +} + +void SameAuthParamsLookupAutoClusterFailover::initialize( + std::function onServiceInfoUpdate) { + impl_->initialize(std::move(onServiceInfoUpdate)); +} + +ServiceInfo SameAuthParamsLookupAutoClusterFailover::toServiceInfo( + const ClientConfiguration& clientConfiguration, const std::string& serviceUrl) { + return clientConfiguration.impl_->toServiceInfo(serviceUrl); +} + +} // namespace pulsar diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 175c5319..a9b78dad 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -311,6 +312,59 @@ TEST(AutoClusterFailoverTest, testFailoverToAnotherSecondaryWhenCurrentSecondary ASSERT_EQ(updates[2], secondSecondaryUrl); } +TEST(SameAuthParamsLookupAutoClusterFailoverTest, testDefaultConfig) { + SameAuthParamsLookupAutoClusterFailover::Config config{ + {"pulsar://primary:6650", "pulsar://secondary:6650"}}; + + ASSERT_EQ(5u, config.failoverThreshold); + ASSERT_EQ(5u, config.recoverThreshold); + ASSERT_EQ(1000ms, config.checkHealthyInterval); + ASSERT_TRUE(config.markTopicNotFoundAsAvailable); + ASSERT_EQ("public/default/tp_test", config.testTopic); +} + +TEST(SameAuthParamsLookupAutoClusterFailoverTest, testRejectInvalidServiceUrls) { + ASSERT_THROW(SameAuthParamsLookupAutoClusterFailover::Config({}), std::invalid_argument); + ASSERT_THROW(SameAuthParamsLookupAutoClusterFailover::Config({" "}), std::invalid_argument); + ASSERT_THROW(SameAuthParamsLookupAutoClusterFailover::Config({"http://localhost:8080"}), + std::invalid_argument); + ASSERT_THROW(SameAuthParamsLookupAutoClusterFailover::Config( + {"pulsar://localhost:6650", "pulsar://localhost:6650"}), + std::invalid_argument); +} + +TEST(SameAuthParamsLookupAutoClusterFailoverTest, testInitialServiceInfoUsesSameAuthParams) { + auto auth = AuthToken::createWithToken("token"); + ClientConfiguration clientConfiguration; + clientConfiguration.setAuth(auth).setTlsTrustCertsFilePath("/path/to/ca.pem"); + + auto provider = SameAuthParamsLookupAutoClusterFailover::Builder( + {"pulsar+ssl://primary:6651", "pulsar+ssl://secondary:6651"}, clientConfiguration) + .withFailoverThreshold(2) + .withRecoverThreshold(3) + .withCheckHealthyInterval(20ms) + .withMarkTopicNotFoundAsAvailable(false) + .withTestTopic("public/default/probe") + .build(); + + const auto serviceInfo = provider.initialServiceInfo(); + ASSERT_EQ("pulsar+ssl://primary:6651", serviceInfo.serviceUrl()); + ASSERT_TRUE(serviceInfo.useTls()); + ASSERT_EQ(auth, serviceInfo.authentication()); + ASSERT_TRUE(serviceInfo.tlsTrustCertsFilePath().has_value()); + ASSERT_EQ("/path/to/ca.pem", *serviceInfo.tlsTrustCertsFilePath()); +} + +TEST(SameAuthParamsLookupAutoClusterFailoverTest, testRejectInvalidBuilderOptions) { + SameAuthParamsLookupAutoClusterFailover::Builder builder({"pulsar://localhost:6650"}); + + ASSERT_THROW(builder.withFailoverThreshold(0), std::invalid_argument); + ASSERT_THROW(builder.withRecoverThreshold(0), std::invalid_argument); + ASSERT_THROW(builder.withCheckHealthyInterval(0ms), std::invalid_argument); + ASSERT_THROW(builder.withTestTopic(" "), std::invalid_argument); + ASSERT_THROW(builder.withTestTopic("://invalid"), std::invalid_argument); +} + TEST(ServiceInfoProviderTest, testSwitchCluster) { extern std::string getToken(); // from tests/AuthTokenTest.cc // Access "private/auth" namespace in cluster 1