aboutsummaryrefslogtreecommitdiff
path: root/src/gpt_chat_cli/cmd.py
blob: 83cd2989cbbf3dc5aef5fb10b4e6aa4fcb215c80 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
#!/bin/env python3

import sys
import openai
import pickle
import os
import datetime

from collections import defaultdict
from dataclasses import dataclass
from typing import Tuple, Optional

import subprocess
import tempfile
import os

from .openai_wrappers import (
    create_chat_completion,
    OpenAIChatResponse,
    OpenAIChatResponseStream,
    FinishReason,
    Role,
    ChatMessage
)

from .openai_wrappers import list_models as openai_list_models

from .argvalidation import (
    Arguments,
    DisplayArguments,
    CompletionArguments,
    DebugArguments,
    MessageSource
)

from .version import VERSION
from .color import (
    get_color_codes,
    surround_ansi_escapes
)

from .chat_colorizer import ChatColorizer
from .prompt import Prompter

###########################
####      UTILS        ####
###########################

def resolve_initial_message(src: MessageSource, interactive=False) -> str:
    msg = None

    if src.message:
        msg = src.message
    elif src.prompt_from_fd:
        with os.fdopen(src.prompt_from_fd, "r") as f:
            msg = f.read()
    elif src.prompt_from_file:
        with open(src.prompt_from_file, "r") as f:
            msg = f.read()
    elif not interactive:
        msg = sys.stdin.read()

    return msg

def get_system_message(system_message : Optional[str]):

    if not system_message:

        current_date_time = datetime.datetime.now()

        system_message = f'The current date is {current_date_time}. When emitting code or producing markdown, ensure to label fenced code blocks with the language in use.'

    return ChatMessage(Role.SYSTEM, system_message)

def _resolve_system_editor() -> Optional[str]:
    '''
        Attempt to resolve the system if one is not explicitly specified.
        This is either the editor stored in the EDITOR environmental variable
        or one of editor, vim, emacs, vi, or nano.
    '''

    fallback_editors = [
        "editor", # debian default
        "vim",
        "emacs",
        "vi",
        "nano",
    ]

    editor = os.getenv("EDITOR")

    if editor:
        return editor

    paths = os.getenv("PATH")

    if not paths:
        return None

    for editor in fallback_editors:
        for path in paths.split(os.pathsep):
            if os.path.exists(os.path.join(path, editor)):
                return editor

    return None

def _launch_interactive_editor(editor : Optional[str] = None) -> str:

    with tempfile.NamedTemporaryFile(suffix="gpt.msg") as tmp_file:

        editor = editor or _resolve_system_editor()

        try:
            subprocess.call([editor, tmp_file.name])
        except FileNotFoundError:
            print(f'error: the specified editor \"{editor}\" does not exist')
            sys.exit(1)

        # Read the resulting file into a string
        with open(tmp_file.name, "r") as edited_file:
            edited_content = edited_file.read()

        return edited_content


###########################
####   SAVE / REPLAY   ####
###########################

@dataclass
class CompletionContext:
    message: str
    completion_args: CompletionArguments
    system_message: Optional[str] = None

def create_singleton_chat_completion(ctx : CompletionContext):

    hist = [
        get_system_message(ctx.system_message),
        ChatMessage(Role.USER, ctx.message)
    ]

    completion = create_chat_completion(hist, ctx.completion_args)

    return completion

def save_response_and_arguments(args : Arguments) -> None:

    message = resolve_initial_message(args.initial_message)

    ctx = CompletionContext(
        message=message,
        completion_args=args.completion_args,
        system_message=args.system_message
    )

    completion = create_singleton_chat_completion(
        message,
        args.completion_args,
        args.system_message,
    )

    completion = list(completion)

    filename = args.debug_args.save_response_to_file

    with open(filename, 'wb') as f:
        pickle.dump((ctx, completion,), f)

def load_response_and_arguments(args : Arguments) \
        -> Tuple[CompletionContext, OpenAIChatResponseStream]:

    filename = args.debug_args.load_response_from_file

    with open(filename, 'rb') as f:
        ctx, completion = pickle.load(f)

    return (ctx, completion)

#########################
#### PRETTY PRINTING ####
#########################

@dataclass
class CumulativeResponse:
    delta_content: str = ""
    finish_reason: FinishReason = FinishReason.NONE
    content: str = ""

    def take_delta(self : "CumulativeResponse"):
        chunk = self.delta_content
        self.delta_content = ""
        return chunk

    def add_content(self : "CumulativeResponse", new_chunk : str):
        self.content += new_chunk
        self.delta_content += new_chunk

