Source code for eprc.solver

import itertools
import json
import logging
import os.path
import pkg_resources
import subprocess


class VariableRegister(object):
    """Allocate consecutive integer variable ids and keep lookup tables.

    Two kinds of keys are tracked, each with a forward and a reverse map:

    * *single* keys ``(name, version, extra)`` for one concrete version,
    * *set* keys ``(name, frozenset(versions), extra)`` for a group of
      versions.

    Ids start at 1; ``count`` always holds the next unused id.
    """

    # marker version for packages without any known real version
    VIRTUAL_VERSION = pkg_resources.parse_version("virtual")

    def __init__(self):
        self.map_set = {}  # (name, frozenset(version), extra) -> variable
        self.map_set_rev = {}  # variable -> (name, frozenset(version), extra)
        self.map_single = {}  # (name, version, extra) -> variable
        self.map_single_rev = {}  # variable -> (name, version, extra)
        self.versions_register = {}  # name -> set(version)
        self.count = 1

    def register_set(self, name, versions, extras):
        """Allocate one variable per extra for a group of versions.

        :param name: package name
        :param versions: iterable of versions forming the group
        :param extras: iterable of extra names (``""`` for the base package)
        :raises RuntimeError: if a ``(name, versions, extra)`` key was
            already registered
        """
        version_set = frozenset(versions)
        for extra in extras:
            key = (name, version_set, extra)
            if key in self.map_set:
                # The original raised an undefined name ``Error`` here, which
                # would have surfaced as a NameError instead.
                raise RuntimeError("Unreachable!")
            variable = self.get_virtual_variable()
            self.map_set[key] = variable
            self.map_set_rev[variable] = key

    def register_single(self, name, version, extras):
        """Allocate one variable per extra for a single version.

        The version is also recorded in ``versions_register[name]``; this
        happens before the duplicate check.

        :param name: package name
        :param version: the version to register
        :param extras: iterable of extra names (``""`` for the base package)
        :raises RuntimeError: if a ``(name, version, extra)`` key was already
            registered
        """
        self.versions_register.setdefault(name, set()).add(version)
        for extra in extras:
            key = (name, version, extra)
            if key in self.map_single:
                # See register_set: ``Error`` was never defined.
                raise RuntimeError("Unreachable!")
            variable = self.get_virtual_variable()
            self.map_single[key] = variable
            self.map_single_rev[variable] = key

    def get_virtual_variable(self):
        """Return a fresh variable id that is not tied to any key."""
        variable = self.count
        self.count += 1
        return variable
def _collect_name_extras(scheduler):
    """Map every name in ``scheduler.done`` to its extras, always including ``""``."""
    name_extras = {}
    for name, extra in scheduler.done:
        name_extras.setdefault(name, {""}).add(extra)
    return name_extras


def _register_packages(register, db, name_extras):
    """Register a variable for every version and every group of identical versions."""
    # FIXME separate extras from core
    for name, extras in name_extras.items():
        all_versions = [
            pkg_resources.parse_version(version)
            for version in db.all_versions(name)
        ]
        if not all_versions:
            logging.warning("Create virtual version for {}".format(name))
            all_versions = [VariableRegister.VIRTUAL_VERSION]

        # versions whose data serialises to the same JSON are compressed
        # into one set of versions
        aliases = {}
        for version in all_versions:
            data = db.get(name, str(version))
            normalized = json.dumps(data, sort_keys=True)
            aliases.setdefault(normalized, set()).add(version)
            register.register_single(name, version, extras)
        for versions in aliases.values():
            register.register_set(name, versions, extras)


