diff --git a/node/cron/cron.go b/node/cron/cron.go index 184ceeb..2174bb0 100644 --- a/node/cron/cron.go +++ b/node/cron/cron.go @@ -16,6 +16,7 @@ import ( // be inspected while running. type Cron struct { entries []*Entry + indexes map[string]int stop chan struct{} add chan *Entry snapshot chan []*Entry @@ -85,6 +86,7 @@ func New() *Cron { func NewWithLocation(location *time.Location) *Cron { return &Cron{ entries: nil, + indexes: make(map[string]int), add: make(chan *Entry), stop: make(chan struct{}), snapshot: make(chan []*Entry), @@ -125,7 +127,11 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) { Job: cmd, } if !c.running { - c.entries = append(c.entries, entry) + if index, ok := c.indexes[entry.ID]; ok { + c.entries[index] = entry + return + } + c.entries, c.indexes[entry.ID] = append(c.entries, entry), len(c.entries) return } @@ -168,6 +174,13 @@ func (c *Cron) runWithRecovery(j Job) { j.Run() } +// Rebuild the indexes +func (c *Cron) reIndex() { + for i, count := 0, len(c.entries); i < count; i++ { + c.indexes[c.entries[i].ID] = i + } +} + // Run the scheduler.. this is private just due to the need to synchronize // access to the 'running' state variable. func (c *Cron) run() { @@ -181,6 +194,7 @@ func (c *Cron) run() { for { // Determine the next entry to run. sort.Sort(byTime(c.entries)) + c.reIndex() var effective time.Time if len(c.entries) == 0 || c.entries[0].Next.IsZero() { @@ -207,7 +221,11 @@ func (c *Cron) run() { continue case newEntry := <-c.add: - c.entries = append(c.entries, newEntry) + if index, ok := c.indexes[newEntry.ID]; ok { + c.entries[index] = newEntry + } else { + c.entries, c.indexes[newEntry.ID] = append(c.entries, newEntry), len(c.entries) + } newEntry.Next = newEntry.Schedule.Next(time.Now().In(c.location)) case <-c.snapshot: diff --git a/node/node.go b/node/node.go index 9e45f25..a0af0e0 100644 --- a/node/node.go +++ b/node/node.go @@ -140,7 +140,7 @@ func (n *Node) keepAlive() { } } -// 启动服务 +// 停止服务 func (n *Node) Stop(i interface{}) { close(n.done) n.Node.Del()