cron: 支持增加/修改 job

pull/1/head
miraclesu 2017-01-18 16:27:35 +08:00
parent a7b59cac6e
commit 04b17ec3db
2 changed files with 21 additions and 3 deletions

View File

@ -16,6 +16,7 @@ import (
// be inspected while running. // be inspected while running.
type Cron struct { type Cron struct {
entries []*Entry entries []*Entry
indexes map[string]int
stop chan struct{} stop chan struct{}
add chan *Entry add chan *Entry
snapshot chan []*Entry snapshot chan []*Entry
@ -85,6 +86,7 @@ func New() *Cron {
func NewWithLocation(location *time.Location) *Cron { func NewWithLocation(location *time.Location) *Cron {
return &Cron{ return &Cron{
entries: nil, entries: nil,
indexes: make(map[string]int),
add: make(chan *Entry), add: make(chan *Entry),
stop: make(chan struct{}), stop: make(chan struct{}),
snapshot: make(chan []*Entry), snapshot: make(chan []*Entry),
@ -125,7 +127,11 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) {
Job: cmd, Job: cmd,
} }
if !c.running { if !c.running {
c.entries = append(c.entries, entry) if index, ok := c.indexes[entry.ID]; ok {
c.entries[index] = entry
return
}
c.entries, c.indexes[entry.ID] = append(c.entries, entry), len(c.entries)
return return
} }
@ -168,6 +174,13 @@ func (c *Cron) runWithRecovery(j Job) {
j.Run() j.Run()
} }
// Rebuild the indexes
func (c *Cron) reIndex() {
for i, count := 0, len(c.entries); i < count; i++ {
c.indexes[c.entries[i].ID] = i
}
}
// Run the scheduler.. this is private just due to the need to synchronize // Run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable. // access to the 'running' state variable.
func (c *Cron) run() { func (c *Cron) run() {
@ -181,6 +194,7 @@ func (c *Cron) run() {
for { for {
// Determine the next entry to run. // Determine the next entry to run.
sort.Sort(byTime(c.entries)) sort.Sort(byTime(c.entries))
c.reIndex()
var effective time.Time var effective time.Time
if len(c.entries) == 0 || c.entries[0].Next.IsZero() { if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
@ -207,7 +221,11 @@ func (c *Cron) run() {
continue continue
case newEntry := <-c.add: case newEntry := <-c.add:
c.entries = append(c.entries, newEntry) if index, ok := c.indexes[newEntry.ID]; ok {
c.entries[index] = newEntry
} else {
c.entries, c.indexes[newEntry.ID] = append(c.entries, newEntry), len(c.entries)
}
newEntry.Next = newEntry.Schedule.Next(time.Now().In(c.location)) newEntry.Next = newEntry.Schedule.Next(time.Now().In(c.location))
case <-c.snapshot: case <-c.snapshot:

View File

@ -140,7 +140,7 @@ func (n *Node) keepAlive() {
} }
} }
// 启动服务 // 停止服务
func (n *Node) Stop(i interface{}) { func (n *Node) Stop(i interface{}) {
close(n.done) close(n.done)
n.Node.Del() n.Node.Del()