summaryrefslogtreecommitdiffstats
path: root/src/util/slmdb.c
blob: 499589d046d7309da6cfbf5bf203cae8f46aeeb9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
/*++
/* NAME
/*	slmdb 3
/* SUMMARY
/*	Simplified LMDB API
/* SYNOPSIS
/*	#include <slmdb.h>
/*
/*	int	slmdb_init(slmdb, curr_limit, size_incr, hard_limit)
/*	SLMDB	*slmdb;
/*	size_t	curr_limit;
/*	int	size_incr;
/*	size_t	hard_limit;
/*
/*	int	slmdb_open(slmdb, path, open_flags, lmdb_flags, slmdb_flags)
/*	SLMDB	*slmdb;
/*	const char *path;
/*	int	open_flags;
/*	int	lmdb_flags;
/*	int	slmdb_flags;
/*
/*	int	slmdb_close(slmdb)
/*	SLMDB	*slmdb;
/*
/*	int	slmdb_get(slmdb, mdb_key, mdb_value)
/*	SLMDB	*slmdb;
/*	MDB_val	*mdb_key;
/*	MDB_val	*mdb_value;
/*
/*	int	slmdb_put(slmdb, mdb_key, mdb_value, flags)
/*	SLMDB	*slmdb;
/*	MDB_val	*mdb_key;
/*	MDB_val	*mdb_value;
/*	int	flags;
/*
/*	int	slmdb_del(slmdb, mdb_key)
/*	SLMDB	*slmdb;
/*	MDB_val	*mdb_key;
/*
/*	int	slmdb_cursor_get(slmdb, mdb_key, mdb_value, op)
/*	SLMDB	*slmdb;
/*	MDB_val	*mdb_key;
/*	MDB_val	*mdb_value;
/*	MDB_cursor_op op;
/* AUXILIARY FUNCTIONS
/*	int	slmdb_fd(slmdb)
/*	SLMDB	*slmdb;
/*
/*	size_t	slmdb_curr_limit(slmdb)
/*	SLMDB	*slmdb;
/*
/*	int	slmdb_control(slmdb, request, ...)
/*	SLMDB	*slmdb;
/*	int	request;
/* DESCRIPTION
/*	This module simplifies the LMDB API by hiding recoverable
/*	errors from the application.  Details are given in the
/*	section "ERROR RECOVERY".
/*
/*	slmdb_init() performs mandatory initialization before opening
/*	an LMDB database. The result value is an LMDB status code
/*	(zero in case of success).
/*
/*	slmdb_open() opens an LMDB database.  The result value is
/*	an LMDB status code (zero in case of success).
/*
/*	slmdb_close() finalizes an optional bulk-mode transaction
/*	and closes a successfully-opened LMDB database.  The result
/*	value is an LMDB status code (zero in case of success).
/*
/*	slmdb_get() is an mdb_get() wrapper with automatic error
/*	recovery.  The result value is an LMDB status code (zero
/*	in case of success).
/*
/*	slmdb_put() is an mdb_put() wrapper with automatic error
/*	recovery.  The result value is an LMDB status code (zero
/*	in case of success).
/*
/*	slmdb_del() is an mdb_del() wrapper with automatic error
/*	recovery.  The result value is an LMDB status code (zero
/*	in case of success).
/*
/*	slmdb_cursor_get() is an mdb_cursor_get() wrapper with
/*	automatic error recovery.  The result value is an LMDB
/*	status code (zero in case of success). This wrapper supports
/*	only one cursor per database.
/*
/*	slmdb_fd() returns the file descriptor for the specified
/*	database.  This may be used for file status queries or
/*	application-controlled locking.
/*
/*	slmdb_curr_limit() returns the current database size limit
/*	for the specified database.
/*
/*	slmdb_control() specifies optional features. The result is
/*	an LMDB status code (zero in case of success).
/*
/*	Arguments:
/* .IP slmdb
/*	Pointer to caller-provided storage.
/* .IP curr_limit
/*	The initial memory mapping size limit. This limit is
/*	automatically increased when the database becomes full.
/* .IP size_incr
/*	An integer factor by which the memory mapping size limit
/*	is increased when the database becomes full.
/* .IP hard_limit
/*	The upper bound for the memory mapping size limit.
/* .IP path
/*	LMDB database pathname.
/* .IP open_flags
/*	Flags that control file open operations. Do not specify
/*	locking flags here.
/* .IP lmdb_flags
/*	Flags that control the LMDB environment. If MDB_NOLOCK is
/*	specified, then each slmdb_get() or slmdb_cursor_get() call
/*	must be protected with a shared (or exclusive) external lock,
/*	and each slmdb_put() or slmdb_del() call must be protected
/*	with an exclusive external lock. A lock may be released
/*	after the call returns. A writer may atomically downgrade
/*	an exclusive lock to shared, but it must obtain an exclusive
/*	lock before making another slmdb(3) write request.
/* .sp
/*	Note: when a database is opened with MDB_NOLOCK, external
/*	locks such as fcntl() do not protect slmdb(3) requests
/*	within the same process against each other.  If a program
/*	cannot avoid making simultaneous slmdb(3) requests, then
/*	it must synchronize these requests with in-process locks,
/*	in addition to the per-process fcntl(2) locks.
/* .IP slmdb_flags
/*	Bit-wise OR of zero or more of the following:
/* .RS
/* .IP SLMDB_FLAG_BULK
/*	Open the database and create a "bulk" transaction that is
/*	committed when the database is closed. If MDB_NOLOCK is
/*	specified, then the entire transaction must be protected
/*	with a persistent external lock.  All slmdb_get(), slmdb_put()
/*	and slmdb_del() requests will be directed to the "bulk"
/*	transaction.
/* .RE
/* .IP mdb_key
/*	Pointer to caller-provided lookup key storage.
/* .IP mdb_value
/*	Pointer to caller-provided value storage.
/* .IP op
/*	LMDB cursor operation.
/* .IP request
/*	The start of a list of (name, value) pairs, terminated with
/*	CA_SLMDB_CTL_END.  The following text enumerates the symbolic
/*	request names and the corresponding argument types.
/* .RS
/* .IP "CA_SLMDB_CTL_LONGJMP_FN(void (*)(void *, int))"
/*	Call-back function pointer. The function is called to repeat
/*	a failed bulk-mode transaction from the start. The arguments
/*	are the application context and the setjmp() or sigsetjmp()
/*	result value.
/* .IP "CA_SLMDB_CTL_NOTIFY_FN(void (*)(void *, int, ...))"
/*	Call-back function pointer. The function is called to report
/*	successful error recovery. The arguments are the application
/*	context, the MDB error code, and additional arguments that
/*	depend on the error code.  Details are given in the section
/*	"ERROR RECOVERY".
/* .IP "CA_SLMDB_CTL_ASSERT_FN(void (*)(void *, const char *))"
/*	Call-back function pointer.  The function is called to
/*	report an LMDB internal assertion failure. The arguments
/*	are the application context, and text that describes the
/*	problem.
/* .IP "CA_SLMDB_CTL_CB_CONTEXT(void *)"
/*	Application context that is passed in call-back function
/*	calls.
/* .IP "CA_SLMDB_CTL_API_RETRY_LIMIT(int)"
/*	How many times to recover from LMDB errors within the
/*	execution of a single slmdb(3) API call before giving up.
/* .IP "CA_SLMDB_CTL_BULK_RETRY_LIMIT(int)"
/*	How many times to recover from a failed bulk-mode transaction
/*	before giving up.
/* .RE
/* ERROR RECOVERY
/* .ad
/* .fi
/*	This module automatically repeats failed requests after
/*	recoverable errors, up to the limits specified with
/*	slmdb_control().
/*
/*	Recoverable errors are reported through an optional
/*	notification function specified with slmdb_control().  With
/*	recoverable MDB_MAP_FULL and MDB_MAP_RESIZED errors, the
/*	additional argument is a size_t value with the updated
/*	current database size limit; with recoverable MDB_READERS_FULL
/*	errors there is no additional argument.
/* BUGS
/*	Recovery from MDB_MAP_FULL involves resizing the database
/*	memory mapping.  According to LMDB documentation this
/*	requires that there is no concurrent activity in the same
/*	database by other threads in the same memory address space.
/* SEE ALSO
/*	lmdb(3) API manpage (currently, non-existent).
/* AUTHOR(S)
/*	Howard Chu
/*	Symas Corporation
/*
/*	Wietse Venema
/*	IBM T.J. Watson Research
/*	P.O. Box 704
/*	Yorktown Heights, NY 10598, USA
/*
/*	Wietse Venema
/*	Google, Inc.
/*	111 8th Avenue
/*	New York, NY 10011, USA
/*--*/

 /*
  * DO NOT include other Postfix-specific header files. This LMDB wrapper
  * must be usable outside Postfix.
  */

