diff options
Diffstat (limited to 'delaypipe.cc')
-rw-r--r-- | delaypipe.cc | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/delaypipe.cc b/delaypipe.cc new file mode 100644 index 0000000..be363d6 --- /dev/null +++ b/delaypipe.cc @@ -0,0 +1,188 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "delaypipe.hh" +#include "misc.hh" +#include "gettime.hh" +#include <thread> +#include "threadname.hh" + +template<class T> +ObjectPipe<T>::ObjectPipe() +{ + if(pipe(d_fds)) + unixDie("pipe"); +} + +template<class T> +ObjectPipe<T>::~ObjectPipe() +{ + ::close(d_fds[0]); + if(d_fds[1] >= 0) + ::close(d_fds[1]); +} + +template<class T> +void ObjectPipe<T>::close() +{ + if(d_fds[1] < 0) + return; + ::close(d_fds[1]); // the writing side + d_fds[1]=-1; +} + +template<class T> +void ObjectPipe<T>::write(T& t) +{ + auto ptr = new T(t); + if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) { + delete ptr; + unixDie("write"); + } +} + +template<class T> +int ObjectPipe<T>::readTimeout(T* t, double msec) +{ + while (true) { + int ret = waitForData(d_fds[0], 0, 1000*msec); + if (ret < 0) { + if (errno == EINTR) { + continue; + } + unixDie("waiting for data in object pipe"); + } + else if (ret == 0) { + return -1; + } + + T* ptr = nullptr; + ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING! + + if (ret < 0) { + if (errno == EINTR) { + continue; + } + unixDie("read"); + } + else if (ret == 0) { + return false; + } + + if (ret != sizeof(ptr)) { + throw std::runtime_error("Partial read, should not happen 2"); + } + + *t = *ptr; + delete ptr; + return 1; + } +} + + +template<class T> +DelayPipe<T>::DelayPipe() : d_thread(&DelayPipe<T>::worker, this) +{ +} + +template<class T> +void DelayPipe<T>::gettime(struct timespec* ts) +{ + ::gettime(ts); +} + + +template<class T> +void DelayPipe<T>::submit(T& t, int msec) +{ + struct timespec now; + gettime(&now); + now.tv_nsec += msec*1e6; + while(now.tv_nsec > 1e9) { + now.tv_sec++; + now.tv_nsec-=1e9; + } + Combo c{t, now}; + d_pipe.write(c); +} + +template<class T> +DelayPipe<T>::~DelayPipe() +{ + d_pipe.close(); + d_thread.join(); +} + + + +template<class T> +void DelayPipe<T>::worker() +{ + setThreadName("dnsdist/delayPi"); + Combo c; + for(;;) { + /* this code is slightly too subtle, but I don't see how it could be any simpler. + So we have a set of work to do, and we need to wait until the time arrives to do it. + Simultaneously new work might come in. So we try to combine both of these things by + setting a timeout on listening to the pipe over which new work comes in. This timeout + is equal to the wait until the first thing that needs to be done. + + Two additional cases exist: we have no work to wait for, so we can wait infinitely long. + The other special case is that the first we have to do.. is in the past, so we need to do it + immediately. */ + + + double delay=-1; // infinite + struct timespec now; + if(!d_work.empty()) { + gettime(&now); + delay=1000*tsdelta(d_work.begin()->first, now); + if(delay < 0) { + delay=0; // don't wait - we have work that is late already! + } + } + if(delay != 0 ) { + int ret = d_pipe.readTimeout(&c, delay); + if(ret > 0) { // we got an object + d_work.emplace(c.when, c.what); + } + else if(ret==0) { // EOF + break; + } + else { + ; + } + gettime(&now); + } + + tscomp cmp; + + for(auto iter = d_work.begin() ; iter != d_work.end(); ) { // do the needful + if(cmp(iter->first, now)) { + iter->second(); + d_work.erase(iter++); + } + else { + break; + } + } + } +} |