Skip to content

Instantly share code, notes, and snippets.

@andrwng
Created November 15, 2017 16:49
Show Gist options
  • Save andrwng/8ea8957dfec6e944e0dc4436474b5001 to your computer and use it in GitHub Desktop.
Save andrwng/8ea8957dfec6e944e0dc4436474b5001 to your computer and use it in GitHub Desktop.
commit c1edad735f3520ca60190a0016f0ac118156a8f3
Author: Andrew Wong <awong@cloudera.com>
Date: Wed Nov 15 08:04:09 2017 -0800
Trigger error handling when failures leave tablet state inconsistent
Change-Id: If338be326dd9922c8c6cc2a03bfeebd85894c208
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index faf9aa0..3610df4 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -240,6 +240,10 @@ class FsManager {
return block_manager_.get();
}
+ fs::FsErrorManager* error_manager() {
+ return error_manager_.get();
+ }
+
private:
FRIEND_TEST(FsManagerTestBase, TestDuplicatePaths);
friend class itest::ExternalMiniClusterFsInspector; // for access to directory names
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 613d0e2..786e92d 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -34,6 +34,7 @@
#include "kudu/common/timestamp.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
+#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/casts.h"
@@ -62,6 +63,7 @@ namespace tablet {
using cfile::ReaderOptions;
using fs::CreateBlockOptions;
+using fs::FsErrorManager;
using fs::ReadableBlock;
using fs::WritableBlock;
using log::LogAnchorRegistry;
@@ -715,14 +717,21 @@ Status DeltaTracker::Flush(MetadataFlushType flush_type) {
LOG_WITH_PREFIX(INFO) << "Flushing " << count << " deltas from DMS " << old_dms->id() << "...";
// Now, actually flush the contents of the old DMS.
- // NOTE: failures here leave the stores inconsistent with each other. This is
- // only "safe" because errors are passed up and are checked that the tablet
- // is stopped from doing anything else.
- //
// TODO(todd): need another lock to prevent concurrent flushers
// at some point.
+ //
+ // NOTE: failures here leave the stores inconsistent with each other. This is
+ // only "safe" because the tablet is stopped from doing anything else.
shared_ptr<DeltaFileReader> dfr;
- RETURN_NOT_OK_PREPEND(FlushDMS(old_dms.get(), &dfr, flush_type), "Failed to flush DMS");
+ Status s = FlushDMS(old_dms.get(), &dfr, flush_type);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG_WITH_PREFIX(WARNING) << "Failed to flush DMS: " << s.ToString();
+ FsErrorManager* em = rowset_metadata_->fs_manager()->error_manager();
+ const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id();
+ em->RunErrorNotificationCb(fs::ErrorHandlerType::TABLET, tablet_id);
+ // Return before swapping the (unset) DeltaFileReader in for the DMS below.
+ return s.CloneAndPrepend("Failed to flush DMS");
+ }
// Now, re-take the lock and swap in the DeltaFileReader in place of
// of the DeltaMemStore
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 9ba8a60..1db20bf 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -37,6 +37,7 @@
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/fs/block_manager.h"
+#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/tablet/cfile_set.h"
@@ -54,6 +55,7 @@
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
@@ -85,6 +87,7 @@ using cfile::BloomFileWriter;
using fs::BlockManager;
using fs::BlockCreationTransaction;
using fs::CreateBlockOptions;
+using fs::FsErrorManager;
using fs::WritableBlock;
using log::LogAnchorRegistry;
using std::shared_ptr;
@@ -563,11 +566,6 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds");
std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock());
- // NOTE: Errors in this method leave the diskrowset in an inconsistent state,
- // with the rowset metadata, base data, or delta tracker out of sync with the
- // others. This is only "safe" because errors are passed up and eventually
- // checked to ensure the tablet is stopped from doing anything else.
-
// TODO(todd): do we need to lock schema or anything here?
gscoped_ptr<MajorDeltaCompaction> compaction;
RETURN_NOT_OK(NewMajorDeltaCompaction(col_ids, std::move(history_gc_opts), &compaction));
@@ -580,6 +578,16 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
vector<BlockId> removed_blocks;
rowset_metadata_->CommitUpdate(update);
+ // NOTE: Errors past this point leave the diskrowset in an inconsistent state,
+ // with the rowset metadata, base data, or delta tracker out of sync with the
+ // others. This is only "safe" because the cleanup below runs the tablet's
+ // error-notification callback on any early return, stopping the tablet.
+ auto fail_on_error = MakeScopedCleanup([&] {
+ FsErrorManager* em = rowset_metadata_->fs_manager()->error_manager();
+ const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id();
+ em->RunErrorNotificationCb(fs::ErrorHandlerType::TABLET, tablet_id);
+ });
+
// Now open a new cfile set with the updated metadata.
shared_ptr<CFileSet> new_base;
RETURN_NOT_OK(CFileSet::Open(rowset_metadata_,
@@ -592,6 +600,7 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
RETURN_NOT_OK(compaction->UpdateDeltaTracker(delta_tracker_.get()));
base_data_.swap(new_base);
}
+ fail_on_error.cancel();
// Even if we don't successfully flush we don't have consistency problems in
// the case of major delta compaction -- we are not adding additional
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index a2e403c..08f1225 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -87,6 +87,7 @@
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status_callback.h"
#include "kudu/util/throttler.h"
@@ -173,6 +174,7 @@ METRIC_DEFINE_gauge_size(tablet, on_disk_data_size, "Tablet Data Size On Disk",
using kudu::MaintenanceManager;
using kudu::clock::HybridClock;
+using kudu::fs::FsErrorManager;
using kudu::log::LogAnchorRegistry;
using std::ostream;
using std::pair;
@@ -2124,6 +2126,11 @@ Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_d
}
}
+ auto fail_on_error = MakeScopedCleanup([&] {
+ FsErrorManager* em = metadata_->fs_manager()->error_manager();
+ em->RunErrorNotificationCb(fs::ErrorHandlerType::TABLET, tablet_id());
+ });
+
int64_t tablet_blocks_deleted = 0;
int64_t tablet_bytes_deleted = 0;
for (const auto& rowset : rowsets_to_gc_undos) {
@@ -2139,6 +2146,7 @@ Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_d
if (tablet_blocks_deleted > 0) {
RETURN_NOT_OK(metadata_->Flush());
}
+ fail_on_error.cancel();
MonoDelta tablet_delete_duration = MonoTime::Now() - tablet_delete_start;
metrics_->undo_delta_block_gc_bytes_deleted->IncrementBy(tablet_bytes_deleted);
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index 3bcf833..25b9d9b 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -105,13 +105,8 @@ bool CompactRowSetsOp::Prepare() {
}
void CompactRowSetsOp::Perform() {
- Status s = tablet_->Compact(Tablet::COMPACT_NO_FLAGS);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << Substitute("$0Compaction failed on $1: $2",
- LogPrefix(), tablet_->tablet_id(), s.ToString());
- CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
- "has been stopped first";
- }
+ WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
+ Substitute("$0Compaction failed on $1", LogPrefix(), tablet_->tablet_id()));
}
scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
@@ -180,13 +175,9 @@ bool MinorDeltaCompactionOp::Prepare() {
}
void MinorDeltaCompactionOp::Perform() {
- Status s = tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << Substitute("$0Minor delta compaction failed on $1: $2",
- LogPrefix(), tablet_->tablet_id(), s.ToString());
- CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
- "has been stopped first";
- }
+ WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
+ Substitute("$0Minor delta compaction failed on $1",
+ LogPrefix(), tablet_->tablet_id()));
}
scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
@@ -260,13 +251,9 @@ bool MajorDeltaCompactionOp::Prepare() {
}
void MajorDeltaCompactionOp::Perform() {
- Status s = tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << Substitute("$0Major delta compaction failed on $1: $2",
- LogPrefix(), tablet_->tablet_id(), s.ToString());
- CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
- "has been stopped first";
- }
+ WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
+ Substitute("$0Major delta compaction failed on $1",
+ LogPrefix(), tablet_->tablet_id()));
}
scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
@@ -311,13 +298,8 @@ void UndoDeltaBlockGCOp::Perform() {
// Return early if it turns out that we have nothing to GC.
if (bytes_in_ancient_undos == 0) return;
- s = tablet_->DeleteAncientUndoDeltas();
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << Substitute("$0GC of undo delta blocks failed: $1",
- LogPrefix(), s.ToString());
- CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
- "has been stopped first";
- }
+ WARN_NOT_OK(tablet_->DeleteAncientUndoDeltas(),
+ Substitute("$0GC of undo delta blocks failed", LogPrefix()));
}
scoped_refptr<Histogram> UndoDeltaBlockGCOp::DurationHistogram() const {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment