# -*- coding: utf-8 -*-
"""
html versions of the classes necessary to create text reductions for
presentation to external segments, and to create accounted versions of
cleaned html.
"""

import unittest

from accounting import AccountedString, Deletion, Insertion
from common import is_whitespace, GML_TAG_CLOSE, GML_TAG_EMPTY, GML_TAG_OPEN, open
from reduction import Reduction
from shep import Comment, Entity, parse, Tag, Text
from shep import NON_CLOSING as NON_CLOSING_TAGS


# templates for rebuilding a bare html tag from its name
HTML_TAG_CLOSE = u'</{0}>'
HTML_TAG_EMPTY = u'<{0}/>'
HTML_TAG_OPEN = u'<{0}>'


class _Action(object):
    """ tracks whether a purge or mask action is in progress """

    def __init__(self, acting_on, start=None):
        """
        acting_on -- name of the tag that opened the action
        start     -- offset of that tag in the source
        """
        super(_Action, self).__init__()
        self.acting_on = acting_on
        self.start = start
        # a tag that never closes finishes its action immediately
        self.depth = 0 if acting_on in NON_CLOSING_TAGS else 1

    def is_complete(self):
        """ true once every opener of the tracked tag has been closed """
        return self.depth == 0

    @staticmethod
    def update(action, tag, config, originator):
        """
        return the action in force after seeing tag

        A tag with the same name as the running action changes its depth;
        any other tag may start a new action, which replaces the running
        one.  Purging is shared by both originators, masking is driven by
        config.mask for accounted strings and by config.unsegmentable for
        reductions.
        """
        if action is not None and action.acting_on == tag.name:
            action.depth += -1 if tag.is_closer else +1
        elif tag.name in config.purge:
            action = _Purging(tag.name, tag.offset)
        elif (isinstance(originator, HtmlAccountedString)
                and tag.name in config.mask):
            action = _Masking(tag.name, tag.offset)
        elif (isinstance(originator, HtmlReduction)
                and tag.name in config.unsegmentable):
            action = _Masking(tag.name, tag.offset)
        return action


class _Purging(_Action):
    """ marks an action that purges the tracked element """
    pass


class _Masking(_Action):
    """ marks an action that masks the tracked element """
    pass


class HtmlAccountedString(AccountedString):
    """
    an accounted string over html: walks the parse events of the text and
    registers the deletions and insertions that clean it
    """

    def _build(self):
        """ register the edits for every parse event of the text """
        # NOTE(review): self._pre and self.config are not assigned in this
        # class; presumably AccountedString provides them -- confirm
        # exact type comparison is kept for parser events because the
        # class hierarchy in shep is not visible from this module
        action = None
        for event in parse(self._pre):
            if type(event) is Comment:
                self._process_comment(event)
            elif type(event) is Entity:
                self._process_entity(event)
            elif type(event) is Tag:
                action = self._process_tag(event, action, self.config)
            elif type(event) is Text:
                self._process_text(event, action)

    @staticmethod
    def _get_gml_substitute(tag, config):
        """ return the gml tag that config.map assigns to tag """
        gml_name = config.map[tag.name]
        if tag.is_empty or tag.name in NON_CLOSING_TAGS:
            return GML_TAG_EMPTY.format(gml_name)
        elif tag.is_closer:
            return GML_TAG_CLOSE.format(gml_name)
        else:
            return GML_TAG_OPEN.format(gml_name)

    @staticmethod
    def _get_html_substitute(tag):
        """
        return tag rebuilt without attributes, or None when the tag has no
        attributes or is a closer (no substitution is needed then)
        """
        if not tag.has_attributes:
            return None
        if tag.is_empty or tag.name in NON_CLOSING_TAGS:
            return HTML_TAG_EMPTY.format(tag.name)
        elif not tag.is_closer:
            return HTML_TAG_OPEN.format(tag.name)
        else:
            return None

    @staticmethod
    def _get_substitute(tag, config):
        """ pick the gml or the html substitute depending on gml_mode """
        if config.gml_mode:
            return HtmlAccountedString._get_gml_substitute(tag, config)
        else:
            return HtmlAccountedString._get_html_substitute(tag)

    def _map(self, tag, config):
        """ swap tag for its substitute, when there is one """
        substitute = self._get_substitute(tag, config)
        if substitute:
            self.register(Deletion(tag.offset, length=len(tag)))
            self.register(Insertion(tag.offset, substitute))

    def _mask(self, mask, tag, config):
        """
        replace everything from the start of the mask up to the end of tag
        with a single empty tag named after tag
        """
        length = tag.offset - mask.start + len(tag)
        self.register(Deletion(mask.start, length=length))
        template = GML_TAG_EMPTY if config.gml_mode else HTML_TAG_EMPTY
        self.register(Insertion(mask.start, template.format(tag.name)))

    def _process_comment(self, comment):
        # remove comments from the accounted string
        self._remove(comment)

    def _process_entity(self, entity):
        # add the actual character to the accounted string
        # and not the entity escape sequence
        self.register(Deletion(entity.offset, length=len(entity)))
        self.register(Insertion(entity.offset, entity.char))

    def _process_tag(self, tag, action, config):
        """
        update the running action with tag, apply the action when the tag
        completes it, and return the action still in force (or None)
        """
        # NOTE(review): the nesting of the second branch was rebuilt from
        # whitespace-collapsed source -- confirm against version control
        action = _Action.update(action, tag, config, self)
        if action and action.is_complete():
            if isinstance(action, _Masking):
                self._mask(action, tag, config)
            elif isinstance(action, _Purging):
                self._purge(action, tag)
            action = None
        elif not isinstance(action, _Purging):
            # outside a purge every tag is either mapped or dropped
            if tag.name in config.map:
                self._map(tag, config)
            else:
                self._remove(tag)
        return action

    def _process_text(self, text, action):
        # text needs no edit in the accounted string
        pass

    def _purge(self, purge, tag):
        """ delete everything from the start of the purge to the end of tag """
        length = tag.offset + len(tag) - purge.start
        self.register(Deletion(purge.start, length=length))

    def _remove(self, event):
        """ delete the source text covered by event """
        self.register(Deletion(event.offset, length=len(event)))


