summaryrefslogtreecommitdiffstats
path: root/tests/miniamqpsrvr.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/miniamqpsrvr.c')
-rw-r--r--tests/miniamqpsrvr.c658
1 files changed, 658 insertions, 0 deletions
diff --git a/tests/miniamqpsrvr.c b/tests/miniamqpsrvr.c
new file mode 100644
index 0000000..5aedacd
--- /dev/null
+++ b/tests/miniamqpsrvr.c
@@ -0,0 +1,658 @@
+/* a very simplistic tcp receiver for the rsyslog testbench.
+ *
+ * Author Philippe Duveau
+ *
+ * This file is contribution of the rsyslog project.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <signal.h>
+#include <errno.h>
+#if defined(__FreeBSD__)
+#include <netinet/in.h>
+#endif
+
+#include "rsyslog.h"
+
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+#define AMQP_STARTING ((uchar)0x10)
+#define AMQP_STOP ((uchar)0x00)
+
+#define AMQP_BEHAVIOR_STANDARD 1
+#define AMQP_BEHAVIOR_NOEXCH 2
+#define AMQP_BEHAVIOR_DECEXCH 3
+#define AMQP_BEHAVIOR_BADEXCH 4
+
+uchar connection_start[487] = {
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xDF, 0x00, 0x0A, 0x00, 0x0A, 0x00, 0x09, 0x00, 0x00, 0x01,
+ 0xBA, 0x0C, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6C, 0x69, 0x74, 0x69, 0x65, 0x73, 0x46, 0x00,
+ 0x00, 0x00, 0xC7, 0x12, 0x70, 0x75, 0x62, 0x6C, 0x69, 0x73, 0x68, 0x65, 0x72, 0x5F, 0x63, 0x6F,
+ 0x6E, 0x66, 0x69, 0x72, 0x6D, 0x73, 0x74, 0x01, 0x1A, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67,
+ 0x65, 0x5F, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67, 0x65, 0x5F, 0x62, 0x69, 0x6E, 0x64, 0x69,
+ 0x6E, 0x67, 0x73, 0x74, 0x01, 0x0A, 0x62, 0x61, 0x73, 0x69, 0x63, 0x2E, 0x6E, 0x61, 0x63, 0x6B,
+ 0x74, 0x01, 0x16, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65, 0x72, 0x5F, 0x63, 0x61, 0x6E, 0x63,
+ 0x65, 0x6C, 0x5F, 0x6E, 0x6F, 0x74, 0x69, 0x66, 0x79, 0x74, 0x01, 0x12, 0x63, 0x6F, 0x6E, 0x6E,
+ 0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x2E, 0x62, 0x6C, 0x6F, 0x63, 0x6B, 0x65, 0x64, 0x74, 0x01,
+ 0x13, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65, 0x72, 0x5F, 0x70, 0x72, 0x69, 0x6F, 0x72, 0x69,
+ 0x74, 0x69, 0x65, 0x73, 0x74, 0x01, 0x1C, 0x61, 0x75, 0x74, 0x68, 0x65, 0x6E, 0x74, 0x69, 0x63,
+ 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x5F, 0x66, 0x61, 0x69, 0x6C, 0x75, 0x72, 0x65, 0x5F, 0x63, 0x6C,
+ 0x6F, 0x73, 0x65, 0x74, 0x01, 0x10, 0x70, 0x65, 0x72, 0x5F, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D,
+ 0x65, 0x72, 0x5F, 0x71, 0x6F, 0x73, 0x74, 0x01, 0x0F, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5F,
+ 0x72, 0x65, 0x70, 0x6C, 0x79, 0x5F, 0x74, 0x6F, 0x74, 0x01, 0x0C, 0x63, 0x6C, 0x75, 0x73, 0x74,
+ 0x65, 0x72, 0x5F, 0x6E, 0x61, 0x6D, 0x65, 0x53, 0x00, 0x00, 0x00, 0x0D, 0x72, 0x61, 0x62, 0x62,
+ 0x69, 0x74, 0x40, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x09, 0x63, 0x6F, 0x70, 0x79, 0x72, 0x69,
+ 0x67, 0x68, 0x74, 0x53, 0x00, 0x00, 0x00, 0x2E, 0x43, 0x6F, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68,
+ 0x74, 0x20, 0x28, 0x43, 0x29, 0x20, 0x32, 0x30, 0x30, 0x37, 0x2D, 0x32, 0x30, 0x31, 0x36, 0x20,
+ 0x50, 0x69, 0x76, 0x6F, 0x74, 0x61, 0x6C, 0x20, 0x53, 0x6F, 0x66, 0x74, 0x77, 0x61, 0x72, 0x65,
+ 0x2C, 0x20, 0x49, 0x6E, 0x63, 0x2E, 0x0B, 0x69, 0x6E, 0x66, 0x6F, 0x72, 0x6D, 0x61, 0x74, 0x69,
+ 0x6F, 0x6E, 0x53, 0x00, 0x00, 0x00, 0x35, 0x4C, 0x69, 0x63, 0x65, 0x6E, 0x73, 0x65, 0x64, 0x20,
+ 0x75, 0x6E, 0x64, 0x65, 0x72, 0x20, 0x74, 0x68, 0x65, 0x20, 0x4D, 0x50, 0x4C, 0x2E, 0x20, 0x20,
+ 0x53, 0x65, 0x65, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2E, 0x72,
+ 0x61, 0x62, 0x62, 0x69, 0x74, 0x6D, 0x71, 0x2E, 0x63, 0x6F, 0x6D, 0x2F, 0x08, 0x70, 0x6C, 0x61,
+ 0x74, 0x66, 0x6F, 0x72, 0x6D, 0x53, 0x00, 0x00, 0x00, 0x0A, 0x45, 0x72, 0x6C, 0x61, 0x6E, 0x67,
+ 0x2F, 0x4F, 0x54, 0x50, 0x07, 0x70, 0x72, 0x6F, 0x64, 0x75, 0x63, 0x74, 0x53, 0x00, 0x00, 0x00,
+ 0x08, 0x52, 0x61, 0x62, 0x62, 0x69, 0x74, 0x4D, 0x51, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6F,
+ 0x6E, 0x53, 0x00, 0x00, 0x00, 0x05, 0x33, 0x2E, 0x36, 0x2E, 0x32, 0x00, 0x00, 0x00, 0x0E, 0x41,
+ 0x4D, 0x51, 0x50, 0x4C, 0x41, 0x49, 0x4E, 0x20, 0x50, 0x4C, 0x41, 0x49, 0x4E, 0x00, 0x00, 0x00,
+ 0x05, 0x65, 0x6E, 0x5F, 0x55, 0x53, 0xCE
+};
+
+static uchar connection_tune[20] = {
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0C, 0x00, 0x0A, 0x00, 0x1E, 0x00, 0x00, 0x00, 0x02, 0x00,
+ 0x00, 0x00, 0x3C, 0xCE
+};
+
+static uchar connection_open_ok[13] = {
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x0A, 0x00, 0x29, 0x00, 0xCE
+};
+
+static uchar channel_open_ok[16] = {
+ 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x14, 0x00, 0x0B, 0x00, 0x00, 0x00, 0x00, 0xCE
+};
+
+static uchar exchange_declare_ok[12] = {
+ 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x28, 0x00, 0x0B, 0xCE
+};
+
+static uchar channel_close_ok[12] = {
+ 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x14, 0x00, 0x29, 0xCE
+};
+
+static uchar connection_close_ok[12] = {
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x0A, 0x00, 0x33, 0xCE
+};
+
+static uchar channel_close_ok_on_badexch[148] = {
+ 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x8C, 0x00, 0x14, 0x00, 0x28, 0x01,
+ 0x96, 0x81, 0x50, 0x52, 0x45, 0x43, 0x4F, 0x4E, 0x44, 0x49, 0x54, 0x49,
+ 0x4F, 0x4E, 0x5F, 0x46, 0x41, 0x49, 0x4C, 0x45, 0x44, 0x20, 0x2D, 0x20,
+ 0x69, 0x6E, 0x65, 0x71, 0x75, 0x69, 0x76, 0x61, 0x6C, 0x65, 0x6E, 0x74,
+ 0x20, 0x61, 0x72, 0x67, 0x20, 0x27, 0x64, 0x75, 0x72, 0x61, 0x62, 0x6C,
+ 0x65, 0x27, 0x20, 0x66, 0x6F, 0x72, 0x20, 0x65, 0x78, 0x63, 0x68, 0x61,
+ 0x6E, 0x67, 0x65, 0x20, 0x27, 0x69, 0x6E, 0x27, 0x20, 0x69, 0x6E, 0x20,
+ 0x76, 0x68, 0x6F, 0x73, 0x74, 0x20, 0x27, 0x2F, 0x6D, 0x65, 0x74, 0x72,
+ 0x6F, 0x6C, 0x6F, 0x67, 0x69, 0x65, 0x27, 0x3A, 0x20, 0x72, 0x65, 0x63,
+ 0x65, 0x69, 0x76, 0x65, 0x64, 0x20, 0x27, 0x66, 0x61, 0x6C, 0x73, 0x65,
+ 0x27, 0x20, 0x62, 0x75, 0x74, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6E,
+ 0x74, 0x20, 0x69, 0x73, 0x20, 0x27, 0x74, 0x72, 0x75, 0x65, 0x27, 0x00,
+ 0x28, 0x00, 0x0A, 0xCE
+};
+
+typedef struct {
+ uchar type;
+ ushort ch;
+ uint32_t method;
+ uint16_t header_flags;
+ size_t datalen;
+ size_t framelen;
+ uchar *data;
+} amqp_frame_type_t;
+
+#define DBGPRINTF0(f, ...) if (debug>0) { \
+ struct timeval dbgtv; \
+ gettimeofday(&dbgtv, NULL);\
+ fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \
+ (int)(dbgtv.tv_usec/1000), __VA_ARGS__); \
+}
+#define DBGPRINTF1(f, ...) if (debug>0) { \
+ struct timeval dbgtv; \
+ gettimeofday(&dbgtv, NULL);\
+ dbgtv.tv_sec -= dbgtv_base.tv_sec; \
+ dbgtv.tv_usec -= dbgtv_base.tv_usec; \
+ if (dbgtv.tv_usec < 0) { \
+ dbgtv.tv_usec += 1000000; \
+ dbgtv.tv_sec--; \
+ } \
+ fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \
+ (int)(dbgtv.tv_usec/1000), __VA_ARGS__); \
+}
+#define DBGPRINTF2(f, ...) if (debug==2) { \
+ struct timeval dbgtv; \
+ gettimeofday(&dbgtv, NULL);\
+ dbgtv.tv_sec -= dbgtv_base.tv_sec; \
+ dbgtv.tv_usec -= dbgtv_base.tv_usec; \
+ if (dbgtv.tv_usec < 0) { \
+ dbgtv.tv_usec += 1000000; \
+ dbgtv.tv_sec--; \
+ } \
+ fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \
+ (int)(dbgtv.tv_usec/1000), __VA_ARGS__); \
+}
+
+static struct timeval dbgtv_base;
+static int server_behaviors = 0;
+static int behaviors;
+static int wait_after_accept = 200; /* milliseconds */
+static char *outfile = NULL;
+static int debug = 1;
+
+FILE* fpout = NULL;
+
+static ATTR_NORETURN void
+errout(const char *reason, int server)
+{
+ char txt[256];
+ snprintf(txt,256,"%s server %d", reason, server);
+ perror(txt);
+ if (fpout && fpout != stdout) { fclose(fpout); fpout = NULL; }
+ if (outfile) unlink(outfile);
+ exit(1);
+}
+
+static ATTR_NORETURN void
+usage(void)
+{
+ fprintf(stderr, "usage: minirmqsrvr -f outfile [-b behaviour] "
+ "[-t keep_alive_max] [-w delay_after_fail] [-d]\n");
+ exit (1);
+}
+
+/* Those three functions are "endianess" insensitive */
+static uint16_t buf2uint16(uchar*b) {
+ return ((uint16_t)b[0]) << 8 | ((uint16_t)b[1]);
+}
+
+static uint32_t buf2uint32(uchar*b) {
+ return ((uint32_t)b[0]) << 24 | ((uint32_t)b[1]) << 16 | ((uint32_t)b[2]) << 8 | ((uint32_t)b[3]);
+}
+
+static uint64_t buf2uint64(uchar*b) {
+ return ((uint64_t)b[0]) << 56 | ((uint64_t)b[1]) << 48 | ((uint64_t)b[2]) << 40 | ((uint64_t)b[3]) << 32
+ | ((uint64_t)b[4]) << 24 | ((uint64_t)b[5]) << 16 | ((uint64_t)b[6]) << 8 | ((uint64_t)b[7]);
+}
+
+
+static char AMQP091[8] = { 'A', 'M', 'Q', 'P', 0x00, 0x00, 0x09, 0x01 };
+
+static int
+decode_frame_type(uchar *buf, amqp_frame_type_t *frame, size_t nread) {
+ if (nread == 8){
+ if (memcmp(buf, AMQP091, sizeof(AMQP091)))
+ return -1;
+ frame->framelen = 8;
+ frame->type = AMQP_STARTING;
+ frame->ch = 0;
+ return 0;
+ }
+ frame->type = buf[0];
+ frame->ch = buf2uint16(buf+1);
+ frame->datalen = buf2uint32(buf+3);
+ frame->framelen = frame->datalen + 8;
+ frame->method = buf2uint32(buf+7);
+ switch (frame->type) {
+ case AMQP_FRAME_BODY:
+ frame->data = buf + 7;
+ break;
+ default:
+ frame->data = buf + 11;
+ }
+ return 0;
+}
+
+static ssize_t
+amqp_write(int fdc, uchar *buf, size_t blen, unsigned short channel) {
+ buf[1] = (char) (channel >> 8);
+ buf[2] = (char) (channel & 0xFF);
+ return write(fdc, buf, blen);
+}
+
+static uchar *
+amqpFieldUint64(uint64_t *d, uchar *s) {
+ *d = buf2uint64(s);
+ return s + 8;
+}
+
+static uchar *
+amqpFieldUint32(uint32_t *d, uchar *s) {
+ *d = buf2uint32(s);
+ return s + 4;
+}
+
+static uchar *
+amqpFieldUint16(uint16_t *d, uchar *s) {
+ *d = buf2uint16(s);
+ return s + 2;
+}
+
+static uchar *
+amqpFieldLenFprintf(const char *pfx, uchar *s, uint32_t len) {
+ if (fpout)
+ fprintf(fpout, "%s%.*s", pfx, (int)len, (char*)s);
+ return s + len;
+}
+
+static uchar *
+amqpFieldFprintf(const char *pfx, uchar *s) {
+ uint32_t len = *s++;
+ return amqpFieldLenFprintf(pfx, s, len);
+}
+
+static uchar *
+amqpHeaderFprintf(uchar *s, uint32_t *size) {
+ uint32_t len;
+ uchar *p = amqpFieldFprintf(", ", s);
+ p++; /* value type */
+ p = amqpFieldUint32(&len, p);
+ *size -= (p - s) + len;
+ return amqpFieldLenFprintf(":", p, len);
+}
+
+static void
+amqp_srvr(int port, int srvr, int fds, int piperead, int pipewrite)
+{
+ uchar wrkBuf[8192], *p;
+ size_t nRead = 0, bsize = 0;
+ ssize_t nSent;
+ amqp_frame_type_t frame;
+ uint64_t body_ui64 = 0;
+ uint32_t props_header_size;
+ uint16_t props_flags;
+ int my_behaviour;
+ struct timeval tv;
+ fd_set rfds;
+ int nfds = ((piperead > fds)? piperead : fds) + 1;
+
+ int fdc;
+
+ my_behaviour = behaviors & 0x000F;
+ behaviors = behaviors >> 4; /* for next server */;
+
+ if(listen(fds, 0) != 0) errout("listen", port);
+
+ DBGPRINTF1("Server AMQP %d on port %d started\n", srvr, port);
+
+ tv.tv_sec = 120;
+ tv.tv_usec = 0;
+ FD_ZERO(&rfds);
+ FD_SET(fds, &rfds);
+ if (piperead > 0)
+ FD_SET(piperead, &rfds);
+
+ if (select(nfds,&rfds,NULL,NULL, &tv) == 0) {
+ exit(1);
+ }
+
+ if (piperead > 0 && FD_ISSET(piperead, &rfds)) {
+ char c;
+ int l = read(piperead, &c, 1);
+ if (l == 1) {
+ my_behaviour = behaviors & 0x000F;
+ if (my_behaviour != 0) {
+ DBGPRINTF1("Server AMQP %d on port %d switch behaviour", srvr, port);
+ } else {
+ DBGPRINTF1("Server AMQP %d on port %d leaving", srvr, port);
+ if (fpout && fpout != stdout) { fclose(fpout); fpout = NULL; }
+ exit(1);
+ }
+ }
+ }
+
+ fdc = accept(fds, NULL, NULL);
+
+ if (pipewrite > 0)
+ nSent = write(pipewrite, "N", 1);
+
+ close(fds);
+ fds = -1;
+
+ /* this let the os understand that the port is closed */
+ usleep(1000 * wait_after_accept);
+
+ frame.type = AMQP_STARTING;
+
+ while(fdc > 0) {
+ nSent = 0;
+ ssize_t rd = 0;
+ if (nRead < 12) {
+ rd = read(fdc, wrkBuf + nRead, sizeof(wrkBuf) - nRead);
+ if (rd <= 0) {
+ DBGPRINTF1("Server AMQP %d on port %d disconnected\n", srvr, port);
+ close(fdc);
+ fdc = 0;
+ break;
+ }else {
+ nRead += (size_t)rd;
+ }
+ }
+
+ if (decode_frame_type(wrkBuf, &frame, nRead)) {
+ DBGPRINTF1("Server AMQP %d on port %d killed : bad protocol\n", srvr, port);
+ close(fdc);
+ fdc = 0;
+ break;
+ }
+ if (rd > 4)
+ DBGPRINTF2("Server received : %zd\n", rd);
+
+ switch (frame.type) {
+
+ case AMQP_STARTING: /* starting handshake */
+
+ DBGPRINTF1("Server AMQP %d on port %d type %d connected\n", srvr, port, my_behaviour);
+ DBGPRINTF2("Server %d connection.start\n", srvr);
+ nSent = amqp_write(fdc, connection_start, sizeof(connection_start), frame.ch);
+ break;
+
+ case AMQP_FRAME_METHOD:
+
+ DBGPRINTF2("Server %d method : 0x%X\n", srvr, frame.method);
+
+ switch (frame.method) {
+
+ case AMQP_CONNECTION_START_OK_METHOD:
+
+ DBGPRINTF2("Server %d connection.tune\n", srvr);
+ nSent = amqp_write(fdc, connection_tune, sizeof(connection_tune), frame.ch);
+ break;
+
+ case AMQP_CONNECTION_TUNE_OK_METHOD:
+
+ DBGPRINTF2("Client %d connection.tune-ok\n", srvr);
+ nSent = 0;
+ break;
+
+ case AMQP_CONNECTION_OPEN_METHOD:
+
+ nSent = amqp_write(fdc, connection_open_ok,
+ sizeof(connection_open_ok), frame.ch);
+ DBGPRINTF2("Server %d connection.open\n", srvr);
+ break;
+
+ case AMQP_CHANNEL_OPEN_METHOD:
+
+ nSent = amqp_write(fdc, channel_open_ok,
+ sizeof(channel_open_ok), frame.ch);
+ DBGPRINTF2("Server %d channel.open\n", srvr);
+ if (my_behaviour == AMQP_BEHAVIOR_NOEXCH) {
+ close(fdc);
+ DBGPRINTF1("Server AMQP %d on port %d stopped\n", srvr, port);
+ fdc = 0;
+ frame.type = 0;
+ }
+ break;
+
+ case AMQP_EXCHANGE_DECLARE_METHOD:
+
+ if (my_behaviour == AMQP_BEHAVIOR_BADEXCH) {
+ nSent = amqp_write(fdc, channel_close_ok_on_badexch,
+ sizeof(channel_close_ok_on_badexch), frame.ch);
+ }else{
+ nSent = amqp_write(fdc, exchange_declare_ok,
+ sizeof(exchange_declare_ok), frame.ch);
+ }
+ DBGPRINTF2("Server %d exchange.declare\n", srvr);
+ if (my_behaviour == AMQP_BEHAVIOR_DECEXCH) {
+ close(fdc);
+ DBGPRINTF1("Server AMQP %d on port %d stopped\n", srvr, port);
+ fdc = 0;
+ frame.type = 0;
+ }
+ break;
+
+ case AMQP_CHANNEL_CLOSE_METHOD:
+
+ nSent = amqp_write(fdc, channel_close_ok,
+ sizeof(channel_close_ok), frame.ch);
+ DBGPRINTF2("Server %d channel.close\n", srvr);
+ break;
+
+ case AMQP_CONNECTION_CLOSE_METHOD:
+
+ nSent = amqp_write(fdc, connection_close_ok,
+ sizeof(connection_close_ok), frame.ch);
+ DBGPRINTF2("Server %d connection.close\n", srvr);
+ break;
+
+ case AMQP_BASIC_PUBLISH_METHOD:
+
+ p = amqpFieldFprintf("Exchange:", frame.data + 2);
+ amqpFieldFprintf(", routing-key:", p);
+ break;
+
+ default:
+
+ nSent = 0;
+ }
+ break;
+
+ case AMQP_FRAME_HEADER:
+
+ DBGPRINTF2("Server %d HEADERS\n", srvr);
+ p = amqpFieldUint64(&body_ui64, frame.data);
+ bsize = (size_t)body_ui64;
+ p = amqpFieldUint16(&props_flags, p);
+ if (props_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ p = amqpFieldFprintf(", content-type:", p);
+ }
+ if (props_flags & AMQP_BASIC_HEADERS_FLAG) {
+ p = amqpFieldUint32(&props_header_size, p);
+ while (props_header_size) {
+ p = amqpHeaderFprintf(p, &props_header_size);
+ }
+ }
+ if (props_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
+ if (fpout)
+ fprintf(fpout, ", delivery-mode:%s", (*p++)?"transient":"persistent");
+ }
+ if (props_flags & AMQP_BASIC_EXPIRATION_FLAG) {
+ p = amqpFieldFprintf(", expiration:", p);
+ }
+ if (props_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
+ if (fpout)
+ fprintf(fpout, ", timestamp:OK");
+ p += sizeof(uint64_t);
+ }
+ if (props_flags & AMQP_BASIC_APP_ID_FLAG) {
+ amqpFieldFprintf(", app-id:", p);
+ }
+ if (fpout)
+ fprintf(fpout, ", msg:");
+ break;
+
+ case AMQP_FRAME_BODY:
+
+ DBGPRINTF2("Server %d Body size left : %zu, received : %zu\n",
+ srvr, bsize, frame.datalen);
+ bsize -= frame.datalen;
+ if (fpout) {
+ fprintf(fpout, "%.*s", (int)frame.datalen, frame.data);
+ if (frame.data[frame.datalen-1] != '\n')
+ fprintf(fpout, "\n");
+ fflush(fpout);
+ }
+ break;
+
+ default:
+
+ DBGPRINTF1("Server %d unsupported frame type %d\n", srvr, frame.type);
+ close(fdc);
+ fdc = 0;
+ frame.type = 0;
+ frame.framelen = 0;
+ } /* switch (frame.type) */
+
+ nRead -= frame.framelen;
+ if (nRead>0)
+ memmove(wrkBuf, wrkBuf + frame.framelen, nRead);
+
+ if (nSent < 0) {
+ close(fdc);
+ fdc = 0;
+ }
+ } /* while(fdc) */
+ DBGPRINTF2("Leaving thread %d\n", srvr);
+}
+
+int
+main(int argc, char *argv[])
+{
+ int port[2], fds[2], i, opt, nb_port = 1;
+ int pipeS1toS2[2] = { -1, -1 };
+ int pipeS2toS1[2] = { -1, -1 };
+ int pipeRead[2], pipeWrite[2];
+
+
+ struct sockaddr_in srvAddr[2];
+ unsigned int addrLen = sizeof(struct sockaddr_in), len;
+ pid_t pid[2];
+
+ fpout = stdout;
+
+ while((opt = getopt(argc, argv, "f:b:w:d")) != -1) {
+ switch (opt) {
+ case 'w':
+ wait_after_accept = atoi(optarg);
+ break;
+ case 'd':
+ debug = 2;
+ break;
+ case 'b':
+ server_behaviors = atoi(optarg);
+ break;
+ case 'f':
+ if(strcmp(optarg, "-")) {
+ outfile = optarg;
+ fpout = fopen(optarg, "w");
+ if(fpout == NULL){
+ fprintf(stderr, "file %s could not be created\n", outfile);
+ exit(1);
+ }
+ }
+ break;
+ default:
+ fprintf(stderr, "invalid option '%c' or value missing - terminating...\n", opt);
+ usage();
+ break;
+ }
+ }
+
+ switch (server_behaviors) {
+ case 0:
+ behaviors = AMQP_BEHAVIOR_STANDARD;
+ nb_port = 1;
+ break;
+ case 1: /* two standard servers get message successfully */
+ behaviors = AMQP_BEHAVIOR_STANDARD;
+ nb_port = 2;
+ break;
+ case 2: /* 2 servers first server which disconnect after after open channel : no declare exchange */
+ behaviors = AMQP_BEHAVIOR_NOEXCH | AMQP_BEHAVIOR_STANDARD << 4;
+ nb_port = 2;
+ break;
+ case 3: /* 2 servers first server which disconnect after declare exchange*/
+ behaviors = AMQP_BEHAVIOR_DECEXCH | AMQP_BEHAVIOR_STANDARD << 4;
+ nb_port = 2;
+ break;
+ case 4: /* one server with bad exchange declare */
+ behaviors = AMQP_BEHAVIOR_BADEXCH;
+ nb_port = 1;
+ break;
+ default:
+ fprintf(stderr,"Invalid behavior");
+ exit(1);
+ }
+
+ gettimeofday(&dbgtv_base, NULL);
+
+ port[0] = port[1] = -1;
+
+ if (nb_port == 2) {
+ if(pipe(pipeS1toS2) == -1 || pipe(pipeS2toS1) == -1) {
+ fprintf(stderr, "Pipe failed !");
+ exit(1);
+ }
+ }
+
+ pipeRead[0] = pipeS2toS1[0];
+ pipeWrite[0] = pipeS1toS2[1];
+
+ pipeRead[1] = pipeS1toS2[0];
+ pipeWrite[1] = pipeS2toS1[1];
+
+ for (i = 0; i < nb_port; i++) {
+ fds[i] = socket(AF_INET, SOCK_STREAM, 0);
+ srvAddr[i].sin_family = AF_INET;
+ srvAddr[i].sin_addr.s_addr = INADDR_ANY;
+ srvAddr[i].sin_port = 0;
+ if(bind(fds[i], (struct sockaddr *)&srvAddr[i], addrLen) != 0)
+ errout("bind", 1);
+ len = addrLen;
+ if (getsockname(fds[i], (struct sockaddr *)&srvAddr[i], &len) == -1)
+ errout("bind", i+1);
+ if ((port[i] = ntohs(srvAddr[i].sin_port)) <= 0)
+ errout("get port", i+1);
+ }
+
+ for (i = 0; i < nb_port; i++) {
+ if ((pid[i] = fork()) == -1) {
+ fprintf(stderr, "Fork failed !");
+ exit(1);
+ }
+ if (pid[i] == 0) {
+ /* this is the child */
+ if (fds[1-i] > 0) close(fds[1-i]);
+ amqp_srvr(port[i], i+1, fds[i], pipeRead[i], pipeWrite[i]);
+
+ if (fpout && fpout != stdout) fclose(fpout);
+
+ DBGPRINTF2("%s\n","Leaving server");
+ return 0;
+ }
+ }
+
+ if (nb_port==2)
+ printf("export AMQPSRVRPID1=%ld AMQPSRVRPID2=%ld PORT_AMQP1=%d PORT_AMQP2=%d",
+ (long)pid[0], (long)pid[1], port[0], port[1]);
+ else
+ printf("export AMQPSRVRPID1=%ld PORT_AMQP1=%d",
+ (long)pid[0], port[0]);
+ return 0;
+}