summaryrefslogtreecommitdiffstats
path: root/remote_logger.cc
diff options
context:
space:
mode:
Diffstat (limited to 'remote_logger.cc')
-rw-r--r--remote_logger.cc237
1 files changed, 237 insertions, 0 deletions
diff --git a/remote_logger.cc b/remote_logger.cc
new file mode 100644
index 0000000..96e5968
--- /dev/null
+++ b/remote_logger.cc
@@ -0,0 +1,237 @@
+#include <unistd.h>
+#include "threadname.hh"
+#include "remote_logger.hh"
+#include <sys/uio.h>
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#ifdef PDNS_CONFIG_ARGS
+#include "logger.hh"
+#define WE_ARE_RECURSOR
+#else
+#include "dolog.hh"
+#endif
+
+bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
+{
+ if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
+ return false;
+ }
+
+ return true;
+}
+
+bool CircularWriteBuffer::write(const std::string& str)
+{
+ if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
+ return false;
+ }
+
+ uint16_t len = htons(str.size());
+ const char* ptr = reinterpret_cast<const char*>(&len);
+ d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
+ d_buffer.insert(d_buffer.end(), str.begin(), str.end());
+
+ return true;
+}
+
+bool CircularWriteBuffer::flush(int fd)
+{
+ if (d_buffer.empty()) {
+ // not optional, we report EOF otherwise
+ return false;
+ }
+
+ auto arr1 = d_buffer.array_one();
+ auto arr2 = d_buffer.array_two();
+
+ struct iovec iov[2];
+ int pos = 0;
+ for(const auto& arr : {arr1, arr2}) {
+ if(arr.second) {
+ iov[pos].iov_base = arr.first;
+ iov[pos].iov_len = arr.second;
+ ++pos;
+ }
+ }
+
+ ssize_t res = 0;
+ do {
+ res = writev(fd, iov, pos);
+
+ if (res < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return false;
+ }
+
+ /* we can't be sure we haven't sent a partial message,
+ and we don't want to send the remaining part after reconnecting */
+ d_buffer.clear();
+ throw std::runtime_error("Couldn't flush a thing: " + stringerror());
+ }
+ else if (!res) {
+ /* we can't be sure we haven't sent a partial message,
+ and we don't want to send the remaining part after reconnecting */
+ d_buffer.clear();
+ throw std::runtime_error("EOF");
+ }
+ }
+ while (res < 0);
+
+ if (static_cast<size_t>(res) == d_buffer.size()) {
+ d_buffer.clear();
+ }
+ else {
+ while (res--) {
+ d_buffer.pop_front();
+ }
+ }
+
+ return true;
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
+{
+ if (!d_asyncConnect) {
+ reconnect();
+ }
+
+ d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
+}
+
+bool RemoteLogger::reconnect()
+{
+ try {
+ auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
+ newSock->setNonBlocking();
+ newSock->connect(d_remote, d_timeout);
+
+ {
+ /* we are now successfully connected, time to take the lock and update the
+ socket */
+ auto runtime = d_runtime.lock();
+ runtime->d_socket = std::move(newSock);
+ }
+ }
+ catch (const std::exception& e) {
+#ifdef WE_ARE_RECURSOR
+ g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
+#else
+ warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
+#endif
+
+ return false;
+ }
+ return true;
+}
+
+void RemoteLogger::queueData(const std::string& data)
+{
+ if (data.size() > std::numeric_limits<uint16_t>::max()) {
+ throw std::runtime_error("Got a request to write an object of size " + std::to_string(data.size()));
+ }
+
+ auto runtime = d_runtime.lock();
+
+ if (!runtime->d_writer.hasRoomFor(data)) {
+ /* not connected, queue is full, just drop */
+ if (!runtime->d_socket) {
+ ++d_drops;
+ return;
+ }
+ try {
+ /* we try to flush some data */
+ if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
+ /* but failed, let's just drop */
+ ++d_drops;
+ return;
+ }
+
+ /* see if we freed enough data */
+ if (!runtime->d_writer.hasRoomFor(data)) {
+ /* we didn't */
+ ++d_drops;
+ return;
+ }
+ }
+ catch(const std::exception& e) {
+ // cout << "Got exception writing: "<<e.what()<<endl;
+ ++d_drops;
+ runtime->d_socket.reset();
+ return;
+ }
+ }
+
+ runtime->d_writer.write(data);
+ ++d_processed;
+}
+
+void RemoteLogger::maintenanceThread()
+{
+ try {
+#ifdef WE_ARE_RECURSOR
+ string threadName = "pdns-r/remLog";
+#else
+ string threadName = "dnsdist/remLog";
+#endif
+ setThreadName(threadName);
+
+ for (;;) {
+ if (d_exiting) {
+ break;
+ }
+
+ bool connected = true;
+ if (d_runtime.lock()->d_socket == nullptr) {
+ // if it was unset, it will remain so, we are the only ones setting it!
+ connected = reconnect();
+ }
+
+ /* we will just go to sleep if the reconnection just failed */
+ if (connected) {
+ try {
+ /* we don't want to take the lock while trying to reconnect */
+ auto runtime = d_runtime.lock();
+ if (runtime->d_socket) { // check if it is set
+ /* if flush() returns false, it means that we couldn't flush anything yet
+ either because there is nothing to flush, or because the outgoing TCP
+ buffer is full. That's fine by us */
+ runtime->d_writer.flush(runtime->d_socket->getHandle());
+ }
+ else {
+ connected = false;
+ }
+ }
+ catch (const std::exception& e) {
+ d_runtime.lock()->d_socket.reset();
+ connected = false;
+ }
+
+ if (!connected) {
+ /* let's try to reconnect right away, we are about to sleep anyway */
+ reconnect();
+ }
+ }
+
+ sleep(d_reconnectWaitTime);
+ }
+ }
+ catch (const std::exception& e)
+ {
+ cerr << "Remote Logger's maintenance thead died on: " << e.what() << endl;
+ }
+ catch (...) {
+ cerr << "Remote Logger's maintenance thead died on unknown exception" << endl;
+ }
+}
+
+RemoteLogger::~RemoteLogger()
+{
+ d_exiting = true;
+
+ d_thread.join();
+}