diff options
Diffstat (limited to 'remote_logger.cc')
-rw-r--r-- | remote_logger.cc | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/remote_logger.cc b/remote_logger.cc new file mode 100644 index 0000000..96e5968 --- /dev/null +++ b/remote_logger.cc @@ -0,0 +1,237 @@ +#include <unistd.h> +#include "threadname.hh" +#include "remote_logger.hh" +#include <sys/uio.h> +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#ifdef PDNS_CONFIG_ARGS +#include "logger.hh" +#define WE_ARE_RECURSOR +#else +#include "dolog.hh" +#endif + +bool CircularWriteBuffer::hasRoomFor(const std::string& str) const +{ + if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) { + return false; + } + + return true; +} + +bool CircularWriteBuffer::write(const std::string& str) +{ + if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) { + return false; + } + + uint16_t len = htons(str.size()); + const char* ptr = reinterpret_cast<const char*>(&len); + d_buffer.insert(d_buffer.end(), ptr, ptr + 2); + d_buffer.insert(d_buffer.end(), str.begin(), str.end()); + + return true; +} + +bool CircularWriteBuffer::flush(int fd) +{ + if (d_buffer.empty()) { + // not optional, we report EOF otherwise + return false; + } + + auto arr1 = d_buffer.array_one(); + auto arr2 = d_buffer.array_two(); + + struct iovec iov[2]; + int pos = 0; + for(const auto& arr : {arr1, arr2}) { + if(arr.second) { + iov[pos].iov_base = arr.first; + iov[pos].iov_len = arr.second; + ++pos; + } + } + + ssize_t res = 0; + do { + res = writev(fd, iov, pos); + + if (res < 0) { + if (errno == EINTR) { + continue; + } + + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return false; + } + + /* we can't be sure we haven't sent a partial message, + and we don't want to send the remaining part after reconnecting */ + d_buffer.clear(); + throw std::runtime_error("Couldn't flush a thing: " + stringerror()); + } + else if (!res) { + /* we can't be sure we haven't sent a partial message, + and we don't want to send the remaining part after reconnecting */ + d_buffer.clear(); + throw std::runtime_error("EOF"); + } + } + while (res < 0); + + if (static_cast<size_t>(res) == d_buffer.size()) { + d_buffer.clear(); + } + else { + while (res--) { + d_buffer.pop_front(); + } + } + + return true; +} + +RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr}) +{ + if (!d_asyncConnect) { + reconnect(); + } + + d_thread = std::thread(&RemoteLogger::maintenanceThread, this); +} + +bool RemoteLogger::reconnect() +{ + try { + auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0); + newSock->setNonBlocking(); + newSock->connect(d_remote, d_timeout); + + { + /* we are now successfully connected, time to take the lock and update the + socket */ + auto runtime = d_runtime.lock(); + runtime->d_socket = std::move(newSock); + } + } + catch (const std::exception& e) { +#ifdef WE_ARE_RECURSOR + g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl; +#else + warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what()); +#endif + + return false; + } + return true; +} + +void RemoteLogger::queueData(const std::string& data) +{ + if (data.size() > std::numeric_limits<uint16_t>::max()) { + throw std::runtime_error("Got a request to write an object of size " + std::to_string(data.size())); + } + + auto runtime = d_runtime.lock(); + + if (!runtime->d_writer.hasRoomFor(data)) { + /* not connected, queue is full, just drop */ + if (!runtime->d_socket) { + ++d_drops; + return; + } + try { + /* we try to flush some data */ + if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) { + /* but failed, let's just drop */ + ++d_drops; + return; + } + + /* see if we freed enough data */ + if (!runtime->d_writer.hasRoomFor(data)) { + /* we didn't */ + ++d_drops; + return; + } + } + catch(const std::exception& e) { + // cout << "Got exception writing: "<<e.what()<<endl; + ++d_drops; + runtime->d_socket.reset(); + return; + } + } + + runtime->d_writer.write(data); + ++d_processed; +} + +void RemoteLogger::maintenanceThread() +{ + try { +#ifdef WE_ARE_RECURSOR + string threadName = "pdns-r/remLog"; +#else + string threadName = "dnsdist/remLog"; +#endif + setThreadName(threadName); + + for (;;) { + if (d_exiting) { + break; + } + + bool connected = true; + if (d_runtime.lock()->d_socket == nullptr) { + // if it was unset, it will remain so, we are the only ones setting it! + connected = reconnect(); + } + + /* we will just go to sleep if the reconnection just failed */ + if (connected) { + try { + /* we don't want to take the lock while trying to reconnect */ + auto runtime = d_runtime.lock(); + if (runtime->d_socket) { // check if it is set + /* if flush() returns false, it means that we couldn't flush anything yet + either because there is nothing to flush, or because the outgoing TCP + buffer is full. That's fine by us */ + runtime->d_writer.flush(runtime->d_socket->getHandle()); + } + else { + connected = false; + } + } + catch (const std::exception& e) { + d_runtime.lock()->d_socket.reset(); + connected = false; + } + + if (!connected) { + /* let's try to reconnect right away, we are about to sleep anyway */ + reconnect(); + } + } + + sleep(d_reconnectWaitTime); + } + } + catch (const std::exception& e) + { + cerr << "Remote Logger's maintenance thead died on: " << e.what() << endl; + } + catch (...) { + cerr << "Remote Logger's maintenance thead died on unknown exception" << endl; + } +} + +RemoteLogger::~RemoteLogger() +{ + d_exiting = true; + + d_thread.join(); +} |