1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
/*
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
* Copyright (c) 2009-2019, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __REDIS_RIO_H
#define __REDIS_RIO_H
#include <stdio.h>
#include <stdint.h>
#include "sds.h"
#include "connection.h"
#define RIO_FLAG_READ_ERROR (1<<0)
#define RIO_FLAG_WRITE_ERROR (1<<1)
#define RIO_TYPE_FILE (1<<0)
#define RIO_TYPE_BUFFER (1<<1)
#define RIO_TYPE_CONN (1<<2)
#define RIO_TYPE_FD (1<<3)
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
* value is simplified to: zero on error, non zero on complete success. */
size_t (*read)(struct _rio *, void *buf, size_t len);
size_t (*write)(struct _rio *, const void *buf, size_t len);
off_t (*tell)(struct _rio *);
int (*flush)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
/* The current checksum and flags (see RIO_FLAG_*) */
uint64_t cksum, flags;
/* number of bytes read or written */
size_t processed_bytes;
/* maximum single read or write chunk size */
size_t max_processing_chunk;
/* Backend-specific vars. */
union {
/* In-memory buffer target. */
struct {
sds ptr;
off_t pos;
} buffer;
/* Stdio file pointer target. */
struct {
FILE *fp;
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
/* Connection object (used to read from socket) */
struct {
connection *conn; /* Connection */
off_t pos; /* pos in buf that was returned */
sds buf; /* buffered data */
size_t read_limit; /* don't allow to buffer/read more than that */
size_t read_so_far; /* amount of data read from the rio (not buffered) */
} conn;
/* FD target (used to write to pipe). */
struct {
int fd; /* File descriptor. */
off_t pos;
sds buf;
} fd;
} io;
};
typedef struct _rio rio;
/* The following functions are our interface with the stream. They'll call the
* actual implementation of read / write / tell, and will update the checksum
* if needed. */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
while (len) {
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
if (r->write(r,buf,bytes_to_write) == 0) {
r->flags |= RIO_FLAG_WRITE_ERROR;
return 0;
}
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
r->processed_bytes += bytes_to_write;
}
return 1;
}
static inline size_t rioRead(rio *r, void *buf, size_t len) {
if (r->flags & RIO_FLAG_READ_ERROR) return 0;
while (len) {
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->read(r,buf,bytes_to_read) == 0) {
r->flags |= RIO_FLAG_READ_ERROR;
return 0;
}
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
r->processed_bytes += bytes_to_read;
}
return 1;
}
static inline off_t rioTell(rio *r) {
return r->tell(r);
}
static inline int rioFlush(rio *r) {
return r->flush(r);
}
/* This function allows to know if there was a read error in any past
* operation, since the rio stream was created or since the last call
* to rioClearError(). */
static inline int rioGetReadError(rio *r) {
return (r->flags & RIO_FLAG_READ_ERROR) != 0;
}
/* Like rioGetReadError() but for write errors. */
static inline int rioGetWriteError(rio *r) {
return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
}
static inline void rioClearErrors(rio *r) {
r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR);
}
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
void rioInitWithConn(rio *r, connection *conn, size_t read_limit);
void rioInitWithFd(rio *r, int fd);
void rioFreeFd(rio *r);
void rioFreeConn(rio *r, sds* out_remainingBufferedData);
size_t rioWriteBulkCount(rio *r, char prefix, long count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);
struct redisObject;
int rioWriteBulkObject(rio *r, struct redisObject *obj);
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
void rioSetAutoSync(rio *r, off_t bytes);
uint8_t rioCheckType(rio *r);
#endif
|