csctl: impl restore command, redesign backup format

pull/69/head
Doflatango 2018-02-28 18:17:34 +08:00
parent 2ff176e908
commit c2852c3bfb
3 changed files with 183 additions and 20 deletions

View File

@ -26,7 +26,7 @@ var rootCmd = &cobra.Command{
func init() {
rootCmd.PersistentFlags().StringVarP(&confFile, "conf", "c", "", "base.json file path.")
rootCmd.AddCommand(subcmd.BackupCmd)
rootCmd.AddCommand(subcmd.BackupCmd, subcmd.RestoreCmd)
}
func main() {

View File

@ -2,22 +2,26 @@ package cmd
import (
"archive/zip"
"encoding/binary"
"fmt"
"io"
"os"
"path"
"runtime"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/spf13/cobra"
"github.com/shunfei/cronsun"
"github.com/shunfei/cronsun/conf"
)
var backupOutput string
var (
backupDir string
backupFile string
)
var BackupCmd = &cobra.Command{
Use: "backup",
@ -26,15 +30,21 @@ var BackupCmd = &cobra.Command{
var err error
var ea = NewExitAction()
backupOutput = strings.TrimSpace(backupOutput)
if len(backupOutput) > 0 {
err = os.MkdirAll(backupOutput, os.ModeDir)
backupDir = strings.TrimSpace(backupDir)
if len(backupDir) > 0 {
err = os.MkdirAll(backupDir, os.ModeDir)
if err != nil {
ea.Exit("failed to make directory %s, err: %s", backupOutput, err)
ea.Exit("failed to make directory %s, err: %s", backupDir, err)
}
}
name := path.Join(backupOutput, time.Now().Format("20060102_150405")+".zip")
backupFile = strings.TrimSpace(backupFile)
if len(backupFile) == 0 {
backupFile = time.Now().Format("20060102_150405")
}
backupFile += ".zip"
name := path.Join(backupDir, backupFile)
f, err := os.OpenFile(name, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0600)
ea.ExitOnErr(err)
ea.Defer = func() {
@ -54,7 +64,7 @@ var BackupCmd = &cobra.Command{
for i := range waitForStore {
zf, err := zw.Create(waitForStore[i][0])
ea.ExitOnErr(err)
ea.ExitOnErr(storeKvs(zf, waitForStore[i][1]))
storeKvs(zf, waitForStore[i][1])
}
ea.ExitOnErr(zw.Close())
@ -62,7 +72,8 @@ var BackupCmd = &cobra.Command{
}
func init() {
BackupCmd.Flags().StringVarP(&backupOutput, "output-dir", "o", "", "the directory for store backup file")
BackupCmd.Flags().StringVarP(&backupDir, "dir", "d", "", "the directory to store backup file")
BackupCmd.Flags().StringVarP(&backupFile, "file", "f", "", "the backup file name")
}
type ExitAction struct {
@ -75,7 +86,8 @@ func NewExitAction() *ExitAction {
func (ea *ExitAction) ExitOnErr(err error) {
if err != nil {
ea.Exit(err.Error())
_, f, l, _ := runtime.Caller(1)
ea.Exit("%s line %d: %s", f, l, err.Error())
}
}
@ -89,8 +101,7 @@ func (ea *ExitAction) Exit(format string, v ...interface{}) {
}
var (
byteLD = []byte{'['}
byteRD = []byte{']'}
sizeBuf = make([]byte, 2+4) // key length(uint16) + value length(uint32)
)
func storeKvs(w io.Writer, keyPrefix string) error {
@ -99,19 +110,29 @@ func storeKvs(w io.Writer, keyPrefix string) error {
return fmt.Errorf("failed to fetch %s from etcd: %s", keyPrefix, err)
}
w.Write(byteLD)
var prefixLen = len(keyPrefix)
for i := range gresp.Kvs {
if len(gresp.Kvs)-1 != i {
_, err = w.Write(append(gresp.Kvs[i].Value, ','))
} else {
_, err = w.Write(gresp.Kvs[i].Value)
key := gresp.Kvs[i].Key[prefixLen:]
binary.LittleEndian.PutUint16(sizeBuf[:2], uint16(len(key)))
binary.LittleEndian.PutUint32(sizeBuf[2:], uint32(len(gresp.Kvs[i].Value)))
// length of key
if _, err = w.Write(sizeBuf[:2]); err != nil {
return err
}
if _, err = w.Write(key); err != nil {
return err
}
if err != nil {
// lenght of value
if _, err = w.Write(sizeBuf[2:]); err != nil {
return err
}
if _, err = w.Write(gresp.Kvs[i].Value); err != nil {
return err
}
}
w.Write(byteRD)
return nil
}

142
bin/csctl/cmd/restore.go Normal file
View File

@ -0,0 +1,142 @@
package cmd
import (
"archive/zip"
"encoding/binary"
"fmt"
"io"
"strings"
"sync"
"github.com/spf13/cobra"
"github.com/shunfei/cronsun"
"github.com/shunfei/cronsun/conf"
)
var restoreFile string
func init() {
RestoreCmd.Flags().StringVarP(&restoreFile, "file", "f", "", "the backup zip file")
}
var RestoreCmd = &cobra.Command{
Use: "restore",
Short: "restore job & group data",
Run: func(cmd *cobra.Command, args []string) {
var err error
var ea = NewExitAction()
restoreFile = strings.TrimSpace(restoreFile)
if len(restoreFile) == 0 {
ea.Exit("backup file is required")
}
r, err := zip.OpenReader(restoreFile)
ea.ExitOnErr(err)
ea.Defer = func() {
r.Close()
}
restoreChan, wg := startRestoreProcess()
for _, f := range r.File {
var keyPrefix string
switch f.Name {
case "job":
keyPrefix = conf.Config.Cmd
case "node_group":
keyPrefix = conf.Config.Group
}
rc, err := f.Open()
ea.ExitOnErr(err)
ea.ExitOnErr(restoreKvs(rc, keyPrefix, restoreChan, wg))
rc.Close()
}
wg.Wait()
close(restoreChan)
},
}
type kv struct {
k, v string
}
var (
keyLenBuf = make([]byte, 2)
valLenBuf = make([]byte, 4)
keyBuf = make([]byte, 256)
valBuf = make([]byte, 1024)
)
func restoreKvs(r io.Reader, keyPrefix string, storeChan chan *kv, wg *sync.WaitGroup) error {
for {
// read length of key
n, err := r.Read(keyLenBuf)
if err == io.EOF && n != 0 {
return fmt.Errorf("Unexcepted data, the file may borken")
} else if err == io.EOF && n == 0 {
break
} else if err != nil {
return err
}
keylen := binary.LittleEndian.Uint16(keyLenBuf)
// read key
if n, err = r.Read(keyBuf[:keylen]); err != nil {
return err
}
key := keyBuf[:keylen]
// read length of value
if n, err = r.Read(valLenBuf); err != nil {
return err
}
vallen := binary.LittleEndian.Uint32(valLenBuf)
// read value
if len(valBuf) < int(vallen) {
valBuf = make([]byte, vallen*2)
}
if n, err = r.Read(valBuf[:vallen]); err != nil && err != io.EOF {
return err
}
value := valBuf[:vallen]
wg.Add(1)
storeChan <- &kv{
k: keyPrefix + string(key),
v: string(value),
}
}
return nil
}
func startRestoreProcess() (chan *kv, *sync.WaitGroup) {
c := make(chan *kv, 0)
wg := &sync.WaitGroup{}
const maxResries = 3
go func() {
for _kv := range c {
for retries := 1; retries <= maxResries; retries++ {
_, err := cronsun.DefalutClient.Put(_kv.k, _kv.v)
if err != nil {
if retries == maxResries {
fmt.Println("[Error] restore err:", err)
fmt.Println("\tKey: ", string(_kv.k))
fmt.Println("\tValue: ", string(_kv.v))
}
continue
}
}
wg.Done()
}
}()
return c, wg
}