// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2013 Inktank Storage, Inc. * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include #include #include "ECTransaction.h" #include "ECUtil.h" #include "os/ObjectStore.h" #include "common/inline_variant.h" void encode_and_write( pg_t pgid, const hobject_t &oid, const ECUtil::stripe_info_t &sinfo, ErasureCodeInterfaceRef &ecimpl, const set &want, uint64_t offset, bufferlist bl, uint32_t flags, ECUtil::HashInfoRef hinfo, extent_map &written, map *transactions, DoutPrefixProvider *dpp) { const uint64_t before_size = hinfo->get_total_logical_size(sinfo); ceph_assert(sinfo.logical_offset_is_stripe_aligned(offset)); ceph_assert(sinfo.logical_offset_is_stripe_aligned(bl.length())); ceph_assert(bl.length()); map buffers; int r = ECUtil::encode( sinfo, ecimpl, bl, want, &buffers); ceph_assert(r == 0); written.insert(offset, bl.length(), bl); ldpp_dout(dpp, 20) << __func__ << ": " << oid << " new_size " << offset + bl.length() << dendl; if (offset >= before_size) { ceph_assert(offset == before_size); hinfo->append( sinfo.aligned_logical_offset_to_chunk_offset(offset), buffers); } for (auto &&i : *transactions) { ceph_assert(buffers.count(i.first)); bufferlist &enc_bl = buffers[i.first]; if (offset >= before_size) { i.second.set_alloc_hint( coll_t(spg_t(pgid, i.first)), ghobject_t(oid, ghobject_t::NO_GEN, i.first), 0, 0, CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE | CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY); } i.second.write( coll_t(spg_t(pgid, i.first)), ghobject_t(oid, ghobject_t::NO_GEN, i.first), sinfo.logical_to_prev_chunk_offset( offset), enc_bl.length(), enc_bl, flags); } } bool ECTransaction::requires_overwrite( uint64_t prev_size, const PGTransaction::ObjectOperation &op) { // special handling for truncates to 0 if (op.truncate && op.truncate->first == 0) return false; return op.is_none() && ((!op.buffer_updates.empty() && (op.buffer_updates.begin().get_off() < prev_size)) || (op.truncate && (op.truncate->first < prev_size))); } void ECTransaction::generate_transactions( WritePlan &plan, ErasureCodeInterfaceRef &ecimpl, pg_t pgid, const ECUtil::stripe_info_t &sinfo, const map &partial_extents, vector &entries, map *written_map, map *transactions, set *temp_added, set *temp_removed, DoutPrefixProvider *dpp) { ceph_assert(written_map); ceph_assert(transactions); ceph_assert(temp_added); ceph_assert(temp_removed); ceph_assert(plan.t); auto &t = *(plan.t); auto &hash_infos = plan.hash_infos; map obj_to_log; for (auto &&i: entries) { obj_to_log.insert(make_pair(i.soid, &i)); } t.safe_create_traverse( [&](pair &opair) { const hobject_t &oid = opair.first; auto &op = opair.second; auto &obc_map = t.obc_map; auto &written = (*written_map)[oid]; auto iter = obj_to_log.find(oid); pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr; ObjectContextRef obc; auto obiter = t.obc_map.find(oid); if (obiter != t.obc_map.end()) { obc = obiter->second; } if (entry) { ceph_assert(obc); } else { ceph_assert(oid.is_temp()); } ECUtil::HashInfoRef hinfo; { auto iter = hash_infos.find(oid); ceph_assert(iter != hash_infos.end()); hinfo = iter->second; } if (oid.is_temp()) { if (op.is_fresh_object()) { temp_added->insert(oid); } else if (op.is_delete()) { temp_removed->insert(oid); } } if (entry && entry->is_modify() && op.updated_snaps) { bufferlist bl(op.updated_snaps->second.size() * 8 + 8); encode(op.updated_snaps->second, bl); entry->snaps.swap(bl); entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog); } ldpp_dout(dpp, 20) << "generate_transactions: " << opair.first << ", current size is " << hinfo->get_total_logical_size(sinfo) << " buffers are " << op.buffer_updates << dendl; if (op.truncate) { ldpp_dout(dpp, 20) << "generate_transactions: " << " truncate is " << *(op.truncate) << dendl; } if (entry && op.updated_snaps) { entry->mod_desc.update_snaps(op.updated_snaps->first); } map > xattr_rollback; ceph_assert(hinfo); bufferlist old_hinfo; encode(*hinfo, old_hinfo); xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo; if (op.is_none() && op.truncate && op.truncate->first == 0) { ceph_assert(op.truncate->first == 0); ceph_assert(op.truncate->first == op.truncate->second); ceph_assert(entry); ceph_assert(obc); if (op.truncate->first != op.truncate->second) { op.truncate->first = op.truncate->second; } else { op.truncate = boost::none; } op.delete_first = true; op.init_type = PGTransaction::ObjectOperation::Init::Create(); if (obc) { /* We need to reapply all of the cached xattrs. * std::map insert fortunately only writes keys * which don't already exist, so this should do * the right thing. */ op.attr_updates.insert( obc->attr_cache.begin(), obc->attr_cache.end()); } } if (op.delete_first) { /* We also want to remove the boost::none entries since * the keys already won't exist */ for (auto j = op.attr_updates.begin(); j != op.attr_updates.end(); ) { if (j->second) { ++j; } else { op.attr_updates.erase(j++); } } /* Fill in all current entries for xattr rollback */ if (obc) { xattr_rollback.insert( obc->attr_cache.begin(), obc->attr_cache.end()); obc->attr_cache.clear(); } if (entry) { entry->mod_desc.rmobject(entry->version.version); for (auto &&st: *transactions) { st.second.collection_move_rename( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), coll_t(spg_t(pgid, st.first)), ghobject_t(oid, entry->version.version, st.first)); } } else { for (auto &&st: *transactions) { st.second.remove( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first)); } } hinfo->clear(); } if (op.is_fresh_object() && entry) { entry->mod_desc.create(); } match( op.init_type, [&](const PGTransaction::ObjectOperation::Init::None &) {}, [&](const PGTransaction::ObjectOperation::Init::Create &op) { for (auto &&st: *transactions) { st.second.touch( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first)); } }, [&](const PGTransaction::ObjectOperation::Init::Clone &op) { for (auto &&st: *transactions) { st.second.clone( coll_t(spg_t(pgid, st.first)), ghobject_t(op.source, ghobject_t::NO_GEN, st.first), ghobject_t(oid, ghobject_t::NO_GEN, st.first)); } auto siter = hash_infos.find(op.source); ceph_assert(siter != hash_infos.end()); hinfo->update_to(*(siter->second)); if (obc) { auto cobciter = obc_map.find(op.source); ceph_assert(cobciter != obc_map.end()); obc->attr_cache = cobciter->second->attr_cache; } }, [&](const PGTransaction::ObjectOperation::Init::Rename &op) { ceph_assert(op.source.is_temp()); for (auto &&st: *transactions) { st.second.collection_move_rename( coll_t(spg_t(pgid, st.first)), ghobject_t(op.source, ghobject_t::NO_GEN, st.first), coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first)); } auto siter = hash_infos.find(op.source); ceph_assert(siter != hash_infos.end()); hinfo->update_to(*(siter->second)); if (obc) { auto cobciter = obc_map.find(op.source); ceph_assert(cobciter == obc_map.end()); obc->attr_cache.clear(); } }); // omap not supported (except 0, handled above) ceph_assert(!(op.clear_omap)); ceph_assert(!(op.omap_header)); ceph_assert(op.omap_updates.empty()); if (!op.attr_updates.empty()) { map to_set; for (auto &&j: op.attr_updates) { if (j.second) { to_set[j.first] = *(j.second); } else { for (auto &&st : *transactions) { st.second.rmattr( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), j.first); } } if (obc) { auto citer = obc->attr_cache.find(j.first); if (entry) { if (citer != obc->attr_cache.end()) { // won't overwrite anything we put in earlier xattr_rollback.insert( make_pair( j.first, boost::optional(citer->second))); } else { // won't overwrite anything we put in earlier xattr_rollback.insert( make_pair( j.first, boost::none)); } } if (j.second) { obc->attr_cache[j.first] = *(j.second); } else if (citer != obc->attr_cache.end()) { obc->attr_cache.erase(citer); } } else { ceph_assert(!entry); } } for (auto &&st : *transactions) { st.second.setattrs( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), to_set); } ceph_assert(!xattr_rollback.empty()); } if (entry && !xattr_rollback.empty()) { entry->mod_desc.setattrs(xattr_rollback); } if (op.alloc_hint) { /* logical_to_next_chunk_offset() scales down both aligned and * unaligned offsets * we don't bother to roll this back at this time for two reasons: * 1) it's advisory * 2) we don't track the old value */ uint64_t object_size = sinfo.logical_to_next_chunk_offset( op.alloc_hint->expected_object_size); uint64_t write_size = sinfo.logical_to_next_chunk_offset( op.alloc_hint->expected_write_size); for (auto &&st : *transactions) { st.second.set_alloc_hint( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), object_size, write_size, op.alloc_hint->flags); } } extent_map to_write; auto pextiter = partial_extents.find(oid); if (pextiter != partial_extents.end()) { to_write = pextiter->second; } vector > rollback_extents; const uint64_t orig_size = hinfo->get_total_logical_size(sinfo); uint64_t new_size = orig_size; uint64_t append_after = new_size; ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl; if (op.truncate && op.truncate->first < new_size) { ceph_assert(!op.is_fresh_object()); new_size = sinfo.logical_to_next_stripe_offset( op.truncate->first); ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down " << new_size << dendl; if (new_size != op.truncate->first) { // 0 the unaligned part bufferlist bl; bl.append_zero(new_size - op.truncate->first); to_write.insert( op.truncate->first, bl.length(), bl); append_after = sinfo.logical_to_prev_stripe_offset( op.truncate->first); } else { append_after = new_size; } to_write.erase( new_size, std::numeric_limits::max() - new_size); if (entry && !op.is_fresh_object()) { uint64_t restore_from = sinfo.logical_to_prev_chunk_offset( op.truncate->first); uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset( orig_size - sinfo.logical_to_prev_stripe_offset(op.truncate->first)); ceph_assert(rollback_extents.empty()); ldpp_dout(dpp, 20) << __func__ << ": saving extent " << make_pair(restore_from, restore_len) << dendl; ldpp_dout(dpp, 20) << __func__ << ": truncating to " << new_size << dendl; rollback_extents.emplace_back( make_pair(restore_from, restore_len)); for (auto &&st : *transactions) { st.second.touch( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, entry->version.version, st.first)); st.second.clone_range( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), ghobject_t(oid, entry->version.version, st.first), restore_from, restore_len, restore_from); } } else { ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object" << dendl; } for (auto &&st : *transactions) { st.second.truncate( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), sinfo.aligned_logical_offset_to_chunk_offset(new_size)); } } uint32_t fadvise_flags = 0; for (auto &&extent: op.buffer_updates) { using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate; bufferlist bl; match( extent.get_val(), [&](const BufferUpdate::Write &op) { bl = op.buffer; fadvise_flags |= op.fadvise_flags; }, [&](const BufferUpdate::Zero &) { bl.append_zero(extent.get_len()); }, [&](const BufferUpdate::CloneRange &) { ceph_assert( 0 == "CloneRange is not allowed, do_op should have returned ENOTSUPP"); }); uint64_t off = extent.get_off(); uint64_t len = extent.get_len(); uint64_t end = off + len; ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update " << make_pair(off, len) << dendl; ceph_assert(len > 0); if (off > new_size) { ceph_assert(off > append_after); bl.prepend_zero(off - new_size); len += off - new_size; ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align " << off << "->" << new_size << dendl; off = new_size; } if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) { uint64_t aligned_end = sinfo.logical_to_next_stripe_offset( end); uint64_t tail = aligned_end - end; bl.append_zero(tail); ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end " << end << "->" << end+tail << ", len: " << len << "->" << len+tail << dendl; end += tail; len += tail; } to_write.insert(off, len, bl); if (end > new_size) new_size = end; } if (op.truncate && op.truncate->second > new_size) { ceph_assert(op.truncate->second > append_after); uint64_t truncate_to = sinfo.logical_to_next_stripe_offset( op.truncate->second); uint64_t zeroes = truncate_to - new_size; bufferlist bl; bl.append_zero(zeroes); to_write.insert( new_size, zeroes, bl); new_size = truncate_to; ldpp_dout(dpp, 20) << __func__ << ": truncating out to " << truncate_to << dendl; } set want; for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) { want.insert(i); } auto to_overwrite = to_write.intersect(0, append_after); ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: " << to_overwrite << dendl; for (auto &&extent: to_overwrite) { ceph_assert(extent.get_off() + extent.get_len() <= append_after); ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off())); ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len())); if (entry) { uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset( extent.get_off()); uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset( extent.get_len()); ldpp_dout(dpp, 20) << __func__ << ": overwriting " << restore_from << "~" << restore_len << dendl; if (rollback_extents.empty()) { for (auto &&st : *transactions) { st.second.touch( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, entry->version.version, st.first)); } } rollback_extents.emplace_back(make_pair(restore_from, restore_len)); for (auto &&st : *transactions) { st.second.clone_range( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), ghobject_t(oid, entry->version.version, st.first), restore_from, restore_len, restore_from); } } encode_and_write( pgid, oid, sinfo, ecimpl, want, extent.get_off(), extent.get_val(), fadvise_flags, hinfo, written, transactions, dpp); } auto to_append = to_write.intersect( append_after, std::numeric_limits::max() - append_after); ldpp_dout(dpp, 20) << __func__ << ": to_append: " << to_append << dendl; for (auto &&extent: to_append) { ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off())); ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len())); ldpp_dout(dpp, 20) << __func__ << ": appending " << extent.get_off() << "~" << extent.get_len() << dendl; encode_and_write( pgid, oid, sinfo, ecimpl, want, extent.get_off(), extent.get_val(), fadvise_flags, hinfo, written, transactions, dpp); } ldpp_dout(dpp, 20) << __func__ << ": " << oid << " resetting hinfo to logical size " << new_size << dendl; if (!rollback_extents.empty() && entry) { if (entry) { ldpp_dout(dpp, 20) << __func__ << ": " << oid << " marking rollback extents " << rollback_extents << dendl; entry->mod_desc.rollback_extents( entry->version.version, rollback_extents); } hinfo->set_total_chunk_size_clear_hash( sinfo.aligned_logical_offset_to_chunk_offset(new_size)); } else { ceph_assert(hinfo->get_total_logical_size(sinfo) == new_size); } if (entry && !to_append.empty()) { ldpp_dout(dpp, 20) << __func__ << ": marking append " << append_after << dendl; entry->mod_desc.append(append_after); } if (!op.is_delete()) { bufferlist hbuf; encode(*hinfo, hbuf); for (auto &&i : *transactions) { i.second.setattr( coll_t(spg_t(pgid, i.first)), ghobject_t(oid, ghobject_t::NO_GEN, i.first), ECUtil::get_hinfo_key(), hbuf); } } }); }