cronsun/bin/csctl/cmd/restore.go

143 lines
2.7 KiB
Go

package cmd
import (
"archive/zip"
"encoding/binary"
"fmt"
"io"
"strings"
"sync"
"github.com/spf13/cobra"
"github.com/shunfei/cronsun"
"github.com/shunfei/cronsun/conf"
)
var restoreFile string
func init() {
RestoreCmd.Flags().StringVarP(&restoreFile, "file", "f", "", "the backup zip file")
}
var RestoreCmd = &cobra.Command{
Use: "restore",
Short: "restore job & group data",
Run: func(cmd *cobra.Command, args []string) {
var err error
var ea = NewExitAction()
restoreFile = strings.TrimSpace(restoreFile)
if len(restoreFile) == 0 {
ea.Exit("backup file is required")
}
r, err := zip.OpenReader(restoreFile)
ea.ExitOnErr(err)
ea.Defer = func() {
r.Close()
}
restoreChan, wg := startRestoreProcess()
for _, f := range r.File {
var keyPrefix string
switch f.Name {
case "job":
keyPrefix = conf.Config.Cmd
case "node_group":
keyPrefix = conf.Config.Group
}
rc, err := f.Open()
ea.ExitOnErr(err)
ea.ExitOnErr(restoreKvs(rc, keyPrefix, restoreChan, wg))
rc.Close()
}
wg.Wait()
close(restoreChan)
},
}
type kv struct {
k, v string
}
var (
keyLenBuf = make([]byte, 2)
valLenBuf = make([]byte, 4)
keyBuf = make([]byte, 256)
valBuf = make([]byte, 1024)
)
func restoreKvs(r io.Reader, keyPrefix string, storeChan chan *kv, wg *sync.WaitGroup) error {
for {
// read length of key
n, err := r.Read(keyLenBuf)
if err == io.EOF && n != 0 {
return fmt.Errorf("unexcepted data, the file may broken")
} else if err == io.EOF && n == 0 {
break
} else if err != nil {
return err
}
keylen := binary.LittleEndian.Uint16(keyLenBuf)
// read key
if n, err = r.Read(keyBuf[:keylen]); err != nil {
return err
}
key := keyBuf[:keylen]
// read length of value
if n, err = r.Read(valLenBuf); err != nil {
return err
}
vallen := binary.LittleEndian.Uint32(valLenBuf)
// read value
if len(valBuf) < int(vallen) {
valBuf = make([]byte, vallen*2)
}
if n, err = r.Read(valBuf[:vallen]); err != nil && err != io.EOF {
return err
}
value := valBuf[:vallen]
wg.Add(1)
storeChan <- &kv{
k: keyPrefix + string(key),
v: string(value),
}
}
return nil
}
func startRestoreProcess() (chan *kv, *sync.WaitGroup) {
c := make(chan *kv, 0)
wg := &sync.WaitGroup{}
const maxResries = 3
go func() {
for _kv := range c {
for retries := 1; retries <= maxResries; retries++ {
_, err := cronsun.DefalutClient.Put(_kv.k, _kv.v)
if err != nil {
if retries == maxResries {
fmt.Println("[Error] restore err:", err)
fmt.Println("\tKey: ", string(_kv.k))
fmt.Println("\tValue: ", string(_kv.v))
}
continue
}
}
wg.Done()
}
}()
return c, wg
}