summaryrefslogtreecommitdiffstats
path: root/lib/clplumbing/ipcsocket.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 06:40:13 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 06:40:13 +0000
commite9be59e1502a41bab9891d96d753102a7dafef0b (patch)
treec3b2da87c414881f4b53d0964f407c83492d813e /lib/clplumbing/ipcsocket.c
parentInitial commit. (diff)
downloadcluster-glue-upstream.tar.xz
cluster-glue-upstream.zip
Adding upstream version 1.0.12.upstream/1.0.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/clplumbing/ipcsocket.c')
-rw-r--r--lib/clplumbing/ipcsocket.c2767
1 files changed, 2767 insertions, 0 deletions
diff --git a/lib/clplumbing/ipcsocket.c b/lib/clplumbing/ipcsocket.c
new file mode 100644
index 0000000..14c3504
--- /dev/null
+++ b/lib/clplumbing/ipcsocket.c
@@ -0,0 +1,2767 @@
+/*
+ * ipcsocket unix domain socket implementation of IPC abstraction.
+ *
+ * Copyright (c) 2002 Xiaoxiang Liu <xiliu@ncsa.uiuc.edu>
+ *
+ * Stream support (c) 2004,2006 David Lee <t.d.lee@durham.ac.uk>
+ * Note: many of the variable/function names "*socket*" should be
+ * interpreted as having a more generic "ipc-channel-type" meaning.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#include <lha_internal.h>
+
+#include <clplumbing/ipc.h>
+#include <clplumbing/cl_log.h>
+#include <clplumbing/realtime.h>
+#include <clplumbing/cl_poll.h>
+
+#include <ha_msg.h>
+/* avoid including cib.h - used in gshi's "late message" code to avoid
+ * printing insanely large messages
+ */
+#define F_CIB_CALLDATA "cib_calldata"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <syslog.h>
+#include <time.h>
+#include <sched.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/param.h>
+#include <sys/uio.h>
+#ifdef HAVE_SYS_FILIO_H
+# include <sys/filio.h>
+#endif
+#ifdef HAVE_SYS_SYSLIMITS_H
+# include <sys/syslimits.h>
+#endif
+#ifdef HAVE_SYS_CRED_H
+# include <sys/cred.h>
+#endif
+#ifdef HAVE_SYS_UCRED_H
+# include <sys/ucred.h>
+#endif
+
+/* For 'getpeerucred()' (Solaris 10 upwards) */
+#ifdef HAVE_UCRED_H
+# include <ucred.h>
+#endif
+
+#ifdef HAVE_SYS_SOCKET_H
+# include <sys/socket.h>
+#endif
+
+/*
+ * Normally use "socket" code. But on some OSes alternatives may be
+ * preferred (or necessary).
+ */
+#define HB_IPC_SOCKET 1
+#define HB_IPC_STREAM 2
+/* #define HB_IPC_ANOTHER 3 */
+
+#ifndef HB_IPC_METHOD
+# if defined(SO_PEERCRED) || defined(HAVE_GETPEEREID) \
+ || defined(SCM_CREDS) || defined(HAVE_GETPEERUCRED)
+# define HB_IPC_METHOD HB_IPC_SOCKET
+# elif defined(HAVE_STROPTS_H)
+# define HB_IPC_METHOD HB_IPC_STREAM
+# else
+# error. Surely we have sockets or streams...
+# endif
+#endif
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+# include <sys/poll.h>
+# include <netinet/in.h>
+# include <sys/un.h>
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+# include <stropts.h>
+#else
+# error "IPC type invalid"
+#endif
+
+#include <sys/ioctl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#ifndef UNIX_PATH_MAX
+# define UNIX_PATH_MAX 108
+#endif
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+
+# define MAX_LISTEN_NUM 128
+
+# ifndef MSG_NOSIGNAL
+# define MSG_NOSIGNAL 0
+# endif
+
+# ifndef AF_LOCAL
+# define AF_LOCAL AF_UNIX
+# endif
+
+#endif /* HB_IPC_METHOD */
+
+/***********************************************************************
+ *
+ * Determine the IPC authentication scheme... More machine dependent than
+ * we'd like, but don't know any better way...
+ *
+ ***********************************************************************/
+#ifdef SO_PEERCRED
+# define USE_SO_PEERCRED
+#elif HAVE_GETPEEREID
+# define USE_GETPEEREID
+#elif defined(SCM_CREDS)
+# define USE_SCM_CREDS
+#elif HAVE_GETPEERUCRED /* e.g. Solaris 10 upwards */
+# define USE_GETPEERUCRED
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+# define USE_STREAM_CREDS
+#else
+# define USE_DUMMY_CREDS
+/* This will make it compile, but attempts to authenticate
+ * will fail. This is a stopgap measure ;-)
+ */
+#endif
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+
+# ifdef USE_BINDSTAT_CREDS
+# ifndef SUN_LEN
+# define SUN_LEN(ptr) ((size_t) (offsetof (sockaddr_un, sun_path) + strlen ((ptr)->sun_path))
+# endif
+# endif
+
+#endif /* HB_IPC_METHOD */
+
+/* wait connection private data. */
+struct SOCKET_WAIT_CONN_PRIVATE{
+ /* the path name wich the connection will be built on. */
+ char path_name[UNIX_PATH_MAX];
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ /* the domain socket. */
+ int s;
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ /* the streams pipe */
+ int pipefds[2];
+#endif
+};
+
+/* channel private data. */
+struct SOCKET_CH_PRIVATE{
+ /* the path name wich the connection will be built on. */
+ char path_name[UNIX_PATH_MAX];
+ /* the domain socket. */
+ int s;
+ /* the size of expecting data for below buffered message buf_msg */
+ int remaining_data;
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ /* The address of our peer - used by USE_BINDSTAT_CREDS version of
+ * socket_verify_auth()
+ */
+ struct sockaddr_un *peer_addr;
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ uid_t farside_uid;
+ gid_t farside_gid;
+#endif
+
+ /* the buf used to save unfinished message */
+ struct IPC_MESSAGE *buf_msg;
+};
+
+struct IPC_Stats {
+ long nsent;
+ long noutqueued;
+ long send_count;
+ long nreceived;
+ long ninqueued;
+ long recv_count;
+ int last_recv_errno;
+ int last_recv_rc;
+ int last_send_errno;
+ int last_send_rc;
+};
+
+static struct IPC_Stats SocketIPCStats = {0, 0, 0, 0};
+extern int debug_level;
+
+/* unix domain socket implementations of IPC functions. */
+
+static int socket_resume_io(struct IPC_CHANNEL *ch);
+
+static struct IPC_MESSAGE* socket_message_new(struct IPC_CHANNEL*ch
+, int msg_len);
+
+struct IPC_WAIT_CONNECTION *socket_wait_conn_new(GHashTable* ch_attrs);
+
+/* *** FIXME: This is also declared in 'ocf_ipc.c'. */
+struct IPC_CHANNEL* socket_client_channel_new(GHashTable *attrs);
+
+static struct IPC_CHANNEL* socket_server_channel_new(int sockfd);
+
+static struct IPC_CHANNEL * channel_new(int sockfd, int conntype, const char *pathname);
+static int client_channel_new_auth(int sockfd);
+static int verify_creds(struct IPC_AUTH *auth_info, uid_t uid, gid_t gid);
+
+typedef void (*DelProc)(IPC_Message*);
+
+static struct IPC_MESSAGE * ipcmsg_new(struct IPC_CHANNEL* ch,
+ const void* data, int len, void* private, DelProc d);
+
+static pid_t socket_get_farside_pid(int sockfd);
+
+extern int (*ipc_pollfunc_ptr)(struct pollfd *, nfds_t, int);
+
+static int socket_resume_io_read(struct IPC_CHANNEL *ch, int*, gboolean read1anyway);
+
+static struct IPC_OPS socket_ops;
+static gboolean ipc_time_debug_flag = TRUE;
+
+void
+set_ipc_time_debug_flag(gboolean flag)
+{
+ ipc_time_debug_flag = flag;
+}
+
+#ifdef IPC_TIME_DEBUG
+
+extern struct ha_msg* wirefmt2msg(const char* s, size_t length, int flag);
+void cl_log_message (int log_level, const struct ha_msg *m);
+int timediff(longclock_t t1, longclock_t t2);
+void ha_msg_del(struct ha_msg* msg);
+void ipc_time_debug(IPC_Channel* ch, IPC_Message* ipcmsg, int whichpos);
+
+#define SET_ENQUEUE_TIME(x,t) memcpy(&((struct SOCKET_MSG_HEAD*)x->msg_buf)->enqueue_time, &t, sizeof(longclock_t))
+#define SET_SEND_TIME(x,t) memcpy(&((struct SOCKET_MSG_HEAD*)x->msg_buf)->send_time, &t, sizeof(longclock_t))
+#define SET_RECV_TIME(x,t) memcpy(&((struct SOCKET_MSG_HEAD*)x->msg_buf)->recv_time, &t, sizeof(longclock_t))
+#define SET_DEQUEUE_TIME(x,t) memcpy(&((struct SOCKET_MSG_HEAD*)x->msg_buf)->dequeue_time, &t, sizeof(longclock_t))
+
+static
+longclock_t
+get_enqueue_time(IPC_Message *ipcmsg)
+{
+ longclock_t t;
+
+ memcpy(&t,
+ &(((struct SOCKET_MSG_HEAD *)ipcmsg->msg_buf)->enqueue_time),
+ sizeof(longclock_t));
+
+ return t;
+}
+
+int
+timediff(longclock_t t1, longclock_t t2)
+{
+ longclock_t remain;
+
+ remain = sub_longclock(t1, t2);
+
+ return longclockto_ms(remain);
+}
+
+void
+ipc_time_debug(IPC_Channel* ch, IPC_Message* ipcmsg, int whichpos)
+{
+ int msdiff = 0;
+ longclock_t lnow = time_longclock();
+ char positions[4][16]={
+ "enqueue",
+ "send",
+ "recv",
+ "dequeue"};
+
+ if (ipc_time_debug_flag == FALSE) {
+ return ;
+ }
+
+ if (ipcmsg->msg_body == NULL
+ || ipcmsg->msg_buf == NULL) {
+ cl_log(LOG_ERR, "msg_body =%p, msg_bu=%p",
+ ipcmsg->msg_body, ipcmsg->msg_buf);
+ abort();
+ return;
+ }
+
+ switch(whichpos) {
+ case MSGPOS_ENQUEUE:
+ SET_ENQUEUE_TIME(ipcmsg, lnow);
+ break;
+ case MSGPOS_SEND:
+ SET_SEND_TIME(ipcmsg, lnow);
+ goto checktime;
+ case MSGPOS_RECV:
+ SET_RECV_TIME(ipcmsg, lnow);
+ goto checktime;
+ case MSGPOS_DEQUEUE:
+ SET_DEQUEUE_TIME(ipcmsg, lnow);
+
+ checktime:
+ msdiff = timediff(lnow, get_enqueue_time(ipcmsg));
+ if (msdiff > MAXIPCTIME) {
+ struct ha_msg* hamsg = NULL;
+ cl_log(LOG_WARNING,
+ " message delayed from enqueue to %s %d ms "
+ "(enqueue-time=%lu, peer pid=%d) ",
+ positions[whichpos],
+ msdiff,
+ longclockto_ms(get_enqueue_time(ipcmsg)),
+ ch->farside_pid);
+
+ (void)hamsg;
+#if 0
+ hamsg = wirefmt2msg(ipcmsg->msg_body, ipcmsg->msg_len, 0);
+ if (hamsg != NULL) {
+ struct ha_msg *crm_data = NULL;
+ crm_data = cl_get_struct(
+ hamsg, F_CRM_DATA);
+
+ if(crm_data == NULL) {
+ crm_data = cl_get_struct(
+ hamsg, F_CIB_CALLDATA);
+ }
+ if(crm_data != NULL) {
+ cl_msg_remove_value(
+ hamsg, crm_data);
+ }
+
+ cl_log_message(LOG_DEBUG, hamsg);
+ ha_msg_del(hamsg);
+ } else {
+ if (!ipcmsg) {
+ cl_log(LOG_ERR,
+ "IPC msg 0x%lx is unallocated"
+ , (gulong)ipcmsg);
+ return;
+ }
+ if (!ipcmsg->msg_body) {
+ cl_log(LOG_ERR,
+ "IPC msg body 0x%lx is unallocated"
+ , (gulong)ipcmsg->msg_body);
+ return;
+ }
+ }
+#endif
+
+ }
+ break;
+ default:
+ cl_log(LOG_ERR, "wrong position value in IPC:%d", whichpos);
+ return;
+ }
+}
+#endif
+
+void dump_ipc_info(const IPC_Channel* chan);
+
+#undef AUDIT_CHANNELS
+
+#ifndef AUDIT_CHANNELS
+# define CHANAUDIT(ch) /*NOTHING */
+#else
+# define CHANAUDIT(ch) socket_chan_audit(ch)
+# define MAXPID 65535
+
+static void
+socket_chan_audit(const struct IPC_CHANNEL* ch)
+{
+ int badch = FALSE;
+
+ struct SOCKET_CH_PRIVATE *chp;
+ struct stat b;
+
+ if ((chp = ch->ch_private) == NULL) {
+ cl_log(LOG_CRIT, "Bad ch_private");
+ badch = TRUE;
+ }
+ if (ch->ops != &socket_ops) {
+ cl_log(LOG_CRIT, "Bad socket_ops");
+ badch = TRUE;
+ }
+ if (ch->ch_status == IPC_DISCONNECT) {
+ return;
+ }
+ if (!IPC_ISRCONN(ch)) {
+ cl_log(LOG_CRIT, "Bad ch_status [%d]", ch->ch_status);
+ badch = TRUE;
+ }
+ if (ch->farside_pid < 0 || ch->farside_pid > MAXPID) {
+ cl_log(LOG_CRIT, "Bad farside_pid");
+ badch = TRUE;
+ }
+ if (fstat(chp->s, &b) < 0) {
+ badch = TRUE;
+ } else if ((b.st_mode & S_IFMT) != S_IFSOCK) {
+ cl_log(LOG_CRIT, "channel @ 0x%lx: not a socket"
+ , (unsigned long)ch);
+ badch = TRUE;
+ }
+ if (chp->remaining_data < 0) {
+ cl_log(LOG_CRIT, "Negative remaining_data");
+ badch = TRUE;
+ }
+ if (chp->remaining_data < 0 || chp->remaining_data > MAXMSG) {
+ cl_log(LOG_CRIT, "Excessive/bad remaining_data");
+ badch = TRUE;
+ }
+ if (chp->remaining_data && chp->buf_msg == NULL) {
+ cl_log(LOG_CRIT
+ , "inconsistent remaining_data [%ld]/buf_msg[0x%lx]"
+ , (long)chp->remaining_data, (unsigned long)chp->buf_msg);
+ badch = TRUE;
+ }
+ if (chp->remaining_data == 0 && chp->buf_msg != NULL) {
+ cl_log(LOG_CRIT
+ , "inconsistent remaining_data [%ld]/buf_msg[0x%lx] (2)"
+ , (long)chp->remaining_data, (unsigned long)chp->buf_msg);
+ badch = TRUE;
+ }
+ if (ch->send_queue == NULL || ch->recv_queue == NULL) {
+ cl_log(LOG_CRIT, "bad send/recv queue");
+ badch = TRUE;
+ }
+ if (ch->recv_queue->current_qlen < 0
+ || ch->recv_queue->current_qlen > ch->recv_queue->max_qlen) {
+ cl_log(LOG_CRIT, "bad recv queue");
+ badch = TRUE;
+ }
+ if (ch->send_queue->current_qlen < 0
+ || ch->send_queue->current_qlen > ch->send_queue->max_qlen) {
+ cl_log(LOG_CRIT, "bad send_queue");
+ badch = TRUE;
+ }
+ if (badch) {
+ cl_log(LOG_CRIT, "Bad channel @ 0x%lx", (unsigned long)ch);
+ dump_ipc_info(ch);
+ abort();
+ }
+}
+#endif
+
+#ifdef CHEAT_CHECKS
+long SeqNums[32];
+
+static long
+cheat_get_sequence(IPC_Message* msg)
+{
+ const char header [] = "String-";
+ size_t header_len = sizeof(header)-1;
+ char * body;
+
+ if (msg == NULL || msg->msg_len < sizeof(header)
+ || msg->msg_len > sizeof(header) + 10
+ || strncmp(msg->msg_body, header, header_len) != 0) {
+ return -1L;
+ }
+ body = msg->msg_body;
+ return atol(body+header_len);
+}
+static char SavedReadBody[32];
+static char SavedReceivedBody[32];
+static char SavedQueuedBody[32];
+static char SavedSentBody[32];
+#ifndef MIN
+# define MIN(a,b) (a < b ? a : b)
+#endif
+
+static void
+save_body(struct IPC_MESSAGE *msg, char * savearea, size_t length)
+{
+ int mlen = strnlen(msg->msg_body, MIN(length, msg->msg_len));
+ memcpy(savearea, msg->msg_body, mlen);
+ savearea[mlen] = EOS;
+}
+
+static void
+audit_readmsgq_msg(gpointer msg, gpointer user_data)
+{
+ long cheatseq = cheat_get_sequence(msg);
+
+ if (cheatseq < SeqNums[1] || cheatseq > SeqNums[2]) {
+ cl_log(LOG_ERR
+ , "Read Q Message %ld not in range [%ld:%ld]"
+ , cheatseq, SeqNums[1], SeqNums[2]);
+ }
+}
+
+static void
+saveandcheck(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* msg, char * savearea
+, size_t savesize, long* lastseq, const char * text)
+{
+ long cheatseq = cheat_get_sequence(msg);
+
+ save_body(msg, savearea, savesize);
+ if (*lastseq != 0 ) {
+ if (cheatseq != *lastseq +1) {
+ int j;
+ cl_log(LOG_ERR
+ , "%s packets out of sequence! %ld versus %ld [pid %d]"
+ , text, cheatseq, *lastseq, (int)getpid());
+ dump_ipc_info(ch);
+ for (j=0; j < 4; ++j) {
+ cl_log(LOG_DEBUG
+ , "SeqNums[%d] = %ld"
+ , j, SeqNums[j]);
+ }
+ cl_log(LOG_ERR
+ , "SocketIPCStats.nsent = %ld"
+ , SocketIPCStats.nsent);
+ cl_log(LOG_ERR
+ , "SocketIPCStats.noutqueued = %ld"
+ , SocketIPCStats.noutqueued);
+ cl_log(LOG_ERR
+ , "SocketIPCStats.nreceived = %ld"
+ , SocketIPCStats.nreceived);
+ cl_log(LOG_ERR
+ , "SocketIPCStats.ninqueued = %ld"
+ , SocketIPCStats.ninqueued);
+ }
+
+ }
+ g_list_foreach(ch->recv_queue->queue, audit_readmsgq_msg, NULL);
+ if (cheatseq > 0) {
+ *lastseq = cheatseq;
+ }
+}
+
+# define CHECKFOO(which, ch, msg, area, text) { \
+ saveandcheck(ch,msg,area,sizeof(area),SeqNums+which,text); \
+ }
+#else
+# define CHECKFOO(which, ch, msg, area, text) /* Nothing */
+#endif
+
+static void
+dump_msg(struct IPC_MESSAGE *msg, const char * label)
+{
+#ifdef CHEAT_CHECKS
+ cl_log(LOG_DEBUG, "%s packet (length %d) [%s] %ld pid %d"
+ , label, (int)msg->msg_len, (char*)msg->msg_body
+ , cheat_get_sequence(msg), (int)getpid());
+#else
+ cl_log(LOG_DEBUG, "%s length %d [%s] pid %d"
+ , label, (int)msg->msg_len, (char*)msg->msg_body
+ , (int)getpid());
+#endif
+}
+
+static void
+dump_msgq_msg(gpointer data, gpointer user_data)
+{
+ dump_msg(data, user_data);
+}
+
+void
+dump_ipc_info(const IPC_Channel* chan)
+{
+ char squeue[] = "Send queue";
+ char rqueue[] = "Receive queue";
+#ifdef CHEAT_CHECKS
+ cl_log(LOG_DEBUG, "Saved Last Body read[%s]", SavedReadBody);
+ cl_log(LOG_DEBUG, "Saved Last Body received[%s]", SavedReceivedBody);
+ cl_log(LOG_DEBUG, "Saved Last Body Queued[%s]", SavedQueuedBody);
+ cl_log(LOG_DEBUG, "Saved Last Body Sent[%s]", SavedSentBody);
+#endif
+ g_list_foreach(chan->send_queue->queue, dump_msgq_msg, squeue);
+ g_list_foreach(chan->recv_queue->queue, dump_msgq_msg, rqueue);
+ CHANAUDIT(chan);
+}
+
+/* destroy socket wait channel */
+static void
+socket_destroy_wait_conn(struct IPC_WAIT_CONNECTION * wait_conn)
+{
+ struct SOCKET_WAIT_CONN_PRIVATE * wc = wait_conn->ch_private;
+
+ if (wc != NULL) {
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ if (wc->s >= 0) {
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG
+ , "%s: closing socket %d"
+ , __FUNCTION__, wc->s);
+ }
+ close(wc->s);
+ cl_poll_ignore(wc->s);
+ unlink(wc->path_name);
+ wc->s = -1;
+ }
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ cl_poll_ignore(wc->pipefds[0]);
+ if (wc->pipefds[0] >= 0) {
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG
+ , "%s: closing pipe[0] %d"
+ , __FUNCTION__, wc->pipefds[0]);
+ }
+ wc->pipefds[0] = -1;
+ }
+ if (wc->pipefds[1] >= 0) {
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG
+ , "%s: closing pipe[1] %d"
+ , __FUNCTION__, wc->pipefds[1]);
+ }
+ wc->pipefds[0] = -1;
+ }
+ unlink(wc->path_name);
+#endif
+ g_free(wc);
+ }
+ g_free((void*) wait_conn);
+}
+
+/* return a fd which can be listened on for new connections. */
+static int
+socket_wait_selectfd(struct IPC_WAIT_CONNECTION *wait_conn)
+{
+ struct SOCKET_WAIT_CONN_PRIVATE * wc = wait_conn->ch_private;
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ return (wc == NULL ? -1 : wc->s);
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ return (wc == NULL ? -1 : wc->pipefds[0]);
+#endif
+}
+
+/* socket accept connection. */
+static struct IPC_CHANNEL*
+socket_accept_connection(struct IPC_WAIT_CONNECTION * wait_conn
+, struct IPC_AUTH *auth_info)
+{
+ struct IPC_CHANNEL * ch = NULL;
+ int s;
+ int new_sock;
+ struct SOCKET_WAIT_CONN_PRIVATE* conn_private;
+ struct SOCKET_CH_PRIVATE * ch_private ;
+ int auth_result = IPC_FAIL;
+ int saveerrno=errno;
+ gboolean was_error = FALSE;
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ /* make peer_addr a pointer so it can be used by the
+ * USE_BINDSTAT_CREDS implementation of socket_verify_auth()
+ */
+ struct sockaddr_un * peer_addr = NULL;
+ socklen_t sin_size;
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ struct strrecvfd strrecvfd;
+#endif
+
+ /* get select fd */
+
+ s = wait_conn->ops->get_select_fd(wait_conn);
+ if (s < 0) {
+ cl_log(LOG_ERR, "get_select_fd: invalid fd");
+ return NULL;
+ }
+
+ /* Get client connection. */
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ peer_addr = g_new(struct sockaddr_un, 1);
+ *peer_addr->sun_path = '\0';
+ sin_size = sizeof(struct sockaddr_un);
+ new_sock = accept(s, (struct sockaddr *)peer_addr, &sin_size);
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ if (ioctl(s, I_RECVFD, &strrecvfd) == -1) {
+ new_sock = -1;
+ }
+ else {
+ new_sock = strrecvfd.fd;
+ }
+#endif
+ saveerrno=errno;
+ if (new_sock == -1) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ cl_perror("socket_accept_connection: accept(sock=%d)"
+ , s);
+ }
+ was_error = TRUE;
+
+ } else {
+ if ((ch = socket_server_channel_new(new_sock)) == NULL) {
+ cl_log(LOG_ERR
+ , "socket_accept_connection:"
+ " Can't create new channel");
+ was_error = TRUE;
+ } else {
+ conn_private=(struct SOCKET_WAIT_CONN_PRIVATE*)
+ ( wait_conn->ch_private);
+ ch_private = (struct SOCKET_CH_PRIVATE *)(ch->ch_private);
+ strncpy(ch_private->path_name,conn_private->path_name
+ , sizeof(conn_private->path_name));
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ ch_private->peer_addr = peer_addr;
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ ch_private->farside_uid = strrecvfd.uid;
+ ch_private->farside_gid = strrecvfd.gid;
+#endif
+ }
+ }
+
+ /* Verify the client authorization information. */
+ if(was_error == FALSE) {
+ auth_result = ch->ops->verify_auth(ch, auth_info);
+ if (auth_result == IPC_OK) {
+ ch->ch_status = IPC_CONNECT;
+ ch->farside_pid = socket_get_farside_pid(new_sock);
+ return ch;
+ }
+ saveerrno=errno;
+ }
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ g_free(peer_addr);
+ peer_addr = NULL;
+#endif
+ errno=saveerrno;
+ return NULL;
+}
+
+/*
+ * Called by socket_destroy(). Disconnect the connection
+ * and set ch_status to IPC_DISCONNECT.
+ *
+ * parameters :
+ * ch (IN) the pointer to the channel.
+ *
+ * return values :
+ * IPC_OK the connection is disconnected successfully.
+ * IPC_FAIL operation fails.
+*/
+
+static int
+socket_disconnect(struct IPC_CHANNEL* ch)
+{
+ struct SOCKET_CH_PRIVATE* conn_info;
+
+ conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private;
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG
+ , "%s(sock=%d, ch=0x%lx){"
+ , __FUNCTION__
+ , conn_info->s, (unsigned long)ch);
+ }
+#if 0
+ if (ch->ch_status != IPC_DISCONNECT) {
+ cl_log(LOG_INFO, "forced disconnect for fd %d", conn_info->s);
+ }
+#endif
+ if (ch->ch_status == IPC_CONNECT) {
+ socket_resume_io(ch);
+ }
+
+ if (conn_info->s >= 0) {
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG
+ , "%s: closing socket %d"
+ , __FUNCTION__, conn_info->s);
+ }
+ close(conn_info->s);
+ cl_poll_ignore(conn_info->s);
+ conn_info->s = -1;
+ }
+ ch->ch_status = IPC_DISCONNECT;
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG, "}/*%s(sock=%d, ch=0x%lx)*/"
+ , __FUNCTION__, conn_info->s, (unsigned long)ch);
+ }
+ return IPC_OK;
+}
+
+/*
+ * destroy a ipc queue and clean all memory space assigned to this queue.
+ * parameters:
+ * q (IN) the pointer to the queue which should be destroied.
+ *
+ * FIXME: This function does not free up messages that might
+ * be in the queue.
+ */
+
+static void
+socket_destroy_queue(struct IPC_QUEUE * q)
+{
+ g_list_free(q->queue);
+
+ g_free((void *) q);
+}
+
+static void
+socket_destroy_channel(struct IPC_CHANNEL * ch)
+{
+ --ch->refcount;
+ if (ch->refcount > 0) {
+ return;
+ }
+ if (ch->ch_status == IPC_CONNECT) {
+ socket_resume_io(ch);
+ }
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG, "socket_destroy(ch=0x%lx){"
+ , (unsigned long)ch);
+ }
+ socket_disconnect(ch);
+ socket_destroy_queue(ch->send_queue);
+ socket_destroy_queue(ch->recv_queue);
+
+ if (ch->pool) {
+ ipc_bufpool_unref(ch->pool);
+ }
+
+ if (ch->ch_private != NULL) {
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ struct SOCKET_CH_PRIVATE *priv = (struct SOCKET_CH_PRIVATE *)
+ ch->ch_private;
+ if(priv->peer_addr != NULL) {
+ if (*priv->peer_addr->sun_path) {
+ unlink(priv->peer_addr->sun_path);
+ }
+ g_free((void*)(priv->peer_addr));
+ }
+#endif
+ g_free((void*)(ch->ch_private));
+ }
+ memset(ch, 0xff, sizeof(*ch));
+ g_free((void*)ch);
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG, "}/*socket_destroy(ch=0x%lx)*/"
+ , (unsigned long)ch);
+ }
+}
+
+static int
+socket_check_disc_pending(struct IPC_CHANNEL* ch)
+{
+ int rc;
+ struct pollfd sockpoll;
+
+ if (ch->ch_status == IPC_DISCONNECT) {
+ cl_log(LOG_ERR, "check_disc_pending() already disconnected");
+ return IPC_BROKEN;
+ }
+ if (ch->recv_queue->current_qlen > 0) {
+ return IPC_OK;
+ }
+ sockpoll.fd = ch->ops->get_recv_select_fd(ch);
+ sockpoll.events = POLLIN;
+
+ rc = ipc_pollfunc_ptr(&sockpoll, 1, 0);
+
+ if (rc < 0) {
+ cl_log(LOG_INFO
+ , "socket_check_disc_pending() bad poll call");
+ ch->ch_status = IPC_DISCONNECT;
+ return IPC_BROKEN;
+ }
+
+ if (sockpoll.revents & POLLHUP) {
+ if (sockpoll.revents & POLLIN) {
+ ch->ch_status = IPC_DISC_PENDING;
+ } else {
+#if 1
+ cl_log(LOG_INFO, "HUP without input");
+#endif
+ ch->ch_status = IPC_DISCONNECT;
+ return IPC_BROKEN;
+ }
+ }
+ if (sockpoll.revents & POLLIN) {
+ int dummy;
+ socket_resume_io_read(ch, &dummy, FALSE);
+ }
+ return IPC_OK;
+}
+
+static int
+socket_initiate_connection(struct IPC_CHANNEL * ch)
+{
+ struct SOCKET_CH_PRIVATE* conn_info;
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ struct sockaddr_un peer_addr; /* connector's address information */
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+#endif
+
+ conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private;
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ /* Prepare the socket */
+ memset(&peer_addr, 0, sizeof(peer_addr));
+ peer_addr.sun_family = AF_LOCAL; /* host byte order */
+
+ if (strlen(conn_info->path_name) >= sizeof(peer_addr.sun_path)) {
+ return IPC_FAIL;
+ }
+ strncpy(peer_addr.sun_path, conn_info->path_name, sizeof(peer_addr.sun_path));
+
+ /* Send connection request */
+ if (connect(conn_info->s, (struct sockaddr *)&peer_addr
+ , sizeof(struct sockaddr_un)) == -1) {
+ return IPC_FAIL;
+ }
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+
+#endif
+
+ ch->ch_status = IPC_CONNECT;
+ ch->farside_pid = socket_get_farside_pid(conn_info->s);
+ return IPC_OK;
+}
+
+static void
+socket_set_high_flow_callback(IPC_Channel* ch,
+ flow_callback_t callback,
+ void* userdata) {
+ ch->high_flow_callback = callback;
+ ch->high_flow_userdata = userdata;
+}
+
+static void
+socket_set_low_flow_callback(IPC_Channel* ch,
+ flow_callback_t callback,
+ void* userdata) {
+ ch->low_flow_callback = callback;
+ ch->low_flow_userdata = userdata;
+}
+
+static void
+socket_check_flow_control(struct IPC_CHANNEL* ch,
+ int orig_qlen,
+ int curr_qlen)
+{
+ if (!IPC_ISRCONN(ch)) {
+ return;
+ }
+
+ if (curr_qlen >= ch->high_flow_mark
+ && ch->high_flow_callback) {
+ ch->high_flow_callback(ch, ch->high_flow_userdata);
+ }
+
+ if (curr_qlen <= ch->low_flow_mark
+ && orig_qlen > ch->low_flow_mark
+ && ch->low_flow_callback) {
+ ch->low_flow_callback(ch, ch->low_flow_userdata);
+ }
+}
+
+static int
+socket_send(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* msg)
+{
+ int orig_qlen;
+ int diff;
+ struct IPC_MESSAGE* newmsg;
+
+ if (msg->msg_len > MAXMSG) {
+ cl_log(LOG_ERR, "%s: sorry, cannot send messages "
+ "bigger than %d (requested %lu)",
+ __FUNCTION__, MAXMSG, (unsigned long)msg->msg_len);
+ return IPC_FAIL;
+ }
+ if (msg->msg_len < 0) {
+ cl_log(LOG_ERR, "socket_send: "
+ "invalid message");
+ return IPC_FAIL;
+ }
+
+ if (ch->ch_status != IPC_CONNECT) {
+ return IPC_FAIL;
+ }
+
+ ch->ops->resume_io(ch);
+
+ if (ch->send_queue->maxqlen_cnt &&
+ time(NULL) - ch->send_queue->last_maxqlen_warn >= 60) {
+ cl_log(LOG_ERR, "%u messages dropped on a non-blocking channel (send queue maximum length %d)",
+ ch->send_queue->maxqlen_cnt, (int)ch->send_queue->max_qlen);
+ ch->send_queue->maxqlen_cnt = 0;
+ }
+ if ( !ch->should_send_block &&
+ ch->send_queue->current_qlen >= ch->send_queue->max_qlen) {
+ if (!ch->send_queue->maxqlen_cnt) {
+ ch->send_queue->last_maxqlen_warn = time(NULL);
+ }
+ ch->send_queue->maxqlen_cnt++;
+
+ if (ch->should_block_fail) {
+ return IPC_FAIL;
+ } else {
+ return IPC_OK;
+ }
+ }
+
+ while (ch->send_queue->current_qlen >= ch->send_queue->max_qlen) {
+ if (ch->ch_status != IPC_CONNECT) {
+ cl_log(LOG_WARNING, "socket_send:"
+ " message queue exceeded and IPC not connected");
+ return IPC_FAIL;
+ }
+ cl_shortsleep();
+ ch->ops->resume_io(ch);
+ }
+
+ /* add the message into the send queue */
+ CHECKFOO(0,ch, msg, SavedQueuedBody, "queued message");
+ SocketIPCStats.noutqueued++;
+
+ diff = 0;
+ if (msg->msg_buf ) {
+ diff = (char*)msg->msg_body - (char*)msg->msg_buf;
+ }
+ if ( diff < (int)sizeof(struct SOCKET_MSG_HEAD) ) {
+ /* either we don't have msg->msg_buf set
+ * or we don't have enough bytes for socket head
+ * we delete this message and creates
+ * a new one and delete the old one
+ */
+
+ newmsg= socket_message_new(ch, msg->msg_len);
+ if (newmsg == NULL) {
+ cl_log(LOG_ERR, "socket_resume_io_write: "
+ "allocating memory for new ipc msg failed");
+ return IPC_FAIL;
+ }
+
+ memcpy(newmsg->msg_body, msg->msg_body, msg->msg_len);
+
+ if(msg->msg_done) {
+ msg->msg_done(msg);
+ };
+ msg = newmsg;
+ }
+#ifdef IPC_TIME_DEBUG
+ ipc_time_debug(ch,msg, MSGPOS_ENQUEUE);
+#endif
+ ch->send_queue->queue = g_list_append(ch->send_queue->queue,
+ msg);
+ orig_qlen = ch->send_queue->current_qlen++;
+
+ socket_check_flow_control(ch, orig_qlen, orig_qlen +1 );
+
+ /* resume io */
+ ch->ops->resume_io(ch);
+ return IPC_OK;
+}
+
+static int
+socket_recv(struct IPC_CHANNEL * ch, struct IPC_MESSAGE** message)
+{
+ GList *element;
+
+ int nbytes;
+ int result;
+
+ socket_resume_io(ch);
+ result = socket_resume_io_read(ch, &nbytes, TRUE);
+
+ *message = NULL;
+
+ if (ch->recv_queue->current_qlen == 0) {
+ return result != IPC_OK ? result : IPC_FAIL;
+ /*return IPC_OK;*/
+ }
+ element = g_list_first(ch->recv_queue->queue);
+
+ if (element == NULL) {
+ /* Internal accounting error, but correctable */
+ cl_log(LOG_ERR
+ , "recv failure: qlen (%ld) > 0, but no message found."
+ , (long)ch->recv_queue->current_qlen);
+ ch->recv_queue->current_qlen = 0;
+ return IPC_FAIL;
+ }
+ *message = (struct IPC_MESSAGE *) (element->data);
+#ifdef IPC_TIME_DEBUG
+ ipc_time_debug(ch, *message, MSGPOS_DEQUEUE);
+#endif
+
+ CHECKFOO(1,ch, *message, SavedReadBody, "read message");
+ SocketIPCStats.nreceived++;
+ ch->recv_queue->queue = g_list_remove(ch->recv_queue->queue
+ , element->data);
+ ch->recv_queue->current_qlen--;
+ return IPC_OK;
+}
+
+static int
+socket_check_poll(struct IPC_CHANNEL * ch
+, struct pollfd * sockpoll)
+{
+ if (ch->ch_status == IPC_DISCONNECT) {
+ return IPC_OK;
+ }
+ if (sockpoll->revents & POLLHUP) {
+ /* If input present, or this is an output-only poll... */
+ if (sockpoll->revents & POLLIN
+ || (sockpoll-> events & POLLIN) == 0 ) {
+ ch->ch_status = IPC_DISC_PENDING;
+ return IPC_OK;
+ }
+#if 1
+ cl_log(LOG_INFO, "socket_check_poll(): HUP without input");
+#endif
+ ch->ch_status = IPC_DISCONNECT;
+ return IPC_BROKEN;
+
+ } else if (sockpoll->revents & (POLLNVAL|POLLERR)) {
+ /* Have we already closed the socket? */
+ if (fcntl(sockpoll->fd, F_GETFL) < 0) {
+ cl_perror("socket_check_poll(pid %d): bad fd [%d]"
+ , (int) getpid(), sockpoll->fd);
+ ch->ch_status = IPC_DISCONNECT;
+ return IPC_OK;
+ }
+ cl_log(LOG_ERR
+ , "revents failure: fd %d, flags 0x%x"
+ , sockpoll->fd, sockpoll->revents);
+ errno = EINVAL;
+ return IPC_FAIL;
+ }
+ return IPC_OK;
+}
+
+static int
+socket_waitfor(struct IPC_CHANNEL * ch
+, gboolean (*finished)(struct IPC_CHANNEL * ch))
+{
+ struct pollfd sockpoll;
+
+ CHANAUDIT(ch);
+ if (finished(ch)) {
+ return IPC_OK;
+ }
+
+ if (ch->ch_status == IPC_DISCONNECT) {
+ return IPC_BROKEN;
+ }
+ sockpoll.fd = ch->ops->get_recv_select_fd(ch);
+
+ while (!finished(ch) && IPC_ISRCONN(ch)) {
+ int rc;
+
+ sockpoll.events = POLLIN;
+
+ /* Cannot call resume_io after the call to finished()
+ * and before the call to poll because we might
+ * change the state of the thing finished() is
+ * waiting for.
+ * This means that the poll call below would be
+ * not only pointless, but might
+ * make us hang forever waiting for this
+ * event which has already happened
+ */
+ if (ch->send_queue->current_qlen > 0) {
+ sockpoll.events |= POLLOUT;
+ }
+
+ rc = ipc_pollfunc_ptr(&sockpoll, 1, -1);
+
+ if (rc < 0) {
+ return (errno == EINTR ? IPC_INTR : IPC_FAIL);
+ }
+
+ rc = socket_check_poll(ch, &sockpoll);
+ if (sockpoll.revents & POLLIN) {
+ socket_resume_io(ch);
+ }
+ if (rc != IPC_OK) {
+ CHANAUDIT(ch);
+ return rc;
+ }
+ }
+
+ CHANAUDIT(ch);
+ return IPC_OK;
+}
+
+static int
+socket_waitin(struct IPC_CHANNEL * ch)
+{
+ return socket_waitfor(ch, ch->ops->is_message_pending);
+}
+static gboolean
+socket_is_output_flushed(struct IPC_CHANNEL * ch)
+{
+ return ! ch->ops->is_sending_blocked(ch);
+}
+
+static int
+socket_waitout(struct IPC_CHANNEL * ch)
+{
+ int rc;
+ CHANAUDIT(ch);
+ rc = socket_waitfor(ch, socket_is_output_flushed);
+
+ if (rc != IPC_OK) {
+ cl_log(LOG_ERR
+ , "socket_waitout failure: rc = %d", rc);
+ } else if (ch->ops->is_sending_blocked(ch)) {
+ cl_log(LOG_ERR, "socket_waitout output still blocked");
+ }
+ CHANAUDIT(ch);
+ return rc;
+}
+
+static gboolean
+socket_is_message_pending(struct IPC_CHANNEL * ch)
+{
+ int nbytes;
+
+ socket_resume_io_read(ch, &nbytes, TRUE);
+ ch->ops->resume_io(ch);
+ if (ch->recv_queue->current_qlen > 0) {
+ return TRUE;
+ }
+
+ return !IPC_ISRCONN(ch);
+}
+
+static gboolean
+socket_is_output_pending(struct IPC_CHANNEL * ch)
+{
+ socket_resume_io(ch);
+ return ch->ch_status == IPC_CONNECT
+ && ch->send_queue->current_qlen > 0;
+}
+
+static gboolean
+socket_is_sendq_full(struct IPC_CHANNEL * ch)
+{
+ ch->ops->resume_io(ch);
+ return(ch->send_queue->current_qlen == ch->send_queue->max_qlen);
+}
+
+static gboolean
+socket_is_recvq_full(struct IPC_CHANNEL * ch)
+{
+ ch->ops->resume_io(ch);
+ return(ch->recv_queue->current_qlen == ch->recv_queue->max_qlen);
+}
+
+static int
+socket_get_conntype(struct IPC_CHANNEL* ch)
+{
+ return ch->conntype;
+}
+
+static int
+socket_assert_auth(struct IPC_CHANNEL *ch, GHashTable *auth)
+{
+ cl_log(LOG_ERR
+ , "the assert_auth function for domain socket is not implemented");
+ return IPC_FAIL;
+}
+
+static int
+socket_resume_io_read(struct IPC_CHANNEL *ch, int* nbytes, gboolean read1anyway)
+{
+ struct SOCKET_CH_PRIVATE* conn_info;
+ int retcode = IPC_OK;
+ struct pollfd sockpoll;
+ int debug_loopcount = 0;
+ int debug_bytecount = 0;
+ size_t maxqlen = ch->recv_queue->max_qlen;
+ struct ipc_bufpool* pool = ch->pool;
+ int nmsgs = 0;
+ int spaceneeded;
+ *nbytes = 0;
+
+ CHANAUDIT(ch);
+ conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;
+
+ if (ch->ch_status == IPC_DISCONNECT) {
+ return IPC_BROKEN;
+ }
+
+ if (pool == NULL) {
+ ch->pool = pool = ipc_bufpool_new(0);
+ if (pool == NULL) {
+ cl_log(LOG_ERR, "socket_resume_io_read: "
+ "memory allocation for ipc pool failed");
+ return IPC_FAIL;
+ }
+ }
+
+ if (ipc_bufpool_full(pool, ch, &spaceneeded)) {
+ struct ipc_bufpool* newpool;
+
+ newpool = ipc_bufpool_new(spaceneeded);
+ if (newpool == NULL) {
+ cl_log(LOG_ERR, "socket_resume_io_read: "
+ "memory allocation for a new ipc pool failed");
+ return IPC_FAIL;
+ }
+
+ ipc_bufpool_partial_copy(newpool, pool);
+ ipc_bufpool_unref(pool);
+ ch->pool = pool = newpool;
+ }
+ if (maxqlen <= 0 && read1anyway) {
+ maxqlen = 1;
+ }
+ if (ch->recv_queue->current_qlen < maxqlen && retcode == IPC_OK) {
+ void * msg_begin;
+ int msg_len;
+ int len;
+#if HB_IPC_METHOD == HB_IPC_STREAM
+ struct strbuf d;
+ int flags, rc;
+#endif
+
+ CHANAUDIT(ch);
+ ++debug_loopcount;
+
+ len = ipc_bufpool_spaceleft(pool);
+ msg_begin = pool->currpos;
+
+ CHANAUDIT(ch);
+
+ /* Now try to receive some data */
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ msg_len = recv(conn_info->s, msg_begin, len, MSG_DONTWAIT);
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ d.maxlen = len;
+ d.len = 0;
+ d.buf = msg_begin;
+ flags = 0;
+ rc = getmsg(conn_info->s, NULL, &d, &flags);
+ msg_len = (rc < 0) ? rc : d.len;
+#endif
+ SocketIPCStats.last_recv_rc = msg_len;
+ SocketIPCStats.last_recv_errno = errno;
+ ++SocketIPCStats.recv_count;
+
+ /* Did we get an error? */
+ if (msg_len < 0) {
+ switch (errno) {
+ case EAGAIN:
+ if (ch->ch_status==IPC_DISC_PENDING) {
+ ch->ch_status =IPC_DISCONNECT;
+ retcode = IPC_BROKEN;
+ }
+ break;
+
+ case ECONNREFUSED:
+ case ECONNRESET:
+ ch->ch_status = IPC_DISC_PENDING;
+ retcode= socket_check_disc_pending(ch);
+ break;
+
+ default:
+ cl_perror("socket_resume_io_read"
+ ": unknown recv error, peerpid=%d",
+ ch->farside_pid);
+ ch->ch_status = IPC_DISCONNECT;
+ retcode = IPC_FAIL;
+ break;
+ }
+
+ } else if (msg_len == 0) {
+ ch->ch_status = IPC_DISC_PENDING;
+ if(ch->recv_queue->current_qlen <= 0) {
+ ch->ch_status = IPC_DISCONNECT;
+ retcode = IPC_FAIL;
+ }
+ } else {
+ /* We read something! */
+ /* Note that all previous cases break out of the loop */
+ debug_bytecount += msg_len;
+ *nbytes = msg_len;
+ nmsgs = ipc_bufpool_update(pool, ch, msg_len, ch->recv_queue) ;
+
+ if (nmsgs < 0) {
+ /* we didn't like the other side */
+ cl_log(LOG_ERR, "socket_resume_io_read: "
+ "disconnecting the other side");
+ ch->ch_status = IPC_DISCONNECT;
+ retcode = IPC_FAIL;
+ } else {
+ SocketIPCStats.ninqueued += nmsgs;
+ }
+ }
+ }
+
+ /* Check for errors uncaught by recv() */
+ /* NOTE: It doesn't seem right we have to do this every time */
+ /* FIXME?? */
+
+ memset(&sockpoll,0, sizeof(struct pollfd));
+ if ((retcode == IPC_OK)
+ && (sockpoll.fd = conn_info->s) >= 0) {
+ /* Just check for errors, not for data */
+ sockpoll.events = 0;
+ ipc_pollfunc_ptr(&sockpoll, 1, 0);
+ retcode = socket_check_poll(ch, &sockpoll);
+ }
+
+ CHANAUDIT(ch);
+ if (retcode != IPC_OK) {
+ return retcode;
+ }
+
+ return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;
+}
+
+static int
+socket_resume_io_write(struct IPC_CHANNEL *ch, int* nmsg)
+{
+ int retcode = IPC_OK;
+ struct SOCKET_CH_PRIVATE* conn_info;
+
+ CHANAUDIT(ch);
+ *nmsg = 0;
+ conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;
+
+ while (ch->ch_status == IPC_CONNECT
+ && retcode == IPC_OK
+ && ch->send_queue->current_qlen > 0) {
+
+ GList * element;
+ struct IPC_MESSAGE * msg;
+ struct SOCKET_MSG_HEAD head;
+ struct IPC_MESSAGE* oldmsg = NULL;
+ int sendrc = 0;
+ struct IPC_MESSAGE* newmsg;
+ char* p;
+ unsigned int bytes_remaining;
+ int diff;
+
+ CHANAUDIT(ch);
+ element = g_list_first(ch->send_queue->queue);
+ if (element == NULL) {
+ /* OOPS! - correct consistency problem */
+ ch->send_queue->current_qlen = 0;
+ break;
+ }
+ msg = (struct IPC_MESSAGE *) (element->data);
+
+ diff = 0;
+ if (msg->msg_buf ) {
+ diff = (char*)msg->msg_body - (char*)msg->msg_buf;
+ }
+ if ( diff < (int)sizeof(struct SOCKET_MSG_HEAD) ) {
+ /* either we don't have msg->msg_buf set
+ * or we don't have enough bytes for socket head
+ * we delete this message and creates
+ * a new one and delete the old one
+ */
+
+ newmsg= socket_message_new(ch, msg->msg_len);
+ if (newmsg == NULL) {
+ cl_log(LOG_ERR, "socket_resume_io_write: "
+ "allocating memory for new ipc msg failed");
+ return IPC_FAIL;
+ }
+
+ memcpy(newmsg->msg_body, msg->msg_body, msg->msg_len);
+ oldmsg = msg;
+ msg = newmsg;
+ }
+
+ head.msg_len = msg->msg_len;
+ head.magic = HEADMAGIC;
+ memcpy(msg->msg_buf, &head, sizeof(struct SOCKET_MSG_HEAD));
+
+ if (ch->bytes_remaining == 0) {
+ /*we start to send a new message*/
+#ifdef IPC_TIME_DEBUG
+ ipc_time_debug(ch, msg, MSGPOS_SEND);
+#endif
+ bytes_remaining = msg->msg_len + ch->msgpad;
+ p = msg->msg_buf;
+ } else {
+ bytes_remaining = ch->bytes_remaining;
+ p = ((char*)msg->msg_buf) + msg->msg_len + ch->msgpad
+ - bytes_remaining;
+
+ }
+
+ sendrc = 0;
+
+ do {
+#if HB_IPC_METHOD == HB_IPC_STREAM
+ struct strbuf d;
+ int msglen, putmsgrc;
+#endif
+
+ CHANAUDIT(ch);
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ sendrc = send(conn_info->s, p
+ , bytes_remaining, (MSG_DONTWAIT|MSG_NOSIGNAL));
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ d.maxlen = 0;
+ d.len = msglen = bytes_remaining;
+ d.buf = p;
+ putmsgrc = putmsg(conn_info->s, NULL, &d, 0);
+ sendrc = putmsgrc == 0 ? msglen : -1;
+#endif
+ SocketIPCStats.last_send_rc = sendrc;
+ SocketIPCStats.last_send_errno = errno;
+ ++SocketIPCStats.send_count;
+
+ if (sendrc <= 0) {
+ break;
+ } else {
+ p = p + sendrc;
+ bytes_remaining -= sendrc;
+ }
+
+ } while(bytes_remaining > 0 );
+
+ ch->bytes_remaining = bytes_remaining;
+
+ if (sendrc < 0) {
+ switch (errno) {
+ case EAGAIN:
+ retcode = IPC_OK;
+ break;
+ case EPIPE:
+ ch->ch_status = IPC_DISC_PENDING;
+ socket_check_disc_pending(ch);
+ retcode = IPC_BROKEN;
+ break;
+ default:
+ cl_perror("socket_resume_io_write"
+ ": send2 bad errno");
+ ch->ch_status = IPC_DISCONNECT;
+ retcode = IPC_FAIL;
+ break;
+ }
+ break;
+ } else {
+ int orig_qlen;
+
+ CHECKFOO(3,ch, msg, SavedSentBody, "sent message")
+
+ if (oldmsg) {
+ if (msg->msg_done != NULL) {
+ msg->msg_done(msg);
+ }
+ msg=oldmsg;
+ }
+
+ if(ch->bytes_remaining ==0) {
+ ch->send_queue->queue = g_list_remove(ch->send_queue->queue, msg);
+ if (msg->msg_done != NULL) {
+ msg->msg_done(msg);
+ }
+
+ SocketIPCStats.nsent++;
+ orig_qlen = ch->send_queue->current_qlen--;
+ socket_check_flow_control(ch, orig_qlen, orig_qlen -1 );
+ (*nmsg)++;
+ }
+ }
+ }
+ CHANAUDIT(ch);
+ if (retcode != IPC_OK) {
+ return retcode;
+ }
+ return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;
+}
+
+static int
+socket_resume_io(struct IPC_CHANNEL *ch)
+{
+ int rc1 = IPC_OK;
+ int rc2 = IPC_OK;
+ int nwmsg = 1;
+ int nbytes_r = 1;
+ gboolean OKonce = FALSE;
+
+ CHANAUDIT(ch);
+ if (!IPC_ISRCONN(ch)) {
+ return IPC_BROKEN;
+ }
+
+ do {
+ if (nbytes_r > 0) {
+ rc1 = socket_resume_io_read(ch, &nbytes_r, FALSE);
+ }
+ if (nwmsg > 0) {
+ nwmsg = 0;
+ rc2 = socket_resume_io_write(ch, &nwmsg);
+ }
+ if (rc1 == IPC_OK || rc2 == IPC_OK) {
+ OKonce = TRUE;
+ }
+ } while ((nbytes_r > 0 || nwmsg > 0) && IPC_ISRCONN(ch));
+
+ if (IPC_ISRCONN(ch)) {
+ if (rc1 != IPC_OK) {
+ cl_log(LOG_ERR
+ , "socket_resume_io_read() failure");
+ }
+ if (rc2 != IPC_OK && IPC_CONNECT == ch->ch_status) {
+ cl_log(LOG_ERR
+ , "socket_resume_io_write() failure");
+ }
+ } else {
+ return (OKonce ? IPC_OK : IPC_BROKEN);
+ }
+
+ return (rc1 != IPC_OK ? rc1 : rc2);
+}
+
+static int
+socket_get_recv_fd(struct IPC_CHANNEL *ch)
+{
+ struct SOCKET_CH_PRIVATE* chp = ch ->ch_private;
+
+ return (chp == NULL ? -1 : chp->s);
+}
+
+static int
+socket_get_send_fd(struct IPC_CHANNEL *ch)
+{
+ return socket_get_recv_fd(ch);
+}
+
+static void
+socket_adjust_buf(struct IPC_CHANNEL *ch, int optname, unsigned q_len)
+{
+ const char *direction = optname == SO_SNDBUF ? "snd" : "rcv";
+ int fd = socket_get_send_fd(ch);
+ unsigned byte;
+
+ /* Arbitrary scaling.
+ * DEFAULT_MAX_QLEN is 64, default socket buf is often 64k to 128k,
+ * at least on those linux I checked.
+ * Keep that ratio, and allow for some overhead. */
+ if (q_len == 0)
+ /* client does not want anything,
+ * reduce system buffers as well */
+ byte = 4096;
+ else if (q_len < 512)
+ byte = (32 + q_len) * 1024;
+ else
+ byte = q_len * 1024;
+
+ if (0 == setsockopt(fd, SOL_SOCKET, optname, &byte, sizeof(byte))) {
+ if (debug_level > 1) {
+ cl_log(LOG_DEBUG, "adjusted %sbuf size to %u",
+ direction, byte);
+ }
+ } else {
+ /* If this fails, you may need to adjust net.core.rmem_max,
+ * ...wmem_max, or equivalent */
+ cl_log(LOG_WARNING, "adjust %sbuf size to %u failed: %s",
+ direction, byte, strerror(errno));
+ }
+}
+
+static int
+socket_set_send_qlen (struct IPC_CHANNEL* ch, int q_len)
+{
+ /* This seems more like an assertion failure than a normal error */
+ if (ch->send_queue == NULL) {
+ return IPC_FAIL;
+ }
+ socket_adjust_buf(ch, SO_SNDBUF, q_len);
+ ch->send_queue->max_qlen = q_len;
+ return IPC_OK;
+}
+
+static int
+socket_set_recv_qlen (struct IPC_CHANNEL* ch, int q_len)
+{
+ /* This seems more like an assertion failure than a normal error */
+ if (ch->recv_queue == NULL) {
+ return IPC_FAIL;
+ }
+ socket_adjust_buf(ch, SO_RCVBUF, q_len);
+ ch->recv_queue->max_qlen = q_len;
+ return IPC_OK;
+}
+
+static int ipcmsg_count_allocated = 0;
+static int ipcmsg_count_freed = 0;
+void socket_ipcmsg_dump_stats(void);
+void
+socket_ipcmsg_dump_stats(void) {
+ cl_log(LOG_INFO, "ipcsocket ipcmsg allocated=%d, freed=%d, diff=%d",
+ ipcmsg_count_allocated,
+ ipcmsg_count_freed,
+ ipcmsg_count_allocated - ipcmsg_count_freed);
+}
+
+static void
+socket_del_ipcmsg(IPC_Message* m)
+{
+ if (m == NULL) {
+ cl_log(LOG_ERR, "socket_del_ipcmsg:"
+ "msg is NULL");
+ return;
+ }
+
+ if (m->msg_body) {
+ memset(m->msg_body, 0, m->msg_len);
+ }
+ if (m->msg_buf) {
+ g_free(m->msg_buf);
+ }
+
+ memset(m, 0, sizeof(*m));
+ g_free(m);
+
+ ipcmsg_count_freed ++;
+}
+
+static IPC_Message*
+socket_new_ipcmsg(IPC_Channel* ch, const void* data, int len, void* private)
+{
+ IPC_Message* hdr;
+
+ if (ch == NULL || len < 0) {
+ cl_log(LOG_ERR, "socket_new_ipcmsg:"
+ " invalid parameter");
+ return NULL;
+ }
+
+ if (ch->msgpad > MAX_MSGPAD) {
+ cl_log(LOG_ERR, "socket_new_ipcmsg: too many pads "
+ "something is wrong");
+ return NULL;
+ }
+
+ hdr = ipcmsg_new(ch, data, len, private, socket_del_ipcmsg);
+
+ if (hdr) ipcmsg_count_allocated ++;
+
+ return hdr;
+}
+
+static
+struct IPC_MESSAGE *
+ipcmsg_new(struct IPC_CHANNEL * ch, const void* data, int len, void* private,
+ DelProc delproc)
+{
+ struct IPC_MESSAGE * hdr;
+ char* copy = NULL;
+ char* buf;
+ char* body;
+
+ if ((hdr = g_new(struct IPC_MESSAGE, 1)) == NULL) {
+ return NULL;
+ }
+ memset(hdr, 0, sizeof(*hdr));
+
+ if (len > 0) {
+ if ((copy = (char*)g_malloc(ch->msgpad + len)) == NULL) {
+ g_free(hdr);
+ return NULL;
+ }
+ if (data) {
+ memcpy(copy + ch->msgpad, data, len);
+ }
+ buf = copy;
+ body = copy + ch->msgpad;;
+ } else {
+ len = 0;
+ buf = body = NULL;
+ }
+ hdr->msg_len = len;
+ hdr->msg_buf = buf;
+ hdr->msg_body = body;
+ hdr->msg_ch = ch;
+ hdr->msg_done = delproc;
+ hdr->msg_private = private;
+
+ return hdr;
+}
+
+static int
+socket_get_chan_status(IPC_Channel* ch)
+{
+ socket_resume_io(ch);
+ return ch->ch_status;
+}
+
+/* socket object of the function table */
+static struct IPC_WAIT_OPS socket_wait_ops = {
+ socket_destroy_wait_conn,
+ socket_wait_selectfd,
+ socket_accept_connection,
+};
+
+/*
+ * create a new ipc queue whose length = 0 and inner queue = NULL.
+ * return the pointer to a new ipc queue or NULL is the queue can't be created.
+ */
+
+static struct IPC_QUEUE*
+socket_queue_new(void)
+{
+ struct IPC_QUEUE *temp_queue;
+
+ /* temp queue with length = 0 and inner queue = NULL. */
+ temp_queue = g_new(struct IPC_QUEUE, 1);
+ temp_queue->current_qlen = 0;
+ temp_queue->max_qlen = DEFAULT_MAX_QLEN;
+ temp_queue->queue = NULL;
+ temp_queue->last_maxqlen_warn = 0;
+ temp_queue->maxqlen_cnt = 0;
+ return temp_queue;
+}
+
+/*
+ * socket_wait_conn_new:
+ * Called by ipc_wait_conn_constructor to get a new socket
+ * waiting connection.
+ * (better explanation of this role might be nice)
+ *
+ * Parameters :
+ * ch_attrs (IN) the attributes used to create this connection.
+ *
+ * Return :
+ * the pointer to the new waiting connection or NULL if the connection
+ * can't be created.
+ *
+ * NOTE :
+ * for domain socket implementation, the only attribute needed is path name.
+ * so the user should
+ * create the hash table like this:
+ * GHashTable * attrs;
+ * attrs = g_hash_table_new(g_str_hash, g_str_equal);
+ * g_hash_table_insert(attrs, PATH_ATTR, path_name);
+ * here PATH_ATTR is defined as "path".
+ *
+ * NOTE :
+ * The streams implementation uses "Streams Programming Guide", Solaris 8,
+ * as its guide (sample code near end of "Configuration" chapter 11).
+ */
+struct IPC_WAIT_CONNECTION *
+socket_wait_conn_new(GHashTable *ch_attrs)
+{
+ struct IPC_WAIT_CONNECTION * temp_ch;
+ char *path_name;
+ char *mode_attr;
+ int s, flags;
+ struct SOCKET_WAIT_CONN_PRIVATE *wait_private;
+ mode_t s_mode;
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ struct sockaddr_un my_addr;
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ int pipefds[2];
+#endif
+
+ path_name = (char *) g_hash_table_lookup(ch_attrs, IPC_PATH_ATTR);
+ mode_attr = (char *) g_hash_table_lookup(ch_attrs, IPC_MODE_ATTR);
+
+ if (mode_attr != NULL) {
+ s_mode = (mode_t)strtoul((const char *)mode_attr, NULL, 8);
+ } else {
+ s_mode = 0777;
+ }
+ if (path_name == NULL) {
+ return NULL;
+ }
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ /* prepare the unix domain socket */
+ if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) {
+ cl_perror("socket_wait_conn_new: socket() failure");
+ return NULL;
+ }
+
+ if (unlink(path_name) < 0 && errno != ENOENT) {
+ cl_perror("socket_wait_conn_new: unlink failure(%s)",
+ path_name);
+ }
+ memset(&my_addr, 0, sizeof(my_addr));
+ my_addr.sun_family = AF_LOCAL; /* host byte order */
+
+ if (strlen(path_name) >= sizeof(my_addr.sun_path)) {
+ close(s);
+ return NULL;
+ }
+
+ strncpy(my_addr.sun_path, path_name, sizeof(my_addr.sun_path));
+
+ if (bind(s, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {
+ cl_perror("socket_wait_conn_new: trying to create in %s bind:"
+ , path_name);
+ close(s);
+ return NULL;
+ }
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ /* Set up the communication channel the clients will use to us (server) */
+ if (pipe(pipefds) == -1) {
+ cl_perror("pipe() failure");
+ return NULL;
+ }
+
+ /* Let clients have unique connections to us */
+ if (ioctl(pipefds[1], I_PUSH, "connld") == -1) {
+ cl_perror("ioctl(%d, I_PUSH, \"connld\") failure", pipefds[1]);
+ return NULL;
+ }
+
+ if (unlink(path_name) < 0 && errno != ENOENT) {
+ cl_perror("socket_wait_conn_new: unlink failure(%s)",
+ path_name);
+ }
+
+ if (mkfifo(path_name, s_mode) == -1) {
+ cl_perror("socket_wait_conn_new: mkfifo(%s, ...) failure", path_name);
+ return NULL;
+ }
+
+ if (fattach(pipefds[1], path_name) == -1) {
+ cl_perror("socket_wait_conn_new: fattach(..., %s) failure", path_name);
+ return NULL;
+ }
+
+ /* the pseudo-socket is the other part of the pipe */
+ s = pipefds[0];
+#endif
+
+ /* Change the permission of the socket */
+ if (chmod(path_name,s_mode) < 0) {
+ cl_perror("socket_wait_conn_new: failure trying to chmod %s"
+ , path_name);
+ close(s);
+ return NULL;
+ }
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ /* listen to the socket */
+ if (listen(s, MAX_LISTEN_NUM) == -1) {
+ cl_perror("socket_wait_conn_new: listen(MAX_LISTEN_NUM)");
+ close(s);
+ return NULL;
+ }
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+
+#endif
+
+ flags = fcntl(s, F_GETFL);
+ if (flags == -1) {
+ cl_perror("socket_wait_conn_new: cannot read file descriptor flags");
+ close(s);
+ return NULL;
+ }
+ flags |= O_NONBLOCK;
+ if (fcntl(s, F_SETFL, flags) < 0) {
+ cl_perror("socket_wait_conn_new: cannot set O_NONBLOCK");
+ close(s);
+ return NULL;
+ }
+
+ wait_private = g_new(struct SOCKET_WAIT_CONN_PRIVATE, 1);
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ wait_private->s = s;
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ wait_private->pipefds[0] = pipefds[0];
+ wait_private->pipefds[1] = pipefds[1];
+#endif
+ strncpy(wait_private->path_name, path_name, sizeof(wait_private->path_name));
+ temp_ch = g_new(struct IPC_WAIT_CONNECTION, 1);
+ temp_ch->ch_private = (void *) wait_private;
+ temp_ch->ch_status = IPC_WAIT;
+ temp_ch->ops = (struct IPC_WAIT_OPS *)&socket_wait_ops;
+
+ return temp_ch;
+}
+
+/*
+ * will be called by ipc_channel_constructor to create a new socket channel.
+ * parameters :
+ * attrs (IN) the hash table of the attributes used to create this channel.
+ *
+ * return:
+ * the pointer to the new waiting channel or NULL if the channel can't be created.
+*/
+
+struct IPC_CHANNEL *
+socket_client_channel_new(GHashTable *ch_attrs) {
+ char *path_name;
+ int sockfd;
+
+ /*
+ * I don't really understand why the client and the server use different
+ * parameter names...
+ *
+ * It's a really bad idea to store both integers and strings
+ * in the same table.
+ *
+ * Maybe we need an internal function with a different set of parameters?
+ */
+
+ /*
+ * if we want to seperate them. I suggest
+ * <client side>
+ * user call ipc_channel_constructor(ch_type,attrs) to create a new channel.
+ * ipc_channel_constructor() call socket_channel_new(GHashTable*)to
+ * create a new socket channel.
+ * <server side>
+ * wait_conn->accept_connection() will call another function to create a
+ * new channel. This function will take socketfd as the parameter to
+ * create a socket channel.
+ */
+
+ path_name = (char *) g_hash_table_lookup(ch_attrs, IPC_PATH_ATTR);
+ if (path_name == NULL) {
+ return NULL;
+ }
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ /* prepare the socket */
+ if ((sockfd = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) {
+ cl_perror("socket_client_channel_new: socket");
+ return NULL;
+ }
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ sockfd = open(path_name, O_RDWR|O_NONBLOCK);
+ if (sockfd == -1) {
+ cl_perror("socket_client_channel_new: open(%s, ...) failure", path_name);
+ return NULL;
+ }
+#endif
+
+ if (client_channel_new_auth(sockfd) < 0) {
+ close(sockfd);
+ return NULL;
+ }
+ return channel_new(sockfd, IPC_CLIENT, path_name);
+}
+
+static
+int client_channel_new_auth(int sockfd) {
+#ifdef USE_BINDSTAT_CREDS
+ char rand_id[16];
+ char uuid_str_tmp[40];
+ struct sockaddr_un sock_addr;
+
+ /* Prepare the socket */
+ memset(&sock_addr, 0, sizeof(sock_addr));
+ sock_addr.sun_family = AF_UNIX;
+
+ /* make sure socket paths never clash */
+ uuid_generate(rand_id);
+ uuid_unparse(rand_id, uuid_str_tmp);
+
+ snprintf(sock_addr.sun_path, sizeof(sock_addr.sun_path),
+ "%s/%s", HA_VARLIBHBDIR, uuid_str_tmp);
+
+ unlink(sock_addr.sun_path);
+ if(bind(sockfd, (struct sockaddr*)&sock_addr, SUN_LEN(&sock_addr)) < 0) {
+ perror("Client bind() failure");
+ return 0;
+ }
+#endif
+
+ return 0;
+}
+
+static
+struct IPC_CHANNEL *
+socket_server_channel_new(int sockfd) {
+ return channel_new(sockfd, IPC_SERVER, "?");
+}
+
+static
+struct IPC_CHANNEL *
+channel_new(int sockfd, int conntype, const char *path_name) {
+ struct IPC_CHANNEL * temp_ch;
+ struct SOCKET_CH_PRIVATE* conn_info;
+ int flags;
+
+ if (path_name == NULL || strlen(path_name) >= sizeof(conn_info->path_name)) {
+ return NULL;
+ }
+
+ temp_ch = g_new(struct IPC_CHANNEL, 1);
+ if (temp_ch == NULL) {
+ cl_log(LOG_ERR, "channel_new: allocating memory for channel failed");
+ return NULL;
+ }
+ memset(temp_ch, 0, sizeof(struct IPC_CHANNEL));
+
+ conn_info = g_new(struct SOCKET_CH_PRIVATE, 1);
+
+ flags = fcntl(sockfd, F_GETFL);
+ if (flags == -1) {
+ cl_perror("channel_new: cannot read file descriptor flags");
+ g_free(conn_info); conn_info = NULL;
+ g_free(temp_ch);
+ if (conntype == IPC_CLIENT) close(sockfd);
+ return NULL;
+ }
+ flags |= O_NONBLOCK;
+ if (fcntl(sockfd, F_SETFL, flags) < 0) {
+ cl_perror("channel_new: cannot set O_NONBLOCK");
+ g_free(conn_info); conn_info = NULL;
+ g_free(temp_ch);
+ if (conntype == IPC_CLIENT) close(sockfd);
+ return NULL;
+ }
+
+ conn_info->s = sockfd;
+ conn_info->remaining_data = 0;
+ conn_info->buf_msg = NULL;
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ conn_info->peer_addr = NULL;
+#endif
+ strncpy(conn_info->path_name, path_name, sizeof(conn_info->path_name));
+
+#ifdef DEBUG
+ cl_log(LOG_INFO, "Initializing socket %d to DISCONNECT", sockfd);
+#endif
+ temp_ch->ch_status = IPC_DISCONNECT;
+ temp_ch->ch_private = (void*) conn_info;
+ temp_ch->ops = (struct IPC_OPS *)&socket_ops;
+ temp_ch->msgpad = sizeof(struct SOCKET_MSG_HEAD);
+ temp_ch->bytes_remaining = 0;
+ temp_ch->should_send_block = FALSE;
+ temp_ch->should_block_fail = TRUE;
+ temp_ch->send_queue = socket_queue_new();
+ temp_ch->recv_queue = socket_queue_new();
+ temp_ch->pool = NULL;
+ temp_ch->high_flow_mark = temp_ch->send_queue->max_qlen;
+ temp_ch->low_flow_mark = -1;
+ temp_ch->conntype = conntype;
+ temp_ch->refcount = 0;
+ temp_ch->farside_uid = -1;
+ temp_ch->farside_gid = -1;
+
+ return temp_ch;
+}
+
+/*
+ * Create a new pair of pre-connected IPC channels similar to
+ * the result of pipe(2), or socketpair(2).
+ */
+
+int
+ipc_channel_pair(IPC_Channel* channels[2])
+{
+ int sockets[2];
+ int rc;
+ int j;
+ const char *pname;
+
+#if HB_IPC_METHOD == HB_IPC_SOCKET
+ pname = "[socketpair]";
+
+ if ((rc = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets)) < 0) {
+ return IPC_FAIL;
+ }
+#elif HB_IPC_METHOD == HB_IPC_STREAM
+ pname = "[pipe]";
+
+ if ((rc = pipe(sockets)) < 0) {
+ return IPC_FAIL;
+ }
+ rc = 0;
+ for (j=0; j < 2; ++j) {
+ if (fcntl(sockets[j], F_SETFL, O_NONBLOCK) < 0) {
+ cl_perror("ipc_channel_pair: cannot set O_NONBLOCK");
+ rc = -1;
+ }
+ }
+ if (rc < 0) {
+ close(sockets[0]);
+ close(sockets[1]);
+ return IPC_FAIL;
+ }
+#endif
+
+ if ((channels[0] = socket_server_channel_new(sockets[0])) == NULL) {
+ close(sockets[0]);
+ close(sockets[1]);
+ return IPC_FAIL;
+ }
+ if ((channels[1] = socket_server_channel_new(sockets[1])) == NULL) {
+ close(sockets[0]);
+ close(sockets[1]);
+ channels[0]->ops->destroy(channels[0]);
+ return IPC_FAIL;
+ }
+ for (j=0; j < 2; ++j) {
+ struct SOCKET_CH_PRIVATE* p = channels[j]->ch_private;
+ channels[j]->ch_status = IPC_CONNECT;
+ channels[j]->conntype = IPC_PEER;
+ /* Valid, but not terribly meaningful */
+ channels[j]->farside_pid = getpid();
+ strncpy(p->path_name, pname, sizeof(p->path_name));
+ }
+
+ return IPC_OK;
+}
+
+/* brief free the memory space allocated to msg and destroy msg. */
+
+static void
+socket_free_message(struct IPC_MESSAGE * msg) {
+#if 0
+ memset(msg->msg_body, 0xff, msg->msg_len);
+#endif
+ if (msg->msg_buf) {
+ g_free(msg->msg_buf);
+ } else {
+ g_free(msg->msg_body);
+ }
+#if 0
+ memset(msg, 0xff, sizeof(*msg));
+#endif
+ g_free((void *)msg);
+}
+
+/*
+ * create a new ipc message whose msg_body's length is msg_len.
+ *
+ * parameters :
+ * msg_len (IN) the length of this message body in this message.
+ *
+ * return :
+ * the pointer to the new message or NULL if the message can't be created.
+ */
+
+static struct IPC_MESSAGE*
+socket_message_new(struct IPC_CHANNEL *ch, int msg_len)
+{
+ return ipcmsg_new(ch, NULL, msg_len, NULL, socket_free_message);
+}
+
+/***********************************************************************
+ *
+ * IPC authentication schemes... More machine dependent than
+ * we'd like, but don't know any better way...
+ *
+ ***********************************************************************/
+
+static int
+verify_creds(struct IPC_AUTH *auth_info, uid_t uid, gid_t gid)
+{
+ int ret = IPC_FAIL;
+
+ if (!auth_info || (!auth_info->uid && !auth_info->gid)) {
+ return IPC_OK;
+ }
+ if ( auth_info->uid
+ && (g_hash_table_lookup(auth_info->uid
+ , GUINT_TO_POINTER((guint)uid)) != NULL)) {
+ ret = IPC_OK;
+ } else if (auth_info->gid
+ && (g_hash_table_lookup(auth_info->gid
+ , GUINT_TO_POINTER((guint)gid)) != NULL)) {
+ ret = IPC_OK;
+ }
+ return ret;
+}
+
+/***********************************************************************
+ * SO_PEERCRED VERSION... (Linux)
+ ***********************************************************************/
+
+#ifdef USE_SO_PEERCRED
+/* verify the authentication information. */
+static int
+socket_verify_auth(struct IPC_CHANNEL* ch, struct IPC_AUTH * auth_info)
+{
+ struct SOCKET_CH_PRIVATE * conn_info;
+ int ret = IPC_FAIL;
+ struct ucred cred;
+ socklen_t n = sizeof(cred);
+
+ if (ch == NULL || ch->ch_private == NULL) {
+ return IPC_FAIL;
+ }
+ if (auth_info == NULL
+ || (auth_info->uid == NULL && auth_info->gid == NULL)) {
+ ret = IPC_OK; /* no restriction for authentication */
+ }
+
+ /* Get the credential information for our peer */
+ conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;
+ if (getsockopt(conn_info->s, SOL_SOCKET, SO_PEERCRED, &cred, &n) != 0
+ || (size_t)n != sizeof(cred)) {
+ return ret;
+ }
+
+ ch->farside_uid = cred.uid;
+ ch->farside_gid = cred.gid;
+ if (ret == IPC_OK) {
+ return ret;
+ }
+#if 0
+ cl_log(LOG_DEBUG, "SO_PEERCRED returned [%d, (%ld:%ld)]"
+ , cred.pid, (long)cred.uid, (long)cred.uid);
+ cl_log(LOG_DEBUG, "Verifying authentication: cred.uid=%d cred.gid=%d"
+ , cred.uid, cred.gid);
+ cl_log(LOG_DEBUG, "Verifying authentication: uidptr=0x%lx gidptr=0x%lx"
+ , (unsigned long)auth_info->uid
+ , (unsigned long)auth_info->gid);
+#endif
+ /* verify the credential information. */
+ return verify_creds(auth_info, cred.uid, cred.gid);
+}
+
+/* get farside pid for our peer process */
+
+static
+pid_t
+socket_get_farside_pid(int sockfd)
+{
+ socklen_t n;
+ struct ucred *cred;
+ pid_t f_pid;
+
+ /* Get the credential information from peer */
+ n = sizeof(struct ucred);
+ cred = g_new(struct ucred, 1);
+ if (getsockopt(sockfd, SOL_SOCKET, SO_PEERCRED, cred, &n) != 0) {
+ g_free(cred);
+ return -1;
+ }
+
+ f_pid = cred->pid;
+ g_free(cred);
+ return f_pid;
+}
+#endif /* SO_PEERCRED version */
+
+#ifdef USE_GETPEEREID
+/*
+ * This is implemented in OpenBSD and FreeBSD.
+ *
+ * It's not a half-bad interface...
+ *
+ * This should probably be our standard way of doing it, and put it
+ * as a replacement library. That would simplify things...
+ */
+
+static int
+socket_verify_auth(struct IPC_CHANNEL* ch, struct IPC_AUTH * auth_info)
+{
+ struct SOCKET_CH_PRIVATE *conn_info;
+ uid_t euid;
+ gid_t egid;
+ int ret = IPC_FAIL;
+
+ if (auth_info == NULL
+ || (auth_info->uid == NULL && auth_info->gid == NULL)) {
+ ret = IPC_OK; /* no restriction for authentication */
+ }
+ conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;
+
+ if (getpeereid(conn_info->s, &euid, &egid) < 0) {
+ cl_perror("getpeereid() failure");
+ return ret;
+ }
+
+ ch->farside_uid = euid;
+ ch->farside_gid = egid;
+
+ /* verify the credential information. */
+ return verify_creds(auth_info, euid, egid);
+}
+
+static
+pid_t
+socket_get_farside_pid(int sock)
+{
+ return -1;
+}
+#endif /* USE_GETPEEREID */
+
+/***********************************************************************
+ * SCM_CREDS VERSION... (*BSD systems)
+ ***********************************************************************/
+#ifdef USE_SCM_CREDS
+/* FIXME! Need to implement SCM_CREDS mechanism for BSD-based systems
+ * This isn't an emergency, but should be done in the future...
+ * Hint: * Postgresql does both types of authentication...
+ * see src/backend/libpq/auth.c
+ * Not clear its SO_PEERCRED implementation works though ;-)
+ */
+
+/* Done.... Haven't tested yet. */
+static int
+socket_verify_auth(struct IPC_CHANNEL* ch, struct IPC_AUTH * auth_info)
+{
+ struct msghdr msg;
+ /* Credentials structure */
+
+#define EXTRASPACE 0
+
+#ifdef HAVE_STRUCT_CMSGCRED
+ /* FreeBSD */
+ typedef struct cmsgcred Cred;
+# define crRuid cmcred_uid
+# define crEuid cmcred_euid
+# define crRgid cmcred_gid
+# define crEgid cmcred_groups[0] /* Best guess */
+# define crpid cmcred_pid
+# define crngrp cmcred_ngroups
+# define crgrps cmcred_groups
+
+#elif HAVE_STRUCT_FCRED
+ /* Stevens' book */
+ typedef struct fcred Cred;
+# define crRuid fc_uid
+# define crRgid fc_rgid
+# define crEgid fc_gid
+# define crngrp fc_ngroups
+# define crgrps fc_groups
+
+#elif HAVE_STRUCT_SOCKCRED
+ /* NetBSD */
+ typedef struct sockcred Cred;
+# define crRuid sc_uid
+# define crEuid sc_euid
+# define crRgid sc_gid
+# define crEgid sc_egid
+# define crngrp sc_ngroups
+# define crgrps sc_groups
+# undef EXTRASPACE
+# define EXTRASPACE SOCKCREDSIZE(ngroups)
+
+#elif HAVE_STRUCT_CRED
+ typedef struct cred Cred;
+#define cruid c_uid
+
+#elif HAVE_STRUCT_UCRED
+ typedef struct ucred Cred;
+
+ /* reuse this define for the moment */
+# if HAVE_STRUCT_UCRED_DARWIN
+# define crEuid cr_uid
+# define crEgid cr_groups[0] /* Best guess */
+# define crgrps cr_groups
+# define crngrp cr_ngroups
+# else
+# define crEuid c_uid
+# define crEgid c_gid
+# endif
+#else
+# error "No credential type found!"
+#endif
+
+ struct SOCKET_CH_PRIVATE *conn_info;
+ int ret = IPC_FAIL;
+ char buf;
+
+ /* Compute size without padding */
+ #define CMSGSIZE (sizeof(struct cmsghdr)+(sizeof(Cred))+EXTRASPACE)
+
+ union {
+ char mem[CMSGSIZE];
+ struct cmsghdr hdr;
+ Cred credu;
+ }cmsgmem;
+ Cred cred;
+
+ /* Point to start of first structure */
+ struct cmsghdr *cmsg = &cmsgmem.hdr;
+
+ if (auth_info == NULL
+ || (auth_info->uid == NULL && auth_info->gid == NULL)) {
+ ret = IPC_OK; /* no restriction for authentication */
+ }
+ conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = g_new(struct iovec, 1);
+ msg.msg_iovlen = 1;
+ msg.msg_control = (char *) cmsg;
+ msg.msg_controllen = CMSGSIZE;
+ memset(cmsg, 0, sizeof(cmsgmem));
+
+ /*
+ * The one character which is received here is not meaningful; its
+ * purpose is only to make sure that recvmsg() blocks long enough for
+ * the other side to send its credentials.
+ */
+ msg.msg_iov->iov_base = &buf;
+ msg.msg_iov->iov_len = 1;
+
+ if (recvmsg(conn_info->s, &msg, 0) < 0
+ || cmsg->cmsg_len < CMSGSIZE
+ || cmsg->cmsg_type != SCM_CREDS) {
+ cl_perror("can't get credential information from peer");
+ return ret;
+ }
+
+ /* Avoid alignment issues - just copy it! */
+ memcpy(&cred, CMSG_DATA(cmsg), sizeof(cred));
+
+ ch->farside_uid = cred.crEuid;
+ ch->farside_gid = cred.crEgid;
+ if (ret == IPC_OK) {
+ return ret;
+ }
+
+ /* verify the credential information. */
+ return verify_creds(auth_info, cred.crEuid, cred.crEgid);
+}
+
+/*
+ * FIXME! Need to implement SCM_CREDS mechanism for BSD-based systems
+ * this is similar to the SCM_CREDS mechanism for verify_auth() function.
+ * here we just want to get the pid of the other side from the credential
+ * information.
+ */
+
+static
+pid_t
+socket_get_farside_pid(int sock)
+{
+ /* FIXME! */
+ return -1;
+}
+#endif /* SCM_CREDS version */
+
+/***********************************************************************
+ * Bind/Stat VERSION... (Supported on OSX/Darwin and 4.3+BSD at least...)
+ *
+ * This is for use on systems such as OSX-Darwin where
+ * none of the other options is available.
+ *
+ * This implementation has been adapted from "Advanced Programming
+ * in the Unix Environment", Section 15.5.2, by W. Richard Stevens.
+ *
+ */
+#ifdef USE_BINDSTAT_CREDS
+
+static int
+socket_verify_auth(struct IPC_CHANNEL* ch, struct IPC_AUTH * auth_info)
+{
+ int len = 0;
+ int ret = IPC_FAIL;
+ struct stat stat_buf;
+ struct sockaddr_un *peer_addr = NULL;
+ struct SOCKET_CH_PRIVATE *ch_private = NULL;
+
+ if(ch != NULL) {
+ ch_private = (struct SOCKET_CH_PRIVATE *)(ch->ch_private);
+ if(ch_private != NULL) {
+ peer_addr = ch_private->peer_addr;
+ }
+ }
+
+ if(ch == NULL) {
+ cl_log(LOG_ERR, "No channel to authenticate");
+ return IPC_FAIL;
+
+ } else if (auth_info == NULL
+ || (auth_info->uid == NULL && auth_info->gid == NULL)) {
+ ret = IPC_OK; /* no restriction for authentication */
+
+ }
+
+ if(ch_private == NULL) {
+ cl_log(LOG_ERR, "No channel private data available");
+ return ret;
+
+ } else if(peer_addr == NULL) {
+ cl_log(LOG_ERR, "No peer information available");
+ return ret;
+ }
+
+ len = SUN_LEN(peer_addr);
+
+ if(len < 1) {
+ cl_log(LOG_ERR, "No peer information available");
+ return ret;
+ }
+ peer_addr->sun_path[len] = 0;
+ stat(peer_addr->sun_path, &stat_buf);
+
+ ch->farside_uid = stat_buf.st_uid;
+ ch->farside_gid = stat_buf.st_gid;
+ if (ret == IPC_OK) {
+ return ret;
+ }
+
+ if ((auth_info->uid == NULL || g_hash_table_size(auth_info->uid) == 0)
+ && auth_info->gid != NULL
+ && g_hash_table_size(auth_info->gid) != 0) {
+ cl_log(LOG_WARNING,
+ "GID-Only IPC security is not supported"
+ " on this platform.");
+ return IPC_BROKEN;
+ }
+
+ /* verify the credential information. */
+ return verify_creds(auth_info, stat_buf.st_uid, stat_buf.st_gid);
+}
+
+static pid_t
+socket_get_farside_pid(int sock)
+{
+ return -1;
+}
+#endif /* Bind/stat version */
+
+/***********************************************************************
+ * USE_STREAM_CREDS VERSION... (e.g. Solaris pre-10)
+ ***********************************************************************/
+#ifdef USE_STREAM_CREDS
+static int
+socket_verify_auth(struct IPC_CHANNEL* ch, struct IPC_AUTH * auth_info)
+{
+ struct SOCKET_CH_PRIVATE *conn_info;
+
+ if (ch == NULL || ch->ch_private == NULL) {
+ return IPC_FAIL;
+ }
+
+ conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;
+
+ ch->farside_uid = conn_info->farside_uid;
+ ch->farside_gid = conn_info->farside_gid;
+
+ /* verify the credential information. */
+ return verify_creds(auth_info,
+ conn_info->farside_uid, conn_info->farside_gid);
+}
+
+static
+pid_t
+socket_get_farside_pid(int sock)
+{
+ return -1;
+}
+#endif
+
+/***********************************************************************
+ * GETPEERUCRED VERSION... (e.g. Solaris 10 upwards)
+ ***********************************************************************/
+
+#ifdef USE_GETPEERUCRED
+/* verify the authentication information. */
+static int
+socket_verify_auth(struct IPC_CHANNEL* ch, struct IPC_AUTH * auth_info)
+{
+ struct SOCKET_CH_PRIVATE *conn_info;
+ ucred_t *ucred = NULL;
+ int rc = IPC_FAIL;
+
+ if (ch == NULL || ch->ch_private == NULL) {
+ return IPC_FAIL;
+ }
+
+ conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;
+
+ if (auth_info == NULL
+ || (auth_info->uid == NULL && auth_info->gid == NULL)) {
+ rc = IPC_OK; /* no restriction for authentication */
+ }
+
+ if (getpeerucred(conn_info->s, &ucred) < 0) {
+ cl_perror("getpeereid() failure");
+ return rc;
+ }
+
+ ch->farside_uid = ucred_geteuid(ucred);
+ ch->farside_gid = ucred_getegid(ucred);
+ if (rc == IPC_OK) {
+ return rc;
+ }
+
+ /* verify the credential information. */
+ rc = verify_creds(auth_info,
+ ucred_geteuid(ucred), ucred_getegid(ucred));
+ ucred_free(ucred);
+ return rc;
+}
+
+static
+pid_t
+socket_get_farside_pid(int sockfd)
+{
+ ucred_t *ucred = NULL;
+ pid_t pid;
+
+ if (getpeerucred(sockfd, &ucred) < 0) {
+ cl_perror("getpeereid() failure");
+ return IPC_FAIL;
+ }
+
+ pid = ucred_getpid(ucred);
+
+ ucred_free(ucred);
+
+ return pid;
+}
+#endif
+
+/***********************************************************************
+ * DUMMY VERSION... (other systems...)
+ *
+ * Other options that seem to be out there include
+ * SCM_CREDENTIALS and LOCAL_CREDS
+ * There are some kludgy things you can do with SCM_RIGHTS
+ * to pass an fd which could only be opened by the user id to
+ * validate the user id, but I don't know of a similar kludge which
+ * would work for group ids. And, even the uid one will fail
+ * if normal users are allowed to give away (chown) files.
+ *
+ * Unfortunately, this set of authentication routines have become
+ * very important to this API and its users (like heartbeat).
+ *
+ ***********************************************************************/
+
+#ifdef USE_DUMMY_CREDS
+static int
+socket_verify_auth(struct IPC_CHANNEL* ch, struct IPC_AUTH * auth_info)
+{
+ return IPC_FAIL;
+}
+
+static
+pid_t
+socket_get_farside_pid(int sock)
+{
+ return -1;
+}
+#endif /* Dummy version */
+
+/* socket object of the function table */
+static struct IPC_OPS socket_ops = {
+ socket_destroy_channel,
+ socket_initiate_connection,
+ socket_verify_auth,
+ socket_assert_auth,
+ socket_send,
+ socket_recv,
+ socket_waitin,
+ socket_waitout,
+ socket_is_message_pending,
+ socket_is_output_pending,
+ socket_resume_io,
+ socket_get_send_fd,
+ socket_get_recv_fd,
+ socket_set_send_qlen,
+ socket_set_recv_qlen,
+ socket_set_high_flow_callback,
+ socket_set_low_flow_callback,
+ socket_new_ipcmsg,
+ socket_get_chan_status,
+ socket_is_sendq_full,
+ socket_is_recvq_full,
+ socket_get_conntype,
+ socket_disconnect,
+};