Source code for stetl.inputs.httpinput

# -*- coding: utf-8 -*-
#
# Input classes for fetching data via HTTP.
#
# Author: Just van den Broecke
#
import re
from urllib2 import Request, urlopen, URLError, HTTPError
import urllib
import base64

from stetl.component import Config
from stetl.input import Input
from stetl.util import Util
from stetl.packet import FORMAT

log = Util.get_log('httpinput')


[docs]class HttpInput(Input): """ Fetch data from remote services like WFS via HTTP protocol. Base class: subclasses will do datatype-specific formatting of the returned data. produces=FORMAT.any """ # Start attribute config meta # Applying Decorator pattern with the Config class to provide # read-only config values from the configured properties.
[docs] @Config(ptype=str, default=None, required=True) def url(self): """ The HTTP URL string. """ pass
[docs] @Config(ptype=dict, default=None, required=False) def auth(self): """ Authentication data: Flat JSON-like struct dependent on auth type/schema. Only the `type` field is required, other fields depend on auth schema. Supported values : :: type: basic|token If the type is ``basic`` (HTTP Basic Authentication) two additional fields ``user`` and ``password`` are required. If the type is ``token`` (HTTP Token) additional two additional fields ``keyword`` and ``token`` are required. Any required Base64 encoding is provided by ``HttpInput``. Examples: :: # Basic Auth url = https://some.rest.api.com auth = { type: basic, user: myname password: mypassword } # Token Auth url = https://some.rest.api.com auth = { type: token, keyword: Bearer token: mytoken } """ pass
[docs] @Config(ptype=dict, default=None, required=False) def parameters(self): """ Flat JSON-like struct of the parameters to be appended to the url. Example: (parameters require quotes):: url = http://geodata.nationaalgeoregister.nl/natura2000/wfs parameters = { service : WFS, version : 1.1.0, request : GetFeature, srsName : EPSG:28992, outputFormat : text/xml; subtype=gml/2.1.2, typename : natura2000 } """ pass
# End attribute config meta def __init__(self, configdict, section, produces=FORMAT.any): Input.__init__(self, configdict, section, produces) log.info("url=%s parameters=%s" % (self.url, self.parameters))
[docs] def add_authorization(self, request): """ Add authorization from config data. Authorization scheme-specific. May be extended or overloaded for additional schemes. :param request: the HTTP Request :return: """ auth_creds = self.auth auth_type = auth_creds['type'] auth_val = None if auth_type == 'basic': # Basic auth: http://mozgovipc.blogspot.nl/2012/06/python-http-basic-authentication-with.html # base64 encode username and password # write the Authorization header like: 'Basic base64encode(username + ':' + password) auth_val = base64.encodestring('%s:%s' % (auth_creds['user'], auth_creds['password'])) auth_val = "Basic %s" % auth_val elif auth_type == 'token': # Bearer Type, see eg. https://tools.ietf.org/html/rfc6750 auth_val = "%s %s" % (auth_creds['keyword'], auth_creds['token']) request.add_header("Authorization", auth_val.replace('\n', ''))
[docs] def read_from_url(self, url, parameters=None): """ Read the data from the URL. :param url: the url to fetch :param parameters: optional dict of query parameters :return: """ # log.info('Fetch data from URL: %s ...' % url) req = Request(url) try: # Urlencode optional parameters query_string = None if parameters: query_string = urllib.urlencode(parameters) # Add optional Authorization if self.auth: self.add_authorization(req) response = urlopen(req, query_string) except HTTPError as e: log.error('HTTPError fetching from URL %s: code=%d e=%s' % (url, e.code, e)) raise e except URLError as e: log.error('URLError fetching from URL %s: reason=%s e=%s' % (url, e.reason, e)) raise e # Everything is fine return response.read()
[docs] def read(self, packet): """ Read the data from the URL. :param packet: :return: """ # Done with URL ? if self.url is None: packet.set_end_of_stream() log.info("EOF URL reading done") return packet packet.data = self.format_data(self.read_from_url(self.url, self.parameters)) self.url = None return packet
[docs] def format_data(self, data): """ Format response data, override in subclasses, defaults to returning original data. :param packet: :return: """ return data
[docs]class ApacheDirInput(HttpInput): """ Read file data from an Apache directory "index" HTML page. Uses http://stackoverflow.com/questions/686147/url-tree-walker-in-python produces=FORMAT.record. Each record contains file_name and file_data (other meta data like date time is too fragile over different Apache servers). """
[docs] @Config(ptype=str, default='xml', required=False) def file_ext(self): """ The file extension for target files in Apache dir. """ pass
def __init__(self, configdict, section, produces=FORMAT.record): HttpInput.__init__(self, configdict, section, produces) # look for a link + a timestamp + a size ('-' for dir) # self.parse_re = re.compile('href="([^"]*)".*(..-...-.... ..:..).*?(\d+[^\s<]*|-)') # This appeared to be too fragile, e.g. different date formats per apache server # default file extension to filter # default regular expression for file self.file_reg_exp = self.cfg.get('file_reg_exp', 'href="([^"]*%s)"' % self.file_ext) self.parse_re = re.compile(self.file_reg_exp) self.file_list = None self.file_index = None if not self.url.endswith('/'): self.url += '/'
[docs] def init(self): """ Read the list of files from the Apache index URL. """ # One time: get all files from remote Apache dir log.info('Init: fetching file list from URL: %s ...' % self.url) html = self.read_from_url(self.url) self.file_list = self.parse_re.findall(html) log.info('Found %4d file' % len(self.file_list) + 's' * (len(self.file_list) != 1))
[docs] def next_file(self): """ Return a tuple (name, date, size) with next file info. :return tuple: """ if self.file_index is None: self.file_index = -1 # At last file tuple ? if self.no_more_files(): return None self.file_index += 1 return self.file_list[self.file_index]
[docs] def no_more_files(self): """ More files left?. :return Boolean: """ return self.file_index == len(self.file_list) - 1
[docs] def read(self, packet): """ Read the data from the URL. :param packet: :return: """ file_name = self.next_file() file_name = self.filter_file(file_name) # All files done? if file_name is None and self.no_more_files() is True: packet.set_end_of_stream() log.info("EOF Apache dir files done, file_index=%d" % self.file_index) return packet if file_name is None: return packet # Process next file url = self.url + file_name log.info("Reading file_index=%d, file_name=%s " % (self.file_index, file_name)) # Create record from file_name and file content packet.data = dict(file_name=file_name, file_data=self.read_from_url(url)) return packet
[docs] def filter_file(self, file_name): """ Filter the file_name, e.g. to suppress reading, default: return file_name. :param file_name: :return string or None: """ return file_name