import yaml

def get_version_string():
    return yaml_get_version_string()

def get_version():
    cdef int major, minor, patch
    yaml_get_version(&major, &minor, &patch)
    return (major, minor, patch)

#Mark = yaml.error.Mark
YAMLError = yaml.error.YAMLError
ReaderError = yaml.reader.ReaderError
ScannerError = yaml.scanner.ScannerError
ParserError = yaml.parser.ParserError
ComposerError = yaml.composer.ComposerError
ConstructorError = yaml.constructor.ConstructorError
EmitterError = yaml.emitter.EmitterError
SerializerError = yaml.serializer.SerializerError
RepresenterError = yaml.representer.RepresenterError

StreamStartToken = yaml.tokens.StreamStartToken
StreamEndToken = yaml.tokens.StreamEndToken
DirectiveToken = yaml.tokens.DirectiveToken
DocumentStartToken = yaml.tokens.DocumentStartToken
DocumentEndToken = yaml.tokens.DocumentEndToken
BlockSequenceStartToken = yaml.tokens.BlockSequenceStartToken
BlockMappingStartToken = yaml.tokens.BlockMappingStartToken
BlockEndToken = yaml.tokens.BlockEndToken
FlowSequenceStartToken = yaml.tokens.FlowSequenceStartToken
FlowMappingStartToken = yaml.tokens.FlowMappingStartToken
FlowSequenceEndToken = yaml.tokens.FlowSequenceEndToken
FlowMappingEndToken = yaml.tokens.FlowMappingEndToken
KeyToken = yaml.tokens.KeyToken
ValueToken = yaml.tokens.ValueToken
BlockEntryToken = yaml.tokens.BlockEntryToken
FlowEntryToken = yaml.tokens.FlowEntryToken
AliasToken = yaml.tokens.AliasToken
AnchorToken = yaml.tokens.AnchorToken
TagToken = yaml.tokens.TagToken
ScalarToken = yaml.tokens.ScalarToken

StreamStartEvent = yaml.events.StreamStartEvent
StreamEndEvent = yaml.events.StreamEndEvent
DocumentStartEvent = yaml.events.DocumentStartEvent
DocumentEndEvent = yaml.events.DocumentEndEvent
AliasEvent = yaml.events.AliasEvent
ScalarEvent = yaml.events.ScalarEvent
SequenceStartEvent = yaml.events.SequenceStartEvent
SequenceEndEvent = yaml.events.SequenceEndEvent
MappingStartEvent = yaml.events.MappingStartEvent
MappingEndEvent = yaml.events.MappingEndEvent

ScalarNode = yaml.nodes.ScalarNode
SequenceNode = yaml.nodes.SequenceNode
MappingNode = yaml.nodes.MappingNode

cdef class Mark:
    cdef readonly object name
    cdef readonly int index
    cdef readonly int line
    cdef readonly int column
    cdef readonly buffer
    cdef readonly pointer

    def __init__(self, object name, int index, int line, int column,
            object buffer, object pointer):
        self.name = name
        self.index = index
        self.line = line
        self.column = column
        self.buffer = buffer
        self.pointer = pointer

    def get_snippet(self):
        return None

    def __str__(self):
        where = " in \"%s\", line %d, column %d" \
                % (self.name, self.line+1, self.column+1)
        return where

#class YAMLError(Exception):
#    pass
#
#class MarkedYAMLError(YAMLError):
#
#    def __init__(self, context=None, context_mark=None,
#            problem=None, problem_mark=None, note=None):
#        self.context = context
#        self.context_mark = context_mark
#        self.problem = problem
#        self.problem_mark = problem_mark
#        self.note = note
#
#    def __str__(self):
#        lines = []
#        if self.context is not None:
#            lines.append(self.context)
#        if self.context_mark is not None \
#                and (self.problem is None or self.problem_mark is None
#                        or self.context_mark.name != self.problem_mark.name
#                        or self.context_mark.line != self.problem_mark.line
#                        or self.context_mark.column != self.problem_mark.column):
#            lines.append(str(self.context_mark))
#        if self.problem is not None:
#            lines.append(self.problem)
#        if self.problem_mark is not None:
#            lines.append(str(self.problem_mark))
#        if self.note is not None:
#            lines.append(self.note)
#        return '\n'.join(lines)
#
#class ReaderError(YAMLError):
#
#    def __init__(self, name, position, character, encoding, reason):
#        self.name = name
#        self.character = character
#        self.position = position
#        self.encoding = encoding
#        self.reason = reason
#
#    def __str__(self):
#        if isinstance(self.character, str):
#            return "'%s' codec can't decode byte #x%02x: %s\n" \
#                    " in \"%s\", position %d" \
#                    % (self.encoding, ord(self.character), self.reason,
#                            self.name, self.position)
#        else:
#            return "unacceptable character #x%04x: %s\n" \
#                    " in \"%s\", position %d" \
#                    % (ord(self.character), self.reason,
#                            self.name, self.position)
#
#class ScannerError(MarkedYAMLError):
#    pass
#
#class ParserError(MarkedYAMLError):
#    pass
#
#class EmitterError(YAMLError):
#    pass
#
#cdef class Token:
#    cdef readonly Mark start_mark
#    cdef readonly Mark end_mark
#    def __init__(self, Mark start_mark, Mark end_mark):
#        self.start_mark = start_mark
#        self.end_mark = end_mark
#
#cdef class StreamStartToken(Token):
#    cdef readonly object encoding
#    def __init__(self, Mark start_mark, Mark end_mark, encoding):
#        self.start_mark = start_mark
#        self.end_mark = end_mark
#        self.encoding = encoding
#
#cdef class StreamEndToken(Token):
#    pass
#
#cdef class DirectiveToken(Token):
#    cdef readonly object name
#    cdef readonly object value
#    def __init__(self, name, value, Mark start_mark, Mark end_mark):
#        self.name = name
#        self.value = value
#        self.start_mark = start_mark
#        self.end_mark = end_mark
#
#cdef class DocumentStartToken(Token):
#    pass
#
#cdef class DocumentEndToken(Token):
#    pass
#
#cdef class BlockSequenceStartToken(Token):
#    pass
#
#cdef class BlockMappingStartToken(Token):
#    pass
#
#cdef class BlockEndToken(Token):
#    pass
#
#cdef class FlowSequenceStartToken(Token):
#    pass
#
#cdef class FlowMappingStartToken(Token):
#    pass
#
#cdef class FlowSequenceEndToken(Token):
#    pass
#
#cdef class FlowMappingEndToken(Token):
#    pass
#
#cdef class KeyToken(Token):
#    pass
#
#cdef class ValueToken(Token):
#    pass
#
#cdef class BlockEntryToken(Token):
#    pass
#
#cdef class FlowEntryToken(Token):
#    pass
#
#cdef class AliasToken(Token):
#    cdef readonly object value
#    def __init__(self, value, Mark start_mark, Mark end_mark):
#        self.value = value
#        self.start_mark = start_mark
#        self.end_mark = end_mark
#
#cdef class AnchorToken(Token):
#    cdef readonly object value
#    def __init__(self, value, Mark start_mark, Mark end_mark):
#        self.value = value
#        self.start_mark = start_mark
#        self.end_mark = end_mark
#
#cdef class TagToken(Token):
#    cdef readonly object value
#    def __init__(self, value, Mark start_mark, Mark end_mark):
#        self.value = value
#        self.start_mark = start_mark
#        self.end_mark = end_mark
#
#cdef class ScalarToken(Token):
#    cdef readonly object value
#    cdef readonly object plain
#    cdef readonly object style
#    def __init__(self, value, plain, Mark start_mark, Mark end_mark, style=None):
#        self.value = value
#        self.plain = plain
#        self.start_mark = start_mark
#        self.end_mark = end_mark
#        self.style = style
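# Note: CParser and CEmitter below are not normally used directly.  yaml/cyaml.py
# mixes them with the pure-Python Constructor, Representer, Serializer and
# Resolver classes to form yaml.CLoader and yaml.CDumper.  A rough usage sketch,
# kept as a comment so it does not run at import time (assumes PyYAML was built
# with libyaml support):
#
#     import yaml
#
#     data = yaml.load("- a\n- b\n", Loader=yaml.CLoader)   # ['a', 'b']
#     text = yaml.dump(data, Dumper=yaml.CDumper)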
cdef class CParser:

    cdef yaml_parser_t parser
    cdef yaml_event_t parsed_event

    cdef object stream
    cdef object stream_name
    cdef object current_token
    cdef object current_event
    cdef object anchors

    def __init__(self, stream):
        if yaml_parser_initialize(&self.parser) == 0:
            raise MemoryError
        self.parsed_event.type = YAML_NO_EVENT
        if hasattr(stream, 'read'):
            self.stream = stream
            try:
                self.stream_name = stream.name
            except AttributeError:
                self.stream_name = '<file>'
            yaml_parser_set_input(&self.parser, input_handler, <void *>self)
        else:
            if PyUnicode_CheckExact(stream) != 0:
                stream = PyUnicode_AsUTF8String(stream)
                self.stream_name = '<unicode string>'
            else:
                self.stream_name = '<byte string>'
            if PyString_CheckExact(stream) == 0:
                raise TypeError("a string or stream input is required")
            self.stream = stream
            yaml_parser_set_input_string(&self.parser,
                    PyString_AS_STRING(stream), PyString_GET_SIZE(stream))
        self.current_token = None
        self.current_event = None
        self.anchors = {}

    def __dealloc__(self):
        yaml_parser_delete(&self.parser)
        yaml_event_delete(&self.parsed_event)

    cdef object _parser_error(self):
        if self.parser.error == YAML_MEMORY_ERROR:
            return MemoryError
        elif self.parser.error == YAML_READER_ERROR:
            return ReaderError(self.stream_name, self.parser.problem_offset,
                    self.parser.problem_value, '?', self.parser.problem)
        elif self.parser.error == YAML_SCANNER_ERROR \
                or self.parser.error == YAML_PARSER_ERROR:
            context_mark = None
            problem_mark = None
            if self.parser.context != NULL:
                context_mark = Mark(self.stream_name,
                        self.parser.context_mark.index,
                        self.parser.context_mark.line,
                        self.parser.context_mark.column, None, None)
            if self.parser.problem != NULL:
                problem_mark = Mark(self.stream_name,
                        self.parser.problem_mark.index,
                        self.parser.problem_mark.line,
                        self.parser.problem_mark.column, None, None)
            if self.parser.error == YAML_SCANNER_ERROR:
                if self.parser.context != NULL:
                    return ScannerError(self.parser.context, context_mark,
                            self.parser.problem, problem_mark)
                else:
                    return ScannerError(None, None,
                            self.parser.problem, problem_mark)
            else:
                if self.parser.context != NULL:
                    return ParserError(self.parser.context, context_mark,
                            self.parser.problem, problem_mark)
                else:
                    return ParserError(None, None,
                            self.parser.problem, problem_mark)
        raise ValueError("no parser error")

    def raw_scan(self):
        cdef yaml_token_t token
        cdef int done
        cdef int count
        count = 0
        done = 0
        while done == 0:
            if yaml_parser_scan(&self.parser, &token) == 0:
                error = self._parser_error()
                raise error
            if token.type == YAML_NO_TOKEN:
                done = 1
            else:
                count = count+1
            yaml_token_delete(&token)
        return count

    cdef object _scan(self):
        cdef yaml_token_t token
        if yaml_parser_scan(&self.parser, &token) == 0:
            error = self._parser_error()
            raise error
        token_object = self._token_to_object(&token)
        yaml_token_delete(&token)
        return token_object

    cdef object _token_to_object(self, yaml_token_t *token):
        start_mark = Mark(self.stream_name,
                token.start_mark.index,
                token.start_mark.line,
                token.start_mark.column,
                None, None)
        end_mark = Mark(self.stream_name,
                token.end_mark.index,
                token.end_mark.line,
                token.end_mark.column,
                None, None)
        if token.type == YAML_NO_TOKEN:
            return None
        elif token.type == YAML_STREAM_START_TOKEN:
            encoding = None
            if token.data.stream_start.encoding == YAML_UTF8_ENCODING:
                encoding = "utf-8"
            elif token.data.stream_start.encoding == YAML_UTF16LE_ENCODING:
                encoding = "utf-16-le"
            elif token.data.stream_start.encoding == YAML_UTF16BE_ENCODING:
                encoding = "utf-16-be"
            return StreamStartToken(start_mark, end_mark, encoding)
        elif token.type == YAML_STREAM_END_TOKEN:
            return StreamEndToken(start_mark, end_mark)
        elif token.type == YAML_VERSION_DIRECTIVE_TOKEN:
            return DirectiveToken("YAML",
                    (token.data.version_directive.major,
                        token.data.version_directive.minor),
                    start_mark, end_mark)
        elif token.type == YAML_TAG_DIRECTIVE_TOKEN:
            return DirectiveToken("TAG",
                    (token.data.tag_directive.handle,
                        token.data.tag_directive.prefix),
                    start_mark, end_mark)
        elif token.type == YAML_DOCUMENT_START_TOKEN:
            return DocumentStartToken(start_mark, end_mark)
        elif token.type == YAML_DOCUMENT_END_TOKEN:
            return DocumentEndToken(start_mark, end_mark)
        elif token.type == YAML_BLOCK_SEQUENCE_START_TOKEN:
            return BlockSequenceStartToken(start_mark, end_mark)
        elif token.type == YAML_BLOCK_MAPPING_START_TOKEN:
            return BlockMappingStartToken(start_mark, end_mark)
        elif token.type == YAML_BLOCK_END_TOKEN:
            return BlockEndToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_SEQUENCE_START_TOKEN:
            return FlowSequenceStartToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_SEQUENCE_END_TOKEN:
            return FlowSequenceEndToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_MAPPING_START_TOKEN:
            return FlowMappingStartToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_MAPPING_END_TOKEN:
            return FlowMappingEndToken(start_mark, end_mark)
        elif token.type == YAML_BLOCK_ENTRY_TOKEN:
            return BlockEntryToken(start_mark, end_mark)
        elif token.type == YAML_FLOW_ENTRY_TOKEN:
            return FlowEntryToken(start_mark, end_mark)
        elif token.type == YAML_KEY_TOKEN:
            return KeyToken(start_mark, end_mark)
        elif token.type == YAML_VALUE_TOKEN:
            return ValueToken(start_mark, end_mark)
        elif token.type == YAML_ALIAS_TOKEN:
            value = PyUnicode_DecodeUTF8(token.data.alias.value,
                    strlen(token.data.alias.value), 'strict')
            return AliasToken(value, start_mark, end_mark)
        elif token.type == YAML_ANCHOR_TOKEN:
            value = PyUnicode_DecodeUTF8(token.data.anchor.value,
                    strlen(token.data.anchor.value), 'strict')
            return AnchorToken(value, start_mark, end_mark)
        elif token.type == YAML_TAG_TOKEN:
            handle = PyUnicode_DecodeUTF8(token.data.tag.handle,
                    strlen(token.data.tag.handle), 'strict')
            suffix = PyUnicode_DecodeUTF8(token.data.tag.suffix,
                    strlen(token.data.tag.suffix), 'strict')
            if not handle:
                handle = None
            return TagToken((handle, suffix), start_mark, end_mark)
        elif token.type == YAML_SCALAR_TOKEN:
            value = PyUnicode_DecodeUTF8(token.data.scalar.value,
                    token.data.scalar.length, 'strict')
            plain = False
            style = None
            if token.data.scalar.style == YAML_PLAIN_SCALAR_STYLE:
                plain = True
                style = ''
            elif token.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE:
                style = '\''
            elif token.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE:
                style = '"'
            elif token.data.scalar.style == YAML_LITERAL_SCALAR_STYLE:
                style = '|'
            elif token.data.scalar.style == YAML_FOLDED_SCALAR_STYLE:
                style = '>'
            return ScalarToken(value, plain, start_mark, end_mark, style)
        else:
            raise ValueError("unknown token type")

    def get_token(self):
        if self.current_token is not None:
            value = self.current_token
            self.current_token = None
        else:
            value = self._scan()
        return value

    def peek_token(self):
        if self.current_token is None:
            self.current_token = self._scan()
        return self.current_token

    def check_token(self, *choices):
        if self.current_token is None:
            self.current_token = self._scan()
        if self.current_token is None:
            return False
        if not choices:
            return True
        token_class = self.current_token.__class__
        for choice in choices:
            if token_class is choice:
                return True
        return False
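    # Usage sketch for the token interface above (get_token/peek_token/
    # check_token mirror the pure-Python yaml.scanner.Scanner API).  Kept as a
    # comment so it does not execute; assumes a CLoader built on this class:
    #
    #     import yaml
    #     for token in yaml.scan("a: 1\n", Loader=yaml.CLoader):
    #         print token.__class__.__name__, token.start_mark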
    def raw_parse(self):
        cdef yaml_event_t event
        cdef int done
        cdef int count
        count = 0
        done = 0
        while done == 0:
            if yaml_parser_parse(&self.parser, &event) == 0:
                error = self._parser_error()
                raise error
            if event.type == YAML_NO_EVENT:
                done = 1
            else:
                count = count+1
            yaml_event_delete(&event)
        return count

    cdef object _parse(self):
        cdef yaml_event_t event
        if yaml_parser_parse(&self.parser, &event) == 0:
            error = self._parser_error()
            raise error
        event_object = self._event_to_object(&event)
        yaml_event_delete(&event)
        return event_object

    cdef object _event_to_object(self, yaml_event_t *event):
        cdef yaml_tag_directive_t *tag_directive
        start_mark = Mark(self.stream_name,
                event.start_mark.index,
                event.start_mark.line,
                event.start_mark.column,
                None, None)
        end_mark = Mark(self.stream_name,
                event.end_mark.index,
                event.end_mark.line,
                event.end_mark.column,
                None, None)
        if event.type == YAML_NO_EVENT:
            return None
        elif event.type == YAML_STREAM_START_EVENT:
            encoding = None
            if event.data.stream_start.encoding == YAML_UTF8_ENCODING:
                encoding = "utf-8"
            elif event.data.stream_start.encoding == YAML_UTF16LE_ENCODING:
                encoding = "utf-16-le"
            elif event.data.stream_start.encoding == YAML_UTF16BE_ENCODING:
                encoding = "utf-16-be"
            return StreamStartEvent(start_mark, end_mark, encoding)
        elif event.type == YAML_STREAM_END_EVENT:
            return StreamEndEvent(start_mark, end_mark)
        elif event.type == YAML_DOCUMENT_START_EVENT:
            explicit = False
            if event.data.document_start.implicit == 0:
                explicit = True
            version = None
            if event.data.document_start.version_directive != NULL:
                version = (event.data.document_start.version_directive.major,
                        event.data.document_start.version_directive.minor)
            tags = None
            if event.data.document_start.tag_directives.start != NULL:
                tags = {}
                tag_directive = event.data.document_start.tag_directives.start
                while tag_directive != event.data.document_start.tag_directives.end:
                    handle = PyUnicode_DecodeUTF8(tag_directive.handle,
                            strlen(tag_directive.handle), 'strict')
                    prefix = PyUnicode_DecodeUTF8(tag_directive.prefix,
                            strlen(tag_directive.prefix), 'strict')
                    tags[handle] = prefix
                    tag_directive = tag_directive+1
            return DocumentStartEvent(start_mark, end_mark,
                    explicit, version, tags)
        elif event.type == YAML_DOCUMENT_END_EVENT:
            explicit = False
            if event.data.document_end.implicit == 0:
                explicit = True
            return DocumentEndEvent(start_mark, end_mark, explicit)
        elif event.type == YAML_ALIAS_EVENT:
            anchor = PyUnicode_DecodeUTF8(event.data.alias.anchor,
                    strlen(event.data.alias.anchor), 'strict')
            return AliasEvent(anchor, start_mark, end_mark)
        elif event.type == YAML_SCALAR_EVENT:
            anchor = None
            if event.data.scalar.anchor != NULL:
                anchor = PyUnicode_DecodeUTF8(event.data.scalar.anchor,
                        strlen(event.data.scalar.anchor), 'strict')
            tag = None
            if event.data.scalar.tag != NULL:
                tag = PyUnicode_DecodeUTF8(event.data.scalar.tag,
                        strlen(event.data.scalar.tag), 'strict')
            value = PyUnicode_DecodeUTF8(event.data.scalar.value,
                    event.data.scalar.length, 'strict')
            plain_implicit = False
            if event.data.scalar.plain_implicit == 1:
                plain_implicit = True
            quoted_implicit = False
            if event.data.scalar.quoted_implicit == 1:
                quoted_implicit = True
            style = None
            if event.data.scalar.style == YAML_PLAIN_SCALAR_STYLE:
                style = ''
            elif event.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE:
                style = '\''
            elif event.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE:
                style = '"'
            elif event.data.scalar.style == YAML_LITERAL_SCALAR_STYLE:
                style = '|'
            elif event.data.scalar.style == YAML_FOLDED_SCALAR_STYLE:
                style = '>'
            return ScalarEvent(anchor, tag,
                    (plain_implicit, quoted_implicit),
                    value, start_mark, end_mark, style)
        elif event.type == YAML_SEQUENCE_START_EVENT:
            anchor = None
            if event.data.sequence_start.anchor != NULL:
                anchor = PyUnicode_DecodeUTF8(event.data.sequence_start.anchor,
                        strlen(event.data.sequence_start.anchor), 'strict')
            tag = None
            if event.data.sequence_start.tag != NULL:
                tag = PyUnicode_DecodeUTF8(event.data.sequence_start.tag,
                        strlen(event.data.sequence_start.tag), 'strict')
            implicit = False
            if event.data.sequence_start.implicit == 1:
                implicit = True
            flow_style = None
            if event.data.sequence_start.style == YAML_FLOW_SEQUENCE_STYLE:
                flow_style = True
            elif event.data.sequence_start.style == YAML_BLOCK_SEQUENCE_STYLE:
                flow_style = False
            return SequenceStartEvent(anchor, tag, implicit,
                    start_mark, end_mark, flow_style)
        elif event.type == YAML_MAPPING_START_EVENT:
            anchor = None
            if event.data.mapping_start.anchor != NULL:
                anchor = PyUnicode_DecodeUTF8(event.data.mapping_start.anchor,
                        strlen(event.data.mapping_start.anchor), 'strict')
            tag = None
            if event.data.mapping_start.tag != NULL:
                tag = PyUnicode_DecodeUTF8(event.data.mapping_start.tag,
                        strlen(event.data.mapping_start.tag), 'strict')
            implicit = False
            if event.data.mapping_start.implicit == 1:
                implicit = True
            flow_style = None
            if event.data.mapping_start.style == YAML_FLOW_MAPPING_STYLE:
                flow_style = True
            elif event.data.mapping_start.style == YAML_BLOCK_MAPPING_STYLE:
                flow_style = False
            return MappingStartEvent(anchor, tag, implicit,
                    start_mark, end_mark, flow_style)
        elif event.type == YAML_SEQUENCE_END_EVENT:
            return SequenceEndEvent(start_mark, end_mark)
        elif event.type == YAML_MAPPING_END_EVENT:
            return MappingEndEvent(start_mark, end_mark)
        else:
            raise ValueError("unknown event type")

    def get_event(self):
        if self.current_event is not None:
            value = self.current_event
            self.current_event = None
        else:
            value = self._parse()
        return value

    def peek_event(self):
        if self.current_event is None:
            self.current_event = self._parse()
        return self.current_event

    def check_event(self, *choices):
        if self.current_event is None:
            self.current_event = self._parse()
        if self.current_event is None:
            return False
        if not choices:
            return True
        event_class = self.current_event.__class__
        for choice in choices:
            if event_class is choice:
                return True
        return False

    def check_node(self):
        self._parse_next_event()
        if self.parsed_event.type == YAML_STREAM_START_EVENT:
            yaml_event_delete(&self.parsed_event)
            self._parse_next_event()
        if self.parsed_event.type != YAML_STREAM_END_EVENT:
            return True
        return False

    def get_node(self):
        self._parse_next_event()
        if self.parsed_event.type != YAML_STREAM_END_EVENT:
            return self._compose_document()
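    # Usage sketch for the event- and node-level interfaces above.  Kept as a
    # comment so it does not execute; assumes a CLoader built on this class:
    #
    #     import yaml
    #     events = list(yaml.parse("a: 1\n", Loader=yaml.CLoader))
    #     root = yaml.compose("a: 1\n", Loader=yaml.CLoader)   # a MappingNode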
    cdef object _compose_document(self):
        yaml_event_delete(&self.parsed_event)
        node = self._compose_node(None, None)
        self._parse_next_event()
        yaml_event_delete(&self.parsed_event)
        self.anchors = {}
        return node

    cdef object _compose_node(self, object parent, object index):
        self._parse_next_event()
        if self.parsed_event.type == YAML_ALIAS_EVENT:
            anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.alias.anchor,
                    strlen(self.parsed_event.data.alias.anchor), 'strict')
            if anchor not in self.anchors:
                mark = Mark(self.stream_name,
                        self.parsed_event.start_mark.index,
                        self.parsed_event.start_mark.line,
                        self.parsed_event.start_mark.column,
                        None, None)
                raise ComposerError(None, None, "found undefined alias", mark)
            yaml_event_delete(&self.parsed_event)
            return self.anchors[anchor]
        anchor = None
        if self.parsed_event.type == YAML_SCALAR_EVENT \
                and self.parsed_event.data.scalar.anchor != NULL:
            anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.scalar.anchor,
                    strlen(self.parsed_event.data.scalar.anchor), 'strict')
        elif self.parsed_event.type == YAML_SEQUENCE_START_EVENT \
                and self.parsed_event.data.sequence_start.anchor != NULL:
            anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.sequence_start.anchor,
                    strlen(self.parsed_event.data.sequence_start.anchor), 'strict')
        elif self.parsed_event.type == YAML_MAPPING_START_EVENT \
                and self.parsed_event.data.mapping_start.anchor != NULL:
            anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.mapping_start.anchor,
                    strlen(self.parsed_event.data.mapping_start.anchor), 'strict')
        if anchor is not None:
            if anchor in self.anchors:
                mark = Mark(self.stream_name,
                        self.parsed_event.start_mark.index,
                        self.parsed_event.start_mark.line,
                        self.parsed_event.start_mark.column,
                        None, None)
                raise ComposerError("found duplicate anchor; first occurrence",
                        self.anchors[anchor].start_mark,
                        "second occurrence", mark)
        self.descend_resolver(parent, index)
        if self.parsed_event.type == YAML_SCALAR_EVENT:
            node = self._compose_scalar_node(anchor)
        elif self.parsed_event.type == YAML_SEQUENCE_START_EVENT:
            node = self._compose_sequence_node(anchor)
        elif self.parsed_event.type == YAML_MAPPING_START_EVENT:
            node = self._compose_mapping_node(anchor)
        self.ascend_resolver()
        return node

    cdef _compose_scalar_node(self, object anchor):
        start_mark = Mark(self.stream_name,
                self.parsed_event.start_mark.index,
                self.parsed_event.start_mark.line,
                self.parsed_event.start_mark.column,
                None, None)
        end_mark = Mark(self.stream_name,
                self.parsed_event.end_mark.index,
                self.parsed_event.end_mark.line,
                self.parsed_event.end_mark.column,
                None, None)
        value = PyUnicode_DecodeUTF8(self.parsed_event.data.scalar.value,
                self.parsed_event.data.scalar.length, 'strict')
        plain_implicit = False
        if self.parsed_event.data.scalar.plain_implicit == 1:
            plain_implicit = True
        quoted_implicit = False
        if self.parsed_event.data.scalar.quoted_implicit == 1:
            quoted_implicit = True
        if self.parsed_event.data.scalar.tag == NULL \
                or (self.parsed_event.data.scalar.tag[0] == c'!'
                        and self.parsed_event.data.scalar.tag[1] == c'\0'):
            tag = self.resolve(ScalarNode, value, (plain_implicit, quoted_implicit))
        else:
            tag = PyUnicode_DecodeUTF8(self.parsed_event.data.scalar.tag,
                    strlen(self.parsed_event.data.scalar.tag), 'strict')
        style = None
        if self.parsed_event.data.scalar.style == YAML_PLAIN_SCALAR_STYLE:
            style = ''
        elif self.parsed_event.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE:
            style = '\''
        elif self.parsed_event.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE:
            style = '"'
        elif self.parsed_event.data.scalar.style == YAML_LITERAL_SCALAR_STYLE:
            style = '|'
        elif self.parsed_event.data.scalar.style == YAML_FOLDED_SCALAR_STYLE:
            style = '>'
        node = ScalarNode(tag, value, start_mark, end_mark, style)
        if anchor is not None:
            self.anchors[anchor] = node
        yaml_event_delete(&self.parsed_event)
        return node
    cdef _compose_sequence_node(self, object anchor):
        cdef int index
        start_mark = Mark(self.stream_name,
                self.parsed_event.start_mark.index,
                self.parsed_event.start_mark.line,
                self.parsed_event.start_mark.column,
                None, None)
        implicit = False
        if self.parsed_event.data.sequence_start.implicit == 1:
            implicit = True
        if self.parsed_event.data.sequence_start.tag == NULL \
                or (self.parsed_event.data.sequence_start.tag[0] == c'!'
                        and self.parsed_event.data.sequence_start.tag[1] == c'\0'):
            tag = self.resolve(SequenceNode, None, implicit)
        else:
            tag = PyUnicode_DecodeUTF8(self.parsed_event.data.sequence_start.tag,
                    strlen(self.parsed_event.data.sequence_start.tag), 'strict')
        flow_style = None
        if self.parsed_event.data.sequence_start.style == YAML_FLOW_SEQUENCE_STYLE:
            flow_style = True
        elif self.parsed_event.data.sequence_start.style == YAML_BLOCK_SEQUENCE_STYLE:
            flow_style = False
        value = []
        node = SequenceNode(tag, value, start_mark, None, flow_style)
        if anchor is not None:
            self.anchors[anchor] = node
        yaml_event_delete(&self.parsed_event)
        index = 0
        self._parse_next_event()
        while self.parsed_event.type != YAML_SEQUENCE_END_EVENT:
            value.append(self._compose_node(node, index))
            index = index+1
            self._parse_next_event()
        node.end_mark = Mark(self.stream_name,
                self.parsed_event.end_mark.index,
                self.parsed_event.end_mark.line,
                self.parsed_event.end_mark.column,
                None, None)
        yaml_event_delete(&self.parsed_event)
        return node

    cdef _compose_mapping_node(self, object anchor):
        start_mark = Mark(self.stream_name,
                self.parsed_event.start_mark.index,
                self.parsed_event.start_mark.line,
                self.parsed_event.start_mark.column,
                None, None)
        implicit = False
        if self.parsed_event.data.mapping_start.implicit == 1:
            implicit = True
        if self.parsed_event.data.mapping_start.tag == NULL \
                or (self.parsed_event.data.mapping_start.tag[0] == c'!'
                        and self.parsed_event.data.mapping_start.tag[1] == c'\0'):
            tag = self.resolve(MappingNode, None, implicit)
        else:
            tag = PyUnicode_DecodeUTF8(self.parsed_event.data.mapping_start.tag,
                    strlen(self.parsed_event.data.mapping_start.tag), 'strict')
        flow_style = None
        if self.parsed_event.data.mapping_start.style == YAML_FLOW_MAPPING_STYLE:
            flow_style = True
        elif self.parsed_event.data.mapping_start.style == YAML_BLOCK_MAPPING_STYLE:
            flow_style = False
        value = []
        node = MappingNode(tag, value, start_mark, None, flow_style)
        if anchor is not None:
            self.anchors[anchor] = node
        yaml_event_delete(&self.parsed_event)
        self._parse_next_event()
        while self.parsed_event.type != YAML_MAPPING_END_EVENT:
            item_key = self._compose_node(node, None)
            item_value = self._compose_node(node, item_key)
            value.append((item_key, item_value))
            self._parse_next_event()
        node.end_mark = Mark(self.stream_name,
                self.parsed_event.end_mark.index,
                self.parsed_event.end_mark.line,
                self.parsed_event.end_mark.column,
                None, None)
        yaml_event_delete(&self.parsed_event)
        return node

    cdef int _parse_next_event(self) except 0:
        if self.parsed_event.type == YAML_NO_EVENT:
            if yaml_parser_parse(&self.parser, &self.parsed_event) == 0:
                error = self._parser_error()
                raise error
        return 1

cdef int input_handler(void *data, char *buffer, int size, int *read) except 0:
    cdef CParser parser
    parser = <CParser>data
    value = parser.stream.read(size)
    if PyString_CheckExact(value) == 0:
        raise TypeError("a string value is expected")
    if PyString_GET_SIZE(value) > size:
        raise ValueError("a string value is too long")
    memcpy(buffer, PyString_AS_STRING(value), PyString_GET_SIZE(value))
    read[0] = PyString_GET_SIZE(value)
    return 1
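# input_handler() above is the callback registered with yaml_parser_set_input():
# libyaml asks for at most `size` bytes at a time, and the data is pulled from
# any object with a read(size) method that returns byte strings.  Illustrative
# sketch, commented out (the file name is made up):
#
#     import yaml
#     stream = open('document.yaml', 'rb')
#     data = yaml.load(stream, Loader=yaml.CLoader)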
cdef class CEmitter:

    cdef yaml_emitter_t emitter

    cdef object stream

    cdef yaml_encoding_t use_encoding
    cdef int document_start_implicit
    cdef int document_end_implicit
    cdef object use_version
    cdef object use_tags

    cdef object serialized_nodes
    cdef object anchors
    cdef int last_alias_id
    cdef int closed

    def __init__(self, stream, canonical=None, indent=None, width=None,
            allow_unicode=None, line_break=None, encoding=None,
            explicit_start=None, explicit_end=None, version=None, tags=None):
        if yaml_emitter_initialize(&self.emitter) == 0:
            raise MemoryError
        self.stream = stream
        yaml_emitter_set_output(&self.emitter, output_handler, <void *>self)
        if canonical is not None:
            yaml_emitter_set_canonical(&self.emitter, 1)
        if indent is not None:
            yaml_emitter_set_indent(&self.emitter, indent)
        if width is not None:
            yaml_emitter_set_width(&self.emitter, width)
        if allow_unicode is not None:
            yaml_emitter_set_unicode(&self.emitter, 1)
        if line_break is not None:
            if line_break == '\r':
                yaml_emitter_set_break(&self.emitter, YAML_CR_BREAK)
            elif line_break == '\n':
                yaml_emitter_set_break(&self.emitter, YAML_LN_BREAK)
            elif line_break == '\r\n':
                yaml_emitter_set_break(&self.emitter, YAML_CRLN_BREAK)
        if encoding == 'utf-16-le':
            self.use_encoding = YAML_UTF16LE_ENCODING
        elif encoding == 'utf-16-be':
            self.use_encoding = YAML_UTF16BE_ENCODING
        else:
            self.use_encoding = YAML_UTF8_ENCODING
        self.document_start_implicit = 1
        if explicit_start:
            self.document_start_implicit = 0
        self.document_end_implicit = 1
        if explicit_end:
            self.document_end_implicit = 0
        self.use_version = version
        self.use_tags = tags
        self.serialized_nodes = {}
        self.anchors = {}
        self.last_alias_id = 0
        self.closed = -1

    def __dealloc__(self):
        yaml_emitter_delete(&self.emitter)

    cdef object _emitter_error(self):
        if self.emitter.error == YAML_MEMORY_ERROR:
            return MemoryError
        elif self.emitter.error == YAML_EMITTER_ERROR:
            return EmitterError(self.emitter.problem)
        raise ValueError("no emitter error")

    cdef int _object_to_event(self, object event_object, yaml_event_t *event) except 0:
        cdef yaml_encoding_t encoding
        cdef yaml_version_directive_t version_directive_value
        cdef yaml_version_directive_t *version_directive
        cdef yaml_tag_directive_t tag_directives_value[128]
        cdef yaml_tag_directive_t *tag_directives_start
        cdef yaml_tag_directive_t *tag_directives_end
        cdef int implicit
        cdef int plain_implicit
        cdef int quoted_implicit
        cdef char *anchor
        cdef char *tag
        cdef char *value
        cdef int length
        cdef yaml_scalar_style_t scalar_style
        cdef yaml_sequence_style_t sequence_style
        cdef yaml_mapping_style_t mapping_style
        event_class = event_object.__class__
        if event_class is StreamStartEvent:
            encoding = YAML_UTF8_ENCODING
            if event_object.encoding == 'utf-16-le':
                encoding = YAML_UTF16LE_ENCODING
            elif event_object.encoding == 'utf-16-be':
                encoding = YAML_UTF16BE_ENCODING
            yaml_stream_start_event_initialize(event, encoding)
        elif event_class is StreamEndEvent:
            yaml_stream_end_event_initialize(event)
        elif event_class is DocumentStartEvent:
            version_directive = NULL
            if event_object.version:
                version_directive_value.major = event_object.version[0]
                version_directive_value.minor = event_object.version[1]
                version_directive = &version_directive_value
            tag_directives_start = NULL
            tag_directives_end = NULL
            if event_object.tags:
                if len(event_object.tags) > 128:
                    raise ValueError("too many tags")
                tag_directives_start = tag_directives_value
                tag_directives_end = tag_directives_value
                cache = []
                for handle in event_object.tags:
                    prefix = event_object.tags[handle]
                    if PyUnicode_CheckExact(handle):
                        handle = PyUnicode_AsUTF8String(handle)
                        cache.append(handle)
                    if not PyString_CheckExact(handle):
                        raise TypeError("tag handle must be a string")
                    tag_directives_end.handle = PyString_AS_STRING(handle)
                    if PyUnicode_CheckExact(prefix):
                        prefix = PyUnicode_AsUTF8String(prefix)
                        cache.append(prefix)
                    if not PyString_CheckExact(prefix):
                        raise TypeError("tag prefix must be a string")
                    tag_directives_end.prefix = PyString_AS_STRING(prefix)
                    tag_directives_end = tag_directives_end+1
            implicit = 1
            if event_object.explicit:
                implicit = 0
            if yaml_document_start_event_initialize(event, version_directive,
                    tag_directives_start, tag_directives_end, implicit) == 0:
                raise MemoryError
        elif event_class is DocumentEndEvent:
            implicit = 1
            if event_object.explicit:
                implicit = 0
            yaml_document_end_event_initialize(event, implicit)
        elif event_class is AliasEvent:
            anchor = NULL
            anchor_object = event_object.anchor
            if PyUnicode_CheckExact(anchor_object):
                anchor_object = PyUnicode_AsUTF8String(anchor_object)
            if not PyString_CheckExact(anchor_object):
                raise TypeError("anchor must be a string")
            anchor = PyString_AS_STRING(anchor_object)
            if yaml_alias_event_initialize(event, anchor) == 0:
                raise MemoryError
        elif event_class is ScalarEvent:
            anchor = NULL
            anchor_object = event_object.anchor
            if anchor_object is not None:
                if PyUnicode_CheckExact(anchor_object):
                    anchor_object = PyUnicode_AsUTF8String(anchor_object)
                if not PyString_CheckExact(anchor_object):
                    raise TypeError("anchor must be a string")
                anchor = PyString_AS_STRING(anchor_object)
            tag = NULL
            tag_object = event_object.tag
            if tag_object is not None:
                if PyUnicode_CheckExact(tag_object):
                    tag_object = PyUnicode_AsUTF8String(tag_object)
                if not PyString_CheckExact(tag_object):
                    raise TypeError("tag must be a string")
                tag = PyString_AS_STRING(tag_object)
            value_object = event_object.value
            if PyUnicode_CheckExact(value_object):
                value_object = PyUnicode_AsUTF8String(value_object)
            if not PyString_CheckExact(value_object):
                raise TypeError("value must be a string")
            value = PyString_AS_STRING(value_object)
            length = PyString_GET_SIZE(value_object)
            plain_implicit = 0
            quoted_implicit = 0
            if event_object.implicit is not None:
                plain_implicit = event_object.implicit[0]
                quoted_implicit = event_object.implicit[1]
            style_object = event_object.style
            scalar_style = YAML_PLAIN_SCALAR_STYLE
            if style_object == "'":
                scalar_style = YAML_SINGLE_QUOTED_SCALAR_STYLE
            elif style_object == "\"":
                scalar_style = YAML_DOUBLE_QUOTED_SCALAR_STYLE
            elif style_object == "|":
                scalar_style = YAML_LITERAL_SCALAR_STYLE
            elif style_object == ">":
                scalar_style = YAML_FOLDED_SCALAR_STYLE
            if yaml_scalar_event_initialize(event, anchor, tag, value, length,
                    plain_implicit, quoted_implicit, scalar_style) == 0:
                raise MemoryError
        elif event_class is SequenceStartEvent:
            anchor = NULL
            anchor_object = event_object.anchor
            if anchor_object is not None:
                if PyUnicode_CheckExact(anchor_object):
                    anchor_object = PyUnicode_AsUTF8String(anchor_object)
                if not PyString_CheckExact(anchor_object):
                    raise TypeError("anchor must be a string")
                anchor = PyString_AS_STRING(anchor_object)
            tag = NULL
            tag_object = event_object.tag
            if tag_object is not None:
                if PyUnicode_CheckExact(tag_object):
                    tag_object = PyUnicode_AsUTF8String(tag_object)
                if not PyString_CheckExact(tag_object):
                    raise TypeError("tag must be a string")
                tag = PyString_AS_STRING(tag_object)
            implicit = 0
            if event_object.implicit:
                implicit = 1
            sequence_style = YAML_BLOCK_SEQUENCE_STYLE
            if event_object.flow_style:
                sequence_style = YAML_FLOW_SEQUENCE_STYLE
            if yaml_sequence_start_event_initialize(event, anchor, tag,
                    implicit, sequence_style) == 0:
                raise MemoryError
        elif event_class is MappingStartEvent:
            anchor = NULL
            anchor_object = event_object.anchor
            if anchor_object is not None:
                if PyUnicode_CheckExact(anchor_object):
                    anchor_object = PyUnicode_AsUTF8String(anchor_object)
                if not PyString_CheckExact(anchor_object):
                    raise TypeError("anchor must be a string")
                anchor = PyString_AS_STRING(anchor_object)
            tag = NULL
            tag_object = event_object.tag
            if tag_object is not None:
                if PyUnicode_CheckExact(tag_object):
                    tag_object = PyUnicode_AsUTF8String(tag_object)
                if not PyString_CheckExact(tag_object):
                    raise TypeError("tag must be a string")
                tag = PyString_AS_STRING(tag_object)
            implicit = 0
            if event_object.implicit:
                implicit = 1
            mapping_style = YAML_BLOCK_MAPPING_STYLE
            if event_object.flow_style:
                mapping_style = YAML_FLOW_MAPPING_STYLE
            if yaml_mapping_start_event_initialize(event, anchor, tag,
                    implicit, mapping_style) == 0:
                raise MemoryError
        elif event_class is SequenceEndEvent:
            yaml_sequence_end_event_initialize(event)
        elif event_class is MappingEndEvent:
            yaml_mapping_end_event_initialize(event)
        else:
            raise TypeError("invalid event %s" % event_object)
        return 1

    def emit(self, event_object):
        cdef yaml_event_t event
        self._object_to_event(event_object, &event)
        if yaml_emitter_emit(&self.emitter, &event) == 0:
            error = self._emitter_error()
            raise error

    def open(self):
        cdef yaml_event_t event
        if self.closed == -1:
            yaml_stream_start_event_initialize(&event, self.use_encoding)
            if yaml_emitter_emit(&self.emitter, &event) == 0:
                error = self._emitter_error()
                raise error
            self.closed = 0
        elif self.closed == 1:
            raise SerializerError("serializer is closed")
        else:
            raise SerializerError("serializer is already opened")

    def close(self):
        cdef yaml_event_t event
        if self.closed == -1:
            raise SerializerError("serializer is not opened")
        elif self.closed == 0:
            yaml_stream_end_event_initialize(&event)
            if yaml_emitter_emit(&self.emitter, &event) == 0:
                error = self._emitter_error()
                raise error
            self.closed = 1

    def serialize(self, node):
        cdef yaml_event_t event
        cdef yaml_version_directive_t version_directive_value
        cdef yaml_version_directive_t *version_directive
        cdef yaml_tag_directive_t tag_directives_value[128]
        cdef yaml_tag_directive_t *tag_directives_start
        cdef yaml_tag_directive_t *tag_directives_end
        if self.closed == -1:
            raise SerializerError("serializer is not opened")
        elif self.closed == 1:
            raise SerializerError("serializer is closed")
        cache = []
        version_directive = NULL
        if self.use_version:
            version_directive_value.major = self.use_version[0]
            version_directive_value.minor = self.use_version[1]
            version_directive = &version_directive_value
        tag_directives_start = NULL
        tag_directives_end = NULL
        if self.use_tags:
            if len(self.use_tags) > 128:
                raise ValueError("too many tags")
            tag_directives_start = tag_directives_value
            tag_directives_end = tag_directives_value
            for handle in self.use_tags:
                prefix = self.use_tags[handle]
                if PyUnicode_CheckExact(handle):
                    handle = PyUnicode_AsUTF8String(handle)
                    cache.append(handle)
                if not PyString_CheckExact(handle):
                    raise TypeError("tag handle must be a string")
                tag_directives_end.handle = PyString_AS_STRING(handle)
                if PyUnicode_CheckExact(prefix):
                    prefix = PyUnicode_AsUTF8String(prefix)
                    cache.append(prefix)
                if not PyString_CheckExact(prefix):
                    raise TypeError("tag prefix must be a string")
                tag_directives_end.prefix = PyString_AS_STRING(prefix)
                tag_directives_end = tag_directives_end+1
        if yaml_document_start_event_initialize(&event, version_directive,
                tag_directives_start, tag_directives_end,
                self.document_start_implicit) == 0:
            raise MemoryError
        if yaml_emitter_emit(&self.emitter, &event) == 0:
            error = self._emitter_error()
            raise error
        self._anchor_node(node)
        self._serialize_node(node, None, None)
        yaml_document_end_event_initialize(&event, self.document_end_implicit)
        if yaml_emitter_emit(&self.emitter, &event) == 0:
            error = self._emitter_error()
            raise error
        self.serialized_nodes = {}
        self.anchors = {}
        self.last_alias_id = 0
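    # Usage sketch for open()/serialize()/close() above, which back
    # yaml.serialize() and yaml.dump() when CDumper is selected.  Kept as a
    # comment so it does not execute; assumes a CDumper built on this class:
    #
    #     import yaml
    #     node = yaml.compose("a: 1\n", Loader=yaml.CLoader)
    #     text = yaml.serialize(node, Dumper=yaml.CDumper)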
    cdef int _anchor_node(self, object node) except 0:
        if node in self.anchors:
            if self.anchors[node] is None:
                self.last_alias_id = self.last_alias_id+1
                self.anchors[node] = "id%03d" % self.last_alias_id
        else:
            self.anchors[node] = None
            node_class = node.__class__
            if node_class is SequenceNode:
                for item in node.value:
                    self._anchor_node(item)
            elif node_class is MappingNode:
                for key, value in node.value:
                    self._anchor_node(key)
                    self._anchor_node(value)
        return 1

    cdef int _serialize_node(self, object node, object parent, object index) except 0:
        cdef yaml_event_t event
        cdef int implicit
        cdef int plain_implicit
        cdef int quoted_implicit
        cdef char *anchor
        cdef char *tag
        cdef char *value
        cdef int length
        cdef int item_index
        cdef yaml_scalar_style_t scalar_style
        cdef yaml_sequence_style_t sequence_style
        cdef yaml_mapping_style_t mapping_style
        anchor_object = self.anchors[node]
        anchor = NULL
        if anchor_object is not None:
            anchor = PyString_AS_STRING(anchor_object)
        if node in self.serialized_nodes:
            if yaml_alias_event_initialize(&event, anchor) == 0:
                raise MemoryError
            if yaml_emitter_emit(&self.emitter, &event) == 0:
                error = self._emitter_error()
                raise error
        else:
            node_class = node.__class__
            self.serialized_nodes[node] = True
            self.descend_resolver(parent, index)
            if node_class is ScalarNode:
                plain_implicit = 0
                quoted_implicit = 0
                tag_object = node.tag
                if self.resolve(ScalarNode, node.value, (True, False)) == tag_object:
                    plain_implicit = 1
                if self.resolve(ScalarNode, node.value, (False, True)) == tag_object:
                    quoted_implicit = 1
                tag = NULL
                if tag_object is not None:
                    if PyUnicode_CheckExact(tag_object):
                        tag_object = PyUnicode_AsUTF8String(tag_object)
                    if not PyString_CheckExact(tag_object):
                        raise TypeError("tag must be a string")
                    tag = PyString_AS_STRING(tag_object)
                value_object = node.value
                if PyUnicode_CheckExact(value_object):
                    value_object = PyUnicode_AsUTF8String(value_object)
                if not PyString_CheckExact(value_object):
                    raise TypeError("value must be a string")
                value = PyString_AS_STRING(value_object)
                length = PyString_GET_SIZE(value_object)
                style_object = node.style
                scalar_style = YAML_PLAIN_SCALAR_STYLE
                if style_object == "'":
                    scalar_style = YAML_SINGLE_QUOTED_SCALAR_STYLE
                elif style_object == "\"":
                    scalar_style = YAML_DOUBLE_QUOTED_SCALAR_STYLE
                elif style_object == "|":
                    scalar_style = YAML_LITERAL_SCALAR_STYLE
                elif style_object == ">":
                    scalar_style = YAML_FOLDED_SCALAR_STYLE
                if yaml_scalar_event_initialize(&event, anchor, tag, value, length,
                        plain_implicit, quoted_implicit, scalar_style) == 0:
                    raise MemoryError
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
            elif node_class is SequenceNode:
                implicit = 0
                tag_object = node.tag
                if self.resolve(SequenceNode, node.value, True) == tag_object:
                    implicit = 1
                tag = NULL
                if tag_object is not None:
                    if PyUnicode_CheckExact(tag_object):
                        tag_object = PyUnicode_AsUTF8String(tag_object)
                    if not PyString_CheckExact(tag_object):
                        raise TypeError("tag must be a string")
                    tag = PyString_AS_STRING(tag_object)
                sequence_style = YAML_BLOCK_SEQUENCE_STYLE
                if node.flow_style:
                    sequence_style = YAML_FLOW_SEQUENCE_STYLE
                if yaml_sequence_start_event_initialize(&event, anchor, tag,
                        implicit, sequence_style) == 0:
                    raise MemoryError
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
                item_index = 0
                for item in node.value:
                    self._serialize_node(item, node, item_index)
                    item_index = item_index+1
                yaml_sequence_end_event_initialize(&event)
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
            elif node_class is MappingNode:
                implicit = 0
                tag_object = node.tag
                if self.resolve(MappingNode, node.value, True) == tag_object:
                    implicit = 1
                tag = NULL
                if tag_object is not None:
                    if PyUnicode_CheckExact(tag_object):
                        tag_object = PyUnicode_AsUTF8String(tag_object)
                    if not PyString_CheckExact(tag_object):
                        raise TypeError("tag must be a string")
                    tag = PyString_AS_STRING(tag_object)
                mapping_style = YAML_BLOCK_MAPPING_STYLE
                if node.flow_style:
                    mapping_style = YAML_FLOW_MAPPING_STYLE
                if yaml_mapping_start_event_initialize(&event, anchor, tag,
                        implicit, mapping_style) == 0:
                    raise MemoryError
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
                for item_key, item_value in node.value:
                    self._serialize_node(item_key, node, None)
                    self._serialize_node(item_value, node, item_key)
                yaml_mapping_end_event_initialize(&event)
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
            self.ascend_resolver()
        return 1

cdef int output_handler(void *data, char *buffer, int size) except 0:
    cdef CEmitter emitter
    emitter = <CEmitter>data
    value = PyString_FromStringAndSize(buffer, size)
    emitter.stream.write(value)
    return 1
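# output_handler() above is the callback registered with
# yaml_emitter_set_output(): libyaml hands back encoded chunks, which are
# forwarded to stream.write().  Any object with a write() method works; a
# commented sketch using the standard library (Python 2 era, hence StringIO):
#
#     import yaml
#     from StringIO import StringIO
#     out = StringIO()
#     yaml.dump({'a': 1}, stream=out, Dumper=yaml.CDumper)
#     text = out.getvalue()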