summaryrefslogtreecommitdiffstats
path: root/src/os/ObjectStore.h
blob: 1c12068954edc533ddd61d19a2ca8db9c1fa1d54 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#ifndef CEPH_OBJECTSTORE_H
#define CEPH_OBJECTSTORE_H

#include "include/Context.h"
#include "include/buffer.h"
#include "include/types.h"
#include "include/stringify.h"
#include "osd/osd_types.h"
#include "common/TrackedOp.h"
#include "common/WorkQueue.h"
#include "ObjectMap.h"

#include <errno.h>
#include <sys/stat.h>
#include <vector>
#include <map>

#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__sun)
#include <sys/statvfs.h>
#else
#include <sys/vfs.h>    /* or <sys/statfs.h> */
#endif

#define OPS_PER_PTR 32

class CephContext;

using std::vector;
using std::string;
using std::map;

namespace ceph {
  class Formatter;
}

/*
 * low-level interface to the local OSD file system
 */

class Logger;
class ContextQueue;

static inline void encode(const map<string,bufferptr> *attrset, bufferlist &bl) {
  encode(*attrset, bl);
}

// this isn't the best place for these, but...
void decode_str_str_map_to_bl(bufferlist::const_iterator& p, bufferlist *out);
void decode_str_set_to_bl(bufferlist::const_iterator& p, bufferlist *out);

// Flag bits
typedef uint32_t osflagbits_t;
const int SKIP_JOURNAL_REPLAY = 1 << 0;
const int SKIP_MOUNT_OMAP = 1 << 1;

class ObjectStore {
protected:
  string path;

public:
  CephContext* cct;
  /**
   * create - create an ObjectStore instance.
   *
   * This is invoked once at initialization time.
   *
   * @param type type of store. This is a string from the configuration file.
   * @param data path (or other descriptor) for data
   * @param journal path (or other descriptor) for journal (optional)
   * @param flags which filestores should check if applicable
   */
  static ObjectStore *create(CephContext *cct,
			     const string& type,
			     const string& data,
			     const string& journal,
			     osflagbits_t flags = 0);

  /**
   * probe a block device to learn the uuid of the owning OSD
   *
   * @param cct cct
   * @param path path to device
   * @param fsid [out] osd uuid
   */
  static int probe_block_device_fsid(
    CephContext *cct,
    const string& path,
    uuid_d *fsid);

  /**
   * Fetch Object Store statistics.
   *
   * Currently only latency of write and apply times are measured.
   *
   * This appears to be called with nothing locked.
   */
  virtual objectstore_perf_stat_t get_cur_stats() = 0;

  /**
   * Fetch Object Store performance counters.
   *
   *
   * This appears to be called with nothing locked.
   */
  virtual const PerfCounters* get_perf_counters() const = 0;

  /**
   * a collection also orders transactions
   *
   * Any transactions queued under a given collection will be applied in
   * sequence.  Transactions queued under different collections may run
   * in parallel.
   *
   * ObjectStore users my get collection handles with open_collection() (or,
   * for bootstrapping a new collection, create_new_collection()).
   */
  struct CollectionImpl : public RefCountedObject {
    const coll_t cid;

    CollectionImpl(const coll_t& c)
      : RefCountedObject(NULL, 0),
	cid(c) {}

    /// wait for any queued transactions to apply
    // block until any previous transactions are visible.  specifically,
    // collection_list and collection_empty need to reflect prior operations.
    virtual void flush() = 0;

    /**
     * Async flush_commit
     *
     * There are two cases:
     * 1) collection is currently idle: the method returns true.  c is
     *    not touched.
     * 2) collection is not idle: the method returns false and c is
     *    called asynchronously with a value of 0 once all transactions
     *    queued on this collection prior to the call have been applied
     *    and committed.
     */
    virtual bool flush_commit(Context *c) = 0;

    const coll_t &get_cid() {
      return cid;
    }
  };
  typedef boost::intrusive_ptr<CollectionImpl> CollectionHandle;


  /*********************************
   *
   * Object Contents and semantics
   *
   * All ObjectStore objects are identified as a named object
   * (ghobject_t and hobject_t) in a named collection (coll_t).
   * ObjectStore operations support the creation, mutation, deletion
   * and enumeration of objects within a collection.  Enumeration is
   * in sorted key order (where keys are sorted by hash). Object names
   * are globally unique.
   *
   * Each object has four distinct parts: byte data, xattrs, omap_header
   * and omap entries.
   *
   * The data portion of an object is conceptually equivalent to a
   * file in a file system. Random and Partial access for both read
   * and write operations is required. The ability to have a sparse
   * implementation of the data portion of an object is beneficial for
   * some workloads, but not required. There is a system-wide limit on
   * the maximum size of an object, which is typically around 100 MB.
   *
   * Xattrs are equivalent to the extended attributes of file
   * systems. Xattrs are a set of key/value pairs.  Sub-value access
   * is not required. It is possible to enumerate the set of xattrs in
   * key order.  At the implementation level, xattrs are used
   * exclusively internal to Ceph and the implementer can expect the
   * total size of all of the xattrs on an object to be relatively
   * small, i.e., less than 64KB. Much of Ceph assumes that accessing
   * xattrs on temporally adjacent object accesses (recent past or
   * near future) is inexpensive.
   *
   * omap_header is a single blob of data. It can be read or written
   * in total.
   *
   * Omap entries are conceptually the same as xattrs
   * but in a different address space. In other words, you can have
   * the same key as an xattr and an omap entry and they have distinct
   * values. Enumeration of xattrs doesn't include omap entries and
   * vice versa. The size and access characteristics of omap entries
   * are very different from xattrs. In particular, the value portion
   * of an omap entry can be quite large (MBs).  More importantly, the
   * interface must support efficient range queries on omap entries even
   * when there are a large numbers of entries.
   *
   *********************************/

  /*******************************
   *
   * Collections
   *
   * A collection is simply a grouping of objects. Collections have
   * names (coll_t) and can be enumerated in order.  Like an
   * individual object, a collection also has a set of xattrs.
   *
   *
   */


