From 2f230033794fafdf10822568e763d4db68cf6c6b Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 13 Apr 2024 23:14:49 +0200 Subject: Merging upstream version 1.9.3. Signed-off-by: Daniel Baumann --- delaypipe.cc | 58 +++++++++++++++++++++------------------------------------- 1 file changed, 21 insertions(+), 37 deletions(-) (limited to 'delaypipe.cc') diff --git a/delaypipe.cc b/delaypipe.cc index be363d6..ada096c 100644 --- a/delaypipe.cc +++ b/delaypipe.cc @@ -28,34 +28,23 @@ template ObjectPipe::ObjectPipe() { - if(pipe(d_fds)) - unixDie("pipe"); -} - -template -ObjectPipe::~ObjectPipe() -{ - ::close(d_fds[0]); - if(d_fds[1] >= 0) - ::close(d_fds[1]); + auto [sender, receiver] = pdns::channel::createObjectQueue(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0, false); + d_sender = std::move(sender); + d_receiver = std::move(receiver); } template void ObjectPipe::close() { - if(d_fds[1] < 0) - return; - ::close(d_fds[1]); // the writing side - d_fds[1]=-1; + d_sender.close(); } template void ObjectPipe::write(T& t) { - auto ptr = new T(t); - if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) { - delete ptr; - unixDie("write"); + auto ptr = std::make_unique(t); + if (!d_sender.send(std::move(ptr))) { + unixDie("writing to the DelayPipe"); } } @@ -63,7 +52,7 @@ template int ObjectPipe::readTimeout(T* t, double msec) { while (true) { - int ret = waitForData(d_fds[0], 0, 1000*msec); + int ret = waitForData(d_receiver.getDescriptor(), 0, 1000*msec); if (ret < 0) { if (errno == EINTR) { continue; @@ -74,26 +63,21 @@ int ObjectPipe::readTimeout(T* t, double msec) return -1; } - T* ptr = nullptr; - ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING! - - if (ret < 0) { - if (errno == EINTR) { + try { + auto tmp = d_receiver.receive(); + if (!tmp) { + if (d_receiver.isClosed()) { + return 0; + } continue; } - unixDie("read"); - } - else if (ret == 0) { - return false; - } - if (ret != sizeof(ptr)) { - throw std::runtime_error("Partial read, should not happen 2"); + *t = **tmp; + return 1; + } + catch (const std::exception& e) { + throw std::runtime_error("reading from the delay pipe: " + std::string(e.what())); } - - *t = *ptr; - delete ptr; - return 1; } } @@ -149,7 +133,7 @@ void DelayPipe::worker() The other special case is that the first we have to do.. is in the past, so we need to do it immediately. */ - + double delay=-1; // infinite struct timespec now; if(!d_work.empty()) { @@ -160,7 +144,7 @@ void DelayPipe::worker() } } if(delay != 0 ) { - int ret = d_pipe.readTimeout(&c, delay); + int ret = d_pipe.readTimeout(&c, delay); if(ret > 0) { // we got an object d_work.emplace(c.when, c.what); } -- cgit v1.2.3