diff --git a/README.md b/README.md index 467e34e..8c72b5f 100644 --- a/README.md +++ b/README.md @@ -57,4 +57,37 @@ go run cmd/client/main.go -server localhost:8080 -test all 5. 断电恢复测试 6. 长期稳定性测试 -详细测试指标和结果分析请参阅测试报告。 \ No newline at end of file +详细测试指标和结果分析请参阅测试报告。 + +## 新增功能: 实时进度和数据完整性检测 + +### 实时进度追踪 + +客户端现在支持从服务器实时获取测试进度,使用流式传输技术(Server-Sent Events)实现。这使得客户端可以实时查看测试的进度和状态,而不需要频繁轮询。 + +### 断电数据完整性检测 + +系统现在支持在服务器意外断电后进行数据完整性检测,能够详细报告: + +1. 有多少数据块丢失 +2. 有多少数据块已损坏 +3. 总共丢失了多少MB的数据 +4. 提供数据恢复结果 + +### 使用方法 + +#### 运行常规测试(实时进度) + +```bash +./bin/client -test power_loss -server localhost:8080 +``` + +#### 运行断电后的恢复测试 + +在服务器断电并重启后,执行以下命令检查数据完整性: + +```bash +./bin/client -recovery -server localhost:8080 +``` + +这将执行数据完整性检测,并提供详细的数据丢失报告。 \ No newline at end of file diff --git a/cmd/client/main.go b/cmd/client/main.go index f314a22..c473273 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -2,15 +2,20 @@ package main import ( "bytes" + "context" "encoding/json" "flag" "fmt" + "io" "net/http" + "os" "time" "plp-test/internal/config" "plp-test/internal/model" - "plp-test/internal/utils" + + "bufio" + "strings" "github.com/google/uuid" "github.com/sirupsen/logrus" @@ -25,6 +30,7 @@ var ( dataSizeMB int blockSize int concurrent bool + recovery bool ) func init() { @@ -36,6 +42,7 @@ func init() { flag.IntVar(&dataSizeMB, "data-size", 0, "测试数据大小(MB)") flag.IntVar(&blockSize, "block-size", 0, "数据块大小(KB)") flag.BoolVar(&concurrent, "concurrent", false, "是否并发执行所有测试") + flag.BoolVar(&recovery, "recovery", false, "是否请求恢复测试") } // Client 客户端 @@ -122,6 +129,102 @@ func (c *Client) RunTest(testType string, dataSizeMB, blockSize int) error { func (c *Client) MonitorTestStatus(testID string) error { c.logger.Infof("监控测试状态: %s", testID) + // 使用流式API监控进度 + errCh := make(chan error, 1) + doneCh := make(chan struct{}) + + // 启动流式监控 + go func() { + defer close(doneCh) + + url := fmt.Sprintf("http://%s/stream?test_id=%s&client_id=%s", c.serverAddr, testID, c.clientID) + // 设置超时时间 30分钟 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + errCh <- fmt.Errorf("创建请求失败: %v", err) + return + } + + resp, err := c.httpClient.Do(req) + if err != nil { + errCh <- fmt.Errorf("发送流请求失败: %v", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + errCh <- fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode) + return + } + + // 创建事件扫描器 + scanner := bufio.NewScanner(resp.Body) + + // 处理Server-Sent Events + for scanner.Scan() { + line := scanner.Text() + + // 跳过空行 + if line == "" { + continue + } + + // 处理数据行 + if strings.HasPrefix(line, "data: ") { + data := line[6:] // 去掉 "data: " 前缀 + + // 解析更新数据 + var update model.StreamUpdate + if err := json.Unmarshal([]byte(data), &update); err != nil { + c.logger.Warnf("解析更新数据失败: %v", err) + continue + } + + // 处理不同类型的更新 + switch update.Type { + case "status": + c.logger.Infof("测试状态: 进度: %.2f%%, 阶段: %s", + update.Progress, update.CurrentPhase) + case "error": + c.logger.Errorf("测试错误: %s", update.Message) + case "completion": + c.logger.Infof("测试完成: %s", update.Message) + return + case "integrity": + if info, ok := update.Data.(map[string]interface{}); ok { + c.logger.Infof("数据完整性: 可用块: %.0f, 损坏块: %.0f, 丢失块: %.0f, 数据丢失: %.2f MB", + info["available_blocks"], info["corrupted_blocks"], info["missing_blocks"], info["data_loss_mb"]) + } + } + } + } + + if err := scanner.Err(); err != nil { + // 如果是客户端主动断开连接,不报错 + if !strings.Contains(err.Error(), "use of closed network connection") { + errCh <- fmt.Errorf("读取流数据失败: %v", err) + } + } + }() + + // 等待流结束或出错 + select { + case err := <-errCh: + // 如果流模式失败,回退到轮询模式 + c.logger.Warnf("流式监控失败: %v, 回退到轮询模式", err) + return c.pollTestStatus(testID) + case <-doneCh: + c.logger.Info("流式监控完成") + return nil + } +} + +// pollTestStatus 使用轮询方式监控测试状态 +func (c *Client) pollTestStatus(testID string) error { + c.logger.Infof("使用轮询监控测试状态: %s", testID) + for { // 获取测试状态 url := fmt.Sprintf("http://%s/status?test_id=%s", c.serverAddr, testID) @@ -231,6 +334,82 @@ func (c *Client) RunAllTests(concurrent bool) error { return nil } +// CheckDataIntegrity 检查断电后的数据完整性 +func (c *Client) CheckDataIntegrity(testID string) error { + c.logger.Info("检查数据完整性") + + url := fmt.Sprintf("http://%s/integrity?test_id=%s", c.serverAddr, testID) + resp, err := c.httpClient.Get(url) + if err != nil { + return fmt.Errorf("获取数据完整性信息失败: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode) + } + + var integrity model.IntegrityInfo + if err := json.NewDecoder(resp.Body).Decode(&integrity); err != nil { + return fmt.Errorf("解析响应失败: %v", err) + } + + c.logger.Infof("数据完整性检查结果:") + c.logger.Infof(" 测试ID: %s", integrity.TestID) + c.logger.Infof(" 检查时间: %s", integrity.CheckTime.Format("2006-01-02 15:04:05")) + c.logger.Infof(" 总块数: %d", integrity.TotalBlocks) + c.logger.Infof(" 可用块数: %d (%.2f%%)", integrity.AvailableBlocks, float64(integrity.AvailableBlocks)/float64(integrity.TotalBlocks)*100) + c.logger.Infof(" 损坏块数: %d (%.2f%%)", integrity.CorruptedBlocks, float64(integrity.CorruptedBlocks)/float64(integrity.TotalBlocks)*100) + c.logger.Infof(" 丢失块数: %d (%.2f%%)", integrity.MissingBlocks, float64(integrity.MissingBlocks)/float64(integrity.TotalBlocks)*100) + c.logger.Infof(" 数据丢失: %.2f MB", integrity.DataLossMB) + c.logger.Infof(" 恢复成功: %v", integrity.RecoverySuccess) + + return nil +} + +// RequestRecoveryTest 请求执行恢复测试 +func (c *Client) RequestRecoveryTest(testType string) error { + c.logger.Infof("请求恢复测试: %s", testType) + + // 准备请求数据 + req := struct { + TestType string `json:"test_type"` + TestDir string `json:"test_dir,omitempty"` + }{ + TestType: testType, + } + + // 序列化请求数据 + reqData, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("序列化请求数据失败: %v", err) + } + + // 发送请求 + url := fmt.Sprintf("http://%s/recovery", c.serverAddr) + resp, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(reqData)) + if err != nil { + return fmt.Errorf("发送请求失败: %v", err) + } + defer resp.Body.Close() + + // 检查响应状态 + if resp.StatusCode != http.StatusAccepted { + return fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode) + } + + // 解析响应 + var testResp model.TestResponse + if err := json.NewDecoder(resp.Body).Decode(&testResp); err != nil { + return fmt.Errorf("解析响应失败: %v", err) + } + + c.logger.Infof("恢复测试请求已接受,RequestID: %s", testResp.RequestID) + + // 监控测试状态 + return c.MonitorTestStatus(testResp.RequestID) +} + func main() { flag.Parse() @@ -249,7 +428,7 @@ func main() { level = logrus.InfoLevel } - // 初始化日志 + // 配置日志 logger := logrus.New() logger.SetLevel(level) logger.SetFormatter(&logrus.TextFormatter{ @@ -257,20 +436,22 @@ func main() { TimestampFormat: "2006-01-02 15:04:05", }) + // 添加日志文件输出 + logFile, err := os.OpenFile("client.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err == nil { + multiWriter := io.MultiWriter(os.Stdout, logFile) + logger.SetOutput(multiWriter) + } else { + logger.Warnf("无法打开日志文件: %v", err) + } + // 加载配置 - logger.Infof("加载配置文件: %s", configFile) cfg, err := config.Load(configFile) if err != nil { logger.Fatalf("加载配置失败: %v", err) } - // 初始化日志文件 - if cfg.Client.LogFile != "" { - utils.InitLogger(cfg.Client.LogFile, level) - logger = utils.Logger - } - - // 设置超时时间 + // 如果指定了timeout,覆盖配置 if timeout > 0 { cfg.Client.TimeoutSec = timeout } @@ -283,16 +464,33 @@ func main() { logger.Fatalf("服务器健康检查失败: %v", err) } - // 运行测试 - if testType == "all" { - if err := client.RunAllTests(concurrent); err != nil { - logger.Fatalf("运行所有测试失败: %v", err) + // 处理恢复测试 + if recovery { + if testType == "" { + testType = "power_loss" } - } else { - if err := client.RunTest(testType, dataSizeMB, blockSize); err != nil { - logger.Fatalf("运行测试 %s 失败: %v", testType, err) + logger.Infof("开始执行恢复测试: %s", testType) + if err := client.RequestRecoveryTest(testType); err != nil { + logger.Fatalf("恢复测试失败: %v", err) } + logger.Info("恢复测试完成") + return } - logger.Info("所有测试已完成") + // 执行指定的测试 + if testType != "" { + if testType == "all" { + if err := client.RunAllTests(concurrent); err != nil { + logger.Fatalf("执行所有测试失败: %v", err) + } + } else { + if err := client.RunTest(testType, dataSizeMB, blockSize); err != nil { + logger.Fatalf("执行测试 %s 失败: %v", testType, err) + } + } + } else { + logger.Fatal("未指定测试类型,请使用 -test 参数") + } + + logger.Info("所有测试完成") } diff --git a/cmd/server/main.go b/cmd/server/main.go index 7e414c0..fa87b51 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -32,23 +32,29 @@ func init() { // TestRunner 测试运行器 type TestRunner struct { - config *config.Config - logger *logrus.Logger - factory *testcase.TestCaseFactory - tests map[string]testcase.TestCase - testsMu sync.RWMutex - testResult map[string]*model.TestResult - resultMu sync.RWMutex + config *config.Config + logger *logrus.Logger + factory *testcase.TestCaseFactory + tests map[string]testcase.TestCase + testsMu sync.RWMutex + testResult map[string]*model.TestResult + resultMu sync.RWMutex + streams map[string]map[string]http.ResponseWriter + streamsMu sync.RWMutex + integrityInfo map[string]*model.IntegrityInfo + integrityMu sync.RWMutex } // NewTestRunner 创建测试运行器 func NewTestRunner(cfg *config.Config, logger *logrus.Logger) *TestRunner { return &TestRunner{ - config: cfg, - logger: logger, - factory: testcase.NewTestCaseFactory(cfg, logger), - tests: make(map[string]testcase.TestCase), - testResult: make(map[string]*model.TestResult), + config: cfg, + logger: logger, + factory: testcase.NewTestCaseFactory(cfg, logger), + tests: make(map[string]testcase.TestCase), + testResult: make(map[string]*model.TestResult), + streams: make(map[string]map[string]http.ResponseWriter), + integrityInfo: make(map[string]*model.IntegrityInfo), } } @@ -75,18 +81,42 @@ func (r *TestRunner) RunTest(testType string) (*model.TestResult, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // 发送测试开始状态更新 + r.sendStatusUpdate(test) + // 设置测试环境 r.logger.Info("设置测试环境") if err := test.Setup(ctx); err != nil { r.logger.Errorf("设置测试环境失败: %v", err) + r.sendErrorUpdate(testID, fmt.Sprintf("设置测试环境失败: %v", err)) return nil, err } + // 启动状态监控协程 + statusDone := make(chan struct{}) + go func() { + defer close(statusDone) + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // 发送状态更新 + r.sendStatusUpdate(test) + } + } + }() + // 运行测试 r.logger.Info("运行测试") result, err := test.Run(ctx) if err != nil { r.logger.Errorf("测试运行失败: %v", err) + r.sendErrorUpdate(testID, fmt.Sprintf("测试运行失败: %v", err)) + // 尝试清理 cleanupErr := test.Cleanup(ctx) if cleanupErr != nil { @@ -99,9 +129,14 @@ func (r *TestRunner) RunTest(testType string) (*model.TestResult, error) { r.logger.Info("清理测试环境") if err := test.Cleanup(ctx); err != nil { r.logger.Errorf("测试清理失败: %v", err) + r.sendErrorUpdate(testID, fmt.Sprintf("测试清理失败: %v", err)) return nil, err } + // 停止状态监控 + cancel() + <-statusDone + // 存储测试结果 r.resultMu.Lock() r.testResult[testID] = result @@ -112,10 +147,52 @@ func (r *TestRunner) RunTest(testType string) (*model.TestResult, error) { delete(r.tests, testID) r.testsMu.Unlock() + // 发送完成通知 + r.sendCompletionUpdate(testID, result) + r.logger.Infof("测试 %s 完成", testType) return result, nil } +// sendStatusUpdate 发送状态更新 +func (r *TestRunner) sendStatusUpdate(test testcase.TestCase) { + status := test.Status() + update := model.StreamUpdate{ + Type: "status", + TestID: status.TestID, + Timestamp: time.Now(), + Progress: status.Progress, + CurrentPhase: status.CurrentPhase, + Message: status.Message, + Data: status, + } + r.SendStreamUpdate(status.TestID, update) +} + +// sendErrorUpdate 发送错误更新 +func (r *TestRunner) sendErrorUpdate(testID, message string) { + update := model.StreamUpdate{ + Type: "error", + TestID: testID, + Timestamp: time.Now(), + Message: message, + } + r.SendStreamUpdate(testID, update) +} + +// sendCompletionUpdate 发送完成更新 +func (r *TestRunner) sendCompletionUpdate(testID string, result *model.TestResult) { + update := model.StreamUpdate{ + Type: "completion", + TestID: testID, + Timestamp: time.Now(), + Progress: 100, + Message: "测试完成", + Data: result, + } + r.SendStreamUpdate(testID, update) +} + // GetTestStatus 获取测试状态 func (r *TestRunner) GetTestStatus(testID string) *model.TestStatus { r.testsMu.RLock() @@ -139,6 +216,72 @@ func (r *TestRunner) GetAllTestStatus() []*model.TestStatus { return statuses } +// RegisterStream 注册流式连接 +func (r *TestRunner) RegisterStream(testID, clientID string, w http.ResponseWriter) { + r.streamsMu.Lock() + defer r.streamsMu.Unlock() + + if _, ok := r.streams[testID]; !ok { + r.streams[testID] = make(map[string]http.ResponseWriter) + } + r.streams[testID][clientID] = w + r.logger.Infof("客户端 %s 已连接到测试 %s 的流", clientID, testID) +} + +// UnregisterStream 注销流式连接 +func (r *TestRunner) UnregisterStream(testID, clientID string) { + r.streamsMu.Lock() + defer r.streamsMu.Unlock() + + if clients, ok := r.streams[testID]; ok { + delete(clients, clientID) + r.logger.Infof("客户端 %s 已断开与测试 %s 的流连接", clientID, testID) + } +} + +// SendStreamUpdate 发送流式更新 +func (r *TestRunner) SendStreamUpdate(testID string, update interface{}) { + r.streamsMu.RLock() + defer r.streamsMu.RUnlock() + + clients, ok := r.streams[testID] + if !ok || len(clients) == 0 { + return + } + + data, err := json.Marshal(update) + if err != nil { + r.logger.Errorf("无法序列化流更新: %v", err) + return + } + + for clientID, w := range clients { + // 使用Server-Sent Events格式 + _, err := fmt.Fprintf(w, "data: %s\n\n", data) + if err != nil { + r.logger.Warnf("向客户端 %s 发送更新失败: %v", clientID, err) + } else { + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + } + } +} + +// SaveIntegrityInfo 保存完整性信息 +func (r *TestRunner) SaveIntegrityInfo(testID string, info *model.IntegrityInfo) { + r.integrityMu.Lock() + defer r.integrityMu.Unlock() + r.integrityInfo[testID] = info +} + +// GetIntegrityInfo 获取完整性信息 +func (r *TestRunner) GetIntegrityInfo(testID string) *model.IntegrityInfo { + r.integrityMu.RLock() + defer r.integrityMu.RUnlock() + return r.integrityInfo[testID] +} + // StartServer 启动HTTP服务器 func StartServer(cfg *config.Config, runner *TestRunner, logger *logrus.Logger) *http.Server { mux := http.NewServeMux() @@ -217,6 +360,152 @@ func StartServer(cfg *config.Config, runner *TestRunner, logger *logrus.Logger) json.NewEncoder(w).Encode(status) }) + // 新增: 实时数据进度流式API + mux.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) { + testID := r.URL.Query().Get("test_id") + if testID == "" { + http.Error(w, "Missing test_id", http.StatusBadRequest) + return + } + + // 设置响应头,支持SSE (Server-Sent Events) + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // 创建完成通道 + doneCh := make(chan struct{}) + defer close(doneCh) + + // 注册客户端连接 + clientID := r.URL.Query().Get("client_id") + runner.RegisterStream(testID, clientID, w) + defer runner.UnregisterStream(testID, clientID) + + // 保持连接直到客户端断开 + select { + case <-r.Context().Done(): + runner.logger.Infof("connection closed by client %s", clientID) + return + case <-doneCh: + runner.logger.Infof("connection closed by server for client %s", clientID) + return + } + }) + + // 新增: 数据完整性检测API + mux.HandleFunc("/integrity", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + testID := r.URL.Query().Get("test_id") + if testID == "" { + http.Error(w, "Missing test_id", http.StatusBadRequest) + return + } + + // 获取测试的数据完整性信息 + integrityInfo := runner.GetIntegrityInfo(testID) + if integrityInfo == nil { + http.Error(w, "Integrity info not found", http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(integrityInfo) + }) + + // 新增: 恢复测试API,用于断电测试后的恢复与校验 + mux.HandleFunc("/recovery", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var req struct { + TestType string `json:"test_type"` + TestDir string `json:"test_dir"` + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + logger.Infof("收到恢复测试请求: %+v", req) + + // 创建恢复测试实例 + test, err := runner.factory.CreateTestCase(req.TestType) + if err != nil || test == nil { + http.Error(w, fmt.Sprintf("无法创建测试实例: %v", err), http.StatusBadRequest) + return + } + + // 获取测试ID + testID := test.Status().TestID + + // 执行恢复和数据完整性检查 + go func() { + ctx := context.Background() + + // 设置测试环境 + logger.Info("设置恢复测试环境") + if err := test.Setup(ctx); err != nil { + logger.Errorf("设置恢复测试环境失败: %v", err) + runner.sendErrorUpdate(testID, fmt.Sprintf("设置恢复测试环境失败: %v", err)) + return + } + + // 数据完整性检查 + logger.Info("执行数据完整性检查") + runner.sendStatusUpdate(test) + + // 检查并获取数据完整性信息 + if powerTest, ok := test.(*testcase.PowerLossTest); ok { + integrityInfo := powerTest.CheckIntegrity() + + // 保存完整性信息 + runner.SaveIntegrityInfo(testID, integrityInfo) + + // 发送完整性信息 + update := model.StreamUpdate{ + Type: "integrity", + TestID: testID, + Timestamp: time.Now(), + Message: "数据完整性检查完成", + Data: integrityInfo, + } + runner.SendStreamUpdate(testID, update) + + logger.Infof("恢复测试完成: 丢失数据: %.2f MB", integrityInfo.DataLossMB) + } else { + logger.Error("不是断电测试实例,无法执行数据完整性检查") + runner.sendErrorUpdate(testID, "不是断电测试实例,无法执行数据完整性检查") + } + + // 清理测试环境 + logger.Info("清理恢复测试环境") + if err := test.Cleanup(ctx); err != nil { + logger.Errorf("清理恢复测试环境失败: %v", err) + } + }() + + // 返回接受响应 + resp := model.TestResponse{ + RequestID: testID, + Status: "accepted", + Message: "恢复测试已接受并开始执行", + ServerTime: time.Now(), + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(resp) + }) + // 启动服务器 addr := fmt.Sprintf("%s:%d", cfg.Server.ListenAddr, cfg.Server.Port) server := &http.Server{ diff --git a/internal/model/data.go b/internal/model/data.go index f06d3e0..039aade 100644 --- a/internal/model/data.go +++ b/internal/model/data.go @@ -114,3 +114,38 @@ type HealthStatus struct { MemoryUsage float64 `json:"memory_usage"` Message string `json:"message,omitempty"` } + +// IntegrityInfo 表示数据完整性信息 +type IntegrityInfo struct { + TestID string `json:"test_id"` + TestType string `json:"test_type"` + CheckTime time.Time `json:"check_time"` + TotalBlocks int `json:"total_blocks"` + ExpectedBlocks int `json:"expected_blocks"` + AvailableBlocks int `json:"available_blocks"` + CorruptedBlocks int `json:"corrupted_blocks"` + MissingBlocks int `json:"missing_blocks"` + DataLossMB float64 `json:"data_loss_mb"` + RecoverySuccess bool `json:"recovery_success"` + RecoveryDuration float64 `json:"recovery_duration_ms"` + BlocksMap map[int]BlockStatus `json:"blocks_map,omitempty"` +} + +// BlockStatus 表示数据块状态 +type BlockStatus struct { + Available bool `json:"available"` + Corrupted bool `json:"corrupted"` + Checksum string `json:"checksum,omitempty"` + FilePath string `json:"file_path,omitempty"` +} + +// StreamUpdate 表示流式更新的数据 +type StreamUpdate struct { + Type string `json:"type"` // "status", "progress", "integrity", "error" + TestID string `json:"test_id"` + Timestamp time.Time `json:"timestamp"` + Progress float64 `json:"progress,omitempty"` + CurrentPhase string `json:"current_phase,omitempty"` + Message string `json:"message,omitempty"` + Data interface{} `json:"data,omitempty"` +} diff --git a/internal/testcase/power_loss-test.go b/internal/testcase/power_loss-test.go index b2ddfe4..9b31076 100644 --- a/internal/testcase/power_loss-test.go +++ b/internal/testcase/power_loss-test.go @@ -2,9 +2,13 @@ package testcase import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "os" "path/filepath" + "sync" + "syscall" "time" "plp-test/internal/config" @@ -26,6 +30,9 @@ type PowerLossTest struct { blocks []*model.TestBlock recoveryTimeMs float64 powerCutInfo *model.PowerCutInfo + integrityInfo *model.IntegrityInfo + blocksMu sync.RWMutex // 保护数据块访问 + blocksMap map[int]model.BlockStatus // 数据块状态映射 } // NewPowerLossTest 创建断电测试 @@ -39,8 +46,9 @@ func NewPowerLossTest(cfg *config.Config, logger *logrus.Logger) *PowerLossTest return &PowerLossTest{ BaseTestCase: baseTest, - blockSize: utils.MBToBytes(float64(cfg.Test.BlockSize)), - totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.MBToBytes(float64(cfg.Test.BlockSize)), + blockSize: utils.KBToBytes(float64(cfg.Test.BlockSize)), + totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.KBToBytes(float64(cfg.Test.BlockSize)), + blocksMap: make(map[int]model.BlockStatus), } } @@ -95,6 +103,21 @@ func (t *PowerLossTest) Setup(ctx context.Context) error { t.blocks = make([]*model.TestBlock, 0, t.totalBlocks) t.powerCutInfo = &model.PowerCutInfo{} + // 初始化完整性信息 + t.integrityInfo = &model.IntegrityInfo{ + TestID: t.testID, + TestType: t.name, + CheckTime: time.Time{}, + TotalBlocks: t.totalBlocks, + ExpectedBlocks: t.totalBlocks, + AvailableBlocks: 0, + CorruptedBlocks: 0, + MissingBlocks: 0, + DataLossMB: 0, + RecoverySuccess: false, + BlocksMap: make(map[int]model.BlockStatus), + } + t.setProgress(10) return nil } @@ -105,18 +128,17 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) { startTime := time.Now() var totalBytesWritten int - // 写入阶段 - 只写入一部分数据,断电前 - t.setMessage("写入数据 (断电前)") - blocksBeforePowerCut := t.totalBlocks / 2 // 写入一半的数据块 + // 第一阶段 - 持续写入数据,直到手动断电 + t.setMessage("写入数据 (请在适当时手动断电)") - for i := 0; i < blocksBeforePowerCut; i++ { + for i := 0; i < t.totalBlocks; i++ { select { case <-ctx.Done(): t.setStatus(StatusAborted) return nil, ctx.Err() default: // 生成随机数据 - data, err := utils.GenerateRandomData(t.blockSize) + data, err := utils.GenerateRandomData(utils.KBToBytes(float64(t.blockSize))) if err != nil { t.setStatus(StatusFailed) return nil, fmt.Errorf("生成随机数据失败: %v", err) @@ -124,11 +146,32 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) { // 创建测试数据块 block := model.NewTestBlock(data, i) + + // 添加到数据块列表 + t.blocksMu.Lock() t.blocks = append(t.blocks, block) + // 记录数据块状态 + blockStatus := model.BlockStatus{ + Available: true, + Corrupted: false, + Checksum: block.Checksum, + FilePath: fmt.Sprintf("block_%d.dat", i), + } + t.blocksMap[i] = blockStatus + t.blocksMu.Unlock() + // 写入文件 filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i)) - err = os.WriteFile(filePath, data, 0644) + // direct IO + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|syscall.O_DIRECT, 0644) + if err != nil { + t.setStatus(StatusFailed) + return nil, fmt.Errorf("写入文件 %s 失败: %v", filePath, err) + } + defer file.Close() + + _, err = file.Write(data) if err != nil { t.setStatus(StatusFailed) return nil, fmt.Errorf("写入文件 %s 失败: %v", filePath, err) @@ -137,182 +180,40 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) { t.writtenBlocks++ totalBytesWritten += len(data) + // 每写入一定数量的块后执行同步 + if i > 0 && i%10 == 0 { + t.setMessage(fmt.Sprintf("同步数据到磁盘 (已写入 %d/%d 块)", i, t.totalBlocks)) + _, err := utils.ExecuteCommand("sync") + if err != nil { + t.logger.Warnf("执行sync命令失败: %v", err) + } + } + // 更新进度 - progress := float64(i+1) / float64(t.totalBlocks) * 30 // 第一阶段占30% + progress := float64(i+1) / float64(t.totalBlocks) * 100 t.setProgress(progress) + + // 每写入一定数量的块后暂停一下,给用户断电的机会 + if i > 0 && i%100 == 0 { + t.setMessage(fmt.Sprintf("已写入 %d/%d 块数据, 共 %.2f MB", i, t.totalBlocks, float64(i*t.blockSize)/(1024*1024))) + time.Sleep(1 * time.Second) + } } } - // 记录写入数据的时间点 + // 记录写入数据的信息 t.powerCutInfo.BlocksWritten = t.writtenBlocks - // 执行同步以确保部分数据已经写入到磁盘,但部分仍在缓存中 - t.setMessage("执行sync同步部分数据到磁盘") + // 完成所有数据写入后,同步到磁盘 + t.setMessage("同步所有数据到磁盘") _, err := utils.ExecuteCommand("sync") if err != nil { t.logger.Warnf("执行sync命令失败: %v", err) } - // 再写入一些数据但不同步,保证有缓存中的数据 - t.setMessage("写入额外数据到缓存中 (这些数据可能会在断电后丢失)") - additionalBlocks := t.totalBlocks / 4 // 额外写入1/4的数据块 - - for i := blocksBeforePowerCut; i < blocksBeforePowerCut+additionalBlocks; i++ { - select { - case <-ctx.Done(): - t.setStatus(StatusAborted) - return nil, ctx.Err() - default: - // 生成随机数据 - data, err := utils.GenerateRandomData(t.blockSize) - if err != nil { - t.setStatus(StatusFailed) - return nil, fmt.Errorf("生成随机数据失败: %v", err) - } - - // 创建测试数据块 - block := model.NewTestBlock(data, i) - t.blocks = append(t.blocks, block) - - // 写入文件 - filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i)) - err = os.WriteFile(filePath, data, 0644) - if err != nil { - t.setStatus(StatusFailed) - return nil, fmt.Errorf("写入文件 %s 失败: %v", filePath, err) - } - - t.writtenBlocks++ - totalBytesWritten += len(data) - - // 更新进度 - progress := 30 + float64(i-blocksBeforePowerCut+1)/float64(additionalBlocks)*10 // 额外写入占10% - t.setProgress(progress) - } - } - - // 模拟断电 - t.setMessage("模拟断电...") - t.powerCutInfo.Timestamp = time.Now() - err = t.casManager.SimulatePowerCut(t.config.Server.CacheInstanceID) - if err != nil { - t.setStatus(StatusFailed) - return nil, fmt.Errorf("模拟断电失败: %v", err) - } - - // 模拟等待一段时间,就像真正断电后的重启 - t.setMessage("模拟系统重启中...") - time.Sleep(2 * time.Second) - - // 恢复阶段 - t.setMessage("恢复阶段") - recoveryStart := time.Now() - - // 重新创建缓存实例 - id := t.config.Server.CacheInstanceID - nvme := t.config.Server.DevicesNVMe - hdd := t.config.Server.DevicesHDD - - // 尝试修复/加载现有缓存 - t.setMessage("尝试加载和修复缓存") - err = t.casManager.RepairCache(id, nvme) - if err != nil { - // 如果修复失败,尝试重新创建 - t.logger.Warnf("修复缓存失败,尝试重新创建: %v", err) - err = t.casManager.CreateCacheInstance(id, nvme, hdd, "wb") - if err != nil { - t.setStatus(StatusFailed) - return nil, fmt.Errorf("断电后重新创建缓存实例失败: %v", err) - } - } - - // 获取缓存设备路径 - cacheDevice := fmt.Sprintf("/dev/cas%s-1", id) - - // 重新挂载缓存设备 - mountPoint := t.config.Server.MountPoint - t.setMessage(fmt.Sprintf("重新挂载缓存设备到 %s", mountPoint)) - err = t.casManager.MountDevice(cacheDevice, mountPoint) - if err != nil { - t.setStatus(StatusFailed) - return nil, fmt.Errorf("断电后重新挂载缓存设备失败: %v", err) - } - - // 记录恢复时间 - t.recoveryTimeMs = float64(time.Since(recoveryStart).Milliseconds()) - t.setProgress(50) - - // 验证阶段 - 检查断电前写入的数据是否完整 - t.setMessage("验证阶段 - 检查数据完整性") - t.corruptedBlocks = 0 - - for i, block := range t.blocks { - // 检查文件是否存在 - filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i)) - if !utils.FileExists(filePath) { - t.logger.Warnf("文件 %s 在断电后丢失", filePath) - t.corruptedBlocks++ - continue - } - - // 读取文件数据 - data, err := os.ReadFile(filePath) - if err != nil { - t.logger.Warnf("读取文件 %s 失败: %v", filePath, err) - t.corruptedBlocks++ - continue - } - - // 验证数据完整性 - if string(data) != string(block.Data) { - t.logger.Warnf("文件 %s 数据损坏", filePath) - t.corruptedBlocks++ - continue - } - - t.verifiedBlocks++ - - // 更新进度 - progress := 50 + float64(i+1)/float64(len(t.blocks))*40 // 验证占40% - t.setProgress(progress) - } - - // 写入断电后的额外数据 - t.setMessage("断电后写入额外数据") - for i := t.writtenBlocks; i < t.totalBlocks; i++ { - select { - case <-ctx.Done(): - t.setStatus(StatusAborted) - return nil, ctx.Err() - default: - // 生成随机数据 - data, err := utils.GenerateRandomData(t.blockSize) - if err != nil { - t.setStatus(StatusFailed) - return nil, fmt.Errorf("生成随机数据失败: %v", err) - } - - // 写入文件 - filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i)) - err = os.WriteFile(filePath, data, 0644) - if err != nil { - t.setStatus(StatusFailed) - return nil, fmt.Errorf("断电后写入文件 %s 失败: %v", filePath, err) - } - - // 更新进度 - progress := 90 + float64(i-t.writtenBlocks+1)/float64(t.totalBlocks-t.writtenBlocks)*10 // 最后10% - t.setProgress(progress) - } - } - - // 记录断电信息 - t.powerCutInfo.RecoverySuccess = t.corruptedBlocks == 0 - t.powerCutInfo.DataLossMB = utils.BytesToMB(t.corruptedBlocks * t.blockSize) - t.setProgress(100) t.setStatus(StatusCompleted) - t.setMessage("断电测试完成") + t.setMessage("数据写入完成") // 构造测试结果 result := t.getTestResult() @@ -328,6 +229,86 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) { return result, nil } +// CheckIntegrity 检查数据完整性 +func (t *PowerLossTest) CheckIntegrity() *model.IntegrityInfo { + t.setMessage("开始检查数据完整性") + t.integrityInfo.CheckTime = time.Now() + + // 重置计数器 + t.integrityInfo.AvailableBlocks = 0 + t.integrityInfo.CorruptedBlocks = 0 + t.integrityInfo.MissingBlocks = 0 + + // 为所有块创建状态记录 + for i := 0; i < t.totalBlocks; i++ { + filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i)) + + if !utils.FileExists(filePath) { + // 文件不存在 + t.integrityInfo.MissingBlocks++ + t.integrityInfo.BlocksMap[i] = model.BlockStatus{ + Available: false, + Corrupted: false, + FilePath: fmt.Sprintf("block_%d.dat", i), + } + continue + } + + // 读取文件数据 + data, err := os.ReadFile(filePath) + if err != nil { + // 无法读取文件 + t.integrityInfo.CorruptedBlocks++ + t.integrityInfo.BlocksMap[i] = model.BlockStatus{ + Available: true, + Corrupted: true, + FilePath: fmt.Sprintf("block_%d.dat", i), + } + continue + } + + // 计算和验证校验和 + hash := sha256.Sum256(data) + checksum := hex.EncodeToString(hash[:]) + + var blockChecksum string + t.blocksMu.RLock() + if i < len(t.blocks) && t.blocks[i] != nil { + blockChecksum = t.blocks[i].Checksum + } + t.blocksMu.RUnlock() + + if blockChecksum != "" && checksum != blockChecksum { + // 数据损坏 + t.integrityInfo.CorruptedBlocks++ + t.integrityInfo.BlocksMap[i] = model.BlockStatus{ + Available: true, + Corrupted: true, + Checksum: checksum, + FilePath: fmt.Sprintf("block_%d.dat", i), + } + } else { + // 数据完好 + t.integrityInfo.AvailableBlocks++ + t.integrityInfo.BlocksMap[i] = model.BlockStatus{ + Available: true, + Corrupted: false, + Checksum: checksum, + FilePath: fmt.Sprintf("block_%d.dat", i), + } + } + } + + // 计算数据丢失量 + t.integrityInfo.DataLossMB = utils.BytesToMB((t.integrityInfo.MissingBlocks + t.integrityInfo.CorruptedBlocks) * t.blockSize) + t.integrityInfo.RecoverySuccess = t.integrityInfo.CorruptedBlocks == 0 && t.integrityInfo.MissingBlocks == 0 + + t.setMessage(fmt.Sprintf("数据完整性检查完成: %d 个块正常, %d 个块丢失, %d 个块损坏", + t.integrityInfo.AvailableBlocks, t.integrityInfo.MissingBlocks, t.integrityInfo.CorruptedBlocks)) + + return t.integrityInfo +} + // Cleanup 清理测试环境 func (t *PowerLossTest) Cleanup(ctx context.Context) error { if err := t.BaseTestCase.Cleanup(ctx); err != nil { diff --git a/internal/testcase/random_write-test.go b/internal/testcase/random_write-test.go index 1db8630..8850f1f 100644 --- a/internal/testcase/random_write-test.go +++ b/internal/testcase/random_write-test.go @@ -40,8 +40,8 @@ func NewRandomWriteTest(cfg *config.Config, logger *logrus.Logger) *RandomWriteT return &RandomWriteTest{ BaseTestCase: baseTest, - blockSize: utils.MBToBytes(float64(cfg.Test.BlockSize)), - totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.MBToBytes(float64(cfg.Test.BlockSize)), + blockSize: utils.KBToBytes(float64(cfg.Test.BlockSize)), + totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.KBToBytes(float64(cfg.Test.BlockSize)), blocks: make(map[int]*model.TestBlock), writeSequence: make([]int, 0), writeLatencies: make([]float64, 0), @@ -128,7 +128,7 @@ func (t *RandomWriteTest) Run(ctx context.Context) (*model.TestResult, error) { return nil, ctx.Err() default: // 生成随机数据 - data, err := utils.GenerateRandomData(t.blockSize) + data, err := utils.GenerateRandomData(t.blockSize * 1024) if err != nil { t.setStatus(StatusFailed) return nil, fmt.Errorf("生成随机数据失败: %v", err) diff --git a/internal/testcase/sequential_write-test.go b/internal/testcase/sequential_write-test.go index bc8785d..38b3b4a 100644 --- a/internal/testcase/sequential_write-test.go +++ b/internal/testcase/sequential_write-test.go @@ -38,8 +38,8 @@ func NewSequentialWriteTest(cfg *config.Config, logger *logrus.Logger) *Sequenti return &SequentialWriteTest{ BaseTestCase: baseTest, - blockSize: utils.MBToBytes(float64(cfg.Test.BlockSize)), - totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.MBToBytes(float64(cfg.Test.BlockSize)), + blockSize: utils.KBToBytes(float64(cfg.Test.BlockSize)), + totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.KBToBytes(float64(cfg.Test.BlockSize)), } } @@ -114,7 +114,7 @@ func (t *SequentialWriteTest) Run(ctx context.Context) (*model.TestResult, error return nil, ctx.Err() default: // 生成随机数据 - data, err := utils.GenerateRandomData(t.blockSize) + data, err := utils.GenerateRandomData(t.blockSize * 1024) if err != nil { t.setStatus(StatusFailed) return nil, fmt.Errorf("生成随机数据失败: %v", err) diff --git a/internal/utils/utils.go b/internal/utils/utils.go index ca11f1d..15dbb62 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -57,6 +57,11 @@ func MBToBytes(mb float64) int { return int(mb * 1024 * 1024) } +// KBToBytes 将KB转换为字节 +func KBToBytes(kb float64) int { + return int(kb * 1024) +} + // FormatDuration 格式化持续时间 func FormatDuration(d time.Duration) string { if d < time.Minute {