425 lines
11 KiB
Go
425 lines
11 KiB
Go
|
|
package services
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"encoding/json"
|
||
|
|
"fmt"
|
||
|
|
"io"
|
||
|
|
"math"
|
||
|
|
"os"
|
||
|
|
"path/filepath"
|
||
|
|
"sort"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"go.etcd.io/bbolt"
|
||
|
|
)
|
||
|
|
|
||
|
|
var storageSpeedTestsBucket = []byte("storage_speed_tests")
|
||
|
|
|
||
|
|
const (
|
||
|
|
StorageSpeedModeSmall = "small"
|
||
|
|
StorageSpeedModeBig = "big"
|
||
|
|
StorageSpeedModeMixed = "mixed"
|
||
|
|
StorageSpeedModeCustom = "custom"
|
||
|
|
|
||
|
|
StorageSpeedStatusRunning = "running"
|
||
|
|
StorageSpeedStatusDone = "done"
|
||
|
|
StorageSpeedStatusFailed = "failed"
|
||
|
|
)
|
||
|
|
|
||
|
|
type StorageSpeedTest struct {
|
||
|
|
ID string `json:"id"`
|
||
|
|
BackendID string `json:"backendId"`
|
||
|
|
BackendName string `json:"backendName"`
|
||
|
|
Mode string `json:"mode"`
|
||
|
|
Status string `json:"status"`
|
||
|
|
Stage string `json:"stage"`
|
||
|
|
ProgressPercent int `json:"progressPercent"`
|
||
|
|
CustomFileCount int `json:"customFileCount,omitempty"`
|
||
|
|
CustomFileSizeMB float64 `json:"customFileSizeMb,omitempty"`
|
||
|
|
StartedAt time.Time `json:"startedAt"`
|
||
|
|
FinishedAt time.Time `json:"finishedAt,omitempty"`
|
||
|
|
BytesWritten int64 `json:"bytesWritten"`
|
||
|
|
BytesRead int64 `json:"bytesRead"`
|
||
|
|
FilesWritten int `json:"filesWritten"`
|
||
|
|
WriteDurationMS int64 `json:"writeDurationMs"`
|
||
|
|
ReadDurationMS int64 `json:"readDurationMs"`
|
||
|
|
DeleteDurationMS int64 `json:"deleteDurationMs"`
|
||
|
|
Error string `json:"error,omitempty"`
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t StorageSpeedTest) ModeLabel() string {
|
||
|
|
switch t.Mode {
|
||
|
|
case StorageSpeedModeSmall:
|
||
|
|
return "Many small files"
|
||
|
|
case StorageSpeedModeBig:
|
||
|
|
return "One big file"
|
||
|
|
case StorageSpeedModeMixed:
|
||
|
|
return "Average mix"
|
||
|
|
case StorageSpeedModeCustom:
|
||
|
|
return "Custom"
|
||
|
|
default:
|
||
|
|
return t.Mode
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t StorageSpeedTest) StartedLabel() string {
|
||
|
|
if t.StartedAt.IsZero() {
|
||
|
|
return ""
|
||
|
|
}
|
||
|
|
return t.StartedAt.Format("Jan 2, 15:04:05")
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t StorageSpeedTest) FinishedLabel() string {
|
||
|
|
if t.FinishedAt.IsZero() {
|
||
|
|
return "Still running"
|
||
|
|
}
|
||
|
|
return t.FinishedAt.Format("Jan 2, 15:04:05")
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t StorageSpeedTest) TotalSizeLabel() string {
|
||
|
|
return FormatMegabytesFromBytes(max(t.BytesWritten, t.BytesRead))
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t StorageSpeedTest) WriteSpeedLabel() string {
|
||
|
|
return speedLabel(t.BytesWritten, t.WriteDurationMS)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t StorageSpeedTest) ReadSpeedLabel() string {
|
||
|
|
return speedLabel(t.BytesRead, t.ReadDurationMS)
|
||
|
|
}
|
||
|
|
|
||
|
|
func speedLabel(bytes int64, durationMS int64) string {
|
||
|
|
if bytes <= 0 || durationMS <= 0 {
|
||
|
|
return "n/a"
|
||
|
|
}
|
||
|
|
mb := float64(bytes) / 1024 / 1024
|
||
|
|
seconds := float64(durationMS) / 1000
|
||
|
|
value := math.Round((mb/seconds)*100) / 100
|
||
|
|
return fmt.Sprintf("%.2f MB/s", value)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *StorageService) StartSpeedTest(backendID, mode string) (StorageSpeedTest, error) {
|
||
|
|
return s.StartSpeedTestWithOptions(backendID, StorageSpeedTestOptions{Mode: mode})
|
||
|
|
}
|
||
|
|
|
||
|
|
type StorageSpeedTestOptions struct {
|
||
|
|
Mode string
|
||
|
|
CustomFileCount int
|
||
|
|
CustomFileSizeMB float64
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *StorageService) StartSpeedTestWithOptions(backendID string, options StorageSpeedTestOptions) (StorageSpeedTest, error) {
|
||
|
|
cfg, err := s.BackendConfig(backendID)
|
||
|
|
if err != nil {
|
||
|
|
return StorageSpeedTest{}, err
|
||
|
|
}
|
||
|
|
if !cfg.Enabled {
|
||
|
|
return StorageSpeedTest{}, fmt.Errorf("storage backend is disabled")
|
||
|
|
}
|
||
|
|
if !cfg.LastTestSuccess {
|
||
|
|
return StorageSpeedTest{}, fmt.Errorf("run a successful connection test before testing speed")
|
||
|
|
}
|
||
|
|
mode := normalizeSpeedTestMode(options.Mode)
|
||
|
|
if mode == StorageSpeedModeCustom {
|
||
|
|
if err := validateCustomSpeedTest(options.CustomFileCount, options.CustomFileSizeMB); err != nil {
|
||
|
|
return StorageSpeedTest{}, err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
test := StorageSpeedTest{
|
||
|
|
ID: randomID(10),
|
||
|
|
BackendID: cfg.ID,
|
||
|
|
BackendName: cfg.Name,
|
||
|
|
Mode: mode,
|
||
|
|
Status: StorageSpeedStatusRunning,
|
||
|
|
Stage: "queued",
|
||
|
|
CustomFileCount: options.CustomFileCount,
|
||
|
|
CustomFileSizeMB: options.CustomFileSizeMB,
|
||
|
|
StartedAt: time.Now().UTC(),
|
||
|
|
}
|
||
|
|
if err := s.saveSpeedTest(test); err != nil {
|
||
|
|
return StorageSpeedTest{}, err
|
||
|
|
}
|
||
|
|
return test, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *StorageService) RunSpeedTest(ctx context.Context, testID string) {
|
||
|
|
test, err := s.speedTest(testID)
|
||
|
|
if err != nil {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
if err := s.runSpeedTest(ctx, &test); err != nil {
|
||
|
|
test.Status = StorageSpeedStatusFailed
|
||
|
|
test.Error = err.Error()
|
||
|
|
test.FinishedAt = time.Now().UTC()
|
||
|
|
if test.Stage == "" || test.Stage == "queued" {
|
||
|
|
test.Stage = "failed"
|
||
|
|
}
|
||
|
|
_ = s.saveSpeedTest(test)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
test.Status = StorageSpeedStatusDone
|
||
|
|
test.Stage = "complete"
|
||
|
|
test.ProgressPercent = 100
|
||
|
|
test.FinishedAt = time.Now().UTC()
|
||
|
|
_ = s.saveSpeedTest(test)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *StorageService) ListSpeedTests(backendID string, limit int) ([]StorageSpeedTest, error) {
|
||
|
|
var tests []StorageSpeedTest
|
||
|
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||
|
|
bucket := tx.Bucket(storageSpeedTestsBucket)
|
||
|
|
if bucket == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return bucket.ForEach(func(_, value []byte) error {
|
||
|
|
var test StorageSpeedTest
|
||
|
|
if err := json.Unmarshal(value, &test); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if backendID == "" || test.BackendID == backendID {
|
||
|
|
tests = append(tests, test)
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
sort.Slice(tests, func(i, j int) bool {
|
||
|
|
return tests[i].StartedAt.After(tests[j].StartedAt)
|
||
|
|
})
|
||
|
|
if limit > 0 && len(tests) > limit {
|
||
|
|
tests = tests[:limit]
|
||
|
|
}
|
||
|
|
return tests, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *StorageService) speedTest(id string) (StorageSpeedTest, error) {
|
||
|
|
var test StorageSpeedTest
|
||
|
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||
|
|
data := tx.Bucket(storageSpeedTestsBucket).Get([]byte(id))
|
||
|
|
if data == nil {
|
||
|
|
return fmt.Errorf("speed test not found")
|
||
|
|
}
|
||
|
|
return json.Unmarshal(data, &test)
|
||
|
|
})
|
||
|
|
return test, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *StorageService) saveSpeedTest(test StorageSpeedTest) error {
|
||
|
|
data, err := json.Marshal(test)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
return s.db.Update(func(tx *bbolt.Tx) error {
|
||
|
|
return tx.Bucket(storageSpeedTestsBucket).Put([]byte(test.ID), data)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *StorageService) runSpeedTest(ctx context.Context, test *StorageSpeedTest) error {
|
||
|
|
backend, err := s.Backend(test.BackendID)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
files, err := createSpeedTestFiles(test)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
defer os.RemoveAll(files.Root)
|
||
|
|
keys := make([]string, 0, len(files.Files))
|
||
|
|
defer func() {
|
||
|
|
for _, key := range keys {
|
||
|
|
_ = backend.Delete(context.Background(), key)
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
|
||
|
|
writeStart := time.Now()
|
||
|
|
for i, file := range files.Files {
|
||
|
|
key := fmt.Sprintf(".warpbox-speed-test/%s/%03d.bin", test.ID, i)
|
||
|
|
source, err := os.Open(file.Path)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
err = backend.Put(ctx, key, source, file.Size, "application/octet-stream")
|
||
|
|
source.Close()
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
keys = append(keys, key)
|
||
|
|
test.BytesWritten += file.Size
|
||
|
|
test.FilesWritten++
|
||
|
|
updateSpeedProgress(test, "writing", i+1, len(files.Files), 0, 45)
|
||
|
|
_ = s.saveSpeedTest(*test)
|
||
|
|
}
|
||
|
|
test.WriteDurationMS = time.Since(writeStart).Milliseconds()
|
||
|
|
_ = s.saveSpeedTest(*test)
|
||
|
|
|
||
|
|
readStart := time.Now()
|
||
|
|
for i, key := range keys {
|
||
|
|
object, err := backend.Get(ctx, key)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
read, err := io.Copy(io.Discard, object.Body)
|
||
|
|
object.Body.Close()
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
test.BytesRead += read
|
||
|
|
updateSpeedProgress(test, "reading", i+1, len(keys), 45, 90)
|
||
|
|
_ = s.saveSpeedTest(*test)
|
||
|
|
}
|
||
|
|
test.ReadDurationMS = time.Since(readStart).Milliseconds()
|
||
|
|
_ = s.saveSpeedTest(*test)
|
||
|
|
|
||
|
|
deleteStart := time.Now()
|
||
|
|
for i, key := range keys {
|
||
|
|
if err := backend.Delete(ctx, key); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
updateSpeedProgress(test, "cleaning up", i+1, len(keys), 90, 100)
|
||
|
|
_ = s.saveSpeedTest(*test)
|
||
|
|
}
|
||
|
|
test.DeleteDurationMS = time.Since(deleteStart).Milliseconds()
|
||
|
|
keys = nil
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func updateSpeedProgress(test *StorageSpeedTest, stage string, done, total, start, end int) {
|
||
|
|
test.Stage = stage
|
||
|
|
if total <= 0 {
|
||
|
|
test.ProgressPercent = start
|
||
|
|
return
|
||
|
|
}
|
||
|
|
span := end - start
|
||
|
|
progress := start + int(math.Round(float64(span)*float64(done)/float64(total)))
|
||
|
|
if progress < 0 {
|
||
|
|
progress = 0
|
||
|
|
}
|
||
|
|
if progress > 100 {
|
||
|
|
progress = 100
|
||
|
|
}
|
||
|
|
test.ProgressPercent = progress
|
||
|
|
}
|
||
|
|
|
||
|
|
type speedTestFile struct {
|
||
|
|
Path string
|
||
|
|
Size int64
|
||
|
|
}
|
||
|
|
|
||
|
|
type speedTestFiles struct {
|
||
|
|
Root string
|
||
|
|
Files []speedTestFile
|
||
|
|
}
|
||
|
|
|
||
|
|
func createSpeedTestFiles(test *StorageSpeedTest) (speedTestFiles, error) {
|
||
|
|
plan, err := speedTestPlan(test)
|
||
|
|
if err != nil {
|
||
|
|
return speedTestFiles{}, err
|
||
|
|
}
|
||
|
|
root, err := os.MkdirTemp("", "warpbox-speed-test-*")
|
||
|
|
if err != nil {
|
||
|
|
return speedTestFiles{}, err
|
||
|
|
}
|
||
|
|
files := speedTestFiles{Root: root, Files: make([]speedTestFile, 0, len(plan))}
|
||
|
|
for i, size := range plan {
|
||
|
|
path := filepath.Join(root, fmt.Sprintf("%03d.bin", i))
|
||
|
|
if err := writeMockFile(path, size, byte(65+(i%23))); err != nil {
|
||
|
|
os.RemoveAll(root)
|
||
|
|
return speedTestFiles{}, err
|
||
|
|
}
|
||
|
|
files.Files = append(files.Files, speedTestFile{Path: path, Size: size})
|
||
|
|
}
|
||
|
|
return files, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func speedTestPlan(test *StorageSpeedTest) ([]int64, error) {
|
||
|
|
mode := normalizeSpeedTestMode(test.Mode)
|
||
|
|
if mode == StorageSpeedModeCustom {
|
||
|
|
if err := validateCustomSpeedTest(test.CustomFileCount, test.CustomFileSizeMB); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
size := MegabytesToBytes(test.CustomFileSizeMB)
|
||
|
|
plan := make([]int64, test.CustomFileCount)
|
||
|
|
for i := range plan {
|
||
|
|
plan[i] = size
|
||
|
|
}
|
||
|
|
return plan, nil
|
||
|
|
}
|
||
|
|
return speedTestPlanForMode(mode), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func speedTestPlanForMode(mode string) []int64 {
|
||
|
|
mode = normalizeSpeedTestMode(mode)
|
||
|
|
switch mode {
|
||
|
|
case StorageSpeedModeSmall:
|
||
|
|
return repeatedSizes(24, 32*1024)
|
||
|
|
case StorageSpeedModeBig:
|
||
|
|
return repeatedSizes(1, 8*1024*1024)
|
||
|
|
default:
|
||
|
|
sizes := repeatedSizes(8, 64*1024)
|
||
|
|
return append(sizes, repeatedSizes(1, 4*1024*1024)...)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func repeatedSizes(count int, size int64) []int64 {
|
||
|
|
sizes := make([]int64, 0, count)
|
||
|
|
for i := 0; i < count; i++ {
|
||
|
|
sizes = append(sizes, size)
|
||
|
|
}
|
||
|
|
return sizes
|
||
|
|
}
|
||
|
|
|
||
|
|
func writeMockFile(path string, size int64, seed byte) error {
|
||
|
|
target, err := os.Create(path)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
defer target.Close()
|
||
|
|
chunk := make([]byte, 64*1024)
|
||
|
|
for i := range chunk {
|
||
|
|
chunk[i] = seed
|
||
|
|
}
|
||
|
|
remaining := size
|
||
|
|
for remaining > 0 {
|
||
|
|
writeSize := int64(len(chunk))
|
||
|
|
if remaining < writeSize {
|
||
|
|
writeSize = remaining
|
||
|
|
}
|
||
|
|
if _, err := target.Write(chunk[:int(writeSize)]); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
remaining -= writeSize
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func validateCustomSpeedTest(count int, sizeMB float64) error {
|
||
|
|
if count <= 0 || count > 500 {
|
||
|
|
return fmt.Errorf("custom speed test file count must be between 1 and 500")
|
||
|
|
}
|
||
|
|
if sizeMB <= 0 {
|
||
|
|
return fmt.Errorf("custom speed test file size must be positive")
|
||
|
|
}
|
||
|
|
totalMB := float64(count) * sizeMB
|
||
|
|
if totalMB > 4096 {
|
||
|
|
return fmt.Errorf("custom speed test total size cannot exceed 4096 MB")
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func normalizeSpeedTestMode(mode string) string {
|
||
|
|
switch strings.TrimSpace(mode) {
|
||
|
|
case StorageSpeedModeSmall:
|
||
|
|
return StorageSpeedModeSmall
|
||
|
|
case StorageSpeedModeBig:
|
||
|
|
return StorageSpeedModeBig
|
||
|
|
case StorageSpeedModeCustom:
|
||
|
|
return StorageSpeedModeCustom
|
||
|
|
default:
|
||
|
|
return StorageSpeedModeMixed
|
||
|
|
}
|
||
|
|
}
|