# SAX-like interfaces
class XMLSyntaxAssertionError(XMLSyntaxError, AssertionError):
"""
An XMLSyntaxError that additionally inherits from AssertionError for
ElementTree / backwards compatibility reasons.
This class may get replaced by a plain XMLSyntaxError in a future version.
"""
ctypedef enum _SaxParserEvents:
SAX_EVENT_START = 1 << 0
SAX_EVENT_END = 1 << 1
SAX_EVENT_DATA = 1 << 2
SAX_EVENT_DOCTYPE = 1 << 3
SAX_EVENT_PI = 1 << 4
SAX_EVENT_COMMENT = 1 << 5
SAX_EVENT_START_NS = 1 << 6
SAX_EVENT_END_NS = 1 << 7
ctypedef enum _ParseEventFilter:
PARSE_EVENT_FILTER_START = 1 << 0
PARSE_EVENT_FILTER_END = 1 << 1
PARSE_EVENT_FILTER_START_NS = 1 << 2
PARSE_EVENT_FILTER_END_NS = 1 << 3
PARSE_EVENT_FILTER_COMMENT = 1 << 4
PARSE_EVENT_FILTER_PI = 1 << 5
cdef int _buildParseEventFilter(events) except -1:
cdef int event_filter
event_filter = 0
for event in events:
if event == 'start':
event_filter |= PARSE_EVENT_FILTER_START
elif event == 'end':
event_filter |= PARSE_EVENT_FILTER_END
elif event == 'start-ns':
event_filter |= PARSE_EVENT_FILTER_START_NS
elif event == 'end-ns':
event_filter |= PARSE_EVENT_FILTER_END_NS
elif event == 'comment':
event_filter |= PARSE_EVENT_FILTER_COMMENT
elif event == 'pi':
event_filter |= PARSE_EVENT_FILTER_PI
else:
raise ValueError, f"invalid event name '{event}'"
return event_filter
cdef class _SaxParserTarget:
cdef int _sax_event_filter
def __cinit__(self):
self._sax_event_filter = 0
cdef _handleSaxStart(self, tag, attrib, nsmap):
return None
cdef _handleSaxEnd(self, tag):
return None
cdef int _handleSaxData(self, data) except -1:
return 0
cdef int _handleSaxDoctype(self, root_tag, public_id, system_id) except -1:
return 0
cdef _handleSaxPi(self, target, data):
return None
cdef _handleSaxComment(self, comment):
return None
cdef _handleSaxStartNs(self, prefix, uri):
return None
cdef _handleSaxEndNs(self, prefix):
return None
#@cython.final
@cython.internal
@cython.no_gc_clear # Required because parent class uses it - Cython bug.
cdef class _SaxParserContext(_ParserContext):
u"""This class maps SAX2 events to parser target events.
"""
cdef _SaxParserTarget _target
cdef _BaseParser _parser
cdef xmlparser.startElementNsSAX2Func _origSaxStart
cdef xmlparser.endElementNsSAX2Func _origSaxEnd
cdef xmlparser.startElementSAXFunc _origSaxStartNoNs
cdef xmlparser.endElementSAXFunc _origSaxEndNoNs
cdef xmlparser.charactersSAXFunc _origSaxData
cdef xmlparser.cdataBlockSAXFunc _origSaxCData
cdef xmlparser.internalSubsetSAXFunc _origSaxDoctype
cdef xmlparser.commentSAXFunc _origSaxComment
cdef xmlparser.processingInstructionSAXFunc _origSaxPI
cdef xmlparser.startDocumentSAXFunc _origSaxStartDocument
# for event collecting
cdef int _event_filter
cdef list _ns_stack
cdef list _node_stack
cdef _ParseEventsIterator events_iterator
# for iterparse
cdef _Element _root
cdef _MultiTagMatcher _matcher
def __cinit__(self, _BaseParser parser):
self._ns_stack = []
self._node_stack = []
self._parser = parser
self.events_iterator = _ParseEventsIterator()
cdef void _setSaxParserTarget(self, _SaxParserTarget target):
self._target = target
cdef void _initParserContext(self, xmlparser.xmlParserCtxt* c_ctxt):
_ParserContext._initParserContext(self, c_ctxt)
if self._target is not None:
self._connectTarget(c_ctxt)
elif self._event_filter:
self._connectEvents(c_ctxt)
cdef void _connectTarget(self, xmlparser.xmlParserCtxt* c_ctxt):
"""Wrap original SAX2 callbacks to call into parser target.
"""
sax = c_ctxt.sax
self._origSaxStart = sax.startElementNs = NULL
self._origSaxStartNoNs = sax.startElement = NULL
if self._target._sax_event_filter & (SAX_EVENT_START |
SAX_EVENT_START_NS |
SAX_EVENT_END_NS):
# intercept => overwrite orig callback
# FIXME: also intercept on when collecting END events
if sax.initialized == xmlparser.XML_SAX2_MAGIC:
sax.startElementNs = _handleSaxTargetStart
if self._target._sax_event_filter & SAX_EVENT_START:
sax.startElement = _handleSaxTargetStartNoNs
self._origSaxEnd = sax.endElementNs = NULL
self._origSaxEndNoNs = sax.endElement = NULL
if self._target._sax_event_filter & (SAX_EVENT_END |
SAX_EVENT_END_NS):
if sax.initialized == xmlparser.XML_SAX2_MAGIC:
sax.endElementNs = _handleSaxEnd
if self._target._sax_event_filter & SAX_EVENT_END:
sax.endElement = _handleSaxEndNoNs
self._origSaxData = sax.characters = sax.cdataBlock = NULL
if self._target._sax_event_filter & SAX_EVENT_DATA:
sax.characters = sax.cdataBlock = _handleSaxData
# doctype propagation is always required for entity replacement
self._origSaxDoctype = sax.internalSubset
if self._target._sax_event_filter & SAX_EVENT_DOCTYPE:
sax.internalSubset = _handleSaxTargetDoctype
self._origSaxPI = sax.processingInstruction = NULL
if self._target._sax_event_filter & SAX_EVENT_PI:
sax.processingInstruction = _handleSaxTargetPI
self._origSaxComment = sax.comment = NULL
if self._target._sax_event_filter & SAX_EVENT_COMMENT:
sax.comment = _handleSaxTargetComment
# enforce entity replacement
sax.reference = NULL
c_ctxt.replaceEntities = 1
cdef void _connectEvents(self, xmlparser.xmlParserCtxt* c_ctxt):
"""Wrap original SAX2 callbacks to collect parse events without parser target.
"""
sax = c_ctxt.sax
self._origSaxStartDocument = sax.startDocument
sax.startDocument = _handleSaxStartDocument
# only override "start" event handler if needed
self._origSaxStart = sax.startElementNs
if self._event_filter == 0 or c_ctxt.html or \
self._event_filter & (PARSE_EVENT_FILTER_START |
PARSE_EVENT_FILTER_END |
PARSE_EVENT_FILTER_START_NS |
PARSE_EVENT_FILTER_END_NS):
sax.startElementNs = <xmlparser.startElementNsSAX2Func>_handleSaxStart
self._origSaxStartNoNs = sax.startElement
if self._event_filter == 0 or c_ctxt.html or \
self._event_filter & (PARSE_EVENT_FILTER_START |
PARSE_EVENT_FILTER_END):
sax.startElement = <xmlparser.startElementSAXFunc>_handleSaxStartNoNs
# only override "end" event handler if needed
self._origSaxEnd = sax.endElementNs
if self._event_filter == 0 or \
self._event_filter & (PARSE_EVENT_FILTER_END |
PARSE_EVENT_FILTER_END_NS):
sax.endElementNs = <xmlparser.endElementNsSAX2Func>_handleSaxEnd
self._origSaxEndNoNs = sax.endElement
if self._event_filter == 0 or \
self._event_filter & PARSE_EVENT_FILTER_END:
sax.endElement = <xmlparser.endElementSAXFunc>_handleSaxEndNoNs
self._origSaxComment = sax.comment
if self._event_filter & PARSE_EVENT_FILTER_COMMENT:
sax.comment = <xmlparser.commentSAXFunc>_handleSaxComment
self._origSaxPI = sax.processingInstruction
if self._event_filter & PARSE_EVENT_FILTER_PI:
sax.processingInstruction = <xmlparser.processingInstructionSAXFunc>_handleSaxPIEvent
cdef _setEventFilter(self, events, tag):
self._event_filter = _buildParseEventFilter(events)
if not self._event_filter or tag is None or tag == '*':
self._matcher = None
else:
self._matcher = _MultiTagMatcher.__new__(_MultiTagMatcher, tag)
cdef int startDocument(self, xmlDoc* c_doc) except -1:
try:
self._doc = _documentFactory(c_doc, self._parser)
finally:
self._parser = None # clear circular reference ASAP
if self._matcher is not None:
self._matcher.cacheTags(self._doc, True) # force entry in libxml2 dict
return 0
cdef int pushEvent(self, event, xmlNode* c_node) except -1:
cdef _Element root
if self._root is None:
root = self._doc.getroot()
if root is not None and root._c_node.type == tree.XML_ELEMENT_NODE:
self._root = root
node = _elementFactory(self._doc, c_node)
self.events_iterator._events.append( (event, node) )
return 0
cdef int flushEvents(self) except -1:
events = self.events_iterator._events
while self._node_stack:
events.append( ('end', self._node_stack.pop()) )
_pushSaxNsEndEvents(self)
while self._ns_stack:
_pushSaxNsEndEvents(self)
cdef void _handleSaxException(self, xmlparser.xmlParserCtxt* c_ctxt):
if c_ctxt.errNo == xmlerror.XML_ERR_OK:
c_ctxt.errNo = xmlerror.XML_ERR_INTERNAL_ERROR
# stop parsing immediately
c_ctxt.wellFormed = 0
c_ctxt.disableSAX = 1
c_ctxt.instate = xmlparser.XML_PARSER_EOF
self._store_raised()
@cython.final
@cython.internal
cdef class _ParseEventsIterator:
"""A reusable parse events iterator"""
cdef list _events
cdef int _event_index
def __cinit__(self):
self._events = []
self._event_index = 0
def __iter__(self):
return self
def __next__(self):
cdef int event_index = self._event_index
events = self._events
if event_index >= 2**10 or event_index * 2 >= len(events):
if event_index:
# clean up from time to time
del events[:event_index]
self._event_index = event_index = 0
if event_index >= len(events):
raise StopIteration
item = events[event_index]
self._event_index = event_index + 1
return item
cdef list _build_prefix_uri_list(_SaxParserContext context, int c_nb_namespaces,
const_xmlChar** c_namespaces):
"Build [(prefix, uri)] list of declared namespaces."
cdef int i
namespaces = []
for i in xrange(c_nb_namespaces):
namespaces.append((funicodeOrEmpty(c_namespaces[0]), funicode(c_namespaces[1])))
c_namespaces += 2
return namespaces
cdef void _handleSaxStart(
void* ctxt, const_xmlChar* c_localname, const_xmlChar* c_prefix,
const_xmlChar* c_namespace, int c_nb_namespaces,
const_xmlChar** c_namespaces,
int c_nb_attributes, int c_nb_defaulted,
const_xmlChar** c_attributes) with gil:
cdef int i
cdef size_t c_len
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
cdef int event_filter = context._event_filter
try:
if (c_nb_namespaces and
event_filter & (PARSE_EVENT_FILTER_START_NS |
PARSE_EVENT_FILTER_END_NS)):
declared_namespaces = _build_prefix_uri_list(
context, c_nb_namespaces, c_namespaces)
if event_filter & PARSE_EVENT_FILTER_START_NS:
for prefix_uri_tuple in declared_namespaces:
context.events_iterator._events.append(("start-ns", prefix_uri_tuple))
else:
declared_namespaces = None
context._origSaxStart(c_ctxt, c_localname, c_prefix, c_namespace,
c_nb_namespaces, c_namespaces, c_nb_attributes,
c_nb_defaulted, c_attributes)
if c_ctxt.html:
_fixHtmlDictNodeNames(c_ctxt.dict, c_ctxt.node)
if event_filter & PARSE_EVENT_FILTER_END_NS:
context._ns_stack.append(declared_namespaces)
if event_filter & (PARSE_EVENT_FILTER_END |
PARSE_EVENT_FILTER_START):
_pushSaxStartEvent(context, c_ctxt, c_namespace, c_localname, None)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxTargetStart(
void* ctxt, const_xmlChar* c_localname, const_xmlChar* c_prefix,
const_xmlChar* c_namespace, int c_nb_namespaces,
const_xmlChar** c_namespaces,
int c_nb_attributes, int c_nb_defaulted,
const_xmlChar** c_attributes) with gil:
cdef int i
cdef size_t c_len
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
cdef int event_filter = context._event_filter
cdef int sax_event_filter = context._target._sax_event_filter
try:
if c_nb_namespaces:
declared_namespaces = _build_prefix_uri_list(
context, c_nb_namespaces, c_namespaces)
if event_filter & PARSE_EVENT_FILTER_START_NS:
for prefix_uri_tuple in declared_namespaces:
context.events_iterator._events.append(("start-ns", prefix_uri_tuple))
if sax_event_filter & SAX_EVENT_START_NS:
for prefix, uri in declared_namespaces:
context._target._handleSaxStartNs(prefix, uri)
#if not context._target._sax_event_filter & SAX_EVENT_START:
# # *Only* collecting start-ns events.
# return
else:
declared_namespaces = None
if sax_event_filter & SAX_EVENT_START:
if c_nb_defaulted > 0:
# only add default attributes if we asked for them
if c_ctxt.loadsubset & xmlparser.XML_COMPLETE_ATTRS == 0:
c_nb_attributes -= c_nb_defaulted
if c_nb_attributes == 0:
attrib = IMMUTABLE_EMPTY_MAPPING
else:
attrib = {}
for i in xrange(c_nb_attributes):
name = _namespacedNameFromNsName(
c_attributes[2], c_attributes[0])
if c_attributes[3] is NULL:
value = ''
else:
c_len = c_attributes[4] - c_attributes[3]
value = c_attributes[3][:c_len].decode('utf8')
attrib[name] = value
c_attributes += 5
nsmap = dict(declared_namespaces) if c_nb_namespaces else IMMUTABLE_EMPTY_MAPPING
element = _callTargetSaxStart(
context, c_ctxt,
_namespacedNameFromNsName(c_namespace, c_localname),
attrib, nsmap)
else:
element = None
if (event_filter & PARSE_EVENT_FILTER_END_NS or
sax_event_filter & SAX_EVENT_END_NS):
context._ns_stack.append(declared_namespaces)
if event_filter & (PARSE_EVENT_FILTER_END |
PARSE_EVENT_FILTER_START):
_pushSaxStartEvent(context, c_ctxt, c_namespace,
c_localname, element)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxStartNoNs(void* ctxt, const_xmlChar* c_name,
const_xmlChar** c_attributes) with gil:
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
try:
context._origSaxStartNoNs(c_ctxt, c_name, c_attributes)
if c_ctxt.html:
_fixHtmlDictNodeNames(c_ctxt.dict, c_ctxt.node)
if context._event_filter & (PARSE_EVENT_FILTER_END |
PARSE_EVENT_FILTER_START):
_pushSaxStartEvent(context, c_ctxt, NULL, c_name, None)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxTargetStartNoNs(void* ctxt, const_xmlChar* c_name,
const_xmlChar** c_attributes) with gil:
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
try:
if c_attributes is NULL:
attrib = IMMUTABLE_EMPTY_MAPPING
else:
attrib = {}
while c_attributes[0] is not NULL:
name = funicode(c_attributes[0])
attrib[name] = funicodeOrEmpty(c_attributes[1])
c_attributes += 2
element = _callTargetSaxStart(
context, c_ctxt, funicode(c_name),
attrib, IMMUTABLE_EMPTY_MAPPING)
if context._event_filter & (PARSE_EVENT_FILTER_END |
PARSE_EVENT_FILTER_START):
_pushSaxStartEvent(context, c_ctxt, NULL, c_name, element)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef _callTargetSaxStart(_SaxParserContext context,
xmlparser.xmlParserCtxt* c_ctxt,
tag, attrib, nsmap):
element = context._target._handleSaxStart(tag, attrib, nsmap)
if element is not None and c_ctxt.input is not NULL:
if isinstance(element, _Element):
(<_Element>element)._c_node.line = (
<unsigned short>c_ctxt.input.line
if c_ctxt.input.line < 65535 else 65535)
return element
cdef int _pushSaxStartEvent(_SaxParserContext context,
xmlparser.xmlParserCtxt* c_ctxt,
const_xmlChar* c_href,
const_xmlChar* c_name, node) except -1:
if (context._matcher is None or
context._matcher.matchesNsTag(c_href, c_name)):
if node is None and context._target is None:
assert context._doc is not None
node = _elementFactory(context._doc, c_ctxt.node)
if context._event_filter & PARSE_EVENT_FILTER_START:
context.events_iterator._events.append(('start', node))
if (context._target is None and
context._event_filter & PARSE_EVENT_FILTER_END):
context._node_stack.append(node)
return 0
cdef void _handleSaxEnd(void* ctxt, const_xmlChar* c_localname,
const_xmlChar* c_prefix,
const_xmlChar* c_namespace) with gil:
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
try:
if context._target is not None:
if context._target._sax_event_filter & SAX_EVENT_END:
node = context._target._handleSaxEnd(
_namespacedNameFromNsName(c_namespace, c_localname))
else:
node = None
else:
context._origSaxEnd(c_ctxt, c_localname, c_prefix, c_namespace)
node = None
_pushSaxEndEvent(context, c_namespace, c_localname, node)
_pushSaxNsEndEvents(context)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxEndNoNs(void* ctxt, const_xmlChar* c_name) with gil:
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
try:
if context._target is not None:
node = context._target._handleSaxEnd(funicode(c_name))
else:
context._origSaxEndNoNs(c_ctxt, c_name)
node = None
_pushSaxEndEvent(context, NULL, c_name, node)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef int _pushSaxNsEndEvents(_SaxParserContext context) except -1:
cdef bint build_events = context._event_filter & PARSE_EVENT_FILTER_END_NS
cdef bint call_target = (
context._target is not None
and context._target._sax_event_filter & SAX_EVENT_END_NS)
if not build_events and not call_target:
return 0
cdef list declared_namespaces = context._ns_stack.pop()
if declared_namespaces is None:
return 0
cdef tuple prefix_uri
for prefix_uri in reversed(declared_namespaces):
if call_target:
context._target._handleSaxEndNs(prefix_uri[0])
if build_events:
context.events_iterator._events.append(('end-ns', None))
return 0
cdef int _pushSaxEndEvent(_SaxParserContext context,
const_xmlChar* c_href,
const_xmlChar* c_name, node) except -1:
if context._event_filter & PARSE_EVENT_FILTER_END:
if (context._matcher is None or
context._matcher.matchesNsTag(c_href, c_name)):
if context._target is None:
node = context._node_stack.pop()
context.events_iterator._events.append(('end', node))
return 0
cdef void _handleSaxData(void* ctxt, const_xmlChar* c_data, int data_len) with gil:
# can only be called if parsing with a target
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
try:
context._target._handleSaxData(
c_data[:data_len].decode('utf8'))
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxTargetDoctype(void* ctxt, const_xmlChar* c_name,
const_xmlChar* c_public,
const_xmlChar* c_system) with gil:
# can only be called if parsing with a target
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
try:
context._target._handleSaxDoctype(
funicodeOrNone(c_name),
funicodeOrNone(c_public),
funicodeOrNone(c_system))
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxStartDocument(void* ctxt) with gil:
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
context._origSaxStartDocument(ctxt)
c_doc = c_ctxt.myDoc
try:
context.startDocument(c_doc)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxTargetPI(void* ctxt, const_xmlChar* c_target,
const_xmlChar* c_data) with gil:
# can only be called if parsing with a target
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
try:
pi = context._target._handleSaxPi(
funicodeOrNone(c_target),
funicodeOrEmpty(c_data))
if context._event_filter & PARSE_EVENT_FILTER_PI:
context.events_iterator._events.append(('pi', pi))
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxPIEvent(void* ctxt, const_xmlChar* target,
const_xmlChar* data) with gil:
# can only be called when collecting pi events
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
context._origSaxPI(ctxt, target, data)
c_node = _findLastEventNode(c_ctxt)
if c_node is NULL:
return
try:
context.pushEvent('pi', c_node)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxTargetComment(void* ctxt, const_xmlChar* c_data) with gil:
# can only be called if parsing with a target
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
try:
comment = context._target._handleSaxComment(funicodeOrEmpty(c_data))
if context._event_filter & PARSE_EVENT_FILTER_COMMENT:
context.events_iterator._events.append(('comment', comment))
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef void _handleSaxComment(void* ctxt, const_xmlChar* text) with gil:
# can only be called when collecting comment events
c_ctxt = <xmlparser.xmlParserCtxt*>ctxt
if c_ctxt._private is NULL or c_ctxt.disableSAX:
return
context = <_SaxParserContext>c_ctxt._private
context._origSaxComment(ctxt, text)
c_node = _findLastEventNode(c_ctxt)
if c_node is NULL:
return
try:
context.pushEvent('comment', c_node)
except:
context._handleSaxException(c_ctxt)
finally:
return # swallow any further exceptions
cdef inline xmlNode* _findLastEventNode(xmlparser.xmlParserCtxt* c_ctxt):
# this mimics what libxml2 creates for comments/PIs
if c_ctxt.inSubset == 1:
return c_ctxt.myDoc.intSubset.last
elif c_ctxt.inSubset == 2:
return c_ctxt.myDoc.extSubset.last
elif c_ctxt.node is NULL:
return c_ctxt.myDoc.last
elif c_ctxt.node.type == tree.XML_ELEMENT_NODE:
return c_ctxt.node.last
else:
return c_ctxt.node.next
############################################################
## ET compatible XML tree builder
############################################################
cdef class TreeBuilder(_SaxParserTarget):
u"""TreeBuilder(self, element_factory=None, parser=None,
comment_factory=None, pi_factory=None,
insert_comments=True, insert_pis=True)
Parser target that builds a tree from parse event callbacks.
The factory arguments can be used to influence the creation of
elements, comments and processing instructions.
By default, comments and processing instructions are inserted into
the tree, but they can be ignored by passing the respective flags.
The final tree is returned by the ``close()`` method.
"""
cdef _BaseParser _parser
cdef object _factory
cdef object _comment_factory
cdef object _pi_factory
cdef list _data
cdef list _element_stack
cdef object _element_stack_pop
cdef _Element _last # may be None
cdef bint _in_tail
cdef bint _insert_comments
cdef bint _insert_pis
def __init__(self, *, element_factory=None, parser=None,
comment_factory=None, pi_factory=None,
bint insert_comments=True, bint insert_pis=True):
self._sax_event_filter = \
SAX_EVENT_START | SAX_EVENT_END | SAX_EVENT_DATA | \
SAX_EVENT_PI | SAX_EVENT_COMMENT
self._data = [] # data collector
self._element_stack = [] # element stack
self._element_stack_pop = self._element_stack.pop
self._last = None # last element
self._in_tail = 0 # true if we're after an end tag
self._factory = element_factory
self._comment_factory = comment_factory if comment_factory is not None else Comment
self._pi_factory = pi_factory if pi_factory is not None else ProcessingInstruction
self._insert_comments = insert_comments
self._insert_pis = insert_pis
self._parser = parser
@cython.final
cdef int _flush(self) except -1:
if self._data:
if self._last is not None:
text = u"".join(self._data)
if self._in_tail:
assert self._last.tail is None, u"internal error (tail)"
self._last.tail = text
else:
assert self._last.text is None, u"internal error (text)"
self._last.text = text
del self._data[:]
return 0
# internal SAX event handlers
@cython.final
cdef _handleSaxStart(self, tag, attrib, nsmap):
self._flush()
if self._factory is not None:
self._last = self._factory(tag, attrib)
if self._element_stack:
_appendChild(self._element_stack[-1], self._last)
elif self._element_stack:
self._last = _makeSubElement(
self._element_stack[-1], tag, None, None, attrib, nsmap, None)
else:
self._last = _makeElement(
tag, NULL, None, self._parser, None, None, attrib, nsmap, None)
self._element_stack.append(self._last)
self._in_tail = 0
return self._last
@cython.final
cdef _handleSaxEnd(self, tag):
self._flush()
self._last = self._element_stack_pop()
self._in_tail = 1
return self._last
@cython.final
cdef int _handleSaxData(self, data) except -1:
self._data.append(data)
@cython.final
cdef _handleSaxPi(self, target, data):
elem = self._pi_factory(target, data)
if self._insert_pis:
self._flush()
self._last = elem
if self._element_stack:
_appendChild(self._element_stack[-1], self._last)
self._in_tail = 1
return self._last
@cython.final
cdef _handleSaxComment(self, comment):
elem = self._comment_factory(comment)
if self._insert_comments:
self._flush()
self._last = elem
if self._element_stack:
_appendChild(self._element_stack[-1], self._last)
self._in_tail = 1
return elem
# Python level event handlers
def close(self):
u"""close(self)
Flushes the builder buffers, and returns the toplevel document
element. Raises XMLSyntaxError on inconsistencies.
"""
if self._element_stack:
raise XMLSyntaxAssertionError("missing end tags")
# TODO: this does not necessarily seem like an error case. Why not just return None?
if self._last is None:
raise XMLSyntaxAssertionError("missing toplevel element")
return self._last
def data(self, data):
u"""data(self, data)
Adds text to the current element. The value should be either an
8-bit string containing ASCII text, or a Unicode string.
"""
self._handleSaxData(data)
def start(self, tag, attrs, nsmap=None):
u"""start(self, tag, attrs, nsmap=None)
Opens a new element.
"""
if nsmap is None:
nsmap = IMMUTABLE_EMPTY_MAPPING
return self._handleSaxStart(tag, attrs, nsmap)
def end(self, tag):
u"""end(self, tag)
Closes the current element.
"""
element = self._handleSaxEnd(tag)
assert self._last.tag == tag,\
f"end tag mismatch (expected {self._last.tag}, got {tag})"
return element
def pi(self, target, data=None):
u"""pi(self, target, data=None)
Creates a processing instruction using the factory, appends it
(unless disabled) and returns it.
"""
return self._handleSaxPi(target, data)
def comment(self, comment):
u"""comment(self, comment)
Creates a comment using the factory, appends it (unless disabled)
and returns it.
"""
return self._handleSaxComment(comment)