# -*- coding: utf-8 -*-
# python 3 compatibility
from __future__ import absolute_import, division, print_function, unicode_literals
# import yaml
from .chains import *
from .frkl import Frkl
from .processors import *
try:
set
except NameError:
# noinspection PyDeprecation,PyCompatibility
from sets import Set as set
__metaclass__ = type
log = logging.getLogger("frkl")
[docs]def load_collector(name, **init_params):
"""Loading a collector extension.
Args:
name (str): the registered name of the collector
init_params (dict): the parameters to initialize the extension object
Returns:
FrklCallback: the extension collector
"""
log2 = logging.getLogger("stevedore")
out_hdlr = logging.StreamHandler(sys.stdout)
out_hdlr.setFormatter(logging.Formatter("PLUGIN ERROR -> %(message)s"))
out_hdlr.setLevel(logging.DEBUG)
log2.addHandler(out_hdlr)
log2.setLevel(logging.INFO)
log.debug("Loading extension...")
mgr = stevedore.driver.DriverManager(
namespace="frkl.collector",
name=name,
invoke_on_load=True,
invoke_kwds=init_params,
)
log.debug(
"Registered plugins: {}".format(", ".join(ext.name for ext in mgr.extensions))
)
return mgr
# ----------------------------------------------------------------
# callbacks / collectors
[docs]@six.add_metaclass(abc.ABCMeta)
class FrklCallback(object):
"""A class to slurp up configurations from the last element of a processor chain.
Since those processers might return Generators, it's handy to deal with the results of a config processing
manually, callbacks seemed like a good way to do it.
"""
[docs] def init(init_file, configs):
"""Creates a collector object with associated Frkl and processor chain.
The init file needs to be yaml, with the 'collector' key being a registered collector plugin,
and the 'processor_chain' key being a list of registered ConfigProcessor objects.
Args:
init_file: the path to the init file
configs: the configuration item(s) for this processor
Returns:
FrklCallback: the collector item
"""
yaml = YAML(typ="safe")
with open(init_file) as f:
init_config = yaml.load(f)
if isinstance(init_config, (list, tuple)):
processor_chain = init_config
collector_name = "default"
elif not isinstance(init_config, dict):
raise Exception(
"init configuration needs to be either list of processor configs, or dict with 'processor_chain' and optionally 'collector' keys"
)
else:
if "processor_chain" not in init_config.keys():
raise Exception(
"No processor chain specified in '{}'".format(init_file)
)
processor_chain = init_config["processor_chain"]
if not processor_chain:
raise Exception("Processor chain in '{}' empty".format(init_file))
if "collector" not in init_config.keys():
collector_name = "default"
else:
collector_name = init_config["collector"]
if isinstance(collector_name, string_types):
collector_init = {}
elif not isinstance(collector_name, dict) or len(collector_name) != 1:
raise Exception(
"'collector' value needs to be either a string or a dict with length 1"
)
else:
temp = collector_name
collector_name = list(temp.keys())[0]
collector_init = temp[collector_name]
if collector_name == "default":
collector_name = "merge"
collector = load_collector(collector_name, **collector_init).driver
bootstrap = Frkl(processor_chain, COLLECTOR_INIT_BOOTSTRAP_PROCESSOR_CHAIN)
config_frkl = bootstrap.process(FrklFactoryCallback())
config_frkl.set_configs(configs)
temp = config_frkl.process(collector)
return collector
init = staticmethod(init)
def __init__(self, **init_params):
self.init_params = init_params
[docs] @abc.abstractmethod
def callback(self, item):
"""Adds a new item to the callback class.
Args:
item (object): the newly processed config
"""
pass
[docs] @abc.abstractmethod
def result(self):
"""Returns a meaningful representation of all added configs so far.
Ideally this is a string representation of the (current) state of the callback, since it might
be used by 3rd party tools for debugging purposes, or as a method to show state in the absence
of knowledge of the type of FrklCallback that is used.
Returns:
object: the current state of the callback
"""
pass
[docs] def started(self):
"""Optional method that can be overwritten if the callback needs to know when the processing has started."""
pass
[docs] def finished(self):
"""Optional method that can be overwritten if the callback needs to know when the processing has finished."""
pass
[docs]class MergeDictResultCallback(FrklCallback):
"""Simple callback, merges all configs to *one* internal dict."""
def __init__(self, **init_params):
super(MergeDictResultCallback, self).__init__(**init_params)
self.result_dict = {}
self.append_keys = init_params.get("append_keys", [])
if not isinstance(self.append_keys, (list, tuple)):
self.append_keys = [self.append_keys]
self.append_keys_map = {}
[docs] def get_dict_detail(self, target_dict, detail_path):
temp = target_dict
for level in detail_path.split("/"):
temp = temp.get(level, {})
return temp
[docs] def set_dict_detail(self, target_dict, detail_path, new_value):
temp = target_dict
tokens = detail_path.split("/")
for level in tokens[0:-1]:
if level not in temp.keys():
temp[level] = {}
temp = temp[level]
temp[tokens[-1]] = new_value
[docs] def callback(self, process_result):
for ak in self.append_keys:
v = self.get_dict_detail(process_result, ak)
if v:
if not isinstance(v, (list, tuple)):
v = [v]
self.append_keys_map.setdefault(ak, []).extend(v)
dict_merge(self.result_dict, process_result, copy_dct=False)
[docs] def result(self):
for k, v in self.append_keys_map.items():
self.set_dict_detail(self.result_dict, k, v)
return self.result_dict
[docs]class SetResultCallback(FrklCallback):
"""Simple callback to create a set out of a list."""
def __init__(self, **init_params):
super(SetResultCallback, self).__init__(**init_params)
self.result_set = set()
self.return_list = init_params.get("return_list", False)
[docs] def callback(self, process_result):
self.result_set.add(process_result)
[docs] def result(self):
if self.return_list:
return list(self.result_set)
else:
return self.result_set
[docs]class MergeResultCallback(FrklCallback):
"""Simple callback, just appends all configs to an internal list."""
def __init__(self, **init_params):
super(MergeResultCallback, self).__init__(**init_params)
self.result_list = []
[docs] def callback(self, process_result):
self.result_list.append(process_result)
[docs] def result(self):
return self.result_list
[docs]class ExtendResultCallback(FrklCallback):
"""Simple callback, extends an internal list with the processing results.
"""
def __init__(self, **init_params):
super(ExtendResultCallback, self).__init__(**init_params)
self.result_list = []
[docs] def callback(self, process_result):
self.result_list.extend(process_result)
[docs] def result(self):
return self.result_list
[docs]class FrklFactoryCallback(FrklCallback):
"""Helper callback method, creates a new Frkl object by processing a list of processor init dicts.
"""
def __init__(self, **init_params):
super(FrklFactoryCallback, self).__init__(**init_params)
self.processors = []
self.bootstrap_chain = []
[docs] def callback(self, item):
self.processors.append(item)
ext_name = item.get("processor", {}).get("type", None)
if not ext_name:
raise FrklConfigException(
"Can't parse processor name using config: {}".format(item)
)
ext_init_params = item.get("init", {})
log.debug(
"Loading extension '{}' using init parameters: '{}".format(
ext_name, ext_init_params
)
)
ext = load_extension(ext_name, **ext_init_params)
self.bootstrap_chain.append(ext.driver)
[docs] def result(self):
return Frkl([], self.bootstrap_chain)