diff options
Diffstat (limited to 'portsmplexer.cc')
-rw-r--r-- | portsmplexer.cc | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/portsmplexer.cc b/portsmplexer.cc new file mode 100644 index 0000000..d4f3659 --- /dev/null +++ b/portsmplexer.cc @@ -0,0 +1,241 @@ +#if defined(__sun__) && defined(__svr4__) +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <port.h> +#include <sys/port_impl.h> +#endif +#include <unistd.h> +#include "mplexer.hh" +#include "sstuff.hh" +#include <iostream> + +#include "misc.hh" + +#include "namespaces.hh" + +class PortsFDMultiplexer : public FDMultiplexer +{ +public: + PortsFDMultiplexer(); + ~PortsFDMultiplexer() + { + close(d_portfd); + } + + int run(struct timeval* tv, int timeout = 500) override; + void getAvailableFDs(std::vector<int>& fds, int timeout) override; + + void addFD(int fd, FDMultiplexer::EventKind kind) override; + void removeFD(int fd, FDMultiplexer::EventKind kind) override; + + string getName() const override + { + return "solaris completion ports"; + } + +private: + int d_portfd; + boost::shared_array<port_event_t> d_pevents; + static int s_maxevents; // not a hard maximum +}; + +static FDMultiplexer* makePorts() +{ + return new PortsFDMultiplexer(); +} + +static struct PortsRegisterOurselves +{ + PortsRegisterOurselves() + { + FDMultiplexer::getMultiplexerMap().emplace(0, &makePorts); // priority 0! + } +} doItPorts; + +int PortsFDMultiplexer::s_maxevents = 1024; + +PortsFDMultiplexer::PortsFDMultiplexer() : + d_pevents(new port_event_t[s_maxevents]) +{ + d_portfd = port_create(); // not hard max + if (d_portfd < 0) { + throw FDMultiplexerException("Setting up port: " + stringerror()); + } +} + +static int convertEventKind(FDMultiplexer::EventKind kind) +{ + switch (kind) { + case FDMultiplexer::EventKind::Read: + return POLLIN; + case FDMultiplexer::EventKind::Write: + return POLLOUT; + case FDMultiplexer::EventKind::Both: + return POLLIN | POLLOUT; + } + throw std::runtime_error("Unhandled event kind in the ports multiplexer"); +} + +void PortsFDMultiplexer::addFD(int fd, FDMultiplexer::EventKind kind) +{ + if (port_associate(d_portfd, PORT_SOURCE_FD, fd, convertEventKind(kind), 0) < 0) { + throw FDMultiplexerException("Adding fd to port set: " + stringerror()); + } +} + +void PortsFDMultiplexer::removeFD(int fd, FDMultiplexer::EventKind) +{ + if (port_dissociate(d_portfd, PORT_SOURCE_FD, fd) < 0 && errno != ENOENT) { // it appears under some circumstances, ENOENT will be returned, without this being an error. Apache has this same "fix" + throw FDMultiplexerException("Removing fd from port set: " + stringerror()); + } +} + +void PortsFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout) +{ + struct timespec timeoutspec; + timeoutspec.tv_sec = timeout / 1000; + timeoutspec.tv_nsec = (timeout % 1000) * 1000000; + unsigned int numevents = 1; + int ret = port_getn(d_portfd, d_pevents.get(), min(PORT_MAX_LIST, s_maxevents), &numevents, &timeoutspec); + + /* port_getn has an unusual API - (ret == -1, errno == ETIME) can + mean partial success; you must check (*numevents) in this case + and process anything in there, otherwise you'll never see any + events from that object again. We don't care about pure timeouts + (ret == -1, errno == ETIME, *numevents == 0) so we don't bother + with that case. */ + if (ret == -1 && errno != ETIME) { + if (errno != EINTR) { + throw FDMultiplexerException("completion port_getn returned error: " + stringerror()); + } + + // EINTR is not really an error + return; + } + + if (numevents == 0) { + // nothing + return; + } + + fds.reserve(numevents); + + for (unsigned int n = 0; n < numevents; ++n) { + const auto fd = d_pevents[n].portev_object; + + /* we need to re-associate the FD */ + if ((d_pevents[n].portev_events & POLLIN || d_pevents[n].portev_events & POLLERR || d_pevents[n].portev_events & POLLHUP)) { + if (d_readCallbacks.count(fd)) { + if (port_associate(d_portfd, PORT_SOURCE_FD, fd, d_writeCallbacks.count(fd) > 0 ? POLLIN | POLLOUT : POLLIN, 0) < 0) { + throw FDMultiplexerException("Unable to add fd back to ports (read): " + stringerror()); + } + } + } + else if ((d_pevents[n].portev_events & POLLOUT || d_pevents[n].portev_events & POLLERR)) { + if (d_writeCallbacks.count(fd)) { + if (port_associate(d_portfd, PORT_SOURCE_FD, fd, d_readCallbacks.count(fd) > 0 ? POLLIN | POLLOUT : POLLOUT, 0) < 0) { + throw FDMultiplexerException("Unable to add fd back to ports (write): " + stringerror()); + } + } + } + else { + /* not registered, this is unexpected */ + continue; + } + + fds.push_back(fd); + } +} + +int PortsFDMultiplexer::run(struct timeval* now, int timeout) +{ + if (d_inrun) { + throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n"); + } + + struct timespec timeoutspec; + timeoutspec.tv_sec = timeout / 1000; + timeoutspec.tv_nsec = (timeout % 1000) * 1000000; + unsigned int numevents = 1; + int ret = port_getn(d_portfd, d_pevents.get(), min(PORT_MAX_LIST, s_maxevents), &numevents, &timeoutspec); + + /* port_getn has an unusual API - (ret == -1, errno == ETIME) can + mean partial success; you must check (*numevents) in this case + and process anything in there, otherwise you'll never see any + events from that object again. We don't care about pure timeouts + (ret == -1, errno == ETIME, *numevents == 0) so we don't bother + with that case. */ + if (ret == -1 && errno != ETIME) { + if (errno != EINTR) { + throw FDMultiplexerException("completion port_getn returned error: " + stringerror()); + } + // EINTR is not really an error + gettimeofday(now, nullptr); + return 0; + } + gettimeofday(now, nullptr); + if (!numevents) { + // nothing + return 0; + } + + d_inrun = true; + int count = 0; + for (unsigned int n = 0; n < numevents; ++n) { + if (d_pevents[n].portev_events & POLLIN || d_pevents[n].portev_events & POLLERR || d_pevents[n].portev_events & POLLHUP) { + const auto& iter = d_readCallbacks.find(d_pevents[n].portev_object); + if (iter != d_readCallbacks.end()) { + iter->d_callback(iter->d_fd, iter->d_parameter); + count++; + if (d_readCallbacks.count(d_pevents[n].portev_object) && port_associate(d_portfd, PORT_SOURCE_FD, d_pevents[n].portev_object, d_writeCallbacks.count(d_pevents[n].portev_object) ? POLLIN | POLLOUT : POLLIN, 0) < 0) { + throw FDMultiplexerException("Unable to add fd back to ports (read): " + stringerror()); + } + } + } + if (d_pevents[n].portev_events & POLLOUT || d_pevents[n].portev_events & POLLERR) { + const auto& iter = d_writeCallbacks.find(d_pevents[n].portev_object); + if (iter != d_writeCallbacks.end()) { + iter->d_callback(iter->d_fd, iter->d_parameter); + count++; + if (d_writeCallbacks.count(d_pevents[n].portev_object) && port_associate(d_portfd, PORT_SOURCE_FD, d_pevents[n].portev_object, d_readCallbacks.count(d_pevents[n].portev_object) ? POLLIN | POLLOUT : POLLOUT, 0) < 0) { + throw FDMultiplexerException("Unable to add fd back to ports (write): " + stringerror()); + } + } + } + } + + d_inrun = false; + return count; +} + +#if 0 +void acceptData(int fd, boost::any& parameter) +{ + cout<<"Have data on fd "<<fd<<endl; + Socket* sock=boost::any_cast<Socket*>(parameter); + string packet; + IPEndpoint rem; + sock->recvFrom(packet, rem); + cout<<"Received "<<packet.size()<<" bytes!\n"; +} + + +int main() +{ + Socket s(AF_INET, SOCK_DGRAM); + + IPEndpoint loc("0.0.0.0", 2000); + s.bind(loc); + + PortsFDMultiplexer sfm; + + sfm.addReadFD(s.getHandle(), &acceptData, &s); + + for(int n=0; n < 100 ; ++n) { + sfm.run(); + } + sfm.removeReadFD(s.getHandle()); + sfm.removeReadFD(s.getHandle()); +} +#endif |