  /*********************************
   * transaction
   *
   * A Transaction represents a sequence of primitive mutation
   * operations.
   *
   * Three events in the life of a Transaction result in
   * callbacks. Any Transaction can contain any number of callback
   * objects (Context) for any combination of the three classes of
   * callbacks:
   *
   *    on_applied_sync, on_applied, and on_commit.
   *
   * The "on_applied" and "on_applied_sync" callbacks are invoked when
   * the modifications requested by the Transaction are visible to
   * subsequent ObjectStore operations, i.e., the results are
   * readable. The only conceptual difference between on_applied and
   * on_applied_sync is the specific thread and locking environment in
   * which the callbacks operate.  "on_applied_sync" is called
   * directly by an ObjectStore execution thread. It is expected to
   * execute quickly and must not acquire any locks of the calling
   * environment. Conversely, "on_applied" is called from the separate
   * Finisher thread, meaning that it can contend for calling
   * environment locks. NB, on_applied and on_applied_sync are
   * sometimes called on_readable and on_readable_sync.
   *
   * The "on_commit" callback is also called from the Finisher thread
   * and indicates that all of the mutations have been durably
   * committed to stable storage (i.e., are now software/hardware
   * crashproof).
   *
   * At the implementation level, each mutation primitive (and its
   * associated data) can be serialized to a single buffer.  That
   * serialization, however, does not copy any data, but (using the
   * bufferlist library) will reference the original buffers.  This
   * implies that the buffer that contains the data being submitted
   * must remain stable until the on_commit callback completes.  In
   * practice, bufferlist handles all of this for you and this
   * subtlety is only relevant if you are referencing an existing
   * buffer via buffer::raw_static.
   *
   * Some implementations of ObjectStore choose to implement their own
   * form of journaling that uses the serialized form of a
   * Transaction. This requires that the encode/decode logic properly
   * version itself and handle version upgrades that might change the
   * format of the encoded Transaction. This has already happened a
   * couple of times and the Transaction object contains some helper
   * variables that aid in this legacy decoding:
   *
   *   sobject_encoding detects an older/simpler version of oid
   *   present in pre-bobtail versions of ceph.  use_pool_override
   *   also detects a situation where the pool of an oid can be
   *   overridden for legacy operations/buffers.  For non-legacy
   *   implementations of ObjectStore, neither of these fields are
   *   relevant.
   *
   *
   * TRANSACTION ISOLATION
   *
   * Except as noted above, isolation is the responsibility of the
   * caller. In other words, if any storage element (storage element
   * == any of the four portions of an object as described above) is
   * altered by a transaction (including deletion), the caller
   * promises not to attempt to read that element while the
   * transaction is pending (here pending means from the time of
   * issuance until the "on_applied_sync" callback has been
   * received). Violations of isolation need not be detected by
   * ObjectStore and there is no corresponding error mechanism for
   * reporting an isolation violation (crashing would be the
   * appropriate way to report an isolation violation if detected).
   *
   * Enumeration operations may violate transaction isolation as
   * described above when a storage element is being created or
   * deleted as part of a transaction. In this case, ObjectStore is
   * allowed to consider the enumeration operation to either precede
   * or follow the violating transaction element. In other words, the
   * presence/absence of the mutated element in the enumeration is
   * entirely at the discretion of ObjectStore. The arbitrary ordering
   * applies independently to each transaction element. For example,
   * if a transaction contains two mutating elements "create A" and
   * "delete B". And an enumeration operation is performed while this
   * transaction is pending. It is permissible for ObjectStore to
   * report any of the four possible combinations of the existence of
   * A and B.
   *
   */
  class Transaction {
  public:
    enum {
      OP_NOP =          0,
      OP_TOUCH =        9,   // cid, oid
      OP_WRITE =        10,  // cid, oid, offset, len, bl
      OP_ZERO =         11,  // cid, oid, offset, len
      OP_TRUNCATE =     12,  // cid, oid, len
      OP_REMOVE =       13,  // cid, oid
      OP_SETATTR =      14,  // cid, oid, attrname, bl
      OP_SETATTRS =     15,  // cid, oid, attrset
      OP_RMATTR =       16,  // cid, oid, attrname
      OP_CLONE =        17,  // cid, oid, newoid
      OP_CLONERANGE =   18,  // cid, oid, newoid, offset, len
      OP_CLONERANGE2 =  30,  // cid, oid, newoid, srcoff, len, dstoff

      OP_TRIMCACHE =    19,  // cid, oid, offset, len  **DEPRECATED**

      OP_MKCOLL =       20,  // cid
      OP_RMCOLL =       21,  // cid
      OP_COLL_ADD =     22,  // cid, oldcid, oid
      OP_COLL_REMOVE =  23,  // cid, oid
      OP_COLL_SETATTR = 24,  // cid, attrname, bl
      OP_COLL_RMATTR =  25,  // cid, attrname
      OP_COLL_SETATTRS = 26,  // cid, attrset
      OP_COLL_MOVE =    8,   // newcid, oldcid, oid

      OP_RMATTRS =      28,  // cid, oid
      OP_COLL_RENAME =       29,  // cid, newcid

      OP_OMAP_CLEAR = 31,   // cid
      OP_OMAP_SETKEYS = 32, // cid, attrset
      OP_OMAP_RMKEYS = 33,  // cid, keyset
      OP_OMAP_SETHEADER = 34, // cid, header
      OP_SPLIT_COLLECTION = 35, // cid, bits, destination
      OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination
				    doesn't create the destination */
      OP_OMAP_RMKEYRANGE = 37,  // cid, oid, firstkey, lastkey
      OP_COLL_MOVE_RENAME = 38,   // oldcid, oldoid, newcid, newoid

      OP_SETALLOCHINT = 39,  // cid, oid, object_size, write_size
      OP_COLL_HINT = 40, // cid, type, bl

      OP_TRY_RENAME = 41,   // oldcid, oldoid, newoid

      OP_COLL_SET_BITS = 42, // cid, bits

      OP_MERGE_COLLECTION = 43, // cid, destination
    };

    // Transaction hint type
    enum {
      COLL_HINT_EXPECTED_NUM_OBJECTS = 1,
    };

    struct Op {
      ceph_le32 op;
      ceph_le32 cid;
      ceph_le32 oid;
      ceph_le64 off;
      ceph_le64 len;
      ceph_le32 dest_cid;
      ceph_le32 dest_oid;               //OP_CLONE, OP_CLONERANGE
      ceph_le64 dest_off;               //OP_CLONERANGE
      union {
	struct {
	  ceph_le32 hint_type;          //OP_COLL_HINT
	};
	struct {
	  ceph_le32 alloc_hint_flags;   //OP_SETALLOCHINT
	};
      };
      ceph_le64 expected_object_size;   //OP_SETALLOCHINT
      ceph_le64 expected_write_size;    //OP_SETALLOCHINT
      ceph_le32 split_bits;             //OP_SPLIT_COLLECTION2,OP_COLL_SET_BITS,
                                        //OP_MKCOLL
      ceph_le32 split_rem;              //OP_SPLIT_COLLECTION2
    } __attribute__ ((packed)) ;

    struct TransactionData {
      ceph_le64 ops;
      ceph_le32 largest_data_len;
      ceph_le32 largest_data_off;
      ceph_le32 largest_data_off_in_data_bl;
      ceph_le32 fadvise_flags;

      TransactionData() noexcept :
        ops(init_le64(0)),
        largest_data_len(init_le32(0)),
        largest_data_off(init_le32(0)),
        largest_data_off_in_data_bl(init_le32(0)),
	fadvise_flags(init_le32(0)) { }

      // override default move operations to reset default values
      TransactionData(TransactionData&& other) noexcept :
        ops(other.ops),
        largest_data_len(other.largest_data_len),
        largest_data_off(other.largest_data_off),
        largest_data_off_in_data_bl(other.largest_data_off_in_data_bl),
        fadvise_flags(other.fadvise_flags) {
        other.ops = 0;
        other.largest_data_len = 0;
        other.largest_data_off = 0;
        other.largest_data_off_in_data_bl = 0;
        other.fadvise_flags = 0;
      }
      TransactionData& operator=(TransactionData&& other) noexcept {
        ops = other.ops;
        largest_data_len = other.largest_data_len;
        largest_data_off = other.largest_data_off;
        largest_data_off_in_data_bl = other.largest_data_off_in_data_bl;
        fadvise_flags = other.fadvise_flags;
        other.ops = 0;
        other.largest_data_len = 0;
        other.largest_data_off = 0;
        other.largest_data_off_in_data_bl = 0;
        other.fadvise_flags = 0;
        return *this;
      }

      TransactionData(const TransactionData& other) = default;
      TransactionData& operator=(const TransactionData& other) = default;

      void encode(bufferlist& bl) const {
        bl.append((char*)this, sizeof(TransactionData));
      }
      void decode(bufferlist::const_iterator &bl) {
        bl.copy(sizeof(TransactionData), (char*)this);
      }
    } __attribute__ ((packed)) ;

  private:
    TransactionData data;

    map<coll_t, __le32> coll_index;
    map<ghobject_t, __le32> object_index;

    __le32 coll_id {0};
    __le32 object_id {0};

    bufferlist data_bl;
    bufferlist op_bl;

    list<Context *> on_applied;
    list<Context *> on_commit;
    list<Context *> on_applied_sync;