#ifdef HAS_LMDB

/* System library. */

#include <sys/stat.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <limits.h>
#include <stdarg.h>
#include <string.h>
#include <stdlib.h>

/* Application-specific. */

#include <slmdb.h>

 /*
  * Minimum LMDB patchlevel.
  * 
  * LMDB 0.9.11 allows Postfix daemons to log an LMDB error message instead of
  * falling out of the sky without any explanation. Without such logging,
  * Postfix with LMDB would be too hard to support.
  * 
  * LMDB 0.9.10 fixes an information leak where LMDB wrote chunks of up to 4096
  * bytes of uninitialized heap memory to a database. This was a security
  * violation because it made information persistent that was not meant to be
  * persisted, or it was sharing information that was not meant to be shared.
  * 
  * LMDB 0.9.9 allows Postfix to use external (fcntl()-based) locks, instead of
  * having to use world-writable LMDB lock files.
  * 
  * LMDB 0.9.8 allows Postfix to update the database size limit on-the-fly, so
  * that it can recover from an MDB_MAP_FULL error without having to close
  * the database. It also allows an application to "pick up" a new database
  * size limit on-the-fly, so that it can recover from an MDB_MAP_RESIZED
  * error without having to close the database.
  * 
  * The database size limit that remains is imposed by the hardware memory
  * address space (31 or 47 bits, typically) or file system. The LMDB
  * implementation is supposed to handle databases larger than physical
  * memory. However, this is not necessarily guaranteed for (bulk)
  * transactions larger than physical memory.
  */