def print_streamed_response(
        display_args : DisplayArguments,
        completion : OpenAIChatResponseStream,
        n_completions : int,
        return_responses : bool = False
    ) -> None:
    """
    Print the response in real time by printing the deltas as they occur. If multiple responses
    are requested, print the first in real-time, accumulating the others in the background. One the
    first response completes, move on to the second response printing the deltas in real time. Continue
    on until all responses have been printed.
    """

    no_color = not display_args.color

    COLOR_CODE = get_color_codes(no_color = no_color)
    adornments = display_args.adornments

    cumu_responses = defaultdict(CumulativeResponse)
    display_idx = 0
    prompt_printed = False

    chat_colorizer = ChatColorizer(no_color = no_color)

    for update in completion:

        for choice in update.choices:
            delta = choice.delta

            if delta.content:
                cumu_responses[choice.index].add_content(delta.content)

            if choice.finish_reason is not FinishReason.NONE:
                cumu_responses[choice.index].finish_reason = choice.finish_reason

        display_response = cumu_responses[display_idx]

        if not prompt_printed and adornments:
            res_indicator = '' if n_completions == 1 else \
                    f' {display_idx + 1}/{n_completions}'
            PROMPT = f'[{COLOR_CODE.GREEN}{update.model}{COLOR_CODE.RESET}{COLOR_CODE.RED}{res_indicator}{COLOR_CODE.RESET}]'
            prompt_printed = True
            print(PROMPT, end=' ', flush=True)

        content = display_response.take_delta()
        chat_colorizer.add_chunk( content )

        chat_colorizer.print()

        if display_response.finish_reason is not FinishReason.NONE:
            chat_colorizer.finish()
            chat_colorizer.print()
            chat_colorizer = ChatColorizer( no_color=no_color )

            if display_idx < n_completions:
                display_idx += 1
                prompt_printed = False

            if adornments:
                print(end='\n\n', flush=True)
            else:
                print(end='\n', flush=True)

    if return_responses:
        return [ cumu_responses[i].content for i in range(n_completions) ]

#########################
####    COMMANDS     ####
#########################

def version():
    print(f'version {VERSION}')

def list_models():
    for model in openai_list_models():
        print(model)

def interactive(args : Arguments):

    completion_args = args.completion_args
    display_args = args.display_args

    system_message = get_system_message(args.system_message)
    interactive_editor = args.interactive_editor

    hist = [ system_message ]

    no_color = not display_args.color

    prompter = Prompter(no_color = no_color)

    CLEAR = prompter.add_command('clear', 'clears the context (excluding the system message)')
    EDIT = prompter.add_command('edit', 'launches a terminal editor')
    EXIT = prompter.add_command('exit', 'exit the terminal')
    HELP = prompter.add_command('help', 'print all interactive commands')

    print(f'GPT Chat CLI version {VERSION}')
    print(f'Press Control-D to exit')

    initial_message = resolve_initial_message(args.initial_message, interactive=True)

    with prompter as prompt:
        if initial_message:
            print( prompt.prompt, initial_message, sep='', flush=True )

        while True:
            try:
                if initial_message:
                    cmd, args = None, initial_message
                    initial_message = None
                else:
                    cmd, args = prompt.input()

                if cmd == CLEAR:
                    hist = [ system_message ]
                    continue
                elif cmd == EDIT:
                    message = _launch_interactive_editor(
                        editor=interactive_editor
                    )
                    print( prompt.prompt, end='')
                    print( message, end='' )
                elif cmd == EXIT:
                    return
                elif cmd == HELP:
                    prompt.print_help()
                    continue
                else:
                    message = args

                if message == '':
                    continue

                hist.append( ChatMessage( Role.USER, message ) )

                completion = create_chat_completion(hist, completion_args)

                response = print_streamed_response(
                    display_args, completion, 1, return_responses=True,
                )[0]

                hist.append( ChatMessage(Role.ASSISTANT, response) )
            except KeyboardInterrupt: # Skip to next prompt
                print()
                continue
            except EOFError: # Exit on Control-D
                print()
                sys.exit(1)

def singleton(args: Arguments):
    completion_args = args.completion_args

    debug_args : DebugArguments = args.debug_args
    message = args.initial_message

    if debug_args.save_response_to_file:
        save_response_and_arguments(args)
        return
    elif debug_args.load_response_from_file:
        ctx, completion = load_response_and_arguments(args)

        message = ctx.message
        completion_args = ctx.completion_args
    else:
        # message is only None is a TTY is not attached
        message = resolve_initial_message(args.initial_message)

        ctx = CompletionContext(
            message=message,
            completion_args=completion_args,
            system_message=args.system_message
        )

        completion = create_singleton_chat_completion(ctx)

    print_streamed_response(
        args.display_args,
        completion,
        completion_args.n_completions
    )