mirror of https://github.com/k3s-io/k3s
145 lines
6.0 KiB
Python
145 lines
6.0 KiB
Python
![]() |
#!/usr/bin/env python
|
||
|
|
||
|
# Copyright 2015 The Kubernetes Authors All rights reserved.
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
|
||
|
# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
|
||
|
# and the suggestion in the redis documentation for RPOPLPUSH, at
|
||
|
# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue.
|
||
|
|
||
|
|
||
|
import redis
|
||
|
import uuid
|
||
|
import hashlib
|
||
|
|
||
|
class RedisWQ(object):
    """Simple Finite Work Queue with Redis Backend

    This work queue is finite: as long as no more work is added
    after workers start, the workers can detect when the queue
    is completely empty.

    The items in the work queue are assumed to have unique values.

    This object is not intended to be used by multiple threads
    concurrently.
    """

    def __init__(self, name, **redis_kwargs):
        """Connect to Redis and set up the key names used by the queue.

        The default connection parameters are: host='localhost', port=6379, db=0

        The work queue is identified by "name".  The library may create other
        keys with "name" as a prefix.

        Args:
            name: Redis key of the main queue list.
            **redis_kwargs: passed unchanged to redis.StrictRedis().
        """
        self._db = redis.StrictRedis(**redis_kwargs)
        # The session ID will uniquely identify this "worker".
        self._session = str(uuid.uuid4())
        # Work queue is implemented as two queues: main, and processing.
        # Work is initially in main, and moved to processing when a client
        # picks it up.
        self._main_q_key = name
        self._processing_q_key = name + ":processing"
        # A lease is a key "<prefix><item hash>" whose value is the session ID
        # of the worker holding the lease.
        self._lease_key_prefix = name + ":leased_by_session:"

    def sessionID(self):
        """Return the ID for this session."""
        return self._session

    def _main_qsize(self):
        """Return the size of the main queue."""
        return self._db.llen(self._main_q_key)

    def _processing_qsize(self):
        """Return the size of the processing queue."""
        return self._db.llen(self._processing_q_key)

    def empty(self):
        """Return True if the queue is empty, including work being done, False otherwise.

        False does not necessarily mean that there is work available to work
        on right now: the remaining items may all be in the processing queue.
        """
        return self._main_qsize() == 0 and self._processing_qsize() == 0

    # TODO: implement this
    # def check_expired_leases(self):
    #     """Return items whose lease has expired to the main work queue."""
    #     # Processing list should not be _too_ long since it is approximately as long
    #     # as the number of active and recently active workers.
    #     processing = self._db.lrange(self._processing_q_key, 0, -1)
    #     for item in processing:
    #         # If the lease key is not present for an item (it expired or was
    #         # never created because the client crashed before creating it)
    #         # then move the item back to the main queue so others can work on it.
    #         if not self._lease_exists(item):
    #             TODO: transactionally move the key from processing queue
    #             to main queue, while detecting if a new lease is created
    #             or if either queue is modified.

    def _itemkey(self, item):
        """Return a string that uniquely identifies an item.

        Args:
            item: the item as bytes; text is also accepted and is encoded as
                UTF-8 before hashing.

        Returns:
            The hexadecimal SHA-224 digest of the item.
        """
        if not isinstance(item, bytes):
            # hashlib only accepts bytes-like input on Python 3; a client
            # configured to decode responses hands back text instead.
            item = item.encode("utf-8")
        return hashlib.sha224(item).hexdigest()

    def _lease_exists(self, item):
        """Return True if a lease on 'item' exists, False otherwise."""
        return bool(self._db.exists(self._lease_key_prefix + self._itemkey(item)))

    def lease(self, lease_secs=60, block=True, timeout=None):
        """Begin working on an item from the work queue.

        Lease the item for lease_secs.  After that time, other
        workers may consider this client to have crashed or stalled
        and pick up the item instead.

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available.

        Args:
            lease_secs: lifetime of the lease key, in seconds.
            block: use the blocking pop (BRPOPLPUSH) when true, otherwise the
                non-blocking RPOPLPUSH.
            timeout: passed to the blocking pop; ignored when block is false.

        Returns:
            The item moved to the processing queue, or None when the client
            returned no item.
        """
        if block:
            item = self._db.brpoplpush(
                self._main_q_key, self._processing_q_key, timeout=timeout)
        else:
            item = self._db.rpoplpush(self._main_q_key, self._processing_q_key)
        # Compare against None rather than truthiness so that an empty-string
        # item, which has already been moved to the processing queue, still
        # gets its lease recorded.
        if item is not None:
            # Record that we (this session id) are working on a key.  Expire that
            # note after the lease timeout.
            # Note: if we crash at this line of the program, then GC will see no lease
            # for this item and later return it to the main queue.
            itemkey = self._itemkey(item)
            self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session)
        return item

    def complete(self, value):
        """Complete working on the item with 'value'.

        If the lease expired, the item may not have completed, and some
        other worker may have picked it up.  There is no indication
        of what happened.

        Args:
            value: the item exactly as returned by lease().
        """
        self._db.lrem(self._processing_q_key, 0, value)
        # If we crash here, then the GC code will try to move the value, but it will
        # not be here, which is fine.  So this does not need to be a transaction.
        itemkey = self._itemkey(value)
        # Delete only the lease key.  The session ID is the lease's value, not
        # a key, so it must not be passed to DELETE as a second key name.
        self._db.delete(self._lease_key_prefix + itemkey)
|
||
|
|
||
|
# TODO: add functions to clean up all keys associated with "name" when
|
||
|
# processing is complete.
|
||
|
|
||
|
# TODO: add a function to add an item to the queue. Atomically
|
||
|
# check if the queue is empty and if so fail to add the item
|
||
|
# since other workers might think work is done and be in the process
|
||
|
# of exiting.
|
||
|
|
||
|
# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and
|
||
|
# make it so it can be pip installed by anyone (see
|
||
|
# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github)
|
||
|
|
||
|
# TODO(etune): finish code to GC expired leases, and call periodically
|
||
|
# e.g. each time lease times out.
|
||
|
|