/*
 * NOTE(review): this copy of the file looks damaged. The header names after
 * every "#include" are gone, and in several places a loop condition runs
 * straight into unrelated later code (for example "for (i=0; intxns < MAXTXNS)"
 * and "kncursors"). The pattern suggests that everything from a '<' directly
 * followed by a letter up to the next '>' was removed, presumably by a markup
 * stripper -- TODO confirm and restore from a pristine copy rather than
 * reconstructing by hand. The original line structure is lost as well, so the
 * "#define" macros share one physical line with an "#include" and the file
 * will not preprocess as it stands.
 *
 * What is still readable on the next physical line:
 *  E(expr)       - stores expr in rc and, through CHECK, prints file, line,
 *                  the expression text and mdb_strerror(rc) to stderr and
 *                  calls abort() unless rc == MDB_SUCCESS.
 *  RES(err,expr) - like E(), but rc == err is accepted as well as rc == 0.
 *  MDB_SCNy(u)   - expands to "z" "u", the scanf conversion used for size_t.
 *  SCMP(s)       - expands to "s, sizeof(s)-1" for strncmp() prefix tests.
 *  SOFF(s)       - sizeof(s)+1, i.e. the literal's length plus two; used as
 *                  the offset at which a command's arguments are scanned.
 *  crspair / txnpair / envpair - each couples a pointer value scanned from
 *                  the input (tcrs/ttxn/tenv) with the handle created during
 *                  replay (rcrs/rtxn/renv); envs own txns, txns own cursors,
 *                  all in fixed-size arrays (MAXENVS/MAXTXNS/MAXCRSS).
 *  pidpair       - a pid from the input (tpid), the forked pid (rpid) and the
 *                  two pipe descriptors used to talk to that child.
 *  unhex(c2)     - turns two hex characters into one byte: each character is
 *                  masked with 0x4f and, if bit 0x40 is set, reduced by 55
 *                  (so, assuming ASCII, 'A'-'F' and 'a'-'f' both give 10-15).
 *  inhex(in,out) - decodes pairs of characters while *in is a hex digit and
 *                  returns the number of bytes stored; out is not bounded.
 *  addenv(tenv,renv) - asserts nenvs < MAXENVS, appends the pair to envs[]
 *                  with no transactions and remembers it in lastenv.
 */
/* mplay.c - memory-mapped database log replay */ /* * Copyright 2011-2023 Howard Chu, Symas Corp. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted only as authorized by the OpenLDAP * Public License. * * A copy of this license is available in the file LICENSE in the * top-level directory of the distribution or, alternatively, at * <http://www.OpenLDAP.org/license.html>. */ #include #include #include #include #include #include #include #include #include #include "lmdb.h" #define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr) #define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0)) #define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \ "%s:%d: %s: %s\n", __FILE__, __LINE__, msg, mdb_strerror(rc)), abort())) #define MDB_SCNy(t) "z" #t #define SCMP(s) s, (sizeof(s)-1) char inbuf[8192]; char *dbuf, *kbuf; size_t dbufsize; int maxkey; #define SOFF(s) (sizeof(s)+1) #define MAXENVS 16 #define MAXTXNS 16 #define MAXCRSS 16 #define MAXPIDS 16 typedef struct crspair { void *tcrs; /* scanned text pointer */ MDB_cursor *rcrs; } crspair; typedef struct txnpair { void *ttxn; /* scanned text pointer */ MDB_txn *rtxn; crspair cursors[MAXCRSS]; int ncursors; } txnpair; typedef struct envpair { void *tenv; MDB_env *renv; txnpair txns[MAXTXNS]; int ntxns; } envpair; envpair envs[MAXENVS]; int nenvs; envpair *lastenv; txnpair *lasttxn; crspair *lastcrs; typedef struct pidpair { int tpid; pid_t rpid; int fdout, fdin; } pidpair; pidpair *lastpid; pidpair pids[MAXPIDS]; int npids; unsigned long lcount; static int unhex(unsigned char *c2) { int x, c; x = *c2++ & 0x4f; if (x & 0x40) x -= 55; c = x << 4; x = *c2 & 0x4f; if (x & 0x40) x -= 55; c |= x; return c; } int inhex(char *in, char *out) { char *c2 = out; while (isxdigit(*in)) { *c2++ = unhex((unsigned char *)in); in += 2; } return c2 - out; } static void addenv(void *tenv, MDB_env *renv) { assert(nenvs < MAXENVS); envs[nenvs].tenv = tenv; envs[nenvs].renv = renv; 
/*
 * The next physical line finishes addenv() and then holds the remaining
 * lookup helpers plus the start of child().
 *
 * NOTE(review): findenv() breaks off inside its for-loop and the text resumes
 * inside a function that asserts ep->ntxns < MAXTXNS and appends a txnpair;
 * child() calls it as addtxn(tenv, ttxn, rtxn). A delenv(ep), also called from
 * child(), is not visible at all. findtxn() and the function after it are cut
 * the same way (child() calls a deltxn(tp)); only the tail, which shifts
 * lastenv->txns down by one entry and clears lasttxn, survives.
 *
 * findtxn(ttxn), visible part: returns lasttxn if it already matches, then
 * searches the transactions of lastenv.
 *
 * addcrs(tp, tcrs, rcrs): asserts tp->ncursors < MAXCRSS, stores the pair in
 * the next free slot of tp->cursors and remembers it in lastcrs.
 *
 * findcrs(tcrs): returns lastcrs if it already matches; otherwise searches
 * lasttxn's cursors, then the other transactions of lastenv, then (loop
 * headers damaged) apparently every env, updating lastcrs, lasttxn and
 * lastenv on a hit. Falls through to assert(0) when nothing matches.
 *
 * delcrs(tcrs): closes the replayed cursor with mdb_cursor_close() and
 * removes the entry by shifting the rest of lasttxn->cursors down.
 * NOTE(review): the index is computed relative to lasttxn, but findcrs() does
 * not touch lasttxn when it returns through the lastcrs shortcut -- verify
 * that lasttxn always owns the cursor at this point.
 *
 * child(): reads one command per line from stdin into inbuf and dispatches on
 * the leading function name with strncmp(); a line starting with "exit" ends
 * the loop.
 */
