// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2014 UnitedStack * * Author: Haomai Wang * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include "common/errno.h" #include "EventKqueue.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix *_dout << "KqueueDriver." #define KEVENT_NOWAIT 0 int KqueueDriver::test_kqfd() { struct kevent ke[1]; if (kevent(kqfd, ke, 0, NULL, 0, KEVENT_NOWAIT) == -1) { ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd << cpp_strerror(errno) << dendl; return -errno; } return kqfd; } int KqueueDriver::restore_events() { struct kevent ke[2]; int i; ldout(cct,30) << __func__ << " on kqfd = " << kqfd << dendl; for(i=0;i= sav_max) resize_events(sav_max+5000); sav_events[fd].mask = cur_mask | add_mask; return 0; } int KqueueDriver::del_event(int fd, int cur_mask, int del_mask) { struct kevent ke[2]; int num = 0; int mask = cur_mask & del_mask; ldout(cct,30) << __func__ << " delete event kqfd = " << kqfd << " fd = " << fd << " cur_mask = " << cur_mask << " del_mask = " << del_mask << dendl; int r = test_thread_change(__func__); if ( r < 0 ) return r; if (mask & EVENT_READABLE) EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); if (mask & EVENT_WRITABLE) EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); if (num) { int r = 0; if ((r = kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT)) < 0) { lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << mask << " failed." << cpp_strerror(errno) << dendl; return -errno; } } // keep the administration sav_events[fd].mask = cur_mask & ~del_mask; return 0; } int KqueueDriver::resize_events(int newsize) { ldout(cct,30) << __func__ << " kqfd = " << kqfd << "newsize = " << newsize << dendl; if (newsize > sav_max) { sav_events = (struct SaveEvent*)realloc(sav_events, sizeof(struct SaveEvent)*newsize); if (!sav_events) { lderr(cct) << __func__ << " unable to realloc memory: " << cpp_strerror(errno) << dendl; ceph_assert(sav_events); return -ENOMEM; } memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max)); sav_max = newsize; } return 0; } int KqueueDriver::event_wait(std::vector &fired_events, struct timeval *tvp) { int retval, numevents = 0; struct timespec timeout; ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl; int r = test_thread_change(__func__); if ( r < 0 ) return r; if (tvp != NULL) { timeout.tv_sec = tvp->tv_sec; timeout.tv_nsec = tvp->tv_usec * 1000; ldout(cct,20) << __func__ << " " << timeout.tv_sec << " sec " << timeout.tv_nsec << " nsec" << dendl; retval = kevent(kqfd, NULL, 0, res_events, size, &timeout); } else { ldout(cct,30) << __func__ << " event_wait: " << " NULL" << dendl; retval = kevent(kqfd, NULL, 0, res_events, size, KEVENT_NOWAIT); } ldout(cct,25) << __func__ << " kevent retval: " << retval << dendl; if (retval < 0) { lderr(cct) << __func__ << " kqueue error: " << cpp_strerror(errno) << dendl; return -errno; } else if (retval == 0) { ldout(cct,5) << __func__ << " Hit timeout(" << timeout.tv_sec << " sec " << timeout.tv_nsec << " nsec" << ")." << dendl; } else { int j; numevents = retval; fired_events.resize(numevents); for (j = 0; j < numevents; j++) { int mask = 0; struct kevent *e = res_events + j; if (e->filter == EVFILT_READ) mask |= EVENT_READABLE; if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE; if (e->flags & EV_ERROR) mask |= EVENT_READABLE|EVENT_WRITABLE; fired_events[j].fd = (int)e->ident; fired_events[j].mask = mask; } } return numevents; }