"""
"""
Developed by niphlod@gmail.com
Developed by niphlod@gmail.com
Released under web2py license because includes gluon/cache.py source code
Released under web2py license because includes gluon/cache.py source code
"""
"""
try:
    import cPickle as pickle
except ImportError:
    import pickle
import time
import re
import logging
from threading import Lock
import random
from gluon import current
from gluon.cache import CacheAbstract
from gluon.contrib.redis_utils import acquire_lock, release_lock
from gluon.contrib.redis_utils import register_release_lock, RConnectionError

logger = logging.getLogger("web2py.cache.redis")

locker = Lock()


def RedisCache(redis_conn=None, debug=False, with_lock=False,
               fail_gracefully=False, db=None):  # db is accepted for backward compatibility and ignored
"""
"""
Usage example: put in models
Usage example: put in models::

First of all install Redis
Ubuntu :
sudo apt-get install redis-server
sudo pip install redis


from gluon.contrib.redis_cache import RedisCache
Then
cache.redis = RedisCache('localhost:6379',db=None, debug=True, with_lock=True, password=None)


:param db: redis db to use (0..16)
from gluon.contrib.redis_utils import RConn
:param debug: if True adds to stats() the total_hits and misses
rconn = RConn()
:param with_lock: sets the default locking mode for creating new keys.
from gluon.contrib.redis_cache import RedisCache
cache.redis = RedisCache(redis_conn=rconn, debug=True, with_lock=True)

Args:
redis_conn: a redis-like connection object
debug: if True adds to stats() the total_hits and misses
with_lock: sets the default locking mode for creating new keys.
By default is False (usualy when you choose Redis you do it
By default is False (usualy when you choose Redis you do it
for performances reason)
for performances reason)
When True, only one thread/process can set a value concurrently
When True, only one thread/process can set a value concurrently
fail_gracefully: if redis is unavailable, returns the value computing it
instead of raising an exception


When you use cache.redis directly you can use
It can be used pretty much the same as cache.ram()
value = cache.redis('mykey', lambda: time.time(), with_lock=True)
When you use cache.redis directly you can use :

redis_key_and_var_name = cache.redis('redis_key_and_var_name', lambda or function,
time_expire=time.time(), with_lock=True)

to enforce locking. The with_lock parameter overrides the one set in the
to enforce locking. The with_lock parameter overrides the one set in the
cache.redis instance creation
cache.redis instance creation


cache.redis.stats()
cache.redis.stats()
returns a dictionary with statistics of Redis server
returns a dictionary with statistics of Redis server
with one additional key ('w2p_keys') showing all keys currently set
with one additional key ('w2p_keys') showing all keys currently set
from web2py with their TTL
from web2py with their TTL


A little wording on how keys are stored (and why the cache_it() function
A little wording on how keys are stored (and why the cache_it() function
and the clear() one look a little bit convoluted): there are a lot of
and the clear() one look a little bit convoluted): there are a lot of
libraries that just store values and then use the KEYS command to delete it.
libraries that just store values and then use the KEYS command to delete it.
Until recent releases of this module, that technique was used here too.
Until recent releases of this module, that technique was used here too.
In the need of deleting specific keys in a database with zillions keys in it
In the need of deleting specific keys in a database with zillions keys in it
(other web2py apps, other applications in the need of a Redis stack) the
(other web2py apps, other applications in the need of a Redis stack) the
KEYS command is slow (it needs to scan every key in the database).
KEYS command is slow (it needs to scan every key in the database).
So, we use Redis 'sets' to store keys in "buckets"...
So, we use Redis 'sets' to store keys in "buckets"...
- every key created gets "indexed" in a bucket
- every key created gets "indexed" in a bucket
- all buckets are indexed in a fixed key that never expires
- all buckets are indexed in a fixed key that never expires
- all keys generated within the same minute go in the same bucket
- all keys generated within the same minute go in the same bucket
- every bucket is then set to expire when every key within it is expired
- every bucket is then set to expire when every key within it is expired
When we need to clear() cached keys:
When we need to clear() cached keys:
- we tell Redis to SUNION all buckets
- we tell Redis to SUNION all buckets
- gives us just the keys that are not expired yet
- gives us just the keys that are not expired yet
- buckets that are expired are removed from the fixed set
- buckets that are expired are removed from the fixed set
- we scan the keys and then delete them
- we scan the keys and then delete them
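
    As an illustration, with a hypothetical application 'myapp', a key
    'mykey' and a made-up bucket id of 20000007, the resulting layout
    would be::

        w2p:cache:myapp:mykey            the pickled value, with its own TTL
        w2p:myapp:___cache_set           the fixed set of buckets, never expires
        w2p:myapp:___cache_set:20000007  the bucket indexing the keys created
                                         in that minute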
"""
"""


locker.acquire()
locker.acquire()
try:
try:
instance_name = 'redis_instance_' + current.request.application
instance_name = 'redis_instance_' + current.request.application
if not hasattr(RedisCache, instance_name):
if not hasattr(RedisCache, instance_name):
setattr(RedisCache, instance_name, RedisClient(*args, **vars))
setattr(RedisCache, instance_name,
RedisClient(redis_conn=redis_conn, debug=debug,
with_lock=with_lock, fail_gracefully=fail_gracefully))
return getattr(RedisCache, instance_name)
return getattr(RedisCache, instance_name)
finally:
finally:
locker.release()
locker.release()




class RedisClient(object):
class RedisClient(object):


meta_storage = {}
meta_storage = {}
MAX_RETRIES = 5
MAX_RETRIES = 5
RETRIES = 0
RETRIES = 0


def __init__(self, server='localhost:6379', db=None, debug=False, with_lock=False, password=None):
def __init__(self, redis_conn=None, debug=False,
self.server = server
with_lock=False, fail_gracefully=False):
self.password = password
self.db = db or 0
host, port = (self.server.split(':') + ['6379'])[:2]
port = int(port)
self.request = current.request
self.request = current.request
self.debug = debug
self.debug = debug
self.with_lock = with_lock
self.with_lock = with_lock
self.prefix = "w2p:%s:" % (self.request.application)
self.fail_gracefully = fail_gracefully
self.prefix = "w2p:cache:%s:" % self.request.application
if self.request:
if self.request:
app = self.request.application
app = self.request.application
else:
else:
app = ''
app = ''


if not app in self.meta_storage:
if app not in self.meta_storage:
self.storage = self.meta_storage[app] = {
self.storage = self.meta_storage[app] = {
CacheAbstract.cache_stats_name: {
CacheAbstract.cache_stats_name: {
'hit_total': 0,
'hit_total': 0,
'misses': 0,
'misses': 0,
}}
}}
else:
else:
self.storage = self.meta_storage[app]
self.storage = self.meta_storage[app]


self.cache_set_key = 'w2p:%s:___cache_set' % (self.request.application)
self.cache_set_key = 'w2p:%s:___cache_set' % self.request.application


self.r_server = redis.Redis(host=host, port=port, db=self.db, password=self.password)
self.r_server = redis_conn
self._release_script = register_release_lock(self.r_server)
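        # NOTE: redis_conn is expected to be a redis-like connection object,
        # typically built with gluon.contrib.redis_utils.RConn() as shown in
        # the RedisCache docstring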
    def initialize(self):
        pass

    def __call__(self, key, f, time_expire=300, with_lock=None):
        if with_lock is None:
            with_lock = self.with_lock
        if time_expire is None:
            time_expire = 24 * 60 * 60
        newKey = self.__keyFormat__(key)
        value = None
        ttl = 0
        try:
            if f is None:
                # delete and never look back
                self.r_server.delete(newKey)
                return None
            # is there a value
            obj = self.r_server.get(newKey)
            # what's its ttl
            if obj:
                ttl = self.r_server.ttl(newKey)
            if ttl > time_expire:
                obj = None
            if obj:
                # was cached
                if self.debug:
                    self.r_server.incr('web2py_cache_statistics:hit_total')
                value = pickle.loads(obj)
            else:
                # naive distributed locking
                if with_lock:
                    lock_key = '%s:__lock' % newKey
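                    # acquire_lock() stores a per-caller token under lock_key,
                    # so release_lock() removes the lock only if this caller
                    # still owns it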
                    randomvalue = time.time()
                    al = acquire_lock(self.r_server, lock_key, randomvalue)
                    # someone may have computed it
                    obj = self.r_server.get(newKey)
                    if obj is None:
                        value = self.cache_it(newKey, f, time_expire)
                    else:
                        value = pickle.loads(obj)
                    release_lock(self, lock_key, al)
                else:
                    # without distributed locking
                    value = self.cache_it(newKey, f, time_expire)
            return value
        except RConnectionError:
            return self.retry_call(key, f, time_expire, with_lock)
    def cache_it(self, key, f, time_expire):
        if self.debug:
            self.r_server.incr('web2py_cache_statistics:misses')
        cache_set_key = self.cache_set_key
        expire_at = int(time.time() + time_expire) + 120
        bucket_key = "%s:%s" % (cache_set_key, expire_at // 60)
        value = f()
        value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        if time_expire == 0:
            time_expire = 1
        self.r_server.setex(key, time_expire, value_)
        # print('%s will expire on %s: it goes in bucket %s'
        #       % (key, time.ctime(expire_at), bucket_key))
        # print('%s will expire on %s'
        #       % (bucket_key, time.ctime(((expire_at // 60) + 1) * 60)))
        p = self.r_server.pipeline()
        # add the bucket to the fixed set
        p.sadd(cache_set_key, bucket_key)
        # set the key
        p.setex(key, time_expire, value_)
        # add the key to the bucket
        p.sadd(bucket_key, key)
        # expire the bucket properly
        p.expireat(bucket_key, ((expire_at // 60) + 1) * 60)
        p.execute()
        return value
    def retry_call(self, key, f, time_expire, with_lock):
        self.RETRIES += 1
        if self.RETRIES <= self.MAX_RETRIES:
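            # linear backoff: sleeps 2, 4, 6, 8, 10 seconds over the
            # MAX_RETRIES attempts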
logger.error("sleeping %s seconds before reconnecting" %
logger.error("sleeping %s seconds before reconnecting" % (2 * self.RETRIES))
(2 * self.RETRIES))
time.sleep(2 * self.RETRIES)
time.sleep(2 * self.RETRIES)
self.__init__(self.server, self.db, self.debug, self.with_lock)
if self.fail_gracefully:
return self.__call__(key, f, time_expire, with_locking)
self.RETRIES = 0
return f()
return self.__call__(key, f, time_expire, with_lock)
else:
else:
self.RETRIES = 0
self.RETRIES = 0
raise ConnectionError('Redis instance is unavailable at %s' % (
if self.fail_gracefully:
self.server))
return f
raise RConnectionError('Redis instance is unavailable')
    def increment(self, key, value=1):
        try:
            newKey = self.__keyFormat__(key)
            return self.r_server.incr(newKey, value)
        except RConnectionError:
            return self.retry_increment(key, value)
    def retry_increment(self, key, value):
        self.RETRIES += 1
        if self.RETRIES <= self.MAX_RETRIES:
            logger.error("sleeping %s seconds before reconnecting" % (2 * self.RETRIES))
            time.sleep(2 * self.RETRIES)
            return self.increment(key, value)
        else:
            self.RETRIES = 0
            raise RConnectionError('Redis instance is unavailable')
    def clear(self, regex):
        """
        Deletes all cache entries whose key (minus the w2p prefix)
        matches ``regex``
        """
        r = re.compile(regex)
        # get all buckets
        buckets = self.r_server.smembers(self.cache_set_key)
        # get all keys in buckets
        if buckets:
            keys = self.r_server.sunion(buckets)
        else:
            return
        prefix = self.prefix
        pipe = self.r_server.pipeline()
        for a in keys:
            if r.match(str(a).replace(prefix, '', 1)):
                pipe.delete(a)
        if random.randrange(0, 100) < 10:
            # do this just once in a while (10% chance)
            self.clear_buckets(buckets)
        pipe.execute()
    def clear_buckets(self, buckets):
        p = self.r_server.pipeline()
        for b in buckets:
            if not self.r_server.exists(b):
                p.srem(self.cache_set_key, b)
        p.execute()
    def delete(self, key):
        newKey = self.__keyFormat__(key)
        return self.r_server.delete(newKey)
    def stats(self):
        stats_collector = self.r_server.info()
        if self.debug:
            stats_collector['w2p_stats'] = dict(
                hit_total=self.r_server.get(
                    'web2py_cache_statistics:hit_total'),
                misses=self.r_server.get('web2py_cache_statistics:misses')
            )
        stats_collector['w2p_keys'] = dict()
        for a in self.r_server.keys("w2p:%s:*" % self.request.application):
            stats_collector['w2p_keys']["%s_expire_in_sec" % a] = self.r_server.ttl(a)
        return stats_collector
    def __keyFormat__(self, key):
        return '%s%s' % (self.prefix, key.replace(' ', '_'))