Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,42 @@ target_link_libraries(SampleReaderCApi ${CLIENT_LIBS} pulsarShar
target_link_libraries(SampleKeyValueSchemaConsumer ${CLIENT_LIBS} pulsarShared)
target_link_libraries(SampleKeyValueSchemaProducer ${CLIENT_LIBS} pulsarShared)
target_link_libraries(SampleCustomLoggerCApi ${CLIENT_LIBS} pulsarShared)

# --- Scalable topics (pulsar::st) examples ---------------------------------
# These use the new typed scalable-topics API under include/pulsar/st. Its
# implementation (lib/st) does not exist yet, so the examples are COMPILED here
# for header/API verification but are NOT linked into executables (there are no
# symbols to link against). Building this OBJECT library on every build keeps the
# examples from bit-rotting while the API is reviewed.
#
# TODO(scalable-topics): once lib/st lands, replace this with one
# add_executable + target_link_libraries(... pulsarShared) per file, exactly like
# the samples above.
# The core samples are header-only previews of the pulsar::st API and build with
# no extra dependency.
set(SAMPLE_ST_SOURCES
st/SampleStProducer.cc
st/SampleStStreamConsumer.cc
st/SampleStQueueConsumer.cc
st/SampleStCheckpointConsumer.cc
)
# reflect-cpp powers jsonSchema<T>() (reflection-based JSON SerDe + schema). It is
# optional for this API-only PR: when the package is present the JSON sample is
# added and linked against it; when absent, only that one sample is skipped. (The
# reflectcpp vcpkg port does not yet ship an Avro backend, so it is not yet wired
# into the manifest; it will be added with the lib/st implementation.)
find_package(reflectcpp CONFIG QUIET)
if (reflectcpp_FOUND)
list(APPEND SAMPLE_ST_SOURCES st/SampleStJsonSchema.cc)
endif ()

add_library(StExamples OBJECT ${SAMPLE_ST_SOURCES})
# The scalable-topics (pulsar::st) API targets C++20; the rest of the client stays
# C++17. Set the standard per-target so only this code requires C++20.
set_target_properties(StExamples PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON)
# PRIVATE link gives the object sources pulsarShared's include directories; an
# OBJECT library is not itself linked, so the missing lib/st symbols are fine.
target_link_libraries(StExamples PRIVATE ${CLIENT_LIBS} pulsarShared)
if (reflectcpp_FOUND)
target_link_libraries(StExamples PRIVATE reflectcpp::reflectcpp)
endif ()
40 changes: 40 additions & 0 deletions examples/st/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Scalable Topics (`pulsar::st`) — API preview examples

These examples exercise the new typed scalable-topics C++ API under
[`include/pulsar/st/`](../../include/pulsar/st). They illustrate the proposed
surface and exist to gather community feedback.

> **Status: API definition only.** The implementation (`lib/st/`) does not exist
> yet, so these examples **compile but do not yet link**. They are wired into the
> CMake build as a compile-only `OBJECT` library (`StExamples` in
> [`examples/CMakeLists.txt`](../CMakeLists.txt)) — header-verified on every build,
> but not linked. Once `lib/st` lands they become normal `add_executable` targets.

The `pulsar::st` API requires **C++20** (the rest of the client stays C++17).
Syntax-check an example against the headers (no linking):

```sh
clang++ -std=c++20 -I ../../include -Wall -fsyntax-only SampleStProducer.cc
```

| File | Shows |
|---|---|
| `SampleStProducer.cc` | blocking + asynchronous publishing, transactions |
| `SampleStStreamConsumer.cc` | ordered (per-key) delivery, cumulative ack |
| `SampleStQueueConsumer.cc` | parallel delivery, individual ack + nack, dead-letter |
| `SampleStCheckpointConsumer.cc`| externally held position via `Checkpoint` |
| `SampleStJsonSchema.cc` | a struct as JSON with zero boilerplate (`jsonSchema<T>()`, reflect-cpp) |

## API at a glance

- **Typed builders** off one `PulsarClient`: `newProducer` / `newStreamConsumer` /
`newQueueConsumer` / `newCheckpointConsumer`, each taking a `Schema<T>`.
- **Synchronous calls return `Expected<T>`** (a stand-in for `std::expected`,
which is C++23): check it, or call `.value()` to throw `ClientException`.
`Expected<T>` is `[[nodiscard]]`, so a failure cannot be silently dropped.
- **Asynchronous calls return `Future<T>`**: `addListener(...)` to react on
completion without blocking, `get()` to block, or `co_await` it.
- **Schemas**: primitives are built in; structured types use `jsonSchema<T>()` /
`avroSchema<T>()` (reflect-cpp derives the SerDe **and** the declared schema from
the struct — no boilerplate), `protobufNativeSchema<T>()`, or a custom
`Schema<T>(serde)`. reflect-cpp is a required dependency of `pulsar::st`.
69 changes: 69 additions & 0 deletions examples/st/SampleStCheckpointConsumer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Scalable-topics CheckpointConsumer: the application owns the position. Read,
// snapshot a Checkpoint, persist it externally, and later resume from it.

#include <pulsar/st/Client.h>

#include <iostream>
#include <string>

using namespace pulsar::st;

int main() {
auto clientResult = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build();
if (!clientResult) {
std::cerr << "failed to build client: " << clientResult.error() << "\n";
return 1;
}
PulsarClient client = std::move(clientResult).value();

// Restore from a previously stored checkpoint if you have one; else start at
// the earliest message. (Checkpoint::fromByteArray(savedBytes) to resume.)
auto consumerResult = client.newCheckpointConsumer(Schema<std::string>{})
.topic("topic://public/default/orders")
.startPosition(Checkpoint::earliest())
.create(); // NOTE: create(), not subscribe()
if (!consumerResult) {
std::cerr << "failed to create consumer: " << consumerResult.error() << "\n";
return 1;
}
CheckpointConsumer<std::string> consumer = std::move(consumerResult).value();

for (int i = 0; i < 5; i++) {
auto msg = consumer.receive(std::chrono::seconds(5));
if (!msg) {
if (msg.error().result == ResultTimeout) break;
std::cerr << "receive failed: " << msg.error() << "\n";
break;
}
std::cout << "read: " << msg->value() << "\n";
}

// Atomic position snapshot across all segments. Store the bytes yourself
// (Flink/Spark state backend, a file, etc.) — there is no broker-side cursor.
Checkpoint checkpoint = consumer.checkpoint();
std::string persisted = checkpoint.toByteArray();
std::cout << "checkpoint is " << persisted.size() << " bytes\n";

(void)consumer.close();
(void)client.close();
return 0;
}
85 changes: 85 additions & 0 deletions examples/st/SampleStJsonSchema.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Passing a struct as JSON: `jsonSchema<T>()` derives both the SerDe and the
// declared schema from the struct's fields (via reflect-cpp) — NO macros, NO base
// class, NO schema string, NO serializer. Nested structs and containers included.
// `avroSchema<T>()` is identical for Avro.

#include <pulsar/st/Client.h>
#include <pulsar/st/JsonSchema.h>

#include <iostream>
#include <string>
#include <vector>

// Plain value types — that is the entire schema "declaration".
struct Address {
std::string street;
std::string city;
};
struct Order {
std::string orderId;
int quantity;
double unitPrice;
Address shipTo; // nested struct — handled automatically
std::vector<std::string> tags; // container — handled automatically
};

using namespace pulsar::st;

int main() {
auto clientResult = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build();
if (!clientResult) {
std::cerr << clientResult.error() << "\n";
return 1;
}
PulsarClient client = std::move(clientResult).value();

auto producerResult =
client.newProducer(jsonSchema<Order>()).topic("topic://public/default/orders").create();
if (!producerResult) {
std::cerr << producerResult.error() << "\n";
return 1;
}
Producer<Order> producer = std::move(producerResult).value();

Order order{"ord-1", 3, 9.99, {"1 Main St", "Springfield"}, {"priority", "gift"}};
if (auto sent = producer.send(order); sent) {
std::cout << "sent " << *sent << "\n";
}

auto consumerResult = client.newStreamConsumer(jsonSchema<Order>())
.topic("topic://public/default/orders")
.subscriptionName("orders-sub")
.subscribe();
if (consumerResult) {
StreamConsumer<Order> consumer = std::move(consumerResult).value();
if (auto msg = consumer.receive(std::chrono::seconds(5))) {
Order received = msg->value(); // decoded straight back into the struct
std::cout << received.orderId << " -> " << received.shipTo.city << "\n";
consumer.acknowledgeCumulative(msg->id());
}
(void)consumer.close();
}

(void)producer.close();
(void)client.close();
return 0;
}
94 changes: 94 additions & 0 deletions examples/st/SampleStProducer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Scalable-topics producer: blocking and asynchronous publishing.

#include <pulsar/st/Client.h>

#include <iostream>

using namespace pulsar::st;

int main() {
// One client per application; keep it for the whole lifetime.
auto clientResult = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build();
if (!clientResult) {
std::cerr << "failed to build client: " << clientResult.error() << "\n";
return 1;
}
PulsarClient client = std::move(clientResult).value();

auto producerResult = client.newProducer(Schema<std::string>{})
.topic("topic://public/default/orders")
.sendTimeout(std::chrono::seconds(30))
.create();
if (!producerResult) {
std::cerr << "failed to create producer: " << producerResult.error() << "\n";
return 1;
}
Producer<std::string> producer = std::move(producerResult).value();

// Blocking send: returns Expected<MessageId> (must be checked — [[nodiscard]]).
for (int i = 0; i < 10; i++) {
auto sent = producer.newMessage()
.key("order-" + std::to_string(i % 4)) // per-key ordering
.value("payload-" + std::to_string(i))
.property("attempt", "1")
.send();
if (sent) {
std::cout << "sent " << *sent << "\n";
} else {
std::cerr << "send failed: " << sent.error() << "\n";
}
}

// Asynchronous send: react on completion without blocking.
producer.newMessage()
.key("order-async")
.value("async-payload")
.sendAsync()
.addListener([](const Expected<MessageId>& result) {
if (result) {
std::cout << "async sent " << *result << "\n";
} else {
std::cerr << "async send failed: " << result.error() << "\n";
}
});

// Transaction: produced messages become visible atomically on commit.
if (auto txnResult = client.newTransaction()) {
Transaction txn = *txnResult;
auto a = producer.newMessage().value("tx-a").transaction(txn).send();
auto b = producer.newMessage().value("tx-b").transaction(txn).send();
if (a && b) {
if (auto committed = txn.commit(); !committed) {
std::cerr << "commit failed: " << committed.error() << "\n";
}
} else {
(void)txn.abort();
}
}

(void)producer.flush(); // await all sends issued before this call
if (auto closed = producer.close(); !closed) {
std::cerr << "close failed: " << closed.error() << "\n";
}
(void)client.close();
return 0;
}
Loading
Loading