diff options
Diffstat (limited to 'src/gpt_chat_cli')
| -rw-r--r-- | src/gpt_chat_cli/chat_colorizer.py | 267 | ||||
| -rw-r--r-- | src/gpt_chat_cli/gcli.py | 69 | ||||
| -rw-r--r-- | src/gpt_chat_cli/streaming_lexer.py | 288 | 
3 files changed, 606 insertions, 18 deletions
| diff --git a/src/gpt_chat_cli/chat_colorizer.py b/src/gpt_chat_cli/chat_colorizer.py new file mode 100644 index 0000000..168a5b8 --- /dev/null +++ b/src/gpt_chat_cli/chat_colorizer.py @@ -0,0 +1,267 @@ +from pygments import highlight +from pygments.lexer import Lexer +from pygments.formatter import Formatter +from pygments.lexers import ( +    get_lexer_by_name, get_all_lexers, guess_lexer, +    find_lexer_class_by_name +) +from pygments.formatters import ( +    TerminalFormatter, +    NullFormatter +) + +from dataclasses import dataclass +from typing import Optional +from pygments.util import ClassNotFound + +from .streaming_lexer import ( +    SinglePassStreamingLexer, +    Token, +    TokenType, +    TokenOrientation +) + +from .color import ( +    get_color_codes, +    ColorCode, +) + +# Guessing languages takes time ... +# Assume these are our candidates since +# they likely cover upward of 90% of +# usage +GUESSABLE_LANGUAGES = [ +    'html', +    'python', +    'java', +    'python2', +    'c++', +    'javascript', +    'c#', +    'sql', +    'c', +    'php', +    'go', +    'swift', +    'kotlin', +    'ruby', +    'typescript', +    'scala', +    'r', +    'rust', +    'css', +    'perl', +    'make', +    'text' +] + +def guess_lexer( text : str, **options ): +    ''' +    Guess the lexer in use from GUESSABLE_LANGUAGES. + +    Uses very primitive heuristics and is not very good. 
+    ''' + +    best_lexer = [0.0, None] + +    for lexer_name in GUESSABLE_LANGUAGES: + +        lexer = find_lexer_class_by_name(lexer_name) + +        rv = lexer.analyse_text(text) + +        if rv == 1.0: +            return lexer(**options) +        if rv > best_lexer[0]: +            best_lexer[:] = (rv, lexer) + +    if not best_lexer[0] or best_lexer[1] is None: + +        raise ClassNotFound('no lexer matching the text found') + +    return best_lexer[1](**options) + +@dataclass +class CodefenceContext: +    language: str +    lexer : Lexer +    formatter : Formatter +    buffer : str = '' +    eof : bool = False + +    def may_guess_language( self : "CodefenceContext" ): + +        if self.eof: +            return True + +        MIN_CHARACTERS = 150 +        MIN_LINES = 2 + +        return ( +            len(self.buffer) > MIN_CHARACTERS and +            self.buffer.count('\n') > MIN_LINES +        ) + +    def get_highlighted_lines(self : "CodefenceContext"): + +        if self.language is None: +            if self.may_guess_language(): + +                lexer = guess_lexer( self.buffer ) + +                self.language = lexer.name +                self.lexer = lexer + +            else: +                return None + + +        idx = self.buffer.rfind('\n') + +        if idx == -1: +            return None +        else: +            lines = self.buffer[:idx+1] +            self.buffer = self.buffer[idx+1:] + +            highlighted = highlight( +                lines, +                self.lexer, +                self.formatter, +            ) + +            return highlighted + +class ChatColorizer( object ): + +    lexer : SinglePassStreamingLexer +    formatter : Formatter +    cf_ctx : Optional[CodefenceContext] +    color_code : ColorCode +    text_emitted: bool + +    def __init__( self : "ChatColorizer", no_color = False ): +        self.lexer = SinglePassStreamingLexer() + +        self.cf_ctx = None + +        if no_color: +            
self.formatter = NullFormatter() +        else: +            self.formatter = TerminalFormatter() + +        self.color_code = get_color_codes( no_color=no_color ) +        self.text_emitted = False + +    def add_chunk( self : "ChatColorizer", chunk : str ): +        self.lexer.add_chunk( chunk ) + +    def print( self : "ChatColorizer" ): + +        for token in self.lexer.parse(): + +            if token.type == TokenType.EOF: +                break + +            if token.type == TokenType.CODE_FENCE: + +                if not self.text_emitted: +                    print() +                    self.text_emitted = True + +                if token.orientation == TokenOrientation.BEGIN: +                    assert self.cf_ctx is None + +                    lang = token.content + +                    try: +                        lexer = get_lexer_by_name(lang) +                    except ClassNotFound: +                        # try to guess it +                        lang = None +                        lexer = None + +                    self.cf_ctx = CodefenceContext(lang, lexer, self.formatter) + +                else: +                    assert self.cf_ctx is not None + +                    self.cf_ctx.eof = True + +                    highlighted = self.cf_ctx.get_highlighted_lines() + +                    if highlighted: +                        print( highlighted, end='', flush=True ) + +                    self.cf_ctx = None + +                # Add extra \n to either side of a chunk +                print(f'{self.color_code.WHITE}```{self.color_code.RESET}', flush=True) + +                continue + +            if self.cf_ctx: + +                self.cf_ctx.buffer += token.content +                highlighted = self.cf_ctx.get_highlighted_lines() + +                if highlighted: +                    print( highlighted, end='', flush=True ) + +            else: + +                print( 
f'{self.color_code.WHITE}{token.content}{self.color_code.RESET}', end='', flush=True ) +                self.text_emitted = True + + +    def finish( self : "ChatColorizer" ): +        self.lexer.finish() + + +# content = ''' +# Rust code: + +# ```rust +# fn main() { +#     let x = 5; +#     let y = 10; +#     let z = x + y; +#     println!("The value of z is {}", z); +# } +# ``` + +# Python code: + +# ```python +# x = 5 +# y = 10 +# z = x + y +# print("The value of z is", z) +# ``` + +# Unknown code: + +# ``` +# x = 5 +# y = 10 +# z = x + y +# print("The value of z is", z) +# ``` + + +# Testing + +# ```python +# x = 5 +# y = 10 +# z = x + y +# print("The value of z is", z) + +# ''' + +# highlighter = ChatColorizer() + +# highlighter.add_chunk(content) +# highlighter.finish() + +# highlighter.print() diff --git a/src/gpt_chat_cli/gcli.py b/src/gpt_chat_cli/gcli.py index d904733..1c5555c 100644 --- a/src/gpt_chat_cli/gcli.py +++ b/src/gpt_chat_cli/gcli.py @@ -29,6 +29,9 @@ from .argparsing import (  from .version import VERSION  from .color import get_color_codes +import datetime + +  ###########################  ####   SAVE / REPLAY   ####  ########################### @@ -38,7 +41,7 @@ def create_singleton_chat_completion(          completion_args : CompletionArguments      ): -    hist = [ ChatMessage( Role.USER, message ) ] +    hist = [ get_system_message(), ChatMessage( Role.USER, message ) ]      completion = create_chat_completion(hist, completion_args) @@ -85,6 +88,8 @@ class CumulativeResponse:          self.content += new_chunk          self.delta_content += new_chunk +from .chat_colorizer import ChatColorizer +  def print_streamed_response(          display_args : DisplayArguments,          completion : OpenAIChatResponseStream, @@ -98,13 +103,17 @@ def print_streamed_response(      on until all responses have been printed.      
""" -    COLOR_CODE = get_color_codes(no_color = not display_args.color) +    no_color = not display_args.color + +    COLOR_CODE = get_color_codes(no_color = no_color)      adornments = display_args.adornments      cumu_responses = defaultdict(CumulativeResponse)      display_idx = 0      prompt_printed = False +    chat_colorizer = ChatColorizer(no_color = no_color) +      for update in completion:          for choice in update.choices: @@ -126,10 +135,15 @@ def print_streamed_response(              print(PROMPT, end=' ', flush=True)          content = display_response.take_delta() -        print(f'{COLOR_CODE.WHITE}{content}{COLOR_CODE.RESET}', -              sep='', end='', flush=True) +        chat_colorizer.add_chunk( content ) + +        chat_colorizer.print()          if display_response.finish_reason is not FinishReason.NONE: +            chat_colorizer.finish() +            chat_colorizer.print() +            chat_colorizer = ChatColorizer( no_color=no_color ) +              if display_idx < n_completions:                  display_idx += 1                  prompt_printed = False @@ -142,6 +156,13 @@ def print_streamed_response(      if return_responses:          return [ cumu_responses[i].content for i in range(n_completions) ] +def get_system_message(): +    current_date_time = datetime.datetime.now() + +    msg = f'The current date is {current_date_time}. When emitting code or producing markdown, ensure to label fenced code blocks with the language in use.' 
+ +    return ChatMessage( Role.SYSTEM, msg) +  def cmd_version():      print(f'version {VERSION}') @@ -149,25 +170,34 @@ def cmd_list_models():      for model in list_models():          print(model) +def enable_emacs_editing(): +    try: +        import readline +        # self.old_completer = readline.get_completer() +        # readline.set_completer(self.complete) +        # readline.parse_and_bind(self.completekey+": complete") +    except ImportError: +        pass +  def cmd_interactive(args : Arguments): + +    enable_emacs_editing() +      COLOR_CODE = get_color_codes(no_color = not args.display_args.color)      completion_args = args.completion_args      display_args = args.display_args -    hist = [] - -    def print_prompt(): +    hist = [ get_system_message() ] -        print(f'[{COLOR_CODE.WHITE}#{COLOR_CODE.RESET}]', end=' ', flush=True) +    PROMPT = f'[{COLOR_CODE.WHITE}#{COLOR_CODE.RESET}] '      def prompt_message() -> bool: -        print_prompt()          # Control-D closes the input stream          try: -            message = input() -        except EOFError: +            message = input( PROMPT ) +        except (EOFError, KeyboardInterrupt):              print()              return False @@ -179,21 +209,24 @@ def cmd_interactive(args : Arguments):      print(f'Press Control-D to exit')      if args.initial_message: -        print_prompt() -        print( args.initial_message ) +        print( PROMPT, args.initial_message, sep='' )          hist.append( ChatMessage( Role.USER, args.initial_message ) )      else: -        prompt_message() +        if not prompt_message(): +            return      while True:          completion = create_chat_completion(hist, completion_args) -        response = print_streamed_response( -            display_args, completion, 1, return_responses=True, -        )[0] +        try: +            response = print_streamed_response( +                display_args, completion, 1, return_responses=True, +            )[0] 
-        hist.append( ChatMessage(Role.ASSISTANT, response) ) +            hist.append( ChatMessage(Role.ASSISTANT, response) ) +        except: +            pass          if not prompt_message():              break diff --git a/src/gpt_chat_cli/streaming_lexer.py b/src/gpt_chat_cli/streaming_lexer.py new file mode 100644 index 0000000..7f3da73 --- /dev/null +++ b/src/gpt_chat_cli/streaming_lexer.py @@ -0,0 +1,288 @@ +# +# SINGLE-PASS STREAMING LEXER +# --------------------------- +# +# This is a simple "streaming lexer" which is designed to provide real-time syntax highlighting. +# It consumes input from the LLM as it is produced and parses it efficiently, emitting tokens +# as soon as they are available. +# +# The language is a subset of markdown. GPT seems to sometimes emit markdown, although it is somewhat +# fickle and does not reliably emit most tokens. When responding to programming questions, it does +# prefer code blocks and code spans. Maybe this should not be surprising since programmers do tend +# to use markdown. +# +# For the design of this "streaming lexer," a subset of this markdown will be parsed. While I would like +# to expand its purview, I have limited capacity. +# +# Thus, for version 0.0.3 we will only consider fenced code blocks and code spans. Fenced code blocks will +# be passed off to pygments for syntax highlighting. Code spans will be highlighted a different color to +# differentiate them from the rest of the text. +# +# A custom lexer seems to be needed since I have not found a lexer which is able to emit tokens when they +# are guaranteed and no sooner. If this evolves, it may need to not completely follow the commonmark spec. +# The recommended algorithm to parse markdown follows a two-pass algorithm. Regardless, the goals of the +# syntax highlighting may differ from full parsing since it is highlighting the textual content rather +# than converting the markdown to HTML. 
+# +# Currently, the design is one which emits two kinds of tokens "text" and "delimiters." Text unsurprisingly +# contains text which can be displayed or passed on to another parser. The "delimiters" are meant to control +# style. Currently, all delimiters are parsed left-to-right. When the lexer emits a "beginning" delimiter, +# it must terminate it with an "end" delimiter. +# +# Here is an example: +# [PARAGRAPH BEGIN][TEXT "Here is some" ] [TEXT " text"] [CODE_SPAN BEGIN][TEXT "I'm in "][TEXT "the span"][CODE_SPAN END][PARAGRAPH END] +# +# The "begin" delimiter of a code fence may carry an info string, which is stripped of surrounding whitespace (as the markdown specification demands). + +#################################################### +## AS OF 2023-05-06 ONLY CODE FENCES ARE IMPLEMENTED + + +from enum import Enum, auto +from dataclasses import dataclass +from typing import Optional, Tuple, Iterator, Generator +import re + +class TokenType(Enum): +    PARAGRAPH = auto() +    TEXT = auto() +    CODE_FENCE = auto() +    CODE_SPAN = auto() +    EOF = auto() + +class TokenOrientation(Enum): +    NONE = auto() +    BEGIN = auto() +    END = auto() + +@dataclass +class Token: +    type: TokenType +    orientation: TokenOrientation +    content: Optional[str] = None + +def make_text_token( s : str ) -> Token: +    return Token( +        type = TokenType.TEXT, +        orientation = TokenOrientation.NONE, +        content = s +    ) + +class MatchState(Enum): +    MATCH = auto() +    INDETERMINATE = auto() +    MISMATCH = auto() + +@dataclass +class CodeFenceContext: +    spacing : int +    info_string : str +    end : int + +def _try_to_parse_code_fence( buffer, eof=False ) -> Tuple[MatchState, Optional[CodeFenceContext]]: +    ''' +        Parse a code fence from a line boundary + +        Example: + +        ```python +          ```python +           ```python +           ~~~python + +        Invalid: +            ```python [four spaces] +        ~``python +    ''' + +    # match between zero and 
three spaces, followed by a grouping of ` and ~ +    fence_match = re.search(r"^( {0,3})([`~]{1,3})", buffer ) + +    if fence_match: +        fence_indicator = fence_match.group(2) + +        # Ensure there isn't a mix of ` and ~ +        if '`' in fence_indicator and '~' in fence_indicator: +            return ( MatchState.MISMATCH, None ) + +        remaining = buffer[fence_match.end():] + +        if len(fence_indicator) != 3 and len(remaining) > 0: +            return ( MatchState.MISMATCH, None ) + +        if '\n' not in remaining and not eof: +            # wait for info string to accumulate +            return ( MatchState.INDETERMINATE, None ) +        else: + +            if eof: +                info_match = re.search(r"^([^`~\n]*)(?:\n|$)", remaining ) +            else: +                info_match = re.search(r"^([^`~\n]*)\n", remaining ) + +            # the info string must not contain ` or ~ +            if not info_match: +                # info string cannot contain ` or ~ +                return ( MatchState.MISMATCH, None ) + +            spaces = len(fence_match.group(1)) +            info_string = info_match.group( 1 ) + +            # remove extra spaces +            info_string = info_string.strip() + +            # end of match +            end = info_match.end() + fence_match.end() + +            ctx = CodeFenceContext( spaces, info_string, end ) + +            return ( MatchState.MATCH, ctx ) +    else: +        return ( MatchState.MISMATCH, None ) + +class SinglePassStreamingLexer( object ): +    _buffer : str +    _line_start : bool +    _eof : bool + +    # "leaf" blocks +    _in_code_fence : bool +    _code_fence_spaces : int +    _in_paragraph : bool + +    def __init__( self : "SinglePassStreamingLexer" ): +        self._buffer = '' +        self._line_start = True + +        self._in_code_fence = False +        self._code_fence_spaces = 0 +        self._in_paragraph = False +        self._eof = False + +    def add_chunk( self : 
"SinglePassStreamingLexer", new_chunk : str ): +        self._buffer += new_chunk + +    def finish( self : "SinglePassStreamingLexer" ): +        self._eof = True + +    def _take_chunk( self : "SinglePassStreamingLexer", amount : int ): + +        chunk = self._buffer[ : amount ] +        self._buffer = self._buffer[ amount : ] + +        return chunk + +    def _tokenize_text_until_newline( self : "SinglePassStreamingLexer" ): +        # we can take tokens until we hit a newline + +        end = self._buffer.find('\n') + +        if end == -1: +            l = len(self._buffer) + +            if l != 0: +                self._line_start = False +                return make_text_token( self._take_chunk( l ) ) +        else: +            self._line_start = True + +            return make_text_token( self._take_chunk( end + 1 ) ) + +    def parse( self : "SinglePassStreamingLexer" ) -> Generator[Token, None, None]: + +        while True: + +            if len(self._buffer) == 0: +                if self._eof: + +                    # terminate + +                    if self._in_code_fence: +                        yield Token( +                            TokenType.CODE_FENCE, +                            TokenOrientation.END +                        ) + +                        self._in_code_fence = False + +                    yield Token( +                        TokenType.EOF, +                        TokenOrientation.NONE +                    ) + +                    return +                else: +                    # Wait for more content +                    return + +            if self._line_start: + +                state, ctx = _try_to_parse_code_fence( self._buffer, eof=self._eof ) + +                if state == MatchState.INDETERMINATE and not self._eof: +                    # wait for more tokens to accumulate +                    return +                elif state == MatchState.MATCH: + +                    chunk = self._take_chunk( ctx.end ) + +    
                if self._in_code_fence: +                        # closing fences cannot contain info strings +                        # consider it in the code block +                        if len(ctx.info_string) != 0: +                            yield make_text_token( chunk ) +                        else: +                            yield Token( TokenType.CODE_FENCE, TokenOrientation.END ) +                            self._in_code_fence = False +                            self._code_fence_spaces = 0 +                    else: + +                        if self._in_paragraph: +                            yield Token( TokenType.PARAGRAPH, TokenOrientation.END ) +                            self._in_paragraph = False + +                        yield Token( +                            TokenType.CODE_FENCE, +                            TokenOrientation.BEGIN, +                            content = ctx.info_string +                        ) + +                        self._code_fence_spaces = ctx.spacing +                        self._in_code_fence = True + +                    # if we get to this point, we are at the +                    # beginning of a line, restart parsing + +                    continue + +                # a mismatch occurred, but we're still at the beginning of +                # a line, emit regular text + +                # TODO: add paragraph check +                if self._in_code_fence: +                    if len(self._buffer) < self._code_fence_spaces and \ +                        not self._eof: +                        # wait for more tokens +                        return + +                    token = self._tokenize_text_until_newline() + +                    # strip off beginning spaces +                    if token.content.startswith(' ' * self._code_fence_spaces): +                        token.content = token.content[self._code_fence_spaces:] + +                    yield token + +                    continue + +            
    # FALLTHROUGH: tokenize text until newline then continue + +            if self._in_code_fence: +                # tokenize text until next line +                pass + +            # otherwise, emit a paragraph in the future +            token = self._tokenize_text_until_newline() +            yield token +            continue + | 