envs[nenvs].ntxns = 0; lastenv = envs+nenvs; nenvs++; } static envpair *findenv(void *tenv) { int i; if (!lastenv || lastenv->tenv != tenv) { for (i=0; intxns < MAXTXNS); tp = ep->txns+ep->ntxns; tp->ttxn = ttxn; tp->rtxn = rtxn; tp->ncursors = 0; ep->ntxns++; lasttxn = tp; } static txnpair *findtxn(void *ttxn) { int i, j; if (lasttxn && lasttxn->ttxn == ttxn) return lasttxn; if (lastenv) { for (i=0; intxns; i++) { if (lastenv->txns[i].ttxn == ttxn) { lasttxn = lastenv->txns+i; return lasttxn; } } } for (i=0; itxns; for (; intxns-1; i++) lastenv->txns[i] = lastenv->txns[i+1]; lastenv->ntxns--; lasttxn = NULL; } static void addcrs(txnpair *tp, void *tcrs, MDB_cursor *rcrs) { int j = tp->ncursors; assert(tp->ncursors < MAXCRSS); tp->cursors[j].tcrs = tcrs; tp->cursors[j].rcrs = rcrs; tp->ncursors++; lastcrs = tp->cursors+j; } static crspair *findcrs(void *tcrs) { int i, j, k; envpair *ep; txnpair *tp; crspair *cp; if (lastcrs && lastcrs->tcrs == tcrs) return lastcrs; if (lasttxn) { for (k=0, cp=lasttxn->cursors; kncursors; k++, cp++) { if (cp->tcrs == tcrs) { lastcrs = cp; return lastcrs; } } } if (lastenv) { for (j=0, tp=lastenv->txns; jntxns; j++, tp++){ if (tp == lasttxn) continue; for (k=0, cp = tp->cursors; kncursors; k++, cp++) { if (cp->tcrs == tcrs) { lastcrs = cp; lasttxn = tp; return lastcrs; } } } } for (i=0, ep=envs; itxns; jntxns; j++, tp++) { if (tp == lasttxn) continue; for (k=0, cp = tp->cursors; kncursors; k++, cp++) { if (cp->tcrs == tcrs) { lastcrs = cp; lasttxn = tp; lastenv = ep; return lastcrs; } } } } assert(0); /* should have found it already */ } static void delcrs(void *tcrs) { int i; crspair *cp = findcrs(tcrs); mdb_cursor_close(cp->rcrs); for (i = cp - lasttxn->cursors; incursors-1; i++) lasttxn->cursors[i] = lasttxn->cursors[i+1]; lasttxn->ncursors--; lastcrs = NULL; } void child() { int rc; MDB_val key, data; char *ptr; while (fgets(inbuf, sizeof(inbuf), stdin)) { ptr = inbuf; if (!strncmp(ptr, SCMP("exit"))) break; if (!strncmp(ptr, 
/*
 * Commands handled on the next physical line (all inside child()):
 *  mdb_env_create  - creates an env and registers it under the scanned
 *                    pointer with addenv().
 *  mdb_env_set_maxdbs, mdb_env_set_mapsize - forwarded to the replayed env.
 *  mdb_env_open    - the path is the text between the first and second comma
 *                    (the second comma is overwritten with NUL); flags are
 *                    scanned as unsigned decimal and mode as octal. While
 *                    maxkey is still 0, it is taken from
 *                    mdb_env_get_maxkeysize() and kbuf and dbuf are allocated
 *                    with maxkey+2 bytes each.
 *  mdb_env_close   - closes and unregisters the env; leaves the loop when no
 *                    env remains.
 *  mdb_txn_begin   - the parent pointer in the input is skipped and NULL is
 *                    always passed as the parent transaction.
 *  mdb_txn_commit, mdb_txn_abort - finish the transaction, then deltxn().
 * NOTE(review): none of the sscanf(), strchr() or malloc() results are
 * checked, so this code assumes well-formed input.
 */
SCMP("mdb_env_create"))) { void *tenv; MDB_env *renv; sscanf(ptr+SOFF("mdb_env_create"), "%p", &tenv); E(mdb_env_create(&renv)); addenv(tenv, renv); } else if (!strncmp(ptr, SCMP("mdb_env_set_maxdbs"))) { void *tenv; envpair *ep; unsigned int maxdbs; sscanf(ptr+SOFF("mdb_env_set_maxdbs"), "%p, %u", &tenv, &maxdbs); ep = findenv(tenv); E(mdb_env_set_maxdbs(ep->renv, maxdbs)); } else if (!strncmp(ptr, SCMP("mdb_env_set_mapsize"))) { void *tenv; envpair *ep; size_t mapsize; sscanf(ptr+SOFF("mdb_env_set_mapsize"), "%p, %"MDB_SCNy(u), &tenv, &mapsize); ep = findenv(tenv); E(mdb_env_set_mapsize(ep->renv, mapsize)); } else if (!strncmp(ptr, SCMP("mdb_env_open"))) { void *tenv; envpair *ep; char *path; int len; unsigned int flags, mode; sscanf(ptr+SOFF("mdb_env_open"), "%p, %n", &tenv, &len); path = ptr+SOFF("mdb_env_open")+len; ptr = strchr(path, ','); *ptr++ = '\0'; sscanf(ptr, "%u, %o", &flags, &mode); ep = findenv(tenv); E(mdb_env_open(ep->renv, path, flags, mode)); if (!maxkey) { maxkey = mdb_env_get_maxkeysize(ep->renv); kbuf = malloc(maxkey+2); dbuf = malloc(maxkey+2); dbufsize = maxkey; } } else if (!strncmp(ptr, SCMP("mdb_env_close"))) { void *tenv; envpair *ep; sscanf(ptr+SOFF("mdb_env_close"), "%p", &tenv); ep = findenv(tenv); mdb_env_close(ep->renv); delenv(ep); if (!nenvs) /* if no other envs left, this process is done */ break; } else if (!strncmp(ptr, SCMP("mdb_txn_begin"))) { unsigned int flags; void *tenv, *ttxn; envpair *ep; MDB_txn *rtxn; sscanf(ptr+SOFF("mdb_txn_begin"), "%p, %*p, %u = %p", &tenv, &flags, &ttxn); ep = findenv(tenv); E(mdb_txn_begin(ep->renv, NULL, flags, &rtxn)); addtxn(tenv, ttxn, rtxn); } else if (!strncmp(ptr, SCMP("mdb_txn_commit"))) { void *ttxn; txnpair *tp; sscanf(ptr+SOFF("mdb_txn_commit"), "%p", &ttxn); tp = findtxn(ttxn); E(mdb_txn_commit(tp->rtxn)); deltxn(tp); } else if (!strncmp(ptr, SCMP("mdb_txn_abort"))) { void *ttxn; txnpair *tp; sscanf(ptr+SOFF("mdb_txn_abort"), "%p", &ttxn); tp = findtxn(ttxn); 
/*
 * Commands handled on the next physical line (still inside child()):
 *  mdb_dbi_open    - dbname is the text after the first ", "; the literal
 *                    "(null)" is turned into a NULL name. The dbi number from
 *                    the input (tdbi) is scanned, but the dbi returned by the
 *                    replayed call is not stored; later commands pass the
 *                    scanned number straight through.
 *  mdb_dbi_close, mdb_cursor_open, mdb_cursor_close - forwarded; cursors are
 *                    tracked with addcrs() and delcrs().
 *  mdb_cursor_put  - key and data sizes are scanned as decimal; a following
 *                    "[hex]" is decoded by inhex() into kbuf or dbuf. dbuf is
 *                    grown with realloc() (asserted non-NULL) when the data
 *                    size exceeds dbufsize. When no '[' follows the data
 *                    size, dbuf is filled with mdb_txn_id(lasttxn->rtxn)
 *                    printed as "%09ld".
 *                    NOTE(review): in that last branch data.mv_data is not
 *                    assigned here, and key.mv_data is not assigned when
 *                    key.mv_size is 0 -- confirm whether that is intended.
 */
mdb_txn_abort(tp->rtxn); deltxn(tp); } else if (!strncmp(ptr, SCMP("mdb_dbi_open"))) { void *ttxn; txnpair *tp; char *dbname; unsigned int flags; unsigned int tdbi; MDB_dbi dbi; sscanf(ptr+SOFF("mdb_dbi_open"), "%p, ", &ttxn); dbname = strchr(ptr+SOFF("mdb_dbi_open"), ','); dbname += 2; ptr = strchr(dbname, ','); *ptr++ = '\0'; if (!strcmp(dbname, "(null)")) dbname = NULL; sscanf(ptr, "%u = %u", &flags, &tdbi); tp = findtxn(ttxn); E(mdb_dbi_open(tp->rtxn, dbname, flags, &dbi)); } else if (!strncmp(ptr, SCMP("mdb_dbi_close"))) { void *tenv; envpair *ep; unsigned int tdbi; sscanf(ptr+SOFF("mdb_dbi_close"), "%p, %u", &tenv, &tdbi); ep = findenv(tenv); mdb_dbi_close(ep->renv, tdbi); } else if (!strncmp(ptr, SCMP("mdb_cursor_open"))) { void *ttxn, *tcrs; txnpair *tp; MDB_cursor *rcrs; unsigned int tdbi; sscanf(ptr+SOFF("mdb_cursor_open"), "%p, %u = %p", &ttxn, &tdbi, &tcrs); tp = findtxn(ttxn); E(mdb_cursor_open(tp->rtxn, tdbi, &rcrs)); addcrs(tp, tcrs, rcrs); } else if (!strncmp(ptr, SCMP("mdb_cursor_close"))) { void *tcrs; sscanf(ptr+SOFF("mdb_cursor_close"), "%p", &tcrs); delcrs(tcrs); } else if (!strncmp(ptr, SCMP("mdb_cursor_put"))) { void *tcrs; crspair *cp; unsigned int flags; int len; sscanf(ptr+SOFF("mdb_cursor_put"), "%p, ", &tcrs); cp = findcrs(tcrs); ptr = strchr(ptr+SOFF("mdb_cursor_put"), ','); sscanf(ptr+1, "%"MDB_SCNy(u)",", &key.mv_size); if (key.mv_size) { ptr = strchr(ptr, '['); inhex(ptr+1, kbuf); key.mv_data = kbuf; ptr += key.mv_size * 2 + 2; } ptr = strchr(ptr+1, ','); sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len); if (data.mv_size > dbufsize) { dbuf = realloc(dbuf, data.mv_size+2); assert(dbuf != NULL); dbufsize = data.mv_size; } ptr += len+1; if (*ptr == '[') { inhex(ptr+1, dbuf); data.mv_data = dbuf; ptr += data.mv_size * 2 + 2; } else { sprintf(dbuf, "%09ld", (long)mdb_txn_id(lasttxn->rtxn)); } sscanf(ptr+1, "%u", &flags); E(mdb_cursor_put(cp->rcrs, &key, &data, flags)); } else if (!strncmp(ptr, SCMP("mdb_cursor_del"))) { void *tcrs; 
/*
 * Commands handled on the next physical line (end of child()):
 *  mdb_cursor_del  - forwarded with the scanned flags.
 *  mdb_put         - key and data are decoded as for mdb_cursor_put, but
 *                    data.mv_data is always set to dbuf and the key is always
 *                    expected in "[hex]" form; MDB_KEYEXIST is tolerated.
 *  mdb_del         - same decoding; MDB_NOTFOUND is tolerated.
 * At the end of each loop iteration one newline byte is written to fd 1; the
 * parent reads one byte per command as an acknowledgement. After the loop the
 * process calls exit(0).
 *
 * addpid(tpid): asserts npids < MAXPIDS, creates two pipes and forks. The
 * child closes stdin and stdout, dup2()s the pipe ends onto descriptors 0 and
 * 1, reopens them with fdopen() and runs child(). The parent records the
 * forked pid, the write end of fdout and the read end of fdin, and remembers
 * the entry in lastpid.
 * NOTE(review): pipe() and fork() results are not checked, and the unused
 * pipe ends are not closed in either process -- confirm this is acceptable.
 */
crspair *cp; unsigned int flags; sscanf(ptr+SOFF("mdb_cursor_del"), "%p, %u", &tcrs, &flags); cp = findcrs(tcrs); E(mdb_cursor_del(cp->rcrs, flags)); } else if (!strncmp(ptr, SCMP("mdb_put"))) { void *ttxn; txnpair *tp; unsigned int tdbi, flags; int len; sscanf(ptr+SOFF("mdb_put"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size); tp = findtxn(ttxn); ptr = strchr(ptr+SOFF("mdb_put"), '['); inhex(ptr+1, kbuf); key.mv_data = kbuf; ptr += key.mv_size * 2 + 2; sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len); if (data.mv_size > dbufsize) { dbuf = realloc(dbuf, data.mv_size+2); assert(dbuf != NULL); dbufsize = data.mv_size; } ptr += len+1; if (*ptr == '[') { inhex(ptr+1, dbuf); ptr += data.mv_size * 2 + 2; } else { sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn)); } data.mv_data = dbuf; sscanf(ptr+1, "%u", &flags); RES(MDB_KEYEXIST,mdb_put(tp->rtxn, tdbi, &key, &data, flags)); } else if (!strncmp(ptr, SCMP("mdb_del"))) { void *ttxn; txnpair *tp; unsigned int tdbi; int len; sscanf(ptr+SOFF("mdb_del"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size); tp = findtxn(ttxn); ptr = strchr(ptr+SOFF("mdb_del"), '['); inhex(ptr+1, kbuf); key.mv_data = kbuf; ptr += key.mv_size * 2 + 2; sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len); if (data.mv_size > dbufsize) { dbuf = realloc(dbuf, data.mv_size+2); assert(dbuf != NULL); dbufsize = data.mv_size; } ptr += len+1; if (*ptr == '[') { inhex(ptr+1, dbuf); } else { sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn)); } data.mv_data = dbuf; RES(MDB_NOTFOUND,mdb_del(tp->rtxn, tdbi, &key, &data)); } write(1, "\n", 1); } exit(0); } static pidpair *addpid(int tpid) { int fdout[2], fdin[2]; pid_t pid; assert(npids < MAXPIDS); pids[npids].tpid = tpid; pipe(fdout); pipe(fdin); if ((pid = fork()) == 0) { /* child */ fclose(stdin); fclose(stdout); dup2(fdout[0], 0); dup2(fdin[1], 1); stdin = fdopen(0, "r"); stdout = fdopen(1, "w"); child(); return 0; /* NOTREACHED */ } else { pids[npids].rpid = pid; pids[npids].fdout = 
/*
 * NOTE(review): on the next physical line findpid() is cut off inside its
 * loop; the text resumes inside the function that is later called as
 * delpid(tpid). Its visible part stores the child's pid in killpid, writes
 * "exit\n" to the child's pipe (sizeof("exit") is 5, the length of that
 * string) and polls every 10 ms until killpid no longer equals that pid. The
 * declaration of killpid and whatever changes it are not visible here.
 *
 * reaper(sig): collects one child with waitpid(-1, ...) and prints "exited"
 * or "killed" with the pid to stderr. The rest of it and the beginning of the
 * final function (presumably main()) are missing. The surviving tail writes
 * the command text to the child's pipe, blocks reading a one-byte
 * acknowledgement, and once the input loop ends calls delpid() for pids[0]
 * until npids is 0.
 */
fdout[1]; pids[npids].fdin = fdin[0]; lastpid = pids+npids; npids++; return lastpid; } } static pidpair *findpid(int tpid) { int i; if (!lastpid || lastpid->tpid != tpid) { for (i=0; irpid; killpid = kpid; write(pp->fdout, "exit\n", sizeof("exit")); while (killpid == kpid) usleep(10000); } } static void reaper(int sig) { int status, i; pid_t pid = waitpid(-1, &status, 0); if (pid > 0) { fprintf(stderr, "# %s %d\n", WIFEXITED(status) ? "exited" : "killed", pid); for (i=0; ifdout, ptr, len); /* send command and wait for ack */ read(pp->fdin, &c, 1); } while (npids) delpid(pids[0].tpid); }