Skip to content

Commit df6871e

Browse files
Add AutoClusterFailover and ServiceInfoProvider (#295)
1 parent 3138051 commit df6871e

File tree

6 files changed

+661
-21
lines changed

6 files changed

+661
-21
lines changed

pulsar/__init__.py

Lines changed: 200 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,159 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str
569569
_check_type(str, method, 'method')
570570
self.auth = _pulsar.AuthenticationBasic.create(username, password, method)
571571

572+
573+
class ServiceInfoProvider:
    """
    Extension point for service discovery and failover logic written in Python.

    A subclass has to supply the :class:`ServiceInfo` the client starts with, and
    it may hold on to the update callback it receives so that it can hand the
    client a different :class:`ServiceInfo` later on.
    """

    def initial_service_info(self) -> "ServiceInfo":
        """
        Return the :class:`ServiceInfo` the client should start with.

        The base implementation always raises :class:`NotImplementedError`;
        subclasses are expected to override it.
        """
        raise NotImplementedError()

    def initialize(self, on_service_info_update: Callable[["ServiceInfo"], None]) -> None:
        """
        Receive the callback used to push service changes into the client.

        Parameters
        ----------
        on_service_info_update: Callable[[ServiceInfo], None]
            Callable accepting a :class:`ServiceInfo`. A subclass may keep it and
            invoke it whenever the service to use changes.

        The base implementation always raises :class:`NotImplementedError`;
        subclasses are expected to override it.
        """
        raise NotImplementedError()

    def close(self) -> None:
        """
        Stop background work and release resources.

        Called when the underlying C++ client destroys the provider, which
        typically happens during :meth:`Client.close`. The base implementation
        does nothing.
        """
        return
595+
596+
597+
class ServiceInfo:
    """
    Describes how to reach a single Pulsar cluster endpoint.

    Mainly intended to be used together with :class:`AutoClusterFailover`.
    """

    def __init__(self,
                 service_url: str,
                 authentication: Optional[Authentication] = None,
                 tls_trust_certs_file_path: Optional[str] = None):
        """
        Build a service info entry.

        Parameters
        ----------
        service_url: str
            The Pulsar service URL of the cluster.
        authentication: Authentication, optional
            Authentication used when connecting to the cluster.
        tls_trust_certs_file_path: str, optional
            Trust store path for TLS connections to the cluster.
        """
        _check_type(str, service_url, 'service_url')
        _check_type_or_none(Authentication, authentication, 'authentication')
        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')

        # Keep the Python-level authentication object next to the native handle.
        self._authentication = authentication

        # The native constructor takes the wrapped ``auth`` object, or None.
        if authentication:
            native_auth = authentication.auth
        else:
            native_auth = None
        self._service_info = _pulsar.ServiceInfo(service_url, native_auth, tls_trust_certs_file_path)

    @property
    def service_url(self) -> str:
        """Service URL as reported by the native service info object."""
        return self._service_info.service_url

    @property
    def use_tls(self) -> bool:
        """TLS flag as reported by the native service info object."""
        return self._service_info.use_tls

    @property
    def tls_trust_certs_file_path(self) -> Optional[str]:
        """Trust store path as reported by the native service info object."""
        return self._service_info.tls_trust_certs_file_path

    def __repr__(self) -> str:
        fields = (
            ("service_url", self.service_url),
            ("use_tls", self.use_tls),
            ("tls_trust_certs_file_path", self.tls_trust_certs_file_path),
        )
        return "ServiceInfo(" + ", ".join(f"{name}={value!r}" for name, value in fields) + ")"

    @classmethod
    def wrap(cls, service_info: _pulsar.ServiceInfo):
        """
        Build a :class:`ServiceInfo` around an existing native object.

        ``__init__`` is bypassed, so no validation runs and no authentication
        object is attached to the result.
        """
        wrapper = cls.__new__(cls)
        wrapper._authentication = None
        wrapper._service_info = service_info
        return wrapper
657+
658+
659+
class AutoClusterFailover:
    """
    Settings for cluster-level automatic failover, accepted by :class:`Client`.
    """

    def __init__(self,
                 primary: ServiceInfo,
                 secondary: List[ServiceInfo],
                 check_interval_ms: int = 5000,
                 failover_threshold: int = 1,
                 switch_back_threshold: int = 1):
        """
        Build an automatic failover configuration.

        Parameters
        ----------
        primary: ServiceInfo
            The preferred cluster to use.
        secondary: list[ServiceInfo]
            Ordered fallback clusters to probe when the primary becomes unavailable.
        check_interval_ms: int, default=5000
            Probe interval in milliseconds.
        failover_threshold: int, default=1
            Number of consecutive probe failures required before failover.
        switch_back_threshold: int, default=1
            Number of consecutive successful primary probes required before switching back.

        Raises
        ------
        ValueError
            If ``secondary`` is empty or holds a non-:class:`ServiceInfo` element,
            or if any of the three numeric settings is not greater than 0.
        """
        _check_type(ServiceInfo, primary, 'primary')
        _check_type(list, secondary, 'secondary')
        numeric_settings = (
            ('check_interval_ms', check_interval_ms),
            ('failover_threshold', failover_threshold),
            ('switch_back_threshold', switch_back_threshold),
        )
        for setting_name, setting_value in numeric_settings:
            _check_type(int, setting_value, setting_name)

        if not secondary:
            raise ValueError("Argument secondary is expected to contain at least one ServiceInfo")

        for position, candidate in enumerate(secondary):
            if isinstance(candidate, ServiceInfo):
                continue
            raise ValueError(
                "Argument secondary[%d] is expected to be of type 'ServiceInfo' and not '%s'"
                % (position, type(candidate).__name__)
            )

        # Checked in declaration order so the first offending argument is reported.
        positive_checks = (
            (check_interval_ms, "Argument check_interval_ms is expected to be greater than 0"),
            (failover_threshold, "Argument failover_threshold is expected to be greater than 0"),
            (switch_back_threshold, "Argument switch_back_threshold is expected to be greater than 0"),
        )
        for setting_value, message in positive_checks:
            if setting_value <= 0:
                raise ValueError(message)

        self.primary = primary
        # Store a copy so later changes to the caller's list do not affect this object.
        self.secondary = list(secondary)
        self.check_interval_ms = check_interval_ms
        self.failover_threshold = failover_threshold
        self.switch_back_threshold = switch_back_threshold

    def __repr__(self) -> str:
        fields = (
            ("primary", self.primary),
            ("secondary", self.secondary),
            ("check_interval_ms", self.check_interval_ms),
            ("failover_threshold", self.failover_threshold),
            ("switch_back_threshold", self.switch_back_threshold),
        )
        return "AutoClusterFailover(" + ", ".join(f"{name}={value!r}" for name, value in fields) + ")"
724+
572725
class ConsumerDeadLetterPolicy:
573726
"""
574727
Configuration for the "dead letter queue" feature in consumer.
@@ -681,8 +834,9 @@ def __init__(self, service_url,
681834
Parameters
682835
----------
683836
684-
service_url: str
685-
The Pulsar service url eg: pulsar://my-broker.com:6650/
837+
service_url: str or AutoClusterFailover or ServiceInfoProvider
838+
The Pulsar service URL, for example ``pulsar://my-broker.com:6650/``, or an
839+
:class:`AutoClusterFailover` or :class:`ServiceInfoProvider` configuration.
686840
authentication: Authentication, optional
687841
Set the authentication provider to be used with the broker. Supported methods:
688842
@@ -743,7 +897,26 @@ def __init__(self, service_url,
743897
tls_certificate_file_path: str, optional
744898
The path to the TLS certificate file.
745899
"""
746-
_check_type(str, service_url, 'service_url')
900+
if not isinstance(service_url, (str, AutoClusterFailover, ServiceInfoProvider)):
901+
raise ValueError(
902+
"Argument service_url is expected to be of type 'str', 'AutoClusterFailover' or "
903+
"'ServiceInfoProvider'"
904+
)
905+
906+
if isinstance(service_url, (AutoClusterFailover, ServiceInfoProvider)) and authentication is not None:
907+
raise ValueError(
908+
"Argument authentication is not supported when service_url is an AutoClusterFailover or "
909+
"ServiceInfoProvider; set authentication on each ServiceInfo instead"
910+
)
911+
912+
if isinstance(service_url, (AutoClusterFailover, ServiceInfoProvider)) and \
913+
tls_trust_certs_file_path is not None:
914+
raise ValueError(
915+
"Argument tls_trust_certs_file_path is not supported when service_url is an "
916+
"AutoClusterFailover or ServiceInfoProvider; set tls_trust_certs_file_path on each "
917+
"ServiceInfo instead"
918+
)
919+
747920
_check_type_or_none(Authentication, authentication, 'authentication')
748921
_check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
749922
_check_type(int, connection_timeout_ms, 'connection_timeout_ms')
@@ -792,7 +965,24 @@ def __init__(self, service_url,
792965
conf.tls_private_key_file_path(tls_private_key_file_path)
793966
if tls_certificate_file_path is not None:
794967
conf.tls_certificate_file_path(tls_certificate_file_path)
795-
self._client = _pulsar.Client(service_url, conf)
968+
if isinstance(service_url, AutoClusterFailover):
969+
self._client = _pulsar.Client.create_auto_cluster_failover(
970+
service_url.primary._service_info,
971+
[service_info._service_info for service_info in service_url.secondary],
972+
service_url.check_interval_ms,
973+
service_url.failover_threshold,
974+
service_url.switch_back_threshold,
975+
conf,
976+
)
977+
elif isinstance(service_url, ServiceInfoProvider):
978+
try:
979+
self._client = _pulsar.Client.create_service_info_provider(service_url, conf)
980+
except RuntimeError as e:
981+
if str(e) == "Expected a pulsar.ServiceInfo or _pulsar.ServiceInfo instance":
982+
raise ValueError(str(e))
983+
raise
984+
else:
985+
self._client = _pulsar.Client(service_url, conf)
796986
self._consumers = []
797987

798988
@staticmethod
@@ -1417,6 +1607,12 @@ def get_topic_partitions(self, topic):
14171607
_check_type(str, topic, 'topic')
14181608
return self._client.get_topic_partitions(topic)
14191609

1610+
def get_service_info(self) -> ServiceInfo:
    """
    Return the service info this client is currently using.

    The object returned by the native client is wrapped with
    :meth:`ServiceInfo.wrap` before it is handed back.
    """
    native_service_info = self._client.get_service_info()
    return ServiceInfo.wrap(native_service_info)
1615+
14201616
def shutdown(self):
14211617
"""
14221618
Perform immediate shutdown of Pulsar client.

src/client.cc

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,70 @@
1818
*/
1919
#include "utils.h"
2020

21+
#include <pulsar/AutoClusterFailover.h>
22+
#include <pulsar/ServiceInfoProvider.h>
23+
#include <chrono>
24+
#include <memory>
2125
#include <pybind11/functional.h>
2226
#include <pybind11/pybind11.h>
2327
#include <pybind11/stl.h>
2428

2529
namespace py = pybind11;
2630

31+
/**
 * Extract a native ServiceInfo from a Python object.
 *
 * Two shapes are accepted: an object that casts to ServiceInfo directly, or an
 * object exposing a `_service_info` attribute that does (the layout of the
 * Python-level pulsar.ServiceInfo wrapper).
 *
 * @throws py::value_error when neither form yields a ServiceInfo
 */
static ServiceInfo unwrapPythonServiceInfo(const py::handle& object) {
    const auto candidate = py::reinterpret_borrow<py::object>(object);

    // First attempt: cast the object itself.
    try {
        return py::cast<ServiceInfo>(candidate);
    } catch (const py::cast_error&) {
        // Not directly castable; fall through to the wrapper attribute.
    }

    // Second attempt: cast the native object carried by a Python-side wrapper.
    if (py::hasattr(candidate, "_service_info")) {
        const py::object wrapped = candidate.attr("_service_info");
        try {
            return py::cast<ServiceInfo>(wrapped);
        } catch (const py::cast_error&) {
            // The attribute exists but is not a ServiceInfo; report it below.
        }
    }

    throw py::value_error("Expected a pulsar.ServiceInfo or _pulsar.ServiceInfo instance");
}
48+
49+
class PythonServiceInfoProvider : public ServiceInfoProvider {
50+
public:
51+
explicit PythonServiceInfoProvider(py::object provider) : provider_(std::move(provider)) {}
52+
53+
~PythonServiceInfoProvider() override {
54+
if (!Py_IsInitialized()) {
55+
return;
56+
}
57+
58+
py::gil_scoped_acquire acquire;
59+
try {
60+
if (py::hasattr(provider_, "close")) {
61+
provider_.attr("close")();
62+
}
63+
} catch (const py::error_already_set&) {
64+
PyErr_Print();
65+
}
66+
}
67+
68+
ServiceInfo initialServiceInfo() override {
69+
py::gil_scoped_acquire acquire;
70+
return unwrapPythonServiceInfo(provider_.attr("initial_service_info")());
71+
}
72+
73+
void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {
74+
py::gil_scoped_acquire acquire;
75+
provider_.attr("initialize")(py::cpp_function(
76+
[onServiceInfoUpdate = std::move(onServiceInfoUpdate)](py::object serviceInfo) mutable {
77+
onServiceInfoUpdate(unwrapPythonServiceInfo(serviceInfo));
78+
}));
79+
}
80+
81+
private:
82+
py::object provider_;
83+
};
84+
2785
Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
2886
return waitForAsyncValue<Producer>(
2987
[&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
@@ -65,7 +123,8 @@ std::vector<std::string> Client_getTopicPartitions(Client& client, const std::st
65123
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
66124
}
67125

68-
void Client_getTopicPartitionsAsync(Client &client, const std::string& topic, GetPartitionsCallback callback) {
126+
/**
 * Start an asynchronous partition lookup for `topic`.
 *
 * The GIL is released for the duration of the native call; `callback` is
 * passed through to the client unchanged.
 */
void Client_getTopicPartitionsAsync(Client& client, const std::string& topic,
                                    GetPartitionsCallback callback) {
    py::gil_scoped_release gilReleased;
    client.getPartitionsForTopicAsync(topic, callback);
}
@@ -76,6 +135,25 @@ SchemaInfo Client_getSchemaInfo(Client& client, const std::string& topic, int64_
76135
});
77136
}
78137

138+
/**
 * Create a Client whose service info is managed by an AutoClusterFailover provider.
 *
 * @param primary              service info of the preferred cluster
 * @param secondary            service infos of the fallback clusters
 * @param checkIntervalMs      probe interval, in milliseconds
 * @param failoverThreshold    value stored in Config::failoverThreshold
 * @param switchBackThreshold  value stored in Config::switchBackThreshold
 * @param conf                 client configuration passed to Client::create
 */
std::shared_ptr<Client> Client_createAutoClusterFailover(ServiceInfo primary,
                                                         std::vector<ServiceInfo> secondary,
                                                         int64_t checkIntervalMs, uint32_t failoverThreshold,
                                                         uint32_t switchBackThreshold,
                                                         const ClientConfiguration& conf) {
    AutoClusterFailover::Config failoverConfig(std::move(primary), std::move(secondary));
    failoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs);
    failoverConfig.failoverThreshold = failoverThreshold;
    failoverConfig.switchBackThreshold = switchBackThreshold;

    auto failoverProvider = std::make_unique<AutoClusterFailover>(std::move(failoverConfig));
    return std::make_shared<Client>(Client::create(std::move(failoverProvider), conf));
}
150+
151+
/**
 * Create a Client whose service info comes from a Python-defined provider.
 *
 * The Python object is wrapped in a PythonServiceInfoProvider, and ownership of
 * that wrapper is handed to Client::create.
 */
std::shared_ptr<Client> Client_createServiceInfoProvider(py::object provider,
                                                         const ClientConfiguration& conf) {
    auto pythonProvider = std::make_unique<PythonServiceInfoProvider>(std::move(provider));
    return std::make_shared<Client>(Client::create(std::move(pythonProvider), conf));
}
156+
79157
/** Close the client by running closeAsync through waitForAsyncResult. */
void Client_close(Client& client) {
    auto closeOperation = [&client](ResultCallback onClosed) { client.closeAsync(onClosed); };
    waitForAsyncResult(closeOperation);
}
@@ -108,19 +186,25 @@ void Client_subscribeAsync_pattern(Client& client, const std::string& topic_patt
108186
void export_client(py::module_& m) {
109187
py::class_<Client, std::shared_ptr<Client>>(m, "Client")
110188
.def(py::init<const std::string&, const ClientConfiguration&>())
189+
.def_static("create_auto_cluster_failover", &Client_createAutoClusterFailover, py::arg("primary"),
190+
py::arg("secondary"), py::arg("check_interval_ms"), py::arg("failover_threshold"),
191+
py::arg("switch_back_threshold"), py::arg("client_configuration"))
192+
.def_static("create_service_info_provider", &Client_createServiceInfoProvider, py::arg("provider"),
193+
py::arg("client_configuration"))
111194
.def("create_producer", &Client_createProducer)
112195
.def("create_producer_async", &Client_createProducerAsync)
113196
.def("subscribe", &Client_subscribe)
114197
.def("subscribe_topics", &Client_subscribe_topics)
115198
.def("subscribe_pattern", &Client_subscribe_pattern)
116199
.def("create_reader", &Client_createReader)
117-
.def("create_table_view", [](Client& client, const std::string& topic,
118-
const TableViewConfiguration& config) {
119-
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
120-
client.createTableViewAsync(topic, config, callback);
121-
});
122-
})
200+
.def("create_table_view",
201+
[](Client& client, const std::string& topic, const TableViewConfiguration& config) {
202+
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
203+
client.createTableViewAsync(topic, config, callback);
204+
});
205+
})
123206
.def("get_topic_partitions", &Client_getTopicPartitions)
207+
.def("get_service_info", &Client::getServiceInfo)
124208
.def("get_schema_info", &Client_getSchemaInfo)
125209
.def("close", &Client_close)
126210
.def("close_async", &Client_closeAsync)

0 commit comments

Comments
 (0)