def _requirement_clauses(register, db):
    """Build the clauses linking every registered version set to its requirements."""
    clauses = []
    for name, versions, extra in register.map_set:
        data = db.get(name, str(next(iter(versions))))
        if not data:
            continue
        if extra:
            requirements = data['extras_require'].get(extra, [])
        else:
            requirements = itertools.chain(
                data['install_requires'],
                data['tests_require'],
                data['setup_requires'],
            )

        # create representation variable for the entire set of versions and
        # link it (at least one version variable is true => set variable
        # must be true):
        #   (V1 v V2 v ... v VN => SET)  <=>  -V1 - V2 - ... - VN + N*SET >= 0
        set_variable = register.get_virtual_variable()
        terms = [
            "-1 x{}".format(register.map_single[(name, version, extra)])
            for version in versions
        ]
        terms.append("{} x{} >= 0;".format(len(versions), set_variable))
        clauses.append(" ".join(terms))

        for requ_data in requirements:
            # pkg_resources.Requirement.parse() on a rebuilt requirement
            # string would be the official way, but that is too slow, so the
            # undocumented constructor is used instead
            requirement = pkg_resources.Requirement(
                requ_data['name'],
                [(spec['op'], spec['version']) for spec in requ_data['specs']],
                requ_data['extras']
            )

            # create virtual variable for that requirement
            # and make the set variable require this virtual variable
            virtual_variable = register.get_virtual_variable()
            clauses.append(
                "-1 x{} 1 x{} >= 0;".format(set_variable, virtual_variable)
            )

            # check all known versions against this requirement and collect
            # the variables that can satisfy the virtual variable:
            #   VIRT => V1 v V2 v ... v VN
            # If nothing matches, the clause degenerates to "-1 xVIRT >= 0;",
            # which forces VIRT (and through the clauses above the whole
            # version set) to false.
            terms = ["-1 x{}".format(virtual_variable)]
            requ_name = requ_data['name']
            for requ_version in register.versions_register.get(requ_name, ()):
                if (requ_version == VariableRegister.VIRTUAL_VERSION
                        or requ_version in requirement):
                    # add constraint for base + all requested extras
                    for requ_extra in itertools.chain([''], requ_data['extras']):
                        requ_variable = register.map_single[
                            (requ_name, requ_version, requ_extra)
                        ]
                        terms.append("1 x{}".format(requ_variable))
            clauses.append(" ".join(terms) + " >= 0;")
    return clauses


def _package_clauses(register, name_extras):
    """Build per-package clauses and the objective terms; return ``(clauses, optimization)``."""
    clauses = []
    optimization = []
    for name, versions in register.versions_register.items():
        # maximum one version
        clauses.append(
            " ".join(
                "-1 x{}".format(register.map_single[name, version, ''])
                for version in versions
            ) + " >= -1;"
        )

        # extras require base
        for version in versions:
            variable_base = register.map_single[name, version, '']
            for extra in name_extras[name]:
                if extra:
                    variable_extra = register.map_single[name, version, extra]
                    clauses.append(
                        "-1 x{} 1 x{} >= 0;".format(variable_extra, variable_base)
                    )

        # order versions by history for optimization: the highest version
        # gets weight 0, so minimising the objective prefers newer versions
        # FIXME add ability to require minimal version
        # FIXME implement better weights for versions
        #       (e.g. 0.1.0, 0.1.1, 0.2.0)
        for weight, version in enumerate(sorted(versions, reverse=True)):
            optimization.append(
                "{} x{}".format(weight, register.map_single[name, version, ''])
            )
    return clauses, optimization


def _write_opb(path, variable_count, optimization, clauses):
    """Write the header, the ``min:`` objective and all clauses to an OPB file."""
    with open(path, "w") as opb_file:
        opb_file.write(
            "* #variable= {} #constraint= {}\n".format(variable_count, len(clauses))
        )
        opb_file.write("min: ")
        opb_file.writelines(term + " " for term in optimization)
        opb_file.write(";\n")
        opb_file.writelines(clause + "\n" for clause in clauses)


def _read_solver_result(path):
    """Parse the solver output and return ``(status, literals)``.

    ``status`` is the text of the last ``s `` line (``None`` if there is
    none); ``literals`` collects the tokens of *all* ``v `` lines, because a
    solver may wrap the assignment over several value lines.
    """
    status = None
    literals = []
    with open(path) as result_file:
        for line in result_file:
            line = line.strip()
            if line.startswith("s "):
                status = line[2:].strip()
            elif line.startswith("v "):
                literals.extend(line[2:].split())
    return status, literals


