Skip to content

Instantly share code, notes, and snippets.

@andrwng
Created November 15, 2017 16:25
Show Gist options
  • Save andrwng/420397e5ca5d6838a395fea11444ba0f to your computer and use it in GitHub Desktop.
Save andrwng/420397e5ca5d6838a395fea11444ba0f to your computer and use it in GitHub Desktop.
commit cbe7087ac3539cf342135436f230c36f7812203e
Author: Andrew Wong <awong@cloudera.com>
Date: Wed Nov 15 08:04:09 2017 -0800
Error handle on the way up
Change-Id: If338be326dd9922c8c6cc2a03bfeebd85894c208
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 613d0e2..20d8577 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -34,6 +34,7 @@
#include "kudu/common/timestamp.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
+#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/casts.h"
@@ -715,14 +716,19 @@ Status DeltaTracker::Flush(MetadataFlushType flush_type) {
LOG_WITH_PREFIX(INFO) << "Flushing " << count << " deltas from DMS " << old_dms->id() << "...";
// Now, actually flush the contents of the old DMS.
- // NOTE: failures here leave the stores inconsistent with each other. This is
- // only "safe" because errors are passed up and are checked that the tablet
- // is stopped from doing anything else.
- //
// TODO(todd): need another lock to prevent concurrent flushers
// at some point.
+ //
+ // NOTE: failures here leave the stores inconsistent with each other. This is
+ // only "safe" because the tablet is stopped from doing anything else.
shared_ptr<DeltaFileReader> dfr;
- RETURN_NOT_OK_PREPEND(FlushDMS(old_dms.get(), &dfr, flush_type), "Failed to flush DMS");
+ Status s = FlushDMS(old_dms.get(), &dfr, flush_type);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(WARNING) << "Failed to flush DMS";
+ ErrorManager* em = rowset_metadata_->fs_manager()->error_manager();
+ const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id();
+ em->RunErrorNotificationCb(ErrorHandlerType::TABLET, tablet_id);
+ }
// Now, re-take the lock and swap in the DeltaFileReader in place of
// of the DeltaMemStore
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 9ba8a60..9a268f1 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -37,6 +37,7 @@
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/fs/block_manager.h"
+#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/tablet/cfile_set.h"
@@ -563,11 +564,6 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds");
std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock());
- // NOTE: Errors in this method leave the diskrowset in an inconsistent state,
- // with the rowset metadata, base data, or delta tracker out of sync with the
- // others. This is only "safe" because errors are passed up and eventually
- // checked to ensure the tablet is stopped from doing anything else.
-
// TODO(todd): do we need to lock schema or anything here?
gscoped_ptr<MajorDeltaCompaction> compaction;
RETURN_NOT_OK(NewMajorDeltaCompaction(col_ids, std::move(history_gc_opts), &compaction));
@@ -580,6 +576,16 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
vector<BlockId> removed_blocks;
rowset_metadata_->CommitUpdate(update);
+ // NOTE: Errors in this method leave the diskrowset in an inconsistent state,
+ // with the rowset metadata, base data, or delta tracker out of sync with the
+ // others. This is only "safe" because errors are passed up and eventually
+ // checked to ensure the tablet is stopped from doing anything else.
+ auto fail_on_error = MakeScopedCleanup([&] {
+ ErrorManager* em = rowset_metadata_->fs_manager()->error_manager();
+ const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id();
+ em->RunErrorNotificationCb(ErrorHandlerType::TABLET, tablet_id);
+ });
+
// Now open a new cfile set with the updated metadata.
shared_ptr<CFileSet> new_base;
RETURN_NOT_OK(CFileSet::Open(rowset_metadata_,
@@ -592,6 +598,7 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
RETURN_NOT_OK(compaction->UpdateDeltaTracker(delta_tracker_.get()));
base_data_.swap(new_base);
}
+ fail_on_error.cancel();
// Even if we don't successfully flush we don't have consistency problems in
// the case of major delta compaction -- we are not adding additional
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index a2e403c..790c07d 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -2124,6 +2124,11 @@ Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_d
}
}
+ auto fail_on_error = MakeScopedCleanup([&] {
+ ErrorManager* em = metadata_->fs_manager()->error_manager();
+ em->RunErrorNotificationCb(ErrorHandlerType::TABLET, tablet_id());
+ });
+
int64_t tablet_blocks_deleted = 0;
int64_t tablet_bytes_deleted = 0;
for (const auto& rowset : rowsets_to_gc_undos) {
@@ -2139,6 +2144,7 @@ Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_d
if (tablet_blocks_deleted > 0) {
RETURN_NOT_OK(metadata_->Flush());
}
+ fail_on_error.cancel();
MonoDelta tablet_delete_duration = MonoTime::Now() - tablet_delete_start;
metrics_->undo_delta_block_gc_bytes_deleted->IncrementBy(tablet_bytes_deleted);
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index 3bcf833..d9b20bc 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -105,13 +105,14 @@ bool CompactRowSetsOp::Prepare() {
}
void CompactRowSetsOp::Perform() {
- Status s = tablet_->Compact(Tablet::COMPACT_NO_FLAGS);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << Substitute("$0Compaction failed on $1: $2",
- LogPrefix(), tablet_->tablet_id(), s.ToString());
- CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
- "has been stopped first";
- }
+ WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
+ Substitute("$0Compaction failed on $1", LogPrefix(), tablet_->tablet_id()));
+ // if (PREDICT_FALSE(!s.ok())) {
+ // LOG(WARNING) << Substitute("$0Compaction failed on $1: $2",
+ // LogPrefix(), tablet_->tablet_id(), s.ToString());
+ // CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
+ // "has been stopped first";
+ // }
}
scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
@@ -180,13 +181,9 @@ bool MinorDeltaCompactionOp::Prepare() {
}
void MinorDeltaCompactionOp::Perform() {
- Status s = tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << Substitute("$0Minor delta compaction failed on $1: $2",
- LogPrefix(), tablet_->tablet_id(), s.ToString());
- CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
- "has been stopped first";
- }
+ WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
+ Substitute("$0Minor delta compaction failed on $1",
+ LogPrefix(), tablet_->tablet_id()));
}
scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
@@ -260,13 +257,9 @@ bool MajorDeltaCompactionOp::Prepare() {
}
void MajorDeltaCompactionOp::Perform() {
- Status s = tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << Substitute("$0Major delta compaction failed on $1: $2",
- LogPrefix(), tablet_->tablet_id(), s.ToString());
- CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
- "has been stopped first";
- }
+ WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
+ Substitute("$0Major delta compaction failed on $1",
+ LogPrefix(), tablet_->tablet_id()));
}
scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
@@ -311,13 +304,8 @@ void UndoDeltaBlockGCOp::Perform() {
// Return early if it turns out that we have nothing to GC.
if (bytes_in_ancient_undos == 0) return;
- s = tablet_->DeleteAncientUndoDeltas();
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << Substitute("$0GC of undo delta blocks failed: $1",
- LogPrefix(), s.ToString());
- CHECK(tablet_->HasBeenStopped()) << "Failure is only allowed if the tablet "
- "has been stopped first";
- }
+ WARN_NOT_OK(tablet_->DeleteAncientUndoDeltas(),
+ Substitute("$0GC of undo delta blocks failed", LogPrefix()));
}
scoped_refptr<Histogram> UndoDeltaBlockGCOp::DurationHistogram() const {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment