Source code for stetl.filters.xmlassembler

#!/usr/bin/env python
#
# Splits stream of XML elements into etree docs.
#
# Author: Just van den Broecke
#
from stetl.util import Util, etree
from stetl.filter import Filter
from stetl.packet import FORMAT

log = Util.get_log('xmlassembler')


[docs] class XmlAssembler(Filter): """ Split a stream of etree DOM XML elements (usually Features) into etree DOM docs. Consumes and buffers elements until max_elements reached, will then produce an etree doc. consumes=FORMAT.etree_element, produces=FORMAT.etree_doc """ xpath_base = "//*[local-name() = '%s']" # Constructor def __init__(self, configdict, section): Filter.__init__(self, configdict, section, consumes=FORMAT.etree_element, produces=FORMAT.etree_doc) log.info("cfg = %s" % self.cfg.to_string()) self.max_elements = self.cfg.get_int('max_elements', 10000) self.container_doc = self.cfg.get('container_doc') self.element_container_xpath = XmlAssembler.xpath_base % self.cfg.get('element_container_tag') self.total_element_count = 0 self.element_arr = [] # Reusable XML parser self.xml_parser = etree.XMLParser(remove_blank_text=True)
[docs] def invoke(self, packet): if packet.data is not None: # Valid element: consume and handle self.consume_element(packet) # Document is obviously not finished, reset EoD/EoS in packet packet.set_end_of_stream(False) packet.set_end_of_doc(False) if packet.is_end_of_stream() or packet.is_end_of_doc() or len(self.element_arr) >= self.max_elements: # EOF but still data in buffer: make doc # log.info("Flush doc") self.flush_elements(packet) return packet
def consume_element(self, packet): # Always move the data (element) from packet element = packet.consume() if element is not None: self.total_element_count += 1 self.element_arr.append(element) return packet def flush_elements(self, packet): packet.set_end_of_doc() if len(self.element_arr) == 0: return packet # Start new doc (TODO clone) try: etree_doc = etree.fromstring(self.container_doc.encode('utf-8'), self.xml_parser) except Exception as e: log.error('new container doc not OK: %s' % str(e)) return packet parent_element = etree_doc.xpath(self.element_container_xpath) if len(parent_element) > 0: parent_element = parent_element[0] for element in self.element_arr: parent_element.append(element) log.info('xmldoc ready: elms=%d total_elms=%d' % (len(self.element_arr), self.total_element_count)) packet.data = etree_doc self.element_arr = [] return packet