Untitled diff
Created 8 years ago
Original: 74 removals, 273 lines
Changed: 117 additions, 283 lines
"""
"""
Developed by niphlod@gmail.com
Developed by niphlod@gmail.com
Released under web2py license because includes gluon/cache.py source code
Released under web2py license because includes gluon/cache.py source code
"""
"""
import redis
from redis.exceptions import ConnectionError
from gluon import current
from gluon.cache import CacheAbstract
try:
try:
import cPickle as pickle
import cPickle as pickle
except:
except:
import pickle
import pickle
import time
import time
import re
import re
import logging
import logging
import thread
from threading import Lock
import random
import random
from gluon import current
from gluon.cache import CacheAbstract
from gluon.contrib.redis_utils import acquire_lock, release_lock
from gluon.contrib.redis_utils import register_release_lock, RConnectionError
logger = logging.getLogger("web2py.cache.redis")
logger = logging.getLogger("web2py.cache.redis")
locker = thread.allocate_lock()
locker = Lock()
def RedisCache(*args, **vars):
def RedisCache(redis_conn=None, debug=False, with_lock=False, fail_gracefully=False, db=None):
"""
"""
Usage example: put in models
Usage example: put in models::
from gluon.contrib.redis_cache import RedisCache
First of all install Redis
cache.redis = RedisCache('localhost:6379',db=None, debug=True, with_lock=True, password=None)
Ubuntu :
sudo apt-get install redis-server
sudo pip install redis
:param db: redis db to use (0..16)
Then
:param debug: if True adds to stats() the total_hits and misses
:param with_lock: sets the default locking mode for creating new keys.
from gluon.contrib.redis_utils import RConn
rconn = RConn()
from gluon.contrib.redis_cache import RedisCache
cache.redis = RedisCache(redis_conn=rconn, debug=True, with_lock=True)
Args:
redis_conn: a redis-like connection object
debug: if True adds to stats() the total_hits and misses
with_lock: sets the default locking mode for creating new keys.
By default is False (usualy when you choose Redis you do it
By default is False (usualy when you choose Redis you do it
for performances reason)
for performances reason)
When True, only one thread/process can set a value concurrently
When True, only one thread/process can set a value concurrently
fail_gracefully: if redis is unavailable, returns the value computing it
instead of raising an exception
When you use cache.redis directly you can use
It can be used pretty much the same as cache.ram()
value = cache.redis('mykey', lambda: time.time(), with_lock=True)
When you use cache.redis directly you can use :
redis_key_and_var_name = cache.redis('redis_key_and_var_name', lambda or function, time_expire=time.time(), with_lock=True)
to enforce locking. The with_lock parameter overrides the one set in the
to enforce locking. The with_lock parameter overrides the one set in the
cache.redis instance creation
cache.redis instance creation
cache.redis.stats()
cache.redis.stats()
returns a dictionary with statistics of Redis server
returns a dictionary with statistics of Redis server
with one additional key ('w2p_keys') showing all keys currently set
with one additional key ('w2p_keys') showing all keys currently set
from web2py with their TTL
from web2py with their TTL
A little wording on how keys are stored (and why the cache_it() function
A little wording on how keys are stored (and why the cache_it() function
and the clear() one look a little bit convoluted): there are a lot of
and the clear() one look a little bit convoluted): there are a lot of
libraries that just store values and then use the KEYS command to delete it.
libraries that just store values and then use the KEYS command to delete it.
Until recent releases of this module, that technique was used here too.
Until recent releases of this module, that technique was used here too.
In the need of deleting specific keys in a database with zillions keys in it
In the need of deleting specific keys in a database with zillions keys in it
(other web2py apps, other applications in the need of a Redis stack) the
(other web2py apps, other applications in the need of a Redis stack) the
KEYS command is slow (it needs to scan every key in the database).
KEYS command is slow (it needs to scan every key in the database).
So, we use Redis 'sets' to store keys in "buckets"...
So, we use Redis 'sets' to store keys in "buckets"...
- every key created gets "indexed" in a bucket
- every key created gets "indexed" in a bucket
- all buckets are indexed in a fixed key that never expires
- all buckets are indexed in a fixed key that never expires
- all keys generated within the same minute go in the same bucket
- all keys generated within the same minute go in the same bucket
- every bucket is then set to expire when every key within it is expired
- every bucket is then set to expire when every key within it is expired
When we need to clear() cached keys:
When we need to clear() cached keys:
- we tell Redis to SUNION all buckets
- we tell Redis to SUNION all buckets
- gives us just the keys that are not expired yet
- gives us just the keys that are not expired yet
- buckets that are expired are removed from the fixed set
- buckets that are expired are removed from the fixed set
- we scan the keys and then delete them
- we scan the keys and then delete them
"""
"""
locker.acquire()
locker.acquire()
try:
try:
instance_name = 'redis_instance_' + current.request.application
instance_name = 'redis_instance_' + current.request.application
if not hasattr(RedisCache, instance_name):
if not hasattr(RedisCache, instance_name):
setattr(RedisCache, instance_name,
RedisClient(*args, **vars))
setattr(RedisCache, instance_name,
RedisClient(redis_conn=redis_conn, debug=debug,
with_lock=with_lock, fail_gracefully=fail_gracefully))
return getattr(RedisCache, instance_name)
return getattr(RedisCache, instance_name)
finally:
finally:
locker.release()
locker.release()
class RedisClient(object):
class RedisClient(object):
meta_storage = {}
meta_storage = {}
MAX_RETRIES = 5
MAX_RETRIES = 5
RETRIES = 0
RETRIES = 0
def __init__(self, server='localhost:6379', db=None, debug=False,
with_lock=False, password=None):
def __init__(self, redis_conn=None, debug=False,
with_lock=False, fail_gracefully=False):
self.server = server
self.password = password
self.db = db or 0
host, port = (self.server.split(':') + ['6379'])[:2]
port = int(port)
self.request = current.request
self.request = current.request
self.debug = debug
self.debug = debug
self.with_lock = with_lock
self.with_lock = with_lock
self.prefix = "w2p:%s:" % (self.request.application)
self.fail_gracefully = fail_gracefully
self.prefix = "w2p:cache:%s:" % self.request.application
if self.request:
if self.request:
app = self.request.application
app = self.request.application
else:
else:
app = ''
app = ''
if not app in self.meta_storage:
if app not in self.meta_storage:
self.storage = self.meta_storage[app] = {
self.storage = self.meta_storage[app] = {
CacheAbstract.cache_stats_name: {
CacheAbstract.cache_stats_name: {
'hit_total': 0,
'hit_total': 0,
'misses': 0,
'misses': 0,
}}
}}
else:
else:
self.storage = self.meta_storage[app]
self.storage = self.meta_storage[app]
self.cache_set_key = 'w2p:%s:___cache_set' % (self.request.application)
self.cache_set_key = 'w2p:%s:___cache_set' % self.request.application
self.r_server = redis.Redis(host=host, port=port, db=self.db, password=self.password)
self.r_server = redis_conn
self._release_script = register_release_lock(self.r_server)
def initialize(self):
def initialize(self):
pass
pass
def __call__(self, key, f, time_expire=300, with_lock=None):
def __call__(self, key, f, time_expire=300, with_lock=None):
if with_lock is None:
if with_lock is None:
with_lock = self.with_lock
with_lock = self.with_lock
if time_expire is None:
if time_expire is None:
time_expire = 24 * 60 * 60
time_expire = 24 * 60 * 60
newKey = self.__keyFormat__(key)
newKey = self.__keyFormat__(key)
value = None
value = None
ttl = 0
ttl = 0
try:
try:
#is there a value
if f is None:
# delete and never look back
self.r_server.delete(newKey)
return None
# is there a value
obj = self.r_server.get(newKey)
obj = self.r_server.get(newKey)
#what's its ttl
# what's its ttl
if obj:
if obj:
ttl = self.r_server.ttl(newKey)
ttl = self.r_server.ttl(newKey)
if ttl > time_expire:
if ttl > time_expire:
obj = None
obj = None
if obj:
if obj:
#was cached
# was cached
if self.debug:
if self.debug:
self.r_server.incr('web2py_cache_statistics:hit_total')
self.r_server.incr('web2py_cache_statistics:hit_total')
value = pickle.loads(obj)
value = pickle.loads(obj)
elif f is None:
#delete and never look back
self.r_server.delete(newKey)
else:
else:
#naive distributed locking
# naive distributed locking
if with_lock:
if with_lock:
lock_key = '%s:__lock' % newKey
lock_key = '%s:__lock' % newKey
try:
while True:
lock = self.r_server.setnx(lock_key, 1)
if lock:
value = self.cache_it(newKey, f, time_expire)
break
else:
time.sleep(0.2)
#did someone else create it in the meanwhile ?
obj = self.r_server.get(newKey)
if obj:
value = pickle.loads(obj)
break
finally:
self.r_server.delete(lock_key)
randomvalue = time.time()
al = acquire_lock(self.r_server, lock_key, randomvalue)
# someone may have computed it
obj = self.r_server.get(newKey)
if obj is None:
value = self.cache_it(newKey, f, time_expire)
else:
value = pickle.loads(obj)
release_lock(self, lock_key, al)
else:
else:
#without distributed locking
# without distributed locking
value = self.cache_it(newKey, f, time_expire)
value = self.cache_it(newKey, f, time_expire)
return value
return value
except ConnectionError:
except RConnectionError:
return self.retry_call(key, f, time_expire, with_lock)
return self.retry_call(key, f, time_expire, with_lock)
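The original hunk takes the lock with setnx(lock_key, 1) and always deletes it in a finally clause, while the changed hunk passes a value (randomvalue) to acquire_lock and hands the result to release_lock. One common reason to attach a value to a lock is so that only the holder can release it. The sketch below illustrates that general pattern with an in-memory stand-in. It is not the gluon.contrib.redis_utils implementation, which this diff does not show; FakeStore, try_acquire, release and the "myapp" key are hypothetical names.

```python
import threading
import uuid


class FakeStore(object):
    """In-memory stand-in for a Redis connection (illustration only)."""

    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()

    def set_if_absent(self, key, value):
        # Plays the role of an atomic "set only if the key does not exist".
        with self._mutex:
            if key in self._data:
                return False
            self._data[key] = value
            return True

    def delete_if_equal(self, key, value):
        # Atomic compare-and-delete: only the matching token removes the lock.
        with self._mutex:
            if self._data.get(key) == value:
                del self._data[key]
                return True
            return False


def try_acquire(store, lock_key):
    """Return a fresh token if the lock was taken, else None."""
    token = uuid.uuid4().hex
    return token if store.set_if_absent(lock_key, token) else None


def release(store, lock_key, token):
    """Release the lock only if `token` is the one that acquired it."""
    return store.delete_if_equal(lock_key, token)


lock_key = "w2p:cache:myapp:mykey:__lock"  # hypothetical key
store = FakeStore()
owner = try_acquire(store, lock_key)      # first caller gets a token
intruder = try_acquire(store, lock_key)   # second caller is refused
```

With a token, a client that lost the lock (for example after a timeout) cannot delete a lock that now belongs to someone else, which the unconditional delete in the original hunk does not guard against.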
def cache_it(self, key, f, time_expire):
def cache_it(self, key, f, time_expire):
if self.debug:
if self.debug:
self.r_server.incr('web2py_cache_statistics:misses')
self.r_server.incr('web2py_cache_statistics:misses')
cache_set_key = self.cache_set_key
cache_set_key = self.cache_set_key
expireat = int(time.time() + time_expire) + 120
expire_at = int(time.time() + time_expire) + 120
bucket_key = "%s:%s" % (cache_set_key, expireat / 60)
bucket_key = "%s:%s" % (cache_set_key, expire_at / 60)
value = f()
value = f()
value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
if time_expire == 0:
if time_expire == 0:
time_expire = 1
time_expire = 1
self.r_server.setex(key, value_, time_expire)
self.r_server.setex(key, time_expire, value_)
#print '%s will expire on %s: it goes in bucket %s' % (key, time.ctime(expireat))
# print '%s will expire on %s: it goes in bucket %s' % (key, time.ctime(expire_at))
#print 'that will expire on %s' % (bucket_key, time.ctime(((expireat/60) + 1)*60))
# print 'that will expire on %s' % (bucket_key, time.ctime(((expire_at / 60) + 1) * 60))
p = self.r_server.pipeline()
p = self.r_server.pipeline()
#add bucket to the fixed set
# add bucket to the fixed set
p.sadd(cache_set_key, bucket_key)
p.sadd(cache_set_key, bucket_key)
#sets the key
# sets the key
p.setex(key, value_, time_expire)
p.setex(key, time_expire, value_)
#add the key to the bucket
# add the key to the bucket
p.sadd(bucket_key, key)
p.sadd(bucket_key, key)
#expire the bucket properly
# expire the bucket properly
p.expireat(bucket_key, ((expireat/60) + 1)*60)
p.expireat(bucket_key, ((expire_at / 60) + 1) * 60)
p.execute()
p.execute()
return value
return value
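The bucket arithmetic in cache_it() can be checked without a Redis server. The following is a minimal sketch of the same computation, assuming Python 3; bucket_for and the application name "myapp" are hypothetical, and the timestamps are arbitrary.

```python
import time


def bucket_for(cache_set_key, time_expire, now=None):
    """Return (bucket_key, bucket_expire_at) for a key cached at `now`."""
    now = time.time() if now is None else now
    # Same arithmetic as cache_it(): two minutes of slack on top of the TTL.
    expire_at = int(now + time_expire) + 120
    # `//` reproduces Python 2's integer `/`; a plain `/` on Python 3
    # would produce a float and therefore a different bucket name.
    minute = expire_at // 60
    bucket_key = "%s:%s" % (cache_set_key, minute)
    # The bucket itself expires at the start of the following minute.
    return bucket_key, (minute + 1) * 60


set_key = "w2p:myapp:___cache_set"  # hypothetical application name
a = bucket_for(set_key, 300, now=1000000000)
b = bucket_for(set_key, 300, now=1000000015)  # 15 s later: same bucket
c = bucket_for(set_key, 300, now=1000000060)  # 60 s later: next bucket
```

Because the bucket expiry is rounded up to the next full minute after the padded key expiry, a bucket always outlives every key indexed in it.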
def retry_call(self, key, f, time_expire, with_locking):
def retry_call(self, key, f, time_expire, with_lock):
self.RETRIES += 1
self.RETRIES += 1
if self.RETRIES <= self.MAX_RETRIES:
if self.RETRIES <= self.MAX_RETRIES:
logger.error("sleeping %s seconds before reconnecting" %
logger.error("sleeping %s seconds before reconnecting" %
(2 * self.RETRIES))
(2 * self.RETRIES))
time.sleep(2 * self.RETRIES)
time.sleep(2 * self.RETRIES)
self.__init__(self.server, self.db, self.debug, self.with_lock)
return self.__call__(key, f, time_expire, with_locking)
if self.fail_gracefully:
self.RETRIES = 0
return f()
return self.__call__(key, f, time_expire, with_lock)
else:
else:
self.RETRIES = 0
self.RETRIES = 0
raise ConnectionError('Redis instance is unavailable at %s' % (
self.server))
if self.fail_gracefully:
return f
raise RConnectionError('Redis instance is unavailable')
def increment(self, key, value=1):
def increment(self, key, value=1):
try:
try:
newKey = self.__keyFormat__(key)
newKey = self.__keyFormat__(key)
return self.r_server.incr(newKey, value)
return self.r_server.incr(newKey, value)
except ConnectionError:
except RConnectionError:
return self.retry_increment(key, value)
return self.retry_increment(key, value)
def retry_increment(self, key, value):
def retry_increment(self, key, value):
self.RETRIES += 1
self.RETRIES += 1
if self.RETRIES <= self.MAX_RETRIES:
if self.RETRIES <= self.MAX_RETRIES:
logger.error("sleeping some seconds before reconnecting")
logger.error("sleeping some seconds before reconnecting")
time.sleep(2 * self.RETRIES)
time.sleep(2 * self.RETRIES)
self.__init__(self.server, self.db, self.debug, self.with_lock)
return self.increment(key, value)
return self.increment(key, value)
else:
else:
self.RETRIES = 0
self.RETRIES = 0
raise ConnectionError('Redis instance is unavailable at %s' % (
self.server))
raise RConnectionError('Redis instance is unavailable')
def clear(self, regex):
def clear(self, regex):
"""
"""
Auxiliary function called by `clear` to search and
Auxiliary function called by `clear` to search and
clear cache entries
clear cache entries
"""
"""
r = re.compile(regex)
r = re.compile(regex)
#get all buckets
# get all buckets
buckets = self.r_server.smembers(self.cache_set_key)
buckets = self.r_server.smembers(self.cache_set_key)
#get all keys in buckets
# get all keys in buckets
if buckets:
if buckets:
keys = self.r_server.sunion(buckets)
keys = self.r_server.sunion(buckets)
else:
else:
return
return
prefix = self.prefix
prefix = self.prefix
pipe = self.r_server.pipeline()
pipe = self.r_server.pipeline()
for a in keys:
for a in keys:
if r.match(str(a).replace(prefix, '', 1)):
if r.match(str(a).replace(prefix, '', 1)):
pipe.delete(a)
pipe.delete(a)
if random.randrange(0,100) < 10:
if random.randrange(0, 100) < 10:
#do this just once in a while (10% chance)
# do this just once in a while (10% chance)
self.clear_buckets(buckets)
self.clear_buckets(buckets)
pipe.execute()
pipe.execute()
def clear_buckets(self, buckets):
def clear_buckets(self, buckets):
p = self.r_server.pipeline()
p = self.r_server.pipeline()
for b in buckets:
for b in buckets:
if not self.r_server.exists(b):
if not self.r_server.exists(b):
p.srem(self.cache_set_key, b)
p.srem(self.cache_set_key, b)
p.execute()
p.execute()
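The selection step of clear() (union all buckets, strip the prefix, match the regex) can be sketched with plain Python sets standing in for Redis sets. This is an illustration under stated assumptions rather than the module's code: keys_to_clear, the bucket names and the "myapp" prefix are hypothetical.

```python
import re


def keys_to_clear(buckets, regex, prefix):
    """Return the indexed keys that a clear(regex) pass would delete.

    buckets: dict mapping bucket name -> set of keys indexed in it
    """
    pattern = re.compile(regex)
    # Equivalent of SUNION over every bucket listed in the fixed set.
    indexed = set().union(*buckets.values())
    doomed = set()
    for key in indexed:
        # Match against the key with the application prefix stripped once.
        if pattern.match(key.replace(prefix, '', 1)):
            doomed.add(key)
    return doomed


prefix = "w2p:cache:myapp:"  # hypothetical application name
buckets = {
    "w2p:myapp:___cache_set:1": {prefix + "user_1", prefix + "user_2"},
    "w2p:myapp:___cache_set:2": {prefix + "page_home"},
}
doomed = keys_to_clear(buckets, r"user_.*", prefix)
```

Only keys that were indexed in a bucket are ever considered, so the cost depends on the number of cached keys for this application and not on the size of the whole Redis database.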
def delete(self, key):
def delete(self, key):
newKey = self.__keyFormat__(key)
newKey = self.__keyFormat__(key)
return self.r_server.delete(newKey)
return self.r_server.delete(newKey)
def stats(self):
def stats(self):
statscollector = self.r_server.info()
stats_collector = self.r_server.info()
if self.debug:
if self.debug:
statscollector['w2p_stats'] = dict(
stats_collector['w2p_stats'] = dict(
hit_total=self.r_server.get(
hit_total=self.r_server.get(
'web2py_cache_statistics:hit_total'),
'web2py_cache_statistics:hit_total'),
misses=self.r_server.get('web2py_cache_statistics:misses')
misses=self.r_server.get('web2py_cache_statistics:misses')
)
)
statscollector['w2p_keys'] = dict()
stats_collector['w2p_keys'] = dict()
for a in self.r_server.keys("w2p:%s:*" % (
for a in self.r_server.keys("w2p:%s:*" % (
self.request.application)):
self.request.application)):
statscollector['w2p_keys']["%s_expire_in_sec" % (a)] = self.r_server.ttl(a)
stats_collector['w2p_keys']["%s_expire_in_sec" % a] = self.r_server.ttl(a)
return statscollector
return stats_collector
def __keyFormat__(self, key):
def __keyFormat__(self, key):
return '%s%s' % (self.prefix, key.replace(' ', '_'))
return '%s%s' % (self.prefix, key.replace(' ', '_'))
Original text

"""
Developed by niphlod@gmail.com
Released under web2py license because includes gluon/cache.py source code
"""

import redis
from redis.exceptions import ConnectionError
from gluon import current
from gluon.cache import CacheAbstract
try:
    import cPickle as pickle
except:
    import pickle
import time
import re
import logging
import thread
import random

logger = logging.getLogger("web2py.cache.redis")

locker = thread.allocate_lock()


def RedisCache(*args, **vars):
    """
    Usage example: put in models

    from gluon.contrib.redis_cache import RedisCache
    cache.redis = RedisCache('localhost:6379',db=None, debug=True, with_lock=True, password=None)

    :param db: redis db to use (0..16)
    :param debug: if True adds to stats() the total_hits and misses
    :param with_lock: sets the default locking mode for creating new keys.
                By default is False (usualy when you choose Redis you do it
                for performances reason)
                When True, only one thread/process can set a value concurrently

    When you use cache.redis directly you can use
    value = cache.redis('mykey', lambda: time.time(), with_lock=True)
    to enforce locking. The with_lock parameter overrides the one set in the
    cache.redis instance creation

    cache.redis.stats()
    returns a dictionary with statistics of Redis server
    with one additional key ('w2p_keys') showing all keys currently set
    from web2py with their TTL

    A little wording on how keys are stored (and why the cache_it() function
    and the clear() one look a little bit convoluted): there are a lot of
    libraries that just store values and then use the KEYS command to delete it.
    Until recent releases of this module, that technique was used here too.
    In the need of deleting specific keys in a database with zillions keys in it
    (other web2py apps, other applications in the need of a Redis stack) the
    KEYS command is slow (it needs to scan every key in the database).
    So, we use Redis 'sets' to store keys in "buckets"...
    - every key created gets "indexed" in a bucket
    - all buckets are indexed in a fixed key that never expires
    - all keys generated within the same minute go in the same bucket
    - every bucket is then set to expire when every key within it is expired
    When we need to clear() cached keys:
    - we tell Redis to SUNION all buckets
       - gives us just the keys that are not expired yet
    - buckets that are expired are removed from the fixed set
    - we scan the keys and then delete them
    """

    locker.acquire()
    try:
        instance_name = 'redis_instance_' + current.request.application
        if not hasattr(RedisCache, instance_name):
            setattr(RedisCache, instance_name,
                    RedisClient(*args, **vars))
        return getattr(RedisCache, instance_name)
    finally:
        locker.release()


class RedisClient(object):

    meta_storage = {}
    MAX_RETRIES = 5
    RETRIES = 0

    def __init__(self, server='localhost:6379', db=None, debug=False,
                 with_lock=False, password=None):
        self.server = server
        self.password = password
        self.db = db or 0
        host, port = (self.server.split(':') + ['6379'])[:2]
        port = int(port)
        self.request = current.request
        self.debug = debug
        self.with_lock = with_lock
        self.prefix = "w2p:%s:" % (self.request.application)
        if self.request:
            app = self.request.application
        else:
            app = ''

        if not app in self.meta_storage:
            self.storage = self.meta_storage[app] = {
                CacheAbstract.cache_stats_name: {
                    'hit_total': 0,
                    'misses': 0,
                }}
        else:
            self.storage = self.meta_storage[app]

        self.cache_set_key = 'w2p:%s:___cache_set' % (self.request.application)

        self.r_server = redis.Redis(host=host, port=port, db=self.db, password=self.password)

    def initialize(self):
        pass

    def __call__(self, key, f, time_expire=300, with_lock=None):
        if with_lock is None:
            with_lock = self.with_lock
        if time_expire is None:
            time_expire = 24 * 60 * 60
        newKey = self.__keyFormat__(key)
        value = None
        ttl = 0
        try:
            #is there a value
            obj = self.r_server.get(newKey)
            #what's its ttl
            if obj:
                ttl = self.r_server.ttl(newKey)
                if ttl > time_expire:
                    obj = None
            if obj:
                #was cached
                if self.debug:
                    self.r_server.incr('web2py_cache_statistics:hit_total')
                value = pickle.loads(obj)
            elif f is None:
                #delete and never look back
                self.r_server.delete(newKey)
            else:
                #naive distributed locking
                if with_lock:
                    lock_key = '%s:__lock' % newKey
                    try:
                        while True:
                            lock = self.r_server.setnx(lock_key, 1)
                            if lock:
                                value = self.cache_it(newKey, f, time_expire)
                                break
                            else:
                                time.sleep(0.2)
                                #did someone else create it in the meanwhile ?
                                obj = self.r_server.get(newKey)
                                if obj:
                                    value = pickle.loads(obj)
                                    break
                    finally:
                        self.r_server.delete(lock_key)
                else:
                    #without distributed locking
                    value = self.cache_it(newKey, f, time_expire)
            return value
        except ConnectionError:
            return self.retry_call(key, f, time_expire, with_lock)

    def cache_it(self, key, f, time_expire):
        if self.debug:
            self.r_server.incr('web2py_cache_statistics:misses')
        cache_set_key = self.cache_set_key
        expireat = int(time.time() + time_expire) + 120
        bucket_key = "%s:%s" % (cache_set_key, expireat / 60)
        value = f()
        value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        if time_expire == 0:
            time_expire = 1
        self.r_server.setex(key, value_, time_expire)
        #print '%s will expire on %s: it goes in bucket %s' % (key, time.ctime(expireat))
        #print 'that will expire on %s' % (bucket_key, time.ctime(((expireat/60) + 1)*60))
        p = self.r_server.pipeline()
        #add bucket to the fixed set
        p.sadd(cache_set_key, bucket_key)
        #sets the key
        p.setex(key, value_, time_expire)
        #add the key to the bucket
        p.sadd(bucket_key, key)
        #expire the bucket properly
        p.expireat(bucket_key, ((expireat/60) + 1)*60)
        p.execute()
        return value

    def retry_call(self, key, f, time_expire, with_locking):
        self.RETRIES += 1
        if self.RETRIES <= self.MAX_RETRIES:
            logger.error("sleeping %s seconds before reconnecting" % (2 * self.RETRIES))
            time.sleep(2 * self.RETRIES)
            self.__init__(self.server, self.db, self.debug, self.with_lock)
            return self.__call__(key, f, time_expire, with_locking)
        else:
            self.RETRIES = 0
            raise ConnectionError('Redis instance is unavailable at %s' % (
                self.server))

    def increment(self, key, value=1):
        try:
            newKey = self.__keyFormat__(key)
            return self.r_server.incr(newKey, value)
        except ConnectionError:
            return self.retry_increment(key, value)

    def retry_increment(self, key, value):
        self.RETRIES += 1
        if self.RETRIES <= self.MAX_RETRIES:
            logger.error("sleeping some seconds before reconnecting")
            time.sleep(2 * self.RETRIES)
            self.__init__(self.server, self.db, self.debug, self.with_lock)
            return self.increment(key, value)
        else:
            self.RETRIES = 0
            raise ConnectionError('Redis instance is unavailable at %s' % (
                self.server))

    def clear(self, regex):
        """
        Auxiliary function called by `clear` to search and
        clear cache entries
        """
        r = re.compile(regex)
        #get all buckets
        buckets = self.r_server.smembers(self.cache_set_key)
        #get all keys in buckets
        if buckets:
            keys = self.r_server.sunion(buckets)
        else:
            return
        prefix = self.prefix
        pipe = self.r_server.pipeline()
        for a in keys:
            if r.match(str(a).replace(prefix, '', 1)):
                pipe.delete(a)
        if random.randrange(0,100) < 10:
            #do this just once in a while (10% chance)
            self.clear_buckets(buckets)
        pipe.execute()

    def clear_buckets(self, buckets):
        p = self.r_server.pipeline()
        for b in buckets:
            if not self.r_server.exists(b):
                p.srem(self.cache_set_key, b)
        p.execute()

    def delete(self, key):
        newKey = self.__keyFormat__(key)
        return self.r_server.delete(newKey)

    def stats(self):
        statscollector = self.r_server.info()
        if self.debug:
            statscollector['w2p_stats'] = dict(
                hit_total=self.r_server.get(
                    'web2py_cache_statistics:hit_total'),
                misses=self.r_server.get('web2py_cache_statistics:misses')
            )
        statscollector['w2p_keys'] = dict()

        for a in self.r_server.keys("w2p:%s:*" % (
                self.request.application)):
            statscollector['w2p_keys']["%s_expire_in_sec" % (a)] = self.r_server.ttl(a)
        return statscollector

    def __keyFormat__(self, key):
        return '%s%s' % (self.prefix, key.replace(' ', '_'))
Changed text

"""
Developed by niphlod@gmail.com
Released under web2py license because includes gluon/cache.py source code
"""

try:
    import cPickle as pickle
except:
    import pickle
import time
import re
import logging
from threading import Lock
import random
from gluon import current
from gluon.cache import CacheAbstract
from gluon.contrib.redis_utils import acquire_lock, release_lock
from gluon.contrib.redis_utils import register_release_lock, RConnectionError

logger = logging.getLogger("web2py.cache.redis")

locker = Lock()


def RedisCache(redis_conn=None, debug=False, with_lock=False, fail_gracefully=False, db=None):
    """
    Usage example: put in models::

    First of all install Redis
    Ubuntu :
    sudo apt-get install redis-server
    sudo pip install redis

    Then

        from gluon.contrib.redis_utils import RConn
        rconn = RConn()
        from gluon.contrib.redis_cache import RedisCache
        cache.redis = RedisCache(redis_conn=rconn, debug=True, with_lock=True)

    Args:
        redis_conn: a redis-like connection object
        debug: if True adds to stats() the total_hits and misses
        with_lock: sets the default locking mode for creating new keys.
            By default is False (usualy when you choose Redis you do it
            for performances reason)
            When True, only one thread/process can set a value concurrently
        fail_gracefully: if redis is unavailable, returns the value computing it
            instead of raising an exception

    It can be used pretty much the same as cache.ram()
    When you use cache.redis directly you can use :

        redis_key_and_var_name = cache.redis('redis_key_and_var_name', lambda or function,
                                             time_expire=time.time(), with_lock=True)

    to enforce locking. The with_lock parameter overrides the one set in the
    cache.redis instance creation

    cache.redis.stats()
    returns a dictionary with statistics of Redis server
    with one additional key ('w2p_keys') showing all keys currently set
    from web2py with their TTL

    A little wording on how keys are stored (and why the cache_it() function
    and the clear() one look a little bit convoluted): there are a lot of
    libraries that just store values and then use the KEYS command to delete it.
    Until recent releases of this module, that technique was used here too.
    In the need of deleting specific keys in a database with zillions keys in it
    (other web2py apps, other applications in the need of a Redis stack) the
    KEYS command is slow (it needs to scan every key in the database).
    So, we use Redis 'sets' to store keys in "buckets"...
    - every key created gets "indexed" in a bucket
    - all buckets are indexed in a fixed key that never expires
    - all keys generated within the same minute go in the same bucket
    - every bucket is then set to expire when every key within it is expired
    When we need to clear() cached keys:
    - we tell Redis to SUNION all buckets
       - gives us just the keys that are not expired yet
    - buckets that are expired are removed from the fixed set
    - we scan the keys and then delete them
    """

    locker.acquire()
    try:
        instance_name = 'redis_instance_' + current.request.application
        if not hasattr(RedisCache, instance_name):
            setattr(RedisCache, instance_name,
                    RedisClient(redis_conn=redis_conn, debug=debug,
                                with_lock=with_lock, fail_gracefully=fail_gracefully))
        return getattr(RedisCache, instance_name)
    finally:
        locker.release()


class RedisClient(object):

    meta_storage = {}
    MAX_RETRIES = 5
    RETRIES = 0

    def __init__(self, redis_conn=None, debug=False,
                 with_lock=False, fail_gracefully=False):
        self.request = current.request
        self.debug = debug
        self.with_lock = with_lock
        self.fail_gracefully = fail_gracefully
        self.prefix = "w2p:cache:%s:" % self.request.application
        if self.request:
            app = self.request.application
        else:
            app = ''

        if app not in self.meta_storage:
            self.storage = self.meta_storage[app] = {
                CacheAbstract.cache_stats_name: {
                    'hit_total': 0,
                    'misses': 0,
                }}
        else:
            self.storage = self.meta_storage[app]

        self.cache_set_key = 'w2p:%s:___cache_set' % self.request.application

        self.r_server = redis_conn
        self._release_script = register_release_lock(self.r_server)

    def initialize(self):
        pass

    def __call__(self, key, f, time_expire=300, with_lock=None):
        if with_lock is None:
            with_lock = self.with_lock
        if time_expire is None:
            time_expire = 24 * 60 * 60
        newKey = self.__keyFormat__(key)
        value = None
        ttl = 0
        try:
            if f is None:
                # delete and never look back
                self.r_server.delete(newKey)
                return None
            # is there a value
            obj = self.r_server.get(newKey)
            # what's its ttl
            if obj:
                ttl = self.r_server.ttl(newKey)
                if ttl > time_expire:
                    obj = None
            if obj:
                # was cached
                if self.debug:
                    self.r_server.incr('web2py_cache_statistics:hit_total')
                value = pickle.loads(obj)
            else:
                # naive distributed locking
                if with_lock:
                    lock_key = '%s:__lock' % newKey
                    randomvalue = time.time()
                    al = acquire_lock(self.r_server, lock_key, randomvalue)
                    # someone may have computed it
                    obj = self.r_server.get(newKey)
                    if obj is None:
                        value = self.cache_it(newKey, f, time_expire)
                    else:
                        value = pickle.loads(obj)
                    release_lock(self, lock_key, al)
                else:
                    # without distributed locking
                    value = self.cache_it(newKey, f, time_expire)
            return value
        except RConnectionError:
            return self.retry_call(key, f, time_expire, with_lock)

    def cache_it(self, key, f, time_expire):
        if self.debug:
            self.r_server.incr('web2py_cache_statistics:misses')
        cache_set_key = self.cache_set_key
        expire_at = int(time.time() + time_expire) + 120
        bucket_key = "%s:%s" % (cache_set_key, expire_at / 60)
        value = f()
        value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        if time_expire == 0:
            time_expire = 1
        self.r_server.setex(key, time_expire, value_)
        # print '%s will expire on %s: it goes in bucket %s' % (key, time.ctime(expire_at))
        # print 'that will expire on %s' % (bucket_key, time.ctime(((expire_at / 60) + 1) * 60))
        p = self.r_server.pipeline()
        # add bucket to the fixed set
        p.sadd(cache_set_key, bucket_key)
        # sets the key
        p.setex(key, time_expire, value_)
        # add the key to the bucket
        p.sadd(bucket_key, key)
        # expire the bucket properly
        p.expireat(bucket_key, ((expire_at / 60) + 1) * 60)
        p.execute()
        return value

    def retry_call(self, key, f, time_expire, with_lock):
        self.RETRIES += 1
        if self.RETRIES <= self.MAX_RETRIES:
            logger.error("sleeping %s seconds before reconnecting" % (2 * self.RETRIES))
            time.sleep(2 * self.RETRIES)
            if self.fail_gracefully:
                self.RETRIES = 0
                return f()
            return self.__call__(key, f, time_expire, with_lock)
        else:
            self.RETRIES = 0
            if self.fail_gracefully:
                return f
            raise RConnectionError('Redis instance is unavailable')

    def increment(self, key, value=1):
        try:
            newKey = self.__keyFormat__(key)
            return self.r_server.incr(newKey, value)
        except RConnectionError:
            return self.retry_increment(key, value)

    def retry_increment(self, key, value):
        self.RETRIES += 1
        if self.RETRIES <= self.MAX_RETRIES:
            logger.error("sleeping some seconds before reconnecting")
            time.sleep(2 * self.RETRIES)
            return self.increment(key, value)
        else:
            self.RETRIES = 0
            raise RConnectionError('Redis instance is unavailable')

    def clear(self, regex):
        """
        Auxiliary function called by `clear` to search and
        clear cache entries
        """
        r = re.compile(regex)
        # get all buckets
        buckets = self.r_server.smembers(self.cache_set_key)
        # get all keys in buckets
        if buckets:
            keys = self.r_server.sunion(buckets)
        else:
            return
        prefix = self.prefix
        pipe = self.r_server.pipeline()
        for a in keys:
            if r.match(str(a).replace(prefix, '', 1)):
                pipe.delete(a)
        if random.randrange(0, 100) < 10:
            # do this just once in a while (10% chance)
            self.clear_buckets(buckets)
        pipe.execute()

    def clear_buckets(self, buckets):
        p = self.r_server.pipeline()
        for b in buckets:
            if not self.r_server.exists(b):
                p.srem(self.cache_set_key, b)
        p.execute()

    def delete(self, key):
        newKey = self.__keyFormat__(key)
        return self.r_server.delete(newKey)

    def stats(self):
        stats_collector = self.r_server.info()
        if self.debug:
            stats_collector['w2p_stats'] = dict(
                hit_total=self.r_server.get(
                    'web2py_cache_statistics:hit_total'),
                misses=self.r_server.get('web2py_cache_statistics:misses')
            )
        stats_collector['w2p_keys'] = dict()

        for a in self.r_server.keys("w2p:%s:*" % (
                self.request.application)):
            stats_collector['w2p_keys']["%s_expire_in_sec" % a] = self.r_server.ttl(a)
        return stats_collector

    def __keyFormat__(self, key):
        return '%s%s' % (self.prefix, key.replace(' ', '_'))