aboutsummaryrefslogtreecommitdiff
path: root/src/gpt_chat_cli
diff options
context:
space:
mode:
Diffstat (limited to 'src/gpt_chat_cli')
-rw-r--r--src/gpt_chat_cli/chat_colorizer.py267
-rw-r--r--src/gpt_chat_cli/gcli.py69
-rw-r--r--src/gpt_chat_cli/streaming_lexer.py288
3 files changed, 606 insertions, 18 deletions
diff --git a/src/gpt_chat_cli/chat_colorizer.py b/src/gpt_chat_cli/chat_colorizer.py
new file mode 100644
index 0000000..168a5b8
--- /dev/null
+++ b/src/gpt_chat_cli/chat_colorizer.py
@@ -0,0 +1,267 @@
+from pygments import highlight
+from pygments.lexer import Lexer
+from pygments.formatter import Formatter
+from pygments.lexers import (
+ get_lexer_by_name, get_all_lexers, guess_lexer,
+ find_lexer_class_by_name
+)
+from pygments.formatters import (
+ TerminalFormatter,
+ NullFormatter
+)
+
+from dataclasses import dataclass
+from typing import Optional
+from pygments.util import ClassNotFound
+
+from .streaming_lexer import (
+ SinglePassStreamingLexer,
+ Token,
+ TokenType,
+ TokenOrientation
+)
+
+from .color import (
+ get_color_codes,
+ ColorCode,
+)
+
+# Guessing languages takes time ...
+# Assume these are our candidates since
+# they likely cover upward of 90% of
+# usage
+GUESSABLE_LANGUAGES = [
+ 'html',
+ 'python',
+ 'java',
+ 'python2',
+ 'c++',
+ 'javascript',
+ 'c#',
+ 'sql',
+ 'c',
+ 'php',
+ 'go',
+ 'swift',
+ 'kotlin',
+ 'ruby',
+ 'typescript',
+ 'scala',
+ 'r',
+ 'rust',
+ 'css',
+ 'perl',
+ 'make',
+ 'text'
+]
+
+def guess_lexer( text : str, **options ):
+ '''
+ Guess the lexer in use from GUESSABLE_LANGUAGES.
+
+ Uses very primitive heuristics and is not very good.
+ '''
+
+ best_lexer = [0.0, None]
+
+ for lexer_name in GUESSABLE_LANGUAGES:
+
+ lexer = find_lexer_class_by_name(lexer_name)
+
+ rv = lexer.analyse_text(text)
+
+ if rv == 1.0:
+ return lexer(**options)
+ if rv > best_lexer[0]:
+ best_lexer[:] = (rv, lexer)
+
+ if not best_lexer[0] or best_lexer[1] is None:
+
+ raise ClassNotFound('no lexer matching the text found')
+
+ return best_lexer[1](**options)
+
+@dataclass
+class CodefenceContext:
+ language: str
+ lexer : Lexer
+ formatter : Formatter
+ buffer : str = ''
+ eof : bool = False
+
+ def may_guess_language( self : "CodefenceContext" ):
+
+ if self.eof:
+ return True
+
+ MIN_CHARACTERS = 150
+ MIN_LINES = 2
+
+ return (
+ len(self.buffer) > MIN_CHARACTERS and
+ self.buffer.count('\n') > MIN_LINES
+ )
+
+ def get_highlighted_lines(self : "CodefenceContext"):
+
+ if self.language is None:
+ if self.may_guess_language():
+
+ lexer = guess_lexer( self.buffer )
+
+ self.language = lexer.name
+ self.lexer = lexer
+
+ else:
+ return None
+
+
+ idx = self.buffer.rfind('\n')
+
+ if idx == -1:
+ return None
+ else:
+ lines = self.buffer[:idx+1]
+ self.buffer = self.buffer[idx+1:]
+
+ highlighted = highlight(
+ lines,
+ self.lexer,
+ self.formatter,
+ )
+
+ return highlighted
+
+class ChatColorizer( object ):
+
+ lexer : SinglePassStreamingLexer
+ formatter : Formatter
+ cf_ctx : Optional[CodefenceContext]
+ color_code : ColorCode
+ text_emitted: bool
+
+ def __init__( self : "ChatColorizer", no_color = False ):
+ self.lexer = SinglePassStreamingLexer()
+
+ self.cf_ctx = None
+
+ if no_color:
+ self.formatter = NullFormatter()
+ else:
+ self.formatter = TerminalFormatter()
+
+ self.color_code = get_color_codes( no_color=no_color )
+ self.text_emitted = False
+
+ def add_chunk( self : "ChatColorizer", chunk : str ):
+ self.lexer.add_chunk( chunk )
+
+ def print( self : "ChatColorizer" ):
+
+ for token in self.lexer.parse():
+
+ if token.type == TokenType.EOF:
+ break
+
+ if token.type == TokenType.CODE_FENCE:
+
+ if not self.text_emitted:
+ print()
+ self.text_emitted = True
+
+ if token.orientation == TokenOrientation.BEGIN:
+ assert self.cf_ctx is None
+
+ lang = token.content
+
+ try:
+ lexer = get_lexer_by_name(lang)
+ except ClassNotFound:
+ # try to guess it
+ lang = None
+ lexer = None
+
+ self.cf_ctx = CodefenceContext(lang, lexer, self.formatter)
+
+ else:
+ assert self.cf_ctx is not None
+
+ self.cf_ctx.eof = True
+
+ highlighted = self.cf_ctx.get_highlighted_lines()
+
+ if highlighted:
+ print( highlighted, end='', flush=True )
+
+ self.cf_ctx = None
+
+ # Add extra \n to either side of a chunk
+ print(f'{self.color_code.WHITE}```{self.color_code.RESET}', flush=True)
+
+ continue
+
+ if self.cf_ctx:
+
+ self.cf_ctx.buffer += token.content
+ highlighted = self.cf_ctx.get_highlighted_lines()
+
+ if highlighted:
+ print( highlighted, end='', flush=True )
+
+ else:
+
+ print( f'{self.color_code.WHITE}{token.content}{self.color_code.RESET}', end='', flush=True )
+ self.text_emitted = True
+
+
+ def finish( self : "ChatColorizer" ):
+ self.lexer.finish()
+
+
+# content = '''
+# Rust code:
+
+# ```rust
+# fn main() {
+# let x = 5;
+# let y = 10;
+# let z = x + y;
+# println!("The value of z is {}", z);
+# }
+# ```
+
+# Python code:
+
+# ```python
+# x = 5
+# y = 10
+# z = x + y
+# print("The value of z is", z)
+# ```
+
+# Unknown code:
+
+# ```
+# x = 5
+# y = 10
+# z = x + y
+# print("The value of z is", z)
+# ```
+
+
+# Testing
+
+# ```python
+# x = 5
+# y = 10
+# z = x + y
+# print("The value of z is", z)
+
+# '''
+
+# highlighter = ChatColorizer()
+
+# highlighter.add_chunk(content)
+# highlighter.finish()
+
+# highlighter.print()
diff --git a/src/gpt_chat_cli/gcli.py b/src/gpt_chat_cli/gcli.py
index d904733..1c5555c 100644
--- a/src/gpt_chat_cli/gcli.py
+++ b/src/gpt_chat_cli/gcli.py
@@ -29,6 +29,9 @@ from .argparsing import (
from .version import VERSION
from .color import get_color_codes
+import datetime
+
+
###########################
#### SAVE / REPLAY ####
###########################
@@ -38,7 +41,7 @@ def create_singleton_chat_completion(
completion_args : CompletionArguments
):
- hist = [ ChatMessage( Role.USER, message ) ]
+ hist = [ get_system_message(), ChatMessage( Role.USER, message ) ]
completion = create_chat_completion(hist, completion_args)
@@ -85,6 +88,8 @@ class CumulativeResponse:
self.content += new_chunk
self.delta_content += new_chunk
+from .chat_colorizer import ChatColorizer
+
def print_streamed_response(
display_args : DisplayArguments,
completion : OpenAIChatResponseStream,
@@ -98,13 +103,17 @@ def print_streamed_response(
on until all responses have been printed.
"""
- COLOR_CODE = get_color_codes(no_color = not display_args.color)
+ no_color = not display_args.color
+
+ COLOR_CODE = get_color_codes(no_color = no_color)
adornments = display_args.adornments
cumu_responses = defaultdict(CumulativeResponse)
display_idx = 0
prompt_printed = False
+ chat_colorizer = ChatColorizer(no_color = no_color)
+
for update in completion:
for choice in update.choices:
@@ -126,10 +135,15 @@ def print_streamed_response(
print(PROMPT, end=' ', flush=True)
content = display_response.take_delta()
- print(f'{COLOR_CODE.WHITE}{content}{COLOR_CODE.RESET}',
- sep='', end='', flush=True)
+ chat_colorizer.add_chunk( content )
+
+ chat_colorizer.print()
if display_response.finish_reason is not FinishReason.NONE:
+ chat_colorizer.finish()
+ chat_colorizer.print()
+ chat_colorizer = ChatColorizer( no_color=no_color )
+
if display_idx < n_completions:
display_idx += 1
prompt_printed = False
@@ -142,6 +156,13 @@ def print_streamed_response(
if return_responses:
return [ cumu_responses[i].content for i in range(n_completions) ]
+def get_system_message():
+ current_date_time = datetime.datetime.now()
+
+ msg = f'The current date is {current_date_time}. When emitting code or producing markdown, ensure to label fenced code blocks with the language in use.'
+
+ return ChatMessage( Role.SYSTEM, msg)
+
def cmd_version():
print(f'version {VERSION}')
@@ -149,25 +170,34 @@ def cmd_list_models():
for model in list_models():
print(model)
+def enable_emacs_editing():
+ try:
+ import readline
+ # self.old_completer = readline.get_completer()
+ # readline.set_completer(self.complete)
+ # readline.parse_and_bind(self.completekey+": complete")
+ except ImportError:
+ pass
+
def cmd_interactive(args : Arguments):
+
+ enable_emacs_editing()
+
COLOR_CODE = get_color_codes(no_color = not args.display_args.color)
completion_args = args.completion_args
display_args = args.display_args
- hist = []
-
- def print_prompt():
+ hist = [ get_system_message() ]
- print(f'[{COLOR_CODE.WHITE}#{COLOR_CODE.RESET}]', end=' ', flush=True)
+ PROMPT = f'[{COLOR_CODE.WHITE}#{COLOR_CODE.RESET}] '
def prompt_message() -> bool:
- print_prompt()
# Control-D closes the input stream
try:
- message = input()
- except EOFError:
+ message = input( PROMPT )
+ except (EOFError, KeyboardInterrupt):
print()
return False
@@ -179,21 +209,24 @@ def cmd_interactive(args : Arguments):
print(f'Press Control-D to exit')
if args.initial_message:
- print_prompt()
- print( args.initial_message )
+ print( PROMPT, args.initial_message, sep='' )
hist.append( ChatMessage( Role.USER, args.initial_message ) )
else:
- prompt_message()
+ if not prompt_message():
+ return
while True:
completion = create_chat_completion(hist, completion_args)
- response = print_streamed_response(
- display_args, completion, 1, return_responses=True,
- )[0]
+ try:
+ response = print_streamed_response(
+ display_args, completion, 1, return_responses=True,
+ )[0]
- hist.append( ChatMessage(Role.ASSISTANT, response) )
+ hist.append( ChatMessage(Role.ASSISTANT, response) )
+ except:
+ pass
if not prompt_message():
break
diff --git a/src/gpt_chat_cli/streaming_lexer.py b/src/gpt_chat_cli/streaming_lexer.py
new file mode 100644
index 0000000..7f3da73
--- /dev/null
+++ b/src/gpt_chat_cli/streaming_lexer.py
@@ -0,0 +1,288 @@
+#
+# SINGLE-PASS STREAMING LEXER
+# ---------------------------
+#
+# This is a simple "streaming lexer" which is designed to provide real-time syntax highlighting.
+# It consumes input from the LLM as it is produced and parses it efficiently, emitting tokens
+# as soon as they are available.
+#
+# The language is a subset of markdown. GPT seems to sometimes emit markdown, although it is somewhat
+# fickle and does not reliably emit most tokens. When responding to programming questions, it does
+# prefer code blocks and code spans. Maybe this should not be surprising since programmers do tend
+# to use markdown.
+#
+# For the design of this "streaming lexer," a subset of this markdown will be parsed. While I would like
+# to expand its purview, I have limited capacity.
+#
+# Thus, for version 0.0.3 we will only consider fenced code blocks and code spans. Fenced code blocks will
+# be passed off to pygments for syntax highlighting. Code spans will be highlighted a different color to
+# differentiate them from the rest of the text.
+#
+# A custom lexer seems to be needed since I have not found a lexer which is able to emit tokens when they
+# are guaranteed and no sooner. If this evolves, it may need to not completely follow the commonmark spec.
+# The recommended algorithm to parse markdown follows a two-pass algorithm. Regardless, the goals of the
+# syntax highlighting may differ from full parsing since it is highlighting the textual content rather
+# than converting the markdown to HTML.
+#
+# Currently, the design is one which emits two kinds of tokens "text" and "delimiters." Text unsurprisingly
+# contains text which can be displayed or passed on to another parser. The "delimiters" are meant to control
+# style. Currently, all delimiters are parsed left-to-right. When the lexer emits a "beginning" delimiter,
+# it must terminate it with an "end" delimiter.
+#
+# Here is an example:
+# [PARAGRAPH BEGIN][TEXT "Here is some" ] [TEXT " text"] [CODE_SPAN BEGIN][TEXT "I'm in "][TEXT "the span"][CODE_SPAN END][PARAGRAPH END]
+#
+# Code fences "begin" block may contain an info string. This is stripped (as the markdown specification demands.)
+
+####################################################
+## AS OF 2023-05-06 ONLY CODE FENCES ARE IMPLEMENTED
+
+
+from enum import Enum, auto
+from dataclasses import dataclass
+from typing import Optional, Tuple, Iterator, Generator
+import re
+
+class TokenType(Enum):
+ PARAGRAPH = auto()
+ TEXT = auto()
+ CODE_FENCE = auto()
+ CODE_SPAN = auto()
+ EOF = auto()
+
+class TokenOrientation(Enum):
+ NONE = auto()
+ BEGIN = auto()
+ END = auto()
+
+@dataclass
+class Token:
+ type: TokenType
+ orientation: TokenOrientation
+ content: Optional[str] = None
+
+def make_text_token( s : str ) -> Token:
+ return Token(
+ type = TokenType.TEXT,
+ orientation = TokenOrientation.NONE,
+ content = s
+ )
+
+class MatchState(Enum):
+ MATCH = auto()
+ INDETERMINATE = auto()
+ MISMATCH = auto()
+
+@dataclass
+class CodeFenceContext:
+ spacing : int
+ info_string : str
+ end : int
+
+def _try_to_parse_code_fence( buffer, eof=False ) -> Tuple[MatchState, Optional[CodeFenceContext]]:
+ '''
+ Parse a code fence from a line boundary
+
+ Example:
+
+ ```python
+ ```python
+ ```python
+ ~~~python
+
+ Invalid:
+ ```python [four spaces]
+ ~``python
+ '''
+
+ # match between zero and three spaces, followed by a grouping of ` and ~
+ fence_match = re.search(r"^( {0,3})([`~]{1,3})", buffer )
+
+ if fence_match:
+ fence_indicator = fence_match.group(2)
+
+ # Ensure there isn't a mix of ` and ~
+ if '`' in fence_indicator and '~' in fence_indicator:
+ return ( MatchState.MISMATCH, None )
+
+ remaining = buffer[fence_match.end():]
+
+ if len(fence_indicator) != 3 and len(remaining) > 0:
+ return ( MatchState.MISMATCH, None )
+
+ if '\n' not in remaining and not eof:
+ # wait for info string to accumulate
+ return ( MatchState.INDETERMINATE, None )
+ else:
+
+ if eof:
+ info_match = re.search(r"^([^`~\n]*)(?:\n|$)", remaining )
+ else:
+ info_match = re.search(r"^([^`~\n]*)\n", remaining )
+
+ # info string cannot contain ^ or ~
+ if not info_match:
+ # info string cannot contain ` or ~
+ return ( MatchState.MISMATCH, None )
+
+ spaces = len(fence_match.group(1))
+ info_string = info_match.group( 1 )
+
+ # remove extra spaces
+ info_string = info_string.strip()
+
+ # end of match
+ end = info_match.end() + fence_match.end()
+
+ ctx = CodeFenceContext( spaces, info_string, end )
+
+ return ( MatchState.MATCH, ctx )
+ else:
+ return ( MatchState.MISMATCH, None )
+
+class SinglePassStreamingLexer( object ):
+ _buffer : str
+ _line_start : bool
+ _eof : bool
+
+ # "leaf" blocks
+ _in_code_fence : bool
+ _code_fence_spaces : int
+ _in_paragraph : bool
+
+ def __init__( self : "SinglePassStreamingLexer" ):
+ self._buffer = ''
+ self._line_start = True
+
+ self._in_code_fence = False
+ self._code_fence_spaces = 0
+ self._in_paragraph = False
+ self._eof = False
+
+ def add_chunk( self : "SinglePassStreamingLexer", new_chunk : str ):
+ self._buffer += new_chunk
+
+ def finish( self : "SinglePassStreamingLexer" ):
+ self._eof = True
+
+ def _take_chunk( self : "SinglePassStreamingLexer", amount : int ):
+
+ chunk = self._buffer[ : amount ]
+ self._buffer = self._buffer[ amount : ]
+
+ return chunk
+
+ def _tokenize_text_until_newline( self : "SinglePassStreamingLexer" ):
+ # we can take tokens until we hit a newline
+
+ end = self._buffer.find('\n')
+
+ if end == -1:
+ l = len(self._buffer)
+
+ if l != 0:
+ self._line_start = False
+ return make_text_token( self._take_chunk( l ) )
+ else:
+ self._line_start = True
+
+ return make_text_token( self._take_chunk( end + 1 ) )
+
+ def parse( self : "SinglePassStreamingLexer" ) -> Generator[Token, None, None]:
+
+ while True:
+
+ if len(self._buffer) == 0:
+ if self._eof:
+
+ # terminate
+
+ if self._in_code_fence:
+ yield Token(
+ TokenType.CODE_FENCE,
+ TokenOrientation.END
+ )
+
+ self._in_code_fence = False
+
+ yield Token(
+ TokenType.EOF,
+ TokenOrientation.NONE
+ )
+
+ return
+ else:
+ # Wait for more content
+ return
+
+ if self._line_start:
+
+ state, ctx = _try_to_parse_code_fence( self._buffer, eof=self._eof )
+
+ if state == MatchState.INDETERMINATE and not self._eof:
+ # wait for more tokens to accumulate
+ return
+ elif state == MatchState.MATCH:
+
+ chunk = self._take_chunk( ctx.end )
+
+ if self._in_code_fence:
+ # closing fences cannot contain info strings
+ # consider it in the code block
+ if len(ctx.info_string) != 0:
+ yield make_text_token( chunk )
+ else:
+ yield Token( TokenType.CODE_FENCE, TokenOrientation.END )
+ self._in_code_fence = False
+ self._code_fence_spaces = 0
+ else:
+
+ if self._in_paragraph:
+ yield Token( TokenType.PARAGRAPH, TokenOrientation.END )
+ self._in_paragraph = False
+
+ yield Token(
+ TokenType.CODE_FENCE,
+ TokenOrientation.BEGIN,
+ content = ctx.info_string
+ )
+
+ self._code_fence_spaces = ctx.spacing
+ self._in_code_fence = True
+
+ # if we get to this point, we are at the
+ # beginning of a line, restart parsing
+
+ continue
+
+ # a mismatch occurred, but we're still at the beginning of
+ # a line, emit regular text
+
+ # TODO: add paragraph check
+ if self._in_code_fence:
+ if len(self._buffer) < self._code_fence_spaces and \
+ not self._eof:
+ # wait for mare tokens
+ return
+
+ token = self._tokenize_text_until_newline()
+
+ # strip off beginning spaces
+ if token.content.startswith(' ' * self._code_fence_spaces):
+ token.content = token.content[self._code_fence_spaces:]
+
+ yield token
+
+ continue
+
+ # FALLTHROUGH: tokenize text until newline then continue
+
+ if self._in_code_fence:
+ # tokenize text until next line
+ pass
+
+ # otherwise, emit a paragraph in the future
+ token = self._tokenize_text_until_newline()
+ yield token
+ continue
+