class HtmlReduction(Reduction):
    """
    a reduction over html: appends the text of the source character by
    character, masking or skipping the elements named in the config
    """

    def __init__(self, html, config):
        """
        html   -- the source text to reduce
        config -- supplies the purge, unsegmentable, newline and
                  paragraph_like tag collections read while reducing

        raises ValueError for a parse event of an unexpected type
        """
        super(HtmlReduction, self).__init__(html)
        # exact type comparison is kept for parser events because the
        # class hierarchy in shep is not visible from this module
        action = None
        for event in parse(html):
            if type(event) is Comment:
                self._process_comment(event)
            elif type(event) is Entity:
                self._process_entity(event, action)
            elif type(event) is Tag:
                action = self._process_tag(event, action, config)
            elif type(event) is Text:
                self._process_text(event, action)
            else:
                raise ValueError('invalid event type ' + str(type(event)))

    def _expand(self, x, y):
        """ expand boundaries to recover tags that should belong inside """
        x = self._expansion(x, backward=True)
        # NOTE(review): the forward expansion is placed inside this guard;
        # the nesting was rebuilt from whitespace-collapsed source -- confirm
        if y > 0 and y < len(self.source):
            if self.source[y] == '<':
                # step back so the forward scan starts on the tag itself
                y -= 1
            y = self._expansion(y, backward=False)
        return x, y

    def _expansion(self, i, backward):
        """
        move boundary i over adjacent tags and whitespace in one direction
        and return the new boundary; the scan starts one position away
        from i and stops at the first non-whitespace text or entity, at a
        closing or empty tag when going backward, and at a non-closing tag
        when going forward
        """
        # nothing to scan when the boundary already sits on the edge
        if backward and i - 1 < 0:
            return i
        if not backward and i >= len(self.source):
            return i
        start = i - 1 if backward else i + 1
        for event in parse(self.source, start, backward):
            if type(event) is Entity:
                if not is_whitespace(event.char):
                    break
            elif type(event) is Tag:
                if backward and (event.is_closer or event.is_empty):
                    break
                elif not backward and not event.is_closer:
                    break
                elif backward:
                    i = event.offset
                else:
                    i = event.offset + len(event.content)
            elif type(event) is Text:
                if not is_whitespace(event.content):
                    break
        return i

    def _mask(self, event):
        """ mask every source character covered by the content of event """
        first = event.offset
        for offset in range(first, first + len(event.content)):
            self._mask_char(offset)

    def _process_comment(self, comment):
        # do not add comments to the accounted string
        pass

    def _process_entity(self, entity, action):
        # add the actual character to the reduction, and
        # not the entity escape sequence
        if not isinstance(action, _Purging):
            self._append(entity.char, entity.offset)

    def _process_tag(self, tag, action, config):
        """
        update the running action with tag, mask the tag while masking,
        append the line breaks configured for the tag, and return the
        action still in force (or None)
        """
        # NOTE(review): the three checks below are treated as independent
        # statements; the nesting was rebuilt from whitespace-collapsed
        # source -- confirm against version control
        action = _Action.update(action, tag, config, self)
        if isinstance(action, _Masking):
            self._mask(tag)
        if action and action.is_complete():
            action = None
        after_tag = tag.offset + len(tag)
        if tag.name in config.newline:
            self._append('\n', after_tag)
        elif tag.name in config.paragraph_like:
            self._append('\n', after_tag)
            self._append('\n', after_tag)
        return action

    def _process_text(self, text, action):
        """ append the text, mask it while masking, skip it while purging """
        if isinstance(action, _Purging):
            return
        if isinstance(action, _Masking):
            self._mask(text)
        else:
            for i, char in enumerate(text.content):
                self._append(char, text.offset + i)


