630 lines
20 KiB
Python
630 lines
20 KiB
Python
"""HTML handling.
|
|
|
|
Copyright 2003-2006 John J. Lee <jjl@pobox.com>
|
|
|
|
This code is free software; you can redistribute it and/or modify it under
|
|
the terms of the BSD or ZPL 2.1 licenses (see the file COPYING.txt
|
|
included with the distribution).
|
|
|
|
"""
|
|
|
|
import codecs
|
|
import copy
|
|
import htmlentitydefs
|
|
import re
|
|
|
|
import _sgmllib_copy as sgmllib
|
|
|
|
import _beautifulsoup
|
|
import _form
|
|
from _headersutil import split_header_words, is_html as _is_html
|
|
import _request
|
|
import _rfc3986
|
|
|
|
DEFAULT_ENCODING = "latin-1"
|
|
|
|
COMPRESS_RE = re.compile(r"\s+")
|
|
|
|
|
|
class CachingGeneratorFunction(object):
|
|
"""Caching wrapper around a no-arguments iterable."""
|
|
|
|
def __init__(self, iterable):
|
|
self._cache = []
|
|
# wrap iterable to make it non-restartable (otherwise, repeated
|
|
# __call__ would give incorrect results)
|
|
self._iterator = iter(iterable)
|
|
|
|
def __call__(self):
|
|
cache = self._cache
|
|
for item in cache:
|
|
yield item
|
|
for item in self._iterator:
|
|
cache.append(item)
|
|
yield item
|
|
|
|
|
|
class EncodingFinder:
|
|
def __init__(self, default_encoding):
|
|
self._default_encoding = default_encoding
|
|
def encoding(self, response):
|
|
# HTTPEquivProcessor may be in use, so both HTTP and HTTP-EQUIV
|
|
# headers may be in the response. HTTP-EQUIV headers come last,
|
|
# so try in order from first to last.
|
|
for ct in response.info().getheaders("content-type"):
|
|
for k, v in split_header_words([ct])[0]:
|
|
if k == "charset":
|
|
encoding = v
|
|
try:
|
|
codecs.lookup(v)
|
|
except LookupError:
|
|
continue
|
|
else:
|
|
return encoding
|
|
return self._default_encoding
|
|
|
|
|
|
class ResponseTypeFinder:
|
|
def __init__(self, allow_xhtml):
|
|
self._allow_xhtml = allow_xhtml
|
|
def is_html(self, response, encoding):
|
|
ct_hdrs = response.info().getheaders("content-type")
|
|
url = response.geturl()
|
|
# XXX encoding
|
|
return _is_html(ct_hdrs, url, self._allow_xhtml)
|
|
|
|
|
|
class Args(object):
|
|
|
|
# idea for this argument-processing trick is from Peter Otten
|
|
|
|
def __init__(self, args_map):
|
|
self.__dict__["dictionary"] = dict(args_map)
|
|
|
|
def __getattr__(self, key):
|
|
try:
|
|
return self.dictionary[key]
|
|
except KeyError:
|
|
return getattr(self.__class__, key)
|
|
|
|
def __setattr__(self, key, value):
|
|
if key == "dictionary":
|
|
raise AttributeError()
|
|
self.dictionary[key] = value
|
|
|
|
|
|
def form_parser_args(
|
|
select_default=False,
|
|
form_parser_class=None,
|
|
request_class=None,
|
|
backwards_compat=False,
|
|
):
|
|
return Args(locals())
|
|
|
|
|
|
class Link:
|
|
def __init__(self, base_url, url, text, tag, attrs):
|
|
assert None not in [url, tag, attrs]
|
|
self.base_url = base_url
|
|
self.absolute_url = _rfc3986.urljoin(base_url, url)
|
|
self.url, self.text, self.tag, self.attrs = url, text, tag, attrs
|
|
def __cmp__(self, other):
|
|
try:
|
|
for name in "url", "text", "tag", "attrs":
|
|
if getattr(self, name) != getattr(other, name):
|
|
return -1
|
|
except AttributeError:
|
|
return -1
|
|
return 0
|
|
def __repr__(self):
|
|
return "Link(base_url=%r, url=%r, text=%r, tag=%r, attrs=%r)" % (
|
|
self.base_url, self.url, self.text, self.tag, self.attrs)
|
|
|
|
|
|
class LinksFactory:
|
|
|
|
def __init__(self,
|
|
link_parser_class=None,
|
|
link_class=Link,
|
|
urltags=None,
|
|
):
|
|
import _pullparser
|
|
if link_parser_class is None:
|
|
link_parser_class = _pullparser.TolerantPullParser
|
|
self.link_parser_class = link_parser_class
|
|
self.link_class = link_class
|
|
if urltags is None:
|
|
urltags = {
|
|
"a": "href",
|
|
"area": "href",
|
|
"frame": "src",
|
|
"iframe": "src",
|
|
}
|
|
self.urltags = urltags
|
|
self._response = None
|
|
self._encoding = None
|
|
|
|
def set_response(self, response, base_url, encoding):
|
|
self._response = response
|
|
self._encoding = encoding
|
|
self._base_url = base_url
|
|
|
|
def links(self):
|
|
"""Return an iterator that provides links of the document."""
|
|
response = self._response
|
|
encoding = self._encoding
|
|
base_url = self._base_url
|
|
p = self.link_parser_class(response, encoding=encoding)
|
|
|
|
try:
|
|
for token in p.tags(*(self.urltags.keys()+["base"])):
|
|
if token.type == "endtag":
|
|
continue
|
|
if token.data == "base":
|
|
base_href = dict(token.attrs).get("href")
|
|
if base_href is not None:
|
|
base_url = base_href
|
|
continue
|
|
attrs = dict(token.attrs)
|
|
tag = token.data
|
|
text = None
|
|
# XXX use attr_encoding for ref'd doc if that doc does not
|
|
# provide one by other means
|
|
#attr_encoding = attrs.get("charset")
|
|
url = attrs.get(self.urltags[tag]) # XXX is "" a valid URL?
|
|
if not url:
|
|
# Probably an <A NAME="blah"> link or <AREA NOHREF...>.
|
|
# For our purposes a link is something with a URL, so
|
|
# ignore this.
|
|
continue
|
|
|
|
url = _rfc3986.clean_url(url, encoding)
|
|
if tag == "a":
|
|
if token.type != "startendtag":
|
|
# hmm, this'd break if end tag is missing
|
|
text = p.get_compressed_text(("endtag", tag))
|
|
# but this doesn't work for e.g.
|
|
# <a href="blah"><b>Andy</b></a>
|
|
#text = p.get_compressed_text()
|
|
|
|
yield Link(base_url, url, text, tag, token.attrs)
|
|
except sgmllib.SGMLParseError, exc:
|
|
raise _form.ParseError(exc)
|
|
|
|
class FormsFactory:
|
|
|
|
"""Makes a sequence of objects satisfying HTMLForm interface.
|
|
|
|
After calling .forms(), the .global_form attribute is a form object
|
|
containing all controls not a descendant of any FORM element.
|
|
|
|
For constructor argument docs, see ParseResponse argument docs.
|
|
"""
|
|
|
|
def __init__(self,
|
|
select_default=False,
|
|
form_parser_class=None,
|
|
request_class=None,
|
|
backwards_compat=False,
|
|
):
|
|
self.select_default = select_default
|
|
if form_parser_class is None:
|
|
form_parser_class = _form.FormParser
|
|
self.form_parser_class = form_parser_class
|
|
if request_class is None:
|
|
request_class = _request.Request
|
|
self.request_class = request_class
|
|
self.backwards_compat = backwards_compat
|
|
self._response = None
|
|
self.encoding = None
|
|
self.global_form = None
|
|
|
|
def set_response(self, response, encoding):
|
|
self._response = response
|
|
self.encoding = encoding
|
|
self.global_form = None
|
|
|
|
def forms(self):
|
|
encoding = self.encoding
|
|
forms = _form.ParseResponseEx(
|
|
self._response,
|
|
select_default=self.select_default,
|
|
form_parser_class=self.form_parser_class,
|
|
request_class=self.request_class,
|
|
encoding=encoding,
|
|
_urljoin=_rfc3986.urljoin,
|
|
_urlparse=_rfc3986.urlsplit,
|
|
_urlunparse=_rfc3986.urlunsplit,
|
|
)
|
|
self.global_form = forms[0]
|
|
return forms[1:]
|
|
|
|
class TitleFactory:
|
|
def __init__(self):
|
|
self._response = self._encoding = None
|
|
|
|
def set_response(self, response, encoding):
|
|
self._response = response
|
|
self._encoding = encoding
|
|
|
|
def _get_title_text(self, parser):
|
|
import _pullparser
|
|
text = []
|
|
tok = None
|
|
while 1:
|
|
try:
|
|
tok = parser.get_token()
|
|
except _pullparser.NoMoreTokensError:
|
|
break
|
|
if tok.type == "data":
|
|
text.append(str(tok))
|
|
elif tok.type == "entityref":
|
|
t = unescape("&%s;" % tok.data,
|
|
parser._entitydefs, parser.encoding)
|
|
text.append(t)
|
|
elif tok.type == "charref":
|
|
t = unescape_charref(tok.data, parser.encoding)
|
|
text.append(t)
|
|
elif tok.type in ["starttag", "endtag", "startendtag"]:
|
|
tag_name = tok.data
|
|
if tok.type == "endtag" and tag_name == "title":
|
|
break
|
|
text.append(str(tok))
|
|
return COMPRESS_RE.sub(" ", "".join(text).strip())
|
|
|
|
def title(self):
|
|
import _pullparser
|
|
p = _pullparser.TolerantPullParser(
|
|
self._response, encoding=self._encoding)
|
|
try:
|
|
try:
|
|
p.get_tag("title")
|
|
except _pullparser.NoMoreTokensError:
|
|
return None
|
|
else:
|
|
return self._get_title_text(p)
|
|
except sgmllib.SGMLParseError, exc:
|
|
raise _form.ParseError(exc)
|
|
|
|
|
|
def unescape(data, entities, encoding):
|
|
if data is None or "&" not in data:
|
|
return data
|
|
|
|
def replace_entities(match):
|
|
ent = match.group()
|
|
if ent[1] == "#":
|
|
return unescape_charref(ent[2:-1], encoding)
|
|
|
|
repl = entities.get(ent[1:-1])
|
|
if repl is not None:
|
|
repl = unichr(repl)
|
|
if type(repl) != type(""):
|
|
try:
|
|
repl = repl.encode(encoding)
|
|
except UnicodeError:
|
|
repl = ent
|
|
else:
|
|
repl = ent
|
|
return repl
|
|
|
|
return re.sub(r"&#?[A-Za-z0-9]+?;", replace_entities, data)
|
|
|
|
def unescape_charref(data, encoding):
|
|
name, base = data, 10
|
|
if name.startswith("x"):
|
|
name, base= name[1:], 16
|
|
uc = unichr(int(name, base))
|
|
if encoding is None:
|
|
return uc
|
|
else:
|
|
try:
|
|
repl = uc.encode(encoding)
|
|
except UnicodeError:
|
|
repl = "&#%s;" % data
|
|
return repl
|
|
|
|
|
|
class MechanizeBs(_beautifulsoup.BeautifulSoup):
|
|
_entitydefs = htmlentitydefs.name2codepoint
|
|
# don't want the magic Microsoft-char workaround
|
|
PARSER_MASSAGE = [(re.compile('(<[^<>]*)/>'),
|
|
lambda(x):x.group(1) + ' />'),
|
|
(re.compile('<!\s+([^<>]*)>'),
|
|
lambda(x):'<!' + x.group(1) + '>')
|
|
]
|
|
|
|
def __init__(self, encoding, text=None, avoidParserProblems=True,
|
|
initialTextIsEverything=True):
|
|
self._encoding = encoding
|
|
_beautifulsoup.BeautifulSoup.__init__(
|
|
self, text, avoidParserProblems, initialTextIsEverything)
|
|
|
|
def handle_charref(self, ref):
|
|
t = unescape("&#%s;"%ref, self._entitydefs, self._encoding)
|
|
self.handle_data(t)
|
|
def handle_entityref(self, ref):
|
|
t = unescape("&%s;"%ref, self._entitydefs, self._encoding)
|
|
self.handle_data(t)
|
|
def unescape_attrs(self, attrs):
|
|
escaped_attrs = []
|
|
for key, val in attrs:
|
|
val = unescape(val, self._entitydefs, self._encoding)
|
|
escaped_attrs.append((key, val))
|
|
return escaped_attrs
|
|
|
|
class RobustLinksFactory:
|
|
|
|
compress_re = COMPRESS_RE
|
|
|
|
def __init__(self,
|
|
link_parser_class=None,
|
|
link_class=Link,
|
|
urltags=None,
|
|
):
|
|
if link_parser_class is None:
|
|
link_parser_class = MechanizeBs
|
|
self.link_parser_class = link_parser_class
|
|
self.link_class = link_class
|
|
if urltags is None:
|
|
urltags = {
|
|
"a": "href",
|
|
"area": "href",
|
|
"frame": "src",
|
|
"iframe": "src",
|
|
}
|
|
self.urltags = urltags
|
|
self._bs = None
|
|
self._encoding = None
|
|
self._base_url = None
|
|
|
|
def set_soup(self, soup, base_url, encoding):
|
|
self._bs = soup
|
|
self._base_url = base_url
|
|
self._encoding = encoding
|
|
|
|
def links(self):
|
|
bs = self._bs
|
|
base_url = self._base_url
|
|
encoding = self._encoding
|
|
for ch in bs.recursiveChildGenerator():
|
|
if (isinstance(ch, _beautifulsoup.Tag) and
|
|
ch.name in self.urltags.keys()+["base"]):
|
|
link = ch
|
|
attrs = bs.unescape_attrs(link.attrs)
|
|
attrs_dict = dict(attrs)
|
|
if link.name == "base":
|
|
base_href = attrs_dict.get("href")
|
|
if base_href is not None:
|
|
base_url = base_href
|
|
continue
|
|
url_attr = self.urltags[link.name]
|
|
url = attrs_dict.get(url_attr)
|
|
if not url:
|
|
continue
|
|
url = _rfc3986.clean_url(url, encoding)
|
|
text = link.fetchText(lambda t: True)
|
|
if not text:
|
|
# follow _pullparser's weird behaviour rigidly
|
|
if link.name == "a":
|
|
text = ""
|
|
else:
|
|
text = None
|
|
else:
|
|
text = self.compress_re.sub(" ", " ".join(text).strip())
|
|
yield Link(base_url, url, text, link.name, attrs)
|
|
|
|
|
|
class RobustFormsFactory(FormsFactory):
|
|
def __init__(self, *args, **kwds):
|
|
args = form_parser_args(*args, **kwds)
|
|
if args.form_parser_class is None:
|
|
args.form_parser_class = _form.RobustFormParser
|
|
FormsFactory.__init__(self, **args.dictionary)
|
|
|
|
def set_response(self, response, encoding):
|
|
self._response = response
|
|
self.encoding = encoding
|
|
|
|
|
|
class RobustTitleFactory:
|
|
def __init__(self):
|
|
self._bs = self._encoding = None
|
|
|
|
def set_soup(self, soup, encoding):
|
|
self._bs = soup
|
|
self._encoding = encoding
|
|
|
|
def title(self):
|
|
title = self._bs.first("title")
|
|
if title == _beautifulsoup.Null:
|
|
return None
|
|
else:
|
|
inner_html = "".join([str(node) for node in title.contents])
|
|
return COMPRESS_RE.sub(" ", inner_html.strip())
|
|
|
|
|
|
class Factory:
|
|
"""Factory for forms, links, etc.
|
|
|
|
This interface may expand in future.
|
|
|
|
Public methods:
|
|
|
|
set_request_class(request_class)
|
|
set_response(response)
|
|
forms()
|
|
links()
|
|
|
|
Public attributes:
|
|
|
|
Note that accessing these attributes may raise ParseError.
|
|
|
|
encoding: string specifying the encoding of response if it contains a text
|
|
document (this value is left unspecified for documents that do not have
|
|
an encoding, e.g. an image file)
|
|
is_html: true if response contains an HTML document (XHTML may be
|
|
regarded as HTML too)
|
|
title: page title, or None if no title or not HTML
|
|
global_form: form object containing all controls that are not descendants
|
|
of any FORM element, or None if the forms_factory does not support
|
|
supplying a global form
|
|
|
|
"""
|
|
|
|
LAZY_ATTRS = ["encoding", "is_html", "title", "global_form"]
|
|
|
|
def __init__(self, forms_factory, links_factory, title_factory,
|
|
encoding_finder=EncodingFinder(DEFAULT_ENCODING),
|
|
response_type_finder=ResponseTypeFinder(allow_xhtml=False),
|
|
):
|
|
"""
|
|
|
|
Pass keyword arguments only.
|
|
|
|
default_encoding: character encoding to use if encoding cannot be
|
|
determined (or guessed) from the response. You should turn on
|
|
HTTP-EQUIV handling if you want the best chance of getting this right
|
|
without resorting to this default. The default value of this
|
|
parameter (currently latin-1) may change in future.
|
|
|
|
"""
|
|
self._forms_factory = forms_factory
|
|
self._links_factory = links_factory
|
|
self._title_factory = title_factory
|
|
self._encoding_finder = encoding_finder
|
|
self._response_type_finder = response_type_finder
|
|
|
|
self.set_response(None)
|
|
|
|
def set_request_class(self, request_class):
|
|
"""Set request class (mechanize.Request by default).
|
|
|
|
HTMLForm instances returned by .forms() will return instances of this
|
|
class when .click()ed.
|
|
|
|
"""
|
|
self._forms_factory.request_class = request_class
|
|
|
|
def set_response(self, response):
|
|
"""Set response.
|
|
|
|
The response must either be None or implement the same interface as
|
|
objects returned by mechanize.urlopen().
|
|
|
|
"""
|
|
self._response = response
|
|
self._forms_genf = self._links_genf = None
|
|
self._get_title = None
|
|
for name in self.LAZY_ATTRS:
|
|
try:
|
|
delattr(self, name)
|
|
except AttributeError:
|
|
pass
|
|
|
|
def __getattr__(self, name):
|
|
if name not in self.LAZY_ATTRS:
|
|
return getattr(self.__class__, name)
|
|
|
|
if name == "encoding":
|
|
self.encoding = self._encoding_finder.encoding(
|
|
copy.copy(self._response))
|
|
return self.encoding
|
|
elif name == "is_html":
|
|
self.is_html = self._response_type_finder.is_html(
|
|
copy.copy(self._response), self.encoding)
|
|
return self.is_html
|
|
elif name == "title":
|
|
if self.is_html:
|
|
self.title = self._title_factory.title()
|
|
else:
|
|
self.title = None
|
|
return self.title
|
|
elif name == "global_form":
|
|
self.forms()
|
|
return self.global_form
|
|
|
|
def forms(self):
|
|
"""Return iterable over HTMLForm-like objects.
|
|
|
|
Raises mechanize.ParseError on failure.
|
|
"""
|
|
# this implementation sets .global_form as a side-effect, for benefit
|
|
# of __getattr__ impl
|
|
if self._forms_genf is None:
|
|
try:
|
|
self._forms_genf = CachingGeneratorFunction(
|
|
self._forms_factory.forms())
|
|
except: # XXXX define exception!
|
|
self.set_response(self._response)
|
|
raise
|
|
self.global_form = getattr(
|
|
self._forms_factory, "global_form", None)
|
|
return self._forms_genf()
|
|
|
|
def links(self):
|
|
"""Return iterable over mechanize.Link-like objects.
|
|
|
|
Raises mechanize.ParseError on failure.
|
|
"""
|
|
if self._links_genf is None:
|
|
try:
|
|
self._links_genf = CachingGeneratorFunction(
|
|
self._links_factory.links())
|
|
except: # XXXX define exception!
|
|
self.set_response(self._response)
|
|
raise
|
|
return self._links_genf()
|
|
|
|
class DefaultFactory(Factory):
|
|
"""Based on sgmllib."""
|
|
def __init__(self, i_want_broken_xhtml_support=False):
|
|
Factory.__init__(
|
|
self,
|
|
forms_factory=FormsFactory(),
|
|
links_factory=LinksFactory(),
|
|
title_factory=TitleFactory(),
|
|
response_type_finder=ResponseTypeFinder(
|
|
allow_xhtml=i_want_broken_xhtml_support),
|
|
)
|
|
|
|
def set_response(self, response):
|
|
Factory.set_response(self, response)
|
|
if response is not None:
|
|
self._forms_factory.set_response(
|
|
copy.copy(response), self.encoding)
|
|
self._links_factory.set_response(
|
|
copy.copy(response), response.geturl(), self.encoding)
|
|
self._title_factory.set_response(
|
|
copy.copy(response), self.encoding)
|
|
|
|
class RobustFactory(Factory):
|
|
"""Based on BeautifulSoup, hopefully a bit more robust to bad HTML than is
|
|
DefaultFactory.
|
|
|
|
"""
|
|
def __init__(self, i_want_broken_xhtml_support=False,
|
|
soup_class=None):
|
|
Factory.__init__(
|
|
self,
|
|
forms_factory=RobustFormsFactory(),
|
|
links_factory=RobustLinksFactory(),
|
|
title_factory=RobustTitleFactory(),
|
|
response_type_finder=ResponseTypeFinder(
|
|
allow_xhtml=i_want_broken_xhtml_support),
|
|
)
|
|
if soup_class is None:
|
|
soup_class = MechanizeBs
|
|
self._soup_class = soup_class
|
|
|
|
def set_response(self, response):
|
|
Factory.set_response(self, response)
|
|
if response is not None:
|
|
data = response.read()
|
|
soup = self._soup_class(self.encoding, data)
|
|
self._forms_factory.set_response(
|
|
copy.copy(response), self.encoding)
|
|
self._links_factory.set_soup(
|
|
soup, response.geturl(), self.encoding)
|
|
self._title_factory.set_soup(soup, self.encoding)
|