#if MDB_VERSION_FULL < MDB_VERINT(0, 9, 11)
#error "This Postfix version requires LMDB version 0.9.11 or later"
#endif

 /*
  * Error recovery.
  * 
  * The purpose of the slmdb(3) API is to hide LMDB quirks (recoverable
  * MDB_MAP_FULL, MDB_MAP_RESIZED, or MDB_READERS_FULL errors). With these
  * out of the way, applications can pretend that those quirks don't exist,
  * and focus on their own job.
  * 
  * - To recover from a single-transaction LMDB error, each wrapper function
  * uses tail recursion instead of goto. Since LMDB errors are rare, code
  * clarity is more important than speed.
  * 
  * - To recover from a bulk-transaction LMDB error, the error-recovery code
  * triggers a long jump back into the caller to some pre-arranged point (the
  * closest thing that C has to exception handling). The application is then
  * expected to repeat the bulk transaction from scratch.
  * 
  * When any code aborts a bulk transaction, it must reset slmdb->txn to null
  * to avoid a use-after-free problem in slmdb_close().
  */

 /*
  * Our default retry attempt limits. We allow a few retries per slmdb(3) API
  * call for non-bulk transactions. We allow a number of bulk-transaction
  * retries that is proportional to the memory address space.
  */
#define SLMDB_DEF_API_RETRY_LIMIT 30	/* Retries per slmdb(3) API call */
#define SLMDB_DEF_BULK_RETRY_LIMIT \
        (2 * sizeof(size_t) * CHAR_BIT)	/* Retries per bulk-mode transaction */

 /*
  * We increment the recursion counter each time we try to recover from
  * error, and reset the recursion counter when returning to the application
  * from the slmdb(3) API.
  */
#define SLMDB_API_RETURN(slmdb, status) do { \
	(slmdb)->api_retry_count = 0; \
	return (status); \
    } while (0)

 /*
  * With MDB_NOLOCK, the application uses an external lock for inter-process
  * synchronization. Because the caller may release the external lock after
  * an SLMDB API call, each SLMDB API function must use a short-lived
  * transaction unless the transaction is a bulk-mode transaction.
  */

/* slmdb_cursor_close - release the cursor together with its transaction */

static void slmdb_cursor_close(SLMDB *slmdb)
{

    /*
     * Look up the transaction that owns the cursor before the cursor handle
     * goes away. The cursor position is not lost for good: it can be
     * re-established later from the saved key information.
     */
    MDB_txn *cursor_txn = mdb_cursor_txn(slmdb->cursor);

    mdb_cursor_close(slmdb->cursor);
    slmdb->cursor = 0;
    mdb_txn_abort(cursor_txn);
}

/* slmdb_saved_key_init - reset saved key info to the "no key" state */

static void slmdb_saved_key_init(SLMDB *slmdb)
{
    /* No buffer capacity, no key length, no key storage. */
    slmdb->saved_key_size = 0;
    slmdb->saved_key.mv_size = 0;
    slmdb->saved_key.mv_data = 0;
}

/* slmdb_saved_key_free - release saved key storage and reset the key info */

static void slmdb_saved_key_free(SLMDB *slmdb)
{
    void   *stale_data = slmdb->saved_key.mv_data;

    /* Detach the buffer from the handle first, then release it. */
    slmdb_saved_key_init(slmdb);
    free(stale_data);
}

/* True when the handle (s) currently owns saved-key storage. */
#define HAVE_SLMDB_SAVED_KEY(s) ((s)->saved_key.mv_data != 0)

/* slmdb_saved_key_assign - copy the saved key */

static int slmdb_saved_key_assign(SLMDB *slmdb, MDB_val *key_val)
{
    void   *new_data;

    /*
     * Save a private copy of the key in key_val, so that the cursor position
     * can be restored later. The result is zero in case of success, or
     * ENOMEM when the buffer could not be extended; in the latter case the
     * saved key information is released and reset.
     * 
     * Extend the buffer to fit the key, so that we can avoid malloc()
     * overhead most of the time. The buffer never shrinks.
     */
    if (slmdb->saved_key_size < key_val->mv_size) {

	/*
	 * realloc() with a null pointer behaves like malloc(). Keep the old
	 * pointer until we know that the request succeeded, otherwise the
	 * old buffer would be leaked when realloc() fails.
	 */
	new_data = realloc(slmdb->saved_key.mv_data, key_val->mv_size);
	if (new_data == 0) {
	    slmdb_saved_key_free(slmdb);
	    return (ENOMEM);
	}
	slmdb->saved_key.mv_data = new_data;
	slmdb->saved_key_size = key_val->mv_size;
    }

    /*
     * Copy the key under the cursor. Skip the copy for a zero-length key:
     * there may be no buffer at all, and memcpy() must not be called with a
     * null pointer.
     */
    if (key_val->mv_size > 0)
	memcpy(slmdb->saved_key.mv_data, key_val->mv_data, key_val->mv_size);
    slmdb->saved_key.mv_size = key_val->mv_size;
    return (0);
}

