Diffstat (limited to 'src/gpt_chat_cli/streaming_lexer.py')
-rw-r--r--  src/gpt_chat_cli/streaming_lexer.py  288
1 file changed, 288 insertions, 0 deletions
diff --git a/src/gpt_chat_cli/streaming_lexer.py b/src/gpt_chat_cli/streaming_lexer.py
new file mode 100644
index 0000000..7f3da73
--- /dev/null
+++ b/src/gpt_chat_cli/streaming_lexer.py
@@ -0,0 +1,288 @@
+#
+# SINGLE-PASS STREAMING LEXER
+# ---------------------------
+#
+# This is a simple "streaming lexer" designed to provide real-time syntax highlighting.
+# It consumes input from the LLM as it is produced and parses it in a single pass, emitting
+# tokens as soon as they are available.
+#
+# The language is a subset of markdown. GPT sometimes emits markdown, although it is somewhat
+# fickle and does not reliably produce most constructs. When responding to programming questions,
+# however, it does prefer code blocks and code spans. Perhaps this should not be surprising, since
+# programmers tend to write markdown.
+#
+# This "streaming lexer" therefore parses only a subset of markdown. While I would like to expand
+# its purview, I have limited capacity.
+#
+# Thus, for version 0.0.3 we will only consider fenced code blocks and code spans. Fenced code blocks
+# will be passed off to pygments for syntax highlighting. Code spans will be highlighted in a different
+# color to differentiate them from the rest of the text.
+#
+# A custom lexer seems to be needed since I have not found an existing lexer which can emit tokens
+# as soon as they are guaranteed and no sooner. As this evolves, it may not completely follow the
+# CommonMark spec; the parsing algorithm the spec recommends is a two-pass algorithm. Regardless,
+# the goals of syntax highlighting may differ from those of full parsing, since it highlights the
+# textual content rather than converting the markdown to HTML.
+#
+# Currently, the lexer emits two kinds of tokens: "text" and "delimiters." Text tokens carry content
+# which can be displayed or passed on to another parser. Delimiters are meant to control style. All
+# delimiters are emitted left-to-right, and once the lexer emits a "begin" delimiter it must
+# eventually terminate it with a matching "end" delimiter.
+#
+# Here is an example:
+# [PARAGRAPH BEGIN][TEXT "Here is some" ] [TEXT " text"] [CODE_SPAN BEGIN][TEXT "I'm in "][TEXT "the span"][CODE_SPAN END][PARAGRAPH END]
+#
+# A code fence "begin" token may carry an info string. Surrounding whitespace is stripped from it
+# (as the markdown specification demands).
+
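+# As a usage sketch (hypothetical driver code, not part of this module; `llm_stream` and `render`
+# are placeholders), the lexer is expected to be driven roughly like this:
+#
+#     lexer = SinglePassStreamingLexer()
+#     for chunk in llm_stream:            # chunks arrive incrementally from the LLM
+#         lexer.add_chunk( chunk )
+#         for token in lexer.parse():     # parse() yields tokens once they are guaranteed,
+#             render( token )             # then returns when it needs more input
+#     lexer.finish()                      # signal EOF ...
+#     for token in lexer.parse():         # ... so that any still-open blocks are closed
+#         render( token )
+#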
+####################################################
+## AS OF 2023-05-06 ONLY CODE FENCES ARE IMPLEMENTED
+
+
+from enum import Enum, auto
+from dataclasses import dataclass
+from typing import Optional, Tuple, Generator
+import re
+
+class TokenType(Enum):
+ PARAGRAPH = auto()
+ TEXT = auto()
+ CODE_FENCE = auto()
+ CODE_SPAN = auto()
+ EOF = auto()
+
+class TokenOrientation(Enum):
+ NONE = auto()
+ BEGIN = auto()
+ END = auto()
+
+@dataclass
+class Token:
+ type: TokenType
+ orientation: TokenOrientation
+ content: Optional[str] = None
+
+def make_text_token( s : str ) -> Token:
+ return Token(
+ type = TokenType.TEXT,
+ orientation = TokenOrientation.NONE,
+ content = s
+ )
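+
+# For illustration, the delimiter/text stream sketched in the header comment corresponds to
+# Token values along these lines (a hand-written example; per the 2023-05-06 note below, code
+# spans and paragraphs are not implemented yet):
+#
+#     Token( TokenType.PARAGRAPH, TokenOrientation.BEGIN )
+#     Token( TokenType.TEXT, TokenOrientation.NONE, content = "Here is some" )
+#     Token( TokenType.TEXT, TokenOrientation.NONE, content = " text" )
+#     Token( TokenType.CODE_SPAN, TokenOrientation.BEGIN )
+#     Token( TokenType.TEXT, TokenOrientation.NONE, content = "I'm in the span" )
+#     Token( TokenType.CODE_SPAN, TokenOrientation.END )
+#     Token( TokenType.PARAGRAPH, TokenOrientation.END )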
+
+class MatchState(Enum):
+ MATCH = auto()
+ INDETERMINATE = auto()
+ MISMATCH = auto()
+
+@dataclass
+class CodeFenceContext:
+ spacing : int
+ info_string : str
+ end : int
+
+def _try_to_parse_code_fence( buffer : str, eof : bool = False ) -> Tuple[MatchState, Optional[CodeFenceContext]]:
+ '''
+ Parse a code fence from a line boundary.
+
+ Valid examples (up to three leading spaces are allowed):
+
+ ```python
+  ```python
+   ```python
+ ~~~python
+
+ Invalid:
+     ```python   (indented four spaces)
+ ~``python   (mixes ` and ~)
+ '''
+
+ # match zero to three leading spaces, followed by one to three ` or ~ characters
+ fence_match = re.search(r"^( {0,3})([`~]{1,3})", buffer )
+
+ if fence_match:
+ fence_indicator = fence_match.group(2)
+
+ # Ensure there isn't a mix of ` and ~
+ if '`' in fence_indicator and '~' in fence_indicator:
+ return ( MatchState.MISMATCH, None )
+
+ remaining = buffer[fence_match.end():]
+
+ # only a run of exactly three fence characters can open a fence; a shorter run
+ # followed by other content (or ending the input) cannot grow into one
+ if len(fence_indicator) != 3 and ( len(remaining) > 0 or eof ):
+ return ( MatchState.MISMATCH, None )
+
+ if '\n' not in remaining and not eof:
+ # wait for info string to accumulate
+ return ( MatchState.INDETERMINATE, None )
+ else:
+
+ if eof:
+ info_match = re.search(r"^([^`~\n]*)(?:\n|$)", remaining )
+ else:
+ info_match = re.search(r"^([^`~\n]*)\n", remaining )
+
+ # the info string cannot contain ` or ~ characters
+ if not info_match:
+ return ( MatchState.MISMATCH, None )
+
+ spaces = len(fence_match.group(1))
+ info_string = info_match.group( 1 )
+
+ # remove extra spaces
+ info_string = info_string.strip()
+
+ # end of match
+ end = info_match.end() + fence_match.end()
+
+ ctx = CodeFenceContext( spaces, info_string, end )
+
+ return ( MatchState.MATCH, ctx )
+ else:
+ return ( MatchState.MISMATCH, None )
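+
+# Illustrative return values for _try_to_parse_code_fence (hand-traced from the regexes above,
+# shown as an informal sketch rather than executable doctests):
+#
+#     _try_to_parse_code_fence( "```python\n" )
+#         -> ( MatchState.MATCH, CodeFenceContext( spacing = 0, info_string = 'python', end = 10 ) )
+#     _try_to_parse_code_fence( "```py" )
+#         -> ( MatchState.INDETERMINATE, None )   # the info string may still be streaming in
+#     _try_to_parse_code_fence( "~``python\n" )
+#         -> ( MatchState.MISMATCH, None )        # ` and ~ may not be mixed in one fence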
+
+class SinglePassStreamingLexer( object ):
+ _buffer : str
+ _line_start : bool
+ _eof : bool
+
+ # "leaf" blocks
+ _in_code_fence : bool
+ _code_fence_spaces : int
+ _in_paragraph : bool
+
+ def __init__( self : "SinglePassStreamingLexer" ):
+ self._buffer = ''
+ self._line_start = True
+
+ self._in_code_fence = False
+ self._code_fence_spaces = 0
+ self._in_paragraph = False
+ self._eof = False
+
+ def add_chunk( self : "SinglePassStreamingLexer", new_chunk : str ):
+ self._buffer += new_chunk
+
+ def finish( self : "SinglePassStreamingLexer" ):
+ self._eof = True
+
+ def _take_chunk( self : "SinglePassStreamingLexer", amount : int ):
+
+ chunk = self._buffer[ : amount ]
+ self._buffer = self._buffer[ amount : ]
+
+ return chunk
+
+ def _tokenize_text_until_newline( self : "SinglePassStreamingLexer" ):
+ # we can take tokens until we hit a newline
+
+ end = self._buffer.find('\n')
+
+ if end == -1:
+ l = len(self._buffer)
+
+ if l != 0:
+ self._line_start = False
+ return make_text_token( self._take_chunk( l ) )
+ else:
+ self._line_start = True
+
+ return make_text_token( self._take_chunk( end + 1 ) )
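+
+ # Hand-traced illustration (assuming the buffer contents shown):
+ #   buffer == "abc\ndef"  ->  returns TEXT "abc\n" (newline included); "def" stays buffered
+ #   buffer == "abc"       ->  returns TEXT "abc" and empties the buffer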
+
+ def parse( self : "SinglePassStreamingLexer" ) -> Generator[Token, None, None]:
+
+ while True:
+
+ if len(self._buffer) == 0:
+ if self._eof:
+
+ # terminate
+
+ if self._in_code_fence:
+ yield Token(
+ TokenType.CODE_FENCE,
+ TokenOrientation.END
+ )
+
+ self._in_code_fence = False
+
+ yield Token(
+ TokenType.EOF,
+ TokenOrientation.NONE
+ )
+
+ return
+ else:
+ # Wait for more content
+ return
+
+ if self._line_start:
+
+ state, ctx = _try_to_parse_code_fence( self._buffer, eof=self._eof )
+
+ if state == MatchState.INDETERMINATE and not self._eof:
+ # wait for more tokens to accumulate
+ return
+ elif state == MatchState.MATCH:
+
+ chunk = self._take_chunk( ctx.end )
+
+ if self._in_code_fence:
+ # a closing fence cannot carry an info string, so treat
+ # this line as content inside the code block
+ if len(ctx.info_string) != 0:
+ yield make_text_token( chunk )
+ else:
+ yield Token( TokenType.CODE_FENCE, TokenOrientation.END )
+ self._in_code_fence = False
+ self._code_fence_spaces = 0
+ else:
+
+ if self._in_paragraph:
+ yield Token( TokenType.PARAGRAPH, TokenOrientation.END )
+ self._in_paragraph = False
+
+ yield Token(
+ TokenType.CODE_FENCE,
+ TokenOrientation.BEGIN,
+ content = ctx.info_string
+ )
+
+ self._code_fence_spaces = ctx.spacing
+ self._in_code_fence = True
+
+ # if we get to this point, we are at the
+ # beginning of a line, restart parsing
+
+ continue
+
+ # a mismatch occurred, but we're still at the beginning of
+ # a line, emit regular text
+
+ # TODO: add paragraph check
+ if self._in_code_fence:
+ if len(self._buffer) < self._code_fence_spaces and \
+ not self._eof:
+ # wait for more input
+ return
+
+ token = self._tokenize_text_until_newline()
+
+ # strip off beginning spaces
+ if token.content.startswith(' ' * self._code_fence_spaces):
+ token.content = token.content[self._code_fence_spaces:]
+
+ yield token
+
+ continue
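+
+ # Illustration (hand-traced): an opening fence of "  ```python\n" records
+ # _code_fence_spaces == 2, so a following line "  print(1)\n" is emitted as the
+ # TEXT token "print(1)\n", with its two leading spaces stripped.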
+
+ # FALLTHROUGH: we are not at the start of a line (or no block construct matched),
+ # so tokenize text up to the next newline and continue. Inside a code fence the
+ # text passes through as-is; otherwise a paragraph token may be emitted in the future.
+ token = self._tokenize_text_until_newline()
+ yield token
+ continue
+