# Source code for dynaconf.loaders.redis_loader

# coding: utf-8
from dynaconf.utils.parse_conf import (
    parse_conf_data, unparse_conf_data
)
try:
    from redis import StrictRedis
except ImportError as e:
    raise ImportError(
        "redis package is not installed in your environment. "
        "`pip install dynaconf[redis]` or disable the redis loader with "
        "export REDIS_ENABLED_FOR_DYNACONF=false"
    )

# Name of this loader; passed as ``loader_identifier`` when load() updates
# the settings object and included in this module's debug log messages.
IDENTIFIER = 'redis'


def load(obj, env=None, silent=True, key=None):
    """Read a single key or all keys from redis and load them in to "settings".

    Values are read from the redis hash named after the upper-cased
    ``GLOBAL_ENV_FOR_DYNACONF`` setting and parsed with ``parse_conf_data``.

    :param obj: the settings instance
    :param env: settings env (currently not used by this function)
    :param silent: if True, errors are logged (when ``obj`` has a logger)
        and ``False`` is returned instead of raising
    :param key: if defined load a single key, else load all in env
    :return: ``False`` when an error was silenced, otherwise ``None``
    """
    client = StrictRedis(**obj.get('REDIS_FOR_DYNACONF'))
    holder = obj.get('GLOBAL_ENV_FOR_DYNACONF')
    try:
        if key:
            # NOTE(review): write() and delete() upper-case the field name,
            # while this lookup uses ``key`` exactly as given - confirm that
            # callers always pass upper-case keys here.
            value = client.hget(holder.upper(), key)
            if value:
                obj.logger.debug(
                    "redis_loader: loading by key: %s:%s (%s:%s)",
                    key,
                    value,
                    IDENTIFIER,
                    holder
                )
                # Store the parsed value even when it is falsy (False, 0,
                # empty string/list): the load-all branch below does so too,
                # and skipping it would leave a stale value in ``obj``.
                obj.set(key, parse_conf_data(value, tomlfy=True))
        else:
            data = {
                field: parse_conf_data(raw, tomlfy=True)
                for field, raw in client.hgetall(holder.upper()).items()
            }
            if data:
                obj.logger.debug(
                    "redis_loader: loading: %s (%s:%s)",
                    data,
                    IDENTIFIER,
                    holder
                )
                obj.update(data, loader_identifier=IDENTIFIER)
    except Exception as e:
        if silent:
            # Best effort: report the problem if a logger is available and
            # signal the failure through the return value.
            if hasattr(obj, 'logger'):
                obj.logger.error(str(e))
            return False
        raise
def write(obj, data=None, **kwargs):
    """Write values in to the redis hash and reload the settings from it.

    Keys are upper-cased and values are serialized with
    ``unparse_conf_data`` before being stored in the hash named after the
    upper-cased ``GLOBAL_ENV_FOR_DYNACONF`` setting.

    :param obj: settings object
    :param data: vars to be stored (the passed mapping is not modified)
    :param kwargs: vars to be stored; they override same-named entries
        of ``data``
    :return: None
    :raises RuntimeError: if ``obj.REDIS_ENABLED_FOR_DYNACONF`` is False
    :raises AttributeError: if neither ``data`` nor ``kwargs`` has content
    """
    if obj.REDIS_ENABLED_FOR_DYNACONF is False:
        raise RuntimeError(
            'Redis is not configured \n'
            'export REDIS_ENABLED_FOR_DYNACONF=true\n'
            'and configure the REDIS_FOR_DYNACONF_* variables'
        )

    # Merge into a fresh dict so the caller's ``data`` is never mutated.
    payload = dict(data or {})
    payload.update(kwargs)
    if not payload:
        raise AttributeError('Data must be provided')

    # Only build the client once the input is known to be valid.
    client = StrictRedis(**obj.REDIS_FOR_DYNACONF)
    holder = obj.get('GLOBAL_ENV_FOR_DYNACONF')
    redis_data = {
        key.upper(): unparse_conf_data(value)
        for key, value in payload.items()
    }
    # NOTE(review): newer redis-py releases deprecate ``hmset`` in favour of
    # ``hset(name, mapping=...)`` - confirm the supported redis-py range
    # before switching.
    client.hmset(holder.upper(), redis_data)
    # Re-read from redis so ``obj`` reflects what was just stored.
    load(obj)
def delete(obj, key=None):
    """Remove one key, or the whole env hash when no key is given.

    The redis hash is named after the upper-cased
    ``GLOBAL_ENV_FOR_DYNACONF`` setting. Whatever is removed from redis
    is also unset on the settings object.

    :param obj: settings object
    :param key: key to delete from store location; if falsy, every key
        stored in the hash is deleted
    :return: None
    """
    connection = StrictRedis(**obj.REDIS_FOR_DYNACONF)
    hash_name = obj.get('GLOBAL_ENV_FOR_DYNACONF').upper()

    if not key:
        # Collect the field names first so they can still be unset on the
        # settings object after the hash itself is gone.
        stored_keys = connection.hkeys(hash_name)
        connection.delete(hash_name)
        obj.unset_all(stored_keys)
        return

    connection.hdel(hash_name, key.upper())
    obj.unset(key)