/* slmdb_prepare - LMDB-specific (re)initialization before actual access */

static int slmdb_prepare(SLMDB *slmdb)
{
    const int bulk_mode = ((slmdb->slmdb_flags & SLMDB_FLAG_BULK) != 0);

    /*
     * Called before the first database access, and again after recovery
     * from an LMDB error. This code cannot recover from errors itself. On
     * entry, slmdb->txn is either the database open() transaction or a
     * freshly-created bulk-mode transaction. Whenever this function commits
     * or aborts that transaction, it resets slmdb->txn to null to avoid a
     * use-after-free error in slmdb_close().
     * 
     * - Without O_TRUNC there is nothing to update: only a bulk-mode
     * transaction stays open, to be committed when the database is closed.
     * 
     * - With O_TRUNC we make a "drop" request before updating the database,
     * and commit it right away unless this is a bulk-mode transaction.
     */
    if ((slmdb->open_flags & O_TRUNC) == 0) {
	if (!bulk_mode) {
	    mdb_txn_abort(slmdb->txn);
	    slmdb->txn = 0;
	}
    } else {
	int     status = mdb_drop(slmdb->txn, slmdb->dbi, 0);

	if (status != 0) {
	    mdb_txn_abort(slmdb->txn);
	    slmdb->txn = 0;
	    return (status);
	}
	if (!bulk_mode) {
	    status = mdb_txn_commit(slmdb->txn);
	    slmdb->txn = 0;
	    if (status != 0)
		return (status);
	}
    }

    /*
     * Success: start with a clean per-call recovery counter.
     */
    slmdb->api_retry_count = 0;
    return (0);
}

/* slmdb_recover - recover from LMDB errors */

 /*
  * Arguments: slmdb is the database handle; status is the LMDB error code
  * that the caller ran into.
  * 
  * Result: zero when the error condition was cleared and the caller should
  * retry the failed operation; otherwise a non-zero status code (the
  * unchanged input status when a retry limit is reached or the error is not
  * recoverable, or the error from the attempt to clear the condition).
  * 
  * Side effects: closes an open cursor and its read transaction, updates the
  * retry counters, may update slmdb->curr_limit, may call the notification
  * call-back, and in bulk mode may call the long jump call-back after
  * creating a new bulk transaction.
  */
static int slmdb_recover(SLMDB *slmdb, int status)
{
    MDB_envinfo info;
    int     original_status = status;

    /*
     * This may be needed in non-MDB_NOLOCK mode. Recovery is rare enough
     * that we don't care about a few wasted cycles.
     */
    if (slmdb->cursor != 0)
	slmdb_cursor_close(slmdb);

    /*
     * Limit the number of recovery attempts per slmdb(3) API request. The
     * counter is reset by SLMDB_API_RETURN() and by slmdb_prepare().
     */
    if ((slmdb->api_retry_count += 1) >= slmdb->api_retry_limit)
	return (status);

    /*
     * Limit the number of bulk transaction recovery attempts. This counter
     * is incremented in bulk mode only.
     */
    if ((slmdb->slmdb_flags & SLMDB_FLAG_BULK) != 0
	&& (slmdb->bulk_retry_count += 1) > slmdb->bulk_retry_limit)
	return (status);

    /*
     * Try to clear the error condition.
     */
    switch (status) {

	/*
	 * As of LMDB 0.9.8 when a non-bulk update runs into a "map full"
	 * error, we can resize the environment's memory map and clear the
	 * error condition. The caller should retry immediately.
	 */
    case MDB_MAP_FULL:
	/* Can we increase the memory map? Give up if we can't. */
	/* Dividing instead of multiplying avoids overflow in the test. */
	if (slmdb->curr_limit < slmdb->hard_limit / slmdb->size_incr) {
	    slmdb->curr_limit = slmdb->curr_limit * slmdb->size_incr;
	} else if (slmdb->curr_limit < slmdb->hard_limit) {
	    slmdb->curr_limit = slmdb->hard_limit;
	} else {
	    /* Sorry, we are already maxed out. */
	    break;
	}
	/* The notification is sent before the new limit is applied. */
	if (slmdb->notify_fn)
	    slmdb->notify_fn(slmdb->cb_context, MDB_MAP_FULL,
			     slmdb->curr_limit);
	status = mdb_env_set_mapsize(slmdb->env, slmdb->curr_limit);
	break;

	/*
	 * When a writer resizes the database, read-only applications must
	 * increase their LMDB memory map size limit, too. Otherwise, they
	 * won't be able to read a table after it grows.
	 * 
	 * As of LMDB 0.9.8 we can import the new memory map size limit into the
	 * database environment by calling mdb_env_set_mapsize() with a zero
	 * size argument. Then we extract the map size limit for later use.
	 * The caller should retry immediately.
	 */
    case MDB_MAP_RESIZED:
	if ((status = mdb_env_set_mapsize(slmdb->env, 0)) == 0) {
	    /* Do not panic. Maps may shrink after bulk update. */
	    /* NOTE(review): the mdb_env_info() result is not checked. */
	    mdb_env_info(slmdb->env, &info);
	    slmdb->curr_limit = info.me_mapsize;
	    if (slmdb->notify_fn)
		slmdb->notify_fn(slmdb->cb_context, MDB_MAP_RESIZED,
				 slmdb->curr_limit);
	}
	break;

	/*
	 * What is it with these built-in hard limits that cause systems to
	 * stop when demand is at its highest? When the system is under
	 * stress it should slow down and keep making progress.
	 * 
	 * The recovery action is to wait one second and have the caller retry.
	 */
    case MDB_READERS_FULL:
	if (slmdb->notify_fn)
	    slmdb->notify_fn(slmdb->cb_context, MDB_READERS_FULL);
	sleep(1);
	status = 0;
	break;

	/*
	 * We can't solve this problem. The application should terminate with
	 * a fatal run-time error and the program should be re-run later.
	 */
    default:
	break;
    }

    /*
     * If we cleared the error condition for a non-bulk transaction, skip
     * the bulk-mode code below and return a success status. The caller
     * should retry the failed operation immediately.
     */
    if (status == 0 && (slmdb->slmdb_flags & SLMDB_FLAG_BULK) != 0) {

	/*
	 * We cleared the error condition for a bulk transaction. If the
	 * transaction is not restartable, return the original error. The
	 * caller should terminate with a fatal run-time error, and the
	 * program should be re-run later.
	 */
	if (slmdb->longjmp_fn == 0)
	    return (original_status);

	/*
	 * Rebuild a bulk transaction from scratch, by making a long jump
	 * back into the caller at some pre-arranged point. In MDB_NOLOCK
	 * mode, there is no need to upgrade a lock to "exclusive", because a
	 * failed write transaction has no side effects.
	 * 
	 * The new transaction is stored in slmdb->txn, and slmdb_prepare()
	 * repeats the initial "drop" request when the database was opened
	 * with O_TRUNC. If the call-back function returns, the status below
	 * is zero.
	 */
	if ((status = mdb_txn_begin(slmdb->env, (MDB_txn *) 0,
				    slmdb->lmdb_flags & MDB_RDONLY,
				    &slmdb->txn)) == 0
	    && (status = slmdb_prepare(slmdb)) == 0)
	    slmdb->longjmp_fn(slmdb->cb_context, 1);
    }
    return (status);
}

