Source code for pypsi.plugins.variable

#
# Copyright (c) 2015, Adam Meily <meily.adam@gmail.com>
# Pypsi - https://github.com/ameily/pypsi
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

import argparse
from pypsi.core import Plugin, Command, PypsiArgParser, CommandShortCircuit
from pypsi.namespace import ScopedNamespace
from pypsi.cmdline import (Token, StringToken, TokenContinue, TokenEnd,
                           Expression)
from pypsi.format import Table, Column, obj_str
import os
import sys
from datetime import datetime


[docs]class ManagedVariable(object): ''' Represents a variable that is managed by the shell. Managed variables have get and set hooks that allow for input validation or read-only enforcement. Each variable needs a ``getter``, which is called to retrieve the value, and possibly a ``setter``, which is called to set the value. If the setter is :const:`None`, the variable is read-only. The setter must accept two arguments when it is called: the active :class:`~pypsi.shell.Shell` instance, and the :class:`str` value. ''' def __init__(self, getter, setter=None): ''' :param callable getter: the callable to call when retrieving the variable's value (must return a value) :param callable setter: the callable to call when setting the variable's value ''' self.getter = getter self.setter = setter def set(self, shell, value): if self.setter: self.setter(shell, value) else: raise ValueError("read-only variable") def get(self, shell): return self.getter(shell)
[docs]class VariableCommand(Command): ''' Manage variables. ''' Usage = """var name or: var EXPRESSION or: var -l or: var -d name""" def __init__(self, name='var', brief='manage local variables', topic='shell', **kwargs): self.parser = PypsiArgParser( prog=name, description=brief, usage=VariableCommand.Usage ) self.parser.add_argument( '-l', '--list', help='list variables', action='store_true' ) self.parser.add_argument( '-d', '--delete', help='delete variable', metavar='VARIABLE' ) self.parser.add_argument( 'exp', metavar='EXPRESSION', help='expression defining variable, in the form of NAME = [VALUE]', nargs=argparse.REMAINDER ) super(VariableCommand, self).__init__( name=name, usage=self.parser.format_help(), topic=topic, brief=brief, **kwargs ) def run(self, shell, args): try: ns = self.parser.parse_args(args) except CommandShortCircuit as e: return e.code rc = 0 if ns.list: tbl = Table( columns=(Column("Variable"), Column("Value", Column.Grow)), width=shell.width, spacing=4, ) for name in shell.ctx.vars: if name == '_': continue s = shell.ctx.vars[name] if callable(s): s = s() elif isinstance(s, ManagedVariable): s = s.get(shell) tbl.append(name, obj_str(s)) tbl.write(sys.stdout) elif ns.delete: if ns.delete in shell.ctx.vars: s = shell.ctx.vars[ns.delete] if isinstance(s, ManagedVariable): self.error(shell, "variable is managed and cannot be deleted") rc = -1 else: del shell.ctx.vars[ns.delete] else: self.error(shell, "unknown variable: ", ns.delete) elif ns.exp and '=' in ''.join(args): (remainder, exp) = Expression.parse(args) if remainder or not exp: self.error(shell, "invalid expression") return 1 if isinstance(shell.ctx.vars[exp.operand], ManagedVariable): try: shell.ctx.vars[exp.operand].set(shell, exp.value) except ValueError as e: self.error(shell, "could not set variable: ", str(e)) else: shell.ctx.vars[exp.operand] = exp.value elif ns.exp: if len(args) == 1: if args[0] in shell.ctx.vars: s = shell.ctx.vars[args[0]] if callable(s): s = s() elif isinstance(s, ManagedVariable): s = s.getter(shell) print(obj_str(s)) else: self.error(shell, "unknown variable: ", args[0]) return 1 else: self.error(shell, "invalid expression") return 1 else: self.usage_error(shell, "missing required EXPRESSION") rc = 1 return rc
class VariableToken(Token): VarChars = ('abcdefghijklmnopqrstuvwxyz' 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' '0123456789_') def __init__(self, prefix, index, var=''): super(VariableToken, self).__init__(index) self.prefix = prefix self.var = var def add_char(self, c): if c in self.VarChars: self.var += c return TokenContinue return TokenEnd def __str__(self): return "VariableToken( {} )".format(self.var) def get_subtokens(token, prefix, features): escape = False index = token.index subt = None var = None for c in token.text: if escape: escape = False if c != prefix: subt.text += '\\' subt.text += c elif var: rc = var.add_char(c) if rc == TokenEnd: yield var var = None if c == prefix: var = VariableToken(index, c) else: if c == '\\': escape = True c = '' subt = StringToken(index, c, token.quote, features=features) elif c == prefix: if subt: yield subt subt = None var = VariableToken(c, index) else: if c == '\\': escape = True c = '' if not subt: subt = StringToken(index, c, token.quote, features=features) else: subt.text += c index += 1 if subt: yield subt elif var: yield var def safe_date_format(format, dt): try: return dt.strftime(format) except: return "<invalid date time format>" def var_date_getter(shell): return safe_date_format(shell.ctx.vars.datefmt or "%x", datetime.now()) def var_time_getter(shell): return safe_date_format(shell.ctx.vars.timefmt or "%X", datetime.now()) def var_datetime_getter(shell): return safe_date_format(shell.ctx.vars.datetimefmt or "%c", datetime.now()) def var_errno_getter(shell): return str(shell.errno) def var_prompt_getter(shell): return shell.get_current_prompt()
[docs]class VariablePlugin(Plugin): ''' Provides variable management and substitution in user input. ''' def __init__(self, var_cmd='var', prefix='$', locals=None, env=True, topic='shell', case_sensitive=True, preprocess=10, postprocess=90, **kwargs): ''' :param str var_cmd: the name of the variable command :param str prefix: the prefix that all variables need to start with :param dict locals: the base variables to register initially :param bool case_sensitive: whether variable names are case sensitive ''' super(VariablePlugin, self).__init__(preprocess=preprocess, postprocess=postprocess, **kwargs) self.var_cmd = VariableCommand(name=var_cmd, topic=topic) self.prefix = prefix self.base = os.environ if env else {} self.case_sensitive = case_sensitive if locals: self.base.update(locals)
[docs] def setup(self, shell): ''' Register the :class:`VariableCommand` and add the ``vars`` attribute (:class:`pypsi.namespace.ScopedNamespace`) to the shell's context. ''' shell.register(self.var_cmd) if 'vars' not in shell.ctx: shell.ctx.vars = ScopedNamespace('globals', self.case_sensitive) for k in self.base: shell.ctx.vars[k] = self.base[k] shell.ctx.vars.date = ManagedVariable(var_date_getter) shell.ctx.vars.time = ManagedVariable(var_time_getter) shell.ctx.vars.datetime = ManagedVariable(var_datetime_getter) shell.ctx.vars.prompt = ManagedVariable(var_prompt_getter, self.set_prompt) shell.ctx.vars.errno = ManagedVariable(var_errno_getter) return 0
def set_prompt(self, shell, value): shell.prompt = value def expand(self, shell, vart): name = vart.var if name in shell.ctx.vars: s = shell.ctx.vars[name] if callable(s): return s() elif isinstance(s, ManagedVariable): return s.getter(shell) return s return '' def on_tokenize(self, shell, tokens, origin): ret = [] for token in tokens: if (not isinstance(token, StringToken) or self.prefix not in token.text): ret.append(token) continue for subt in get_subtokens(token, self.prefix, shell.features): if isinstance(subt, StringToken): ret.append(subt) continue expanded = self.expand(shell, subt) ''' if token.quote: ret.append(StringToken(subt.index, expanded, token.quote)) else: ws = False for part in shell.parser.tokenize(expanded): ret.append(part) ''' ret.append(StringToken(subt.index, expanded, '"')) return ret