summaryrefslogtreecommitdiffstats
path: root/src/libstat/backends/mmaped_file.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstat/backends/mmaped_file.c')
-rw-r--r--src/libstat/backends/mmaped_file.c1113
1 files changed, 1113 insertions, 0 deletions
diff --git a/src/libstat/backends/mmaped_file.c b/src/libstat/backends/mmaped_file.c
new file mode 100644
index 0000000..5c20207
--- /dev/null
+++ b/src/libstat/backends/mmaped_file.c
@@ -0,0 +1,1113 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "stat_internal.h"
+#include "unix-std.h"
+
+#define CHAIN_LENGTH 128
+
+/* Section types */
+#define STATFILE_SECTION_COMMON 1
+
+/**
+ * Common statfile header
+ */
+struct stat_file_header {
+ u_char magic[3]; /**< magic signature ('r' 's' 'd') */
+ u_char version[2]; /**< version of statfile */
+ u_char padding[3]; /**< padding */
+ guint64 create_time; /**< create time (time_t->guint64) */
+ guint64 revision; /**< revision number */
+ guint64 rev_time; /**< revision time */
+ guint64 used_blocks; /**< used blocks number */
+ guint64 total_blocks; /**< total number of blocks */
+ guint64 tokenizer_conf_len; /**< length of tokenizer configuration */
+ u_char unused[231]; /**< some bytes that can be used in future */
+};
+
+/**
+ * Section header
+ */
+struct stat_file_section {
+ guint64 code; /**< section's code */
+ guint64 length; /**< section's length in blocks */
+};
+
+/**
+ * Block of data in statfile
+ */
+struct stat_file_block {
+ guint32 hash1; /**< hash1 (also acts as index) */
+ guint32 hash2; /**< hash2 */
+ double value; /**< double value */
+};
+
+/**
+ * Statistic file
+ */
+struct stat_file {
+ struct stat_file_header header; /**< header */
+ struct stat_file_section section; /**< first section */
+ struct stat_file_block blocks[1]; /**< first block of data */
+};
+
+/**
+ * Common view of statfile object
+ */
+typedef struct {
+#ifdef HAVE_PATH_MAX
+ gchar filename[PATH_MAX]; /**< name of file */
+#else
+ gchar filename[MAXPATHLEN]; /**< name of file */
+#endif
+ rspamd_mempool_t *pool;
+ gint fd; /**< descriptor */
+ void *map; /**< mmaped area */
+ off_t seek_pos; /**< current seek position */
+ struct stat_file_section cur_section; /**< current section */
+ size_t len; /**< length of file(in bytes) */
+ struct rspamd_statfile_config *cf;
+} rspamd_mmaped_file_t;
+
+
+#define RSPAMD_STATFILE_VERSION \
+ { \
+ '1', '2' \
+ }
+#define BACKUP_SUFFIX ".old"
+
+static void rspamd_mmaped_file_set_block_common(rspamd_mempool_t *pool,
+ rspamd_mmaped_file_t *file,
+ guint32 h1, guint32 h2, double value);
+
+rspamd_mmaped_file_t *rspamd_mmaped_file_open(rspamd_mempool_t *pool,
+ const gchar *filename, size_t size,
+ struct rspamd_statfile_config *stcf);
+gint rspamd_mmaped_file_create(const gchar *filename, size_t size,
+ struct rspamd_statfile_config *stcf,
+ rspamd_mempool_t *pool);
+gint rspamd_mmaped_file_close_file(rspamd_mempool_t *pool,
+ rspamd_mmaped_file_t *file);
+
+double
+rspamd_mmaped_file_get_block(rspamd_mmaped_file_t *file,
+ guint32 h1,
+ guint32 h2)
+{
+ struct stat_file_block *block;
+ guint i, blocknum;
+ u_char *c;
+
+ if (!file->map) {
+ return 0;
+ }
+
+ blocknum = h1 % file->cur_section.length;
+ c = (u_char *) file->map + file->seek_pos + blocknum * sizeof(struct stat_file_block);
+ block = (struct stat_file_block *) c;
+
+ for (i = 0; i < CHAIN_LENGTH; i++) {
+ if (i + blocknum >= file->cur_section.length) {
+ break;
+ }
+ if (block->hash1 == h1 && block->hash2 == h2) {
+ return block->value;
+ }
+ c += sizeof(struct stat_file_block);
+ block = (struct stat_file_block *) c;
+ }
+
+
+ return 0;
+}
+
+static void
+rspamd_mmaped_file_set_block_common(rspamd_mempool_t *pool,
+ rspamd_mmaped_file_t *file,
+ guint32 h1, guint32 h2, double value)
+{
+ struct stat_file_block *block, *to_expire = NULL;
+ struct stat_file_header *header;
+ guint i, blocknum;
+ u_char *c;
+ double min = G_MAXDOUBLE;
+
+ if (!file->map) {
+ return;
+ }
+
+ blocknum = h1 % file->cur_section.length;
+ header = (struct stat_file_header *) file->map;
+ c = (u_char *) file->map + file->seek_pos + blocknum * sizeof(struct stat_file_block);
+ block = (struct stat_file_block *) c;
+
+ for (i = 0; i < CHAIN_LENGTH; i++) {
+ if (i + blocknum >= file->cur_section.length) {
+ /* Need to expire some block in chain */
+ msg_info_pool("chain %ud is full in statfile %s, starting expire",
+ blocknum,
+ file->filename);
+ break;
+ }
+ /* First try to find block in chain */
+ if (block->hash1 == h1 && block->hash2 == h2) {
+ msg_debug_pool("%s found existing block %ud in chain %ud, value %.2f",
+ file->filename,
+ i,
+ blocknum,
+ value);
+ block->value = value;
+ return;
+ }
+ /* Check whether we have a free block in chain */
+ if (block->hash1 == 0 && block->hash2 == 0) {
+ /* Write new block here */
+ msg_debug_pool("%s found free block %ud in chain %ud, set h1=%ud, h2=%ud",
+ file->filename,
+ i,
+ blocknum,
+ h1,
+ h2);
+ block->hash1 = h1;
+ block->hash2 = h2;
+ block->value = value;
+ header->used_blocks++;
+
+ return;
+ }
+
+ /* Expire block with minimum value otherwise */
+ if (block->value < min) {
+ to_expire = block;
+ min = block->value;
+ }
+ c += sizeof(struct stat_file_block);
+ block = (struct stat_file_block *) c;
+ }
+
+ /* Try expire some block */
+ if (to_expire) {
+ block = to_expire;
+ }
+ else {
+ /* Expire first block in chain */
+ c = (u_char *) file->map + file->seek_pos + blocknum * sizeof(struct stat_file_block);
+ block = (struct stat_file_block *) c;
+ }
+
+ block->hash1 = h1;
+ block->hash2 = h2;
+ block->value = value;
+}
+
+void rspamd_mmaped_file_set_block(rspamd_mempool_t *pool,
+ rspamd_mmaped_file_t *file,
+ guint32 h1,
+ guint32 h2,
+ double value)
+{
+ rspamd_mmaped_file_set_block_common(pool, file, h1, h2, value);
+}
+
+gboolean
+rspamd_mmaped_file_set_revision(rspamd_mmaped_file_t *file, guint64 rev, time_t time)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return FALSE;
+ }
+
+ header = (struct stat_file_header *) file->map;
+
+ header->revision = rev;
+ header->rev_time = time;
+
+ return TRUE;
+}
+
+gboolean
+rspamd_mmaped_file_inc_revision(rspamd_mmaped_file_t *file)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return FALSE;
+ }
+
+ header = (struct stat_file_header *) file->map;
+
+ header->revision++;
+
+ return TRUE;
+}
+
+gboolean
+rspamd_mmaped_file_dec_revision(rspamd_mmaped_file_t *file)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return FALSE;
+ }
+
+ header = (struct stat_file_header *) file->map;
+
+ header->revision--;
+
+ return TRUE;
+}
+
+
+gboolean
+rspamd_mmaped_file_get_revision(rspamd_mmaped_file_t *file, guint64 *rev, time_t *time)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return FALSE;
+ }
+
+ header = (struct stat_file_header *) file->map;
+
+ if (rev != NULL) {
+ *rev = header->revision;
+ }
+ if (time != NULL) {
+ *time = header->rev_time;
+ }
+
+ return TRUE;
+}
+
+guint64
+rspamd_mmaped_file_get_used(rspamd_mmaped_file_t *file)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return (guint64) -1;
+ }
+
+ header = (struct stat_file_header *) file->map;
+
+ return header->used_blocks;
+}
+
+guint64
+rspamd_mmaped_file_get_total(rspamd_mmaped_file_t *file)
+{
+ struct stat_file_header *header;
+
+ if (file == NULL || file->map == NULL) {
+ return (guint64) -1;
+ }
+
+ header = (struct stat_file_header *) file->map;
+
+ /* If total blocks is 0 we have old version of header, so set total blocks correctly */
+ if (header->total_blocks == 0) {
+ header->total_blocks = file->cur_section.length;
+ }
+
+ return header->total_blocks;
+}
+
+/* Check whether specified file is statistic file and calculate its len in blocks */
+static gint
+rspamd_mmaped_file_check(rspamd_mempool_t *pool, rspamd_mmaped_file_t *file)
+{
+ struct stat_file *f;
+ gchar *c;
+ static gchar valid_version[] = RSPAMD_STATFILE_VERSION;
+
+
+ if (!file || !file->map) {
+ return -1;
+ }
+
+ if (file->len < sizeof(struct stat_file)) {
+ msg_info_pool("file %s is too short to be stat file: %z",
+ file->filename,
+ file->len);
+ return -1;
+ }
+
+ f = (struct stat_file *) file->map;
+ c = &f->header.magic[0];
+ /* Check magic and version */
+ if (*c++ != 'r' || *c++ != 's' || *c++ != 'd') {
+ msg_info_pool("file %s is invalid stat file", file->filename);
+ return -1;
+ }
+
+ c = &f->header.version[0];
+ /* Now check version and convert old version to new one (that can be used for sync */
+ if (*c == 1 && *(c + 1) == 0) {
+ return -1;
+ }
+ else if (memcmp(c, valid_version, sizeof(valid_version)) != 0) {
+ /* Unknown version */
+ msg_info_pool("file %s has invalid version %c.%c",
+ file->filename,
+ '0' + *c,
+ '0' + *(c + 1));
+ return -1;
+ }
+
+ /* Check first section and set new offset */
+ file->cur_section.code = f->section.code;
+ file->cur_section.length = f->section.length;
+ if (file->cur_section.length * sizeof(struct stat_file_block) >
+ file->len) {
+ msg_info_pool("file %s is truncated: %z, must be %z",
+ file->filename,
+ file->len,
+ file->cur_section.length * sizeof(struct stat_file_block));
+ return -1;
+ }
+ file->seek_pos = sizeof(struct stat_file) -
+ sizeof(struct stat_file_block);
+
+ return 0;
+}
+
+
+static rspamd_mmaped_file_t *
+rspamd_mmaped_file_reindex(rspamd_mempool_t *pool,
+ const gchar *filename,
+ size_t old_size,
+ size_t size,
+ struct rspamd_statfile_config *stcf)
+{
+ gchar *backup, *lock;
+ gint fd, lock_fd;
+ rspamd_mmaped_file_t *new, *old = NULL;
+ u_char *map, *pos;
+ struct stat_file_block *block;
+ struct stat_file_header *header, *nh;
+ struct timespec sleep_ts = {
+ .tv_sec = 0,
+ .tv_nsec = 1000000};
+
+ if (size <
+ sizeof(struct stat_file_header) + sizeof(struct stat_file_section) +
+ sizeof(block)) {
+ msg_err_pool("file %s is too small to carry any statistic: %z",
+ filename,
+ size);
+ return NULL;
+ }
+
+ lock = g_strconcat(filename, ".lock", NULL);
+ lock_fd = open(lock, O_WRONLY | O_CREAT | O_EXCL, 00600);
+
+ while (lock_fd == -1) {
+ /* Wait for lock */
+ lock_fd = open(lock, O_WRONLY | O_CREAT | O_EXCL, 00600);
+ if (lock_fd != -1) {
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+
+ return rspamd_mmaped_file_open(pool, filename, size, stcf);
+ }
+ else {
+ nanosleep(&sleep_ts, NULL);
+ }
+ }
+
+ backup = g_strconcat(filename, ".old", NULL);
+ if (rename(filename, backup) == -1) {
+ msg_err_pool("cannot rename %s to %s: %s", filename, backup, strerror(errno));
+ g_free(backup);
+ unlink(lock);
+ g_free(lock);
+ close(lock_fd);
+
+ return NULL;
+ }
+
+ old = rspamd_mmaped_file_open(pool, backup, old_size, stcf);
+
+ if (old == NULL) {
+ msg_warn_pool("old file %s is invalid mmapped file, just move it",
+ backup);
+ }
+
+ /* We need to release our lock here */
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+
+ /* Now create new file with required size */
+ if (rspamd_mmaped_file_create(filename, size, stcf, pool) != 0) {
+ msg_err_pool("cannot create new file");
+ rspamd_mmaped_file_close(old);
+ g_free(backup);
+
+ return NULL;
+ }
+
+ new = rspamd_mmaped_file_open(pool, filename, size, stcf);
+
+ if (old) {
+ /* Now open new file and start copying */
+ fd = open(backup, O_RDONLY);
+ if (fd == -1 || new == NULL) {
+ if (fd != -1) {
+ close(fd);
+ }
+
+ msg_err_pool("cannot open file: %s", strerror(errno));
+ rspamd_mmaped_file_close(old);
+ g_free(backup);
+ return NULL;
+ }
+
+
+ /* Now start reading blocks from old statfile */
+ if ((map =
+ mmap(NULL, old_size, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+ msg_err_pool("cannot mmap file: %s", strerror(errno));
+ close(fd);
+ rspamd_mmaped_file_close(old);
+ g_free(backup);
+ return NULL;
+ }
+
+ pos = map + (sizeof(struct stat_file) - sizeof(struct stat_file_block));
+
+ if (pos - map < (gssize) old_size) {
+ while ((gssize) old_size - (pos - map) >= (gssize) sizeof(struct stat_file_block)) {
+ block = (struct stat_file_block *) pos;
+ if (block->hash1 != 0 && block->value != 0) {
+ rspamd_mmaped_file_set_block_common(pool,
+ new, block->hash1,
+ block->hash2, block->value);
+ }
+ pos += sizeof(block);
+ }
+ }
+
+ header = (struct stat_file_header *) map;
+ rspamd_mmaped_file_set_revision(new, header->revision, header->rev_time);
+ nh = new->map;
+ /* Copy tokenizer configuration */
+ memcpy(nh->unused, header->unused, sizeof(header->unused));
+ nh->tokenizer_conf_len = header->tokenizer_conf_len;
+
+ munmap(map, old_size);
+ close(fd);
+ rspamd_mmaped_file_close_file(pool, old);
+ }
+
+ unlink(backup);
+ g_free(backup);
+
+ return new;
+}
+
+/*
+ * Pre-load mmaped file into memory
+ */
+static void
+rspamd_mmaped_file_preload(rspamd_mmaped_file_t *file)
+{
+ guint8 *pos, *end;
+ volatile guint8 t;
+ gsize size;
+
+ pos = (guint8 *) file->map;
+ end = (guint8 *) file->map + file->len;
+
+ if (madvise(pos, end - pos, MADV_SEQUENTIAL) == -1) {
+ msg_info("madvise failed: %s", strerror(errno));
+ }
+ else {
+ /* Load pages of file */
+#ifdef HAVE_GETPAGESIZE
+ size = getpagesize();
+#else
+ size = sysconf(_SC_PAGESIZE);
+#endif
+ while (pos < end) {
+ t = *pos;
+ (void) t;
+ pos += size;
+ }
+ }
+}
+
+rspamd_mmaped_file_t *
+rspamd_mmaped_file_open(rspamd_mempool_t *pool,
+ const gchar *filename, size_t size,
+ struct rspamd_statfile_config *stcf)
+{
+ struct stat st;
+ rspamd_mmaped_file_t *new_file;
+ gchar *lock;
+ gint lock_fd;
+
+ lock = g_strconcat(filename, ".lock", NULL);
+ lock_fd = open(lock, O_WRONLY | O_CREAT | O_EXCL, 00600);
+
+ if (lock_fd == -1) {
+ g_free(lock);
+ msg_info_pool("cannot open file %s, it is locked by another process",
+ filename);
+ return NULL;
+ }
+
+ close(lock_fd);
+ unlink(lock);
+ g_free(lock);
+
+ if (stat(filename, &st) == -1) {
+ msg_info_pool("cannot stat file %s, error %s, %d", filename, strerror(errno), errno);
+ return NULL;
+ }
+
+ if (labs((glong) size - st.st_size) > (long) sizeof(struct stat_file) * 2 && size > sizeof(struct stat_file)) {
+ msg_warn_pool("need to reindex statfile old size: %Hz, new size: %Hz",
+ (size_t) st.st_size, size);
+ return rspamd_mmaped_file_reindex(pool, filename, st.st_size, size, stcf);
+ }
+ else if (size < sizeof(struct stat_file)) {
+ msg_err_pool("requested to shrink statfile to %Hz but it is too small",
+ size);
+ }
+
+ new_file = g_malloc0(sizeof(rspamd_mmaped_file_t));
+ if ((new_file->fd = open(filename, O_RDWR)) == -1) {
+ msg_info_pool("cannot open file %s, error %d, %s",
+ filename,
+ errno,
+ strerror(errno));
+ g_free(new_file);
+ return NULL;
+ }
+
+ if ((new_file->map =
+ mmap(NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED,
+ new_file->fd, 0)) == MAP_FAILED) {
+ close(new_file->fd);
+ msg_info_pool("cannot mmap file %s, error %d, %s",
+ filename,
+ errno,
+ strerror(errno));
+ g_free(new_file);
+ return NULL;
+ }
+
+ rspamd_strlcpy(new_file->filename, filename, sizeof(new_file->filename));
+ new_file->len = st.st_size;
+ /* Try to lock pages in RAM */
+
+ /* Acquire lock for this operation */
+ if (!rspamd_file_lock(new_file->fd, FALSE)) {
+ close(new_file->fd);
+ munmap(new_file->map, st.st_size);
+ msg_info_pool("cannot lock file %s, error %d, %s",
+ filename,
+ errno,
+ strerror(errno));
+ g_free(new_file);
+ return NULL;
+ }
+
+ if (rspamd_mmaped_file_check(pool, new_file) == -1) {
+ close(new_file->fd);
+ rspamd_file_unlock(new_file->fd, FALSE);
+ munmap(new_file->map, st.st_size);
+ g_free(new_file);
+ return NULL;
+ }
+
+ rspamd_file_unlock(new_file->fd, FALSE);
+ new_file->cf = stcf;
+ new_file->pool = pool;
+ rspamd_mmaped_file_preload(new_file);
+
+ g_assert(stcf->clcf != NULL);
+
+ msg_debug_pool("opened statfile %s of size %l", filename, (long) size);
+
+ return new_file;
+}
+
+gint rspamd_mmaped_file_close_file(rspamd_mempool_t *pool,
+ rspamd_mmaped_file_t *file)
+{
+ if (file->map) {
+ msg_info_pool("syncing statfile %s", file->filename);
+ msync(file->map, file->len, MS_ASYNC);
+ munmap(file->map, file->len);
+ }
+ if (file->fd != -1) {
+ close(file->fd);
+ }
+
+ g_free(file);
+
+ return 0;
+}
+
+gint rspamd_mmaped_file_create(const gchar *filename,
+ size_t size,
+ struct rspamd_statfile_config *stcf,
+ rspamd_mempool_t *pool)
+{
+ struct stat_file_header header = {
+ .magic = {'r', 's', 'd'},
+ .version = RSPAMD_STATFILE_VERSION,
+ .padding = {0, 0, 0},
+ .revision = 0,
+ .rev_time = 0,
+ .used_blocks = 0};
+ struct stat_file_section section = {
+ .code = STATFILE_SECTION_COMMON,
+ };
+ struct stat_file_block block = {0, 0, 0};
+ struct rspamd_stat_tokenizer *tokenizer;
+ gint fd, lock_fd;
+ guint buflen = 0, nblocks;
+ gchar *buf = NULL, *lock;
+ struct stat sb;
+ gpointer tok_conf;
+ gsize tok_conf_len;
+ struct timespec sleep_ts = {
+ .tv_sec = 0,
+ .tv_nsec = 1000000};
+
+ if (size <
+ sizeof(struct stat_file_header) + sizeof(struct stat_file_section) +
+ sizeof(block)) {
+ msg_err_pool("file %s is too small to carry any statistic: %z",
+ filename,
+ size);
+ return -1;
+ }
+
+ lock = g_strconcat(filename, ".lock", NULL);
+ lock_fd = open(lock, O_WRONLY | O_CREAT | O_EXCL, 00600);
+
+ while (lock_fd == -1) {
+ /* Wait for lock */
+ lock_fd = open(lock, O_WRONLY | O_CREAT | O_EXCL, 00600);
+ if (lock_fd != -1) {
+ if (stat(filename, &sb) != -1) {
+ /* File has been created by some other process */
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+
+ return 0;
+ }
+
+ /* We still need to create it */
+ goto create;
+ }
+ else {
+ nanosleep(&sleep_ts, NULL);
+ }
+ }
+
+create:
+
+ msg_debug_pool("create statfile %s of size %l", filename, (long) size);
+ nblocks =
+ (size - sizeof(struct stat_file_header) -
+ sizeof(struct stat_file_section)) /
+ sizeof(struct stat_file_block);
+ header.total_blocks = nblocks;
+
+ if ((fd =
+ open(filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
+ msg_info_pool("cannot create file %s, error %d, %s",
+ filename,
+ errno,
+ strerror(errno));
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+
+ return -1;
+ }
+
+ rspamd_fallocate(fd,
+ 0,
+ sizeof(header) + sizeof(section) + sizeof(block) * nblocks);
+
+ header.create_time = (guint64) time(NULL);
+ g_assert(stcf->clcf != NULL);
+ g_assert(stcf->clcf->tokenizer != NULL);
+ tokenizer = rspamd_stat_get_tokenizer(stcf->clcf->tokenizer->name);
+ g_assert(tokenizer != NULL);
+ tok_conf = tokenizer->get_config(pool, stcf->clcf->tokenizer, &tok_conf_len);
+ header.tokenizer_conf_len = tok_conf_len;
+ g_assert(tok_conf_len < sizeof(header.unused) - sizeof(guint64));
+ memcpy(header.unused, tok_conf, tok_conf_len);
+
+ if (write(fd, &header, sizeof(header)) == -1) {
+ msg_info_pool("cannot write header to file %s, error %d, %s",
+ filename,
+ errno,
+ strerror(errno));
+ close(fd);
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+
+ return -1;
+ }
+
+ section.length = (guint64) nblocks;
+ if (write(fd, &section, sizeof(section)) == -1) {
+ msg_info_pool("cannot write section header to file %s, error %d, %s",
+ filename,
+ errno,
+ strerror(errno));
+ close(fd);
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+
+ return -1;
+ }
+
+ /* Buffer for write 256 blocks at once */
+ if (nblocks > 256) {
+ buflen = sizeof(block) * 256;
+ buf = g_malloc0(buflen);
+ }
+
+ while (nblocks) {
+ if (nblocks > 256) {
+ /* Just write buffer */
+ if (write(fd, buf, buflen) == -1) {
+ msg_info_pool("cannot write blocks buffer to file %s, error %d, %s",
+ filename,
+ errno,
+ strerror(errno));
+ close(fd);
+ g_free(buf);
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+
+ return -1;
+ }
+ nblocks -= 256;
+ }
+ else {
+ if (write(fd, &block, sizeof(block)) == -1) {
+ msg_info_pool("cannot write block to file %s, error %d, %s",
+ filename,
+ errno,
+ strerror(errno));
+ close(fd);
+ if (buf) {
+ g_free(buf);
+ }
+
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+
+ return -1;
+ }
+ nblocks--;
+ }
+ }
+
+ close(fd);
+
+ if (buf) {
+ g_free(buf);
+ }
+
+ unlink(lock);
+ close(lock_fd);
+ g_free(lock);
+ msg_debug_pool("created statfile %s of size %l", filename, (long) size);
+
+ return 0;
+}
+
+gpointer
+rspamd_mmaped_file_init(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg, struct rspamd_statfile *st)
+{
+ struct rspamd_statfile_config *stf = st->stcf;
+ rspamd_mmaped_file_t *mf;
+ const ucl_object_t *filenameo, *sizeo;
+ const gchar *filename;
+ gsize size;
+
+ filenameo = ucl_object_lookup(stf->opts, "filename");
+
+ if (filenameo == NULL || ucl_object_type(filenameo) != UCL_STRING) {
+ filenameo = ucl_object_lookup(stf->opts, "path");
+
+ if (filenameo == NULL || ucl_object_type(filenameo) != UCL_STRING) {
+ msg_err_config("statfile %s has no filename defined", stf->symbol);
+ return NULL;
+ }
+ }
+
+ filename = ucl_object_tostring(filenameo);
+
+ sizeo = ucl_object_lookup(stf->opts, "size");
+
+ if (sizeo == NULL || ucl_object_type(sizeo) != UCL_INT) {
+ msg_err_config("statfile %s has no size defined", stf->symbol);
+ return NULL;
+ }
+
+ size = ucl_object_toint(sizeo);
+ mf = rspamd_mmaped_file_open(cfg->cfg_pool, filename, size, stf);
+
+ if (mf != NULL) {
+ mf->pool = cfg->cfg_pool;
+ }
+ else {
+ /* Create file here */
+
+ filenameo = ucl_object_find_key(stf->opts, "filename");
+ if (filenameo == NULL || ucl_object_type(filenameo) != UCL_STRING) {
+ filenameo = ucl_object_find_key(stf->opts, "path");
+ if (filenameo == NULL || ucl_object_type(filenameo) != UCL_STRING) {
+ msg_err_config("statfile %s has no filename defined", stf->symbol);
+ return NULL;
+ }
+ }
+
+ filename = ucl_object_tostring(filenameo);
+
+ sizeo = ucl_object_find_key(stf->opts, "size");
+ if (sizeo == NULL || ucl_object_type(sizeo) != UCL_INT) {
+ msg_err_config("statfile %s has no size defined", stf->symbol);
+ return NULL;
+ }
+
+ size = ucl_object_toint(sizeo);
+
+ if (rspamd_mmaped_file_create(filename, size, stf, cfg->cfg_pool) != 0) {
+ msg_err_config("cannot create new file");
+ }
+
+ mf = rspamd_mmaped_file_open(cfg->cfg_pool, filename, size, stf);
+ }
+
+ return (gpointer) mf;
+}
+
+void rspamd_mmaped_file_close(gpointer p)
+{
+ rspamd_mmaped_file_t *mf = p;
+
+
+ if (mf) {
+ rspamd_mmaped_file_close_file(mf->pool, mf);
+ }
+}
+
+gpointer
+rspamd_mmaped_file_runtime(struct rspamd_task *task,
+ struct rspamd_statfile_config *stcf,
+ gboolean learn,
+ gpointer p,
+ gint _id)
+{
+ rspamd_mmaped_file_t *mf = p;
+
+ return (gpointer) mf;
+}
+
+gboolean
+rspamd_mmaped_file_process_tokens(struct rspamd_task *task, GPtrArray *tokens,
+ gint id,
+ gpointer p)
+{
+ rspamd_mmaped_file_t *mf = p;
+ guint32 h1, h2;
+ rspamd_token_t *tok;
+ guint i;
+
+ g_assert(tokens != NULL);
+ g_assert(p != NULL);
+
+ for (i = 0; i < tokens->len; i++) {
+ tok = g_ptr_array_index(tokens, i);
+ memcpy(&h1, (guchar *) &tok->data, sizeof(h1));
+ memcpy(&h2, ((guchar *) &tok->data) + sizeof(h1), sizeof(h2));
+ tok->values[id] = rspamd_mmaped_file_get_block(mf, h1, h2);
+ }
+
+ if (mf->cf->is_spam) {
+ task->flags |= RSPAMD_TASK_FLAG_HAS_SPAM_TOKENS;
+ }
+ else {
+ task->flags |= RSPAMD_TASK_FLAG_HAS_HAM_TOKENS;
+ }
+
+ return TRUE;
+}
+
+gboolean
+rspamd_mmaped_file_learn_tokens(struct rspamd_task *task, GPtrArray *tokens,
+ gint id,
+ gpointer p)
+{
+ rspamd_mmaped_file_t *mf = p;
+ guint32 h1, h2;
+ rspamd_token_t *tok;
+ guint i;
+
+ g_assert(tokens != NULL);
+ g_assert(p != NULL);
+
+ for (i = 0; i < tokens->len; i++) {
+ tok = g_ptr_array_index(tokens, i);
+ memcpy(&h1, (guchar *) &tok->data, sizeof(h1));
+ memcpy(&h2, ((guchar *) &tok->data) + sizeof(h1), sizeof(h2));
+ rspamd_mmaped_file_set_block(task->task_pool, mf, h1, h2,
+ tok->values[id]);
+ }
+
+ return TRUE;
+}
+
+gulong
+rspamd_mmaped_file_total_learns(struct rspamd_task *task, gpointer runtime,
+ gpointer ctx)
+{
+ rspamd_mmaped_file_t *mf = (rspamd_mmaped_file_t *) runtime;
+ guint64 rev = 0;
+ time_t t;
+
+ if (mf != NULL) {
+ rspamd_mmaped_file_get_revision(mf, &rev, &t);
+ }
+
+ return rev;
+}
+
+gulong
+rspamd_mmaped_file_inc_learns(struct rspamd_task *task, gpointer runtime,
+ gpointer ctx)
+{
+ rspamd_mmaped_file_t *mf = (rspamd_mmaped_file_t *) runtime;
+ guint64 rev = 0;
+ time_t t;
+
+ if (mf != NULL) {
+ rspamd_mmaped_file_inc_revision(mf);
+ rspamd_mmaped_file_get_revision(mf, &rev, &t);
+ }
+
+ return rev;
+}
+
+gulong
+rspamd_mmaped_file_dec_learns(struct rspamd_task *task, gpointer runtime,
+ gpointer ctx)
+{
+ rspamd_mmaped_file_t *mf = (rspamd_mmaped_file_t *) runtime;
+ guint64 rev = 0;
+ time_t t;
+
+ if (mf != NULL) {
+ rspamd_mmaped_file_dec_revision(mf);
+ rspamd_mmaped_file_get_revision(mf, &rev, &t);
+ }
+
+ return rev;
+}
+
+
+ucl_object_t *
+rspamd_mmaped_file_get_stat(gpointer runtime,
+ gpointer ctx)
+{
+ ucl_object_t *res = NULL;
+ guint64 rev;
+ rspamd_mmaped_file_t *mf = (rspamd_mmaped_file_t *) runtime;
+
+ if (mf != NULL) {
+ res = ucl_object_typed_new(UCL_OBJECT);
+ rspamd_mmaped_file_get_revision(mf, &rev, NULL);
+ ucl_object_insert_key(res, ucl_object_fromint(rev), "revision",
+ 0, false);
+ ucl_object_insert_key(res, ucl_object_fromint(mf->len), "size",
+ 0, false);
+ ucl_object_insert_key(res, ucl_object_fromint(rspamd_mmaped_file_get_total(mf)), "total", 0, false);
+ ucl_object_insert_key(res, ucl_object_fromint(rspamd_mmaped_file_get_used(mf)), "used", 0, false);
+ ucl_object_insert_key(res, ucl_object_fromstring(mf->cf->symbol),
+ "symbol", 0, false);
+ ucl_object_insert_key(res, ucl_object_fromstring("mmap"),
+ "type", 0, false);
+ ucl_object_insert_key(res, ucl_object_fromint(0),
+ "languages", 0, false);
+ ucl_object_insert_key(res, ucl_object_fromint(0),
+ "users", 0, false);
+
+ if (mf->cf->label) {
+ ucl_object_insert_key(res, ucl_object_fromstring(mf->cf->label),
+ "label", 0, false);
+ }
+ }
+
+ return res;
+}
+
+gboolean
+rspamd_mmaped_file_finalize_learn(struct rspamd_task *task, gpointer runtime,
+ gpointer ctx, GError **err)
+{
+ rspamd_mmaped_file_t *mf = (rspamd_mmaped_file_t *) runtime;
+
+ if (mf != NULL) {
+ msync(mf->map, mf->len, MS_INVALIDATE | MS_ASYNC);
+ }
+
+ return TRUE;
+}
+
+gboolean
+rspamd_mmaped_file_finalize_process(struct rspamd_task *task, gpointer runtime,
+ gpointer ctx)
+{
+ return TRUE;
+}
+
+gpointer
+rspamd_mmaped_file_load_tokenizer_config(gpointer runtime,
+ gsize *len)
+{
+ rspamd_mmaped_file_t *mf = runtime;
+ struct stat_file_header *header;
+
+ g_assert(mf != NULL);
+ header = mf->map;
+
+ if (len) {
+ *len = header->tokenizer_conf_len;
+ }
+
+ return header->unused;
+}