/* slmdb_txn_begin - mdb_txn_begin() wrapper with LMDB error recovery */

static int slmdb_txn_begin(SLMDB *slmdb, int rdonly, MDB_txn **txn)
{
    int     status;

    /*
     * Keep trying for as long as slmdb_recover() reports that it cleared
     * the error condition. The result is zero when a transaction was
     * started, otherwise the status from the failed recovery attempt.
     */
    for (;;) {
	status = mdb_txn_begin(slmdb->env, (MDB_txn *) 0, rdonly, txn);
	if (status == 0)
	    break;
	status = slmdb_recover(slmdb, status);
	if (status != 0)
	    break;
    }
    return (status);
}

/* slmdb_get - mdb_get() wrapper with LMDB error recovery */

int     slmdb_get(SLMDB *slmdb, MDB_val *mdb_key, MDB_val *mdb_value)
{
    MDB_txn *txn = slmdb->txn;
    int     status;

    /*
     * Use the bulk-mode txn when there is one, otherwise start a
     * short-lived read transaction.
     */
    if (txn == 0) {
	status = slmdb_txn_begin(slmdb, MDB_RDONLY, &txn);
	if (status != 0)
	    SLMDB_API_RETURN(slmdb, status);
    }

    /*
     * Do the lookup. Both "found" and "not found" are normal results: close
     * the read txn if it's not the bulk-mode txn, and report the result.
     */
    status = mdb_get(txn, slmdb->dbi, mdb_key, mdb_value);
    if (status == 0 || status == MDB_NOTFOUND) {
	if (slmdb->txn == 0)
	    mdb_txn_abort(txn);
	SLMDB_API_RETURN(slmdb, status);
    }

    /*
     * Anything else is an error. Abort the transaction (forgetting the
     * bulk-mode txn if that is what we used), and retry the request if the
     * error condition can be cleared.
     */
    mdb_txn_abort(txn);
    if (txn == slmdb->txn)
	slmdb->txn = 0;
    status = slmdb_recover(slmdb, status);
    if (status == 0)
	status = slmdb_get(slmdb, mdb_key, mdb_value);
    SLMDB_API_RETURN(slmdb, status);
}

/* slmdb_put - mdb_put() wrapper with LMDB error recovery */

