# Source code for pypsi.shell

#
# Copyright (c) 2015, Adam Meily <meily.adam@gmail.com>
# Pypsi - https://github.com/ameily/pypsi
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

from pypsi.cmdline import (StatementParser, StatementSyntaxError,
                           IORedirectionError, CommandNotFoundError,
                           StringToken, OperatorToken, WhitespaceToken,
                           UnclosedQuotationError, TrailingEscapeError)

from pypsi.namespace import Namespace
from pypsi.completers import path_completer
from pypsi.os import is_path_prefix
from pypsi.ansi import AnsiCodes
from pypsi.features import BashFeatures, TabCompletionFeatures
from pypsi.core import pypsi_print, Plugin, Command
from pypsi.pipes import ThreadLocalStream, InvocationThread
from pypsi.utils import escape_string
import readline
import sys
import os


class Shell(object):
    '''
    The command line interface that the user interacts with. All shells need
    to inherit this base class.
    '''

    def __init__(self, shell_name='pypsi', width=79, exit_rc=-1024, ctx=None,
                 features=None):
        '''
        Subclasses need to call the Shell constructor to properly initialize
        it.

        :param str shell_name: the name of the shell; used in error messages
            and in the default prompt
        :param int width: the width handed to the stdout and stderr stream
            wrappers installed by :meth:`bootstrap`
        :param int exit_rc: the exit return code that is returned from a
            command when the shell needs to end execution
        :param pypsi.namespace.Namespace ctx: the base context; a new
            :class:`~pypsi.namespace.Namespace` is created if falsy
        :param features: the parser feature set; a new
            :class:`~pypsi.features.BashFeatures` is created if falsy
        '''
        # Originals saved by bootstrap() so that restore() can undo it.
        self.backup_stdout = None
        self.backup_stdin = None
        self.backup_stderr = None
        self.backup_print = None

        self.width = width
        self.shell_name = shell_name
        self.exit_rc = exit_rc
        # Return code of the most recently finished statement.
        self.errno = 0
        # Registered commands, keyed by command name.
        self.commands = {}
        self.preprocessors = []
        self.postprocessors = []
        self.plugins = []
        # Either a string or a callable returning a string.
        self.prompt = "{name} )> ".format(name=shell_name)
        self.ctx = ctx or Namespace()
        self.features = features or BashFeatures()
        self.default_cmd = None
        self.register_base_plugins()
        self.fallback_cmd = None
        # NOTE(review): cmdloop() consults `self.features.eof_is_sigint`, not
        # this attribute -- confirm which of the two is intended.
        self.eof_is_sigint = False
        self._backup_completer = readline.get_completer()

        self.bootstrap()
        self.on_shell_ready()

    def bootstrap(self):
        '''
        Wrap ``sys.stdout``, ``sys.stderr`` and ``sys.stdin`` in
        :class:`~pypsi.pipes.ThreadLocalStream` and replace the builtin
        ``print`` with :func:`~pypsi.core.pypsi_print`. Anything that is
        already wrapped or replaced is left untouched; every object that is
        replaced is saved so that :meth:`restore` can put it back.
        '''
        import builtins

        if not isinstance(sys.stdout, ThreadLocalStream):
            self.backup_stdout = sys.stdout
            sys.stdout = ThreadLocalStream(sys.stdout, width=self.width)

        if not isinstance(sys.stderr, ThreadLocalStream):
            self.backup_stderr = sys.stderr
            sys.stderr = ThreadLocalStream(sys.stderr, width=self.width)

        if not isinstance(sys.stdin, ThreadLocalStream):
            self.backup_stdin = sys.stdin
            sys.stdin = ThreadLocalStream(sys.stdin)

        if builtins.print != pypsi_print:
            self.backup_print = print
            builtins.print = pypsi_print

    def restore(self):
        '''
        Undo :meth:`bootstrap`: reinstall every stream and the ``print``
        function for which a backup was saved.
        '''
        if self.backup_stdout:
            sys.stdout = self.backup_stdout

        if self.backup_stderr:
            sys.stderr = self.backup_stderr

        if self.backup_stdin:
            sys.stdin = self.backup_stdin

        if self.backup_print:
            import builtins
            builtins.print = self.backup_print

    def register_base_plugins(self):
        '''
        Register all base plugins that are defined, i.e. every class
        attribute that is a :class:`~pypsi.core.Command` or a
        :class:`~pypsi.core.Plugin` instance.
        '''
        cls = self.__class__
        for name in dir(cls):
            attr = getattr(cls, name)
            if isinstance(attr, (Command, Plugin)):
                self.register(attr)

    def register(self, obj):
        '''
        Register a :class:`~pypsi.core.Command` or a
        :class:`~pypsi.core.Plugin`.

        Commands are stored under their ``name``. Plugins whose
        ``preprocess`` / ``postprocess`` attribute is not ``None`` are added
        to the matching processor list, which is kept sorted by that
        attribute. ``obj.setup(self)`` is called last.

        :returns int: always 0
        '''
        if isinstance(obj, Command):
            self.commands[obj.name] = obj

        if isinstance(obj, Plugin):
            self.plugins.append(obj)
            if obj.preprocess is not None:
                self.preprocessors.append(obj)
                self.preprocessors = sorted(self.preprocessors,
                                            key=lambda x: x.preprocess)

            if obj.postprocess is not None:
                self.postprocessors.append(obj)
                self.postprocessors = sorted(self.postprocessors,
                                             key=lambda x: x.postprocess)

        obj.setup(self)
        return 0

    def on_shell_ready(self):
        '''
        Hook that is called after the shell has been created.
        '''
        return 0

    def on_cmdloop_begin(self):
        '''
        Hook that is called once the :meth:`cmdloop` function is called.
        '''
        return 0

    def on_cmdloop_end(self):
        '''
        Hook that is called once the :meth:`cmdloop` has ended.
        '''
        return 0

    def get_current_prompt(self):
        '''
        Resolve the prompt (calling it first if it is callable) and run it
        through :meth:`preprocess_single` with the ``'prompt'`` origin.

        :returns str: the prompt text to display
        '''
        if callable(self.prompt):
            prompt = self.prompt()
        else:
            prompt = self.prompt

        return self.preprocess_single(prompt, 'prompt')

    def set_readline_completer(self):
        '''
        Install :meth:`complete` as the readline completer, remembering the
        completer that was previously installed.
        '''
        if readline.get_completer() != self.complete:
            readline.parse_and_bind("tab: complete")
            self._backup_completer = readline.get_completer()
            readline.set_completer(self.complete)

    def reset_readline_completer(self):
        '''
        Reinstall the completer saved by :meth:`set_readline_completer`, but
        only if this shell's completer is the one currently installed.
        '''
        if readline.get_completer() == self.complete:
            readline.set_completer(self._backup_completer)

    def on_input_canceled(self):
        '''
        Notify every preprocessor that the pending input was canceled.
        '''
        for pp in self.preprocessors:
            pp.on_input_canceled(self)

    def on_tokenize(self, tokens, origin):
        '''
        Pass the tokens through each preprocessor's ``on_tokenize`` hook in
        order, stopping as soon as one of them returns a falsy value.

        :param list tokens: the tokens to process
        :param str origin: where the tokens come from (``'input'`` or
            ``'prompt'`` in this class)
        :returns: the processed tokens; may be falsy
        '''
        for pp in self.preprocessors:
            tokens = pp.on_tokenize(self, tokens, origin)
            if not tokens:
                break

        return tokens

    def include(self, file):
        '''
        Read commands from a file and execute them line by line.

        ``sys.stdin`` is temporarily pointed at ``file`` through its
        ``_get_target()`` / ``_proxy()`` methods and is always pointed back
        at its previous target before returning.

        :param file file: File object to read commands from
        :return int: 0 once the end of the file is reached; otherwise the
            last value held in the return code when reading stopped
        '''
        rc = 1
        eof = False

        # set STDIN to the file
        stdin = sys.stdin._get_target()
        sys.stdin._proxy(file)

        try:
            while self.running and not eof:
                raw = input().replace('\r', '')

                rc = None
                try:
                    rc = self.execute(raw)
                    rc = rc or 0
                except SystemExit as e:
                    # Stop reading the file; the exit code becomes the result.
                    rc = e.code
                    eof = True
                finally:
                    if rc is not None:
                        self.errno = rc
                        for pp in self.postprocessors:
                            pp.on_statement_finished(self, rc)
        except EOFError:
            # input() ran out of lines: the normal way out of the loop.
            print()
            self.on_input_canceled()
            rc = 0
        except KeyboardInterrupt:
            print()
            self.on_input_canceled()
        finally:
            # Reset stdin to a tty
            sys.stdin._proxy(stdin)

        return rc

    def cmdloop(self):
        '''
        Begin the input processing loop where the user will be prompted for
        input.

        :returns: the return code of the last statement; ``None`` if the last
            statement was interrupted
        '''
        self.running = True
        self.set_readline_completer()
        self.on_cmdloop_begin()
        rc = 0

        try:
            while self.running:
                try:
                    raw = input(self.get_current_prompt())
                except EOFError:
                    print()
                    self.on_input_canceled()
                    if not self.features.eof_is_sigint:
                        self.running = False
                        print("exiting....")
                except KeyboardInterrupt:
                    print()
                    self.on_input_canceled()
                else:
                    rc = None
                    try:
                        rc = self.execute(raw)
                        rc = rc or 0
                    except SystemExit as e:
                        rc = e.code
                        print("exiting....")
                        self.running = False
                    except KeyboardInterrupt:
                        # Bash returns 130 if a command was interrupted
                        rc = None
                        print()
                    except EOFError:
                        # Bash returns 1 for Ctrl+D
                        rc = None
                        print()
                    finally:
                        # A None return code means "no result": errno and the
                        # postprocessors are left alone.
                        if rc is not None:
                            self.errno = rc
                            for pp in self.postprocessors:
                                pp.on_statement_finished(self, rc)
        finally:
            self.on_cmdloop_end()
            self.reset_readline_completer()

        return rc

    def error(self, msg):
        '''
        Print an error message, prefixed with the shell name and wrapped in
        the red/reset ANSI codes, to ``sys.stderr``.

        :param str msg: the message to print
        '''
        print(
            AnsiCodes.red, self.shell_name, ": ", msg, AnsiCodes.reset,
            file=sys.stderr, sep=''
        )

    def mkpipe(self):
        '''
        Create an OS pipe.

        :returns tuple: ``(reader, writer)`` text-mode file objects wrapping
            the two ends of the pipe
        '''
        r, w = os.pipe()
        return (
            os.fdopen(r, 'r'),
            os.fdopen(w, 'w')
        )

    def _abort_pipe_threads(self, threads):
        '''
        Stop the given pipe threads, wait for them, and print any exception
        they recorded. Does nothing for an empty list.
        '''
        for t in threads:
            t.stop()

        # Wait for threads to terminate.
        try:
            for t in threads:
                t.join()
        except BaseException:
            # Something went wrong or a KeyboardInterrupt was issued. Stop
            # waiting for threads to terminate (deliberate best effort).
            pass

        # Print thread-specific unhandled exceptions.
        for t in threads:
            if t.exc_info:
                exc = t.exc_info[1]
                if isinstance(exc, OSError):
                    # strerror is None when the OSError carries no errno.
                    msg = exc.strerror or str(exc)
                else:
                    msg = str(exc)

                print(
                    AnsiCodes.red, t.invoke.name, ": ", msg,
                    AnsiCodes.reset, sep=''
                )

    def execute(self, raw):
        '''
        Parse and execute a statement.

        If the text ends inside an unclosed quotation or with a trailing
        escape, further lines are read with ``input()`` until it tokenizes.

        :param str raw: the raw command line to parse.
        :returns int: the return code of the statement; ``None`` if there was
            nothing to execute, 1 on a syntax error, and -1 if a command could
            not be set up or was aborted by a :class:`RuntimeError`.
        :raises: :class:`EOFError` or :class:`KeyboardInterrupt` if multiline
            input is canceled; any other exception a command raises
            (including :class:`KeyboardInterrupt` and :class:`SystemExit`)
            is re-raised after the pipe threads have been stopped.
        '''
        parser = StatementParser(self.features)
        input_complete = False
        while not input_complete:
            text = self.preprocess(raw, 'input')
            if text is None:
                return None

            try:
                tokens = parser.tokenize(text)
            except (UnclosedQuotationError, TrailingEscapeError):
                input_complete = False
            else:
                # Parsing succeeded, break out of the input loop
                input_complete = True

            if not input_complete:
                # This is a multiline input
                try:
                    # hide prompt if reading from a file
                    raw = input("> " if sys.stdin.isatty() else '')
                except (EOFError, KeyboardInterrupt):
                    self.on_input_canceled()
                    raise

        tokens = self.on_tokenize(tokens, 'input')
        statement = None

        if not tokens:
            return None

        try:
            statement = parser.build(tokens)
        except StatementSyntaxError as e:
            self.error(str(e))
            return 1

        rc = None

        if not statement:
            # The line was empty, a comment, or just contained whitespace.
            return rc

        # Setup the invocations
        for invoke in statement:
            try:
                # Open any and all I/O redirections and resolve the pypsi
                # command.
                invoke.setup(self)
            except Exception as e:
                for sub in statement:
                    sub.close_streams()

                if isinstance(e, (IORedirectionError, CommandNotFoundError)):
                    # pypsi can handle I/O redirection and command not found
                    # errors, these are not fatal.
                    self.error(str(e))
                    return -1

                # Unhandled fatal exception, re-raise it
                raise

        # Current pipe being built
        pipe = []

        # Process the statement
        for invoke in statement:
            if invoke.chain_pipe():
                # We are in a pipe
                pipe.append(invoke)
                continue

            # We are not in a pipe
            if pipe:
                # We have a pipe built that needs to be executed.
                # Create the invocation threads for the pipe.
                threads, stdin = self.create_pipe_threads(pipe)
                # Reset the building pipe
                pipe = []
                # Set the current invocation's stdin to the last
                # invocation's stdout.
                invoke.stdin = stdin
            else:
                # We were not in a pipe
                threads = []

            # Start all the pipe threads, if we are processing a pipe
            for t in threads:
                t.start()

            # Execute the invocation in the current thread.
            try:
                rc = invoke(self)
            except RuntimeError as e:
                # The command was aborted by a generic exception.
                self._abort_pipe_threads(threads)
                self.error("command aborted: " + str(e))
                rc = -1
            except BaseException:
                # KeyboardInterrupt and SystemExit are not Exception
                # subclasses, so they are caught here as well: the pipe
                # threads must not outlive the statement. The exception then
                # propagates unchanged; cmdloop() and include() handle
                # KeyboardInterrupt and SystemExit.
                self._abort_pipe_threads(threads)
                raise

            self.errno = rc

            # Check if the statement's next invocation be executed.
            if not invoke.should_continue(rc):
                break

        return rc

    def create_pipe_threads(self, pipe):
        '''
        Given a pipe (list of :class:`~pypsi.cmdline.CommandInvocation`
        objects) create a thread to execute for each invocation. The threads
        are created but not started.

        :returns tuple: a tuple containing the list of threads
            (:class:`~pypsi.pipes.InvocationThread`) and the read end of the
            last invocation's stdout pipe (``None`` for an empty pipe).
        '''
        threads = []
        stdin = None
        for invoke in pipe:
            # Each invocation writes into a fresh pipe whose read end becomes
            # the stdin of whatever comes next.
            next_stdin, stdout = self.mkpipe()
            t = InvocationThread(self, invoke, stdin=stdin, stdout=stdout)
            threads.append(t)
            stdin = next_stdin

        return threads, stdin

    def preprocess(self, raw, origin):
        '''
        Pass the raw input through each preprocessor's ``on_input`` hook in
        order, stopping as soon as one of them returns ``None``.

        :param str raw: the raw input
        :param str origin: where the input comes from (currently unused)
        :returns: the processed input, or ``None``
        '''
        for pp in self.preprocessors:
            raw = pp.on_input(self, raw)
            if raw is None:
                break

        return raw

    def preprocess_single(self, raw, origin):
        '''
        Process a piece of text as one double-quoted string token: run it
        through :meth:`on_tokenize`, clean the escapes, and join the text of
        the resulting tokens.

        :param str raw: the text to process
        :param str origin: passed on to :meth:`on_tokenize`
        :returns str: the processed text; ``''`` if no tokens remain
        '''
        tokens = self.on_tokenize([StringToken(0, raw, quote='"')], origin)
        if not tokens:
            return ''

        parser = StatementParser(self.features)
        parser.clean_escapes(tokens)
        return ''.join(token.text for token in tokens)

    def _clean_completions(self, completions, quotation):
        '''
        Prepare completions for insertion into the command line. Only a
        single (unambiguous) completion is rewritten; several candidates are
        returned unchanged.

        :param list completions: the raw completion strings
        :param str quotation: the quotation character currently open at the
            cursor, or ``None``
        :returns list: the cleaned completions
        '''
        escape_char = self.features.escape_char

        if len(completions) == 1:
            if escape_char:
                if quotation:
                    # We are quotes. Escape items with the same quotations and
                    # the escape character itself
                    completions = [
                        escape_string(entry, escape_char, quotation, False)
                        for entry in completions
                    ]
                else:
                    # We are not in quotes. Escape whitespace and the escape
                    # character
                    completions = [
                        escape_string(entry, escape_char)
                        for entry in completions
                    ]

            # Entries that end in a null byte, \0, need to close the current
            # quotation or add whitespace so that further tab completions
            # don't return the same result.
            completions = [
                entry[:-1] + (quotation or ' ') if entry.endswith('\0')
                else entry
                for entry in completions
            ]

        return completions

    def get_completions(self, line, prefix):
        '''
        Compute the tab completions for a partial command line.

        :param str line: the command line up to the cursor
        :param str prefix: the word being completed; passed on to the
            command's ``complete`` method
        :returns list: the completions; whatever was computed before a
            failure (possibly an empty list) if anything goes wrong
        '''
        # Bound before the try block so the best-effort return below always
        # has something to return.
        ret = []
        try:
            parser = StatementParser(TabCompletionFeatures(self.features))
            tokens = parser.tokenize(line)
            parser.clean_escapes(tokens)

            cmd_name = ""
            loc = None
            args = []
            next_arg = True
            in_quote = None

            for token in tokens:
                if isinstance(token, StringToken):
                    in_quote = token.quote if token.open_quote else None
                    if not cmd_name:
                        cmd_name = token.text
                        loc = 'name'
                    elif loc == 'name':
                        cmd_name += token.text
                    elif next_arg:
                        args.append(token.text)
                        next_arg = False
                    else:
                        args[-1] += token.text
                elif isinstance(token, OperatorToken):
                    in_quote = None
                    if token.operator in ('|', ';', '&&', '||'):
                        # A new command starts here. The name is reset to an
                        # empty string (not None) so the helpers below always
                        # receive a str.
                        cmd_name = ""
                        loc = None
                        args = []
                        next_arg = True
                    elif token.operator in ('>', '<', '>>'):
                        loc = 'path'
                        args = []
                        # The path is a fresh word even when no whitespace
                        # follows the operator.
                        next_arg = True
                elif isinstance(token, WhitespaceToken):
                    in_quote = None
                    if loc == 'name':
                        loc = None
                    next_arg = True

            if loc == 'path':
                ret = path_completer(''.join(args))
            elif not cmd_name or loc == 'name':
                if is_path_prefix(cmd_name):
                    ret = path_completer(cmd_name)
                else:
                    ret = self.get_command_name_completions(cmd_name)
            elif cmd_name not in self.commands:
                ret = []
            else:
                if next_arg:
                    args.append('')
                cmd = self.commands[cmd_name]
                ret = cmd.complete(self, args, prefix)

            ret = self._clean_completions(ret, in_quote)
        except Exception:
            # Deliberate best effort: tab completion must never raise into
            # readline, so return what we have.
            pass

        return ret

    def get_command_name_completions(self, prefix):
        '''
        :param str prefix: the partial command name; ``None`` is treated as
            an empty prefix
        :returns list: the names of all registered commands starting with
            ``prefix``
        '''
        prefix = prefix or ''
        return [cmd for cmd in self.commands if cmd.startswith(prefix)]

    def complete(self, text, state):
        '''
        Readline completer function. The matches are computed when ``state``
        is 0 and cached in ``self.completion_matches``.

        :param str text: the text being completed (unused; the line is read
            from readline directly)
        :param int state: the index of the match to return
        :returns: the match at index ``state``, or ``None`` when there are no
            more matches
        '''
        if state == 0:
            self.completion_matches = []
            begidx = readline.get_begidx()
            endidx = readline.get_endidx()
            line = readline.get_line_buffer()
            prefix = line[begidx:endidx] if line else ''

            line = line[:endidx]
            self.completion_matches = self.get_completions(line, prefix)

        if state < len(self.completion_matches):
            return self.completion_matches[state]
        return None

    def print_completion_matches(self, substitution, matches, max_len):
        '''
        Print the three arguments, one per line, each with a label.
        '''
        print("substitution:", substitution)
        print("matches:     ", matches)
        print("max_len:     ", max_len)