diff options
Diffstat (limited to 'src/gpt_chat_cli/streaming_lexer.py')
-rw-r--r-- | src/gpt_chat_cli/streaming_lexer.py | 288 |
1 files changed, 288 insertions, 0 deletions
#
# SINGLE-PASS STREAMING LEXER
# ---------------------------
#
# This is a simple "streaming lexer" which is designed to provide real-time syntax
# highlighting. It consumes input from the LLM as it is produced and parses it
# efficiently, emitting tokens as soon as they are available.
#
# The language is a subset of markdown. GPT seems to sometimes emit markdown,
# although it is somewhat fickle and does not reliably emit most tokens. When
# responding to programming questions, it does prefer code blocks and code spans.
#
# For version 0.0.3 only fenced code blocks and code spans are considered.
# Fenced code blocks are passed off to pygments for syntax highlighting; code
# spans are highlighted a different color to differentiate them from the rest
# of the text.
#
# A custom lexer is needed since no existing lexer emits tokens as soon as they
# are guaranteed and no sooner. The design does not completely follow the
# commonmark spec (whose recommended algorithm is two-pass); the goal here is
# highlighting the textual content, not converting markdown to HTML.
#
# The lexer emits two kinds of tokens: "text" and "delimiters". Text contains
# content which can be displayed or passed on to another parser. Delimiters
# control style. All delimiters are parsed left-to-right; when the lexer emits
# a "beginning" delimiter, it must terminate it with an "end" delimiter.
#
# Example:
# [PARAGRAPH BEGIN][TEXT "Here is some"][TEXT " text"]
# [CODE_SPAN BEGIN][TEXT "I'm in "][TEXT "the span"][CODE_SPAN END][PARAGRAPH END]
#
# A code fence "begin" token may carry an info string. Surrounding whitespace is
# stripped (as the markdown specification demands).

####################################################
## AS OF 2023-05-06 ONLY CODE FENCES ARE IMPLEMENTED


from enum import Enum, auto
from dataclasses import dataclass
from typing import Optional, Tuple, Generator
import re


class TokenType(Enum):
    """Kind of token emitted by the lexer."""
    PARAGRAPH = auto()
    TEXT = auto()
    CODE_FENCE = auto()
    CODE_SPAN = auto()
    EOF = auto()


class TokenOrientation(Enum):
    """Whether a token opens a region, closes one, or stands alone."""
    NONE = auto()
    BEGIN = auto()
    END = auto()


@dataclass
class Token:
    """A single lexer output unit.

    content is the text payload for TEXT tokens, and the (stripped) info
    string for a CODE_FENCE BEGIN token; otherwise None.
    """
    type: TokenType
    orientation: TokenOrientation
    content: Optional[str] = None


def make_text_token(s: str) -> Token:
    """Wrap *s* in a plain TEXT token (no orientation)."""
    return Token(
        type=TokenType.TEXT,
        orientation=TokenOrientation.NONE,
        content=s,
    )


class MatchState(Enum):
    """Tri-state result of an incremental parse attempt."""
    MATCH = auto()
    INDETERMINATE = auto()  # need more input before deciding
    MISMATCH = auto()


@dataclass
class CodeFenceContext:
    """Details of a successfully parsed fence line."""
    spacing: int      # number of leading spaces (0-3) before the fence
    info_string: str  # stripped info string, e.g. "python" for ```python
    end: int          # buffer index one past the consumed fence line


def _try_to_parse_code_fence(buffer, eof=False) -> Tuple[MatchState, Optional[CodeFenceContext]]:
    '''
    Parse a code fence from a line boundary.

    Example:

    ```python
     ```python
       ```python
    ~~~python

    Invalid:
        ```python [four spaces]
    ~``python

    Returns (MatchState, CodeFenceContext or None). INDETERMINATE means the
    buffer may still grow into a fence (e.g. "``" with no newline yet), so the
    caller should wait for more input unless *eof* is set.
    '''

    # match between zero and three spaces, followed by a grouping of ` and ~
    fence_match = re.search(r"^( {0,3})([`~]{1,3})", buffer)

    if not fence_match:
        return (MatchState.MISMATCH, None)

    fence_indicator = fence_match.group(2)

    # Ensure there isn't a mix of ` and ~
    if '`' in fence_indicator and '~' in fence_indicator:
        return (MatchState.MISMATCH, None)

    remaining = buffer[fence_match.end():]

    # Fewer than three fence characters followed by anything else can
    # never become a fence; fewer than three at the end of the buffer
    # might still accumulate into one.
    if len(fence_indicator) != 3 and len(remaining) > 0:
        return (MatchState.MISMATCH, None)

    if '\n' not in remaining and not eof:
        # wait for the info string to accumulate
        return (MatchState.INDETERMINATE, None)

    if eof:
        # at EOF the info string may be terminated by end-of-input
        info_match = re.search(r"^([^`~\n]*)(?:\n|$)", remaining)
    else:
        info_match = re.search(r"^([^`~\n]*)\n", remaining)

    # info string cannot contain ` or ~
    if not info_match:
        return (MatchState.MISMATCH, None)

    spaces = len(fence_match.group(1))
    info_string = info_match.group(1)

    # remove extra spaces
    info_string = info_string.strip()

    # end of match, relative to the start of the buffer
    end = info_match.end() + fence_match.end()

    ctx = CodeFenceContext(spaces, info_string, end)

    return (MatchState.MATCH, ctx)


