From 603ebf9a866314b3304f800d50c09a3cd55d8546 Mon Sep 17 00:00:00 2001
From: flu0r1ne
Date: Sat, 6 May 2023 05:42:39 -0500
Subject: Add automatic code highlighting

---
 src/gpt_chat_cli/chat_colorizer.py  | 267 +++++++++++++++++++++++++++++++++
 src/gpt_chat_cli/gcli.py            |  69 ++++++---
 src/gpt_chat_cli/streaming_lexer.py | 288 ++++++++++++++++++++++++++++++++++++
 tests/test_streaming_lexer.py       | 207 ++++++++++++++++++++++++++
 4 files changed, 813 insertions(+), 18 deletions(-)
 create mode 100644 src/gpt_chat_cli/chat_colorizer.py
 create mode 100644 src/gpt_chat_cli/streaming_lexer.py
 create mode 100644 tests/test_streaming_lexer.py

diff --git a/src/gpt_chat_cli/chat_colorizer.py b/src/gpt_chat_cli/chat_colorizer.py
new file mode 100644
index 0000000..168a5b8
--- /dev/null
+++ b/src/gpt_chat_cli/chat_colorizer.py
@@ -0,0 +1,267 @@
+from pygments import highlight
+from pygments.lexer import Lexer
+from pygments.formatter import Formatter
+from pygments.lexers import (
+    get_lexer_by_name,
+    find_lexer_class_by_name
+)
+from pygments.formatters import (
+    TerminalFormatter,
+    NullFormatter
+)
+
+from dataclasses import dataclass
+from typing import Optional
+from pygments.util import ClassNotFound
+
+from .streaming_lexer import (
+    SinglePassStreamingLexer,
+    Token,
+    TokenType,
+    TokenOrientation
+)
+
+from .color import (
+    get_color_codes,
+    ColorCode,
+)
+
+# Guessing a language from its content takes time, so restrict the
+# candidates to these common languages, which likely cover upwards of
+# 90% of real-world usage.
+GUESSABLE_LANGUAGES = [
+    'html',
+    'python',
+    'java',
+    'python2',
+    'c++',
+    'javascript',
+    'c#',
+    'sql',
+    'c',
+    'php',
+    'go',
+    'swift',
+    'kotlin',
+    'ruby',
+    'typescript',
+    'scala',
+    'r',
+    'rust',
+    'css',
+    'perl',
+    'make',
+    'text'
+]
+
+def guess_lexer( text : str, **options ):
+    '''
+    Guess the lexer in use from GUESSABLE_LANGUAGES.
+
+    Uses pygments' primitive analyse_text() heuristics, so the guess
+    is not very reliable.
+    '''
+
+    best_lexer = [0.0, None]
+
+    for lexer_name in GUESSABLE_LANGUAGES:
+
+        lexer = find_lexer_class_by_name(lexer_name)
+
+        rv = lexer.analyse_text(text)
+
+        if rv == 1.0:
+            return lexer(**options)
+
+        if rv > best_lexer[0]:
+            best_lexer[:] = (rv, lexer)
+
+    if not best_lexer[0] or best_lexer[1] is None:
+        raise ClassNotFound('no lexer matching the text found')
+
+    return best_lexer[1](**options)
+
+@dataclass
+class CodefenceContext:
+    language: str
+    lexer : Lexer
+    formatter : Formatter
+    buffer : str = ''
+    eof : bool = False
+
+    def may_guess_language( self : "CodefenceContext" ):
+
+        if self.eof:
+            return True
+
+        MIN_CHARACTERS = 150
+        MIN_LINES = 2
+
+        return (
+            len(self.buffer) > MIN_CHARACTERS and
+            self.buffer.count('\n') > MIN_LINES
+        )
+
+    def get_highlighted_lines(self : "CodefenceContext"):
+
+        if self.language is None:
+            if self.may_guess_language():
+
+                lexer = guess_lexer( self.buffer )
+
+                self.language = lexer.name
+                self.lexer = lexer
+            else:
+                return None
+
+        idx = self.buffer.rfind('\n')
+
+        if idx == -1:
+            return None
+        else:
+            lines = self.buffer[:idx+1]
+            self.buffer = self.buffer[idx+1:]
+
+            highlighted = highlight(
+                lines,
+                self.lexer,
+                self.formatter,
+            )
+
+            return highlighted
+
+class ChatColorizer( object ):
+
+    lexer : SinglePassStreamingLexer
+    formatter : Formatter
+    cf_ctx : Optional[CodefenceContext]
+    color_code : ColorCode
+    text_emitted: bool
+
+    def __init__( self : "ChatColorizer", no_color = False ):
+        self.lexer = SinglePassStreamingLexer()
+
+        self.cf_ctx = None
+
+        if no_color:
+            self.formatter = NullFormatter()
+        else:
+            self.formatter = TerminalFormatter()
+
+        self.color_code = get_color_codes( no_color=no_color )
+        self.text_emitted = False
+
+    def add_chunk( self : "ChatColorizer", chunk : str ):
+        self.lexer.add_chunk( chunk )
+
+    def print( self : "ChatColorizer" ):
+
+        for token in self.lexer.parse():
+
+            if token.type == TokenType.EOF:
+                break
+
+            if token.type == TokenType.CODE_FENCE:
+
+                if not self.text_emitted:
+                    print()
+                    self.text_emitted = True
+
+                if token.orientation == TokenOrientation.BEGIN:
+                    assert self.cf_ctx is None
+
+                    lang = token.content
+
+                    try:
+                        lexer = get_lexer_by_name(lang)
+                    except ClassNotFound:
+                        # try to guess it later, once enough of the
+                        # block has accumulated
+                        lang = None
+                        lexer = None
+
+                    self.cf_ctx = CodefenceContext(lang, lexer, self.formatter)
+
+                else:
+                    assert self.cf_ctx is not None
+
+                    self.cf_ctx.eof = True
+
+                    highlighted = self.cf_ctx.get_highlighted_lines()
+
+                    if highlighted:
+                        print( highlighted, end='', flush=True )
+
+                    self.cf_ctx = None
+
+                # re-emit a plain ``` fence so the delimiters stay visible
+                print(f'{self.color_code.WHITE}```{self.color_code.RESET}', flush=True)
+
+                continue
+
+            if self.cf_ctx:
+                self.cf_ctx.buffer += token.content
+                highlighted = self.cf_ctx.get_highlighted_lines()
+
+                if highlighted:
+                    print( highlighted, end='', flush=True )
+            else:
+                print( f'{self.color_code.WHITE}{token.content}{self.color_code.RESET}', end='', flush=True )
+                self.text_emitted = True
+
+    def finish( self : "ChatColorizer" ):
+        self.lexer.finish()
+
+
+# content = '''
+# Rust code:
+
+# ```rust
+# fn main() {
+#     let x = 5;
+#     let y = 10;
+#     let z = x + y;
+#     println!("The value of z is {}", z);
+# }
+# ```
+
+# Python code:
+
+# ```python
+# x = 5
+# y = 10
+# z = x + y
+# print("The value of z is", z)
+# ```
+
+# Unknown code:
+
+# ```
+# x = 5
+# y = 10
+# z = x + y
+# print("The value of z is", z)
+# ```
+
+# Testing
+
+# ```python
+# x = 5
+# y = 10
+# z = x + y
+# print("The value of z is", z)
+
+# '''
+
+# highlighter = ChatColorizer()
+
+# highlighter.add_chunk(content)
+# highlighter.finish()
+
+# highlighter.print()
diff --git a/src/gpt_chat_cli/gcli.py b/src/gpt_chat_cli/gcli.py
index d904733..1c5555c 100644
--- a/src/gpt_chat_cli/gcli.py
+++ b/src/gpt_chat_cli/gcli.py
@@ -29,6 +29,9 @@ from .argparsing import (
 from .version import VERSION
 from .color import get_color_codes
 
+import datetime
+
+
 ###########################
 ####    SAVE / REPLAY  ####
 ###########################
@@ -38,7 +41,7 @@ def create_singleton_chat_completion(
         completion_args : CompletionArguments
     ):
 
-    hist = [ ChatMessage( Role.USER, message ) ]
+    hist = [ get_system_message(), ChatMessage( Role.USER, message ) ]
 
     completion = create_chat_completion(hist, completion_args)
 
@@ -85,6 +88,8 @@ class CumulativeResponse:
         self.content += new_chunk
         self.delta_content += new_chunk
 
+from .chat_colorizer import ChatColorizer
+
 def print_streamed_response(
     display_args : DisplayArguments,
     completion : OpenAIChatResponseStream,
@@ -98,13 +103,17 @@ def print_streamed_response(
     on until all responses have been printed.
     """
 
-    COLOR_CODE = get_color_codes(no_color = not display_args.color)
+    no_color = not display_args.color
+
+    COLOR_CODE = get_color_codes(no_color = no_color)
 
     adornments = display_args.adornments
 
     cumu_responses = defaultdict(CumulativeResponse)
     display_idx = 0
     prompt_printed = False
 
+    chat_colorizer = ChatColorizer(no_color = no_color)
+
     for update in completion:
 
         for choice in update.choices:
@@ -126,10 +135,15 @@ def print_streamed_response(
                 print(PROMPT, end=' ', flush=True)
 
             content = display_response.take_delta()
-            print(f'{COLOR_CODE.WHITE}{content}{COLOR_CODE.RESET}',
-                  sep='', end='', flush=True)
+            chat_colorizer.add_chunk( content )
+
+            chat_colorizer.print()
 
             if display_response.finish_reason is not FinishReason.NONE:
+                chat_colorizer.finish()
+                chat_colorizer.print()
+                chat_colorizer = ChatColorizer( no_color=no_color )
+
                 if display_idx < n_completions:
                     display_idx += 1
                     prompt_printed = False
@@ -142,6 +156,13 @@ def print_streamed_response(
     if return_responses:
         return [ cumu_responses[i].content for i in range(n_completions) ]
 
+def get_system_message():
+    current_date_time = datetime.datetime.now()
+
+    msg = f'The current date and time is {current_date_time}. When emitting code or producing markdown, label fenced code blocks with the language in use.'
+
+    return ChatMessage( Role.SYSTEM, msg )
+
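+# Labeling fenced code blocks lets ChatColorizer look the language up
+# directly (get_lexer_by_name) instead of guessing it from the contents.
+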
 def cmd_version():
     print(f'version {VERSION}')
 
@@ -149,25 +170,34 @@ def cmd_list_models():
     for model in list_models():
         print(model)
 
+def enable_emacs_editing():
+    try:
+        # importing readline transparently gives input() line-editing
+        # and history support
+        import readline
+    except ImportError:
+        pass
+
 def cmd_interactive(args : Arguments):
+
+    enable_emacs_editing()
+
     COLOR_CODE = get_color_codes(no_color = not args.display_args.color)
 
     completion_args = args.completion_args
     display_args = args.display_args
 
-    hist = []
-
-    def print_prompt():
+    hist = [ get_system_message() ]
 
-        print(f'[{COLOR_CODE.WHITE}#{COLOR_CODE.RESET}]', end=' ', flush=True)
+    PROMPT = f'[{COLOR_CODE.WHITE}#{COLOR_CODE.RESET}] '
 
     def prompt_message() -> bool:
 
-        print_prompt()
-
         # Control-D closes the input stream
         try:
-            message = input()
-        except EOFError:
+            message = input( PROMPT )
+        except (EOFError, KeyboardInterrupt):
             print()
             return False
 
@@ -179,21 +209,24 @@ def cmd_interactive(args : Arguments):
     print(f'Press Control-D to exit')
 
     if args.initial_message:
-        print_prompt()
-        print( args.initial_message )
+        print( PROMPT, args.initial_message, sep='' )
         hist.append( ChatMessage( Role.USER, args.initial_message ) )
     else:
-        prompt_message()
+        if not prompt_message():
+            return
 
     while True:
         completion = create_chat_completion(hist, completion_args)
 
-        response = print_streamed_response(
-            display_args, completion, 1, return_responses=True,
-        )[0]
+        try:
+            response = print_streamed_response(
+                display_args, completion, 1, return_responses=True,
+            )[0]
 
-        hist.append( ChatMessage(Role.ASSISTANT, response) )
+            hist.append( ChatMessage(Role.ASSISTANT, response) )
+        except KeyboardInterrupt:
+            # let Control-C cancel an in-flight response without
+            # exiting the session
+            print()
 
         if not prompt_message():
             break
diff --git a/src/gpt_chat_cli/streaming_lexer.py b/src/gpt_chat_cli/streaming_lexer.py
new file mode 100644
index 0000000..7f3da73
--- /dev/null
+++ b/src/gpt_chat_cli/streaming_lexer.py
@@ -0,0 +1,288 @@
+#
+# SINGLE-PASS STREAMING LEXER
+# ---------------------------
+#
+# This is a simple "streaming lexer" designed to provide real-time syntax highlighting.
+# It consumes input from the LLM as it is produced, parses it efficiently, and emits tokens
+# as soon as they are available.
+#
+# The language is a subset of markdown. GPT sometimes emits markdown, although it is somewhat
+# fickle and does not reliably emit most constructs. When responding to programming questions,
+# it does prefer code blocks and code spans. Maybe this should not be surprising, since
+# programmers tend to use markdown.
+#
+# This "streaming lexer" therefore parses only a subset of markdown. While I would like to
+# expand its purview, I have limited capacity.
+#
+# Thus, for version 0.0.3, only fenced code blocks and code spans are considered. Fenced code
+# blocks are passed off to pygments for syntax highlighting. Code spans are highlighted in a
+# different color to differentiate them from the rest of the text.
+#
+# A custom lexer seems to be needed since I have not found an existing lexer which emits tokens
+# as soon as they are guaranteed and no sooner. If this evolves, it may have to diverge from the
+# CommonMark spec: the recommended way to parse markdown is a two-pass algorithm. Regardless,
+# the goals of syntax highlighting differ from those of full parsing, since we are highlighting
+# the textual content rather than converting the markdown to HTML.
+#
+# Currently, the design emits two kinds of tokens: "text" and "delimiters." Text, unsurprisingly,
+# contains text which can be displayed or passed on to another parser. The "delimiters" are meant
+# to control style. All delimiters are parsed left-to-right, and when the lexer emits a "begin"
+# delimiter, it must eventually terminate it with an "end" delimiter.
+#
+# Here is an example:
+# [PARAGRAPH BEGIN][TEXT "Here is some" ][TEXT " text"][CODE_SPAN BEGIN][TEXT "I'm in "][TEXT "the span"][CODE_SPAN END][PARAGRAPH END]
+#
+# A code fence "begin" token may carry an info string. This is stripped (as the markdown
+# specification demands).

####################################################
## AS OF 2023-05-06 ONLY CODE FENCES ARE IMPLEMENTED


+from enum import Enum, auto
+from dataclasses import dataclass
+from typing import Optional, Tuple, Generator
+import re
+
+class TokenType(Enum):
+    PARAGRAPH = auto()
+    TEXT = auto()
+    CODE_FENCE = auto()
+    CODE_SPAN = auto()
+    EOF = auto()
+
+class TokenOrientation(Enum):
+    NONE = auto()
+    BEGIN = auto()
+    END = auto()
+
+@dataclass
+class Token:
+    type: TokenType
+    orientation: TokenOrientation
+    content: Optional[str] = None
+
+def make_text_token( s : str ) -> Token:
+    return Token(
+        type = TokenType.TEXT,
+        orientation = TokenOrientation.NONE,
+        content = s
+    )
+
+class MatchState(Enum):
+    MATCH = auto()
+    INDETERMINATE = auto()
+    MISMATCH = auto()
+
+@dataclass
+class CodeFenceContext:
+    spacing : int
+    info_string : str
+    end : int
+
+def _try_to_parse_code_fence( buffer, eof=False ) -> Tuple[MatchState, Optional[CodeFenceContext]]:
+    '''
+    Parse a code fence from a line boundary
+
+    Example:
+
+    ```python
+     ```python
+      ```python
+    ~~~python
+
+    Invalid:
+        ```python    [four spaces]
+    ~``python
+    '''
+
+    # match between zero and three spaces, followed by a grouping of ` and ~
+    fence_match = re.search(r"^( {0,3})([`~]{1,3})", buffer )
+
+    if fence_match:
+        fence_indicator = fence_match.group(2)
+
+        # ensure there isn't a mix of ` and ~
+        if '`' in fence_indicator and '~' in fence_indicator:
+            return ( MatchState.MISMATCH, None )
+
+        remaining = buffer[fence_match.end():]
+
+        if len(fence_indicator) != 3 and len(remaining) > 0:
+            return ( MatchState.MISMATCH, None )
+
+        if '\n' not in remaining and not eof:
+            # wait for the info string to accumulate
+            return ( MatchState.INDETERMINATE, None )
+        else:
+            if eof:
+                info_match = re.search(r"^([^`~\n]*)(?:\n|$)", remaining )
+            else:
+                info_match = re.search(r"^([^`~\n]*)\n", remaining )
+
+            # the info string cannot contain ` or ~
+            if not info_match:
+                return ( MatchState.MISMATCH, None )
+
+            spaces = len(fence_match.group(1))
+            info_string = info_match.group( 1 )
+
+            # remove extra spaces
+            info_string = info_string.strip()
+
+            # end of match
+            end = info_match.end() + fence_match.end()
+
+            ctx = CodeFenceContext( spaces, info_string, end )
+
+            return ( MatchState.MATCH, ctx )
+    else:
+        return ( MatchState.MISMATCH, None )
+
+class SinglePassStreamingLexer( object ):
+    _buffer : str
+    _line_start : bool
+    _eof : bool
+
+    # "leaf" blocks
+    _in_code_fence : bool
+    _code_fence_spaces : int
+    _in_paragraph : bool
+
+    def __init__( self : "SinglePassStreamingLexer" ):
+        self._buffer = ''
+        self._line_start = True
+
+        self._in_code_fence = False
+        self._code_fence_spaces = 0
+        self._in_paragraph = False
+        self._eof = False
+
+    def add_chunk( self : "SinglePassStreamingLexer", new_chunk : str ):
+        self._buffer += new_chunk
+
+    def finish( self : "SinglePassStreamingLexer" ):
+        self._eof = True
+
+    def _take_chunk( self : "SinglePassStreamingLexer", amount : int ):
+        chunk = self._buffer[ : amount ]
+        self._buffer = self._buffer[ amount : ]
+
+        return chunk
+
+    def _tokenize_text_until_newline( self : "SinglePassStreamingLexer" ):
+        # we can take tokens until we hit a newline
+
+        end = self._buffer.find('\n')
+
+        if end == -1:
+            l = len(self._buffer)
+
+            if l != 0:
+                self._line_start = False
+                return make_text_token( self._take_chunk( l ) )
+        else:
+            self._line_start = True
+            return make_text_token( self._take_chunk( end + 1 ) )
+
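+    # parse() is a generator: it yields tokens as soon as they are
+    # guaranteed, and returns when it cannot make progress without more
+    # input; callers add_chunk() (or finish()) and then call parse()
+    # again to resume where it left off.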
+    def parse( self : "SinglePassStreamingLexer" ) -> Generator[Token, None, None]:
+
+        while True:
+
+            if len(self._buffer) == 0:
+                if self._eof:
+                    # terminate, closing any open code fence
+
+                    if self._in_code_fence:
+                        yield Token(
+                            TokenType.CODE_FENCE,
+                            TokenOrientation.END
+                        )
+
+                        self._in_code_fence = False
+
+                    yield Token(
+                        TokenType.EOF,
+                        TokenOrientation.NONE
+                    )
+
+                    return
+                else:
+                    # wait for more content
+                    return
+
+            if self._line_start:
+
+                state, ctx = _try_to_parse_code_fence( self._buffer, eof=self._eof )
+
+                if state == MatchState.INDETERMINATE and not self._eof:
+                    # wait for more tokens to accumulate
+                    return
+                elif state == MatchState.MATCH:
+
+                    chunk = self._take_chunk( ctx.end )
+
+                    if self._in_code_fence:
+                        # closing fences cannot contain info strings;
+                        # treat this line as part of the code block
+                        if len(ctx.info_string) != 0:
+                            yield make_text_token( chunk )
+                        else:
+                            yield Token( TokenType.CODE_FENCE, TokenOrientation.END )
+                            self._in_code_fence = False
+                            self._code_fence_spaces = 0
+                    else:
+                        if self._in_paragraph:
+                            yield Token( TokenType.PARAGRAPH, TokenOrientation.END )
+                            self._in_paragraph = False
+
+                        yield Token(
+                            TokenType.CODE_FENCE,
+                            TokenOrientation.BEGIN,
+                            content = ctx.info_string
+                        )
+
+                        self._code_fence_spaces = ctx.spacing
+                        self._in_code_fence = True
+
+                    # if we get to this point, we are at the
+                    # beginning of a line, restart parsing
+                    continue
+
+                # a mismatch occurred, but we're still at the beginning of
+                # a line, emit regular text
+
+                # TODO: add paragraph check
+                if self._in_code_fence:
+                    if len(self._buffer) < self._code_fence_spaces and \
+                       not self._eof:
+                        # wait for more tokens
+                        return
+
+                    token = self._tokenize_text_until_newline()
+
+                    # strip off the fence's leading indentation
+                    if token.content.startswith(' ' * self._code_fence_spaces):
+                        token.content = token.content[self._code_fence_spaces:]
+
+                    yield token
+
+                    continue
+
+            # FALLTHROUGH: mid-line, or line-start text that did not open
+            # a fence; emit text until the next newline, then restart
+            # (paragraph delimiters may be emitted here in the future)
+            token = self._tokenize_text_until_newline()
+            yield token
+            continue
diff --git a/tests/test_streaming_lexer.py b/tests/test_streaming_lexer.py
new file mode 100644
index 0000000..cd03513
--- /dev/null
+++ b/tests/test_streaming_lexer.py
@@ -0,0 +1,207 @@
+import re
+from typing import Optional, Tuple
+from enum import Enum, auto
+from dataclasses import dataclass
+
+import pytest
+
+from src.gpt_chat_cli.streaming_lexer import (
+    MatchState,
+    CodeFenceContext,
+    _try_to_parse_code_fence,
+    SinglePassStreamingLexer,
+    Token,
+    TokenType,
+    TokenOrientation,
+    make_text_token
+)
+
+def test_try_to_parse_code_fence():
+    # Test valid cases
+    valid_cases = [
+        ("```python\nhe", CodeFenceContext(0, "python", 10)),
+        ("  ```python\n", CodeFenceContext(2, "python", 12)),
+        ("~~~python\n", CodeFenceContext(0, "python", 10)),
+        ("   ~~~python\nmore", CodeFenceContext(3, "python", 13))
+    ]
+
+    for case, expected in valid_cases:
+        result = _try_to_parse_code_fence(case)
+
+        assert result[0] == MatchState.MATCH
+        assert result[1] == expected
+
+    # Test invalid cases
+    invalid_cases = [
+        "    ```python\n",
+        "~``python\n",
+        "```python ```\n",
+        "~~~python ~~~\n",
+    ]
+
+    for case in invalid_cases:
+        result = _try_to_parse_code_fence(case)
+        assert result[0] == MatchState.MISMATCH
+
+    # Test indeterminate cases
+    indeterminate_cases = [
+        "```",
+        "   ~~~",
+    ]
+
+    for case in indeterminate_cases:
+        result = _try_to_parse_code_fence(case)
+        assert result[0] == MatchState.INDETERMINATE
+
+def _check_exact_lexing_matches( chunk_tokens, final_tokens ):
+
+    lexer = SinglePassStreamingLexer()
+
+    for ( chunk, expected_tokens ) in chunk_tokens:
+
+        lexer.add_chunk( chunk )
+
+        n_tokens_emitted = 0
+
+        for i, token in enumerate(lexer.parse()):
+            assert i < len(expected_tokens)
+            assert expected_tokens[i] == token
+
+            n_tokens_emitted += 1
+
+        assert n_tokens_emitted == len(expected_tokens)
+
+    lexer.finish()
+
+    n_tokens_emitted = 0
+
+    for i, token in enumerate(lexer.parse()):
+        assert i < len(final_tokens)
+        assert final_tokens[i] == token
+
+        n_tokens_emitted += 1
+
+    assert n_tokens_emitted == len(final_tokens)
+
+def test_single_pass_lexing():
+
+    cases = [
+        ( 'Some text\n', [
+            make_text_token( 'Some text\n' )
+        ] ),
+        ( 'More text\n', [
+            make_text_token( 'More text\n' )
+        ] ),
+        ( ' Indented text\n', [
+            make_text_token( ' Indented text\n' )
+        ] ),
+        ( '```python\n', [
+            Token( TokenType.CODE_FENCE, TokenOrientation.BEGIN, 'python' )
+        ] ),
+        ( 'print("Hello")\n', [
+            make_text_token( 'print("Hello")\n' )
+        ] ),
+        ( '```', [] ),
+    ]
+
+    final_tokens = [
+        Token( TokenType.CODE_FENCE, TokenOrientation.END ),
+        Token( TokenType.EOF, TokenOrientation.NONE ),
+    ]
+
+    _check_exact_lexing_matches( cases, final_tokens )
+
+    cases = [
+        ( '```java\nSome text\nMore ', [
+            Token( TokenType.CODE_FENCE, TokenOrientation.BEGIN, 'java' ),
+            make_text_token( 'Some text\n' ),
+            make_text_token( 'More ' ),
+        ] ),
+        ( ' text\n```', [
+            make_text_token( ' text\n' ),
+        ] ),
+        ( '\n', [
+            Token( TokenType.CODE_FENCE, TokenOrientation.END )
+        ]),
+    ]
+
+    final_tokens = [
+        Token( TokenType.EOF, TokenOrientation.NONE ),
+    ]
+
+    _check_exact_lexing_matches( cases, final_tokens )
+
+    cases = [
+        ( '  ```java  \n  Some text\n  More ', [
+            Token( TokenType.CODE_FENCE, TokenOrientation.BEGIN, 'java' ),
+            make_text_token( 'Some text\n' ),
+            make_text_token( 'More ' ),
+        ] ),
+        ( ' text\n  ```', [
+            make_text_token( ' text\n' ),
+        ] ),
+        ( '\n', [
+            Token( TokenType.CODE_FENCE, TokenOrientation.END )
+        ]),
+    ]
+
+    final_tokens = [
+        Token( TokenType.EOF, TokenOrientation.NONE ),
+    ]
+
+    _check_exact_lexing_matches( cases, final_tokens )
+
+    cases = [
+        ( '  ``', []),
+        ('` java \n  Some text\n  More ', [
+            Token( TokenType.CODE_FENCE, TokenOrientation.BEGIN, 'java' ),
+            make_text_token( 'Some text\n' ),
+            make_text_token( 'More ' ),
+        ] ),
+        ( ' text\n  ```', [
+            make_text_token( ' text\n' ),
+        ] ),
+        ( '\n', [
+            Token( TokenType.CODE_FENCE, TokenOrientation.END )
+        ]),
+    ]
+
+    final_tokens = [
+        Token( TokenType.EOF, TokenOrientation.NONE ),
+    ]
+
+    _check_exact_lexing_matches( cases, final_tokens )
+
+    # Ticks preceded by characters don't initiate a code block
+    cases = [
+        ( 'tick```java\nSome text\n', [
+            make_text_token( 'tick```java\n' ),
+            make_text_token( 'Some text\n' ),
+        ] ),
+    ]
+
+    final_tokens = [
+        Token( TokenType.EOF, TokenOrientation.NONE ),
+    ]
+
+    _check_exact_lexing_matches( cases, final_tokens )
+
+    # Code blocks which are not terminated are closed
+    # at the end of the document
+    cases = [
+        ( '```java\nSome text\n', [
+            Token( TokenType.CODE_FENCE, TokenOrientation.BEGIN, 'java' ),
+            make_text_token( 'Some text\n' ),
+        ] ),
+    ]
+
+    final_tokens = [
+        Token( TokenType.CODE_FENCE, TokenOrientation.END ),
+        Token( TokenType.EOF, TokenOrientation.NONE ),
+    ]
+
+    _check_exact_lexing_matches( cases, final_tokens )
-- 
cgit v1.2.3
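
A minimal sketch of the intended streaming flow, assuming the package installs
as gpt_chat_cli (as in the diff above); the chunk list stands in for deltas
from an OpenAI response stream:

    from gpt_chat_cli.chat_colorizer import ChatColorizer

    # hypothetical chunks, standing in for streamed completion deltas
    chunks = [
        "Here is a Python example:\n```py",
        "thon\nprint('hi')\n",
        "```\nDone.\n",
    ]

    colorizer = ChatColorizer( no_color=False )

    for chunk in chunks:
        colorizer.add_chunk( chunk )
        # emits highlighted output as soon as it is unambiguous;
        # incomplete fences are held back until they resolve
        colorizer.print()

    colorizer.finish()
    colorizer.print()   # flushes the tail and closes any open fence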