summaryrefslogtreecommitdiffstats
path: root/bgpd/bgp_io.c
diff options
context:
space:
mode:
Diffstat (limited to 'bgpd/bgp_io.c')
-rw-r--r--bgpd/bgp_io.c628
1 files changed, 628 insertions, 0 deletions
diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c
new file mode 100644
index 0000000..b07e69a
--- /dev/null
+++ b/bgpd/bgp_io.c
@@ -0,0 +1,628 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/* BGP I/O.
+ * Implements packet I/O in a pthread.
+ * Copyright (C) 2017 Cumulus Networks
+ * Quentin Young
+ */
+
+/* clang-format off */
+#include <zebra.h>
+#include <pthread.h> // for pthread_mutex_unlock, pthread_mutex_lock
+#include <sys/uio.h> // for writev
+
+#include "frr_pthread.h"
+#include "linklist.h" // for list_delete, list_delete_all_node, lis...
+#include "log.h" // for zlog_debug, safe_strerror, zlog_err
+#include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
+#include "network.h" // for ERRNO_IO_RETRY
+#include "stream.h" // for stream_get_endp, stream_getw_from, str...
+#include "ringbuf.h" // for ringbuf_remain, ringbuf_peek, ringbuf_...
+#include "frrevent.h" // for EVENT_OFF, EVENT_ARG, thread...
+
+#include "bgpd/bgp_io.h"
+#include "bgpd/bgp_debug.h" // for bgp_debug_neighbor_events, bgp_type_str
+#include "bgpd/bgp_errors.h" // for expanded error reference information
+#include "bgpd/bgp_fsm.h" // for BGP_EVENT_ADD, bgp_event
+#include "bgpd/bgp_packet.h" // for bgp_notify_io_invalid...
+#include "bgpd/bgp_trace.h" // for frrtraces
+#include "bgpd/bgpd.h" // for peer, BGP_MARKER_SIZE, bgp_master, bm
+/* clang-format on */
+
+/* forward declarations */
+static uint16_t bgp_write(struct peer_connection *connection);
+static uint16_t bgp_read(struct peer_connection *connection, int *code_p);
+static void bgp_process_writes(struct event *event);
+static void bgp_process_reads(struct event *event);
+static bool validate_header(struct peer_connection *connection);
+
+/* generic i/o status codes */
+#define BGP_IO_TRANS_ERR (1 << 0) /* EAGAIN or similar occurred */
+#define BGP_IO_FATAL_ERR (1 << 1) /* some kind of fatal TCP error */
+#define BGP_IO_WORK_FULL_ERR (1 << 2) /* No room in work buffer */
+
+/* Thread external API ----------------------------------------------------- */
+
+void bgp_writes_on(struct peer_connection *connection)
+{
+ struct frr_pthread *fpt = bgp_pth_io;
+
+ assert(fpt->running);
+
+ assert(connection->status != Deleted);
+ assert(connection->obuf);
+ assert(connection->ibuf);
+ assert(connection->ibuf_work);
+ assert(!connection->t_connect_check_r);
+ assert(!connection->t_connect_check_w);
+ assert(connection->fd);
+
+ event_add_write(fpt->master, bgp_process_writes, connection,
+ connection->fd, &connection->t_write);
+ SET_FLAG(connection->thread_flags, PEER_THREAD_WRITES_ON);
+}
+
+void bgp_writes_off(struct peer_connection *connection)
+{
+ struct peer *peer = connection->peer;
+ struct frr_pthread *fpt = bgp_pth_io;
+ assert(fpt->running);
+
+ event_cancel_async(fpt->master, &connection->t_write, NULL);
+ EVENT_OFF(connection->t_generate_updgrp_packets);
+
+ UNSET_FLAG(peer->connection->thread_flags, PEER_THREAD_WRITES_ON);
+}
+
+void bgp_reads_on(struct peer_connection *connection)
+{
+ struct frr_pthread *fpt = bgp_pth_io;
+ assert(fpt->running);
+
+ assert(connection->status != Deleted);
+ assert(connection->ibuf);
+ assert(connection->fd);
+ assert(connection->ibuf_work);
+ assert(connection->obuf);
+ assert(!connection->t_connect_check_r);
+ assert(!connection->t_connect_check_w);
+ assert(connection->fd);
+
+ event_add_read(fpt->master, bgp_process_reads, connection,
+ connection->fd, &connection->t_read);
+
+ SET_FLAG(connection->thread_flags, PEER_THREAD_READS_ON);
+}
+
+void bgp_reads_off(struct peer_connection *connection)
+{
+ struct frr_pthread *fpt = bgp_pth_io;
+ assert(fpt->running);
+
+ event_cancel_async(fpt->master, &connection->t_read, NULL);
+ EVENT_OFF(connection->t_process_packet);
+ EVENT_OFF(connection->t_process_packet_error);
+
+ UNSET_FLAG(connection->thread_flags, PEER_THREAD_READS_ON);
+}
+
+/* Thread internal functions ----------------------------------------------- */
+
+/*
+ * Called from I/O pthread when a file descriptor has become ready for writing.
+ */
+static void bgp_process_writes(struct event *thread)
+{
+ static struct peer *peer;
+ struct peer_connection *connection = EVENT_ARG(thread);
+ uint16_t status;
+ bool reschedule;
+ bool fatal = false;
+
+ peer = connection->peer;
+
+ if (connection->fd < 0)
+ return;
+
+ struct frr_pthread *fpt = bgp_pth_io;
+
+ frr_with_mutex (&connection->io_mtx) {
+ status = bgp_write(connection);
+ reschedule = (stream_fifo_head(connection->obuf) != NULL);
+ }
+
+ /* no problem */
+ if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
+ }
+
+ /* problem */
+ if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
+ reschedule = false;
+ fatal = true;
+ }
+
+ /* If suppress fib pending is enabled, route is advertised to peers when
+ * the status is received from the FIB. The delay is added
+ * to update group packet generate which will allow more routes to be
+ * sent in the update message
+ */
+ if (reschedule) {
+ event_add_write(fpt->master, bgp_process_writes, connection,
+ connection->fd, &connection->t_write);
+ } else if (!fatal) {
+ BGP_UPDATE_GROUP_TIMER_ON(&connection->t_generate_updgrp_packets,
+ bgp_generate_updgrp_packets);
+ }
+}
+
+static int read_ibuf_work(struct peer_connection *connection)
+{
+ /* static buffer for transferring packets */
+ /* shorter alias to peer's input buffer */
+ struct ringbuf *ibw = connection->ibuf_work;
+ /* packet size as given by header */
+ uint16_t pktsize = 0;
+ struct stream *pkt;
+
+ /* ============================================== */
+ frr_with_mutex (&connection->io_mtx) {
+ if (connection->ibuf->count >= bm->inq_limit)
+ return -ENOMEM;
+ }
+
+ /* check that we have enough data for a header */
+ if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
+ return 0;
+
+ /* check that header is valid */
+ if (!validate_header(connection))
+ return -EBADMSG;
+
+ /* header is valid; retrieve packet size */
+ ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));
+
+ pktsize = ntohs(pktsize);
+
+ /* if this fails we are seriously screwed */
+ assert(pktsize <= connection->peer->max_packet_size);
+
+ /*
+ * If we have that much data, chuck it into its own
+ * stream and append to input queue for processing.
+ *
+ * Otherwise, come back later.
+ */
+ if (ringbuf_remain(ibw) < pktsize)
+ return 0;
+
+ pkt = stream_new(pktsize);
+ assert(STREAM_WRITEABLE(pkt) == pktsize);
+ assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize);
+ stream_set_endp(pkt, pktsize);
+
+ frrtrace(2, frr_bgp, packet_read, connection->peer, pkt);
+ frr_with_mutex (&connection->io_mtx) {
+ stream_fifo_push(connection->ibuf, pkt);
+ }
+
+ return pktsize;
+}
+
+/*
+ * Called from I/O pthread when a file descriptor has become ready for reading,
+ * or has hung up.
+ *
+ * We read as much data as possible, process as many packets as we can and
+ * place them on peer->connection.ibuf for secondary processing by the main
+ * thread.
+ */
+static void bgp_process_reads(struct event *thread)
+{
+ /* clang-format off */
+ struct peer_connection *connection = EVENT_ARG(thread);
+ static struct peer *peer; /* peer to read from */
+ uint16_t status; /* bgp_read status code */
+ bool fatal = false; /* whether fatal error occurred */
+ bool added_pkt = false; /* whether we pushed onto ->connection.ibuf */
+ int code = 0; /* FSM code if error occurred */
+ static bool ibuf_full_logged; /* Have we logged full already */
+ int ret = 1;
+ /* clang-format on */
+
+ peer = connection->peer;
+
+ if (bm->terminating || connection->fd < 0)
+ return;
+
+ struct frr_pthread *fpt = bgp_pth_io;
+
+ frr_with_mutex (&connection->io_mtx) {
+ status = bgp_read(connection, &code);
+ }
+
+ /* error checking phase */
+ if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
+ /* no problem; just don't process packets */
+ goto done;
+ }
+
+ if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
+ /* problem; tear down session */
+ fatal = true;
+
+ /* Handle the error in the main pthread, include the
+ * specific state change from 'bgp_read'.
+ */
+ event_add_event(bm->master, bgp_packet_process_error, connection,
+ code, &connection->t_process_packet_error);
+ goto done;
+ }
+
+ while (true) {
+ ret = read_ibuf_work(connection);
+ if (ret <= 0)
+ break;
+
+ added_pkt = true;
+ }
+
+ switch (ret) {
+ case -EBADMSG:
+ fatal = true;
+ break;
+ case -ENOMEM:
+ if (!ibuf_full_logged) {
+ if (bgp_debug_neighbor_events(peer))
+ zlog_debug(
+ "%s [Event] Peer Input-Queue is full: limit (%u)",
+ peer->host, bm->inq_limit);
+
+ ibuf_full_logged = true;
+ }
+ break;
+ default:
+ ibuf_full_logged = false;
+ break;
+ }
+
+done:
+ /* handle invalid header */
+ if (fatal) {
+ /* wipe buffer just in case someone screwed up */
+ ringbuf_wipe(connection->ibuf_work);
+ return;
+ }
+
+ event_add_read(fpt->master, bgp_process_reads, connection,
+ connection->fd, &connection->t_read);
+ if (added_pkt)
+ event_add_event(bm->master, bgp_process_packet, connection, 0,
+ &connection->t_process_packet);
+}
+
+/*
+ * Flush peer output buffer.
+ *
+ * This function pops packets off of peer->connection.obuf and writes them to
+ * peer->connection.fd. The amount of packets written is equal to the minimum of
+ * peer->wpkt_quanta and the number of packets on the output buffer, unless an
+ * error occurs.
+ *
+ * If write() returns an error, the appropriate FSM event is generated.
+ *
+ * The return value is equal to the number of packets written
+ * (which may be zero).
+ */
+static uint16_t bgp_write(struct peer_connection *connection)
+{
+ struct peer *peer = connection->peer;
+ uint8_t type;
+ struct stream *s;
+ int update_last_write = 0;
+ unsigned int count;
+ uint32_t uo = 0;
+ uint16_t status = 0;
+ uint32_t wpkt_quanta_old;
+
+ int writenum = 0;
+ int num;
+ unsigned int iovsz;
+ unsigned int strmsz;
+ unsigned int total_written;
+ time_t now;
+
+ wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta,
+ memory_order_relaxed);
+ struct stream *ostreams[wpkt_quanta_old];
+ struct stream **streams = ostreams;
+ struct iovec iov[wpkt_quanta_old];
+
+ s = stream_fifo_head(connection->obuf);
+
+ if (!s)
+ goto done;
+
+ count = iovsz = 0;
+ while (count < wpkt_quanta_old && iovsz < array_size(iov) && s) {
+ ostreams[iovsz] = s;
+ iov[iovsz].iov_base = stream_pnt(s);
+ iov[iovsz].iov_len = STREAM_READABLE(s);
+ writenum += STREAM_READABLE(s);
+ s = s->next;
+ ++iovsz;
+ ++count;
+ }
+
+ strmsz = iovsz;
+ total_written = 0;
+
+ do {
+ num = writev(connection->fd, iov, iovsz);
+
+ if (num < 0) {
+ if (!ERRNO_IO_RETRY(errno)) {
+ BGP_EVENT_ADD(connection, TCP_fatal_error);
+ SET_FLAG(status, BGP_IO_FATAL_ERR);
+ } else {
+ SET_FLAG(status, BGP_IO_TRANS_ERR);
+ }
+
+ break;
+ } else if (num != writenum) {
+ unsigned int msg_written = 0;
+ unsigned int ic = iovsz;
+
+ for (unsigned int i = 0; i < ic; i++) {
+ size_t ss = iov[i].iov_len;
+
+ if (ss > (unsigned int) num)
+ break;
+
+ msg_written++;
+ iovsz--;
+ writenum -= ss;
+ num -= ss;
+ }
+
+ total_written += msg_written;
+
+ assert(total_written < count);
+
+ memmove(&iov, &iov[msg_written],
+ sizeof(iov[0]) * iovsz);
+ streams = &streams[msg_written];
+ stream_forward_getp(streams[0], num);
+ iov[0].iov_base = stream_pnt(streams[0]);
+ iov[0].iov_len = STREAM_READABLE(streams[0]);
+
+ writenum -= num;
+ num = 0;
+ assert(writenum > 0);
+ } else {
+ total_written = strmsz;
+ }
+
+ } while (num != writenum);
+
+ /* Handle statistics */
+ for (unsigned int i = 0; i < total_written; i++) {
+ s = stream_fifo_pop(connection->obuf);
+
+ assert(s == ostreams[i]);
+
+ /* Retrieve BGP packet type. */
+ stream_set_getp(s, BGP_MARKER_SIZE + 2);
+ type = stream_getc(s);
+
+ switch (type) {
+ case BGP_MSG_OPEN:
+ atomic_fetch_add_explicit(&peer->open_out, 1,
+ memory_order_relaxed);
+ break;
+ case BGP_MSG_UPDATE:
+ atomic_fetch_add_explicit(&peer->update_out, 1,
+ memory_order_relaxed);
+ uo++;
+ break;
+ case BGP_MSG_NOTIFY:
+ atomic_fetch_add_explicit(&peer->notify_out, 1,
+ memory_order_relaxed);
+ /* Double start timer. */
+ peer->v_start *= 2;
+
+ /* Overflow check. */
+ if (peer->v_start >= (60 * 2))
+ peer->v_start = (60 * 2);
+
+ /*
+ * Handle Graceful Restart case where the state changes
+ * to Connect instead of Idle.
+ */
+ BGP_EVENT_ADD(connection, BGP_Stop);
+ goto done;
+
+ case BGP_MSG_KEEPALIVE:
+ atomic_fetch_add_explicit(&peer->keepalive_out, 1,
+ memory_order_relaxed);
+ break;
+ case BGP_MSG_ROUTE_REFRESH_NEW:
+ case BGP_MSG_ROUTE_REFRESH_OLD:
+ atomic_fetch_add_explicit(&peer->refresh_out, 1,
+ memory_order_relaxed);
+ break;
+ case BGP_MSG_CAPABILITY:
+ atomic_fetch_add_explicit(&peer->dynamic_cap_out, 1,
+ memory_order_relaxed);
+ break;
+ }
+
+ stream_free(s);
+ ostreams[i] = NULL;
+ update_last_write = 1;
+ }
+
+done : {
+ now = monotime(NULL);
+ /*
+ * Update last_update if UPDATEs were written.
+ * Note: that these are only updated at end,
+ * not per message (i.e., per loop)
+ */
+ if (uo)
+ atomic_store_explicit(&peer->last_update, now,
+ memory_order_relaxed);
+
+ /* If we TXed any flavor of packet */
+ if (update_last_write) {
+ atomic_store_explicit(&peer->last_write, now,
+ memory_order_relaxed);
+ peer->last_sendq_ok = now;
+ }
+}
+
+ return status;
+}
+
+uint8_t ibuf_scratch[BGP_EXTENDED_MESSAGE_MAX_PACKET_SIZE * BGP_READ_PACKET_MAX];
+/*
+ * Reads a chunk of data from peer->connection.fd into
+ * peer->connection.ibuf_work.
+ *
+ * code_p
+ * Pointer to location to store FSM event code in case of fatal error.
+ *
+ * @return status flag (see top-of-file)
+ *
+ * PLEASE NOTE: If we ever transform the bgp_read to be a pthread
+ * per peer then we need to rethink the global ibuf_scratch
+ * data structure above.
+ */
+static uint16_t bgp_read(struct peer_connection *connection, int *code_p)
+{
+ size_t readsize; /* how many bytes we want to read */
+ ssize_t nbytes; /* how many bytes we actually read */
+ size_t ibuf_work_space; /* space we can read into the work buf */
+ uint16_t status = 0;
+
+ ibuf_work_space = ringbuf_space(connection->ibuf_work);
+
+ if (ibuf_work_space == 0) {
+ SET_FLAG(status, BGP_IO_WORK_FULL_ERR);
+ return status;
+ }
+
+ readsize = MIN(ibuf_work_space, sizeof(ibuf_scratch));
+
+ nbytes = read(connection->fd, ibuf_scratch, readsize);
+
+ /* EAGAIN or EWOULDBLOCK; come back later */
+ if (nbytes < 0 && ERRNO_IO_RETRY(errno)) {
+ SET_FLAG(status, BGP_IO_TRANS_ERR);
+ } else if (nbytes < 0) {
+ /* Fatal error; tear down session */
+ flog_err(EC_BGP_UPDATE_RCV,
+ "%s [Error] bgp_read_packet error: %s",
+ connection->peer->host, safe_strerror(errno));
+
+ /* Handle the error in the main pthread. */
+ if (code_p)
+ *code_p = TCP_fatal_error;
+
+ SET_FLAG(status, BGP_IO_FATAL_ERR);
+
+ } else if (nbytes == 0) {
+ /* Received EOF / TCP session closed */
+ if (bgp_debug_neighbor_events(connection->peer))
+ zlog_debug("%s [Event] BGP connection closed fd %d",
+ connection->peer->host, connection->fd);
+
+ /* Handle the error in the main pthread. */
+ if (code_p)
+ *code_p = TCP_connection_closed;
+
+ SET_FLAG(status, BGP_IO_FATAL_ERR);
+ } else {
+ assert(ringbuf_put(connection->ibuf_work, ibuf_scratch,
+ nbytes) == (size_t)nbytes);
+ }
+
+ return status;
+}
+
+/*
+ * Called after we have read a BGP packet header. Validates marker, message
+ * type and packet length. If any of these aren't correct, sends a notify.
+ *
+ * Assumes that there are at least BGP_HEADER_SIZE readable bytes in the input
+ * buffer.
+ */
+static bool validate_header(struct peer_connection *connection)
+{
+ struct peer *peer = connection->peer;
+ uint16_t size;
+ uint8_t type;
+ struct ringbuf *pkt = connection->ibuf_work;
+
+ static const uint8_t m_correct[BGP_MARKER_SIZE] = {
+ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+ uint8_t m_rx[BGP_MARKER_SIZE] = {0x00};
+
+ if (ringbuf_peek(pkt, 0, m_rx, BGP_MARKER_SIZE) != BGP_MARKER_SIZE)
+ return false;
+
+ if (memcmp(m_correct, m_rx, BGP_MARKER_SIZE) != 0) {
+ bgp_notify_io_invalid(peer, BGP_NOTIFY_HEADER_ERR,
+ BGP_NOTIFY_HEADER_NOT_SYNC, NULL, 0);
+ return false;
+ }
+
+ /* Get size and type in network byte order. */
+ ringbuf_peek(pkt, BGP_MARKER_SIZE, &size, sizeof(size));
+ ringbuf_peek(pkt, BGP_MARKER_SIZE + 2, &type, sizeof(type));
+
+ size = ntohs(size);
+
+ /* BGP type check. */
+ if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
+ && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
+ && type != BGP_MSG_ROUTE_REFRESH_NEW
+ && type != BGP_MSG_ROUTE_REFRESH_OLD
+ && type != BGP_MSG_CAPABILITY) {
+ if (bgp_debug_neighbor_events(peer))
+ zlog_debug("%s unknown message type 0x%02x", peer->host,
+ type);
+
+ bgp_notify_io_invalid(peer, BGP_NOTIFY_HEADER_ERR,
+ BGP_NOTIFY_HEADER_BAD_MESTYPE, &type, 1);
+ return false;
+ }
+
+ /* Minimum packet length check. */
+ if ((size < BGP_HEADER_SIZE) || (size > peer->max_packet_size)
+ || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
+ || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
+ || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
+ || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
+ || (type == BGP_MSG_ROUTE_REFRESH_NEW
+ && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
+ || (type == BGP_MSG_ROUTE_REFRESH_OLD
+ && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
+ || (type == BGP_MSG_CAPABILITY
+ && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
+ if (bgp_debug_neighbor_events(peer)) {
+ zlog_debug("%s bad message length - %d for %s",
+ peer->host, size,
+ type == 128 ? "ROUTE-REFRESH"
+ : bgp_type_str[(int)type]);
+ }
+
+ uint16_t nsize = htons(size);
+
+ bgp_notify_io_invalid(peer, BGP_NOTIFY_HEADER_ERR,
+ BGP_NOTIFY_HEADER_BAD_MESLEN,
+ (unsigned char *)&nsize, 2);
+ return false;
+ }
+
+ return true;
+}