From 6f85296c503e0c46a476040e4a6045ac891dfdca Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Mon, 1 Sep 2025 18:21:03 +0200 Subject: [PATCH 01/11] feat(cmd): setup init stake cmd --- cmd/beekeeper/cmd/cmd.go | 4 ++++ cmd/beekeeper/cmd/stake.go | 22 ++++++++++++++++++++++ 2 files changed, 26 insertions(+) create mode 100644 cmd/beekeeper/cmd/stake.go diff --git a/cmd/beekeeper/cmd/cmd.go b/cmd/beekeeper/cmd/cmd.go index 36cdf5aa..979340bc 100644 --- a/cmd/beekeeper/cmd/cmd.go +++ b/cmd/beekeeper/cmd/cmd.go @@ -146,6 +146,10 @@ func newCommand(opts ...option) (c *command, err error) { return nil, err } + if err := c.initStakeCmd(); err != nil { + return nil, err + } + c.initVersionCmd() return c, nil diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go new file mode 100644 index 00000000..fab41517 --- /dev/null +++ b/cmd/beekeeper/cmd/stake.go @@ -0,0 +1,22 @@ +package cmd + +import ( + "github.com/spf13/cobra" +) + +func (c *command) initStakeCmd() (err error) { + cmd := &cobra.Command{ + Use: "stake", + Short: "Stakes Bee nodes", + Long: `Stakes Bee nodes with BZZ tokens and ETH for Bee node operations.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return nil + }, + PreRunE: c.preRunE, + } + + c.root.AddCommand(cmd) + + return nil +} + From 34e3bcd3917762fe4cc238461ce609b98af598c4 Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Mon, 1 Sep 2025 20:33:24 +0200 Subject: [PATCH 02/11] feat(cmd): add deposit command for staking Bee nodes --- cmd/beekeeper/cmd/stake.go | 127 +++++++++++++++++++++++++++++++++---- 1 file changed, 113 insertions(+), 14 deletions(-) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index fab41517..5b3c87a8 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -1,22 +1,121 @@ package cmd import ( - "github.com/spf13/cobra" + "context" + "errors" + "fmt" + "math/big" + + "github.com/spf13/cobra" +) + +const ( + optionNameAmount = "amount" +) + +var ( + errMissingStakeAmount = errors.New("stake amount not provided") + errInvalidAmount = errors.New("invalid stake amount") ) func (c *command) initStakeCmd() (err error) { - cmd := &cobra.Command{ - Use: "stake", - Short: "Stakes Bee nodes", - Long: `Stakes Bee nodes with BZZ tokens and ETH for Bee node operations.`, - RunE: func(cmd *cobra.Command, args []string) (err error) { - return nil - }, - PreRunE: c.preRunE, - } - - c.root.AddCommand(cmd) - - return nil + cmd := &cobra.Command{ + Use: "stake", + Short: "Stakes Bee nodes", + Long: `Stakes Bee nodes with BZZ tokens and ETH for Bee node operations.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return cmd.Help() + }, + PreRunE: c.preRunE, + } + + cmd.AddCommand(c.initStakeDeposit()) + + c.root.AddCommand(cmd) + + return nil } +func (c *command) initStakeDeposit() *cobra.Command { + cmd := &cobra.Command{ + Use: "deposit", + Short: "Deposit stake on Bee nodes", + Long: `Deposits a specified amount of BZZ as stake on targeted Bee nodes.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + amount, err := cmd.Flags().GetString(optionNameAmount) + if err != nil { + return fmt.Errorf("error reading amount flag: %w", err) + } + + if amount == "" { + return errMissingStakeAmount + } + + stakeAmount, ok := new(big.Int).SetString(amount, 10) + if !ok { + return errInvalidAmount + } + + if stakeAmount.Cmp(big.NewInt(0)) <= 0 { + return errInvalidAmount + } + + fmt.Printf("Amount validated: %s WEI\n", amount) + + clusterName, err := cmd.Flags().GetString(optionNameClusterName) + if err != nil { + return fmt.Errorf("error reading cluster-name flag: %w", err) + } + + if clusterName == "" { + return fmt.Errorf("cluster-name is required") + } + + fmt.Printf("Targeting cluster: %s\n", clusterName) + + // Setup cluster and get node clients + ctx := context.Background() + cluster, err := c.setupCluster(ctx, clusterName, false) + if err != nil { + return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) + } + + clients, err := cluster.NodesClients(ctx) + if err != nil { + return fmt.Errorf("failed to get node clients: %w", err) + } + + fmt.Printf("Found %d nodes in cluster\n", len(clients)) + + fmt.Printf("Starting stake deposit of %s WEI on %d nodes...\n", amount, len(clients)) + + var errorCount int + for nodeName, client := range clients { + fmt.Printf("Depositing stake on node %s...\n", nodeName) + + txHash, err := client.DepositStake(ctx, stakeAmount) + if err != nil { + errorCount++ + fmt.Printf("%s\n", fmt.Sprintf("node %s: %v", nodeName, err)) + continue + } + + fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", nodeName, txHash) + } + + if errorCount > 0 { + return fmt.Errorf("stake deposit completed with %d errors", errorCount) + } + + fmt.Printf("Stake deposit completed successfully on all %d nodes!\n", len(clients)) + return nil + }, + } + + cmd.Flags().String(optionNameAmount, "", "Stake amount in WEI (required)") + cmd.MarkFlagRequired(optionNameAmount) + cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name (required)") + cmd.MarkFlagRequired(optionNameClusterName) + + return cmd +} From 93c8a24609a159167d70bb87fe225c60b1d99c04 Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Mon, 1 Sep 2025 20:43:01 +0200 Subject: [PATCH 03/11] fix(cmd): handle errors when marking flags as required in stake command --- cmd/beekeeper/cmd/stake.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index 5b3c87a8..8d4ebd21 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -113,9 +113,13 @@ func (c *command) initStakeDeposit() *cobra.Command { } cmd.Flags().String(optionNameAmount, "", "Stake amount in WEI (required)") - cmd.MarkFlagRequired(optionNameAmount) + if err := cmd.MarkFlagRequired(optionNameAmount); err != nil { + return nil + } cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name (required)") - cmd.MarkFlagRequired(optionNameClusterName) + if err := cmd.MarkFlagRequired(optionNameClusterName); err != nil { + return nil + } return cmd } From 2bd7b5e78acdce3a75757e98c05ae7de03e84a89 Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Tue, 2 Sep 2025 14:39:13 +0200 Subject: [PATCH 04/11] feat(cmd): add parallel execution for stake deposit command and improve error handling --- cmd/beekeeper/cmd/stake.go | 79 ++++++++++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 16 deletions(-) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index 8d4ebd21..3f7369d4 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -5,12 +5,16 @@ import ( "errors" "fmt" "math/big" + "strings" + "sync" + "github.com/ethersphere/beekeeper/pkg/bee" "github.com/spf13/cobra" ) const ( - optionNameAmount = "amount" + optionNameAmount = "amount" + optionNameParallel = "parallel" ) var ( @@ -67,10 +71,6 @@ func (c *command) initStakeDeposit() *cobra.Command { return fmt.Errorf("error reading cluster-name flag: %w", err) } - if clusterName == "" { - return fmt.Errorf("cluster-name is required") - } - fmt.Printf("Targeting cluster: %s\n", clusterName) // Setup cluster and get node clients @@ -87,22 +87,53 @@ func (c *command) initStakeDeposit() *cobra.Command { fmt.Printf("Found %d nodes in cluster\n", len(clients)) - fmt.Printf("Starting stake deposit of %s WEI on %d nodes...\n", amount, len(clients)) + parallel, err := cmd.Flags().GetInt(optionNameParallel) + if err != nil { + fmt.Printf("Warning: Could not read parallel flag, using default value of 5\n") + parallel = 5 + } + + if parallel <= 0 { + fmt.Printf("Warning: Invalid parallel value (%d), using default value of 5\n", parallel) + parallel = 5 + } - var errorCount int - for nodeName, client := range clients { - fmt.Printf("Depositing stake on node %s...\n", nodeName) + if parallel > len(clients) { + fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, len(clients), len(clients)) + parallel = len(clients) + } + + fmt.Printf("Starting stake deposit of %s WEI on %d nodes with %d parallel operations...\n", amount, len(clients), parallel) - txHash, err := client.DepositStake(ctx, stakeAmount) - if err != nil { - errorCount++ - fmt.Printf("%s\n", fmt.Sprintf("node %s: %v", nodeName, err)) - continue - } + var errorCount int + var mu sync.Mutex + semaphore := make(chan struct{}, parallel) + var wg sync.WaitGroup - fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", nodeName, txHash) + for nodeName, client := range clients { + wg.Add(1) + go func(name string, cl *bee.Client) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + fmt.Printf("Depositing stake on node %s...\n", name) + + txHash, err := cl.DepositStake(ctx, stakeAmount) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("%s\n", c.formatStakeError(name, err)) + return + } + + fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", name, txHash) + }(nodeName, client) } + wg.Wait() + if errorCount > 0 { return fmt.Errorf("stake deposit completed with %d errors", errorCount) } @@ -120,6 +151,22 @@ func (c *command) initStakeDeposit() *cobra.Command { if err := cmd.MarkFlagRequired(optionNameClusterName); err != nil { return nil } + cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") return cmd } + +// formatStakeError formats stake-related errors with user-friendly messages +func (c *command) formatStakeError(nodeName string, err error) string { + errorStr := err.Error() + + if strings.Contains(errorStr, "out of funds") { + return fmt.Sprintf("node %s: insufficient BZZ balance (fund the node wallet first)", nodeName) + } else if strings.Contains(errorStr, "insufficient stake amount") { + return fmt.Sprintf("node %s: stake amount too low (increase the amount)", nodeName) + } else if strings.Contains(errorStr, "503") { + return fmt.Sprintf("node %s: service temporarily unavailable (node might be starting up)", nodeName) + } else { + return fmt.Sprintf("node %s: %v", nodeName, err) + } +} From 759105189f5337bcca6d5262e1bd95e887b8f638 Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Tue, 2 Sep 2025 15:30:39 +0200 Subject: [PATCH 05/11] feat(cmd): cap parallel operations in stake deposit command to prevent network overload --- cmd/beekeeper/cmd/stake.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index 3f7369d4..3cddc8cb 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -15,6 +15,7 @@ import ( const ( optionNameAmount = "amount" optionNameParallel = "parallel" + maxParallel = 10 ) var ( @@ -103,6 +104,12 @@ func (c *command) initStakeDeposit() *cobra.Command { parallel = len(clients) } + // Cap parallel operations to prevent network overload + if parallel > maxParallel { + fmt.Printf("Info: Parallel value (%d) is too high, capping at %d to prevent network overload\n", parallel, maxParallel) + parallel = maxParallel + } + fmt.Printf("Starting stake deposit of %s WEI on %d nodes with %d parallel operations...\n", amount, len(clients), parallel) var errorCount int From bd3cd3fcfb690e39079db6b1841569ebbd938086 Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Tue, 2 Sep 2025 19:42:45 +0200 Subject: [PATCH 06/11] feat(cmd): enhance stake deposit command to support namespace targeting --- cmd/beekeeper/cmd/stake.go | 148 ++++++++++++++++++++++++++----------- 1 file changed, 105 insertions(+), 43 deletions(-) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index 3cddc8cb..bb0e4a90 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/node" "github.com/spf13/cobra" ) @@ -65,29 +66,63 @@ func (c *command) initStakeDeposit() *cobra.Command { return errInvalidAmount } - fmt.Printf("Amount validated: %s WEI\n", amount) + namespace, err := cmd.Flags().GetString(optionNameNamespace) + if err != nil { + return fmt.Errorf("error reading namespace flag: %w", err) + } clusterName, err := cmd.Flags().GetString(optionNameClusterName) if err != nil { return fmt.Errorf("error reading cluster-name flag: %w", err) } - fmt.Printf("Targeting cluster: %s\n", clusterName) - - // Setup cluster and get node clients - ctx := context.Background() - cluster, err := c.setupCluster(ctx, clusterName, false) - if err != nil { - return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) + if clusterName == "" && namespace == "" { + return fmt.Errorf("either cluster-name or namespace must be provided") } - clients, err := cluster.NodesClients(ctx) - if err != nil { - return fmt.Errorf("failed to get node clients: %w", err) + ctx := context.Background() + var clients map[string]*bee.Client + var nodes node.NodeList + + if namespace != "" { + fmt.Printf("Targeting namespace: %s\n", namespace) + + labelSelector, err := cmd.Flags().GetString(optionNameLabelSelector) + if err != nil { + return fmt.Errorf("error reading label-selector flag: %w", err) + } + + nodeClient := node.New(&node.ClientConfig{ + Log: c.log, + HTTPClient: c.httpClient, + K8sClient: c.k8sClient, + BeeClients: nil, + Namespace: namespace, + LabelSelector: labelSelector, + DeploymentType: node.DeploymentTypeBeekeeper, + InCluster: c.globalConfig.GetBool(optionNameInCluster), + UseNamespace: true, + }) + + nodes, err = nodeClient.GetNodes(ctx) + if err != nil { + return fmt.Errorf("getting nodes: %w", err) + } + + clients = make(map[string]*bee.Client) + } else { + fmt.Printf("Targeting cluster: %s\n", clusterName) + cluster, err := c.setupCluster(ctx, clusterName, false) + if err != nil { + return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) + } + + clients, err = cluster.NodesClients(ctx) + if err != nil { + return fmt.Errorf("failed to get node clients: %w", err) + } } - fmt.Printf("Found %d nodes in cluster\n", len(clients)) - parallel, err := cmd.Flags().GetInt(optionNameParallel) if err != nil { fmt.Printf("Warning: Could not read parallel flag, using default value of 5\n") @@ -99,44 +134,72 @@ func (c *command) initStakeDeposit() *cobra.Command { parallel = 5 } - if parallel > len(clients) { - fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, len(clients), len(clients)) - parallel = len(clients) + nodeCount := len(clients) + if namespace != "" { + nodeCount = len(nodes) + } + + if parallel > nodeCount { + fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, nodeCount, nodeCount) + parallel = nodeCount } - // Cap parallel operations to prevent network overload if parallel > maxParallel { fmt.Printf("Info: Parallel value (%d) is too high, capping at %d to prevent network overload\n", parallel, maxParallel) parallel = maxParallel } - fmt.Printf("Starting stake deposit of %s WEI on %d nodes with %d parallel operations...\n", amount, len(clients), parallel) + fmt.Printf("Starting stake deposit of %s WEI on %d nodes with %d parallel operations...\n", amount, nodeCount, parallel) var errorCount int var mu sync.Mutex semaphore := make(chan struct{}, parallel) var wg sync.WaitGroup - for nodeName, client := range clients { - wg.Add(1) - go func(name string, cl *bee.Client) { - defer wg.Done() - semaphore <- struct{}{} - defer func() { <-semaphore }() - - fmt.Printf("Depositing stake on node %s...\n", name) - - txHash, err := cl.DepositStake(ctx, stakeAmount) - if err != nil { - mu.Lock() - errorCount++ - mu.Unlock() - fmt.Printf("%s\n", c.formatStakeError(name, err)) - return - } - - fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", name, txHash) - }(nodeName, client) + if namespace != "" { + for _, n := range nodes { + wg.Add(1) + go func(node node.Node) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + fmt.Printf("Depositing stake on node %s...\n", node.Name()) + + txHash, err := node.Client().Stake.DepositStake(ctx, stakeAmount) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("%s\n", c.formatStakeError(node.Name(), err)) + return + } + + fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", node.Name(), txHash) + }(n) + } + } else { + for nodeName, client := range clients { + wg.Add(1) + go func(name string, cl *bee.Client) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + fmt.Printf("Depositing stake on node %s...\n", name) + + txHash, err := cl.DepositStake(ctx, stakeAmount) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("%s\n", c.formatStakeError(name, err)) + return + } + + fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", name, txHash) + }(nodeName, client) + } } wg.Wait() @@ -145,7 +208,7 @@ func (c *command) initStakeDeposit() *cobra.Command { return fmt.Errorf("stake deposit completed with %d errors", errorCount) } - fmt.Printf("Stake deposit completed successfully on all %d nodes!\n", len(clients)) + fmt.Printf("Stake deposit completed successfully on all %d nodes!\n", nodeCount) return nil }, } @@ -154,10 +217,9 @@ func (c *command) initStakeDeposit() *cobra.Command { if err := cmd.MarkFlagRequired(optionNameAmount); err != nil { return nil } - cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name (required)") - if err := cmd.MarkFlagRequired(optionNameClusterName); err != nil { - return nil - } + cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") + cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") + cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") return cmd From 954c376a4e9d122ba37b29c2d3f888e08c838c0a Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Wed, 3 Sep 2025 09:52:14 +0200 Subject: [PATCH 07/11] feat(cmd): add 'get' command to retrieve current stake amounts from Bee nodes --- cmd/beekeeper/cmd/stake.go | 163 +++++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index bb0e4a90..c3e25f8c 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -36,6 +36,7 @@ func (c *command) initStakeCmd() (err error) { } cmd.AddCommand(c.initStakeDeposit()) + cmd.AddCommand(c.initStakeGet()) c.root.AddCommand(cmd) @@ -239,3 +240,165 @@ func (c *command) formatStakeError(nodeName string, err error) string { return fmt.Sprintf("node %s: %v", nodeName, err) } } + +func (c *command) initStakeGet() *cobra.Command { + cmd := &cobra.Command{ + Use: "get", + Short: "get current stake amounts from Bee nodes", + Long: "Retrieves the current stake amounts from targeted Bee nodes.", + RunE: func(cmd *cobra.Command, args []string) error { + namespace, err := cmd.Flags().GetString(optionNameNamespace) + if err != nil { + return fmt.Errorf("error reading namespace flag: %w", err) + } + + clusterName, err := cmd.Flags().GetString(optionNameClusterName) + if err != nil { + return fmt.Errorf("error reading cluster-name flag: %w", err) + } + + if clusterName == "" && namespace == "" { + return fmt.Errorf("either cluster-name or namespace must be provided") + } + + ctx := context.Background() + var clients map[string]*bee.Client + var nodes node.NodeList + + if namespace != "" { + fmt.Printf("Targeting namespace: %s\n", namespace) + + labelSelector, err := cmd.Flags().GetString(optionNameLabelSelector) + if err != nil { + return fmt.Errorf("error reading label-selector flag: %w", err) + } + + nodeClient := node.New(&node.ClientConfig{ + Log: c.log, + HTTPClient: c.httpClient, + K8sClient: c.k8sClient, + BeeClients: nil, + Namespace: namespace, + LabelSelector: labelSelector, + DeploymentType: node.DeploymentTypeBeekeeper, + InCluster: c.globalConfig.GetBool(optionNameInCluster), + UseNamespace: true, + }) + + nodes, err = nodeClient.GetNodes(ctx) + if err != nil { + return fmt.Errorf("getting nodes: %w", err) + } + + clients = make(map[string]*bee.Client) + } else { + fmt.Printf("Targeting cluster: %s\n", clusterName) + cluster, err := c.setupCluster(ctx, clusterName, false) + if err != nil { + return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) + } + + clients, err = cluster.NodesClients(ctx) + if err != nil { + return fmt.Errorf("failed to get node clients: %w", err) + } + } + + nodeCount := len(clients) + if namespace != "" { + nodeCount = len(nodes) + } + fmt.Printf("Found %d nodes\n", nodeCount) + + parallel, err := cmd.Flags().GetInt(optionNameParallel) + if err != nil { + fmt.Printf("Warning: Could not read parallel flag, using default value of 5\n") + parallel = 5 + } + + if parallel <= 0 { + fmt.Printf("Warning: Invalid parallel value (%d), using default value of 5\n", parallel) + parallel = 5 + } + + if parallel > nodeCount { + fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, nodeCount, nodeCount) + parallel = nodeCount + } + + if parallel > maxParallel { + fmt.Printf("Info: Parallel value (%d) is too high, capping at %d to prevent network overload\n", parallel, maxParallel) + parallel = maxParallel + } + + fmt.Printf("Getting stake amounts from %d nodes with %d parallel operations...\n", nodeCount, parallel) + + var errorCount int + var mu sync.Mutex + semaphore := make(chan struct{}, parallel) + var wg sync.WaitGroup + + if namespace != "" { + for _, n := range nodes { + wg.Add(1) + go func(node node.Node) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + fmt.Printf("Getting stake from node %s...\n", node.Name()) + + stakeAmount, err := node.Client().Stake.GetStakedAmount(ctx) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("Error getting stake from node %s: %v\n", node.Name(), err) + return + } + + fmt.Printf("Node %s: %s WEI staked\n", node.Name(), stakeAmount.String()) + }(n) + } + } else { + for nodeName, client := range clients { + wg.Add(1) + go func(name string, cl *bee.Client) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + fmt.Printf("Getting stake from node %s...\n", name) + + stakeAmount, err := cl.GetStake(ctx) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("Error getting stake from node %s: %v\n", name, err) + return + } + + fmt.Printf("Node %s: %s WEI staked\n", name, stakeAmount.String()) + }(nodeName, client) + } + } + + wg.Wait() + + if errorCount > 0 { + return fmt.Errorf("stake retrieval completed with %d errors", errorCount) + } + + fmt.Printf("Stake retrieval completed successfully from all %d nodes!\n", nodeCount) + return nil + }, + } + + cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") + cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") + cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") + cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") + + return cmd +} From 7ce6094f5735c00507d2b4373db960f05886502a Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Wed, 3 Sep 2025 10:02:01 +0200 Subject: [PATCH 08/11] feat(cmd): add 'withdraw' command --- cmd/beekeeper/cmd/stake.go | 163 +++++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index c3e25f8c..3f8bf657 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -37,6 +37,7 @@ func (c *command) initStakeCmd() (err error) { cmd.AddCommand(c.initStakeDeposit()) cmd.AddCommand(c.initStakeGet()) + cmd.AddCommand(c.initStakeWithdraw()) c.root.AddCommand(cmd) @@ -402,3 +403,165 @@ func (c *command) initStakeGet() *cobra.Command { return cmd } + +func (c *command) initStakeWithdraw() *cobra.Command { + cmd := &cobra.Command{ + Use: "withdraw", + Short: "withdraw stake from Bee nodes", + Long: "Withdraws (migrates) stake from targeted Bee nodes. This operation migrates the stake to the node's wallet.", + RunE: func(cmd *cobra.Command, args []string) error { + namespace, err := cmd.Flags().GetString(optionNameNamespace) + if err != nil { + return fmt.Errorf("error reading namespace flag: %w", err) + } + + clusterName, err := cmd.Flags().GetString(optionNameClusterName) + if err != nil { + return fmt.Errorf("error reading cluster-name flag: %w", err) + } + + if clusterName == "" && namespace == "" { + return fmt.Errorf("either cluster-name or namespace must be provided") + } + + ctx := context.Background() + var clients map[string]*bee.Client + var nodes node.NodeList + + if namespace != "" { + fmt.Printf("Targeting namespace: %s\n", namespace) + + labelSelector, err := cmd.Flags().GetString(optionNameLabelSelector) + if err != nil { + return fmt.Errorf("error reading label-selector flag: %w", err) + } + + nodeClient := node.New(&node.ClientConfig{ + Log: c.log, + HTTPClient: c.httpClient, + K8sClient: c.k8sClient, + BeeClients: nil, + Namespace: namespace, + LabelSelector: labelSelector, + DeploymentType: node.DeploymentTypeBeekeeper, + InCluster: c.globalConfig.GetBool(optionNameInCluster), + UseNamespace: true, + }) + + nodes, err = nodeClient.GetNodes(ctx) + if err != nil { + return fmt.Errorf("getting nodes: %w", err) + } + + clients = make(map[string]*bee.Client) + } else { + fmt.Printf("Targeting cluster: %s\n", clusterName) + cluster, err := c.setupCluster(ctx, clusterName, false) + if err != nil { + return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) + } + + clients, err = cluster.NodesClients(ctx) + if err != nil { + return fmt.Errorf("failed to get node clients: %w", err) + } + } + + nodeCount := len(clients) + if namespace != "" { + nodeCount = len(nodes) + } + fmt.Printf("Found %d nodes\n", nodeCount) + + parallel, err := cmd.Flags().GetInt(optionNameParallel) + if err != nil { + fmt.Printf("Warning: Could not read parallel flag, using default value of 5\n") + parallel = 5 + } + + if parallel <= 0 { + fmt.Printf("Warning: Invalid parallel value (%d), using default value of 5\n", parallel) + parallel = 5 + } + + if parallel > nodeCount { + fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, nodeCount, nodeCount) + parallel = nodeCount + } + + if parallel > maxParallel { + fmt.Printf("Info: Parallel value (%d) is too high, capping at %d to prevent network overload\n", parallel, maxParallel) + parallel = maxParallel + } + + fmt.Printf("Starting stake withdrawal from %d nodes with %d parallel operations...\n", nodeCount, parallel) + + var errorCount int + var mu sync.Mutex + semaphore := make(chan struct{}, parallel) + var wg sync.WaitGroup + + if namespace != "" { + for _, n := range nodes { + wg.Add(1) + go func(node node.Node) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + fmt.Printf("Withdrawing stake from node %s...\n", node.Name()) + + txHash, err := node.Client().Stake.MigrateStake(ctx) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("Error withdrawing stake from node %s: %v\n", node.Name(), err) + return + } + + fmt.Printf("Successfully withdrew stake from node %s, transaction: %s\n", node.Name(), txHash) + }(n) + } + } else { + for nodeName, client := range clients { + wg.Add(1) + go func(name string, cl *bee.Client) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + fmt.Printf("Withdrawing stake from node %s...\n", name) + + txHash, err := cl.MigrateStake(ctx) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("Error withdrawing stake from node %s: %v\n", name, err) + return + } + + fmt.Printf("Successfully withdrew stake from node %s, transaction: %s\n", name, txHash) + }(nodeName, client) + } + } + + wg.Wait() + + if errorCount > 0 { + return fmt.Errorf("stake withdrawal completed with %d errors", errorCount) + } + + fmt.Printf("Stake withdrawal completed successfully from all %d nodes!\n", nodeCount) + return nil + }, + } + + cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") + cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") + cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") + cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") + + return cmd +} From ee239dc93109d1c205f79017901a4e41185fa1c8 Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Thu, 4 Sep 2025 15:12:57 +0200 Subject: [PATCH 09/11] feat(cmd): add support for filtering node clients by specified node groups in stake commands --- cmd/beekeeper/cmd/stake.go | 80 +++++++++++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index 3f8bf657..4761e7f2 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -10,13 +10,15 @@ import ( "github.com/ethersphere/beekeeper/pkg/bee" "github.com/ethersphere/beekeeper/pkg/node" + "github.com/ethersphere/beekeeper/pkg/orchestration" "github.com/spf13/cobra" ) const ( - optionNameAmount = "amount" - optionNameParallel = "parallel" - maxParallel = 10 + optionNameAmount = "amount" + optionNameParallel = "parallel" + optionNameNodeGroups = "node-groups" + maxParallel = 10 ) var ( @@ -119,10 +121,23 @@ func (c *command) initStakeDeposit() *cobra.Command { return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) } - clients, err = cluster.NodesClients(ctx) + allClients, err := cluster.NodesClients(ctx) if err != nil { return fmt.Errorf("failed to get node clients: %w", err) } + + nodeGroups, err := cmd.Flags().GetStringSlice(optionNameNodeGroups) + if err != nil { + return fmt.Errorf("error reading node-groups flag: %w", err) + } + + if len(nodeGroups) > 0 { + fmt.Printf("Filtering by node groups: %v\n", nodeGroups) + clients = c.filterClientsByNodeGroups(cluster, allClients, nodeGroups) + } else { + fmt.Printf("No node groups specified, defaulting to 'bee' nodes for staking\n") + clients = c.filterClientsByNodeGroups(cluster, allClients, []string{"bee"}) + } } parallel, err := cmd.Flags().GetInt(optionNameParallel) @@ -222,6 +237,7 @@ func (c *command) initStakeDeposit() *cobra.Command { cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") + cmd.Flags().StringSlice(optionNameNodeGroups, nil, "List of node groups to target (applies to all groups if not set)") cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") return cmd @@ -299,10 +315,23 @@ func (c *command) initStakeGet() *cobra.Command { return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) } - clients, err = cluster.NodesClients(ctx) + allClients, err := cluster.NodesClients(ctx) if err != nil { return fmt.Errorf("failed to get node clients: %w", err) } + + nodeGroups, err := cmd.Flags().GetStringSlice(optionNameNodeGroups) + if err != nil { + return fmt.Errorf("error reading node-groups flag: %w", err) + } + + if len(nodeGroups) > 0 { + fmt.Printf("Filtering by node groups: %v\n", nodeGroups) + clients = c.filterClientsByNodeGroups(cluster, allClients, nodeGroups) + } else { + fmt.Printf("No node groups specified, defaulting to 'bee' nodes for staking\n") + clients = c.filterClientsByNodeGroups(cluster, allClients, []string{"bee"}) + } } nodeCount := len(clients) @@ -399,6 +428,7 @@ func (c *command) initStakeGet() *cobra.Command { cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") + cmd.Flags().StringSlice(optionNameNodeGroups, nil, "List of node groups to target (applies to all groups if not set)") cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") return cmd @@ -461,10 +491,23 @@ func (c *command) initStakeWithdraw() *cobra.Command { return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) } - clients, err = cluster.NodesClients(ctx) + allClients, err := cluster.NodesClients(ctx) if err != nil { return fmt.Errorf("failed to get node clients: %w", err) } + + nodeGroups, err := cmd.Flags().GetStringSlice(optionNameNodeGroups) + if err != nil { + return fmt.Errorf("error reading node-groups flag: %w", err) + } + + if len(nodeGroups) > 0 { + fmt.Printf("Filtering by node groups: %v\n", nodeGroups) + clients = c.filterClientsByNodeGroups(cluster, allClients, nodeGroups) + } else { + fmt.Printf("No node groups specified, defaulting to 'bee' nodes for staking\n") + clients = c.filterClientsByNodeGroups(cluster, allClients, []string{"bee"}) + } } nodeCount := len(clients) @@ -561,7 +604,32 @@ func (c *command) initStakeWithdraw() *cobra.Command { cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") + cmd.Flags().StringSlice(optionNameNodeGroups, nil, "List of node groups to target (applies to all groups if not set)") cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") return cmd } + +func (c *command) filterClientsByNodeGroups(cluster orchestration.Cluster, allClients map[string]*bee.Client, nodeGroups []string) map[string]*bee.Client { + nodeGroupsMap := cluster.NodeGroups() + var targetNodes []string + + for _, nodeGroup := range nodeGroups { + group, ok := nodeGroupsMap[nodeGroup] + if !ok { + c.log.Debugf("node group %s not found in cluster", nodeGroup) + continue + } + targetNodes = append(targetNodes, group.NodesSorted()...) + } + + // Filter clients to only include nodes from specified groups + filteredClients := make(map[string]*bee.Client) + for _, nodeName := range targetNodes { + if client, exists := allClients[nodeName]; exists { + filteredClients[nodeName] = client + } + } + + return filteredClients +} From e693a3d46f619f81c8803df582a4a1a439b5e3e4 Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Thu, 4 Sep 2025 17:20:52 +0200 Subject: [PATCH 10/11] feat(cmd): refactor stake commands to consolidate common logic and improve error handling --- cmd/beekeeper/cmd/stake.go | 685 ++++++++++++++----------------------- 1 file changed, 251 insertions(+), 434 deletions(-) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index 4761e7f2..65c85017 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -48,9 +48,10 @@ func (c *command) initStakeCmd() (err error) { func (c *command) initStakeDeposit() *cobra.Command { cmd := &cobra.Command{ - Use: "deposit", - Short: "Deposit stake on Bee nodes", - Long: `Deposits a specified amount of BZZ as stake on targeted Bee nodes.`, + Use: "deposit", + Short: "Deposit stake on Bee nodes", + Long: `Deposits a specified amount of BZZ as stake on targeted Bee nodes.`, + PreRunE: c.preRunE, RunE: func(cmd *cobra.Command, args []string) (err error) { amount, err := cmd.Flags().GetString(optionNameAmount) if err != nil { @@ -70,156 +71,43 @@ func (c *command) initStakeDeposit() *cobra.Command { return errInvalidAmount } - namespace, err := cmd.Flags().GetString(optionNameNamespace) + clients, nodes, err := c.getStakeTargets(cmd) if err != nil { - return fmt.Errorf("error reading namespace flag: %w", err) - } - - clusterName, err := cmd.Flags().GetString(optionNameClusterName) - if err != nil { - return fmt.Errorf("error reading cluster-name flag: %w", err) - } - - if clusterName == "" && namespace == "" { - return fmt.Errorf("either cluster-name or namespace must be provided") - } - - ctx := context.Background() - var clients map[string]*bee.Client - var nodes node.NodeList - - if namespace != "" { - fmt.Printf("Targeting namespace: %s\n", namespace) - - labelSelector, err := cmd.Flags().GetString(optionNameLabelSelector) - if err != nil { - return fmt.Errorf("error reading label-selector flag: %w", err) - } - - nodeClient := node.New(&node.ClientConfig{ - Log: c.log, - HTTPClient: c.httpClient, - K8sClient: c.k8sClient, - BeeClients: nil, - Namespace: namespace, - LabelSelector: labelSelector, - DeploymentType: node.DeploymentTypeBeekeeper, - InCluster: c.globalConfig.GetBool(optionNameInCluster), - UseNamespace: true, - }) - - nodes, err = nodeClient.GetNodes(ctx) - if err != nil { - return fmt.Errorf("getting nodes: %w", err) - } - - clients = make(map[string]*bee.Client) - } else { - fmt.Printf("Targeting cluster: %s\n", clusterName) - cluster, err := c.setupCluster(ctx, clusterName, false) - if err != nil { - return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) - } - - allClients, err := cluster.NodesClients(ctx) - if err != nil { - return fmt.Errorf("failed to get node clients: %w", err) - } - - nodeGroups, err := cmd.Flags().GetStringSlice(optionNameNodeGroups) - if err != nil { - return fmt.Errorf("error reading node-groups flag: %w", err) - } - - if len(nodeGroups) > 0 { - fmt.Printf("Filtering by node groups: %v\n", nodeGroups) - clients = c.filterClientsByNodeGroups(cluster, allClients, nodeGroups) - } else { - fmt.Printf("No node groups specified, defaulting to 'bee' nodes for staking\n") - clients = c.filterClientsByNodeGroups(cluster, allClients, []string{"bee"}) - } + return err } - parallel, err := cmd.Flags().GetInt(optionNameParallel) + parallel, nodeCount, err := c.getStakeParallelConfig(cmd, clients, nodes) if err != nil { - fmt.Printf("Warning: Could not read parallel flag, using default value of 5\n") - parallel = 5 - } - - if parallel <= 0 { - fmt.Printf("Warning: Invalid parallel value (%d), using default value of 5\n", parallel) - parallel = 5 - } - - nodeCount := len(clients) - if namespace != "" { - nodeCount = len(nodes) - } - - if parallel > nodeCount { - fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, nodeCount, nodeCount) - parallel = nodeCount - } - - if parallel > maxParallel { - fmt.Printf("Info: Parallel value (%d) is too high, capping at %d to prevent network overload\n", parallel, maxParallel) - parallel = maxParallel + return err } fmt.Printf("Starting stake deposit of %s WEI on %d nodes with %d parallel operations...\n", amount, nodeCount, parallel) - var errorCount int - var mu sync.Mutex - semaphore := make(chan struct{}, parallel) - var wg sync.WaitGroup - - if namespace != "" { - for _, n := range nodes { - wg.Add(1) - go func(node node.Node) { - defer wg.Done() - semaphore <- struct{}{} - defer func() { <-semaphore }() - - fmt.Printf("Depositing stake on node %s...\n", node.Name()) - - txHash, err := node.Client().Stake.DepositStake(ctx, stakeAmount) - if err != nil { - mu.Lock() - errorCount++ - mu.Unlock() - fmt.Printf("%s\n", c.formatStakeError(node.Name(), err)) - return - } - - fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", node.Name(), txHash) - }(n) - } - } else { - for nodeName, client := range clients { - wg.Add(1) - go func(name string, cl *bee.Client) { - defer wg.Done() - semaphore <- struct{}{} - defer func() { <-semaphore }() - - fmt.Printf("Depositing stake on node %s...\n", name) - - txHash, err := cl.DepositStake(ctx, stakeAmount) - if err != nil { - mu.Lock() - errorCount++ - mu.Unlock() - fmt.Printf("%s\n", c.formatStakeError(name, err)) - return - } - - fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", name, txHash) - }(nodeName, client) + // Define the deposit operation + depositOp := func(nodeName string, client *bee.Client, node node.Node) error { + ctx := context.Background() + + if len(nodes) > 0 { + // Namespace mode + fmt.Printf("Depositing stake on node %s...\n", node.Name()) + txHash, err := node.Client().Stake.DepositStake(ctx, stakeAmount) + if err != nil { + return err + } + fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", node.Name(), txHash) + } else { + // Cluster mode + fmt.Printf("Depositing stake on node %s...\n", nodeName) + txHash, err := client.DepositStake(ctx, stakeAmount) + if err != nil { + return err + } + fmt.Printf("Successfully deposited stake on node %s, transaction: %s\n", nodeName, txHash) } + return nil } - wg.Wait() + errorCount := c.executeStakeOperation(clients, nodes, parallel, depositOp) if errorCount > 0 { return fmt.Errorf("stake deposit completed with %d errors", errorCount) @@ -234,15 +122,12 @@ func (c *command) initStakeDeposit() *cobra.Command { if err := cmd.MarkFlagRequired(optionNameAmount); err != nil { return nil } - cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") - cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") - cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") - cmd.Flags().StringSlice(optionNameNodeGroups, nil, "List of node groups to target (applies to all groups if not set)") - cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") + c.addStakeCommonFlags(cmd) return cmd } +// TODO // formatStakeError formats stake-related errors with user-friendly messages func (c *command) formatStakeError(nodeName string, err error) string { errorStr := err.Error() @@ -260,161 +145,48 @@ func (c *command) formatStakeError(nodeName string, err error) string { func (c *command) initStakeGet() *cobra.Command { cmd := &cobra.Command{ - Use: "get", - Short: "get current stake amounts from Bee nodes", - Long: "Retrieves the current stake amounts from targeted Bee nodes.", + Use: "get", + Short: "get current stake amounts from Bee nodes", + Long: "Retrieves the current stake amounts from targeted Bee nodes.", + PreRunE: c.preRunE, RunE: func(cmd *cobra.Command, args []string) error { - namespace, err := cmd.Flags().GetString(optionNameNamespace) - if err != nil { - return fmt.Errorf("error reading namespace flag: %w", err) - } - - clusterName, err := cmd.Flags().GetString(optionNameClusterName) + clients, nodes, err := c.getStakeTargets(cmd) if err != nil { - return fmt.Errorf("error reading cluster-name flag: %w", err) - } - - if clusterName == "" && namespace == "" { - return fmt.Errorf("either cluster-name or namespace must be provided") - } - - ctx := context.Background() - var clients map[string]*bee.Client - var nodes node.NodeList - - if namespace != "" { - fmt.Printf("Targeting namespace: %s\n", namespace) - - labelSelector, err := cmd.Flags().GetString(optionNameLabelSelector) - if err != nil { - return fmt.Errorf("error reading label-selector flag: %w", err) - } - - nodeClient := node.New(&node.ClientConfig{ - Log: c.log, - HTTPClient: c.httpClient, - K8sClient: c.k8sClient, - BeeClients: nil, - Namespace: namespace, - LabelSelector: labelSelector, - DeploymentType: node.DeploymentTypeBeekeeper, - InCluster: c.globalConfig.GetBool(optionNameInCluster), - UseNamespace: true, - }) - - nodes, err = nodeClient.GetNodes(ctx) - if err != nil { - return fmt.Errorf("getting nodes: %w", err) - } - - clients = make(map[string]*bee.Client) - } else { - fmt.Printf("Targeting cluster: %s\n", clusterName) - cluster, err := c.setupCluster(ctx, clusterName, false) - if err != nil { - return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) - } - - allClients, err := cluster.NodesClients(ctx) - if err != nil { - return fmt.Errorf("failed to get node clients: %w", err) - } - - nodeGroups, err := cmd.Flags().GetStringSlice(optionNameNodeGroups) - if err != nil { - return fmt.Errorf("error reading node-groups flag: %w", err) - } - - if len(nodeGroups) > 0 { - fmt.Printf("Filtering by node groups: %v\n", nodeGroups) - clients = c.filterClientsByNodeGroups(cluster, allClients, nodeGroups) - } else { - fmt.Printf("No node groups specified, defaulting to 'bee' nodes for staking\n") - clients = c.filterClientsByNodeGroups(cluster, allClients, []string{"bee"}) - } - } - - nodeCount := len(clients) - if namespace != "" { - nodeCount = len(nodes) + return err } - fmt.Printf("Found %d nodes\n", nodeCount) - parallel, err := cmd.Flags().GetInt(optionNameParallel) + parallel, nodeCount, err := c.getStakeParallelConfig(cmd, clients, nodes) if err != nil { - fmt.Printf("Warning: Could not read parallel flag, using default value of 5\n") - parallel = 5 - } - - if parallel <= 0 { - fmt.Printf("Warning: Invalid parallel value (%d), using default value of 5\n", parallel) - parallel = 5 - } - - if parallel > nodeCount { - fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, nodeCount, nodeCount) - parallel = nodeCount - } - - if parallel > maxParallel { - fmt.Printf("Info: Parallel value (%d) is too high, capping at %d to prevent network overload\n", parallel, maxParallel) - parallel = maxParallel + return err } fmt.Printf("Getting stake amounts from %d nodes with %d parallel operations...\n", nodeCount, parallel) - var errorCount int - var mu sync.Mutex - semaphore := make(chan struct{}, parallel) - var wg sync.WaitGroup - - if namespace != "" { - for _, n := range nodes { - wg.Add(1) - go func(node node.Node) { - defer wg.Done() - semaphore <- struct{}{} - defer func() { <-semaphore }() - - fmt.Printf("Getting stake from node %s...\n", node.Name()) - - stakeAmount, err := node.Client().Stake.GetStakedAmount(ctx) - if err != nil { - mu.Lock() - errorCount++ - mu.Unlock() - fmt.Printf("Error getting stake from node %s: %v\n", node.Name(), err) - return - } - - fmt.Printf("Node %s: %s WEI staked\n", node.Name(), stakeAmount.String()) - }(n) - } - } else { - for nodeName, client := range clients { - wg.Add(1) - go func(name string, cl *bee.Client) { - defer wg.Done() - semaphore <- struct{}{} - defer func() { <-semaphore }() - - fmt.Printf("Getting stake from node %s...\n", name) - - stakeAmount, err := cl.GetStake(ctx) - if err != nil { - mu.Lock() - errorCount++ - mu.Unlock() - fmt.Printf("Error getting stake from node %s: %v\n", name, err) - return - } - - fmt.Printf("Node %s: %s WEI staked\n", name, stakeAmount.String()) - }(nodeName, client) + // Define the get operation + getOp := func(nodeName string, client *bee.Client, node node.Node) error { + ctx := context.Background() + + if len(nodes) > 0 { + // Namespace mode + fmt.Printf("Getting stake from node %s...\n", node.Name()) + stakeAmount, err := node.Client().Stake.GetStakedAmount(ctx) + if err != nil { + return err + } + fmt.Printf("Node %s: %s WEI staked\n", node.Name(), stakeAmount.String()) + } else { + // Cluster mode + fmt.Printf("Getting stake from node %s...\n", nodeName) + stakeAmount, err := client.GetStake(ctx) + if err != nil { + return err + } + fmt.Printf("Node %s: %s WEI staked\n", nodeName, stakeAmount.String()) } + return nil } - wg.Wait() + errorCount := c.executeStakeOperation(clients, nodes, parallel, getOp) if errorCount > 0 { return fmt.Errorf("stake retrieval completed with %d errors", errorCount) @@ -425,189 +197,234 @@ func (c *command) initStakeGet() *cobra.Command { }, } - cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") - cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") - cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") - cmd.Flags().StringSlice(optionNameNodeGroups, nil, "List of node groups to target (applies to all groups if not set)") - cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") - + c.addStakeCommonFlags(cmd) return cmd } func (c *command) initStakeWithdraw() *cobra.Command { cmd := &cobra.Command{ - Use: "withdraw", - Short: "withdraw stake from Bee nodes", - Long: "Withdraws (migrates) stake from targeted Bee nodes. This operation migrates the stake to the node's wallet.", + Use: "withdraw", + Short: "withdraw stake from Bee nodes", + Long: "Withdraws (migrates) stake from targeted Bee nodes. This operation migrates the stake to the node's wallet.", + PreRunE: c.preRunE, RunE: func(cmd *cobra.Command, args []string) error { - namespace, err := cmd.Flags().GetString(optionNameNamespace) + clients, nodes, err := c.getStakeTargets(cmd) if err != nil { - return fmt.Errorf("error reading namespace flag: %w", err) + return err } - clusterName, err := cmd.Flags().GetString(optionNameClusterName) + parallel, nodeCount, err := c.getStakeParallelConfig(cmd, clients, nodes) if err != nil { - return fmt.Errorf("error reading cluster-name flag: %w", err) + return err } - if clusterName == "" && namespace == "" { - return fmt.Errorf("either cluster-name or namespace must be provided") + fmt.Printf("Starting stake withdrawal from %d nodes with %d parallel operations...\n", nodeCount, parallel) + + // Define the withdraw operation + withdrawOp := func(nodeName string, client *bee.Client, node node.Node) error { + ctx := context.Background() + + if len(nodes) > 0 { + // Namespace mode + fmt.Printf("Withdrawing stake from node %s...\n", node.Name()) + txHash, err := node.Client().Stake.MigrateStake(ctx) + if err != nil { + return err + } + fmt.Printf("Successfully withdrew stake from node %s, transaction: %s\n", node.Name(), txHash) + } else { + // Cluster mode + fmt.Printf("Withdrawing stake from node %s...\n", nodeName) + txHash, err := client.MigrateStake(ctx) + if err != nil { + return err + } + fmt.Printf("Successfully withdrew stake from node %s, transaction: %s\n", nodeName, txHash) + } + return nil } - ctx := context.Background() - var clients map[string]*bee.Client - var nodes node.NodeList + errorCount := c.executeStakeOperation(clients, nodes, parallel, withdrawOp) - if namespace != "" { - fmt.Printf("Targeting namespace: %s\n", namespace) + if errorCount > 0 { + return fmt.Errorf("stake withdrawal completed with %d errors", errorCount) + } - labelSelector, err := cmd.Flags().GetString(optionNameLabelSelector) - if err != nil { - return fmt.Errorf("error reading label-selector flag: %w", err) - } + fmt.Printf("Stake withdrawal completed successfully from all %d nodes!\n", nodeCount) + return nil + }, + } - nodeClient := node.New(&node.ClientConfig{ - Log: c.log, - HTTPClient: c.httpClient, - K8sClient: c.k8sClient, - BeeClients: nil, - Namespace: namespace, - LabelSelector: labelSelector, - DeploymentType: node.DeploymentTypeBeekeeper, - InCluster: c.globalConfig.GetBool(optionNameInCluster), - UseNamespace: true, - }) - - nodes, err = nodeClient.GetNodes(ctx) - if err != nil { - return fmt.Errorf("getting nodes: %w", err) - } + c.addStakeCommonFlags(cmd) + return cmd +} - clients = make(map[string]*bee.Client) - } else { - fmt.Printf("Targeting cluster: %s\n", clusterName) - cluster, err := c.setupCluster(ctx, clusterName, false) - if err != nil { - return fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) - } +// getStakeTargets handles node discovery for both namespace and cluster modes +func (c *command) getStakeTargets(cmd *cobra.Command) (map[string]*bee.Client, node.NodeList, error) { + namespace, err := cmd.Flags().GetString(optionNameNamespace) + if err != nil { + return nil, nil, fmt.Errorf("error reading namespace flag: %w", err) + } - allClients, err := cluster.NodesClients(ctx) - if err != nil { - return fmt.Errorf("failed to get node clients: %w", err) - } + clusterName, err := cmd.Flags().GetString(optionNameClusterName) + if err != nil { + return nil, nil, fmt.Errorf("error reading cluster-name flag: %w", err) + } - nodeGroups, err := cmd.Flags().GetStringSlice(optionNameNodeGroups) - if err != nil { - return fmt.Errorf("error reading node-groups flag: %w", err) - } + if clusterName == "" && namespace == "" { + return nil, nil, fmt.Errorf("either cluster-name or namespace must be provided") + } - if len(nodeGroups) > 0 { - fmt.Printf("Filtering by node groups: %v\n", nodeGroups) - clients = c.filterClientsByNodeGroups(cluster, allClients, nodeGroups) - } else { - fmt.Printf("No node groups specified, defaulting to 'bee' nodes for staking\n") - clients = c.filterClientsByNodeGroups(cluster, allClients, []string{"bee"}) - } - } + ctx := context.Background() + var clients map[string]*bee.Client + var nodes node.NodeList - nodeCount := len(clients) - if namespace != "" { - nodeCount = len(nodes) - } - fmt.Printf("Found %d nodes\n", nodeCount) + if namespace != "" { + fmt.Printf("Targeting namespace: %s\n", namespace) - parallel, err := cmd.Flags().GetInt(optionNameParallel) - if err != nil { - fmt.Printf("Warning: Could not read parallel flag, using default value of 5\n") - parallel = 5 - } + labelSelector, err := cmd.Flags().GetString(optionNameLabelSelector) + if err != nil { + return nil, nil, fmt.Errorf("error reading label-selector flag: %w", err) + } - if parallel <= 0 { - fmt.Printf("Warning: Invalid parallel value (%d), using default value of 5\n", parallel) - parallel = 5 - } + nodeClient := node.New(&node.ClientConfig{ + Log: c.log, + HTTPClient: c.httpClient, + K8sClient: c.k8sClient, + BeeClients: nil, + Namespace: namespace, + LabelSelector: labelSelector, + DeploymentType: node.DeploymentTypeBeekeeper, + InCluster: c.globalConfig.GetBool(optionNameInCluster), + UseNamespace: true, + }) + + nodes, err = nodeClient.GetNodes(ctx) + if err != nil { + return nil, nil, fmt.Errorf("getting nodes: %w", err) + } - if parallel > nodeCount { - fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, nodeCount, nodeCount) - parallel = nodeCount - } + clients = make(map[string]*bee.Client) + } else { + fmt.Printf("Targeting cluster: %s\n", clusterName) + cluster, err := c.setupCluster(ctx, clusterName, false) + if err != nil { + return nil, nil, fmt.Errorf("failed to setup cluster %s: %w", clusterName, err) + } - if parallel > maxParallel { - fmt.Printf("Info: Parallel value (%d) is too high, capping at %d to prevent network overload\n", parallel, maxParallel) - parallel = maxParallel - } + allClients, err := cluster.NodesClients(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to get node clients: %w", err) + } - fmt.Printf("Starting stake withdrawal from %d nodes with %d parallel operations...\n", nodeCount, parallel) + nodeGroups, err := cmd.Flags().GetStringSlice(optionNameNodeGroups) + if err != nil { + return nil, nil, fmt.Errorf("error reading node-groups flag: %w", err) + } - var errorCount int - var mu sync.Mutex - semaphore := make(chan struct{}, parallel) - var wg sync.WaitGroup - - if namespace != "" { - for _, n := range nodes { - wg.Add(1) - go func(node node.Node) { - defer wg.Done() - semaphore <- struct{}{} - defer func() { <-semaphore }() - - fmt.Printf("Withdrawing stake from node %s...\n", node.Name()) - - txHash, err := node.Client().Stake.MigrateStake(ctx) - if err != nil { - mu.Lock() - errorCount++ - mu.Unlock() - fmt.Printf("Error withdrawing stake from node %s: %v\n", node.Name(), err) - return - } - - fmt.Printf("Successfully withdrew stake from node %s, transaction: %s\n", node.Name(), txHash) - }(n) - } - } else { - for nodeName, client := range clients { - wg.Add(1) - go func(name string, cl *bee.Client) { - defer wg.Done() - semaphore <- struct{}{} - defer func() { <-semaphore }() - - fmt.Printf("Withdrawing stake from node %s...\n", name) - - txHash, err := cl.MigrateStake(ctx) - if err != nil { - mu.Lock() - errorCount++ - mu.Unlock() - fmt.Printf("Error withdrawing stake from node %s: %v\n", name, err) - return - } - - fmt.Printf("Successfully withdrew stake from node %s, transaction: %s\n", name, txHash) - }(nodeName, client) - } - } + if len(nodeGroups) > 0 { + fmt.Printf("Filtering by node groups: %v\n", nodeGroups) + clients = c.filterClientsByNodeGroups(cluster, allClients, nodeGroups) + } else { + fmt.Printf("No node groups specified, defaulting to 'bee' nodes for staking\n") + clients = c.filterClientsByNodeGroups(cluster, allClients, []string{"bee"}) + } + } - wg.Wait() + return clients, nodes, nil +} - if errorCount > 0 { - return fmt.Errorf("stake withdrawal completed with %d errors", errorCount) - } +// getStakeParallelConfig handles parallel configuration and validation +func (c *command) getStakeParallelConfig(cmd *cobra.Command, clients map[string]*bee.Client, nodes node.NodeList) (int, int, error) { + parallel, err := cmd.Flags().GetInt(optionNameParallel) + if err != nil { + fmt.Printf("Warning: Could not read parallel flag, using default value of 5\n") + parallel = 5 + } - fmt.Printf("Stake withdrawal completed successfully from all %d nodes!\n", nodeCount) - return nil - }, + if parallel <= 0 { + fmt.Printf("Warning: Invalid parallel value (%d), using default value of 5\n", parallel) + parallel = 5 + } + + nodeCount := len(clients) + if len(nodes) > 0 { + nodeCount = len(nodes) + } + + if parallel > nodeCount { + fmt.Printf("Info: Parallel value (%d) is greater than number of nodes (%d), using %d\n", parallel, nodeCount, nodeCount) + parallel = nodeCount } + if parallel > maxParallel { + fmt.Printf("Info: Parallel value (%d) is too high, capping at %d to prevent network overload\n", parallel, maxParallel) + parallel = maxParallel + } + + fmt.Printf("Found %d nodes\n", nodeCount) + return parallel, nodeCount, nil +} + +// addStakeCommonFlags adds common flags to stake commands +func (c *command) addStakeCommonFlags(cmd *cobra.Command) { cmd.Flags().String(optionNameClusterName, "", "Target Beekeeper cluster name") cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name)") cmd.Flags().String(optionNameLabelSelector, "app.kubernetes.io/name=bee", "Kubernetes label selector for filtering resources") cmd.Flags().StringSlice(optionNameNodeGroups, nil, "List of node groups to target (applies to all groups if not set)") cmd.Flags().Int(optionNameParallel, 5, "Number of parallel operations (default: 5, max: number of nodes)") +} - return cmd +// stakeOperation defines the signature for stake operations +type stakeOperation func(nodeName string, client *bee.Client, node node.Node) error + +// executeStakeOperation runs the given operation in parallel across all target nodes +func (c *command) executeStakeOperation(clients map[string]*bee.Client, nodes node.NodeList, parallel int, operation stakeOperation) int { + var errorCount int + var mu sync.Mutex + semaphore := make(chan struct{}, parallel) + var wg sync.WaitGroup + + if len(nodes) > 0 { + // Namespace mode: iterate over nodes + for _, n := range nodes { + wg.Add(1) + go func(node node.Node) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + err := operation("", nil, node) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("%s\n", c.formatStakeError(node.Name(), err)) + } + }(n) + } + } else { + // Cluster mode: iterate over clients + for nodeName, client := range clients { + wg.Add(1) + go func(name string, cl *bee.Client) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + err := operation(name, cl, node.Node{}) + if err != nil { + mu.Lock() + errorCount++ + mu.Unlock() + fmt.Printf("%s\n", c.formatStakeError(name, err)) + } + }(nodeName, client) + } + } + + wg.Wait() + return errorCount } func (c *command) filterClientsByNodeGroups(cluster orchestration.Cluster, allClients map[string]*bee.Client, nodeGroups []string) map[string]*bee.Client { From 6ddae9fafa33077889e9dea24914277bcadc93bf Mon Sep 17 00:00:00 2001 From: akrem-chabchoub Date: Thu, 4 Sep 2025 20:05:57 +0200 Subject: [PATCH 11/11] refactor(cmd): move formatStakeError function and remove typo --- cmd/beekeeper/cmd/stake.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/cmd/beekeeper/cmd/stake.go b/cmd/beekeeper/cmd/stake.go index 65c85017..1f937555 100644 --- a/cmd/beekeeper/cmd/stake.go +++ b/cmd/beekeeper/cmd/stake.go @@ -127,22 +127,6 @@ func (c *command) initStakeDeposit() *cobra.Command { return cmd } -// TODO -// formatStakeError formats stake-related errors with user-friendly messages -func (c *command) formatStakeError(nodeName string, err error) string { - errorStr := err.Error() - - if strings.Contains(errorStr, "out of funds") { - return fmt.Sprintf("node %s: insufficient BZZ balance (fund the node wallet first)", nodeName) - } else if strings.Contains(errorStr, "insufficient stake amount") { - return fmt.Sprintf("node %s: stake amount too low (increase the amount)", nodeName) - } else if strings.Contains(errorStr, "503") { - return fmt.Sprintf("node %s: service temporarily unavailable (node might be starting up)", nodeName) - } else { - return fmt.Sprintf("node %s: %v", nodeName, err) - } -} - func (c *command) initStakeGet() *cobra.Command { cmd := &cobra.Command{ Use: "get", @@ -450,3 +434,18 @@ func (c *command) filterClientsByNodeGroups(cluster orchestration.Cluster, allCl return filteredClients } + +// formatStakeError formats stake-related errors with user-friendly messages +func (c *command) formatStakeError(nodeName string, err error) string { + errorStr := err.Error() + fmt.Printf("errorStr: %s\n", errorStr) + if strings.Contains(errorStr, "out of funds") { + return fmt.Sprintf("node %s: insufficient BZZ balance (fund the node wallet first)", nodeName) + } else if strings.Contains(errorStr, "insufficient stake amount") { + return fmt.Sprintf("node %s: stake amount too low (increase the amount)", nodeName) + } else if strings.Contains(errorStr, "503") { + return fmt.Sprintf("node %s: service temporarily unavailable (node might be starting up)", nodeName) + } else { + return fmt.Sprintf("node %s: %v", nodeName, err) + } +}