  public:
    Transaction() = default;

    explicit Transaction(bufferlist::const_iterator &dp) {
      decode(dp);
    }
    explicit Transaction(bufferlist &nbl) {
      auto dp = nbl.cbegin();
      decode(dp);
    }

    // override default move operations to reset default values
    Transaction(Transaction&& other) noexcept :
      data(std::move(other.data)),
      coll_index(std::move(other.coll_index)),
      object_index(std::move(other.object_index)),
      coll_id(other.coll_id),
      object_id(other.object_id),
      data_bl(std::move(other.data_bl)),
      op_bl(std::move(other.op_bl)),
      on_applied(std::move(other.on_applied)),
      on_commit(std::move(other.on_commit)),
      on_applied_sync(std::move(other.on_applied_sync)) {
      other.coll_id = 0;
      other.object_id = 0;
    }

    Transaction& operator=(Transaction&& other) noexcept {
      data = std::move(other.data);
      coll_index = std::move(other.coll_index);
      object_index = std::move(other.object_index);
      coll_id = other.coll_id;
      object_id = other.object_id;
      data_bl = std::move(other.data_bl);
      op_bl = std::move(other.op_bl);
      on_applied = std::move(other.on_applied);
      on_commit = std::move(other.on_commit);
      on_applied_sync = std::move(other.on_applied_sync);
      other.coll_id = 0;
      other.object_id = 0;
      return *this;
    }

    Transaction(const Transaction& other) = default;
    Transaction& operator=(const Transaction& other) = default;

    // expose object_index for FileStore::Op's benefit
    const map<ghobject_t, __le32>& get_object_index() const {
      return object_index;
    }

    /* Operations on callback contexts */
    void register_on_applied(Context *c) {
      if (!c) return;
      on_applied.push_back(c);
    }
    void register_on_commit(Context *c) {
      if (!c) return;
      on_commit.push_back(c);
    }
    void register_on_applied_sync(Context *c) {
      if (!c) return;
      on_applied_sync.push_back(c);
    }
    void register_on_complete(Context *c) {
      if (!c) return;
      RunOnDeleteRef _complete (std::make_shared<RunOnDelete>(c));
      register_on_applied(new ContainerContext<RunOnDeleteRef>(_complete));
      register_on_commit(new ContainerContext<RunOnDeleteRef>(_complete));
    }
    bool has_contexts() const {
      return
	!on_commit.empty() ||
	!on_applied.empty() ||
	!on_applied_sync.empty();
    }

