Source code for pypsi.cmdline

# Copyright (c) 2015, Adam Meily <>
# Pypsi -
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.

Classes used for parsing user input.

import sys
from pypsi.utils import safe_open

__all__ = (
    'Token', 'StringToken', 'OperatorToken', 'WhitespaceToken',
    'IORedirectionError', 'StatementParser', 'StatementSyntaxError',
    'CommandNotFoundError', 'CommandInvocation', 'Expression', 'Statement',

#: The token accepts more characters.
TokenContinue = 0

#: The token does not accept the current chracter.
TokenEnd = 1

#: The token is finished and the current chracter should not be processed
#: again.
TokenTerm = 2

[docs]class Token(object): ''' Base class for all tokens. ''' def __init__(self, index, features=None): ''' :param int index: the starting index of this token ''' self.index = index self.features = None
[docs]class WhitespaceToken(Token): ''' Whitespace token that can contain any number of whitespace characters. ''' def __init__(self, index, c=' ', features=None): super(WhitespaceToken, self).__init__(index, features) self.text = c
[docs] def add_char(self, c): ''' Add a character to this token. :param str c: the current character :returns int: TokenEnd or TokenContinue ''' if c in (' ', '\t', '\xa0'): self.text += c return TokenContinue else: return TokenEnd
def __str__(self): return "WhitespaceToken( {} )".format(self.text) def __eq__(self, other): return isinstance(other, WhitespaceToken)
[docs]class StringToken(Token): ''' A string token. This token may be bound by matching quotes and/or contain escaped whitespace characters. ''' def __init__(self, index, c, quote=None, features=None): ''' :param str c: the current string or character :param str quote: the surrounding quotes, `None` if there isn't any ''' super(StringToken, self).__init__(index, features) self.quote = quote self._escape_char = features.escape_char if features else '' self.escape = False self.text = '' self.open_quote = False if c in ('"', "'"): self.quote = c self.open_quote = True elif self._escape_char and c == self._escape_char: self.escape = True elif c: self.text += c
[docs] def add_char(self, c): ''' Add a character to this token. :param str c: the current character :returns int: TokenEnd or TokenContinue ''' ret = TokenContinue if self.escape: self.escape = False if self.quote: if c == self.quote: self.text += c elif c is self._escape_char: self.text += self._escape_char + self._escape_char else: self.text += self._escape_char self.text += c elif c in (' ', '\t', "'", "\"") or c in OperatorToken.Operators: self.text += c else: self.text += self._escape_char self.text += c elif self.quote: if c == self.quote: ret = TokenTerm self.open_quote = False elif c == self._escape_char: self.escape = True else: self.text += c else: if c == self._escape_char: self.escape = True elif c in (' ', '\t', ';', '|', '&', '>', '<', '\xa0'): ret = TokenEnd elif c in ('"', "'"): ret = TokenEnd else: self.text += c return ret
def combine_token(self, token): self.text += token.text self.open_quote = token.open_quote self.escape = token.escape self.quote = token.quote def __str__(self): return "String( {quote}{text}{quote}, {open_quote}, {escape} )".format( quote=self.quote or '', text=self.text, open_quote=self.open_quote, escape=self.escape ) def __eq__(self, other): return ( isinstance(other, StringToken) and self.quote == other.quote and self.text == other.text and self.open_quote == other.open_quote and self.escape == other.escape )
[docs]class OperatorToken(Token): ''' An operator token. An operator can consist of one or more repetitions of the same operator character. For example, the string ">>" would be parsed as one `OperatorToken`, whereas the string "<>" would be parsed as two separate `OperatorToken` objects. ''' #: Valid operator characters Operators = '<>|&;' def __init__(self, index, operator): ''' :param str operator: the operator ''' super(OperatorToken, self).__init__(index) self.operator = operator
[docs] def add_char(self, c): ''' Add a character to this token. :param str c: the current character :returns int: TokenEnd or TokenContinue ''' if c == self.operator[0]: self.operator += c return TokenContinue return TokenEnd
def __str__(self): return "OperatorToken( {} )".format(self.operator) def is_chain_operator(self): for c in self.operator: if c in ('>', '<'): return False return True def __eq__(self, other): return ( isinstance(other, OperatorToken) and self.operator == other.operator )
class IORedirectionError(Exception): def __init__(self, path, message): self.path = path self.message = message def __str__(self): return "{}: {}".format(self.path, self.message)
[docs]class CommandNotFoundError(Exception): ''' The specified command does not exist. ''' def __init__(self, name): #: the command name = name def __str__(self): return + ": command not found"
[docs]class Statement(object): ''' A parsed statement containing a list of :class:`CommandInvocation` instances. ''' def __init__(self, invokes=None): ''' :param list[CommandInvocation] invokes: list of command invocations ''' self.invokes = invokes or []
[docs] def append(self, invoke): ''' Append a new command invocation. :param CommandInvocation invoke: parsed command invocation ''' self.invokes.append(invoke)
[docs] def __iter__(self): ''' :returns: an iterator for the :class:`CommandInvocation` instances ''' return iter(self.invokes)
def __nonzero__(self): ''' :returns bool: :const:`True` if there is at least 1 parsed command invocation, :const:`False` otherwise ''' return len(self.invokes) > 0
[docs] def __len__(self): ''' :returns int: the number of command invocations ''' return len(self.invokes)
def __eq__(self, other): return isinstance(other, Statement) and self.invokes == other.invokes
[docs]class CommandInvocation(object): ''' An invocation of a command. ''' def __init__(self, name, args=None, stdout=None, stderr=None, stdin=None, chain=None): #: Command name = name #: List of command arguments self.args = args or [] #: stdout redirection self.stdout = stdout #: stderr redirection self.stderr = stderr #: stderr redirection self.stdin = stdin #: The chain operator, if any is specified: &&, ||, |, ;. self.chain = chain #: The resolved pypsi command (:class:`~pypsi.core.Command`) self.cmd = None #: The fallback command to use if :attr:`cmd` is :const:`None`. self.fallback_cmd = None def __eq__(self, other): return ( isinstance(other, CommandInvocation) and == and self.args == other.args and self.stdout == other.stdout and self.stderr == other.stderr and self.stdin == other.stdin and self.chain == other.chain and self.cmd == other.cmd and self.fallback_cmd == other.fallback_cmd ) def __str__(self): s = "{name} {args}".format(, ' '.join(self.args)) if self.stdout: if isinstance(self.stdout, tuple) and self.stdout[1] == 'a': s += " >> " + self.stdout[0] else: s += " > " + self.stdout if self.stdin: s += " < " + self.stdin if self.chain: s += " " + self.chain return s
[docs] def setup(self, shell): ''' Retrieve the Pypsi command to execute and setup the streams for stdout, stderr, and stdin depending on whether I/O redirection is being performed. :raises CommandNotFoundError: command specified does not exist :raises IORedirectionError: I/O redirection error occurred ''' if in shell.commands: self.cmd = shell.commands[] elif shell.fallback_cmd: self.fallback_cmd = shell.fallback_cmd else: raise CommandNotFoundError( try: self.stdout = self.get_output(self.stdout) self.stderr = self.get_output(self.stdout) self.stdin = self.get_input(self.stdin) except: self.close_streams() raise
[docs] def get_output(self, output): ''' Open an output stream, if specified. :returns file: the stream opened for writting if specified, otherwise const:`None`. ''' if isinstance(output, tuple): # output is a tuple of (path, mode) path, mode = output ret = self.get_stream(path, mode) elif isinstance(output, str): # output is a path ret = self.get_stream(output, 'w') else: # output is either None or an existing open stream ret = output return ret
[docs] def get_input(self, stream): ''' Open an input stream, if specified. :returns file: the stream opened for reading if specified, otherwise :const:`None`. ''' if isinstance(stream, str): ret = self.get_stream(stream, 'r', safe=True) else: ret = stream return ret
[docs] def get_stream(self, path, mode, safe=False): ''' Open a file path. :param str path: file path :param str mode: file open mode :param bool safe: whether to use :meth:`pypsi.util.safe_open` instead of the standard :meth:`open` method. :raises IORedirectionError: stream could not be opened ''' func = safe_open if safe else open try: fp = func(path, mode=mode) except OSError as e: raise IORedirectionError(path, e.strerror) except Exception as e: raise IORedirectionError(path, str(e)) else: return fp
[docs] def close_streams(self): ''' Close all streams that were opened. ''' for fp in (self.stdout, self.stderr, self.stdin): if fp and hasattr(fp, 'close') and not fp.closed: fp.close()
[docs] def setup_io(self): ''' Setup stdout, stderr, and stdin by proxying the thread local streams. ''' if self.stdout: sys.stdout._proxy(self.stdout) if self.stderr: sys.stderr._proxy(self.stderr) if self.stdin: sys.stdin._proxy(self.stdin)
[docs] def cleanup_io(self): ''' Close proxied streams and unproxy them. ''' self.close_streams() if self.stdout: sys.stdout._unproxy() if self.stderr: sys.stderr._unproxy() if self.stdin: sys.stdin._unproxy()
[docs] def chain_and(self): ''' :returns: :const:`True` if the chain operator is AND (&&) ''' return self.chain == '&&'
[docs] def chain_or(self): ''' :returns: :const:`True` if the chain operator is OR (||) ''' return self.chain == '||'
[docs] def chain_uncond(self): ''' :returns: :const:`True` if the chain operator is UNCONDITIONAL (;) ''' return self.chain == ';'
[docs] def chain_pipe(self): ''' :returns: :const:`True` if the chain operator is PIPE (|) ''' return self.chain == '|'
[docs] def __call__(self, shell): ''' Invoke the command by proxying streams, running the command, and cleaning the resetting streams. :returns: the commnd's return code. ''' self.setup_io() try: if self.fallback_cmd: rc = self.fallback_cmd.fallback(shell,, self.args) else: rc =, self.args) finally: self.cleanup_io() return rc
[docs] def should_continue(self, prev_rc): ''' :returns: whether this invocation is chained and, using the previous invocation's return code, determine if the next command in the chain should be executed. ''' return ( not self.chain or ( self.chain_uncond() or (self.chain_or() and prev_rc is not 0) or (self.chain_and() and prev_rc is 0) ) )
[docs]class StatementSyntaxError(SyntaxError): ''' Invalid statement syntax was entered. ''' def __init__(self, message, index): ''' :param str message: error message :param int index: index in the statement that caused the error ''' self.message = message self.index = index def __str__(self): return "syntax error at {}: {}".format(self.index, self.message)
class UnclosedQuotationError(StatementSyntaxError): ''' The final token contains an unclosed quotation. ''' def __init__(self, index): super(UnclosedQuotationError, self).__init__("unclosed quotation", index) class TrailingEscapeError(StatementSyntaxError): ''' The final token ends with an escape character. ''' def __init__(self, index): super(TrailingEscapeError, self).__init__("trailing escape character", index)
[docs]class StatementParser(object): ''' Parses raw user input into a :class:`Statement`. ''' def __init__(self, features=None): self.features = features self.tokens = [] self.token = None
[docs] def process(self, index, c): ''' Process a single character of input. :param int index: current character index :param str c: current character ''' if self.token: action = self.token.add_char(c) if action == TokenEnd: self.tokens.append(self.token) self.token = None self.process(index, c) elif action == TokenTerm: self.tokens.append(self.token) self.token = None else: pass else: if c in (' ', '\t', '\xa0'): self.token = WhitespaceToken(index) elif c in ('>', '<', '|', '&', ';'): self.token = OperatorToken(index, c) else: self.token = StringToken(index, c, features=self.features)
[docs] def tokenize(self, line): ''' Transform a `str` into a `list` of :class:`Token` objects. :param str line: the line of text to tokenize :returns: `list` of :class:`Token` objects ''' index = 0 for c in line: self.process(index, c) index += 1 if self.token and self.features: if isinstance(self.token, StringToken): if self.token.escape: if self.features and self.features.multiline: # The last character in the input was an escape and the # shell supports multiline input. Switch back to the # shell to allow further input. raise TrailingEscapeError(self.token.index) elif self.features: self.token.text += self.features.escape_char self.token.escape = False elif self.token.open_quote: if self.features and self.features.multiline: # The last token is an unclosed quotation and the shell # supports multiline line input. Switch back to the # shell to allow further input. # # We add a newline to the last text token since we are # in a quoted string. self.token.text += '\n' raise UnclosedQuotationError(self.token.index) self.tokens.append(self.token) elif self.token: self.tokens.append(self.token) return self.tokens
[docs] def clean_escapes(self, tokens): ''' Remove all escape sequences. :param list tokens: :class:`Token` objects to remove escape sequences ''' escape_char = self.features.escape_char if self.features else '' for token in tokens: if not isinstance(token, StringToken) or ( escape_char not in token.text or token.quote): continue text = '' escape = False for c in token.text: if escape: text += c escape = False elif c == escape_char: escape = True else: text += c if escape: text += escape_char token.text = text
[docs] def condense(self, tokens): ''' Condenses sequential like :class:`Token` objects into a single :class:`Token` of the same type. For example, two sequential :class:`StringToken` objects will be concatenated into a single :class:`StringToken`. :param list tokens: :class:`Token` objects to condense :returns: condensed `list` of :class:`Token` objects ''' prev = None condensed = [] for token in tokens: if isinstance(token, StringToken): if (token.quote and self.features and self.features.preserve_quotes): token.text = "{q}{t}{q}".format(q=token.quote, t=token.text) if isinstance(prev, StringToken): prev.combine_token(token) token = prev else: condensed.append(token) elif not isinstance(token, WhitespaceToken): condensed.append(token) prev = token return condensed
[docs] def build(self, tokens): ''' Create a :class:`Statement` object from tokenized input and statement context. This method will first remove all remaining escape sequences and then :meth:`condense` all the tokens before building the statement. :param list tokens: list of :class:`Token` objects to process as a statement :raises: :class:`StatementSyntaxError` on error :returns: (:class:`Statement`) the parsed statement ''' statement = Statement() cmd = None prev = None self.clean_escapes(tokens) tokens = self.condense(tokens) for token in tokens: if cmd: if isinstance(prev, OperatorToken) and ( prev.operator in ('>', '<', '>>')): if isinstance(token, StringToken): if prev.operator in ('>', '>>'): cmd.stdout = ( token.text, 'w' if prev.operator == '>' else 'a' ) elif prev.operator == '<': cmd.stdin = token.text else: raise StatementSyntaxError( message="unexpected token: {}".format(str(token)), index=token.index ) elif isinstance(token, StringToken): cmd.args.append(token.text) elif isinstance(token, OperatorToken): done = False if token.operator in ('||', '&&', ';'): cmd.chain = token.operator done = True elif token.operator == '|': if cmd.stdout: msg = ("unexpected token: {}: duplicate output " "redirection is invalid").format(str(token)) raise StatementSyntaxError( message=msg, index=token.index ) cmd.chain = '|' done = True elif token.operator in ('>', '>>'): if cmd.stdout: msg = ("unexpected token: {}: duplicate output " "redirection is invalid").format(str(token)) raise StatementSyntaxError( message=msg, index=token.index ) elif token.operator == '<': if cmd.stdin or (len(statement) > 1 and statement[-1].chain == '|'): # The previous command in was a pipe, so input # redirection is not supported. msg = ("unexpected token: {} duplicate input " "redirection is invalid").format(str(token)) raise StatementSyntaxError( message=msg, index=token.index ) else: raise StatementSyntaxError( message="unknown operator: " + token.operator, index=token.index ) if done: statement.append(cmd) cmd = None else: if isinstance(token, StringToken): cmd = CommandInvocation(token.text) elif not isinstance(token, WhitespaceToken): raise StatementSyntaxError( message="unexpected token: {}".format(str(token)), index=token.index ) prev = token if isinstance(prev, StringToken) or ( isinstance(prev, OperatorToken) and prev.operator == ';'): pass elif prev: raise StatementSyntaxError( message="unexpected token: {}".format(str(prev)), index=prev.index ) if cmd: statement.append(cmd) return statement
[docs]class Expression(object): ''' Holds a string-based expression in the form of ``operand operator value``. This class makes parsing expressions that may be in a single string or a list of strings. For example, the :class:`pypsi.plugins.VarCommand` command accepts input in the form of: ``name = value``. This class allows for the user to input any of the following lines and the same Expression object would be created, regardless of how the input lines are tokenized: - ``some_var = 2`` => ``['some_var', '=', '2']`` - ``some_var= 2`` => ``['some_var=', '2']`` - ``some_var =2`` => ``['some_var', '=2']`` - ``some_var=2`` => ``['some_var=2']`` ''' Operators = '-+=/*' Whitespace = ' \t' def __init__(self, operand, operator, value): self.operand = operand self.operator = operator self.value = value def __str__(self): return "{} {} {}".format(self.operand, self.operator, self.value) def __eq__(self, other): return ( isinstance(other, Expression) and self.operand == other.operand and self.operator == other.operator and self.value == other.value ) @classmethod
[docs] def parse(cls, args): ''' Create an Expression from a list of strings. :param list args: arguments :returns: a tuple of ``(remaining, expression)``, where ``remaining`` is the list of remaining string arguments of ``args`` after parsing has completed, and ``expression`` in the parsed :class:`Expression`, or :const:`None` if the expression is invalid. ''' state = 'operand' operand = operator = value = None done = False remaining = list(args) for arg in args: for c in arg: if state == 'operand': if c in Expression.Whitespace: if operand: state = 'operator' elif c in Expression.Operators: state = 'operator' operator = c else: if operand is None: operand = c else: operand += c elif state == 'operator': if c in Expression.Operators: if operator is None: operator = c else: operator += c elif c in Expression.Whitespace: if operator: state = 'value' done = True value = '' else: state = 'value' value = c done = True elif state == 'value': if c in Expression.Whitespace and not value: pass else: value += c remaining.pop(0) if done: break if operator and not value: value = '' elif not done: return (None, None) return (remaining, Expression(operand, operator, value))