diff options
Diffstat (limited to 'delaypipe.cc')
-rw-r--r-- | delaypipe.cc | 58 |
1 files changed, 21 insertions, 37 deletions
diff --git a/delaypipe.cc b/delaypipe.cc index be363d6..ada096c 100644 --- a/delaypipe.cc +++ b/delaypipe.cc @@ -28,34 +28,23 @@ template<class T> ObjectPipe<T>::ObjectPipe() { - if(pipe(d_fds)) - unixDie("pipe"); -} - -template<class T> -ObjectPipe<T>::~ObjectPipe() -{ - ::close(d_fds[0]); - if(d_fds[1] >= 0) - ::close(d_fds[1]); + auto [sender, receiver] = pdns::channel::createObjectQueue<T>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0, false); + d_sender = std::move(sender); + d_receiver = std::move(receiver); } template<class T> void ObjectPipe<T>::close() { - if(d_fds[1] < 0) - return; - ::close(d_fds[1]); // the writing side - d_fds[1]=-1; + d_sender.close(); } template<class T> void ObjectPipe<T>::write(T& t) { - auto ptr = new T(t); - if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) { - delete ptr; - unixDie("write"); + auto ptr = std::make_unique<T>(t); + if (!d_sender.send(std::move(ptr))) { + unixDie("writing to the DelayPipe"); } } @@ -63,7 +52,7 @@ template<class T> int ObjectPipe<T>::readTimeout(T* t, double msec) { while (true) { - int ret = waitForData(d_fds[0], 0, 1000*msec); + int ret = waitForData(d_receiver.getDescriptor(), 0, 1000*msec); if (ret < 0) { if (errno == EINTR) { continue; @@ -74,26 +63,21 @@ int ObjectPipe<T>::readTimeout(T* t, double msec) return -1; } - T* ptr = nullptr; - ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING! - - if (ret < 0) { - if (errno == EINTR) { + try { + auto tmp = d_receiver.receive(); + if (!tmp) { + if (d_receiver.isClosed()) { + return 0; + } continue; } - unixDie("read"); - } - else if (ret == 0) { - return false; - } - if (ret != sizeof(ptr)) { - throw std::runtime_error("Partial read, should not happen 2"); + *t = **tmp; + return 1; + } + catch (const std::exception& e) { + throw std::runtime_error("reading from the delay pipe: " + std::string(e.what())); } - - *t = *ptr; - delete ptr; - return 1; } } @@ -149,7 +133,7 @@ void DelayPipe<T>::worker() The other special case is that the first we have to do.. is in the past, so we need to do it immediately. */ - + double delay=-1; // infinite struct timespec now; if(!d_work.empty()) { @@ -160,7 +144,7 @@ void DelayPipe<T>::worker() } } if(delay != 0 ) { - int ret = d_pipe.readTimeout(&c, delay); + int ret = d_pipe.readTimeout(&c, delay); if(ret > 0) { // we got an object d_work.emplace(c.when, c.what); } |