path: root/
diff options
Diffstat (limited to '')
1 files changed, 237 insertions, 0 deletions
diff --git a/ b/
new file mode 100644
index 0000000..96e5968
--- /dev/null
+++ b/
@@ -0,0 +1,237 @@
+#include <unistd.h>
+#include "threadname.hh"
+#include "remote_logger.hh"
+#include <sys/uio.h>
+#include "config.h"
+#include "logger.hh"
+#include "dolog.hh"
+bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
+ if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
+ return false;
+ }
+ return true;
+bool CircularWriteBuffer::write(const std::string& str)
+ if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
+ return false;
+ }
+ uint16_t len = htons(str.size());
+ const char* ptr = reinterpret_cast<const char*>(&len);
+ d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
+ d_buffer.insert(d_buffer.end(), str.begin(), str.end());
+ return true;
+bool CircularWriteBuffer::flush(int fd)
+ if (d_buffer.empty()) {
+ // not optional, we report EOF otherwise
+ return false;
+ }
+ auto arr1 = d_buffer.array_one();
+ auto arr2 = d_buffer.array_two();
+ struct iovec iov[2];
+ int pos = 0;
+ for(const auto& arr : {arr1, arr2}) {
+ if(arr.second) {
+ iov[pos].iov_base = arr.first;
+ iov[pos].iov_len = arr.second;
+ ++pos;
+ }
+ }
+ ssize_t res = 0;
+ do {
+ res = writev(fd, iov, pos);
+ if (res < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return false;
+ }
+ /* we can't be sure we haven't sent a partial message,
+ and we don't want to send the remaining part after reconnecting */
+ d_buffer.clear();
+ throw std::runtime_error("Couldn't flush a thing: " + stringerror());
+ }
+ else if (!res) {
+ /* we can't be sure we haven't sent a partial message,
+ and we don't want to send the remaining part after reconnecting */
+ d_buffer.clear();
+ throw std::runtime_error("EOF");
+ }
+ }
+ while (res < 0);
+ if (static_cast<size_t>(res) == d_buffer.size()) {
+ d_buffer.clear();
+ }
+ else {
+ while (res--) {
+ d_buffer.pop_front();
+ }
+ }
+ return true;
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
+ if (!d_asyncConnect) {
+ reconnect();
+ }
+ d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
+bool RemoteLogger::reconnect()
+ try {
+ auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
+ newSock->setNonBlocking();
+ newSock->connect(d_remote, d_timeout);
+ {
+ /* we are now successfully connected, time to take the lock and update the
+ socket */
+ auto runtime = d_runtime.lock();
+ runtime->d_socket = std::move(newSock);
+ }
+ }
+ catch (const std::exception& e) {
+ g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
+ warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
+ return false;
+ }
+ return true;
+void RemoteLogger::queueData(const std::string& data)
+ if (data.size() > std::numeric_limits<uint16_t>::max()) {
+ throw std::runtime_error("Got a request to write an object of size " + std::to_string(data.size()));
+ }
+ auto runtime = d_runtime.lock();
+ if (!runtime->d_writer.hasRoomFor(data)) {
+ /* not connected, queue is full, just drop */
+ if (!runtime->d_socket) {
+ ++d_drops;
+ return;
+ }
+ try {
+ /* we try to flush some data */
+ if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
+ /* but failed, let's just drop */
+ ++d_drops;
+ return;
+ }
+ /* see if we freed enough data */
+ if (!runtime->d_writer.hasRoomFor(data)) {
+ /* we didn't */
+ ++d_drops;
+ return;
+ }
+ }
+ catch(const std::exception& e) {
+ // cout << "Got exception writing: "<<e.what()<<endl;
+ ++d_drops;
+ runtime->d_socket.reset();
+ return;
+ }
+ }
+ runtime->d_writer.write(data);
+ ++d_processed;
+void RemoteLogger::maintenanceThread()
+ try {
+ string threadName = "pdns-r/remLog";
+ string threadName = "dnsdist/remLog";
+ setThreadName(threadName);
+ for (;;) {
+ if (d_exiting) {
+ break;
+ }
+ bool connected = true;
+ if (d_runtime.lock()->d_socket == nullptr) {
+ // if it was unset, it will remain so, we are the only ones setting it!
+ connected = reconnect();
+ }
+ /* we will just go to sleep if the reconnection just failed */
+ if (connected) {
+ try {
+ /* we don't want to take the lock while trying to reconnect */
+ auto runtime = d_runtime.lock();
+ if (runtime->d_socket) { // check if it is set
+ /* if flush() returns false, it means that we couldn't flush anything yet
+ either because there is nothing to flush, or because the outgoing TCP
+ buffer is full. That's fine by us */
+ runtime->d_writer.flush(runtime->d_socket->getHandle());
+ }
+ else {
+ connected = false;
+ }
+ }
+ catch (const std::exception& e) {
+ d_runtime.lock()->d_socket.reset();
+ connected = false;
+ }
+ if (!connected) {
+ /* let's try to reconnect right away, we are about to sleep anyway */
+ reconnect();
+ }
+ }
+ sleep(d_reconnectWaitTime);
+ }
+ }
+ catch (const std::exception& e)
+ {
+ cerr << "Remote Logger's maintenance thead died on: " << e.what() << endl;
+ }
+ catch (...) {
+ cerr << "Remote Logger's maintenance thead died on unknown exception" << endl;
+ }
+ d_exiting = true;
+ d_thread.join();