summaryrefslogtreecommitdiffstats
path: root/delaypipe.cc
diff options
context:
space:
mode:
Diffstat (limited to 'delaypipe.cc')
-rw-r--r--delaypipe.cc58
1 files changed, 21 insertions, 37 deletions
diff --git a/delaypipe.cc b/delaypipe.cc
index be363d6..ada096c 100644
--- a/delaypipe.cc
+++ b/delaypipe.cc
@@ -28,34 +28,23 @@
template<class T>
ObjectPipe<T>::ObjectPipe()
{
- if(pipe(d_fds))
- unixDie("pipe");
-}
-
-template<class T>
-ObjectPipe<T>::~ObjectPipe()
-{
- ::close(d_fds[0]);
- if(d_fds[1] >= 0)
- ::close(d_fds[1]);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<T>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0, false);
+ d_sender = std::move(sender);
+ d_receiver = std::move(receiver);
}
template<class T>
void ObjectPipe<T>::close()
{
- if(d_fds[1] < 0)
- return;
- ::close(d_fds[1]); // the writing side
- d_fds[1]=-1;
+ d_sender.close();
}
template<class T>
void ObjectPipe<T>::write(T& t)
{
- auto ptr = new T(t);
- if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
- delete ptr;
- unixDie("write");
+ auto ptr = std::make_unique<T>(t);
+ if (!d_sender.send(std::move(ptr))) {
+ unixDie("writing to the DelayPipe");
}
}
@@ -63,7 +52,7 @@ template<class T>
int ObjectPipe<T>::readTimeout(T* t, double msec)
{
while (true) {
- int ret = waitForData(d_fds[0], 0, 1000*msec);
+ int ret = waitForData(d_receiver.getDescriptor(), 0, 1000*msec);
if (ret < 0) {
if (errno == EINTR) {
continue;
@@ -74,26 +63,21 @@ int ObjectPipe<T>::readTimeout(T* t, double msec)
return -1;
}
- T* ptr = nullptr;
- ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING!
-
- if (ret < 0) {
- if (errno == EINTR) {
+ try {
+ auto tmp = d_receiver.receive();
+ if (!tmp) {
+ if (d_receiver.isClosed()) {
+ return 0;
+ }
continue;
}
- unixDie("read");
- }
- else if (ret == 0) {
- return false;
- }
- if (ret != sizeof(ptr)) {
- throw std::runtime_error("Partial read, should not happen 2");
+ *t = **tmp;
+ return 1;
+ }
+ catch (const std::exception& e) {
+ throw std::runtime_error("reading from the delay pipe: " + std::string(e.what()));
}
-
- *t = *ptr;
- delete ptr;
- return 1;
}
}
@@ -149,7 +133,7 @@ void DelayPipe<T>::worker()
The other special case is that the first we have to do.. is in the past, so we need to do it
immediately. */
-
+
double delay=-1; // infinite
struct timespec now;
if(!d_work.empty()) {
@@ -160,7 +144,7 @@ void DelayPipe<T>::worker()
}
}
if(delay != 0 ) {
- int ret = d_pipe.readTimeout(&c, delay);
+ int ret = d_pipe.readTimeout(&c, delay);
if(ret > 0) { // we got an object
d_work.emplace(c.when, c.what);
}