/* a very simplistic tcp receiver for the rsyslog testbench. * * Author Philippe Duveau * * This file is contribution of the rsyslog project. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * -or- * see COPYING.ASL20 in the source distribution * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(__FreeBSD__) #include #endif #include "rsyslog.h" #include #include #define AMQP_STARTING ((uchar)0x10) #define AMQP_STOP ((uchar)0x00) #define AMQP_BEHAVIOR_STANDARD 1 #define AMQP_BEHAVIOR_NOEXCH 2 #define AMQP_BEHAVIOR_DECEXCH 3 #define AMQP_BEHAVIOR_BADEXCH 4 uchar connection_start[487] = { 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xDF, 0x00, 0x0A, 0x00, 0x0A, 0x00, 0x09, 0x00, 0x00, 0x01, 0xBA, 0x0C, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6C, 0x69, 0x74, 0x69, 0x65, 0x73, 0x46, 0x00, 0x00, 0x00, 0xC7, 0x12, 0x70, 0x75, 0x62, 0x6C, 0x69, 0x73, 0x68, 0x65, 0x72, 0x5F, 0x63, 0x6F, 0x6E, 0x66, 0x69, 0x72, 0x6D, 0x73, 0x74, 0x01, 0x1A, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67, 0x65, 0x5F, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67, 0x65, 0x5F, 0x62, 0x69, 0x6E, 0x64, 0x69, 0x6E, 0x67, 0x73, 0x74, 0x01, 0x0A, 0x62, 0x61, 0x73, 0x69, 0x63, 0x2E, 0x6E, 0x61, 0x63, 0x6B, 0x74, 0x01, 0x16, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65, 0x72, 0x5F, 0x63, 0x61, 0x6E, 0x63, 0x65, 0x6C, 0x5F, 0x6E, 0x6F, 0x74, 0x69, 0x66, 0x79, 0x74, 0x01, 0x12, 0x63, 0x6F, 0x6E, 0x6E, 0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x2E, 0x62, 0x6C, 0x6F, 0x63, 0x6B, 0x65, 0x64, 0x74, 0x01, 0x13, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65, 0x72, 0x5F, 0x70, 0x72, 0x69, 0x6F, 0x72, 0x69, 0x74, 0x69, 0x65, 0x73, 0x74, 0x01, 0x1C, 0x61, 0x75, 0x74, 0x68, 0x65, 0x6E, 0x74, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x5F, 0x66, 0x61, 0x69, 0x6C, 0x75, 0x72, 0x65, 0x5F, 0x63, 0x6C, 0x6F, 0x73, 0x65, 0x74, 0x01, 0x10, 0x70, 0x65, 0x72, 0x5F, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65, 0x72, 0x5F, 0x71, 0x6F, 0x73, 0x74, 0x01, 0x0F, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5F, 0x72, 0x65, 0x70, 0x6C, 0x79, 0x5F, 0x74, 0x6F, 0x74, 0x01, 0x0C, 0x63, 0x6C, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5F, 0x6E, 0x61, 0x6D, 0x65, 0x53, 0x00, 0x00, 0x00, 0x0D, 0x72, 0x61, 0x62, 0x62, 0x69, 0x74, 0x40, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x09, 0x63, 0x6F, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x53, 0x00, 0x00, 0x00, 0x2E, 0x43, 0x6F, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x20, 0x28, 0x43, 0x29, 0x20, 0x32, 0x30, 0x30, 0x37, 0x2D, 0x32, 0x30, 0x31, 0x36, 0x20, 0x50, 0x69, 0x76, 0x6F, 0x74, 0x61, 0x6C, 0x20, 0x53, 0x6F, 0x66, 0x74, 0x77, 0x61, 0x72, 0x65, 0x2C, 0x20, 0x49, 0x6E, 0x63, 0x2E, 0x0B, 0x69, 0x6E, 0x66, 0x6F, 0x72, 0x6D, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x53, 0x00, 0x00, 0x00, 0x35, 0x4C, 0x69, 0x63, 0x65, 0x6E, 0x73, 0x65, 0x64, 0x20, 0x75, 0x6E, 0x64, 0x65, 0x72, 0x20, 0x74, 0x68, 0x65, 0x20, 0x4D, 0x50, 0x4C, 0x2E, 0x20, 0x20, 0x53, 0x65, 0x65, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2E, 0x72, 0x61, 0x62, 0x62, 0x69, 0x74, 0x6D, 0x71, 0x2E, 0x63, 0x6F, 0x6D, 0x2F, 0x08, 0x70, 0x6C, 0x61, 0x74, 0x66, 0x6F, 0x72, 0x6D, 0x53, 0x00, 0x00, 0x00, 0x0A, 0x45, 0x72, 0x6C, 0x61, 0x6E, 0x67, 0x2F, 0x4F, 0x54, 0x50, 0x07, 0x70, 0x72, 0x6F, 0x64, 0x75, 0x63, 0x74, 0x53, 0x00, 0x00, 0x00, 0x08, 0x52, 0x61, 0x62, 0x62, 0x69, 0x74, 0x4D, 0x51, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6F, 0x6E, 0x53, 0x00, 0x00, 0x00, 0x05, 0x33, 0x2E, 0x36, 0x2E, 0x32, 0x00, 0x00, 0x00, 0x0E, 0x41, 0x4D, 0x51, 0x50, 0x4C, 0x41, 0x49, 0x4E, 0x20, 0x50, 0x4C, 0x41, 0x49, 0x4E, 0x00, 0x00, 0x00, 0x05, 0x65, 0x6E, 0x5F, 0x55, 0x53, 0xCE }; static uchar connection_tune[20] = { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0C, 0x00, 0x0A, 0x00, 0x1E, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x3C, 0xCE }; static uchar connection_open_ok[13] = { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x0A, 0x00, 0x29, 0x00, 0xCE }; static uchar channel_open_ok[16] = { 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x14, 0x00, 0x0B, 0x00, 0x00, 0x00, 0x00, 0xCE }; static uchar exchange_declare_ok[12] = { 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x28, 0x00, 0x0B, 0xCE }; static uchar channel_close_ok[12] = { 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x14, 0x00, 0x29, 0xCE }; static uchar connection_close_ok[12] = { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x0A, 0x00, 0x33, 0xCE }; static uchar channel_close_ok_on_badexch[148] = { 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x8C, 0x00, 0x14, 0x00, 0x28, 0x01, 0x96, 0x81, 0x50, 0x52, 0x45, 0x43, 0x4F, 0x4E, 0x44, 0x49, 0x54, 0x49, 0x4F, 0x4E, 0x5F, 0x46, 0x41, 0x49, 0x4C, 0x45, 0x44, 0x20, 0x2D, 0x20, 0x69, 0x6E, 0x65, 0x71, 0x75, 0x69, 0x76, 0x61, 0x6C, 0x65, 0x6E, 0x74, 0x20, 0x61, 0x72, 0x67, 0x20, 0x27, 0x64, 0x75, 0x72, 0x61, 0x62, 0x6C, 0x65, 0x27, 0x20, 0x66, 0x6F, 0x72, 0x20, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67, 0x65, 0x20, 0x27, 0x69, 0x6E, 0x27, 0x20, 0x69, 0x6E, 0x20, 0x76, 0x68, 0x6F, 0x73, 0x74, 0x20, 0x27, 0x2F, 0x6D, 0x65, 0x74, 0x72, 0x6F, 0x6C, 0x6F, 0x67, 0x69, 0x65, 0x27, 0x3A, 0x20, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x20, 0x27, 0x66, 0x61, 0x6C, 0x73, 0x65, 0x27, 0x20, 0x62, 0x75, 0x74, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6E, 0x74, 0x20, 0x69, 0x73, 0x20, 0x27, 0x74, 0x72, 0x75, 0x65, 0x27, 0x00, 0x28, 0x00, 0x0A, 0xCE }; typedef struct { uchar type; ushort ch; uint32_t method; uint16_t header_flags; size_t datalen; size_t framelen; uchar *data; } amqp_frame_type_t; #define DBGPRINTF0(f, ...) if (debug>0) { \ struct timeval dbgtv; \ gettimeofday(&dbgtv, NULL);\ fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \ (int)(dbgtv.tv_usec/1000), __VA_ARGS__); \ } #define DBGPRINTF1(f, ...) if (debug>0) { \ struct timeval dbgtv; \ gettimeofday(&dbgtv, NULL);\ dbgtv.tv_sec -= dbgtv_base.tv_sec; \ dbgtv.tv_usec -= dbgtv_base.tv_usec; \ if (dbgtv.tv_usec < 0) { \ dbgtv.tv_usec += 1000000; \ dbgtv.tv_sec--; \ } \ fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \ (int)(dbgtv.tv_usec/1000), __VA_ARGS__); \ } #define DBGPRINTF2(f, ...) if (debug==2) { \ struct timeval dbgtv; \ gettimeofday(&dbgtv, NULL);\ dbgtv.tv_sec -= dbgtv_base.tv_sec; \ dbgtv.tv_usec -= dbgtv_base.tv_usec; \ if (dbgtv.tv_usec < 0) { \ dbgtv.tv_usec += 1000000; \ dbgtv.tv_sec--; \ } \ fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \ (int)(dbgtv.tv_usec/1000), __VA_ARGS__); \ } static struct timeval dbgtv_base; static int server_behaviors = 0; static int behaviors; static int wait_after_accept = 200; /* milliseconds */ static char *outfile = NULL; static int debug = 1; FILE* fpout = NULL; static ATTR_NORETURN void errout(const char *reason, int server) { char txt[256]; snprintf(txt,256,"%s server %d", reason, server); perror(txt); if (fpout && fpout != stdout) { fclose(fpout); fpout = NULL; } if (outfile) unlink(outfile); exit(1); } static ATTR_NORETURN void usage(void) { fprintf(stderr, "usage: minirmqsrvr -f outfile [-b behaviour] " "[-t keep_alive_max] [-w delay_after_fail] [-d]\n"); exit (1); } /* Those three functions are "endianess" insensitive */ static uint16_t buf2uint16(uchar*b) { return ((uint16_t)b[0]) << 8 | ((uint16_t)b[1]); } static uint32_t buf2uint32(uchar*b) { return ((uint32_t)b[0]) << 24 | ((uint32_t)b[1]) << 16 | ((uint32_t)b[2]) << 8 | ((uint32_t)b[3]); } static uint64_t buf2uint64(uchar*b) { return ((uint64_t)b[0]) << 56 | ((uint64_t)b[1]) << 48 | ((uint64_t)b[2]) << 40 | ((uint64_t)b[3]) << 32 | ((uint64_t)b[4]) << 24 | ((uint64_t)b[5]) << 16 | ((uint64_t)b[6]) << 8 | ((uint64_t)b[7]); } static char AMQP091[8] = { 'A', 'M', 'Q', 'P', 0x00, 0x00, 0x09, 0x01 }; static int decode_frame_type(uchar *buf, amqp_frame_type_t *frame, size_t nread) { if (nread == 8){ if (memcmp(buf, AMQP091, sizeof(AMQP091))) return -1; frame->framelen = 8; frame->type = AMQP_STARTING; frame->ch = 0; return 0; } frame->type = buf[0]; frame->ch = buf2uint16(buf+1); frame->datalen = buf2uint32(buf+3); frame->framelen = frame->datalen + 8; frame->method = buf2uint32(buf+7); switch (frame->type) { case AMQP_FRAME_BODY: frame->data = buf + 7; break; default: frame->data = buf + 11; } return 0; } static ssize_t amqp_write(int fdc, uchar *buf, size_t blen, unsigned short channel) { buf[1] = (char) (channel >> 8); buf[2] = (char) (channel & 0xFF); return write(fdc, buf, blen); } static uchar * amqpFieldUint64(uint64_t *d, uchar *s) { *d = buf2uint64(s); return s + 8; } static uchar * amqpFieldUint32(uint32_t *d, uchar *s) { *d = buf2uint32(s); return s + 4; } static uchar * amqpFieldUint16(uint16_t *d, uchar *s) { *d = buf2uint16(s); return s + 2; } static uchar * amqpFieldLenFprintf(const char *pfx, uchar *s, uint32_t len) { if (fpout) fprintf(fpout, "%s%.*s", pfx, (int)len, (char*)s); return s + len; } static uchar * amqpFieldFprintf(const char *pfx, uchar *s) { uint32_t len = *s++; return amqpFieldLenFprintf(pfx, s, len); } static uchar * amqpHeaderFprintf(uchar *s, uint32_t *size) { uint32_t len; uchar *p = amqpFieldFprintf(", ", s); p++; /* value type */ p = amqpFieldUint32(&len, p); *size -= (p - s) + len; return amqpFieldLenFprintf(":", p, len); } static void amqp_srvr(int port, int srvr, int fds, int piperead, int pipewrite) { uchar wrkBuf[8192], *p; size_t nRead = 0, bsize = 0; ssize_t nSent; amqp_frame_type_t frame; uint64_t body_ui64 = 0; uint32_t props_header_size; uint16_t props_flags; int my_behaviour; struct timeval tv; fd_set rfds; int nfds = ((piperead > fds)? piperead : fds) + 1; int fdc; my_behaviour = behaviors & 0x000F; behaviors = behaviors >> 4; /* for next server */; if(listen(fds, 0) != 0) errout("listen", port); DBGPRINTF1("Server AMQP %d on port %d started\n", srvr, port); tv.tv_sec = 120; tv.tv_usec = 0; FD_ZERO(&rfds); FD_SET(fds, &rfds); if (piperead > 0) FD_SET(piperead, &rfds); if (select(nfds,&rfds,NULL,NULL, &tv) == 0) { exit(1); } if (piperead > 0 && FD_ISSET(piperead, &rfds)) { char c; int l = read(piperead, &c, 1); if (l == 1) { my_behaviour = behaviors & 0x000F; if (my_behaviour != 0) { DBGPRINTF1("Server AMQP %d on port %d switch behaviour", srvr, port); } else { DBGPRINTF1("Server AMQP %d on port %d leaving", srvr, port); if (fpout && fpout != stdout) { fclose(fpout); fpout = NULL; } exit(1); } } } fdc = accept(fds, NULL, NULL); if (pipewrite > 0) nSent = write(pipewrite, "N", 1); close(fds); fds = -1; /* this let the os understand that the port is closed */ usleep(1000 * wait_after_accept); frame.type = AMQP_STARTING; while(fdc > 0) { nSent = 0; ssize_t rd = 0; if (nRead < 12) { rd = read(fdc, wrkBuf + nRead, sizeof(wrkBuf) - nRead); if (rd <= 0) { DBGPRINTF1("Server AMQP %d on port %d disconnected\n", srvr, port); close(fdc); fdc = 0; break; }else { nRead += (size_t)rd; } } if (decode_frame_type(wrkBuf, &frame, nRead)) { DBGPRINTF1("Server AMQP %d on port %d killed : bad protocol\n", srvr, port); close(fdc); fdc = 0; break; } if (rd > 4) DBGPRINTF2("Server received : %zd\n", rd); switch (frame.type) { case AMQP_STARTING: /* starting handshake */ DBGPRINTF1("Server AMQP %d on port %d type %d connected\n", srvr, port, my_behaviour); DBGPRINTF2("Server %d connection.start\n", srvr); nSent = amqp_write(fdc, connection_start, sizeof(connection_start), frame.ch); break; case AMQP_FRAME_METHOD: DBGPRINTF2("Server %d method : 0x%X\n", srvr, frame.method); switch (frame.method) { case AMQP_CONNECTION_START_OK_METHOD: DBGPRINTF2("Server %d connection.tune\n", srvr); nSent = amqp_write(fdc, connection_tune, sizeof(connection_tune), frame.ch); break; case AMQP_CONNECTION_TUNE_OK_METHOD: DBGPRINTF2("Client %d connection.tune-ok\n", srvr); nSent = 0; break; case AMQP_CONNECTION_OPEN_METHOD: nSent = amqp_write(fdc, connection_open_ok, sizeof(connection_open_ok), frame.ch); DBGPRINTF2("Server %d connection.open\n", srvr); break; case AMQP_CHANNEL_OPEN_METHOD: nSent = amqp_write(fdc, channel_open_ok, sizeof(channel_open_ok), frame.ch); DBGPRINTF2("Server %d channel.open\n", srvr); if (my_behaviour == AMQP_BEHAVIOR_NOEXCH) { close(fdc); DBGPRINTF1("Server AMQP %d on port %d stopped\n", srvr, port); fdc = 0; frame.type = 0; } break; case AMQP_EXCHANGE_DECLARE_METHOD: if (my_behaviour == AMQP_BEHAVIOR_BADEXCH) { nSent = amqp_write(fdc, channel_close_ok_on_badexch, sizeof(channel_close_ok_on_badexch), frame.ch); }else{ nSent = amqp_write(fdc, exchange_declare_ok, sizeof(exchange_declare_ok), frame.ch); } DBGPRINTF2("Server %d exchange.declare\n", srvr); if (my_behaviour == AMQP_BEHAVIOR_DECEXCH) { close(fdc); DBGPRINTF1("Server AMQP %d on port %d stopped\n", srvr, port); fdc = 0; frame.type = 0; } break; case AMQP_CHANNEL_CLOSE_METHOD: nSent = amqp_write(fdc, channel_close_ok, sizeof(channel_close_ok), frame.ch); DBGPRINTF2("Server %d channel.close\n", srvr); break; case AMQP_CONNECTION_CLOSE_METHOD: nSent = amqp_write(fdc, connection_close_ok, sizeof(connection_close_ok), frame.ch); DBGPRINTF2("Server %d connection.close\n", srvr); break; case AMQP_BASIC_PUBLISH_METHOD: p = amqpFieldFprintf("Exchange:", frame.data + 2); amqpFieldFprintf(", routing-key:", p); break; default: nSent = 0; } break; case AMQP_FRAME_HEADER: DBGPRINTF2("Server %d HEADERS\n", srvr); p = amqpFieldUint64(&body_ui64, frame.data); bsize = (size_t)body_ui64; p = amqpFieldUint16(&props_flags, p); if (props_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { p = amqpFieldFprintf(", content-type:", p); } if (props_flags & AMQP_BASIC_HEADERS_FLAG) { p = amqpFieldUint32(&props_header_size, p); while (props_header_size) { p = amqpHeaderFprintf(p, &props_header_size); } } if (props_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) { if (fpout) fprintf(fpout, ", delivery-mode:%s", (*p++)?"transient":"persistent"); } if (props_flags & AMQP_BASIC_EXPIRATION_FLAG) { p = amqpFieldFprintf(", expiration:", p); } if (props_flags & AMQP_BASIC_TIMESTAMP_FLAG) { if (fpout) fprintf(fpout, ", timestamp:OK"); p += sizeof(uint64_t); } if (props_flags & AMQP_BASIC_APP_ID_FLAG) { amqpFieldFprintf(", app-id:", p); } if (fpout) fprintf(fpout, ", msg:"); break; case AMQP_FRAME_BODY: DBGPRINTF2("Server %d Body size left : %zu, received : %zu\n", srvr, bsize, frame.datalen); bsize -= frame.datalen; if (fpout) { fprintf(fpout, "%.*s", (int)frame.datalen, frame.data); if (frame.data[frame.datalen-1] != '\n') fprintf(fpout, "\n"); fflush(fpout); } break; default: DBGPRINTF1("Server %d unsupported frame type %d\n", srvr, frame.type); close(fdc); fdc = 0; frame.type = 0; frame.framelen = 0; } /* switch (frame.type) */ nRead -= frame.framelen; if (nRead>0) memmove(wrkBuf, wrkBuf + frame.framelen, nRead); if (nSent < 0) { close(fdc); fdc = 0; } } /* while(fdc) */ DBGPRINTF2("Leaving thread %d\n", srvr); } int main(int argc, char *argv[]) { int port[2], fds[2], i, opt, nb_port = 1; int pipeS1toS2[2] = { -1, -1 }; int pipeS2toS1[2] = { -1, -1 }; int pipeRead[2], pipeWrite[2]; struct sockaddr_in srvAddr[2]; unsigned int addrLen = sizeof(struct sockaddr_in), len; pid_t pid[2]; fpout = stdout; while((opt = getopt(argc, argv, "f:b:w:d")) != -1) { switch (opt) { case 'w': wait_after_accept = atoi(optarg); break; case 'd': debug = 2; break; case 'b': server_behaviors = atoi(optarg); break; case 'f': if(strcmp(optarg, "-")) { outfile = optarg; fpout = fopen(optarg, "w"); if(fpout == NULL){ fprintf(stderr, "file %s could not be created\n", outfile); exit(1); } } break; default: fprintf(stderr, "invalid option '%c' or value missing - terminating...\n", opt); usage(); break; } } switch (server_behaviors) { case 0: behaviors = AMQP_BEHAVIOR_STANDARD; nb_port = 1; break; case 1: /* two standard servers get message successfully */ behaviors = AMQP_BEHAVIOR_STANDARD; nb_port = 2; break; case 2: /* 2 servers first server which disconnect after after open channel : no declare exchange */ behaviors = AMQP_BEHAVIOR_NOEXCH | AMQP_BEHAVIOR_STANDARD << 4; nb_port = 2; break; case 3: /* 2 servers first server which disconnect after declare exchange*/ behaviors = AMQP_BEHAVIOR_DECEXCH | AMQP_BEHAVIOR_STANDARD << 4; nb_port = 2; break; case 4: /* one server with bad exchange declare */ behaviors = AMQP_BEHAVIOR_BADEXCH; nb_port = 1; break; default: fprintf(stderr,"Invalid behavior"); exit(1); } gettimeofday(&dbgtv_base, NULL); port[0] = port[1] = -1; if (nb_port == 2) { if(pipe(pipeS1toS2) == -1 || pipe(pipeS2toS1) == -1) { fprintf(stderr, "Pipe failed !"); exit(1); } } pipeRead[0] = pipeS2toS1[0]; pipeWrite[0] = pipeS1toS2[1]; pipeRead[1] = pipeS1toS2[0]; pipeWrite[1] = pipeS2toS1[1]; for (i = 0; i < nb_port; i++) { fds[i] = socket(AF_INET, SOCK_STREAM, 0); srvAddr[i].sin_family = AF_INET; srvAddr[i].sin_addr.s_addr = INADDR_ANY; srvAddr[i].sin_port = 0; if(bind(fds[i], (struct sockaddr *)&srvAddr[i], addrLen) != 0) errout("bind", 1); len = addrLen; if (getsockname(fds[i], (struct sockaddr *)&srvAddr[i], &len) == -1) errout("bind", i+1); if ((port[i] = ntohs(srvAddr[i].sin_port)) <= 0) errout("get port", i+1); } for (i = 0; i < nb_port; i++) { if ((pid[i] = fork()) == -1) { fprintf(stderr, "Fork failed !"); exit(1); } if (pid[i] == 0) { /* this is the child */ if (fds[1-i] > 0) close(fds[1-i]); amqp_srvr(port[i], i+1, fds[i], pipeRead[i], pipeWrite[i]); if (fpout && fpout != stdout) fclose(fpout); DBGPRINTF2("%s\n","Leaving server"); return 0; } } if (nb_port==2) printf("export AMQPSRVRPID1=%ld AMQPSRVRPID2=%ld PORT_AMQP1=%d PORT_AMQP2=%d", (long)pid[0], (long)pid[1], port[0], port[1]); else printf("export AMQPSRVRPID1=%ld PORT_AMQP1=%d", (long)pid[0], port[0]); return 0; }