diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 17:54:12 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 17:54:12 +0000 |
commit | b527294153be3b79563c82c66102adc0004736c0 (patch) | |
tree | 9b423a224848441885190b5ea7cf0feb23510c9d /libraries/liblmdb/mplay.c | |
parent | Initial commit. (diff) | |
download | openldap-b527294153be3b79563c82c66102adc0004736c0.tar.xz openldap-b527294153be3b79563c82c66102adc0004736c0.zip |
Adding upstream version 2.6.7+dfsg.upstream/2.6.7+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'libraries/liblmdb/mplay.c')
-rw-r--r-- | libraries/liblmdb/mplay.c | 582 |
1 files changed, 582 insertions, 0 deletions
diff --git a/libraries/liblmdb/mplay.c b/libraries/liblmdb/mplay.c new file mode 100644 index 0000000..0fef74f --- /dev/null +++ b/libraries/liblmdb/mplay.c @@ -0,0 +1,582 @@ +/* mplay.c - memory-mapped database log replay */ +/* + * Copyright 2011-2023 Howard Chu, Symas Corp. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * <http://www.OpenLDAP.org/license.html>. + */ +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <time.h> +#include <string.h> +#include <ctype.h> +#include <assert.h> +#include <sys/types.h> +#include <sys/wait.h> + +#include "lmdb.h" + +#define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr) +#define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0)) +#define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \ + "%s:%d: %s: %s\n", __FILE__, __LINE__, msg, mdb_strerror(rc)), abort())) + +#define MDB_SCNy(t) "z" #t + +#define SCMP(s) s, (sizeof(s)-1) +char inbuf[8192]; +char *dbuf, *kbuf; +size_t dbufsize; +int maxkey; + +#define SOFF(s) (sizeof(s)+1) + +#define MAXENVS 16 +#define MAXTXNS 16 +#define MAXCRSS 16 + +#define MAXPIDS 16 + +typedef struct crspair { + void *tcrs; /* scanned text pointer */ + MDB_cursor *rcrs; +} crspair; + +typedef struct txnpair { + void *ttxn; /* scanned text pointer */ + MDB_txn *rtxn; + crspair cursors[MAXCRSS]; + int ncursors; +} txnpair; + +typedef struct envpair { + void *tenv; + MDB_env *renv; + txnpair txns[MAXTXNS]; + int ntxns; +} envpair; + +envpair envs[MAXENVS]; +int nenvs; + +envpair *lastenv; +txnpair *lasttxn; +crspair *lastcrs; + +typedef struct pidpair { + int tpid; + pid_t rpid; + int fdout, fdin; +} pidpair; + +pidpair *lastpid; + +pidpair pids[MAXPIDS]; +int npids; + +unsigned long lcount; + +static int unhex(unsigned char *c2) +{ + int x, c; + x = *c2++ & 0x4f; + if (x & 0x40) + x -= 55; + c = x << 4; + x = *c2 & 0x4f; + if (x & 0x40) + x -= 55; + c |= x; + return c; +} + +int inhex(char *in, char *out) +{ + char *c2 = out; + while (isxdigit(*in)) { + *c2++ = unhex((unsigned char *)in); + in += 2; + } + return c2 - out; +} + +static void addenv(void *tenv, MDB_env *renv) +{ + assert(nenvs < MAXENVS); + envs[nenvs].tenv = tenv; + envs[nenvs].renv = renv; + envs[nenvs].ntxns = 0; + lastenv = envs+nenvs; + nenvs++; +} + +static envpair *findenv(void *tenv) +{ + int i; + if (!lastenv || lastenv->tenv != tenv) { + for (i=0; i<nenvs; i++) + if (envs[i].tenv == tenv) + break; + assert(i < nenvs); + lastenv = &envs[i]; + } + return lastenv; +} + +static void delenv(envpair *ep) +{ + int i = ep - envs; + for (; i<nenvs-1; i++) + envs[i] = envs[i+1]; + nenvs--; + lastenv = NULL; +} + +static void addtxn(void *tenv, void *ttxn, MDB_txn *rtxn) +{ + envpair *ep; + txnpair *tp; + + ep = findenv(tenv); + assert(ep->ntxns < MAXTXNS); + tp = ep->txns+ep->ntxns; + tp->ttxn = ttxn; + tp->rtxn = rtxn; + tp->ncursors = 0; + ep->ntxns++; + lasttxn = tp; +} + +static txnpair *findtxn(void *ttxn) +{ + int i, j; + if (lasttxn && lasttxn->ttxn == ttxn) + return lasttxn; + if (lastenv) { + for (i=0; i<lastenv->ntxns; i++) { + if (lastenv->txns[i].ttxn == ttxn) { + lasttxn = lastenv->txns+i; + return lasttxn; + } + } + } + for (i=0; i<nenvs; i++) { + if (envs+i == lastenv) continue; + for (j=0; j<envs[i].ntxns; j++) { + if (envs[i].txns[j].ttxn == ttxn) { + lastenv = envs+i; + lasttxn = envs[i].txns+j; + return lasttxn; + } + } + } + assert(0); /* should have found it */ +} + +static void deltxn(txnpair *tp) +{ + int i = tp - lastenv->txns; + for (; i<lastenv->ntxns-1; i++) + lastenv->txns[i] = lastenv->txns[i+1]; + lastenv->ntxns--; + lasttxn = NULL; +} + +static void addcrs(txnpair *tp, void *tcrs, MDB_cursor *rcrs) +{ + int j = tp->ncursors; + assert(tp->ncursors < MAXCRSS); + + tp->cursors[j].tcrs = tcrs; + tp->cursors[j].rcrs = rcrs; + tp->ncursors++; + lastcrs = tp->cursors+j; +} + +static crspair *findcrs(void *tcrs) +{ + int i, j, k; + envpair *ep; + txnpair *tp; + crspair *cp; + if (lastcrs && lastcrs->tcrs == tcrs) + return lastcrs; + if (lasttxn) { + for (k=0, cp=lasttxn->cursors; k<lasttxn->ncursors; k++, cp++) { + if (cp->tcrs == tcrs) { + lastcrs = cp; + return lastcrs; + } + } + } + if (lastenv) { + for (j=0, tp=lastenv->txns; j<lastenv->ntxns; j++, tp++){ + if (tp == lasttxn) + continue; + for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) { + if (cp->tcrs == tcrs) { + lastcrs = cp; + lasttxn = tp; + return lastcrs; + } + } + } + } + for (i=0, ep=envs; i<nenvs; i++, ep++) { + for (j=0, tp=ep->txns; j<ep->ntxns; j++, tp++) { + if (tp == lasttxn) + continue; + for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) { + if (cp->tcrs == tcrs) { + lastcrs = cp; + lasttxn = tp; + lastenv = ep; + return lastcrs; + } + } + } + } + assert(0); /* should have found it already */ +} + +static void delcrs(void *tcrs) +{ + int i; + crspair *cp = findcrs(tcrs); + mdb_cursor_close(cp->rcrs); + for (i = cp - lasttxn->cursors; i<lasttxn->ncursors-1; i++) + lasttxn->cursors[i] = lasttxn->cursors[i+1]; + lasttxn->ncursors--; + lastcrs = NULL; +} + +void child() +{ + int rc; + MDB_val key, data; + char *ptr; + + while (fgets(inbuf, sizeof(inbuf), stdin)) { + ptr = inbuf; + if (!strncmp(ptr, SCMP("exit"))) + break; + + if (!strncmp(ptr, SCMP("mdb_env_create"))) { + void *tenv; + MDB_env *renv; + sscanf(ptr+SOFF("mdb_env_create"), "%p", &tenv); + E(mdb_env_create(&renv)); + addenv(tenv, renv); + } else if (!strncmp(ptr, SCMP("mdb_env_set_maxdbs"))) { + void *tenv; + envpair *ep; + unsigned int maxdbs; + sscanf(ptr+SOFF("mdb_env_set_maxdbs"), "%p, %u", &tenv, &maxdbs); + ep = findenv(tenv); + E(mdb_env_set_maxdbs(ep->renv, maxdbs)); + } else if (!strncmp(ptr, SCMP("mdb_env_set_mapsize"))) { + void *tenv; + envpair *ep; + size_t mapsize; + sscanf(ptr+SOFF("mdb_env_set_mapsize"), "%p, %"MDB_SCNy(u), &tenv, &mapsize); + ep = findenv(tenv); + E(mdb_env_set_mapsize(ep->renv, mapsize)); + } else if (!strncmp(ptr, SCMP("mdb_env_open"))) { + void *tenv; + envpair *ep; + char *path; + int len; + unsigned int flags, mode; + sscanf(ptr+SOFF("mdb_env_open"), "%p, %n", &tenv, &len); + path = ptr+SOFF("mdb_env_open")+len; + ptr = strchr(path, ','); + *ptr++ = '\0'; + sscanf(ptr, "%u, %o", &flags, &mode); + ep = findenv(tenv); + E(mdb_env_open(ep->renv, path, flags, mode)); + if (!maxkey) { + maxkey = mdb_env_get_maxkeysize(ep->renv); + kbuf = malloc(maxkey+2); + dbuf = malloc(maxkey+2); + dbufsize = maxkey; + } + } else if (!strncmp(ptr, SCMP("mdb_env_close"))) { + void *tenv; + envpair *ep; + sscanf(ptr+SOFF("mdb_env_close"), "%p", &tenv); + ep = findenv(tenv); + mdb_env_close(ep->renv); + delenv(ep); + if (!nenvs) /* if no other envs left, this process is done */ + break; + } else if (!strncmp(ptr, SCMP("mdb_txn_begin"))) { + unsigned int flags; + void *tenv, *ttxn; + envpair *ep; + MDB_txn *rtxn; + sscanf(ptr+SOFF("mdb_txn_begin"), "%p, %*p, %u = %p", &tenv, &flags, &ttxn); + ep = findenv(tenv); + E(mdb_txn_begin(ep->renv, NULL, flags, &rtxn)); + addtxn(tenv, ttxn, rtxn); + } else if (!strncmp(ptr, SCMP("mdb_txn_commit"))) { + void *ttxn; + txnpair *tp; + sscanf(ptr+SOFF("mdb_txn_commit"), "%p", &ttxn); + tp = findtxn(ttxn); + E(mdb_txn_commit(tp->rtxn)); + deltxn(tp); + } else if (!strncmp(ptr, SCMP("mdb_txn_abort"))) { + void *ttxn; + txnpair *tp; + sscanf(ptr+SOFF("mdb_txn_abort"), "%p", &ttxn); + tp = findtxn(ttxn); + mdb_txn_abort(tp->rtxn); + deltxn(tp); + } else if (!strncmp(ptr, SCMP("mdb_dbi_open"))) { + void *ttxn; + txnpair *tp; + char *dbname; + unsigned int flags; + unsigned int tdbi; + MDB_dbi dbi; + sscanf(ptr+SOFF("mdb_dbi_open"), "%p, ", &ttxn); + dbname = strchr(ptr+SOFF("mdb_dbi_open"), ','); + dbname += 2; + ptr = strchr(dbname, ','); + *ptr++ = '\0'; + if (!strcmp(dbname, "(null)")) + dbname = NULL; + sscanf(ptr, "%u = %u", &flags, &tdbi); + tp = findtxn(ttxn); + E(mdb_dbi_open(tp->rtxn, dbname, flags, &dbi)); + } else if (!strncmp(ptr, SCMP("mdb_dbi_close"))) { + void *tenv; + envpair *ep; + unsigned int tdbi; + sscanf(ptr+SOFF("mdb_dbi_close"), "%p, %u", &tenv, &tdbi); + ep = findenv(tenv); + mdb_dbi_close(ep->renv, tdbi); + } else if (!strncmp(ptr, SCMP("mdb_cursor_open"))) { + void *ttxn, *tcrs; + txnpair *tp; + MDB_cursor *rcrs; + unsigned int tdbi; + sscanf(ptr+SOFF("mdb_cursor_open"), "%p, %u = %p", &ttxn, &tdbi, &tcrs); + tp = findtxn(ttxn); + E(mdb_cursor_open(tp->rtxn, tdbi, &rcrs)); + addcrs(tp, tcrs, rcrs); + } else if (!strncmp(ptr, SCMP("mdb_cursor_close"))) { + void *tcrs; + sscanf(ptr+SOFF("mdb_cursor_close"), "%p", &tcrs); + delcrs(tcrs); + } else if (!strncmp(ptr, SCMP("mdb_cursor_put"))) { + void *tcrs; + crspair *cp; + unsigned int flags; + int len; + sscanf(ptr+SOFF("mdb_cursor_put"), "%p, ", &tcrs); + cp = findcrs(tcrs); + ptr = strchr(ptr+SOFF("mdb_cursor_put"), ','); + sscanf(ptr+1, "%"MDB_SCNy(u)",", &key.mv_size); + if (key.mv_size) { + ptr = strchr(ptr, '['); + inhex(ptr+1, kbuf); + key.mv_data = kbuf; + ptr += key.mv_size * 2 + 2; + } + ptr = strchr(ptr+1, ','); + sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len); + if (data.mv_size > dbufsize) { + dbuf = realloc(dbuf, data.mv_size+2); + assert(dbuf != NULL); + dbufsize = data.mv_size; + } + ptr += len+1; + if (*ptr == '[') { + inhex(ptr+1, dbuf); + data.mv_data = dbuf; + ptr += data.mv_size * 2 + 2; + } else { + sprintf(dbuf, "%09ld", (long)mdb_txn_id(lasttxn->rtxn)); + } + sscanf(ptr+1, "%u", &flags); + E(mdb_cursor_put(cp->rcrs, &key, &data, flags)); + } else if (!strncmp(ptr, SCMP("mdb_cursor_del"))) { + void *tcrs; + crspair *cp; + unsigned int flags; + sscanf(ptr+SOFF("mdb_cursor_del"), "%p, %u", &tcrs, &flags); + cp = findcrs(tcrs); + E(mdb_cursor_del(cp->rcrs, flags)); + } else if (!strncmp(ptr, SCMP("mdb_put"))) { + void *ttxn; + txnpair *tp; + unsigned int tdbi, flags; + int len; + sscanf(ptr+SOFF("mdb_put"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size); + tp = findtxn(ttxn); + ptr = strchr(ptr+SOFF("mdb_put"), '['); + inhex(ptr+1, kbuf); + key.mv_data = kbuf; + ptr += key.mv_size * 2 + 2; + sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len); + if (data.mv_size > dbufsize) { + dbuf = realloc(dbuf, data.mv_size+2); + assert(dbuf != NULL); + dbufsize = data.mv_size; + } + ptr += len+1; + if (*ptr == '[') { + inhex(ptr+1, dbuf); + ptr += data.mv_size * 2 + 2; + } else { + sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn)); + } + data.mv_data = dbuf; + sscanf(ptr+1, "%u", &flags); + RES(MDB_KEYEXIST,mdb_put(tp->rtxn, tdbi, &key, &data, flags)); + } else if (!strncmp(ptr, SCMP("mdb_del"))) { + void *ttxn; + txnpair *tp; + unsigned int tdbi; + int len; + sscanf(ptr+SOFF("mdb_del"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size); + tp = findtxn(ttxn); + ptr = strchr(ptr+SOFF("mdb_del"), '['); + inhex(ptr+1, kbuf); + key.mv_data = kbuf; + ptr += key.mv_size * 2 + 2; + sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len); + if (data.mv_size > dbufsize) { + dbuf = realloc(dbuf, data.mv_size+2); + assert(dbuf != NULL); + dbufsize = data.mv_size; + } + ptr += len+1; + if (*ptr == '[') { + inhex(ptr+1, dbuf); + } else { + sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn)); + } + data.mv_data = dbuf; + RES(MDB_NOTFOUND,mdb_del(tp->rtxn, tdbi, &key, &data)); + } + write(1, "\n", 1); + } + exit(0); +} + +static pidpair *addpid(int tpid) +{ + int fdout[2], fdin[2]; + pid_t pid; + assert(npids < MAXPIDS); + pids[npids].tpid = tpid; + pipe(fdout); + pipe(fdin); + if ((pid = fork()) == 0) { + /* child */ + fclose(stdin); + fclose(stdout); + dup2(fdout[0], 0); + dup2(fdin[1], 1); + stdin = fdopen(0, "r"); + stdout = fdopen(1, "w"); + child(); + return 0; /* NOTREACHED */ + } else { + pids[npids].rpid = pid; + pids[npids].fdout = fdout[1]; + pids[npids].fdin = fdin[0]; + lastpid = pids+npids; + npids++; + return lastpid; + } +} + +static pidpair *findpid(int tpid) +{ + int i; + if (!lastpid || lastpid->tpid != tpid) { + for (i=0; i<npids; i++) + if (pids[i].tpid == tpid) + break; + if (i == npids) + return NULL; + lastpid = &pids[i]; + } + return lastpid; +} + +volatile pid_t killpid; + +static void delpid(int tpid) +{ + pidpair *pp = findpid(tpid); + if (pp) { + pid_t kpid = pp->rpid; + killpid = kpid; + write(pp->fdout, "exit\n", sizeof("exit")); + while (killpid == kpid) + usleep(10000); + } +} + +static void reaper(int sig) +{ + int status, i; + pid_t pid = waitpid(-1, &status, 0); + if (pid > 0) { + fprintf(stderr, "# %s %d\n", WIFEXITED(status) ? "exited" : "killed", pid); + for (i=0; i<npids; i++) + if (pids[i].rpid == pid) + break; + assert(i < npids); + close(pids[i].fdout); + close(pids[i].fdin); + for (;i<npids-1; i++) + pids[i] = pids[i+1]; + npids--; + killpid = 0; + } +} + +int main(int argc,char * argv[]) +{ + signal(SIGCHLD, reaper); + + while (fgets(inbuf, sizeof(inbuf), stdin)) { + pidpair *pp; + int tpid, len; + char c, *ptr; + lcount++; + + if (inbuf[0] == '#' && !strncmp(inbuf+1, SCMP(" killed"))) { + sscanf(inbuf+SOFF("killed"),"%d", &tpid); + delpid(tpid); + continue; + } + + if (inbuf[0] != '>') + continue; + ptr = inbuf+1; + sscanf(ptr, "%d:%n", &tpid, &len); + pp = findpid(tpid); + if (!pp) + pp = addpid(tpid); /* new process */ + + ptr = inbuf+len+1; + len = strlen(ptr); + write(pp->fdout, ptr, len); /* send command and wait for ack */ + read(pp->fdin, &c, 1); + } + while (npids) + delpid(pids[0].tpid); +} |