1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright (c) 2009-2012,2016 Microsoft Corp.
* Copyright (c) 2012 NetApp Inc.
* Copyright (c) 2012 Citrix Inc.
* All rights reserved.
*/
#include <unistd.h>
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <sys/uio.h>
#include <rte_eal.h>
#include <rte_tailq.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_bus.h>
#include <rte_atomic.h>
#include <rte_memory.h>
#include <rte_pause.h>
#include <rte_bus_vmbus.h>
#include "private.h"
/* Increase bufring index by inc with wraparound */
static inline uint32_t vmbus_br_idxinc(uint32_t idx, uint32_t inc, uint32_t sz)
{
idx += inc;
if (idx >= sz)
idx -= sz;
return idx;
}
void vmbus_br_setup(struct vmbus_br *br, void *buf, unsigned int blen)
{
br->vbr = buf;
br->windex = br->vbr->windex;
br->dsize = blen - sizeof(struct vmbus_bufring);
}
/*
* When we write to the ring buffer, check if the host needs to be
* signaled.
*
* The contract:
* - The host guarantees that while it is draining the TX bufring,
* it will set the br_imask to indicate it does not need to be
* interrupted when new data are added.
* - The host guarantees that it will completely drain the TX bufring
* before exiting the read loop. Further, once the TX bufring is
* empty, it will clear the br_imask and re-check to see if new
* data have arrived.
*/
static inline bool
vmbus_txbr_need_signal(const struct vmbus_bufring *vbr, uint32_t old_windex)
{
rte_smp_mb();
if (vbr->imask)
return false;
rte_smp_rmb();
/*
* This is the only case we need to signal when the
* ring transitions from being empty to non-empty.
*/
return old_windex == vbr->rindex;
}
static inline uint32_t
vmbus_txbr_copyto(const struct vmbus_br *tbr, uint32_t windex,
const void *src0, uint32_t cplen)
{
uint8_t *br_data = tbr->vbr->data;
uint32_t br_dsize = tbr->dsize;
const uint8_t *src = src0;
/* XXX use double mapping like Linux kernel? */
if (cplen > br_dsize - windex) {
uint32_t fraglen = br_dsize - windex;
/* Wrap-around detected */
memcpy(br_data + windex, src, fraglen);
memcpy(br_data, src + fraglen, cplen - fraglen);
} else {
memcpy(br_data + windex, src, cplen);
}
return vmbus_br_idxinc(windex, cplen, br_dsize);
}
/*
* Write scattered channel packet to TX bufring.
*
* The offset of this channel packet is written as a 64bits value
* immediately after this channel packet.
*
* The write goes through three stages:
* 1. Reserve space in ring buffer for the new data.
* Writer atomically moves priv_write_index.
* 2. Copy the new data into the ring.
* 3. Update the tail of the ring (visible to host) that indicates
* next read location. Writer updates write_index
*/
int
vmbus_txbr_write(struct vmbus_br *tbr, const struct iovec iov[], int iovlen,
bool *need_sig)
{
struct vmbus_bufring *vbr = tbr->vbr;
uint32_t ring_size = tbr->dsize;
uint32_t old_windex, next_windex, windex, total;
uint64_t save_windex;
int i;
total = 0;
for (i = 0; i < iovlen; i++)
total += iov[i].iov_len;
total += sizeof(save_windex);
/* Reserve space in ring */
do {
uint32_t avail;
/* Get current free location */
old_windex = tbr->windex;
/* Prevent compiler reordering this with calculation */
rte_compiler_barrier();
avail = vmbus_br_availwrite(tbr, old_windex);
/* If not enough space in ring, then tell caller. */
if (avail <= total)
return -EAGAIN;
next_windex = vmbus_br_idxinc(old_windex, total, ring_size);
/* Atomic update of next write_index for other threads */
} while (!rte_atomic32_cmpset(&tbr->windex, old_windex, next_windex));
/* Space from old..new is now reserved */
windex = old_windex;
for (i = 0; i < iovlen; i++) {
windex = vmbus_txbr_copyto(tbr, windex,
iov[i].iov_base, iov[i].iov_len);
}
/* Set the offset of the current channel packet. */
save_windex = ((uint64_t)old_windex) << 32;
windex = vmbus_txbr_copyto(tbr, windex, &save_windex,
sizeof(save_windex));
/* The region reserved should match region used */
RTE_ASSERT(windex == next_windex);
/* Ensure that data is available before updating host index */
rte_smp_wmb();
/* Checkin for our reservation. wait for our turn to update host */
while (!rte_atomic32_cmpset(&vbr->windex, old_windex, next_windex))
rte_pause();
/* If host had read all data before this, then need to signal */
*need_sig |= vmbus_txbr_need_signal(vbr, old_windex);
return 0;
}
static inline uint32_t
vmbus_rxbr_copyfrom(const struct vmbus_br *rbr, uint32_t rindex,
void *dst0, size_t cplen)
{
const uint8_t *br_data = rbr->vbr->data;
uint32_t br_dsize = rbr->dsize;
uint8_t *dst = dst0;
if (cplen > br_dsize - rindex) {
uint32_t fraglen = br_dsize - rindex;
/* Wrap-around detected. */
memcpy(dst, br_data + rindex, fraglen);
memcpy(dst + fraglen, br_data, cplen - fraglen);
} else {
memcpy(dst, br_data + rindex, cplen);
}
return vmbus_br_idxinc(rindex, cplen, br_dsize);
}
/* Copy data from receive ring but don't change index */
int
vmbus_rxbr_peek(const struct vmbus_br *rbr, void *data, size_t dlen)
{
uint32_t avail;
/*
* The requested data and the 64bits channel packet
* offset should be there at least.
*/
avail = vmbus_br_availread(rbr);
if (avail < dlen + sizeof(uint64_t))
return -EAGAIN;
vmbus_rxbr_copyfrom(rbr, rbr->vbr->rindex, data, dlen);
return 0;
}
/*
* Copy data from receive ring and change index
* NOTE:
* We assume (dlen + skip) == sizeof(channel packet).
*/
int
vmbus_rxbr_read(struct vmbus_br *rbr, void *data, size_t dlen, size_t skip)
{
struct vmbus_bufring *vbr = rbr->vbr;
uint32_t br_dsize = rbr->dsize;
uint32_t rindex;
if (vmbus_br_availread(rbr) < dlen + skip + sizeof(uint64_t))
return -EAGAIN;
/* Record where host was when we started read (for debug) */
rbr->windex = rbr->vbr->windex;
/*
* Copy channel packet from RX bufring.
*/
rindex = vmbus_br_idxinc(rbr->vbr->rindex, skip, br_dsize);
rindex = vmbus_rxbr_copyfrom(rbr, rindex, data, dlen);
/*
* Discard this channel packet's 64bits offset, which is useless to us.
*/
rindex = vmbus_br_idxinc(rindex, sizeof(uint64_t), br_dsize);
/* Update the read index _after_ the channel packet is fetched. */
rte_compiler_barrier();
vbr->rindex = rindex;
return 0;
}
|