From dc927febf0db0f42ae6940217c1bb93b9ba1f3d0 Mon Sep 17 00:00:00 2001 From: td-zhangshun Date: Wed, 23 Apr 2025 11:32:19 +0800 Subject: [PATCH] feat: power loss check optimization --- cmd/client/main.go | 14 ++++--- cmd/server/main.go | 31 +++++++++++----- internal/testcase/concurrent_write-test.go | 2 +- internal/testcase/mixed_read_write-test.go | 2 +- internal/testcase/power_loss-test.go | 43 +++++++++++++--------- internal/testcase/random_write-test.go | 2 +- internal/testcase/sequential_write-test.go | 2 +- internal/testcase/stability-test.go | 2 +- internal/testcase/testcase.go | 2 +- 9 files changed, 61 insertions(+), 39 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index c473273..d06f26e 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "context" "encoding/json" "flag" "fmt" @@ -138,16 +137,19 @@ func (c *Client) MonitorTestStatus(testID string) error { defer close(doneCh) url := fmt.Sprintf("http://%s/stream?test_id=%s&client_id=%s", c.serverAddr, testID, c.clientID) - // 设置超时时间 30分钟 - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) - defer cancel() - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + + // 创建一个没有超时的HTTP客户端专门用于流式连接 + noTimeoutClient := &http.Client{ + Timeout: 0, // 0表示无超时 + } + + req, err := http.NewRequest("GET", url, nil) if err != nil { errCh <- fmt.Errorf("创建请求失败: %v", err) return } - resp, err := c.httpClient.Do(req) + resp, err := noTimeoutClient.Do(req) if err != nil { errCh <- fmt.Errorf("发送流请求失败: %v", err) return diff --git a/cmd/server/main.go b/cmd/server/main.go index fa87b51..1a3a6bc 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -86,7 +86,7 @@ func (r *TestRunner) RunTest(testType string) (*model.TestResult, error) { // 设置测试环境 r.logger.Info("设置测试环境") - if err := test.Setup(ctx); err != nil { + if err := test.Setup(ctx, false); err != nil { r.logger.Errorf("设置测试环境失败: %v", err) r.sendErrorUpdate(testID, fmt.Sprintf("设置测试环境失败: %v", err)) return nil, err @@ -193,6 +193,18 @@ func (r *TestRunner) sendCompletionUpdate(testID string, result *model.TestResul r.SendStreamUpdate(testID, update) } +// sendIntegrityUpdate 发送完整性更新 +func (r *TestRunner) sendIntegrityUpdate(testID string, message string, info *model.IntegrityInfo) { + update := model.StreamUpdate{ + Type: "integrity", + TestID: testID, + Timestamp: time.Now(), + Message: message, + Data: info, + } + r.SendStreamUpdate(testID, update) +} + // GetTestStatus 获取测试状态 func (r *TestRunner) GetTestStatus(testID string) *model.TestStatus { r.testsMu.RLock() @@ -453,7 +465,7 @@ func StartServer(cfg *config.Config, runner *TestRunner, logger *logrus.Logger) // 设置测试环境 logger.Info("设置恢复测试环境") - if err := test.Setup(ctx); err != nil { + if err := test.Setup(ctx, true); err != nil { logger.Errorf("设置恢复测试环境失败: %v", err) runner.sendErrorUpdate(testID, fmt.Sprintf("设置恢复测试环境失败: %v", err)) return @@ -467,18 +479,17 @@ func StartServer(cfg *config.Config, runner *TestRunner, logger *logrus.Logger) if powerTest, ok := test.(*testcase.PowerLossTest); ok { integrityInfo := powerTest.CheckIntegrity() + go func() { + time.Sleep(1 * time.Second) + runner.sendIntegrityUpdate(testID, "开始数据完整性检查", nil) + }() + // 保存完整性信息 runner.SaveIntegrityInfo(testID, integrityInfo) // 发送完整性信息 - update := model.StreamUpdate{ - Type: "integrity", - TestID: testID, - Timestamp: time.Now(), - Message: "数据完整性检查完成", - Data: integrityInfo, - } - runner.SendStreamUpdate(testID, update) + + runner.sendIntegrityUpdate(testID, "数据完整性检查完成", integrityInfo) logger.Infof("恢复测试完成: 丢失数据: %.2f MB", integrityInfo.DataLossMB) } else { diff --git a/internal/testcase/concurrent_write-test.go b/internal/testcase/concurrent_write-test.go index a2ce10b..2f63b1f 100644 --- a/internal/testcase/concurrent_write-test.go +++ b/internal/testcase/concurrent_write-test.go @@ -28,7 +28,7 @@ func NewConcurrentWriteTest(cfg *config.Config, logger *logrus.Logger) *Concurre } // Setup 设置测试环境 -func (t *ConcurrentWriteTest) Setup(ctx context.Context) error { +func (t *ConcurrentWriteTest) Setup(ctx context.Context, recovery bool) error { if err := t.BaseTestCase.Setup(ctx); err != nil { return err } diff --git a/internal/testcase/mixed_read_write-test.go b/internal/testcase/mixed_read_write-test.go index 81c2fab..052ed34 100644 --- a/internal/testcase/mixed_read_write-test.go +++ b/internal/testcase/mixed_read_write-test.go @@ -28,7 +28,7 @@ func NewMixedReadWriteTest(cfg *config.Config, logger *logrus.Logger) *MixedRead } // Setup 设置测试环境 -func (t *MixedReadWriteTest) Setup(ctx context.Context) error { +func (t *MixedReadWriteTest) Setup(ctx context.Context, recovery bool) error { if err := t.BaseTestCase.Setup(ctx); err != nil { return err } diff --git a/internal/testcase/power_loss-test.go b/internal/testcase/power_loss-test.go index 9b31076..ff22337 100644 --- a/internal/testcase/power_loss-test.go +++ b/internal/testcase/power_loss-test.go @@ -53,7 +53,7 @@ func NewPowerLossTest(cfg *config.Config, logger *logrus.Logger) *PowerLossTest } // Setup 设置测试环境 -func (t *PowerLossTest) Setup(ctx context.Context) error { +func (t *PowerLossTest) Setup(ctx context.Context, recovery bool) error { if err := t.BaseTestCase.Setup(ctx); err != nil { return err } @@ -74,11 +74,23 @@ func (t *PowerLossTest) Setup(ctx context.Context) error { cacheDevice := fmt.Sprintf("/dev/cas%s-1", id) t.setMessage(fmt.Sprintf("格式化缓存设备 %s", cacheDevice)) - // 格式化缓存设备 - err = t.casManager.FormatDevice(cacheDevice, "ext4") - if err != nil { - t.setStatus(StatusFailed) - return fmt.Errorf("格式化缓存设备失败: %v", err) + // 确认挂载点没人挂载 + if utils.IsMounted(t.config.Server.MountPoint) { + // 卸载挂载点 + err = t.casManager.UnmountDevice(t.config.Server.MountPoint) + if err != nil { + t.setStatus(StatusFailed) + return fmt.Errorf("卸载挂载点失败: %v", err) + } + } + + if !recovery { + // 格式化缓存设备 + err = t.casManager.FormatDevice(cacheDevice, "ext4") + if err != nil { + t.setStatus(StatusFailed) + return fmt.Errorf("格式化缓存设备失败: %v", err) + } } // 挂载缓存设备 @@ -163,8 +175,8 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) { // 写入文件 filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i)) - // direct IO - file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|syscall.O_DIRECT, 0644) + // direct IO 直接写入磁盘,必须使用syscall.O_DIRECT,Linux 2.6.29 及以上版本支持 + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC|syscall.O_DIRECT, 0644) if err != nil { t.setStatus(StatusFailed) return nil, fmt.Errorf("写入文件 %s 失败: %v", filePath, err) @@ -180,23 +192,20 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) { t.writtenBlocks++ totalBytesWritten += len(data) - // 每写入一定数量的块后执行同步 - if i > 0 && i%10 == 0 { - t.setMessage(fmt.Sprintf("同步数据到磁盘 (已写入 %d/%d 块)", i, t.totalBlocks)) - _, err := utils.ExecuteCommand("sync") - if err != nil { - t.logger.Warnf("执行sync命令失败: %v", err) - } + t.setMessage(fmt.Sprintf("同步数据到磁盘 (已写入 %d/%d 块)", i, t.totalBlocks)) + _, err = utils.ExecuteCommand("sync") + if err != nil { + t.logger.Warnf("执行sync命令失败: %v", err) } // 更新进度 progress := float64(i+1) / float64(t.totalBlocks) * 100 t.setProgress(progress) - // 每写入一定数量的块后暂停一下,给用户断电的机会 + // 每写入一定数量的打印输出下当前进度到日志 if i > 0 && i%100 == 0 { t.setMessage(fmt.Sprintf("已写入 %d/%d 块数据, 共 %.2f MB", i, t.totalBlocks, float64(i*t.blockSize)/(1024*1024))) - time.Sleep(1 * time.Second) + // time.Sleep(1 * time.Second) } } } diff --git a/internal/testcase/random_write-test.go b/internal/testcase/random_write-test.go index 8850f1f..80b9b65 100644 --- a/internal/testcase/random_write-test.go +++ b/internal/testcase/random_write-test.go @@ -50,7 +50,7 @@ func NewRandomWriteTest(cfg *config.Config, logger *logrus.Logger) *RandomWriteT } // Setup 设置测试环境 -func (t *RandomWriteTest) Setup(ctx context.Context) error { +func (t *RandomWriteTest) Setup(ctx context.Context, recovery bool) error { if err := t.BaseTestCase.Setup(ctx); err != nil { return err } diff --git a/internal/testcase/sequential_write-test.go b/internal/testcase/sequential_write-test.go index 38b3b4a..3ef9197 100644 --- a/internal/testcase/sequential_write-test.go +++ b/internal/testcase/sequential_write-test.go @@ -44,7 +44,7 @@ func NewSequentialWriteTest(cfg *config.Config, logger *logrus.Logger) *Sequenti } // Setup 设置测试环境 -func (t *SequentialWriteTest) Setup(ctx context.Context) error { +func (t *SequentialWriteTest) Setup(ctx context.Context, recovery bool) error { if err := t.BaseTestCase.Setup(ctx); err != nil { return err } diff --git a/internal/testcase/stability-test.go b/internal/testcase/stability-test.go index bd9f2bb..d4ceead 100644 --- a/internal/testcase/stability-test.go +++ b/internal/testcase/stability-test.go @@ -28,7 +28,7 @@ func NewStabilityTest(cfg *config.Config, logger *logrus.Logger) *StabilityTest } // Setup 设置测试环境 -func (t *StabilityTest) Setup(ctx context.Context) error { +func (t *StabilityTest) Setup(ctx context.Context, recovery bool) error { if err := t.BaseTestCase.Setup(ctx); err != nil { return err } diff --git a/internal/testcase/testcase.go b/internal/testcase/testcase.go index ef7e17e..e40c49a 100644 --- a/internal/testcase/testcase.go +++ b/internal/testcase/testcase.go @@ -32,7 +32,7 @@ type TestCase interface { Description() string // Setup 设置测试环境 - Setup(ctx context.Context) error + Setup(ctx context.Context, recovery bool) error // Run 运行测试 Run(ctx context.Context) (*model.TestResult, error)