Source code for eprc.extractor
import json
import logging
import os.path
import pkg_resources
import shutil
import subprocess
import tarfile
import urllib2
import zipfile
import utils
class Extractor(object):
    """Extract package metadata by running helper scripts inside a virtualenv.

    Every extraction creates a fresh virtualenv below ``tmpdir``, runs one of
    the extractor scripts with that environment's interpreter and reads back
    the JSON document the script leaves at the path announced through the
    ``ILLUVATAR_EXTRACT_PATH`` environment variable.  Successful results are
    stored through ``db.set(name, version, data)``.
    """

    def __init__(self, virtualenv, tmpdir, pypi, extractors_path=None):
        """Remember the tools and locations used by the extraction methods.

        :param virtualenv: command used to create an environment; it is
            invoked as ``<virtualenv> <target directory>``.
        :param tmpdir: working directory for the virtualenv, the extractor
            result file and downloaded archives.
        :param pypi: object providing ``real_name(name)`` and
            ``release_urls(name, version)``.
        :param extractors_path: directory containing ``setup_py.py`` and
            ``bundled.py``.  Defaults to the ``extractors`` resource of this
            module's package.
        """
        if extractors_path is None:
            # Resolved on first use rather than in the signature, so that
            # merely importing this module does not query pkg_resources.
            extractors_path = pkg_resources.resource_filename(
                __name__,
                "extractors"
            )
        self.extractor_setup_py = os.path.join(extractors_path, "setup_py.py")
        self.extractor_bundled = os.path.join(extractors_path, "bundled.py")
        self.virtualenv = virtualenv
        self.tmpdir = tmpdir
        self.pypi = pypi

    def _run_extractor(
            self,
            pyfile,
            args=None,
            cwd=None,
            env=None,
            packages=None):
        """Run the extractor script *pyfile* in a fresh virtualenv.

        :param pyfile: path of the extractor script to execute.
        :param args: optional list of command line arguments for the script.
        :param cwd: optional working directory for the script.
        :param env: optional environment for the script; the current process
            environment is used when empty.  The mapping is copied, never
            modified.
        :param packages: optional list of requirements that are installed
            with pip into the virtualenv before the script runs.
        :returns: the decoded JSON result, or ``None`` if the extractor
            script exits with a non-zero status.
        :raises subprocess.CalledProcessError: if creating the virtualenv or
            installing *packages* fails.
        """
        # Work on a copy so a caller-supplied mapping is left untouched.
        env = dict(env) if env else os.environ.copy()
        extract_path = os.path.join(self.tmpdir, "extractor_result.json")
        env['ILLUVATAR_EXTRACT_PATH'] = extract_path

        # FIXME do not create a new venv all the time
        # ideas:
        #  - overlay fs
        #  - clone (copy does not work, use virtualenv-clone)
        #  - copy + `virtualenv --relocatable ENV`
        #    (see https://pypi.python.org/pypi/virtualenv/1.3.1#making-environments-relocatable)
        venvdir = os.path.join(self.tmpdir, "venv")
        try:
            with open(os.devnull, "w") as fnull:
                subprocess.check_call(
                    [self.virtualenv, venvdir],
                    stdout=fnull
                )
                if packages:
                    # Kept in its own variable: the pip command line must
                    # never replace or leak into the extractor's arguments.
                    pip_command = [
                        os.path.join(venvdir, "bin", "pip"),
                        "install"
                    ]
                    pip_command.extend(packages)
                    subprocess.check_call(pip_command, stdout=fnull)

            command = [
                os.path.join(venvdir, "bin", "python"),
                os.path.abspath(pyfile)
            ]
            if args:
                command.extend(args)
            try:
                subprocess.check_call(command, cwd=cwd, env=env)
            except subprocess.CalledProcessError:
                return None

            with open(extract_path, 'r') as infile:
                return json.load(infile)
        finally:
            # Always clean up, so a failed run leaves neither an environment
            # nor a stale result file behind for the next call.
            shutil.rmtree(venvdir, ignore_errors=True)
            if os.path.exists(extract_path):
                os.remove(extract_path)

    def _unpack_archive(self, archive_path, destination):
        """Unpack a zip or tar archive into *destination*.

        :raises ValueError: if a tar member would be written outside of
            *destination*.
        """
        if archive_path.endswith(".zip"):
            with zipfile.ZipFile(archive_path, "r") as archive_file:
                archive_file.extractall(destination)
            return

        # "r:*" lets tarfile detect the compression (gz, bz2, none).
        with tarfile.open(archive_path, "r:*") as archive_file:
            root = os.path.realpath(destination)
            # Basic guard against absolute or "../" member names.
            # NOTE: link members pointing outside the tree are not detected.
            for member in archive_file.getmembers():
                target = os.path.realpath(os.path.join(root, member.name))
                if target != root and not target.startswith(root + os.sep):
                    raise ValueError(
                        "archive member '{}' escapes '{}'".format(
                            member.name,
                            destination
                        )
                    )
            archive_file.extractall(destination)

    def _find_source_dir(self, extracted_path):
        """Pick the directory of an unpacked archive to run the extractor in.

        Prefers *extracted_path* itself or the first (sorted) entry that
        contains a ``setup.py``; otherwise falls back to the first entry.

        :returns: a path, or ``None`` if nothing was unpacked.
        """
        if not os.path.isdir(extracted_path):
            return None
        if os.path.isfile(os.path.join(extracted_path, "setup.py")):
            return extracted_path
        entries = sorted(os.listdir(extracted_path))
        for entry in entries:
            candidate = os.path.join(extracted_path, entry)
            if os.path.isfile(os.path.join(candidate, "setup.py")):
                return candidate
        if entries:
            return os.path.join(extracted_path, entries[0])
        return None

    def from_path(self, path, db, name=None, version=None):
        """Extract metadata from the source tree at *path* and store it.

        :param path: directory in which the ``setup_py.py`` extractor runs.
        :param db: object providing ``set(name, version, data)``.
        :param name: expected package name; overrides the extracted one if
            they differ after normalization.
        :param version: expected version; overrides the extracted one if
            they differ after normalization (only checked when *name* is
            given as well).
        :returns: the extracted data, or ``None`` if nothing was extracted.
        """
        logging.debug("Extract from '{}'".format(path))

        # fire up setup_py.py
        data = self._run_extractor(
            pyfile=self.extractor_setup_py,
            cwd=path,
            packages=["mock"]
        )
        if not data:
            return None

        # try to fix some weird cases (e.g. numpy)
        if name and data['name'] == 'None':
            data['name'] = name
        if version and data['version'] == 'None':
            data['version'] = version

        # some packages are messed up
        if name and utils.normalize(name) != utils.normalize(data['name']):
            logging.warning(
                "Package '{}':'{}' gives wrong name '{}'".format(
                    name,
                    version,
                    data['name']
                )
            )
            data['name'] = name
        if name \
                and version \
                and utils.normalize(version) != utils.normalize(data['version']):
            logging.warning(
                "Package '{}':'{}' gives wrong version '{}'".format(
                    name,
                    version,
                    data['version']
                )
            )
            data['version'] = version

        db.set(data['name'], data['version'], data)
        return data

    def from_pypi(self, db, name, version):
        """Download the sdist of *name* / *version* and extract its metadata.

        :param db: object providing ``set(name, version, data)``.
        :param name: package name; resolved through ``pypi.real_name``.
        :param version: release to look up through ``pypi.release_urls``.
        :returns: the extracted data, or ``None`` if no sdist is listed, the
            archive is rejected or empty, or the extraction yields nothing.
        """
        name = self.pypi.real_name(name)

        # find source package (the last listed sdist wins)
        url = None
        for entry in self.pypi.release_urls(name, version):
            if entry['packagetype'] == 'sdist':
                url = entry['url']
        if not url:
            logging.warning(
                "No source URL found for {}:{}".format(name, version)
            )
            return None

        archive_path = os.path.join(self.tmpdir, os.path.basename(url))
        extracted_path = archive_path + ".extracted"
        try:
            # download source package, streamed to disk in chunks
            response = urllib2.urlopen(url)
            try:
                with open(archive_path, "wb") as archive_file:
                    shutil.copyfileobj(response, archive_file)
            finally:
                response.close()

            # extract archive
            # FIXME be smarter and more secure about extraction
            #       (permissions, links, ...)
            try:
                self._unpack_archive(archive_path, extracted_path)
            except ValueError as error:
                logging.warning(
                    "Refusing to unpack {}:{}: {}".format(name, version, error)
                )
                return None

            # extract dependency information
            target_path = self._find_source_dir(extracted_path)
            if target_path is None:
                logging.warning(
                    "Empty source archive for {}:{}".format(name, version)
                )
                return None
            return self.from_path(
                target_path,
                db,
                utils.normalize(name),
                utils.normalize(version)
            )
        finally:
            # Remove the download and the unpacked tree on every exit path.
            if os.path.exists(archive_path):
                os.remove(archive_path)
            shutil.rmtree(extracted_path, ignore_errors=True)

    def from_native(self, db, name):
        """Record a module that is importable in the running interpreter.

        The ``bundled.py`` extractor is asked for the module's name and
        version; all requirement lists of the stored record are empty.

        :param db: object providing ``set(name, version, data)``.
        :param name: module name (normalized before use).
        :returns: the stored data, or ``None`` if the module cannot be
            imported or the extractor yields nothing.
        """
        module_name = utils.normalize(name)
        try:
            # only try to extract it if module exist
            __import__(module_name)
        except ImportError:
            return None

        data_simple = self._run_extractor(
            pyfile=self.extractor_bundled,
            args=[module_name]
        )
        if not data_simple:
            return None

        data = {
            'name': utils.normalize(data_simple['name']),
            'version': utils.normalize(data_simple['version']),
            'setup_requires': [],
            'install_requires': [],
            'tests_require': [],
            'extras_require': {}
        }
        db.set(data['name'], data['version'], data)
        return data