/* * Copyright (c) 2006-2009 Red Hat Inc * * All rights reserved. * * Author: Christine Caulfield * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef QBLOG #include #endif #ifndef HOST_NAME_MAX #define HOST_NAME_MAX _POSIX_HOST_NAME_MAX #endif static int quit = 0; static int show_ip = 0; static int restart = 0; static uint32_t nodeidStart = 0; static void print_localnodeid(cpg_handle_t handle); static void print_cpgname (const struct cpg_name *name) { unsigned int i; for (i = 0; i < name->length; i++) { printf ("%c", name->value[i]); } } static char * node_pid_format(unsigned int nodeid, unsigned int pid) { static char buffer[100]; if (show_ip) { struct in_addr saddr; #if __BYTE_ORDER == __LITTLE_ENDIAN saddr.s_addr = swab32(nodeid); #else saddr.s_addr = nodeid; #endif sprintf(buffer, "node/pid %s/%d", inet_ntoa(saddr),pid); } else { sprintf(buffer, "node/pid " CS_PRI_NODE_ID "/%d", nodeid, pid); } return buffer; } static void print_time(void) { #define MAXLEN (256) char buf[MAXLEN]; char hostname[HOST_NAME_MAX]; struct timeval tnow; time_t t; size_t len; char *s = buf; len = sizeof(hostname); if(gethostname(hostname, len) == 0) { char *longName; hostname[len-1] = '\0'; longName = hostname; if( (longName = strstr( hostname, "." )) != NULL ) *longName = '\0'; } strcpy(s, hostname); s += strlen(hostname); s += snprintf(s, sizeof(buf)-(s-buf), ":%d", getpid()); t = time(0); gettimeofday( &tnow, 0 ); s += strftime(s, sizeof(buf)-(s-buf) , " %Y-%m-%d %T", localtime(&t)); s += snprintf(s, sizeof(buf)-(s-buf), ".%03ld", tnow.tv_usec/1000); assert(s-buf < (int)sizeof(buf)); printf("%s\n", buf); } static void DeliverCallback ( cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { print_time(); printf("DeliverCallback: message (len=%lu)from %s: '%s'\n", (unsigned long int) msg_len, node_pid_format(nodeid, pid), (const char *)msg); } static void ConfchgCallback ( cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { unsigned int i; int result; uint32_t nodeid; print_time(); printf("ConfchgCallback: group '"); print_cpgname(groupName); printf("'\n"); print_localnodeid(handle); for (i=0; isin_addr.s_addr = nodeid; if(inet_ntop(AF_INET, (const void *)&v4addr->sin_addr.s_addr, addrStr, (socklen_t)sizeof(addrStr)) == NULL) { addrStr[0] = 0; } printf ("Local node id is %s/" CS_PRI_NODE_ID " result %d\n", addrStr, nodeid, result); } } int main (int argc, char *argv[]) { cpg_handle_t handle; fd_set read_fds; int select_fd; int result; int retries; const char *options = "i"; int opt; unsigned int nodeid; char *fgets_res; struct cpg_address member_list[CPG_MEMBERS_MAX]; int member_list_entries; int i; int recnt; int doexit; const char *exitStr = "EXIT"; doexit = 0; #ifdef QBLOG qb_log_init("testcpg", LOG_USER, LOG_ERR); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); qb_log_format_set(QB_LOG_STDERR, "[%p] %f %b"); #endif while ( (opt = getopt(argc, argv, options)) != -1 ) { switch (opt) { case 'i': show_ip = 1; break; } } if (argc > optind) { if (strlen(argv[optind]) >= CPG_MAX_NAME_LENGTH) { fprintf(stderr, "Invalid name for cpg group\n"); return (1); } strcpy(group_name.value, argv[optind]); group_name.length = strlen(argv[optind]); } else { strcpy(group_name.value, "GROUP"); group_name.length = 6; } recnt = 0; printf ("Type %s to finish\n", exitStr); restart = 1; do { if(restart) { restart = 0; retries = 0; cs_repeat_init(retries, 30, result = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_data, NULL)); if (result != CS_OK) { printf ("Could not initialize Cluster Process Group API instance error %d\n", result); retrybackoff(recnt); } retries = 0; cs_repeat(retries, 30, result = cpg_local_get(handle, &nodeid)); if (result != CS_OK) { printf ("Could not get local node id\n"); retrybackoff(recnt); } printf ("Local node id is " CS_PRI_NODE_ID "\n", nodeid); nodeidStart = nodeid; retries = 0; cs_repeat(retries, 30, result = cpg_join(handle, &group_name)); if (result != CS_OK) { printf ("Could not join process group, error %d\n", result); retrybackoff(recnt); } retries = 0; cs_repeat(retries, 30, result = cpg_membership_get (handle, &group_name, (struct cpg_address *)&member_list, &member_list_entries)); if (result != CS_OK) { printf ("Could not get current membership list %d\n", result); retrybackoff(recnt); } recnt = 0; printf ("membership list\n"); for (i = 0; i < member_list_entries; i++) { printf ("node id " CS_PRI_NODE_ID " pid %d\n", member_list[i].nodeid, member_list[i].pid); } FD_ZERO (&read_fds); cpg_fd_get(handle, &select_fd); } FD_SET (select_fd, &read_fds); FD_SET (STDIN_FILENO, &read_fds); result = select (select_fd + 1, &read_fds, 0, 0, 0); if (result == -1) { perror ("select\n"); } if (FD_ISSET (STDIN_FILENO, &read_fds)) { char inbuf[132]; struct iovec iov; fgets_res = fgets(inbuf, (int)sizeof(inbuf), stdin); if (fgets_res == NULL) { doexit = 1; cpg_leave(handle, &group_name); } if (strncmp(inbuf, exitStr, strlen(exitStr)) == 0) { doexit = 1; cpg_leave(handle, &group_name); } else { iov.iov_base = inbuf; iov.iov_len = strlen(inbuf)+1; cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1); } } if (FD_ISSET (select_fd, &read_fds)) { if (cpg_dispatch (handle, CS_DISPATCH_ALL) != CS_OK) { if(doexit) { exit(1); } restart = 1; } } if(restart) { if(!doexit) { result = cpg_finalize (handle); printf ("Finalize+restart result is %d (should be 1)\n", result); continue; } } } while (result && !quit && !doexit); result = cpg_finalize (handle); printf ("Finalize result is %d (should be 1)\n", result); return (0); }