# Source code for stetl.chain

# Chain class: holds pipeline of components.
#
# Author: Just van den Broecke
#

from .factory import factory
from .packet import Packet
from .util import Util
from .splitter import Splitter
from .merger import Merger

# Module-level logger obtained from the project's Util helper under the name
# 'chain'; Chain.assemble() and Chain.run() emit their progress messages on it.
log = Util.get_log('chain')


class Chain:
    """
    Holder for single invokable pipeline of components.

    A Chain is basically a singly linked list of Components. Each Component
    executes a part of the total ETL. Data along the Chain is passed within a
    Packet object. Components are linked via ``Component.add_next()``; the
    compatibility of input and output for linked Components is presumably
    checked there when a Component is added to the Chain.

    Chain string syntax understood by ``assemble()``:

    * ``a|b|c``          -- Components piped one after another.
    * ``a |(b|c) (d)``   -- after ``a``, a Splitter whose children are the
      parenthesized (sub)chains.
    * ``(a) (b)|c``      -- a Merger built from the parenthesized (sub)chains,
      followed by ``c``.

    A group containing ``|`` is assembled as a sub-Chain. Nested parentheses
    are not handled.
    """

    def __init__(self, chain_str, config_dict):
        """
        :param chain_str: textual Chain definition (see class docstring);
            leading/trailing whitespace is stripped.
        :param config_dict: configuration handed to the component factory and
            to Splitter/Merger constructors.
        """
        self.first_comp = None  # head of the linked list of Components
        self.cur_comp = None    # tail: most recently added Component
        self.config_dict = config_dict
        self.chain_str = chain_str.strip()

    def assemble(self):
        """
        Builder method: build a Chain of linked Components from ``chain_str``.

        Regular names are turned into Components via the factory. Consecutive
        parenthesized groups are collected and then wrapped in a Merger when
        the groups are followed by ``|``, otherwise in a Splitter.

        :raises ValueError: when a ``(`` group has no closing ``)``.
        :return: None
        """
        log.info('Assembling Chain: %s...' % self.chain_str)

        # Create linked list of input/filter/output (ETL Component) objects
        chain_str = self.chain_str
        sub_comps = []
        while chain_str:
            chain_str = chain_str.strip()

            # Check and handle Splitter/Merger construct
            # e.g. input_xml_file |(transformer_xslt|output_file) (output_std) (transformer_xslt|output_std)
            if chain_str.startswith('('):
                if ')' not in chain_str:
                    raise ValueError("Missing ')' in Chain: %s" % self.chain_str)
                etl_section_name, chain_str = chain_str.split(')', 1)
                etl_section_name = etl_section_name.strip('(')
                # Strip the remainder right away so that a Merger pipe is
                # recognized even when preceded by whitespace: "(a) (b) |c".
                chain_str = chain_str.strip()

                # Check for subchain (split at Filter level)
                if '|' in etl_section_name:
                    # Have subchain: use Chain to assemble, keep its head
                    sub_chain = Chain(etl_section_name, self.config_dict)
                    sub_chain.assemble()
                    child_comp = sub_chain.first_comp
                else:
                    # Single component (Output) to split
                    child_comp = factory.create_obj(self.config_dict, etl_section_name.strip())

                # Collect Components (can be subchains) for Splitter/Merger
                sub_comps.append(child_comp)
                if '(' in chain_str:
                    # Still components (subchains) to collect
                    continue

            if sub_comps:
                if chain_str.startswith('|'):
                    # Groups followed by a pipe: Merger with children
                    etl_comp = Merger(self.config_dict, sub_comps)
                    chain_str = chain_str[1:]
                else:
                    # Groups not followed by a pipe: Splitter with children
                    etl_comp = Splitter(self.config_dict, sub_comps)
                sub_comps = []
            else:
                # "Normal" case: regular Components piped in Chain
                if '|' in chain_str:
                    # More than one component in remaining Chain
                    etl_section_name, chain_str = chain_str.split('|', 1)
                else:
                    # Last element, we're done!
                    etl_section_name = chain_str
                    chain_str = None

                # Create the ETL component by name and properties
                etl_comp = factory.create_obj(self.config_dict, etl_section_name.strip())

            # Add component to end of Chain
            self.add(etl_comp)

    def add(self, etl_comp):
        """
        Append a Component to the end of the Chain.

        The first Component becomes ``first_comp``; later ones are linked to
        the current tail via its ``add_next()``.

        :param etl_comp: Component to append.
        :return: None
        """
        if not self.first_comp:
            self.first_comp = etl_comp
        else:
            # Already component(s) in chain: link after the current tail
            self.cur_comp.add_next(etl_comp)

        # Remember new tail
        self.cur_comp = etl_comp

    def get_by_class(self, clazz):
        """
        Get the first Component whose class is exactly ``clazz``, mainly for
        testing. Subclasses do not match; only the ``.next`` links from
        ``first_comp`` are followed.

        :param clazz: class to look for.
        :return Component: the matching Component, or None.
        """
        cur_comp = self.first_comp
        while cur_comp:
            if cur_comp.__class__ == clazz:
                return cur_comp

            # Try next in Chain
            cur_comp = cur_comp.next

        return None

    def get_by_id(self, id):
        """
        Get the first Component whose ``get_id()`` equals ``id``, mainly for
        testing. Only the ``.next`` links from ``first_comp`` are followed.

        :param id: identifier to look for (name shadows the builtin; kept for
            backward compatibility with keyword callers).
        :return Component: the matching Component, or None.
        """
        cur_comp = self.first_comp
        while cur_comp:
            if cur_comp.get_id() == id:
                return cur_comp

            # Try next in Chain
            cur_comp = cur_comp.next

        return None

    def get_by_index(self, index):
        """
        Get Component by zero-based position in the Chain, mainly for testing.

        :param index: position along the ``.next`` links; values <= 0 yield
            ``first_comp``.
        :return Component: the Component at ``index``, or None when the Chain
            is shorter.
        """
        cur_comp = self.first_comp
        i = 0
        while cur_comp and i < index:
            # Try next in Chain
            cur_comp = cur_comp.next
            i += 1

        return cur_comp

    def run(self):
        """
        Run the ETL Chain.

        Calls ``do_init()`` on the first Component, then invokes its
        ``process()`` with a Packet until the returned Packet reports
        end-of-stream. Once the loop is entered, ``do_exit()`` is always
        called, also when processing raises.

        :return: None
        """
        log.info('Running Chain: %s' % self.chain_str)

        # One time init for entire Chain. Deliberately outside the try:
        # when init fails, do_exit() is not invoked.
        self.first_comp.do_init()

        # Do ETL as long as input available in Packet
        packet = Packet()
        rounds = 0
        try:
            while not packet.is_end_of_stream():
                # Invoke the first component to start the chain
                packet.init()
                packet = self.first_comp.process(packet)
                rounds += 1
        finally:
            # Always one time exit for entire Chain
            self.first_comp.do_exit()

        log.info('DONE - %d rounds - chain=%s ' % (rounds, self.chain_str))