class Utf8Fixer(object):
    """
    a class to fix common corruptions when text is
    erroneously reported as utf-8
    """

    def __init__(self, filename='utf8-debug.dat'):
        """
        load the debug reference from filename: one tab-separated entry
        per line, blank lines and lines starting with '#' are skipped
        """
        super(Utf8Fixer, self).__init__()
        self._info = dict()
        # self._tuples[n - 1] holds the entries whose actual text has length n
        self._tuples = list()
        # load the debug reference
        istream = open(filename)
        try:
            for line in istream:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                entry = Utf8FixerInfo(line)
                self._info[entry.actual] = entry
                # collate sets of entries depending on length of actual
                length = len(entry.actual)
                missing = length - len(self._tuples)
                if missing > 0:
                    self._tuples.extend([set() for _ in range(missing)])
                self._tuples[length - 1].add(entry)
        finally:
            # release the handle even when an entry fails to parse
            istream.close()

    def _get_tuples(self, n):
        """ return the set of entries whose actual text has length n """
        return self._tuples[n - 1]

    def _find_match(self, account, statuses, i):
        """
        return an entry whose corrupted text starts at offset i of the
        original text, longest first, or None; spans that touch a status
        of 'd' are never matched
        """
        # index 0 (single characters) is deliberately never scanned, as in
        # the original loop bounds
        for index in range(len(self._tuples) - 1, 0, -1):
            end = i + index + 1
            if 'd' in statuses[i:end]:
                continue
            candidate = account._pre[i:end]
            for entry in self._tuples[index]:
                if candidate == entry.actual:
                    return entry
        return None

    def fix(self, account):
        """
        register on account a deletion and an insertion for every
        corrupted sequence found in its original text, re-sort its actions
        when anything was registered, and return the account

        At each offset the scan stops at the first (longest) match and
        resumes after the matched span, so replacements never overlap.
        """
        statuses = account._make_statuses()
        changes = False
        i = 0
        while i < len(statuses):
            entry = self._find_match(account, statuses, i)
            if entry is None:
                i += 1
                continue
            length = len(entry.actual)
            account.register(Deletion(i, length=length))
            account.register(Insertion(i, entry.expected))
            changes = True
            i += length
        if changes:
            account.actions = sorted(account.actions)
        return account


class Utf8FixerInfo(object):
    """ one entry of the debug reference: five tab-separated fields """

    def __init__(self, line):
        """ split line on tabs; raises IndexError when a field is missing """
        super(Utf8FixerInfo, self).__init__()
        fields = line.split('\t')
        self.unicode = fields[0]
        self.windows_1252 = fields[1]
        self.expected = fields[2]
        self.actual = fields[3]
        self.utf8_bytes = fields[4]


class _TestHtmlReduction(unittest.TestCase):

    def setUp(self):
        from config import Config
        self.config = Config('html-wdc')

    def test_basic(self):
        # NOTE(review): this literal reached review split over several
        # lines, apparently with its html markup stripped; the received
        # text is kept (line breaks written as \n) -- restore the original
        # literal from version control
        string = '\ncode\nshould mask'
        reduction = HtmlReduction(string, self.config)
        expected = 'This _________________ should mask'
        self.assertEqual(str(reduction), expected)

    def test_pointers(self):
        # every character of the reduction must point back at itself
        html_reduction = HtmlReduction('This tests the pointers', self.config)
        for i, char in enumerate(str(html_reduction)):
            corresponding = html_reduction.source[html_reduction.pointers[i]]
            self.assertEqual(char, corresponding)

    def test_purge(self):
        # NOTE(review): these literals appear to have lost their html
        # markup; the received text is kept (line breaks written as \n)
        # -- restore the original literals from version control
        test = 'This stuffshould purge.'
        nested = 'This\nalsoshould purge.'
        expected = 'This should purge.'
        html_reduction = HtmlReduction(test, self.config)
        self.assertEqual(str(html_reduction), expected)
        html_reduction = HtmlReduction(nested, self.config)
        self.assertEqual(str(html_reduction), expected)


class _TestHtmlAccountedString(unittest.TestCase):

    def setUp(self):
        from config import Config
        self.config = Config('html-wdc')

    def test_basic(self):
        self.config.gml_mode = False
        # NOTE(review): these literals reached review split over several
        # lines, apparently with their html markup stripped; the received
        # text is kept (line breaks written as \n) -- restore the original
        # literals from version control
        string = '\ncode\nshould mask'
        account = HtmlAccountedString(string, self.config)
        expected = 'This\nshould mask'
        self.assertEqual(unicode(account), expected)
        nested = 'This code\nshould mask'
        account = HtmlAccountedString(nested, self.config)
        self.assertEqual(unicode(account), expected)

def test_purging(self):
self.config.gml_mode = False
string = 'A B C D EF.' account = HtmlAccountedString(string, self.config) expected = 'A B F.' self.assertEquals(unicode(account), expected) def test_normalise_whitespace(self): expected = u'The ⌊>quick>⌋ brown fox.' lead = u'