int     slmdb_put(SLMDB *slmdb, MDB_val *mdb_key,
		          MDB_val *mdb_value, int flags)
{
    MDB_txn *txn = slmdb->txn;
    int     status;

    /*
     * Use the bulk-mode txn when there is one, otherwise start a
     * short-lived write transaction.
     */
    if (txn == 0) {
	status = slmdb_txn_begin(slmdb, 0, &txn);
	if (status != 0)
	    SLMDB_API_RETURN(slmdb, status);
    }

    /*
     * Do the update.
     */
    status = mdb_put(txn, slmdb->dbi, mdb_key, mdb_value, flags);

    /*
     * "Key exists" is a normal result that is reported to the caller. Abort
     * the non-bulk transaction only.
     */
    if (status == MDB_KEYEXIST) {
	if (slmdb->txn == 0)
	    mdb_txn_abort(txn);
	SLMDB_API_RETURN(slmdb, status);
    }

    /*
     * Any other failure: abort the transaction (forgetting the bulk-mode
     * txn if that is what we used), and retry the request if the error
     * condition can be cleared.
     */
    if (status != 0) {
	mdb_txn_abort(txn);
	if (txn == slmdb->txn)
	    slmdb->txn = 0;
	status = slmdb_recover(slmdb, status);
	if (status == 0)
	    status = slmdb_put(slmdb, mdb_key, mdb_value, flags);
	SLMDB_API_RETURN(slmdb, status);
    }

    /*
     * Commit the transaction if it's not the bulk-mode txn. Retry the whole
     * request if the commit fails with a recoverable error.
     */
    if (slmdb->txn == 0) {
	status = mdb_txn_commit(txn);
	if (status != 0) {
	    status = slmdb_recover(slmdb, status);
	    if (status == 0)
		status = slmdb_put(slmdb, mdb_key, mdb_value, flags);
	}
    }
    SLMDB_API_RETURN(slmdb, status);
}

/* slmdb_del - mdb_del() wrapper with LMDB error recovery */

int     slmdb_del(SLMDB *slmdb, MDB_val *mdb_key)
{
    MDB_txn *txn = slmdb->txn;
    int     status;

    /*
     * Use the bulk-mode txn when there is one, otherwise start a
     * short-lived write transaction.
     */
    if (txn == 0) {
	status = slmdb_txn_begin(slmdb, 0, &txn);
	if (status != 0)
	    SLMDB_API_RETURN(slmdb, status);
    }

    /*
     * Do the update.
     */
    status = mdb_del(txn, slmdb->dbi, mdb_key, (MDB_val *) 0);

    /*
     * "Not found" is a normal result that is reported to the caller. Abort
     * the non-bulk transaction only.
     */
    if (status == MDB_NOTFOUND) {
	if (slmdb->txn == 0)
	    mdb_txn_abort(txn);
	SLMDB_API_RETURN(slmdb, status);
    }

    /*
     * Any other failure: abort the transaction (forgetting the bulk-mode
     * txn if that is what we used), and retry the request if the error
     * condition can be cleared.
     */
    if (status != 0) {
	mdb_txn_abort(txn);
	if (txn == slmdb->txn)
	    slmdb->txn = 0;
	status = slmdb_recover(slmdb, status);
	if (status == 0)
	    status = slmdb_del(slmdb, mdb_key);
	SLMDB_API_RETURN(slmdb, status);
    }

    /*
     * Commit the transaction if it's not the bulk-mode txn. Retry the whole
     * request if the commit fails with a recoverable error.
     */
    if (slmdb->txn == 0) {
	status = mdb_txn_commit(txn);
	if (status != 0) {
	    status = slmdb_recover(slmdb, status);
	    if (status == 0)
		status = slmdb_del(slmdb, mdb_key);
	}
    }
    SLMDB_API_RETURN(slmdb, status);
}

/* slmdb_cursor_get - mdb_cursor_get() wrapper with LMDB error recovery */

 /*
  * Arguments: slmdb is the database handle; mdb_key and mdb_value are passed
  * to mdb_cursor_get() together with the cursor operation op.
  * 
  * Result: zero in case of success, MDB_NOTFOUND at the end of the database,
  * MDB_PANIC when the database was opened in bulk mode, ENOMEM when the
  * cursor position could not be saved, or another non-zero status code when
  * error recovery failed.
  */
