2020-04-08 10:38:14 -03:00

497 lines
15 KiB
Python

#!/usr/bin/env python
"""
Jedi EPC server.
Copyright (C) 2012 Takafumi Arakaki
Author: Takafumi Arakaki <aka.tkf at gmail.com>
This file is NOT part of GNU Emacs.
Jedi EPC server is free software: you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
Jedi EPC server is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Jedi EPC server.
If not, see <http://www.gnu.org/licenses/>.
"""
import argparse
import glob
import itertools
import logging
import logging.handlers
import os
import re
import site
import sys
from collections import namedtuple
import jedi
import jedi.api
import epc
import epc.server
import sexpdata
logger = logging.getLogger('jediepcserver')
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description=__doc__)
parser.add_argument(
'--address', default='localhost')
parser.add_argument(
'--port', default=0, type=int)
parser.add_argument(
'--port-file', '-f', default='-', type=argparse.FileType('wt'),
help='file to write port on. default is stdout.')
parser.add_argument(
'--sys-path', '-p', default=[], action='append',
help='paths to be inserted at the top of `sys.path`.')
parser.add_argument(
'--sys-path-append', default=[], action='append',
help='paths to be appended at the end of `sys.path`.')
parser.add_argument(
'--virtual-env', '-v', default=[], action='append',
help='paths to be used as if VIRTUAL_ENV is set to it.')
parser.add_argument(
'--log', help='Save server log to this file.')
parser.add_argument(
'--log-level',
choices=['CRITICAL', 'ERROR', 'WARN', 'INFO', 'DEBUG'],
help='Logging level for log file.')
parser.add_argument(
'--log-rotate-max-size', default=0, type=int,
help='Rotate log file after it reaches this size',
)
parser.add_argument(
'--log-rotate-max-count', default=3, type=int,
help='Max number of log rotations before removal',
)
parser.add_argument(
'--log-traceback', action='store_true', default=False,
help='Include traceback in logging output.')
parser.add_argument(
'--pdb', dest='debugger', const='pdb', action='store_const',
help='start pdb when error occurs.')
parser.add_argument(
'--ipdb', dest='debugger', const='ipdb', action='store_const',
help='start ipdb when error occurs.')
PY3 = (sys.version_info[0] >= 3)
NEED_ENCODE = not PY3
LogSettings = namedtuple(
'LogSettings',
[
'log_file',
'log_level',
'log_rotate_max_size',
'log_rotate_max_count',
],
)
try:
jedi.create_environment
except AttributeError:
jedi_create_environment = None
else:
_cached_jedi_environments = {}
def jedi_create_environment(venv, safe=False):
"""Cache jedi environments to avoid startup cost."""
try:
return _cached_jedi_environments[venv]
except KeyError:
logger.info('Creating jedi environment: %s', venv)
if venv is None:
jedienv = jedi.api.environment.get_default_environment()
else:
jedienv = jedi.create_environment(venv, safe=safe)
_cached_jedi_environments[venv] = jedienv
return jedienv
def get_venv_sys_path(venv):
if jedi_create_environment is not None:
return jedi_create_environment(venv).get_sys_path()
from jedi.evaluate.sys_path import get_venv_path
return get_venv_path(venv)
class JediEPCHandler(object):
def __init__(self, sys_path=(), virtual_envs=(), sys_path_append=()):
self.script_kwargs = self._get_script_path_kwargs(
sys_path=sys_path,
virtual_envs=virtual_envs,
sys_path_append=sys_path_append,
)
def get_sys_path(self):
environment = self.script_kwargs.get('environment')
if environment is not None:
return environment.get_sys_path()
sys_path = self.script_kwargs.get('sys_path')
if sys_path is not None:
return sys_path
return sys.path
@classmethod
def _get_script_path_kwargs(cls, sys_path, virtual_envs, sys_path_append):
result = {}
if jedi_create_environment:
# Need to specify some environment explicitly to workaround
# https://github.com/davidhalter/jedi/issues/1242. Otherwise jedi
# will create a lot of child processes.
if virtual_envs:
primary_env, virtual_envs = virtual_envs[0], virtual_envs[1:]
primary_env = path_expand_vars_and_user(primary_env)
else:
primary_env = None
try:
result['environment'] = jedi_create_environment(primary_env)
except Exception:
logger.warning(
'Cannot create environment for %r', primary_env, exc_info=1
)
if primary_env is not None:
result['environment'] = jedi_create_environment(None)
if not sys_path and not virtual_envs and not sys_path_append:
# No additional path customizations.
return result
# Either multiple environments or custom sys_path extensions are
# specified, or jedi version doesn't support environments.
final_sys_path = []
final_sys_path.extend(path_expand_vars_and_user(p) for p in sys_path)
for p in virtual_envs:
final_sys_path.extend(get_venv_sys_path(path_expand_vars_and_user(p)))
final_sys_path.extend(
path_expand_vars_and_user(p) for p in sys_path_append
)
dupes = set()
def not_seen_yet(val):
if val in dupes:
return False
dupes.add(val)
return True
result['sys_path'] = [p for p in final_sys_path if not_seen_yet(p)]
return result
def jedi_script(self, source, line, column, source_path):
if NEED_ENCODE:
source = source.encode('utf-8')
source_path = source_path and source_path.encode('utf-8')
return jedi.Script(
source, line, column, source_path or '', **self.script_kwargs
)
def complete(self, *args):
reply = []
for comp in self.jedi_script(*args).completions():
try:
docstr = comp.docstring()
except KeyError:
docstr = ""
reply.append(dict(
word=comp.name,
doc=docstr,
description=candidates_description(comp),
symbol=candidate_symbol(comp),
))
return reply
def get_in_function_call(self, *args):
sig = self.jedi_script(*args).call_signatures()
call_def = sig[0] if sig else None
if not call_def:
return []
return dict(
# p.description should do the job. But jedi-vim use replace.
# So follow what jedi-vim does...
params=[PARAM_PREFIX_RE.sub('', p.description).replace('\n', '')
for p in call_def.params],
index=call_def.index,
call_name=call_def.name,
)
def _goto(self, method, *args):
"""
Helper function for `goto_assignments` and `usages`.
:arg method: `jedi.Script.goto_assignments` or `jedi.Script.usages`
:arg args: Arguments to `jedi_script`
"""
# `definitions` is a list. Each element is an instances of
# `jedi.api_classes.BaseOutput` subclass, i.e.,
# `jedi.api_classes.RelatedName` or `jedi.api_classes.Definition`.
definitions = method(self.jedi_script(*args))
return [dict(
column=d.column,
line_nr=d.line,
module_path=d.module_path if d.module_path != '__builtin__' else [],
module_name=d.module_name,
description=d.description,
) for d in definitions]
def goto(self, *args):
return self._goto(jedi.Script.goto_assignments, *args)
def related_names(self, *args):
return self._goto(jedi.Script.usages, *args)
def get_definition(self, *args):
definitions = self.jedi_script(*args).goto_definitions()
return [definition_to_dict(d) for d in definitions]
def defined_names(self, *args):
# XXX: there's a bug in Jedi that returns returns definitions from inside
# classes or functions even though all_scopes=False is set by
# default. Hence some additional filtering is in order.
#
# See https://github.com/davidhalter/jedi/issues/1202
top_level_names = [
defn
for defn in jedi.api.names(*args)
if defn.parent().type == 'module'
]
return list(map(get_names_recursively, top_level_names))
def get_jedi_version(self):
return [dict(
name=module.__name__,
file=getattr(module, '__file__', []),
version=get_module_version(module) or [],
) for module in [sys, jedi, epc, sexpdata]]
def candidate_symbol(comp):
"""
Return a character representing completion type.
:type comp: jedi.api.Completion
:arg comp: A completion object returned by `jedi.Script.completions`.
"""
try:
return comp.type[0].lower()
except (AttributeError, TypeError):
return '?'
def candidates_description(comp):
"""
Return `comp.description` in an appropriate format.
* Avoid return a string 'None'.
* Strip off all newlines. This is required for using
`comp.description` as candidate summary.
"""
desc = comp.description
return _WHITESPACES_RE.sub(' ', desc) if desc and desc != 'None' else ''
_WHITESPACES_RE = re.compile(r'\s+')
PARAM_PREFIX_RE = re.compile(r'^param\s+')
"""RE to strip unwanted "param " prefix returned by param.description."""
def definition_to_dict(d):
return dict(
doc=d.docstring(),
description=d.description,
desc_with_module=d.desc_with_module,
line_nr=d.line,
column=d.column,
module_path=d.module_path,
name=getattr(d, 'name', []),
full_name=getattr(d, 'full_name', []),
type=getattr(d, 'type', []),
)
def get_names_recursively(definition, parent=None):
"""
Fetch interesting defined names in sub-scopes under `definition`.
:type names: jedi.api_classes.Definition
"""
d = definition_to_dict(definition)
try:
d['local_name'] = parent['local_name'] + '.' + d['name']
except (AttributeError, TypeError):
d['local_name'] = d['name']
if definition.type == 'class':
ds = definition.defined_names()
return [d] + [get_names_recursively(c, d) for c in ds]
else:
return [d]
def get_module_version(module):
notfound = object()
for key in ['__version__', 'version']:
version = getattr(module, key, notfound)
if version is not notfound:
return version
try:
from pkg_resources import get_distribution, DistributionNotFound
try:
return get_distribution(module.__name__).version
except DistributionNotFound:
pass
except ImportError:
pass
def path_expand_vars_and_user(p):
return os.path.expandvars(os.path.expanduser(p))
def configure_logging(log_settings):
"""
:type log_settings: LogSettings
"""
if not log_settings.log_file:
return
fmter = logging.Formatter('%(asctime)s:' + logging.BASIC_FORMAT)
if log_settings.log_rotate_max_size > 0:
handler = logging.handlers.RotatingFileHandler(
filename=log_settings.log_file,
mode='w',
maxBytes=log_settings.log_rotate_max_size,
backupCount=log_settings.log_rotate_max_count,
)
else:
handler = logging.FileHandler(filename=log_settings.log_file, mode='w')
handler.setFormatter(fmter)
if log_settings.log_level:
logging.root.setLevel(log_settings.log_level.upper())
logging.root.addHandler(handler)
def jedi_epc_server(
address='localhost',
port=0,
port_file=sys.stdout,
sys_path=[],
virtual_env=[],
sys_path_append=[],
debugger=None,
log_traceback=None,
):
"""Start EPC server.
:type log_settings: LogSettings
"""
logger.debug(
'jedi_epc_server: sys_path=%r virtual_env=%r sys_path_append=%r',
sys_path, virtual_env, sys_path_append,
)
if not virtual_env and os.getenv('VIRTUAL_ENV'):
logger.debug(
'Taking virtual env from VIRTUAL_ENV: %r',
os.environ['VIRTUAL_ENV'],
)
virtual_env = [os.environ['VIRTUAL_ENV']]
handler = JediEPCHandler(
sys_path=sys_path,
virtual_envs=virtual_env,
sys_path_append=sys_path_append,
)
logger.debug(
'Starting Jedi EPC server with the following sys.path: %r',
handler.get_sys_path(),
)
server = epc.server.EPCServer((address, port))
server.register_function(handler.complete)
server.register_function(handler.get_in_function_call)
server.register_function(handler.goto)
server.register_function(handler.related_names)
server.register_function(handler.get_definition)
server.register_function(handler.defined_names)
server.register_function(handler.get_jedi_version)
@server.register_function
def toggle_log_traceback():
server.log_traceback = not server.log_traceback
return server.log_traceback
port_file.write(str(server.server_address[1])) # needed for Emacs client
port_file.write("\n")
port_file.flush()
if port_file is not sys.stdout:
port_file.close()
# This is not supported Python-EPC API, but I am using this for
# backward compatibility for Python-EPC < 0.0.4. In the future,
# it should be passed to the constructor.
server.log_traceback = bool(log_traceback)
if debugger:
server.set_debugger(debugger)
handler = logging.StreamHandler()
fmter = logging.Formatter('%(asctime)s:' + logging.BASIC_FORMAT)
handler.setFormatter(fmter)
handler.setLevel(logging.DEBUG)
server.logger.addHandler(handler)
server.logger.setLevel(logging.DEBUG)
return server
# def add_virtualenv_path(venv):
# """Add virtualenv's site-packages to `sys.path`."""
# venv = os.path.abspath(venv)
# paths = glob.glob(os.path.join(
# venv, 'lib', 'python*', 'site-packages'))
# if not paths:
# raise ValueError('Invalid venv: no site-packages found: %s' % venv)
# for path in paths:
# site.addsitedir(path)
def main(args=None):
ns = parser.parse_args(args)
ns_vars = vars(ns).copy()
log_settings = LogSettings(
log_file=ns_vars.pop('log'),
log_level=ns_vars.pop('log_level'),
log_rotate_max_size=ns_vars.pop('log_rotate_max_size'),
log_rotate_max_count=ns_vars.pop('log_rotate_max_count'),
)
configure_logging(log_settings)
server = jedi_epc_server(**ns_vars)
server.serve_forever()
server.logger.info('exit')
if __name__ == '__main__':
main()