class SinglePassStreamingLexer(object):
    """Incremental markdown lexer.

    Feed input with add_chunk(), signal end-of-input with finish(), and
    repeatedly drain parse(); the generator returns (stops) whenever it
    needs more input, and resumes from saved state on the next call.
    """

    _buffer: str       # unconsumed input
    _line_start: bool  # True when the next character begins a line
    _eof: bool         # set by finish(); no more chunks will arrive

    # "leaf" block state
    _in_code_fence: bool
    _code_fence_spaces: int  # indent of the opening fence, stripped from body lines
    _in_paragraph: bool

    def __init__(self: "SinglePassStreamingLexer"):
        self._buffer = ''
        self._line_start = True

        self._in_code_fence = False
        self._code_fence_spaces = 0
        self._in_paragraph = False
        self._eof = False

    def add_chunk(self: "SinglePassStreamingLexer", new_chunk: str):
        """Append newly received input to the internal buffer."""
        self._buffer += new_chunk

    def finish(self: "SinglePassStreamingLexer"):
        """Mark the input stream as complete."""
        self._eof = True

    def _take_chunk(self: "SinglePassStreamingLexer", amount: int):
        """Remove and return the first *amount* characters of the buffer."""
        chunk = self._buffer[:amount]
        self._buffer = self._buffer[amount:]

        return chunk

    def _tokenize_text_until_newline(self: "SinglePassStreamingLexer"):
        """Consume buffered text through the next newline (or to the end
        of the buffer if no newline is present) and return it as a TEXT
        token, keeping _line_start in sync."""

        end = self._buffer.find('\n')

        if end == -1:
            l = len(self._buffer)

            if l != 0:
                # partial line: the next character continues this line
                self._line_start = False
                return make_text_token(self._take_chunk(l))
            else:
                self._line_start = True
                return make_text_token(self._take_chunk(0))

        # BUG FIX: the chunk we return ends with a newline, so parsing
        # resumes at the start of a line. Previously _line_start was left
        # unchanged here, so once a chunk boundary fell mid-line the flag
        # stayed False forever and later code fences were never detected.
        self._line_start = True
        return make_text_token(self._take_chunk(end + 1))

    def parse(self: "SinglePassStreamingLexer") -> Generator[Token, None, None]:
        """Yield tokens as soon as they are guaranteed.

        Returns (generator exhausts) when more input is required; call
        again after add_chunk()/finish() to continue.
        """

        while True:

            if len(self._buffer) == 0:
                if self._eof:
                    # terminate, closing any open fence
                    if self._in_code_fence:
                        yield Token(
                            TokenType.CODE_FENCE,
                            TokenOrientation.END
                        )

                        self._in_code_fence = False

                    yield Token(
                        TokenType.EOF,
                        TokenOrientation.NONE
                    )

                    return
                else:
                    # wait for more content
                    return

            if self._line_start:

                state, ctx = _try_to_parse_code_fence(self._buffer, eof=self._eof)

                if state == MatchState.INDETERMINATE and not self._eof:
                    # wait for more tokens to accumulate
                    return
                elif state == MatchState.MATCH:

                    chunk = self._take_chunk(ctx.end)

                    if self._in_code_fence:
                        # closing fences cannot contain info strings;
                        # consider such a line part of the code block
                        if len(ctx.info_string) != 0:
                            yield make_text_token(chunk)
                        else:
                            yield Token(TokenType.CODE_FENCE, TokenOrientation.END)
                            self._in_code_fence = False
                            self._code_fence_spaces = 0
                    else:

                        if self._in_paragraph:
                            yield Token(TokenType.PARAGRAPH, TokenOrientation.END)
                            self._in_paragraph = False

                        yield Token(
                            TokenType.CODE_FENCE,
                            TokenOrientation.BEGIN,
                            content=ctx.info_string
                        )

                        self._code_fence_spaces = ctx.spacing
                        self._in_code_fence = True

                    # we are at the beginning of a line, restart parsing
                    continue

                # a mismatch occurred, but we're still at the beginning of
                # a line, emit regular text

                # TODO: add paragraph check
                if self._in_code_fence:
                    if len(self._buffer) < self._code_fence_spaces and \
                       not self._eof:
                        # wait for more tokens
                        return

                    token = self._tokenize_text_until_newline()

                    # strip off the opening fence's indentation
                    if token.content.startswith(' ' * self._code_fence_spaces):
                        token.content = token.content[self._code_fence_spaces:]

                    yield token

                    continue

            # FALLTHROUGH: tokenize text until newline, then continue
            # (paragraph delimiters to be emitted here in the future)

            token = self._tokenize_text_until_newline()
            yield token
            continue