int     slmdb_cursor_get(SLMDB *slmdb, MDB_val *mdb_key,
			         MDB_val *mdb_value, MDB_cursor_op op)
{
    MDB_txn *txn;
    int     status = 0;

    /*
     * TODO: figure how we would recover a failing bulk transaction.
     * 
     * Until then, a bulk-mode request is reported through the optional
     * assert call-back and rejected. No recovery was attempted at this
     * point, so this returns without SLMDB_API_RETURN().
     */
    if ((slmdb->slmdb_flags & SLMDB_FLAG_BULK) != 0) {
	if (slmdb->assert_fn)
	    slmdb->assert_fn(slmdb->cb_context,
		     "slmdb_cursor_get: bulk transaction is not supported");
	return (MDB_PANIC);
    }

    /*
     * Open a read transaction and cursor if needed. The cursor owns the
     * transaction: slmdb_cursor_close() retrieves it with mdb_cursor_txn().
     */
    if (slmdb->cursor == 0) {
	if ((status = slmdb_txn_begin(slmdb, MDB_RDONLY, &txn)) != 0)
	    SLMDB_API_RETURN(slmdb, status);
	if ((status = mdb_cursor_open(txn, slmdb->dbi, &slmdb->cursor)) != 0) {
	    mdb_txn_abort(txn);
	    if ((status = slmdb_recover(slmdb, status)) == 0)
		status = slmdb_cursor_get(slmdb, mdb_key, mdb_value, op);
	    SLMDB_API_RETURN(slmdb, status);
	}

	/*
	 * Restore the cursor position from the saved key information. This
	 * is not needed with MDB_FIRST, which does not depend on the
	 * previous position.
	 * 
	 * NOTE(review): presumably MDB_SET fails with MDB_NOTFOUND when the
	 * saved key was deleted in the mean time, which would be handled
	 * below as end-of-database - confirm against the LMDB documentation.
	 */
	if (HAVE_SLMDB_SAVED_KEY(slmdb) && op != MDB_FIRST)
	    status = mdb_cursor_get(slmdb->cursor, &slmdb->saved_key,
				    (MDB_val *) 0, MDB_SET);
    }

    /*
     * Database lookup. This is skipped when restoring the cursor position
     * failed.
     */
    if (status == 0)
	status = mdb_cursor_get(slmdb->cursor, mdb_key, mdb_value, op);

    /*
     * Save the cursor position if successful. This can fail only with
     * ENOMEM.
     * 
     * Close the cursor read transaction if in MDB_NOLOCK mode, because the
     * caller may release the external lock after we return.
     */
    if (status == 0) {
	status = slmdb_saved_key_assign(slmdb, mdb_key);
	if (slmdb->lmdb_flags & MDB_NOLOCK)
	    slmdb_cursor_close(slmdb);
    }

    /*
     * Handle end-of-database or other error.
     */
    else {
	/* Do not hand-optimize out the slmdb_cursor_close() calls below. */
	if (status == MDB_NOTFOUND) {
	    /* End of database: discard the cursor and the saved position. */
	    slmdb_cursor_close(slmdb);
	    if (HAVE_SLMDB_SAVED_KEY(slmdb))
		slmdb_saved_key_free(slmdb);
	} else {
	    /* Other error: retry the request if the error can be cleared. */
	    slmdb_cursor_close(slmdb);
	    if ((status = slmdb_recover(slmdb, status)) == 0)
		status = slmdb_cursor_get(slmdb, mdb_key, mdb_value, op);
	    SLMDB_API_RETURN(slmdb, status);
	    /* Do not hand-optimize out the above return statement. */
	}
    }
    SLMDB_API_RETURN(slmdb, status);
}

/* slmdb_assert_cb - pass an LMDB assertion failure to the application */

static void slmdb_assert_cb(MDB_env *env, const char *text)
{
    /* The environment's user context is the SLMDB handle that owns it. */
    SLMDB  *owner = (SLMDB *) mdb_env_get_userctx(env);

    /* Nothing to do when the application did not register a call-back. */
    if (owner->assert_fn == 0)
	return;
    owner->assert_fn(owner->cb_context, text);
}

/* slmdb_control - control optional settings */

int     slmdb_control(SLMDB *slmdb, int first,...)
{
    va_list ap;
    int     status = 0;
    int     reqno = first;
    int     rc;

    /*
     * Process a list of (request, value) pairs that is terminated with
     * SLMDB_CTL_END. The result is zero in case of success, EINVAL (also
     * stored in errno) for an unknown request, or the LMDB status code from
     * a failed attempt to install the assert call-back.
     * 
     * Stop at the first error WITHOUT fetching another argument. After an
     * unknown request we did not consume its value, therefore the next
     * argument need not be an int, and reading it as an int would be an
     * error.
     */
    va_start(ap, first);
    while (reqno != SLMDB_CTL_END) {
	switch (reqno) {
	case SLMDB_CTL_LONGJMP_FN:
	    slmdb->longjmp_fn = va_arg(ap, SLMDB_LONGJMP_FN);
	    break;
	case SLMDB_CTL_NOTIFY_FN:
	    slmdb->notify_fn = va_arg(ap, SLMDB_NOTIFY_FN);
	    break;
	case SLMDB_CTL_ASSERT_FN:
	    slmdb->assert_fn = va_arg(ap, SLMDB_ASSERT_FN);
	    /* Have LMDB route assertion failures to slmdb_assert_cb(). */
	    if ((rc = mdb_env_set_userctx(slmdb->env, (void *) slmdb)) != 0
	     || (rc = mdb_env_set_assert(slmdb->env, slmdb_assert_cb)) != 0)
		status = rc;
	    break;
	case SLMDB_CTL_CB_CONTEXT:
	    slmdb->cb_context = va_arg(ap, void *);
	    break;
	case SLMDB_CTL_API_RETRY_LIMIT:
	    slmdb->api_retry_limit = va_arg(ap, int);
	    break;
	case SLMDB_CTL_BULK_RETRY_LIMIT:
	    slmdb->bulk_retry_limit = va_arg(ap, int);
	    break;
	default:
	    status = errno = EINVAL;
	    break;
	}
	if (status != 0)
	    break;
	reqno = va_arg(ap, int);
    }
    va_end(ap);
    return (status);
}

/* slmdb_close - wrapper with LMDB error recovery */

