diff --git a/bin/csctl/cmd.go b/bin/csctl/cmd.go index cbb4043..326699f 100644 --- a/bin/csctl/cmd.go +++ b/bin/csctl/cmd.go @@ -26,7 +26,7 @@ var rootCmd = &cobra.Command{ func init() { rootCmd.PersistentFlags().StringVarP(&confFile, "conf", "c", "", "base.json file path.") - rootCmd.AddCommand(subcmd.BackupCmd) + rootCmd.AddCommand(subcmd.BackupCmd, subcmd.RestoreCmd) } func main() { diff --git a/bin/csctl/cmd/backup.go b/bin/csctl/cmd/backup.go index a08b230..f849dc1 100644 --- a/bin/csctl/cmd/backup.go +++ b/bin/csctl/cmd/backup.go @@ -2,22 +2,26 @@ package cmd import ( "archive/zip" + "encoding/binary" "fmt" "io" "os" "path" + "runtime" "strings" "time" "github.com/coreos/etcd/clientv3" - "github.com/spf13/cobra" "github.com/shunfei/cronsun" "github.com/shunfei/cronsun/conf" ) -var backupOutput string +var ( + backupDir string + backupFile string +) var BackupCmd = &cobra.Command{ Use: "backup", @@ -26,15 +30,21 @@ var BackupCmd = &cobra.Command{ var err error var ea = NewExitAction() - backupOutput = strings.TrimSpace(backupOutput) - if len(backupOutput) > 0 { - err = os.MkdirAll(backupOutput, os.ModeDir) + backupDir = strings.TrimSpace(backupDir) + if len(backupDir) > 0 { + err = os.MkdirAll(backupDir, os.ModeDir) if err != nil { - ea.Exit("failed to make directory %s, err: %s", backupOutput, err) + ea.Exit("failed to make directory %s, err: %s", backupDir, err) } } - name := path.Join(backupOutput, time.Now().Format("20060102_150405")+".zip") + backupFile = strings.TrimSpace(backupFile) + if len(backupFile) == 0 { + backupFile = time.Now().Format("20060102_150405") + } + backupFile += ".zip" + + name := path.Join(backupDir, backupFile) f, err := os.OpenFile(name, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0600) ea.ExitOnErr(err) ea.Defer = func() { @@ -54,7 +64,7 @@ var BackupCmd = &cobra.Command{ for i := range waitForStore { zf, err := zw.Create(waitForStore[i][0]) ea.ExitOnErr(err) - ea.ExitOnErr(storeKvs(zf, waitForStore[i][1])) + storeKvs(zf, waitForStore[i][1]) } ea.ExitOnErr(zw.Close()) @@ -62,7 +72,8 @@ var BackupCmd = &cobra.Command{ } func init() { - BackupCmd.Flags().StringVarP(&backupOutput, "output-dir", "o", "", "the directory for store backup file") + BackupCmd.Flags().StringVarP(&backupDir, "dir", "d", "", "the directory to store backup file") + BackupCmd.Flags().StringVarP(&backupFile, "file", "f", "", "the backup file name") } type ExitAction struct { @@ -75,7 +86,8 @@ func NewExitAction() *ExitAction { func (ea *ExitAction) ExitOnErr(err error) { if err != nil { - ea.Exit(err.Error()) + _, f, l, _ := runtime.Caller(1) + ea.Exit("%s line %d: %s", f, l, err.Error()) } } @@ -89,8 +101,7 @@ func (ea *ExitAction) Exit(format string, v ...interface{}) { } var ( - byteLD = []byte{'['} - byteRD = []byte{']'} + sizeBuf = make([]byte, 2+4) // key length(uint16) + value length(uint32) ) func storeKvs(w io.Writer, keyPrefix string) error { @@ -99,19 +110,29 @@ func storeKvs(w io.Writer, keyPrefix string) error { return fmt.Errorf("failed to fetch %s from etcd: %s", keyPrefix, err) } - w.Write(byteLD) + var prefixLen = len(keyPrefix) + for i := range gresp.Kvs { - if len(gresp.Kvs)-1 != i { - _, err = w.Write(append(gresp.Kvs[i].Value, ',')) - } else { - _, err = w.Write(gresp.Kvs[i].Value) + key := gresp.Kvs[i].Key[prefixLen:] + binary.LittleEndian.PutUint16(sizeBuf[:2], uint16(len(key))) + binary.LittleEndian.PutUint32(sizeBuf[2:], uint32(len(gresp.Kvs[i].Value))) + + // length of key + if _, err = w.Write(sizeBuf[:2]); err != nil { + return err + } + if _, err = w.Write(key); err != nil { + return err } - if err != nil { + // lenght of value + if _, err = w.Write(sizeBuf[2:]); err != nil { + return err + } + if _, err = w.Write(gresp.Kvs[i].Value); err != nil { return err } } - w.Write(byteRD) return nil } diff --git a/bin/csctl/cmd/restore.go b/bin/csctl/cmd/restore.go new file mode 100644 index 0000000..4983f2d --- /dev/null +++ b/bin/csctl/cmd/restore.go @@ -0,0 +1,142 @@ +package cmd + +import ( + "archive/zip" + "encoding/binary" + "fmt" + "io" + "strings" + "sync" + + "github.com/spf13/cobra" + + "github.com/shunfei/cronsun" + "github.com/shunfei/cronsun/conf" +) + +var restoreFile string + +func init() { + RestoreCmd.Flags().StringVarP(&restoreFile, "file", "f", "", "the backup zip file") +} + +var RestoreCmd = &cobra.Command{ + Use: "restore", + Short: "restore job & group data", + Run: func(cmd *cobra.Command, args []string) { + var err error + var ea = NewExitAction() + + restoreFile = strings.TrimSpace(restoreFile) + if len(restoreFile) == 0 { + ea.Exit("backup file is required") + } + + r, err := zip.OpenReader(restoreFile) + ea.ExitOnErr(err) + ea.Defer = func() { + r.Close() + } + + restoreChan, wg := startRestoreProcess() + for _, f := range r.File { + var keyPrefix string + switch f.Name { + case "job": + keyPrefix = conf.Config.Cmd + case "node_group": + keyPrefix = conf.Config.Group + } + + rc, err := f.Open() + ea.ExitOnErr(err) + + ea.ExitOnErr(restoreKvs(rc, keyPrefix, restoreChan, wg)) + rc.Close() + } + + wg.Wait() + close(restoreChan) + }, +} + +type kv struct { + k, v string +} + +var ( + keyLenBuf = make([]byte, 2) + valLenBuf = make([]byte, 4) + keyBuf = make([]byte, 256) + valBuf = make([]byte, 1024) +) + +func restoreKvs(r io.Reader, keyPrefix string, storeChan chan *kv, wg *sync.WaitGroup) error { + for { + // read length of key + n, err := r.Read(keyLenBuf) + if err == io.EOF && n != 0 { + return fmt.Errorf("Unexcepted data, the file may borken") + } else if err == io.EOF && n == 0 { + break + } else if err != nil { + return err + } + keylen := binary.LittleEndian.Uint16(keyLenBuf) + + // read key + if n, err = r.Read(keyBuf[:keylen]); err != nil { + return err + } + key := keyBuf[:keylen] + + // read length of value + if n, err = r.Read(valLenBuf); err != nil { + return err + } + vallen := binary.LittleEndian.Uint32(valLenBuf) + + // read value + if len(valBuf) < int(vallen) { + valBuf = make([]byte, vallen*2) + } + if n, err = r.Read(valBuf[:vallen]); err != nil && err != io.EOF { + return err + } + value := valBuf[:vallen] + + wg.Add(1) + storeChan <- &kv{ + k: keyPrefix + string(key), + v: string(value), + } + } + + return nil +} + +func startRestoreProcess() (chan *kv, *sync.WaitGroup) { + c := make(chan *kv, 0) + wg := &sync.WaitGroup{} + + const maxResries = 3 + go func() { + for _kv := range c { + for retries := 1; retries <= maxResries; retries++ { + _, err := cronsun.DefalutClient.Put(_kv.k, _kv.v) + if err != nil { + if retries == maxResries { + fmt.Println("[Error] restore err:", err) + fmt.Println("\tKey: ", string(_kv.k)) + fmt.Println("\tValue: ", string(_kv.v)) + } + continue + } + } + + wg.Done() + } + }() + + return c, wg +}