Source code for stetl.etl

# Main ETL program.
# Author: Just van den Broecke
import os
import re
import sys
from configparser import ConfigParser, ExtendedInterpolation
from io import StringIO
from . import version
from .util import Util
from .chain import Chain

log = Util.get_log('ETL')

[docs]class ETL: """The main class: builds ETL Chains with connected Components from a config and let them run. Usually this class is called via :mod:`main` but it may be called directly for direct integration. """ CONFIG_DIR = None def __init__(self, options_dict, args_dict=None): """ :param options_dict: dictionary with options, now only config_file files with path to config file :param args_dict: optional dictionary with arguments to be substituted for symbolic values in config :return: Assume path to config .ini file is in options dict """ # args_dict is optional and is used to do string substitutions in options_dict.config file"INIT - Stetl version is %s" % str(version.__version__)) self.options_dict = options_dict config_file = self.options_dict.get('config_file') if config_file is None or not os.path.isfile(config_file): log.error('No config file found at: %s' % config_file) sys.exit(1) ETL.CONFIG_DIR = os.path.dirname(os.path.abspath(config_file))"Config/working dir = %s" % ETL.CONFIG_DIR) self.configdict = ConfigParser(interpolation=ExtendedInterpolation()) sys.path.append(ETL.CONFIG_DIR) config_str = '' try: # Get config file as string"Reading config_file = %s" % config_file) f = open(config_file, 'r') config_str = f.close() except Exception as e: log.error("Cannot read config file: err=%s" % str(e)) raise e args_names = list() try: # Optional: expand symbolic arguments from args_dict and or OS Env # ignore errors here as { .. } may appear at random. # Parse unique list of argument names from config file string. # args_names = list(set(re.findall(r'{[A-Z|a-z]\w+}', config_str))) args_names = [name.split('{')[1].split('}')[0] for name in args_names] # Optional: expand from equivalent env vars args_dict = self.env_expand_args_dict(args_dict, args_names) # In general all arg names should be present in args dict for args_name in args_names: if args_name not in args_dict: log.warn("Arg not found in args nor environment: name=%s" % args_name) # raise Exception("name=%s" % args_name) except Exception as e: log.warn("Expanding config arguments (non fatal yet): %s" % str(e)) try: if args_dict:"Substituting %d args in config file from args_dict: %s" % (len(args_names), str(args_names))) # Do replacements see # and render substituted config string config_str = config_str.format(**args_dict)"Substituting args OK") except Exception as e: log.error("Error substituting config arguments: err=%s" % str(e)) raise e try: # Put Config string into buffer (readfp() needs a readline() method) config_buf = StringIO(config_str) # Parse config from file buffer self.configdict.read_file(config_buf, config_file) except Exception as e: log.error("Error populating config dict from config string: err=%s" % str(e)) raise e
[docs] def env_expand_args_dict(self, args_dict, args_names): """ Expand values in dict with equivalent values from the OS Env. NB vars in OS Env should be prefixed with `STETL_` or `stetl_` as to get overrides by accident. :return: expanded args_dict or None """ env_dict = os.environ for name in env_dict: args_key = '_'.join(name.split('_')[1:]) if name.lower().startswith('stetl_') and args_key in args_names: # Get real key, e.g. "STETL_HOST" becomes "HOST" # "stetl_host" becomes "host". args_value = env_dict[name] if not args_dict: args_dict = dict() # Set: optionally override any existing value args_dict[args_key] = args_value"Set/override from env var: %s" % name) return args_dict
def run(self): # The main ETL processing"START") t1 = Util.start_timer("total ETL") # Get the ETL Chain pipeline config strings # Default is to use the section [etl], but may be overidden on cmd line config_section = self.options_dict.get('config_section') if config_section is None: config_section = 'etl' chains_str = self.configdict.get(config_section, 'chains') if not chains_str: raise ValueError('ETL chain entry not defined in section [etl]') # Multiple Chains may be specified in the config chains_str_arr = chains_str.split(',') for chain_str in chains_str_arr: # Build single Chain of components and let it run chain = Chain(chain_str, self.configdict) chain.assemble() # Run the ETL for this Chain Util.end_timer(t1, "total ETL")"ALL DONE")