int     slmdb_close(SLMDB *slmdb)
{
    int     status = 0;

    /*
     * Commit a bulk transaction that is still open. When the commit fails
     * and slmdb_recover() returns instead of jumping back into the
     * application, then it was unable to clear the error condition, or
     * unable to restart the bulk transaction.
     */
    if ((slmdb->slmdb_flags & SLMDB_FLAG_BULK) != 0 && slmdb->txn != 0) {
	status = mdb_txn_commit(slmdb->txn);
	if (status != 0)
	    status = slmdb_recover(slmdb, status);
    }

    /*
     * Release what is left of an unfinished sequence() operation, before
     * closing the environment.
     */
    if (slmdb->cursor != 0)
	slmdb_cursor_close(slmdb);
    mdb_env_close(slmdb->env);

    /*
     * Release the saved key information, if any.
     */
    if (HAVE_SLMDB_SAVED_KEY(slmdb))
	slmdb_saved_key_free(slmdb);

    SLMDB_API_RETURN(slmdb, status);
}

/* slmdb_init - mandatory initialization */

int     slmdb_init(SLMDB *slmdb, size_t curr_limit, int size_incr,
		           size_t hard_limit)
{

    /*
     * The size limits are set with a separate call, so that the
     * slmdb_open() argument list remains short. No resources are allocated
     * here; this only records the control information for later use.
     */
    slmdb->hard_limit = hard_limit;
    slmdb->size_incr = size_incr;
    slmdb->curr_limit = curr_limit;

    return (MDB_SUCCESS);
}

/* slmdb_open - open wrapped LMDB database */

int     slmdb_open(SLMDB *slmdb, const char *path, int open_flags,
		           int lmdb_flags, int slmdb_flags)
{
    struct stat st;
    MDB_env *env;
    MDB_txn *txn = 0;
    MDB_dbi dbi;
    int     db_fd;
    int     status;

    /*
     * The caller must have called slmdb_init() first: this code reads and
     * updates slmdb->curr_limit and slmdb->hard_limit. The result is zero
     * in case of success, otherwise an LMDB status code; in the latter case
     * the LMDB environment is already closed.
     */

    /*
     * Create LMDB environment.
     */
    if ((status = mdb_env_create(&env)) != 0)
	return (status);

    /*
     * Make sure that the memory map has room to store and commit an initial
     * "drop" transaction as well as fixed database metadata. We have no way
     * to recover from errors before the first application-level I/O request.
     * 
     * With an existing file that does not fit within the current limit, raise
     * the current limit to the file size plus some slack, and raise the
     * hard limit if the file is larger than that.
     */
#define SLMDB_FUDGE      10240

    if (slmdb->curr_limit < SLMDB_FUDGE)
	slmdb->curr_limit = SLMDB_FUDGE;
    if (stat(path, &st) == 0
	&& st.st_size > slmdb->curr_limit - SLMDB_FUDGE) {
	if (st.st_size > slmdb->hard_limit)
	    slmdb->hard_limit = st.st_size;
	if (st.st_size < slmdb->hard_limit - SLMDB_FUDGE)
	    slmdb->curr_limit = st.st_size + SLMDB_FUDGE;
	else
	    slmdb->curr_limit = slmdb->hard_limit;
    }

    /*
     * mdb_open() requires a txn, but since the default DB always exists in
     * an LMDB environment, we usually don't need to do anything else with
     * the txn. It is currently used for truncate and for bulk transactions.
     * 
     * All transactions must be closed before the environment is closed. The
     * txn pointer is still null when we fail before or in mdb_txn_begin();
     * when a later step fails, abort the transaction first.
     */
    if ((status = mdb_env_set_mapsize(env, slmdb->curr_limit)) != 0
	|| (status = mdb_env_open(env, path, lmdb_flags, 0644)) != 0
	|| (status = mdb_txn_begin(env, (MDB_txn *) 0,
				   lmdb_flags & MDB_RDONLY, &txn)) != 0
	|| (status = mdb_open(txn, (const char *) 0, 0, &dbi)) != 0
	|| (status = mdb_env_get_fd(env, &db_fd)) != 0) {
	if (txn != 0)
	    mdb_txn_abort(txn);
	mdb_env_close(env);
	return (status);
    }

    /*
     * Bundle up. This also installs the default retry limits, and clears
     * all call-back information; use slmdb_control() after this call to
     * change these settings.
     */
    slmdb->open_flags = open_flags;
    slmdb->lmdb_flags = lmdb_flags;
    slmdb->slmdb_flags = slmdb_flags;
    slmdb->env = env;
    slmdb->dbi = dbi;
    slmdb->db_fd = db_fd;
    slmdb->cursor = 0;
    slmdb_saved_key_init(slmdb);
    slmdb->api_retry_count = 0;
    slmdb->bulk_retry_count = 0;
    slmdb->api_retry_limit = SLMDB_DEF_API_RETRY_LIMIT;
    slmdb->bulk_retry_limit = SLMDB_DEF_BULK_RETRY_LIMIT;
    slmdb->longjmp_fn = 0;
    slmdb->notify_fn = 0;
    slmdb->assert_fn = 0;
    slmdb->cb_context = 0;
    slmdb->txn = txn;

    /*
     * slmdb_prepare() disposes of the transaction when it fails, so only
     * the environment is left to clean up.
     */
    if ((status = slmdb_prepare(slmdb)) != 0)
	mdb_env_close(env);

    return (status);
}

#endif