summaryrefslogtreecommitdiffstats
path: root/fs/dlm/lowcomms.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--fs/dlm/lowcomms.c12
1 files changed, 12 insertions, 0 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 32dbd1a828..6296c62c10 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -63,6 +63,7 @@
#include "config.h"
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
+#define DLM_MAX_PROCESS_BUFFERS 24
#define NEEDED_RMEM (4*1024*1024)
struct connection {
@@ -194,6 +195,7 @@ static const struct dlm_proto_ops *dlm_proto_ops;
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3
+#define DLM_IO_FLUSH 4
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
@@ -202,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
+static atomic_t processqueue_count;
static LIST_HEAD(processqueue);
bool dlm_lowcomms_is_running(void)
@@ -874,6 +877,7 @@ static void process_dlm_messages(struct work_struct *work)
}
list_del(&pentry->list);
+ atomic_dec(&processqueue_count);
spin_unlock(&processqueue_lock);
for (;;) {
@@ -891,6 +895,7 @@ static void process_dlm_messages(struct work_struct *work)
}
list_del(&pentry->list);
+ atomic_dec(&processqueue_count);
spin_unlock(&processqueue_lock);
}
}
@@ -962,6 +967,7 @@ again:
con->rx_leftover);
spin_lock(&processqueue_lock);
+ ret = atomic_inc_return(&processqueue_count);
list_add_tail(&pentry->list, &processqueue);
if (!process_dlm_messages_pending) {
process_dlm_messages_pending = true;
@@ -969,6 +975,9 @@ again:
}
spin_unlock(&processqueue_lock);
+ if (ret > DLM_MAX_PROCESS_BUFFERS)
+ return DLM_IO_FLUSH;
+
return DLM_IO_SUCCESS;
}
@@ -1503,6 +1512,9 @@ static void process_recv_sockets(struct work_struct *work)
wake_up(&con->shutdown_wait);
/* CF_RECV_PENDING cleared */
break;
+ case DLM_IO_FLUSH:
+ flush_workqueue(process_workqueue);
+ fallthrough;
case DLM_IO_RESCHED:
cond_resched();
queue_work(io_workqueue, &con->rwork);