33 changes: 33 additions & 0 deletions server/etcdserver/api/membership/cluster.go
@@ -499,6 +499,15 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
}
if c.be != nil && shouldApplyV3 {
unsafeSaveMemberToBackend(c.lg, c.be, m)

// Ideally, each member only updates its own attributes
// once before serving requests. Since this happens only
// on startup and at a low frequency, it should be safe
// to clean up zombie members in this request.
if id == c.localID && c.v2store != nil {
c.lg.Info("checking and cleaning up zombie members after attribute update")
CleanupZombieMembersIfNeeded(c.lg, c.be, c.v2store)
}
Comment on lines +503 to +510
Member:

Based on the conversation with the reporter, they had never performed the etcdctl snapshot restore operation while they were on v3.4.x, so the root cause is still partly unknown. I think we need to syncMembershipDataFromV2ToV3 after each ConfChange, as mentioned in #20967 (comment). Usually there aren't many ConfChanges, so the impact on performance should be minimal.

Member Author:

Updated.

Member:

Since we already perform the sync (v2 -> v3store) right after each ConfChange, we don't need to do it separately here anymore.

Member Author:

We still need to handle the snapshot and v2 publish paths when users don't perform any ConfChange.

For example, suppose there are 3 running members with zombie information, and the user upgrades the etcd member binaries one by one.
Since the publish action uses a v2 request, we need to cover it in the v2 applier.

// publish registers server information into the cluster. The information

And if a v3.5 member receives a snapshot containing zombie information, we should cover that case as well.

Member Author:

So, I think we have three places to fix that.

Member:

> We still need to handle the snapshot and v2 publish paths when users don't perform any ConfChange.

OK, I thought the publish also went through the ConfChange path. It turned out to be the NormalEntry path.
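For context, a minimal self-contained mock (illustration only, not part of this diff) of how (*EtcdServer).apply dispatches entries — the publish request is a v2 request carried in a normal entry, while membership changes arrive as conf-change entries:

```go
package main

import "fmt"

// EntryType mirrors the raftpb entry types; this is a paraphrase of the
// dispatch in etcd's server/etcdserver/server.go, not the upstream code.
type EntryType int

const (
	EntryNormal     EntryType = iota // client requests, including publish
	EntryConfChange                  // membership changes
)

type Entry struct{ Type EntryType }

func applyEntry(e Entry) {
	switch e.Type {
	case EntryNormal:
		// The publish request takes this path, so zombie cleanup must
		// also be wired into the v2 applier to cover it.
		fmt.Println("NormalEntry path: v2 requests / publish")
	case EntryConfChange:
		// Membership changes take this path; the cleanup hook added in
		// the server.go hunk below runs here.
		fmt.Println("ConfChange path: applyConfChange + zombie cleanup")
	}
}

func main() {
	applyEntry(Entry{Type: EntryNormal})
	applyEntry(Entry{Type: EntryConfChange})
}
```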

}
return
}
@@ -563,6 +572,15 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
)
}

func (c *RaftCluster) CleanupZombieMembersIfNeeded(shouldApplyV3 ShouldApplyV3) {
Member:

We already checked ShouldApplyV3 before calling this method.

Suggested change
-func (c *RaftCluster) CleanupZombieMembersIfNeeded(shouldApplyV3 ShouldApplyV3) {
+func (c *RaftCluster) CleanupZombieMembersIfNeeded() {

Member Author:

I want to align it with the existing interface and make sure we don't perform this function without checking ShouldApplyV3.

c.Lock()
defer c.Unlock()

if c.v2store != nil && c.be != nil && shouldApplyV3 {
CleanupZombieMembersIfNeeded(c.lg, c.be, c.v2store)
}
}

func (c *RaftCluster) Version() *semver.Version {
c.Lock()
defer c.Unlock()
@@ -766,6 +784,21 @@ func SyncLearnerPromotionIfNeeded(lg *zap.Logger, be backend.Backend, st v2store
return nil
}

func CleanupZombieMembersIfNeeded(lg *zap.Logger, be backend.Backend, st v2store.Store) {
v2Members, _ := membersFromStore(lg, st)
v3Members, _ := membersFromBackend(lg, be)

for id, v3Member := range v3Members {
_, ok := v2Members[id]
if !ok {
lg.Warn("Removing zombie member from v3store", zap.String("member", fmt.Sprintf("%+v", *v3Member)))
if err := unsafeDeleteMemberFromBackend(be, id); err != nil {
lg.Warn("failed to delete zombie member from backend", zap.String("member-id", id.String()), zap.Error(err))
}
}
}
}
Comment on lines +787 to +800
Member:

I am thinking we should probably sync from v2store to v3store to address the following cases:

  • Zombie members in v3store (exist in v3, but not in v2). This has already been reproduced and confirmed.
  • Missing members in v3store (exist in v2, but not in v3). We haven't seen any reproduction of this case so far. But since v2store is the only source of truth for membership data in v3.5.x and older versions, we should completely trust v2store and probably take a proactive measure to sync from v2store to v3store for this case as well. WDYT?

Member Author:

> we should completely trust v2store and probably take a proactive measure to sync from v2store to v3store for this case as well. WDYT?

Agree, so we can sync after each ConfChange.

Member:

As mentioned above, I am thinking we should probably handle the missing-members case in v3store (exist in v2, but not in v3) as well, although we haven't seen any reproduction of that case so far.
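A minimal sketch of the fuller sync discussed in this thread (the syncMembershipDataFromV2ToV3 idea), reusing only helpers that already appear in this diff; the missing-member branch is the reviewer's proposed extension, not code from this PR:

```go
// Hypothetical sketch: treat v2store as the source of truth and
// reconcile v3store in both directions.
func syncMembershipDataFromV2ToV3(lg *zap.Logger, be backend.Backend, st v2store.Store) {
	v2Members, _ := membersFromStore(lg, st)
	v3Members, _ := membersFromBackend(lg, be)

	// Zombie members: present in v3store but absent from v2store.
	for id := range v3Members {
		if _, ok := v2Members[id]; !ok {
			lg.Warn("removing zombie member from v3store", zap.String("member-id", id.String()))
			if err := unsafeDeleteMemberFromBackend(be, id); err != nil {
				lg.Warn("failed to delete zombie member", zap.String("member-id", id.String()), zap.Error(err))
			}
		}
	}

	// Missing members: present in v2store but absent from v3store (the
	// proactive case discussed above; no reproduction seen so far).
	for id, m := range v2Members {
		if _, ok := v3Members[id]; !ok {
			lg.Warn("restoring missing member to v3store", zap.String("member-id", id.String()))
			unsafeSaveMemberToBackend(lg, be, m)
		}
	}
}
```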


func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
e, err := st.Get(path.Join(storePrefix, "version"), false, false)
if err != nil {
8 changes: 8 additions & 0 deletions server/etcdserver/server.go
@@ -1361,6 +1361,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {

lg.Info("restored v2 store")

lg.Info("checking and cleaning up zombie members during snapshot restore")
membership.CleanupZombieMembersIfNeeded(lg, newbe, s.v2store)
if err = membership.SyncLearnerPromotionIfNeeded(lg, newbe, s.v2store); err != nil {
lg.Error("Failed to sync learner promotion for v3store", zap.Error(err))
}
@@ -2266,6 +2268,12 @@ func (s *EtcdServer) apply(
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf

if shouldApplyV3 {
s.lg.Info("checking and cleaning up zombie members if needed after confState changed")
s.cluster.CleanupZombieMembersIfNeeded(shouldApplyV3)
}

s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})

default:
23 changes: 23 additions & 0 deletions test.sh
@@ -654,6 +654,8 @@ function dep_pass {

function release_pass {
rm -f ./bin/etcd-last-release
rm -f ./bin/etcdctl-last-release
rm -f ./bin/etcdctl-v3439-release

# Work out the previous minor release based on the version reported by etcd binary
binary_version=$(./bin/etcd --version | grep --only-matching --perl-regexp '(?<=etcd Version: )\d+\.\d+')
@@ -695,6 +697,27 @@ function release_pass {
tar xzvf "/tmp/$file" -C /tmp/ --strip-components=1 --no-same-owner
mkdir -p ./bin
mv /tmp/etcd ./bin/etcd-last-release
mv /tmp/etcdctl ./bin/etcdctl-last-release

# We also need the etcdctl v3.4.39 binary to reproduce zombie members.
#
# See https://github.com/etcd-io/etcd/issues/20967
log_callout "Downloading v3.4.39 etcdctl"
v3439_file="etcd-v3.4.39-linux-$GOARCH.tar.gz"

set +e
curl --fail -L "https://github.com/etcd-io/etcd/releases/download/v3.4.39/$v3439_file" -o "/tmp/$v3439_file"
local result=$?
set -e
case $result in
0) ;;
*) log_error "--- FAIL:" ${result}
return $result
;;
esac

tar xzvf "/tmp/$v3439_file" -C /tmp/ --strip-components=1 --no-same-owner
mv /tmp/etcdctl ./bin/etcdctl-v3439-release
}

function release_tests_pass {
150 changes: 150 additions & 0 deletions tests/e2e/reproduce_20967_test.go
@@ -0,0 +1,150 @@
package e2e

import (
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/require"

"go.etcd.io/bbolt"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/datadir"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

func TestIssue20967_Upgrade(t *testing.T) {
e2e.BeforeTest(t)

snapshotCount := 20

t.Log("Creating cluster with zombie members...")
epc := createClustewithZombieMembers(t, snapshotCount)

t.Log("Upgradeing cluster to the new version...")
for _, proc := range epc.Procs {
serverCfg := proc.Config()

t.Logf("Stopping node: %v", serverCfg.Name)
require.NoError(t, proc.Stop(), "error closing etcd process (%v)", serverCfg.Name)

serverCfg.ExecPath = e2e.BinPath
serverCfg.KeepDataDir = true

t.Logf("Restarting node in the new version: %v", serverCfg.Name)
require.NoError(t, proc.Restart(), "error restarting etcd process (%v)", serverCfg.Name)
}

t.Log("Cluster upgraded to current version successfully")
epc.WaitLeader(t)

t.Log("Verifying zombie members are removed from v3store...")
require.NoError(t, epc.Stop())
for _, proc := range epc.Procs {
members := readMembersFromV3Store(t, proc.Config().DataDirPath)
require.Lenf(t, members, 3, "expected 3 members in v3store after upgrade, got %v", members)
}

t.Log("Restarting cluster")
require.NoError(t, epc.Start())
epc.WaitLeader(t)

t.Log("Writing key/values")
for i := 0; i < snapshotCount; i++ {
k, v := fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i)

require.NoError(t, epc.Procs[i%len(epc.Procs)].Etcdctl(e2e.ClientNonTLS, false, false).Put(k, v))
}
}

func TestIssue20967_Snapshot(t *testing.T) {
e2e.BeforeTest(t)

snapshotCount := 20
keyCount := snapshotCount

t.Log("Creating cluster with zombie members...")
epc := createClustewithZombieMembers(t, snapshotCount)

lastProc := epc.Procs[2]
t.Logf("Stopping last member %s", lastProc.Config().Name)
require.NoError(t, lastProc.Stop())

epc.WaitLeader(t)

cli, err := clientv3.New(clientv3.Config{Endpoints: epc.Procs[0].EndpointsGRPC()})
require.NoError(t, err)
defer cli.Close()

t.Log("Writing key/values to trigger snapshot...")
for i := 0; i < keyCount; i++ {
k, v := fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i)

_, err = cli.Put(t.Context(), k, v)
require.NoErrorf(t, err, "failed to put key %s", k)
}
cli.Close()

t.Log("Restart the first two members to drop pre-snapshot in-memory log entries," +
"ensuring the last member is forced to recover from the snapshot." +
"(Note: SnapshotCatchUpEntries in v3.4.x is a fixed value of 5000.)")
require.NoError(t, epc.Procs[0].Restart())
require.NoError(t, epc.Procs[1].Restart())
epc.WaitLeader(t)

t.Logf("Restarting last member %s with new version", lastProc.Config().Name)
lastProc.Config().ExecPath = e2e.BinPath
require.NoError(t, lastProc.Start(), "failed to start member process")

t.Logf("Verifying last member %s was recovered from snapshot sent by leader", lastProc.Config().Name)
found := false
for _, line := range lastProc.Logs().Lines() {
if strings.Contains(line, "applied snapshot") {
t.Logf("Found %s", line)
found = true
break
}
}
require.True(t, found, "last member did not receive snapshot from leader")

for i := 0; i < keyCount; i++ {
k, v := fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i)

value, err := lastProc.Etcdctl(e2e.ClientNonTLS, false, false).Get(k)
require.NoErrorf(t, err, "failed to get key %s from rejoined member", k)

require.Len(t, value.Kvs, 1)
require.Equal(t, v, string(value.Kvs[0].Value))
}

t.Log("Verifying zombie members for last member")
require.NoError(t, lastProc.Stop())
members := readMembersFromV3Store(t, lastProc.Config().DataDirPath)
require.Lenf(t, members, 3, "expected 3 members in v3store after upgrade, got %v", members)
}

// readMembersFromV3Store reads all members from the v3store in the given dataDir.
func readMembersFromV3Store(t *testing.T, dataDir string) []membership.Member {
dbPath := datadir.ToBackendFileName(dataDir)
db, err := bbolt.Open(dbPath, 0400, &bbolt.Options{ReadOnly: true})
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()

var members []membership.Member
_ = db.View(func(tx *bbolt.Tx) error {
return tx.Bucket(buckets.Members.Name()).ForEach(func(_, v []byte) error {
m := membership.Member{}
err := json.Unmarshal(v, &m)
require.NoError(t, err)

members = append(members, m)
return nil
})
})
return members
}