    static void collect_contexts(
      vector<Transaction>& t,
      Context **out_on_applied,
      Context **out_on_commit,
      Context **out_on_applied_sync) {
      ceph_assert(out_on_applied);
      ceph_assert(out_on_commit);
      ceph_assert(out_on_applied_sync);
      list<Context *> on_applied, on_commit, on_applied_sync;
      for (auto& i : t) {
	on_applied.splice(on_applied.end(), i.on_applied);
	on_commit.splice(on_commit.end(), i.on_commit);
	on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync);
      }
      *out_on_applied = C_Contexts::list_to_context(on_applied);
      *out_on_commit = C_Contexts::list_to_context(on_commit);
      *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync);
    }
    static void collect_contexts(
      vector<Transaction>& t,
      list<Context*> *out_on_applied,
      list<Context*> *out_on_commit,
      list<Context*> *out_on_applied_sync) {
      ceph_assert(out_on_applied);
      ceph_assert(out_on_commit);
      ceph_assert(out_on_applied_sync);
      for (auto& i : t) {
	out_on_applied->splice(out_on_applied->end(), i.on_applied);
	out_on_commit->splice(out_on_commit->end(), i.on_commit);
	out_on_applied_sync->splice(out_on_applied_sync->end(),
				    i.on_applied_sync);
      }
    }

    Context *get_on_applied() {
      return C_Contexts::list_to_context(on_applied);
    }
    Context *get_on_commit() {
      return C_Contexts::list_to_context(on_commit);
    }
    Context *get_on_applied_sync() {
      return C_Contexts::list_to_context(on_applied_sync);
    }

    void set_fadvise_flags(uint32_t flags) {
      data.fadvise_flags = flags;
    }
    void set_fadvise_flag(uint32_t flag) {
      data.fadvise_flags = data.fadvise_flags | flag;
    }
    uint32_t get_fadvise_flags() { return data.fadvise_flags; }

    void swap(Transaction& other) noexcept {
      std::swap(data, other.data);
      std::swap(on_applied, other.on_applied);
      std::swap(on_commit, other.on_commit);
      std::swap(on_applied_sync, other.on_applied_sync);

      std::swap(coll_index, other.coll_index);
      std::swap(object_index, other.object_index);
      std::swap(coll_id, other.coll_id);
      std::swap(object_id, other.object_id);
      op_bl.swap(other.op_bl);
      data_bl.swap(other.data_bl);
    }

    void _update_op(Op* op,
      vector<__le32> &cm,
      vector<__le32> &om) {

      switch (op->op) {
      case OP_NOP:
        break;

      case OP_TOUCH:
      case OP_REMOVE:
      case OP_SETATTR:
      case OP_SETATTRS:
      case OP_RMATTR:
      case OP_RMATTRS:
      case OP_COLL_REMOVE:
      case OP_OMAP_CLEAR:
      case OP_OMAP_SETKEYS:
      case OP_OMAP_RMKEYS:
      case OP_OMAP_RMKEYRANGE:
      case OP_OMAP_SETHEADER:
      case OP_WRITE:
      case OP_ZERO:
      case OP_TRUNCATE:
      case OP_SETALLOCHINT:
        ceph_assert(op->cid < cm.size());
        ceph_assert(op->oid < om.size());
        op->cid = cm[op->cid];
        op->oid = om[op->oid];
        break;

      case OP_CLONERANGE2:
      case OP_CLONE:
        ceph_assert(op->cid < cm.size());
        ceph_assert(op->oid < om.size());
        ceph_assert(op->dest_oid < om.size());
        op->cid = cm[op->cid];
        op->oid = om[op->oid];
        op->dest_oid = om[op->dest_oid];
        break;

      case OP_MKCOLL:
      case OP_RMCOLL:
      case OP_COLL_SETATTR:
      case OP_COLL_RMATTR:
      case OP_COLL_SETATTRS:
      case OP_COLL_HINT:
      case OP_COLL_SET_BITS:
        ceph_assert(op->cid < cm.size());
        op->cid = cm[op->cid];
        break;

      case OP_COLL_ADD:
        ceph_assert(op->cid < cm.size());
        ceph_assert(op->oid < om.size());
        ceph_assert(op->dest_cid < om.size());
        op->cid = cm[op->cid];
        op->dest_cid = cm[op->dest_cid];
        op->oid = om[op->oid];
        break;

      case OP_COLL_MOVE_RENAME:
        ceph_assert(op->cid < cm.size());
        ceph_assert(op->oid < om.size());
        ceph_assert(op->dest_cid < cm.size());
        ceph_assert(op->dest_oid < om.size());
        op->cid = cm[op->cid];
        op->oid = om[op->oid];
        op->dest_cid = cm[op->dest_cid];
        op->dest_oid = om[op->dest_oid];
        break;

      case OP_TRY_RENAME:
        ceph_assert(op->cid < cm.size());
        ceph_assert(op->oid < om.size());
        ceph_assert(op->dest_oid < om.size());
        op->cid = cm[op->cid];
        op->oid = om[op->oid];
        op->dest_oid = om[op->dest_oid];
	break;

      case OP_SPLIT_COLLECTION2:
        ceph_assert(op->cid < cm.size());
	ceph_assert(op->dest_cid < cm.size());
        op->cid = cm[op->cid];
        op->dest_cid = cm[op->dest_cid];
        break;

      case OP_MERGE_COLLECTION:
        ceph_assert(op->cid < cm.size());
	ceph_assert(op->dest_cid < cm.size());
        op->cid = cm[op->cid];
        op->dest_cid = cm[op->dest_cid];
        break;

      default:
        ceph_abort_msg("Unknown OP");
      }
    }
    void _update_op_bl(
      bufferlist& bl,
      vector<__le32> &cm,
      vector<__le32> &om) {
      for (auto& bp : bl.buffers()) {
        ceph_assert(bp.length() % sizeof(Op) == 0);

        char* raw_p = const_cast<char*>(bp.c_str());
        char* raw_end = raw_p + bp.length();
        while (raw_p < raw_end) {
          _update_op(reinterpret_cast<Op*>(raw_p), cm, om);
          raw_p += sizeof(Op);
        }
      }
    }
    /// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction
    void append(Transaction& other) {

      data.ops = data.ops + other.data.ops;
      if (other.data.largest_data_len > data.largest_data_len) {
	data.largest_data_len = other.data.largest_data_len;
	data.largest_data_off = other.data.largest_data_off;
	data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl;
      }
      data.fadvise_flags = data.fadvise_flags | other.data.fadvise_flags;
      on_applied.splice(on_applied.end(), other.on_applied);
      on_commit.splice(on_commit.end(), other.on_commit);
      on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync);

      //append coll_index & object_index
      vector<__le32> cm(other.coll_index.size());
      map<coll_t, __le32>::iterator coll_index_p;
      for (coll_index_p = other.coll_index.begin();
           coll_index_p != other.coll_index.end();
           ++coll_index_p) {
        cm[coll_index_p->second] = _get_coll_id(coll_index_p->first);
      }

      vector<__le32> om(other.object_index.size());
      map<ghobject_t, __le32>::iterator object_index_p;
      for (object_index_p = other.object_index.begin();
           object_index_p != other.object_index.end();
           ++object_index_p) {
        om[object_index_p->second] = _get_object_id(object_index_p->first);
      }      

      //the other.op_bl SHOULD NOT be changes during append operation,
      //we use additional bufferlist to avoid this problem
      bufferlist other_op_bl;
      {
        bufferptr other_op_bl_ptr(other.op_bl.length());
        other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str());
        other_op_bl.append(std::move(other_op_bl_ptr));
      }

      //update other_op_bl with cm & om
      //When the other is appended to current transaction, all coll_index and
      //object_index in other.op_buffer should be updated by new index of the
      //combined transaction
      _update_op_bl(other_op_bl, cm, om);

      //append op_bl
      op_bl.append(other_op_bl);
      //append data_bl
      data_bl.append(other.data_bl);
    }

    /** Inquires about the Transaction as a whole. */

    /// How big is the encoded Transaction buffer?
    uint64_t get_encoded_bytes() {
      //layout: data_bl + op_bl + coll_index + object_index + data

      // coll_index size, object_index size and sizeof(transaction_data)
      // all here, so they may be computed at compile-time
      size_t final_size = sizeof(__u32) * 2 + sizeof(data);

      // coll_index second and object_index second
      final_size += (coll_index.size() + object_index.size()) * sizeof(__le32);

      // coll_index first
      for (auto p = coll_index.begin(); p != coll_index.end(); ++p) {
	final_size += p->first.encoded_size();
      }

      // object_index first
      for (auto p = object_index.begin(); p != object_index.end(); ++p) {
	final_size += p->first.encoded_size();
      }

      return data_bl.length() +
	op_bl.length() +
	final_size;
    }

    /// Retain old version for regression testing purposes
    uint64_t get_encoded_bytes_test() {
      using ceph::encode;
      //layout: data_bl + op_bl + coll_index + object_index + data
      bufferlist bl;
      encode(coll_index, bl);
      encode(object_index, bl);

      return data_bl.length() +
	op_bl.length() +
	bl.length() +
	sizeof(data);
    }

    uint64_t get_num_bytes() {
      return get_encoded_bytes();
    }
    /// Size of largest data buffer to the "write" operation encountered so far
    uint32_t get_data_length() {
      return data.largest_data_len;
    }
    /// offset within the encoded buffer to the start of the largest data buffer that's encoded
    uint32_t get_data_offset() {
      if (data.largest_data_off_in_data_bl) {
	return data.largest_data_off_in_data_bl +
	  sizeof(__u8) +      // encode struct_v
	  sizeof(__u8) +      // encode compat_v
	  sizeof(__u32) +     // encode len
	  sizeof(__u32);      // data_bl len
      }
      return 0;  // none
    }
    /// offset of buffer as aligned to destination within object.
    int get_data_alignment() {
      if (!data.largest_data_len)
	return 0;
      return (0 - get_data_offset()) & ~CEPH_PAGE_MASK;
    }
    /// Is the Transaction empty (no operations)
    bool empty() {
      return !data.ops;
    }
    /// Number of operations in the transaction
    int get_num_ops() {
      return data.ops;
    }

    /**
     * iterator
     *
     * Helper object to parse Transactions.
     *
     * ObjectStore instances use this object to step down the encoded
     * buffer decoding operation codes and parameters as we go.
     *
     */
    class iterator {
      Transaction *t;

      uint64_t ops;
      char* op_buffer_p;

      bufferlist::const_iterator data_bl_p;

    public:
      vector<coll_t> colls;
      vector<ghobject_t> objects;

    private:
      explicit iterator(Transaction *t)
        : t(t),
	  data_bl_p(t->data_bl.cbegin()),
          colls(t->coll_index.size()),
          objects(t->object_index.size()) {

        ops = t->data.ops;
        op_buffer_p = t->op_bl.c_str();

        map<coll_t, __le32>::iterator coll_index_p;
        for (coll_index_p = t->coll_index.begin();
             coll_index_p != t->coll_index.end();
             ++coll_index_p) {
          colls[coll_index_p->second] = coll_index_p->first;
        }

        map<ghobject_t, __le32>::iterator object_index_p;
        for (object_index_p = t->object_index.begin();
             object_index_p != t->object_index.end();
             ++object_index_p) {
          objects[object_index_p->second] = object_index_p->first;
        }
      }

      friend class Transaction;

    public:

      bool have_op() {
        return ops > 0;
      }
      Op* decode_op() {
        ceph_assert(ops > 0);

        Op* op = reinterpret_cast<Op*>(op_buffer_p);
        op_buffer_p += sizeof(Op);
        ops--;

        return op;
      }
      string decode_string() {
	using ceph::decode;
        string s;
        decode(s, data_bl_p);
        return s;
      }
      void decode_bp(bufferptr& bp) {
	using ceph::decode;
        decode(bp, data_bl_p);
      }
      void decode_bl(bufferlist& bl) {
	using ceph::decode;
        decode(bl, data_bl_p);
      }
      void decode_attrset(map<string,bufferptr>& aset) {
	using ceph::decode;
        decode(aset, data_bl_p);
      }
      void decode_attrset(map<string,bufferlist>& aset) {
	using ceph::decode;
        decode(aset, data_bl_p);
      }
      void decode_attrset_bl(bufferlist *pbl) {
	decode_str_str_map_to_bl(data_bl_p, pbl);
      }
      void decode_keyset(set<string> &keys){
	using ceph::decode;
        decode(keys, data_bl_p);
      }
      void decode_keyset_bl(bufferlist *pbl){
        decode_str_set_to_bl(data_bl_p, pbl);
      }

      const ghobject_t &get_oid(__le32 oid_id) {
        ceph_assert(oid_id < objects.size());
        return objects[oid_id];
      }
      const coll_t &get_cid(__le32 cid_id) {
        ceph_assert(cid_id < colls.size());
        return colls[cid_id];
      }
      uint32_t get_fadvise_flags() const {
	return t->get_fadvise_flags();
      }
    };

    iterator begin() {
       return iterator(this);
    }

private:
    void _build_actions_from_tbl();

    /**
     * Helper functions to encode the various mutation elements of a
     * transaction.  These are 1:1 with the operation codes (see
     * enumeration above).  These routines ensure that the
     * encoder/creator of a transaction gets the right data in the
     * right place. Sadly, there's no corresponding version nor any
     * form of seat belts for the decoder.
     */
    Op* _get_next_op() {
      if (op_bl.get_append_buffer_unused_tail_length() < sizeof(Op)) {
        op_bl.reserve(sizeof(Op) * OPS_PER_PTR);
      }
      // append_hole ensures bptr merging. Even huge number of ops
      // shouldn't result in overpopulating bl::_buffers.
      char* const p = op_bl.append_hole(sizeof(Op)).c_str();
      memset(p, 0, sizeof(Op));
      return reinterpret_cast<Op*>(p);
    }
    __le32 _get_coll_id(const coll_t& coll) {
      map<coll_t, __le32>::iterator c = coll_index.find(coll);
      if (c != coll_index.end())
        return c->second;

      __le32 index_id = coll_id++;
      coll_index[coll] = index_id;
      return index_id;
    }
    __le32 _get_object_id(const ghobject_t& oid) {
      map<ghobject_t, __le32>::iterator o = object_index.find(oid);
      if (o != object_index.end())
        return o->second;

      __le32 index_id = object_id++;
      object_index[oid] = index_id;
      return index_id;
    }

public:
    /// noop. 'nuf said
    void nop() {
      Op* _op = _get_next_op();
      _op->op = OP_NOP;
      data.ops = data.ops + 1;
    }
    /**
     * touch
     *
     * Ensure the existance of an object in a collection. Create an
     * empty object if necessary
     */
    void touch(const coll_t& cid, const ghobject_t& oid) {
      Op* _op = _get_next_op();
      _op->op = OP_TOUCH;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      data.ops = data.ops + 1;
    }
    /**
     * Write data to an offset within an object. If the object is too
     * small, it is expanded as needed.  It is possible to specify an
     * offset beyond the current end of an object and it will be
     * expanded as needed. Simple implementations of ObjectStore will
     * just zero the data between the old end of the object and the
     * newly provided data. More sophisticated implementations of
     * ObjectStore will omit the untouched data and store it as a
     * "hole" in the file.
     *
     * Note that a 0-length write does not affect the size of the object.
     */
    void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len,
	       const bufferlist& write_data, uint32_t flags = 0) {
      using ceph::encode;
      uint32_t orig_len = data_bl.length();
      Op* _op = _get_next_op();
      _op->op = OP_WRITE;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      _op->off = off;
      _op->len = len;
      encode(write_data, data_bl);

      ceph_assert(len == write_data.length());
      data.fadvise_flags = data.fadvise_flags | flags;
      if (write_data.length() > data.largest_data_len) {
	data.largest_data_len = write_data.length();
	data.largest_data_off = off;
	data.largest_data_off_in_data_bl = orig_len + sizeof(__u32);  // we are about to
      }
      data.ops = data.ops + 1;
    }
    /**
     * zero out the indicated byte range within an object. Some
     * ObjectStore instances may optimize this to release the
     * underlying storage space.
     *
     * If the zero range extends beyond the end of the object, the object
     * size is extended, just as if we were writing a buffer full of zeros.
     * EXCEPT if the length is 0, in which case (just like a 0-length write)
     * we do not adjust the object size.
     */
    void zero(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len) {
      Op* _op = _get_next_op();
      _op->op = OP_ZERO;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      _op->off = off;
      _op->len = len;
      data.ops = data.ops + 1;
    }
    /// Discard all data in the object beyond the specified size.
    void truncate(const coll_t& cid, const ghobject_t& oid, uint64_t off) {
      Op* _op = _get_next_op();
      _op->op = OP_TRUNCATE;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      _op->off = off;
      data.ops = data.ops + 1;
    }
    /// Remove an object. All four parts of the object are removed.
    void remove(const coll_t& cid, const ghobject_t& oid) {
      Op* _op = _get_next_op();
      _op->op = OP_REMOVE;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      data.ops = data.ops + 1;
    }
    /// Set an xattr of an object
    void setattr(const coll_t& cid, const ghobject_t& oid, const char* name, bufferlist& val) {
      string n(name);
      setattr(cid, oid, n, val);
    }
    /// Set an xattr of an object
    void setattr(const coll_t& cid, const ghobject_t& oid, const string& s, bufferlist& val) {
      using ceph::encode;
      Op* _op = _get_next_op();
      _op->op = OP_SETATTR;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      encode(s, data_bl);
      encode(val, data_bl);
      data.ops = data.ops + 1;
    }
    /// Set multiple xattrs of an object
    void setattrs(const coll_t& cid, const ghobject_t& oid, const map<string,bufferptr>& attrset) {
      using ceph::encode;
      Op* _op = _get_next_op();
      _op->op = OP_SETATTRS;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      encode(attrset, data_bl);
      data.ops = data.ops + 1;
    }
    /// Set multiple xattrs of an object
    void setattrs(const coll_t& cid, const ghobject_t& oid, const map<string,bufferlist>& attrset) {
      using ceph::encode;
      Op* _op = _get_next_op();
      _op->op = OP_SETATTRS;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      encode(attrset, data_bl);
      data.ops = data.ops + 1;
    }
    /// remove an xattr from an object
    void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) {
      string n(name);
      rmattr(cid, oid, n);
    }
    /// remove an xattr from an object
    void rmattr(const coll_t& cid, const ghobject_t& oid, const string& s) {
      using ceph::encode;
      Op* _op = _get_next_op();
      _op->op = OP_RMATTR;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      encode(s, data_bl);
      data.ops = data.ops + 1;
    }
    /// remove all xattrs from an object
    void rmattrs(const coll_t& cid, const ghobject_t& oid) {
      Op* _op = _get_next_op();
      _op->op = OP_RMATTRS;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      data.ops = data.ops + 1;
    }
    /**
     * Clone an object into another object.
     *
     * Low-cost (e.g., O(1)) cloning (if supported) is best, but
     * fallback to an O(n) copy is allowed.  All four parts of the
     * object are cloned (data, xattrs, omap header, omap
     * entries).
     *
     * The destination named object may already exist, in
     * which case its previous contents are discarded.
     */
    void clone(const coll_t& cid, const ghobject_t& oid,
	       const ghobject_t& noid) {
      Op* _op = _get_next_op();
      _op->op = OP_CLONE;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      _op->dest_oid = _get_object_id(noid);
      data.ops = data.ops + 1;
    }
    /**
     * Clone a byte range from one object to another.
     *
     * The data portion of the destination object receives a copy of a
     * portion of the data from the source object. None of the other
     * three parts of an object is copied from the source.
     *
     * The destination object size may be extended to the dstoff + len.
     *
     * The source range *must* overlap with the source object data. If it does
     * not the result is undefined.
     */
    void clone_range(const coll_t& cid, const ghobject_t& oid,
		     const ghobject_t& noid,
		     uint64_t srcoff, uint64_t srclen, uint64_t dstoff) {
      Op* _op = _get_next_op();
      _op->op = OP_CLONERANGE2;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      _op->dest_oid = _get_object_id(noid);
      _op->off = srcoff;
      _op->len = srclen;
      _op->dest_off = dstoff;
      data.ops = data.ops + 1;
    }

    /// Create the collection
    void create_collection(const coll_t& cid, int bits) {
      Op* _op = _get_next_op();
      _op->op = OP_MKCOLL;
      _op->cid = _get_coll_id(cid);
      _op->split_bits = bits;
      data.ops = data.ops + 1;
    }

    /**
     * Give the collection a hint.
     *
     * @param cid  - collection id.
     * @param type - hint type.
     * @param hint - the hint payload, which contains the customized
     *               data along with the hint type.
     */
    void collection_hint(const coll_t& cid, uint32_t type, const bufferlist& hint) {
      using ceph::encode;
      Op* _op = _get_next_op();
      _op->op = OP_COLL_HINT;
      _op->cid = _get_coll_id(cid);
      _op->hint_type = type;
      encode(hint, data_bl);
      data.ops = data.ops + 1;
    }

    /// remove the collection, the collection must be empty
    void remove_collection(const coll_t& cid) {
      Op* _op = _get_next_op();
      _op->op = OP_RMCOLL;
      _op->cid = _get_coll_id(cid);
      data.ops = data.ops + 1;
    }
    void collection_move(const coll_t& cid, const coll_t &oldcid, const ghobject_t& oid)
      __attribute__ ((deprecated)) {
	// NOTE: we encode this as a fixed combo of ADD + REMOVE.  they
	// always appear together, so this is effectively a single MOVE.
	Op* _op = _get_next_op();
	_op->op = OP_COLL_ADD;
	_op->cid = _get_coll_id(oldcid);
	_op->oid = _get_object_id(oid);
	_op->dest_cid = _get_coll_id(cid);
	data.ops = data.ops + 1;

	_op = _get_next_op();
	_op->op = OP_COLL_REMOVE;
	_op->cid = _get_coll_id(oldcid);
	_op->oid = _get_object_id(oid);
	data.ops = data.ops + 1;
      }
    void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
				const coll_t &cid, const ghobject_t& oid) {
      Op* _op = _get_next_op();
      _op->op = OP_COLL_MOVE_RENAME;
      _op->cid = _get_coll_id(oldcid);
      _op->oid = _get_object_id(oldoid);
      _op->dest_cid = _get_coll_id(cid);
      _op->dest_oid = _get_object_id(oid);
      data.ops = data.ops + 1;
    }
    void try_rename(const coll_t &cid, const ghobject_t& oldoid,
                    const ghobject_t& oid) {
      Op* _op = _get_next_op();
      _op->op = OP_TRY_RENAME;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oldoid);
      _op->dest_oid = _get_object_id(oid);
      data.ops = data.ops + 1;
    }

    /// Remove omap from oid
    void omap_clear(
      const coll_t &cid,           ///< [in] Collection containing oid
      const ghobject_t &oid  ///< [in] Object from which to remove omap
      ) {
      Op* _op = _get_next_op();
      _op->op = OP_OMAP_CLEAR;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      data.ops = data.ops + 1;
    }
    /// Set keys on oid omap.  Replaces duplicate keys.
    void omap_setkeys(
      const coll_t& cid,                           ///< [in] Collection containing oid
      const ghobject_t &oid,                ///< [in] Object to update
      const map<string, bufferlist> &attrset ///< [in] Replacement keys and values
      ) {
      using ceph::encode;
      Op* _op = _get_next_op();
      _op->op = OP_OMAP_SETKEYS;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      encode(attrset, data_bl);
      data.ops = data.ops + 1;
    }

    /// Set keys on an oid omap (bufferlist variant).
    void omap_setkeys(
      const coll_t &cid,                           ///< [in] Collection containing oid
      const ghobject_t &oid,                ///< [in] Object to update
      const bufferlist &attrset_bl          ///< [in] Replacement keys and values
      ) {
      Op* _op = _get_next_op();
      _op->op = OP_OMAP_SETKEYS;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      data_bl.append(attrset_bl);
      data.ops = data.ops + 1;
    }

    /// Remove keys from oid omap
    void omap_rmkeys(
      const coll_t &cid,             ///< [in] Collection containing oid
      const ghobject_t &oid,  ///< [in] Object from which to remove the omap
      const set<string> &keys ///< [in] Keys to clear
      ) {
      using ceph::encode;
      Op* _op = _get_next_op();
      _op->op = OP_OMAP_RMKEYS;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      encode(keys, data_bl);
      data.ops = data.ops + 1;
    }

    /// Remove keys from oid omap
    void omap_rmkeys(
      const coll_t &cid,             ///< [in] Collection containing oid
      const ghobject_t &oid,  ///< [in] Object from which to remove the omap
      const bufferlist &keys_bl ///< [in] Keys to clear
      ) {
      Op* _op = _get_next_op();
      _op->op = OP_OMAP_RMKEYS;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      data_bl.append(keys_bl);
      data.ops = data.ops + 1;
    }

    /// Remove key range from oid omap
    void omap_rmkeyrange(
      const coll_t &cid,             ///< [in] Collection containing oid
      const ghobject_t &oid,  ///< [in] Object from which to remove the omap keys
      const string& first,    ///< [in] first key in range
      const string& last      ///< [in] first key past range, range is [first,last)
      ) {
        using ceph::encode;
	Op* _op = _get_next_op();
	_op->op = OP_OMAP_RMKEYRANGE;
	_op->cid = _get_coll_id(cid);
	_op->oid = _get_object_id(oid);
	encode(first, data_bl);
	encode(last, data_bl);
	data.ops = data.ops + 1;
      }

    /// Set omap header
    void omap_setheader(
      const coll_t &cid,             ///< [in] Collection containing oid
      const ghobject_t &oid,  ///< [in] Object
      const bufferlist &bl    ///< [in] Header value
      ) {
      using ceph::encode;
      Op* _op = _get_next_op();
      _op->op = OP_OMAP_SETHEADER;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      encode(bl, data_bl);
      data.ops = data.ops + 1;
    }

    /// Split collection based on given prefixes, objects matching the specified bits/rem are
    /// moved to the new collection
    void split_collection(
      const coll_t &cid,
      uint32_t bits,
      uint32_t rem,
      const coll_t &destination) {
      Op* _op = _get_next_op();
      _op->op = OP_SPLIT_COLLECTION2;
      _op->cid = _get_coll_id(cid);
      _op->dest_cid = _get_coll_id(destination);
      _op->split_bits = bits;
      _op->split_rem = rem;
      data.ops = data.ops + 1;
    }

    /// Merge collection into another.
    void merge_collection(
      coll_t cid,
      coll_t destination,
      uint32_t bits) {
      Op* _op = _get_next_op();
      _op->op = OP_MERGE_COLLECTION;
      _op->cid = _get_coll_id(cid);
      _op->dest_cid = _get_coll_id(destination);
      _op->split_bits = bits;
      data.ops = data.ops + 1;
    }

    void collection_set_bits(
      const coll_t &cid,
      int bits) {
      Op* _op = _get_next_op();
      _op->op = OP_COLL_SET_BITS;
      _op->cid = _get_coll_id(cid);
      _op->split_bits = bits;
      data.ops = data.ops + 1;
    }

    /// Set allocation hint for an object
    /// make 0 values(expected_object_size, expected_write_size) noops for all implementations
    void set_alloc_hint(
      const coll_t &cid,
      const ghobject_t &oid,
      uint64_t expected_object_size,
      uint64_t expected_write_size,
      uint32_t flags
    ) {
      Op* _op = _get_next_op();
      _op->op = OP_SETALLOCHINT;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      _op->expected_object_size = expected_object_size;
      _op->expected_write_size = expected_write_size;
      _op->alloc_hint_flags = flags;
      data.ops = data.ops + 1;
    }

    void encode(bufferlist& bl) const {
      //layout: data_bl + op_bl + coll_index + object_index + data
      ENCODE_START(9, 9, bl);
      encode(data_bl, bl);
      encode(op_bl, bl);
      encode(coll_index, bl);
      encode(object_index, bl);
      data.encode(bl);
      ENCODE_FINISH(bl);
    }

    void decode(bufferlist::const_iterator &bl) {
      DECODE_START(9, bl);
      DECODE_OLDEST(9);

      decode(data_bl, bl);
      decode(op_bl, bl);
      decode(coll_index, bl);
      decode(object_index, bl);
      data.decode(bl);
      coll_id = coll_index.size();
      object_id = object_index.size();

      DECODE_FINISH(bl);
    }

    void dump(ceph::Formatter *f);
    static void generate_test_instances(list<Transaction*>& o);
  };

  int queue_transaction(CollectionHandle& ch,
			Transaction&& t,
			TrackedOpRef op = TrackedOpRef(),
			ThreadPool::TPHandle *handle = NULL) {
    vector<Transaction> tls;
    tls.push_back(std::move(t));
    return queue_transactions(ch, tls, op, handle);
  }

  virtual int queue_transactions(
    CollectionHandle& ch, vector<Transaction>& tls,
    TrackedOpRef op = TrackedOpRef(),
    ThreadPool::TPHandle *handle = NULL) = 0;


 public:
  ObjectStore(CephContext* cct,
	      const std::string& path_) : path(path_), cct(cct) {}
  virtual ~ObjectStore() {}

  // no copying
  explicit ObjectStore(const ObjectStore& o) = delete;
  const ObjectStore& operator=(const ObjectStore& o) = delete;

  // versioning
  virtual int upgrade() {
    return 0;
  }

  virtual void get_db_statistics(Formatter *f) { }
  virtual void generate_db_histogram(Formatter *f) { }
  virtual int flush_cache(ostream *os = NULL) { return -1; }
  virtual void dump_perf_counters(Formatter *f) {}
  virtual void dump_cache_stats(Formatter *f) {}
  virtual void dump_cache_stats(ostream& os) {}

  virtual string get_type() = 0;

  // mgmt
  virtual bool test_mount_in_use() = 0;
  virtual int mount() = 0;
  virtual int umount() = 0;
  virtual int fsck(bool deep) {
    return -EOPNOTSUPP;
  }
  virtual int repair(bool deep) {
    return -EOPNOTSUPP;
  }
  virtual int quick_fix() {
    return -EOPNOTSUPP;
  }

  virtual void set_cache_shards(unsigned num) { }

  /**
   * Returns 0 if the hobject is valid, -error otherwise
   *
   * Errors:
   * -ENAMETOOLONG: locator/namespace/name too large
   */
  virtual int validate_hobject_key(const hobject_t &obj) const = 0;

  virtual unsigned get_max_attr_name_length() = 0;
  virtual int mkfs() = 0;  // wipe
  virtual int mkjournal() = 0; // journal only
  virtual bool needs_journal() = 0;  //< requires a journal
  virtual bool wants_journal() = 0;  //< prefers a journal
  virtual bool allows_journal() = 0; //< allows a journal

  /// enumerate hardware devices (by 'devname', e.g., 'sda' as in /sys/block/sda)
  virtual int get_devices(std::set<string> *devls) {
    return -EOPNOTSUPP;
  }

  /// true if a txn is readable immediately after it is queued.
  virtual bool is_sync_onreadable() const {
    return true;
  }

  /**
   * is_rotational
   *
   * Check whether store is backed by a rotational (HDD) or non-rotational
   * (SSD) device.
   *
   * This must be usable *before* the store is mounted.
   *
   * @return true for HDD, false for SSD
   */
  virtual bool is_rotational() {
    return true;
  }

  /**
   * is_journal_rotational
   *
   * Check whether journal is backed by a rotational (HDD) or non-rotational
   * (SSD) device.
   *
   *
   * @return true for HDD, false for SSD
   */
  virtual bool is_journal_rotational() {
    return true;
  }

  virtual string get_default_device_class() {
    return is_rotational() ? "hdd" : "ssd";
  }

  virtual int get_numa_node(
    int *numa_node,
    set<int> *nodes,
    set<string> *failed) {
    return -EOPNOTSUPP;
  }


  virtual bool can_sort_nibblewise() {
    return false;   // assume a backend cannot, unless it says otherwise
  }

  virtual int statfs(struct store_statfs_t *buf,
		     osd_alert_list_t* alerts = nullptr) = 0;
  virtual int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf) = 0;

  virtual void collect_metadata(map<string,string> *pm) { }

  /**
   * write_meta - write a simple configuration key out-of-band
   *
   * Write a simple key/value pair for basic store configuration
   * (e.g., a uuid or magic number) to an unopened/unmounted store.
   * The default implementation writes this to a plaintext file in the
   * path.
   *
   * A newline is appended.
   *
   * @param key key name (e.g., "fsid")
   * @param value value (e.g., a uuid rendered as a string)
   * @returns 0 for success, or an error code
   */
  virtual int write_meta(const std::string& key,
			 const std::string& value);

  /**
   * read_meta - read a simple configuration key out-of-band
   *
   * Read a simple key value to an unopened/mounted store.
   *
   * Trailing whitespace is stripped off.
   *
   * @param key key name
   * @param value pointer to value string
   * @returns 0 for success, or an error code
   */
  virtual int read_meta(const std::string& key,
			std::string *value);

  /**
   * get ideal max value for collection_list()
   *
   * default to some arbitrary values; the implementation will override.
   */
  virtual int get_ideal_list_max() { return 64; }


  /**
   * get a collection handle
   *
   * Provide a trivial handle as a default to avoid converting legacy
   * implementations.
   */
  virtual CollectionHandle open_collection(const coll_t &cid) = 0;

  /**
   * get a collection handle for a soon-to-be-created collection
   *
   * This handle must be used by queue_transaction that includes a
   * create_collection call in order to become valid.  It will become the
   * reference to the created collection.
   */
  virtual CollectionHandle create_new_collection(const coll_t &cid) = 0;

  /**
   * set ContextQueue for a collection
   *
   * After that, oncommits of Transaction will queue into commit_queue.
   * And osd ShardThread will call oncommits.
   */
  virtual void set_collection_commit_queue(const coll_t &cid, ContextQueue *commit_queue) = 0;

  /**
   * Synchronous read operations
   */

  /**
   * exists -- Test for existance of object
   *
   * @param cid collection for object
   * @param oid oid of object
   * @returns true if object exists, false otherwise
   */
  virtual bool exists(CollectionHandle& c, const ghobject_t& oid) = 0;
  /**
   * set_collection_opts -- set pool options for a collectioninformation for an object
   *
   * @param cid collection
   * @param opts new collection options
   * @returns 0 on success, negative error code on failure.
   */
  virtual int set_collection_opts(
    CollectionHandle& c,
    const pool_opts_t& opts) = 0;

  /**
   * stat -- get information for an object
   *
   * @param cid collection for object
   * @param oid oid of object
   * @param st output information for the object
   * @param allow_eio if false, assert on -EIO operation failure
   * @returns 0 on success, negative error code on failure.
   */
  virtual int stat(
    CollectionHandle &c,
    const ghobject_t& oid,
    struct stat *st,
    bool allow_eio = false) = 0;
  /**
   * read -- read a byte range of data from an object
   *
   * Note: if reading from an offset past the end of the object, we
   * return 0 (not, say, -EINVAL).
   *
   * @param cid collection for object
   * @param oid oid of object
   * @param offset location offset of first byte to be read
   * @param len number of bytes to be read
   * @param bl output bufferlist
   * @param op_flags is CEPH_OSD_OP_FLAG_*
   * @returns number of bytes read on success, or negative error code on failure.
   */
   virtual int read(
     CollectionHandle &c,
     const ghobject_t& oid,
     uint64_t offset,
     size_t len,
     bufferlist& bl,
     uint32_t op_flags = 0) = 0;

  /**
   * fiemap -- get extent map of data of an object
   *
   * Returns an encoded map of the extents of an object's data portion
   * (map<offset,size>).
   *
   * A non-enlightened implementation is free to return the extent (offset, len)
   * as the sole extent.
   *
   * @param cid collection for object
   * @param oid oid of object
   * @param offset location offset of first byte to be read
   * @param len number of bytes to be read
   * @param bl output bufferlist for extent map information.
   * @returns 0 on success, negative error code on failure.
   */
   virtual int fiemap(CollectionHandle& c, const ghobject_t& oid,
		      uint64_t offset, size_t len, bufferlist& bl) = 0;
   virtual int fiemap(CollectionHandle& c, const ghobject_t& oid,
		      uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) = 0;

  /**
   * getattr -- get an xattr of an object
   *
   * @param cid collection for object
   * @param oid oid of object
   * @param name name of attr to read
   * @param value place to put output result.
   * @returns 0 on success, negative error code on failure.
   */
  virtual int getattr(CollectionHandle &c, const ghobject_t& oid,
		      const char *name, bufferptr& value) = 0;

  /**
   * getattr -- get an xattr of an object
   *
   * @param cid collection for object
   * @param oid oid of object
   * @param name name of attr to read
   * @param value place to put output result.
   * @returns 0 on success, negative error code on failure.
   */
  int getattr(
    CollectionHandle &c, const ghobject_t& oid,
    const string& name, bufferlist& value) {
    bufferptr bp;
    int r = getattr(c, oid, name.c_str(), bp);
    value.push_back(bp);
    return r;
  }

  /**
   * getattrs -- get all of the xattrs of an object
   *
   * @param cid collection for object
   * @param oid oid of object
   * @param aset place to put output result.
   * @returns 0 on success, negative error code on failure.
   */
  virtual int getattrs(CollectionHandle &c, const ghobject_t& oid,
		       map<string,bufferptr>& aset) = 0;

  /**
   * getattrs -- get all of the xattrs of an object
   *
   * @param cid collection for object
   * @param oid oid of object
   * @param aset place to put output result.
   * @returns 0 on success, negative error code on failure.
   */
  int getattrs(CollectionHandle &c, const ghobject_t& oid,
	       map<string,bufferlist>& aset) {
    map<string,bufferptr> bmap;
    int r = getattrs(c, oid, bmap);
    for (map<string,bufferptr>::iterator i = bmap.begin();
	i != bmap.end();
	++i) {
      aset[i->first].append(i->second);
    }
    return r;
  }


  // collections

  /**
   * list_collections -- get all of the collections known to this ObjectStore
   *
   * @param ls list of the collections in sorted order.
   * @returns 0 on success, negative error code on failure.
   */
  virtual int list_collections(vector<coll_t>& ls) = 0;

  /**
   * does a collection exist?
   *
   * @param c collection
   * @returns true if it exists, false otherwise
   */
  virtual bool collection_exists(const coll_t& c) = 0;

  /**
   * is a collection empty?
   *
   * @param c collection
   * @param empty true if the specified collection is empty, false otherwise
   * @returns 0 on success, negative error code on failure.
   */
  virtual int collection_empty(CollectionHandle& c, bool *empty) = 0;

  /**
   * return the number of significant bits of the coll_t::pgid.
   *
   * This should return what the last create_collection or split_collection
   * set.  A legacy backend may return -EAGAIN if the value is unavailable
   * (because we upgraded from an older version, e.g., FileStore).
   */
  virtual int collection_bits(CollectionHandle& c) = 0;


  /**
   * list contents of a collection that fall in the range [start, end) and no more than a specified many result
   *
   * @param c collection
   * @param start list object that sort >= this value
   * @param end list objects that sort < this value
   * @param max return no more than this many results
   * @param seq return no objects with snap < seq
   * @param ls [out] result
   * @param next [out] next item sorts >= this value
   * @return zero on success, or negative error
   */
  virtual int collection_list(CollectionHandle &c,
			      const ghobject_t& start, const ghobject_t& end,
			      int max,
			      vector<ghobject_t> *ls, ghobject_t *next) = 0;

  virtual int collection_list_legacy(CollectionHandle &c,
                                     const ghobject_t& start,
                                     const ghobject_t& end, int max,
                                     std::vector<ghobject_t> *ls,
                                     ghobject_t *next) {
    return collection_list(c, start, end, max, ls, next);
  }

  /// OMAP
  /// Get omap contents
  virtual int omap_get(
    CollectionHandle &c,     ///< [in] Collection containing oid
    const ghobject_t &oid,   ///< [in] Object containing omap
    bufferlist *header,      ///< [out] omap header
    map<string, bufferlist> *out /// < [out] Key to value map
    ) = 0;

  /// Get omap header
  virtual int omap_get_header(
    CollectionHandle &c,     ///< [in] Collection containing oid
    const ghobject_t &oid,   ///< [in] Object containing omap
    bufferlist *header,      ///< [out] omap header
    bool allow_eio = false ///< [in] don't assert on eio
    ) = 0;

  /// Get keys defined on oid
  virtual int omap_get_keys(
    CollectionHandle &c,   ///< [in] Collection containing oid
    const ghobject_t &oid, ///< [in] Object containing omap
    set<string> *keys      ///< [out] Keys defined on oid
    ) = 0;

  /// Get key values
  virtual int omap_get_values(
    CollectionHandle &c,         ///< [in] Collection containing oid
    const ghobject_t &oid,       ///< [in] Object containing omap
    const set<string> &keys,     ///< [in] Keys to get
    map<string, bufferlist> *out ///< [out] Returned keys and values
    ) = 0;

  /// Filters keys into out which are defined on oid
  virtual int omap_check_keys(
    CollectionHandle &c,     ///< [in] Collection containing oid
    const ghobject_t &oid,   ///< [in] Object containing omap
    const set<string> &keys, ///< [in] Keys to check
    set<string> *out         ///< [out] Subset of keys defined on oid
    ) = 0;

  /**
   * Returns an object map iterator
   *
   * Warning!  The returned iterator is an implicit lock on filestore
   * operations in c.  Do not use filestore methods on c while the returned
   * iterator is live.  (Filling in a transaction is no problem).
   *
   * @return iterator, null on error
   */
  virtual ObjectMap::ObjectMapIterator get_omap_iterator(
    CollectionHandle &c,   ///< [in] collection
    const ghobject_t &oid  ///< [in] object
    ) = 0;

  virtual int flush_journal() { return -EOPNOTSUPP; }

  virtual int dump_journal(ostream& out) { return -EOPNOTSUPP; }

  virtual int snapshot(const string& name) { return -EOPNOTSUPP; }

  /**
   * Set and get internal fsid for this instance. No external data is modified
   */
  virtual void set_fsid(uuid_d u) = 0;
  virtual uuid_d get_fsid() = 0;

  /**
  * Estimates additional disk space used by the specified amount of objects and caused by file allocation granularity and metadata store
  * - num objects - total (including witeouts) object count to measure used space for.
  */
  virtual uint64_t estimate_objects_overhead(uint64_t num_objects) = 0;


  // DEBUG
  virtual void inject_data_error(const ghobject_t &oid) {}
  virtual void inject_mdata_error(const ghobject_t &oid) {}

  virtual void compact() {}
  virtual bool has_builtin_csum() const {
    return false;
  }
};
WRITE_CLASS_ENCODER(ObjectStore::Transaction)
WRITE_CLASS_ENCODER(ObjectStore::Transaction::TransactionData)

ostream& operator<<(ostream& out, const ObjectStore::Transaction& tx);

#endif