summaryrefslogtreecommitdiffstats
path: root/lib/mgmt_msg.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/mgmt_msg.c107
1 files changed, 78 insertions, 29 deletions
diff --git a/lib/mgmt_msg.c b/lib/mgmt_msg.c
index 1cd3240..aff9af7 100644
--- a/lib/mgmt_msg.c
+++ b/lib/mgmt_msg.c
@@ -7,12 +7,15 @@
* Copyright (c) 2023, LabN Consulting, L.L.C.
*/
#include <zebra.h>
+#include <sys/stat.h>
+
#include "debug.h"
#include "network.h"
#include "sockopt.h"
#include "stream.h"
#include "frrevent.h"
#include "mgmt_msg.h"
+#include "mgmt_msg_native.h"
#define MGMT_MSG_DBG(dbgtag, fmt, ...) \
@@ -84,7 +87,7 @@ enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
*/
assert(stream_get_getp(ms->ins) == 0);
left = stream_get_endp(ms->ins);
- while (left > (long)sizeof(struct mgmt_msg_hdr)) {
+ while (left > (ssize_t)sizeof(struct mgmt_msg_hdr)) {
mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
if (!MGMT_MSG_IS_MARKER(mhdr->marker)) {
MGMT_MSG_DBG(dbgtag, "recv corrupt buffer, disconnect");
@@ -99,8 +102,30 @@ enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
mcount++;
}
- if (!mcount)
+ if (!mcount) {
+ /* Didn't manage to read a full message */
+ if (mhdr && avail == 0) {
+ struct stream *news;
+ /*
+ * Message was longer than what was left and we have no
+ * available space to read more in. B/c mcount == 0 the
+ * message starts at the beginning of the stream so
+ * therefor the stream is too small to fit the message..
+ * Resize the stream to fit.
+ */
+ if (mhdr->len > MGMT_MSG_MAX_MSG_ALLOC_LEN) {
+ MGMT_MSG_ERR(ms, "corrupt msg len rcvd %u",
+ mhdr->len);
+ return MSR_DISCONNECT;
+ }
+ news = stream_new(mhdr->len);
+ stream_put(news, mhdr, left);
+ stream_set_endp(news, left);
+ stream_free(ms->ins);
+ ms->ins = news;
+ }
return MSR_SCHED_STREAM;
+ }
/*
* We have read at least one message into the stream, queue it up.
@@ -108,7 +133,11 @@ enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
stream_set_endp(ms->ins, total);
stream_fifo_push(&ms->inq, ms->ins);
- ms->ins = stream_new(ms->max_msg_sz);
+ if (left < (ssize_t)sizeof(struct mgmt_msg_hdr))
+ ms->ins = stream_new(ms->max_msg_sz);
+ else
+ /* handle case where message is greater than max */
+ ms->ins = stream_new(MAX(ms->max_msg_sz, mhdr->len));
if (left) {
stream_put(ms->ins, mhdr, left);
stream_set_endp(ms->ins, left);
@@ -180,7 +209,7 @@ bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
}
/**
- * Write data from a onto the socket, using streams that have been queued for
+ * Write data onto the socket, using streams that have been queued for
* sending by mgmt_msg_send_msg. This function should be reschedulable.
*
* Args:
@@ -292,23 +321,26 @@ int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version, void *msg,
size_t endp, n;
size_t mlen = len + sizeof(*mhdr);
- if (mlen > ms->max_msg_sz) {
- MGMT_MSG_ERR(ms, "Message %zu > max size %zu, dropping", mlen,
- ms->max_msg_sz);
- return -1;
- }
+ if (mlen > ms->max_msg_sz)
+ MGMT_MSG_DBG(dbgtag, "Sending large msg size %zu > max size %zu",
+ mlen, ms->max_msg_sz);
if (!ms->outs) {
- MGMT_MSG_DBG(dbgtag, "creating new stream for msg len %zu",
- len);
- ms->outs = stream_new(ms->max_msg_sz);
+ MGMT_MSG_DBG(dbgtag, "creating new stream for msg len %zu", mlen);
+ ms->outs = stream_new(MAX(ms->max_msg_sz, mlen));
+ } else if (mlen > ms->max_msg_sz && ms->outs->endp == 0) {
+ /* msg is larger than stream max size get a fit-to-size stream */
+ MGMT_MSG_DBG(dbgtag,
+ "replacing old stream with fit-to-size for msg len %zu",
+ mlen);
+ stream_free(ms->outs);
+ ms->outs = stream_new(mlen);
} else if (STREAM_WRITEABLE(ms->outs) < mlen) {
- MGMT_MSG_DBG(
- dbgtag,
- "enq existing stream len %zu and creating new stream for msg len %zu",
- STREAM_WRITEABLE(ms->outs), mlen);
+ MGMT_MSG_DBG(dbgtag,
+ "enq existing stream len %zu and creating new stream for msg len %zu",
+ STREAM_WRITEABLE(ms->outs), mlen);
stream_fifo_push(&ms->outq, ms->outs);
- ms->outs = stream_new(ms->max_msg_sz);
+ ms->outs = stream_new(MAX(ms->max_msg_sz, mlen));
} else {
MGMT_MSG_DBG(
dbgtag,
@@ -317,6 +349,16 @@ int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version, void *msg,
}
s = ms->outs;
+ if (dbgtag && version == MGMT_MSG_VERSION_NATIVE) {
+ struct mgmt_msg_header *native_msg = msg;
+
+ MGMT_MSG_DBG(
+ dbgtag,
+ "Sending native msg sess/txn-id %"PRIu64" req-id %"PRIu64" code %u",
+ native_msg->refer_id, native_msg->req_id, native_msg->code);
+
+ }
+
/* We have a stream with space, pack the message into it. */
mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(s) + s->endp);
mhdr->marker = MGMT_MSG_MARKER(version);
@@ -636,13 +678,13 @@ static void msg_client_sched_connect(struct msg_client *client,
&client->conn_retry_tmr);
}
-static bool msg_client_connect_short_circuit(struct msg_client *client)
+static int msg_client_connect_short_circuit(struct msg_client *client)
{
struct msg_conn *server_conn;
struct msg_server *server;
const char *dbgtag =
client->conn.debug ? client->conn.mstate.idtag : NULL;
- union sockunion su = {0};
+ union sockunion su = {};
int sockets[2];
frr_each (msg_server_list, &msg_servers, server)
@@ -650,10 +692,9 @@ static bool msg_client_connect_short_circuit(struct msg_client *client)
break;
if (!server) {
MGMT_MSG_DBG(dbgtag,
- "no short-circuit connection available for %s",
+ "no short-circuit server available yet for %s",
client->sopath);
-
- return false;
+ return -1;
}
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets)) {
@@ -661,7 +702,7 @@ static bool msg_client_connect_short_circuit(struct msg_client *client)
&client->conn.mstate,
"socketpair failed trying to short-circuit connection on %s: %s",
client->sopath, safe_strerror(errno));
- return false;
+ return -1;
}
/* client side */
@@ -673,6 +714,9 @@ static bool msg_client_connect_short_circuit(struct msg_client *client)
/* server side */
memset(&su, 0, sizeof(union sockunion));
server_conn = server->create(sockets[1], &su);
+ server_conn->debug = DEBUG_MODE_CHECK(server->debug, DEBUG_MODE_ALL)
+ ? true
+ : false;
client->conn.remote_conn = server_conn;
server_conn->remote_conn = &client->conn;
@@ -689,7 +733,7 @@ static bool msg_client_connect_short_circuit(struct msg_client *client)
client->sopath, client->conn.mstate.idtag, client->conn.fd,
server_conn->mstate.idtag, server_conn->fd);
- return true;
+ return 0;
}
@@ -699,11 +743,12 @@ static void msg_client_connect(struct msg_client *client)
struct msg_conn *conn = &client->conn;
const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
- if (!client->short_circuit_ok ||
- !msg_client_connect_short_circuit(client))
+ if (!client->short_circuit_ok)
conn->fd =
mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE,
MSG_CONN_RECV_BUF_SIZE, dbgtag);
+ else if (msg_client_connect_short_circuit(client))
+ conn->fd = -1;
if (conn->fd == -1)
/* retry the connection */
@@ -743,7 +788,6 @@ void msg_client_init(struct msg_client *client, struct event_loop *tm,
mgmt_msg_init(&conn->mstate, max_read_buf, max_write_buf, max_msg_sz,
idtag);
- /* XXX maybe just have client kick this off */
/* Start trying to connect to server */
msg_client_sched_connect(client, 0);
}
@@ -766,8 +810,9 @@ void msg_client_cleanup(struct msg_client *client)
static void msg_server_accept(struct event *event)
{
struct msg_server *server = EVENT_ARG(event);
- int fd;
+ struct msg_conn *conn;
union sockunion su;
+ int fd;
if (server->fd < 0)
return;
@@ -790,7 +835,11 @@ static void msg_server_accept(struct event *event)
DEBUGD(server->debug, "Accepted new %s connection", server->idtag);
- server->create(fd, &su);
+ conn = server->create(fd, &su);
+ if (conn)
+ conn->debug = DEBUG_MODE_CHECK(server->debug, DEBUG_MODE_ALL)
+ ? true
+ : false;
}
int msg_server_init(struct msg_server *server, const char *sopath,