summaryrefslogtreecommitdiffstats
path: root/remote_logger.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 21:11:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 21:11:59 +0000
commit3cd01b932e1c85394272ae64fae67ebeda92fb00 (patch)
treec5a3115d710afc1879ddea5349362a2bc651733c /remote_logger.cc
parentInitial commit. (diff)
downloaddnsdist-3cd01b932e1c85394272ae64fae67ebeda92fb00.tar.xz
dnsdist-3cd01b932e1c85394272ae64fae67ebeda92fb00.zip
Adding upstream version 1.8.3.upstream/1.8.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'remote_logger.cc')
-rw-r--r--remote_logger.cc257
1 files changed, 257 insertions, 0 deletions
diff --git a/remote_logger.cc b/remote_logger.cc
new file mode 100644
index 0000000..94a8a94
--- /dev/null
+++ b/remote_logger.cc
@@ -0,0 +1,257 @@
+#include <unistd.h>
+#include "threadname.hh"
+#include "remote_logger.hh"
+#include <sys/uio.h>
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#ifdef PDNS_CONFIG_ARGS
+#include "logger.hh"
+#define WE_ARE_RECURSOR
+#else
+#include "dolog.hh"
+#endif
+#include "logging.hh"
+
+bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
+{
+ if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
+ return false;
+ }
+
+ return true;
+}
+
+bool CircularWriteBuffer::write(const std::string& str)
+{
+ if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
+ return false;
+ }
+
+ uint16_t len = htons(str.size());
+ const char* ptr = reinterpret_cast<const char*>(&len);
+ d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
+ d_buffer.insert(d_buffer.end(), str.begin(), str.end());
+
+ return true;
+}
+
+bool CircularWriteBuffer::flush(int fd)
+{
+ if (d_buffer.empty()) {
+ // not optional, we report EOF otherwise
+ return false;
+ }
+
+ auto arr1 = d_buffer.array_one();
+ auto arr2 = d_buffer.array_two();
+
+ struct iovec iov[2];
+ int pos = 0;
+ for(const auto& arr : {arr1, arr2}) {
+ if(arr.second) {
+ iov[pos].iov_base = arr.first;
+ iov[pos].iov_len = arr.second;
+ ++pos;
+ }
+ }
+
+ ssize_t res = 0;
+ do {
+ res = writev(fd, iov, pos);
+
+ if (res < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return false;
+ }
+
+ /* we can't be sure we haven't sent a partial message,
+ and we don't want to send the remaining part after reconnecting */
+ d_buffer.clear();
+ throw std::runtime_error("Couldn't flush a thing: " + stringerror());
+ }
+ else if (!res) {
+ /* we can't be sure we haven't sent a partial message,
+ and we don't want to send the remaining part after reconnecting */
+ d_buffer.clear();
+ throw std::runtime_error("EOF");
+ }
+ }
+ while (res < 0);
+
+ if (static_cast<size_t>(res) == d_buffer.size()) {
+ d_buffer.clear();
+ }
+ else {
+ while (res--) {
+ d_buffer.pop_front();
+ }
+ }
+
+ return true;
+}
+
+const std::string& RemoteLoggerInterface::toErrorString(Result r)
+{
+ static const std::array<std::string,5> str = {
+ "Queued",
+ "Queue full, dropping",
+ "Not sending too large protobuf message",
+ "Submiting to queue failed",
+ "?"
+ };
+ auto i = static_cast<unsigned int>(r);
+ return str[std::min(i, 4U)];
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
+{
+ if (!d_asyncConnect) {
+ reconnect();
+ }
+
+ d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
+}
+
+bool RemoteLogger::reconnect()
+{
+ try {
+ auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
+ newSock->setNonBlocking();
+ newSock->connect(d_remote, d_timeout);
+
+ {
+ /* we are now successfully connected, time to take the lock and update the
+ socket */
+ auto runtime = d_runtime.lock();
+ runtime->d_socket = std::move(newSock);
+ }
+ }
+ catch (const std::exception& e) {
+#ifdef WE_ARE_RECURSOR
+ SLOG(g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl,
+ g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Exception while connection to remote logger", "address", Logging::Loggable(d_remote)));
+#else
+ warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
+#endif
+
+ return false;
+ }
+ return true;
+}
+
+RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
+{
+ auto runtime = d_runtime.lock();
+
+ if (data.size() > std::numeric_limits<uint16_t>::max()) {
+ ++runtime->d_stats.d_tooLarge;
+ return Result::TooLarge;
+ }
+
+ if (!runtime->d_writer.hasRoomFor(data)) {
+ /* not connected, queue is full, just drop */
+ if (!runtime->d_socket) {
+ ++runtime->d_stats.d_pipeFull;
+ return Result::PipeFull;
+ }
+ try {
+ /* we try to flush some data */
+ if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
+ /* but failed, let's just drop */
+ ++runtime->d_stats.d_pipeFull;
+ return Result::PipeFull;
+ }
+
+ /* see if we freed enough data */
+ if (!runtime->d_writer.hasRoomFor(data)) {
+ /* we didn't */
+ ++runtime->d_stats.d_pipeFull;
+ return Result::PipeFull;
+ }
+ }
+ catch(const std::exception& e) {
+ // cout << "Got exception writing: "<<e.what()<<endl;
+ runtime->d_socket.reset();
+ ++runtime->d_stats.d_otherError;
+ return Result::OtherError;
+ }
+ }
+
+ runtime->d_writer.write(data);
+ ++runtime->d_stats.d_queued;
+ return Result::Queued;
+}
+
+void RemoteLogger::maintenanceThread()
+{
+ try {
+#ifdef WE_ARE_RECURSOR
+ string threadName = "rec/remlog";
+#else
+ string threadName = "dnsdist/remLog";
+#endif
+ setThreadName(threadName);
+
+ for (;;) {
+ if (d_exiting) {
+ break;
+ }
+
+ bool connected = true;
+ if (d_runtime.lock()->d_socket == nullptr) {
+ // if it was unset, it will remain so, we are the only ones setting it!
+ connected = reconnect();
+ }
+
+ /* we will just go to sleep if the reconnection just failed */
+ if (connected) {
+ try {
+ /* we don't want to take the lock while trying to reconnect */
+ auto runtime = d_runtime.lock();
+ if (runtime->d_socket) { // check if it is set
+ /* if flush() returns false, it means that we couldn't flush anything yet
+ either because there is nothing to flush, or because the outgoing TCP
+ buffer is full. That's fine by us */
+ runtime->d_writer.flush(runtime->d_socket->getHandle());
+ }
+ else {
+ connected = false;
+ }
+ }
+ catch (const std::exception& e) {
+ d_runtime.lock()->d_socket.reset();
+ connected = false;
+ }
+
+ if (!connected) {
+ /* let's try to reconnect right away, we are about to sleep anyway */
+ reconnect();
+ }
+ }
+
+ sleep(d_reconnectWaitTime);
+ }
+ }
+ catch (const std::exception& e)
+ {
+ SLOG(cerr << "Remote Logger's maintenance thread died on: " << e.what() << endl,
+ g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Remote Logger's maintenance thread died"));
+ }
+ catch (...) {
+ SLOG(cerr << "Remote Logger's maintenance thread died on unknown exception" << endl,
+ g_slog->withName("protobuf")->info(Logr::Error, "Remote Logger's maintenance thread died"));
+ }
+}
+
+RemoteLogger::~RemoteLogger()
+{
+ d_exiting = true;
+
+ d_thread.join();
+}
+