def _write_requirements(outpath, packages, exclude):
    """Write one ``name[extras]==version`` line per selected package, sorted by name."""
    with open(outpath, "w") as outfile:
        for (name, version), extras in sorted(
                packages.items(), key=lambda item: item[0][0]):
            if name in exclude:
                continue
            requirement_string = "{}".format(name)
            # "" stands for the base package and is not a real extra
            named_extras = sorted(extra for extra in extras if extra)
            if named_extras:
                # extras belong between the name and the version specifier
                requirement_string += "[" + ",".join(named_extras) + "]"
            if version != VariableRegister.VIRTUAL_VERSION:
                requirement_string += "=={}".format(version)
            outfile.write(requirement_string)
            outfile.write("\n")


def solve(scheduler, db, must_satisfy, tmpdir, solver, outpath, include_starting_points=False):
    """Encode the dependency problem as OPB, run the solver, write the result.

    :param scheduler: object whose ``done`` attribute iterates over
        ``(name, extra)`` pairs
    :param db: object providing ``all_versions(name)`` and
        ``get(name, version_string)``; the returned data is read through the
        keys ``install_requires``, ``tests_require``, ``setup_requires`` and
        ``extras_require``
    :param must_satisfy: iterable of ``(name, version_string)`` pairs that
        are forced to be part of the solution
    :param tmpdir: directory receiving ``to_solve.opb`` and ``result.txt``
    :param solver: solver command; it is interpolated into a shell command
        line followed by the path of the OPB file
    :param outpath: file the resulting requirement lines are written to
    :param include_starting_points: also write the ``must_satisfy`` packages
        to ``outpath`` when true
    :raises subprocess.CalledProcessError: if the shell pipeline fails
    :returns: ``None``; failure to find an optimum is only logged
    """
    register = VariableRegister()

    # get all names and known extras, then register all names
    name_extras = _collect_name_extras(scheduler)
    _register_packages(register, db, name_extras)

    # clauses for requirements, then for general information of packages
    opb_clauses = _requirement_clauses(register, db)
    package_clauses, opb_optimization = _package_clauses(register, name_extras)
    opb_clauses.extend(package_clauses)

    # initial starting point
    for name, version in must_satisfy:
        variable = register.map_single[
            (name, pkg_resources.parse_version(version), '')
        ]
        opb_clauses.append("1 x{} >= 1;".format(variable))

    # write opb file
    # register.count is the next unused id, i.e. one above the highest
    # variable actually in use
    opb_filepath = os.path.join(tmpdir, "to_solve.opb")
    _write_opb(opb_filepath, register.count, opb_optimization, opb_clauses)
    logging.info(
        "#Variables = {} #Constraints= {}".format(register.count, len(opb_clauses))
    )

    # run solver
    # NOTE(review): the command is built by plain string formatting and run
    # through the shell, so `solver` and the paths must be trusted and free
    # of shell metacharacters. Because of the pipe, check_call sees the exit
    # status of `tee`, not of the solver itself.
    result_path = os.path.join(tmpdir, "result.txt")
    subprocess.check_call(
        "{} {} | tee {}".format(solver, opb_filepath, result_path),
        shell=True
    )

    # analyze result
    result_status, literals = _read_solver_result(result_path)
    if result_status != "OPTIMUM FOUND":
        logging.error("Cannot find a solution")
        return

    packages = {}
    for literal in literals:
        # only looking for true assignments (false ones are written "-xN")
        if not literal.startswith("x"):
            continue
        key = register.map_single_rev.get(int(literal[1:]))
        if key is not None:
            name, version, extra = key
            packages.setdefault((name, version), set()).add(extra)

    exclude = set()
    if not include_starting_points:
        exclude = set(name for name, _version in must_satisfy)
    _write_requirements(outpath, packages, exclude)
    logging.info("Wrote requirements to {}".format(outpath))