diff options
Diffstat (limited to 'channel.hh')
-rw-r--r-- | channel.hh | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/channel.hh b/channel.hh new file mode 100644 index 0000000..a7ba0ee --- /dev/null +++ b/channel.hh @@ -0,0 +1,356 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once +#include <memory> +#include <optional> + +#include "misc.hh" + +/* g++ defines __SANITIZE_THREAD__ + clang++ supports the nice __has_feature(thread_sanitizer), + let's merge them */ +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) +#define __SANITIZE_THREAD__ 1 +#endif +#endif + +#if __SANITIZE_THREAD__ +#if defined __has_include +#if __has_include(<sanitizer/tsan_interface.h>) +#include <sanitizer/tsan_interface.h> +#else /* __has_include(<sanitizer/tsan_interface.h>) */ +extern "C" void __tsan_acquire(void* addr); +extern "C" void __tsan_release(void* addr); +#endif /* __has_include(<sanitizer/tsan_interface.h>) */ +#else /* defined __has_include */ +extern "C" void __tsan_acquire(void* addr); +extern "C" void __tsan_release(void* addr); +#endif /* defined __has_include */ +#endif /* __SANITIZE_THREAD__ */ + +namespace pdns +{ +namespace channel +{ + enum class SenderBlockingMode + { + SenderNonBlocking, + SenderBlocking + }; + enum class ReceiverBlockingMode + { + ReceiverNonBlocking, + ReceiverBlocking + }; + + /** + * The sender's end of a channel used to pass objects between threads. + * + * A sender can be used by several threads in a safe way. + */ + template <typename T, typename D = std::default_delete<T>> + class Sender + { + public: + Sender() = default; + Sender(FDWrapper&& descriptor) : + d_fd(std::move(descriptor)) + { + } + Sender(const Sender&) = delete; + Sender& operator=(const Sender&) = delete; + Sender(Sender&&) = default; + Sender& operator=(Sender&&) = default; + ~Sender() = default; + /** + * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode. + * + * \return True if the object was properly sent, False if the channel is full. + * + * \throw runtime_error if the channel is broken, for example if the other end has been closed. + */ + bool send(std::unique_ptr<T, D>&&) const; + void close(); + + private: + FDWrapper d_fd; + }; + + /** + * The receiver's end of a channel used to pass objects between threads. + * + * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen. + */ + template <typename T, typename D = std::default_delete<T>> + class Receiver + { + public: + Receiver() = default; + Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) : + d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF) + { + } + Receiver(const Receiver&) = delete; + Receiver& operator=(const Receiver&) = delete; + Receiver(Receiver&&) = default; + Receiver& operator=(Receiver&&) = default; + ~Receiver() = default; + /** + * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode. + * + * \return An object if one was available, and std::nullopt otherwise. + * + * \throw runtime_error if the channel is broken, for example if the other end has been closed. + */ + std::optional<std::unique_ptr<T, D>> receive(); + std::optional<std::unique_ptr<T, D>> receive(D deleter); + + /** + * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available. + * + * \return A valid descriptor or -1 if the Receiver was not properly initialized. + */ + int getDescriptor() const + { + return d_fd.getHandle(); + } + /** + * \brief Whether the remote end has closed the channel. + */ + bool isClosed() const + { + return d_closed; + } + + private: + FDWrapper d_fd; + bool d_closed{false}; + bool d_throwOnEOF{true}; + }; + + /** + * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers. + * + * \return A pair of Sender and Receiver objects. + * + * \throw runtime_error if the channel creation failed. + */ + template <typename T, typename D = std::default_delete<T>> + std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true); + + /** + * The notifier's end of a channel used to communicate between threads. + * + * A notifier can be used by several threads in a safe way. + */ + class Notifier + { + public: + Notifier() = default; + Notifier(FDWrapper&&); + Notifier(const Notifier&) = delete; + Notifier& operator=(const Notifier&) = delete; + Notifier(Notifier&&) = default; + Notifier& operator=(Notifier&&) = default; + ~Notifier() = default; + + /** + * \brief Queue a notification to wake up the other end of the channel. + * + * \return True if the notification was properly sent, False if the channel is full. + * + * \throw runtime_error if the channel is broken, for example if the other end has been closed. + */ + bool notify() const; + + private: + FDWrapper d_fd; + }; + + /** + * The waiter's end of a channel used to communicate between threads. + * + * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen. + */ + class Waiter + { + public: + Waiter() = default; + Waiter(FDWrapper&&, bool throwOnEOF = true); + Waiter(const Waiter&) = delete; + Waiter& operator=(const Waiter&) = delete; + Waiter(Waiter&&) = default; + Waiter& operator=(Waiter&&) = default; + ~Waiter() = default; + + /** + * \brief Clear all notifications queued on that channel, if any. + */ + void clear(); + /** + * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive. + * + * \return A valid descriptor or -1 if the Waiter was not properly initialized. + */ + int getDescriptor() const; + /** + * \brief Whether the remote end has closed the channel. + */ + bool isClosed() const + { + return d_closed; + } + + private: + FDWrapper d_fd; + bool d_closed{false}; + bool d_throwOnEOF{true}; + }; + + /** + * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers. + * + * \return A pair of Notifier and Sender objects. + * + * \throw runtime_error if the channel creation failed. + */ + std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true); + + template <typename T, typename D> + bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const + { + /* we cannot touch the initial unique pointer after writing to the pipe, + not even to release it, so let's transfer it to a local object */ + auto localObj = std::move(object); + auto ptr = localObj.get(); + static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail"); + while (true) { +#if __SANITIZE_THREAD__ + __tsan_release(ptr); +#endif /* __SANITIZE_THREAD__ */ + ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr)); + + if (sent == sizeof(ptr)) { + // coverity[leaked_storage] + localObj.release(); + return true; + } + else if (sent == 0) { +#if __SANITIZE_THREAD__ + __tsan_acquire(ptr); +#endif /* __SANITIZE_THREAD__ */ + throw std::runtime_error("Unable to write to channel: remote end has been closed"); + } + else { +#if __SANITIZE_THREAD__ + __tsan_acquire(ptr); +#endif /* __SANITIZE_THREAD__ */ + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + object = std::move(localObj); + return false; + } + else { + throw std::runtime_error("Unable to write to channel:" + stringerror()); + } + } + } + } + + template <typename T, typename D> + void Sender<T, D>::close() + { + d_fd.reset(); + } + + template <typename T, typename D> + std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive() + { + return receive(D()); + } + + template <typename T, typename D> + std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter) + { + while (true) { + std::optional<std::unique_ptr<T, D>> result; + T* objPtr{nullptr}; + ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr)); + if (got == sizeof(objPtr)) { +#if __SANITIZE_THREAD__ + __tsan_acquire(objPtr); +#endif /* __SANITIZE_THREAD__ */ + return std::unique_ptr<T, D>(objPtr, deleter); + } + else if (got == 0) { + d_closed = true; + if (!d_throwOnEOF) { + return result; + } + throw std::runtime_error("EOF while reading from Channel receiver"); + } + else if (got == -1) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return result; + } + throw std::runtime_error("Error while reading from Channel receiver: " + stringerror()); + } + else { + throw std::runtime_error("Partial read from Channel receiver"); + } + } + } + + template <typename T, typename D> + std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF) + { + int fds[2] = {-1, -1}; + if (pipe(fds) < 0) { + throw std::runtime_error("Error creating channel pipe: " + stringerror()); + } + + FDWrapper sender(fds[1]); + FDWrapper receiver(fds[0]); + if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) { + int err = errno; + throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); + } + + if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) { + int err = errno; + throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); + } + + if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) { + setPipeBufferSize(receiver.getHandle(), pipeBufferSize); + } + + return {Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF)}; + } +} +} |