Add RepoVersion class to make handling of many arguments easier

There are a number of arguments being passed around, nearly all of
which are duplicated between the old and new versions. Moving these
into a separate class should hopefully make it simpler to follow
what is being done.
This commit is contained in:
Darryl Green 2019-03-05 16:25:38 +00:00
parent e29ce70ca5
commit 7c1a73370b

View File

@ -28,23 +28,37 @@ import fnmatch
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
class RepoVersion(object):
def __init__(self, version, repository, revision,
crypto_repository, crypto_revision):
"""Class containing details for a particular revision.
version: either 'old' or 'new'
repository: repository for git revision
revision: git revision for comparison
crypto_repository: repository for git revision of crypto submodule
crypto_revision: git revision of crypto submodule
"""
self.version = version
self.repository = repository
self.revision = revision
self.crypto_repository = crypto_repository
self.crypto_revision = crypto_revision
self.abi_dumps = {}
self.modules = {}
class AbiChecker(object): class AbiChecker(object):
"""API and ABI checker.""" """API and ABI checker."""
def __init__(self, report_dir, old_repo, old_rev, old_crypto_rev, def __init__(self, old_version, new_version, report_dir,
old_crypto_repo, new_repo, new_rev, new_crypto_rev, keep_all_reports, brief, skip_file=None):
new_crypto_repo, keep_all_reports, brief, skip_file=None):
"""Instantiate the API/ABI checker. """Instantiate the API/ABI checker.
old_version: RepoVersion containing details to compare against
new_version: RepoVersion containing details to check
report_dir: directory for output files report_dir: directory for output files
old_repo: repository for git revision to compare against
old_rev: reference git revision to compare against
old_crypto_rev: reference git revision for old crypto submodule
old_crypto_repo: repository for git revision for old crypto submodule
new_repo: repository for git revision to check
new_rev: git revision to check
new_crypto_rev: reference git revision for new crypto submodule
new_crypto_repo: repository for git revision for new crypto submodule
keep_all_reports: if false, delete old reports keep_all_reports: if false, delete old reports
brief: if true, output shorter report to stdout brief: if true, output shorter report to stdout
skip_file: path to file containing symbols and types to skip skip_file: path to file containing symbols and types to skip
@ -56,19 +70,10 @@ class AbiChecker(object):
self.keep_all_reports = keep_all_reports self.keep_all_reports = keep_all_reports
self.can_remove_report_dir = not (os.path.isdir(self.report_dir) or self.can_remove_report_dir = not (os.path.isdir(self.report_dir) or
keep_all_reports) keep_all_reports)
self.old_repo = old_repo self.old_version = old_version
self.old_rev = old_rev self.new_version = new_version
self.old_crypto_rev = old_crypto_rev
self.old_crypto_repo = old_crypto_repo
self.new_repo = new_repo
self.new_rev = new_rev
self.new_crypto_rev = new_crypto_rev
self.new_crypto_repo = new_crypto_repo
self.skip_file = skip_file self.skip_file = skip_file
self.brief = brief self.brief = brief
self.mbedtls_modules = {"old": {}, "new": {}}
self.old_dumps = {}
self.new_dumps = {}
self.git_command = "git" self.git_command = "git"
self.make_command = "make" self.make_command = "make"
@ -90,18 +95,19 @@ class AbiChecker(object):
if not shutil.which(command): if not shutil.which(command):
raise Exception("{} not installed, aborting".format(command)) raise Exception("{} not installed, aborting".format(command))
def get_clean_worktree_for_git_revision(self, remote_repo, git_rev): def get_clean_worktree_for_git_revision(self, version):
"""Make a separate worktree with git_rev checked out. """Make a separate worktree with version.revision checked out.
Do not modify the current worktree.""" Do not modify the current worktree."""
git_worktree_path = tempfile.mkdtemp() git_worktree_path = tempfile.mkdtemp()
if remote_repo: if version.repository:
self.log.info( self.log.info(
"Checking out git worktree for revision {} from {}".format( "Checking out git worktree for revision {} from {}".format(
git_rev, remote_repo version.revision, version.repository
) )
) )
fetch_process = subprocess.Popen( fetch_process = subprocess.Popen(
[self.git_command, "fetch", remote_repo, git_rev], [self.git_command, "fetch",
version.repository, version.revision],
cwd=self.repo_path, cwd=self.repo_path,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT stderr=subprocess.STDOUT
@ -112,10 +118,10 @@ class AbiChecker(object):
raise Exception("Fetching revision failed, aborting") raise Exception("Fetching revision failed, aborting")
worktree_rev = "FETCH_HEAD" worktree_rev = "FETCH_HEAD"
else: else:
self.log.info( self.log.info("Checking out git worktree for revision {}".format(
"Checking out git worktree for revision {}".format(git_rev) version.revision
) ))
worktree_rev = git_rev worktree_rev = version.revision
worktree_process = subprocess.Popen( worktree_process = subprocess.Popen(
[self.git_command, "worktree", "add", "--detach", [self.git_command, "worktree", "add", "--detach",
git_worktree_path, worktree_rev], git_worktree_path, worktree_rev],
@ -129,8 +135,7 @@ class AbiChecker(object):
raise Exception("Checking out worktree failed, aborting") raise Exception("Checking out worktree failed, aborting")
return git_worktree_path return git_worktree_path
def update_git_submodules(self, git_worktree_path, crypto_repo, def update_git_submodules(self, git_worktree_path, version):
crypto_rev):
process = subprocess.Popen( process = subprocess.Popen(
[self.git_command, "submodule", "update", "--init", '--recursive'], [self.git_command, "submodule", "update", "--init", '--recursive'],
cwd=git_worktree_path, cwd=git_worktree_path,
@ -142,14 +147,14 @@ class AbiChecker(object):
if process.returncode != 0: if process.returncode != 0:
raise Exception("git submodule update failed, aborting") raise Exception("git submodule update failed, aborting")
if not (os.path.exists(os.path.join(git_worktree_path, "crypto")) if not (os.path.exists(os.path.join(git_worktree_path, "crypto"))
and crypto_rev): and version.crypto_revision):
return return
if crypto_repo: if version.crypto_repository:
shutil.rmtree(os.path.join(git_worktree_path, "crypto")) shutil.rmtree(os.path.join(git_worktree_path, "crypto"))
clone_process = subprocess.Popen( clone_process = subprocess.Popen(
[self.git_command, "clone", crypto_repo, [self.git_command, "clone", version.crypto_repository,
"--branch", crypto_rev, "crypto"], "--branch", version.crypto_revision, "crypto"],
cwd=git_worktree_path, cwd=git_worktree_path,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT stderr=subprocess.STDOUT
@ -160,7 +165,7 @@ class AbiChecker(object):
raise Exception("git clone failed, aborting") raise Exception("git clone failed, aborting")
else: else:
checkout_process = subprocess.Popen( checkout_process = subprocess.Popen(
[self.git_command, "checkout", crypto_rev], [self.git_command, "checkout", version.crypto_revision],
cwd=os.path.join(git_worktree_path, "crypto"), cwd=os.path.join(git_worktree_path, "crypto"),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT stderr=subprocess.STDOUT
@ -187,29 +192,28 @@ class AbiChecker(object):
self.log.info(make_output.decode("utf-8")) self.log.info(make_output.decode("utf-8"))
for root, dirs, files in os.walk(git_worktree_path): for root, dirs, files in os.walk(git_worktree_path):
for file in fnmatch.filter(files, "*.so"): for file in fnmatch.filter(files, "*.so"):
self.mbedtls_modules[version][os.path.splitext(file)[0]] = ( version.modules[os.path.splitext(file)[0]] = (
os.path.join(root, file) os.path.join(root, file)
) )
if make_process.returncode != 0: if make_process.returncode != 0:
raise Exception("make failed, aborting") raise Exception("make failed, aborting")
def get_abi_dumps_from_shared_libraries(self, git_ref, git_worktree_path, def get_abi_dumps_from_shared_libraries(self, git_worktree_path,
version): version):
"""Generate the ABI dumps for the specified git revision. """Generate the ABI dumps for the specified git revision.
It must be checked out in git_worktree_path and the shared libraries It must be checked out in git_worktree_path and the shared libraries
must have been built.""" must have been built."""
abi_dumps = {} for mbed_module, module_path in version.modules.items():
for mbed_module, module_path in self.mbedtls_modules[version].items():
output_path = os.path.join( output_path = os.path.join(
self.report_dir, version, "{}-{}.dump".format( self.report_dir, version.version, "{}-{}.dump".format(
mbed_module, git_ref mbed_module, version.revision
) )
) )
abi_dump_command = [ abi_dump_command = [
"abi-dumper", "abi-dumper",
module_path, module_path,
"-o", output_path, "-o", output_path,
"-lver", git_ref "-lver", version.revision
] ]
abi_dump_process = subprocess.Popen( abi_dump_process = subprocess.Popen(
abi_dump_command, abi_dump_command,
@ -220,8 +224,7 @@ class AbiChecker(object):
self.log.info(abi_dump_output.decode("utf-8")) self.log.info(abi_dump_output.decode("utf-8"))
if abi_dump_process.returncode != 0: if abi_dump_process.returncode != 0:
raise Exception("abi-dumper failed, aborting") raise Exception("abi-dumper failed, aborting")
abi_dumps[mbed_module] = output_path version.abi_dumps[mbed_module] = output_path
return abi_dumps
def cleanup_worktree(self, git_worktree_path): def cleanup_worktree(self, git_worktree_path):
"""Remove the specified git worktree.""" """Remove the specified git worktree."""
@ -237,19 +240,13 @@ class AbiChecker(object):
if worktree_process.returncode != 0: if worktree_process.returncode != 0:
raise Exception("Worktree cleanup failed, aborting") raise Exception("Worktree cleanup failed, aborting")
def get_abi_dump_for_ref(self, remote_repo, git_rev, crypto_repo, def get_abi_dump_for_ref(self, version):
crypto_rev, version):
"""Generate the ABI dumps for the specified git revision.""" """Generate the ABI dumps for the specified git revision."""
git_worktree_path = self.get_clean_worktree_for_git_revision( git_worktree_path = self.get_clean_worktree_for_git_revision(version)
remote_repo, git_rev self.update_git_submodules(git_worktree_path, version)
)
self.update_git_submodules(git_worktree_path, crypto_repo, crypto_rev)
self.build_shared_libraries(git_worktree_path, version) self.build_shared_libraries(git_worktree_path, version)
abi_dumps = self.get_abi_dumps_from_shared_libraries( self.get_abi_dumps_from_shared_libraries(git_worktree_path, version)
git_rev, git_worktree_path, version
)
self.cleanup_worktree(git_worktree_path) self.cleanup_worktree(git_worktree_path)
return abi_dumps
def remove_children_with_tag(self, parent, tag): def remove_children_with_tag(self, parent, tag):
children = parent.getchildren() children = parent.getchildren()
@ -275,19 +272,20 @@ class AbiChecker(object):
be available.""" be available."""
compatibility_report = "" compatibility_report = ""
compliance_return_code = 0 compliance_return_code = 0
shared_modules = list(set(self.mbedtls_modules["old"].keys()) & shared_modules = list(set(self.old_version.modules.keys()) &
set(self.mbedtls_modules["new"].keys())) set(self.new_version.modules.keys()))
for mbed_module in shared_modules: for mbed_module in shared_modules:
output_path = os.path.join( output_path = os.path.join(
self.report_dir, "{}-{}-{}.html".format( self.report_dir, "{}-{}-{}.html".format(
mbed_module, self.old_rev, self.new_rev mbed_module, self.old_version.revision,
self.new_version.revision
) )
) )
abi_compliance_command = [ abi_compliance_command = [
"abi-compliance-checker", "abi-compliance-checker",
"-l", mbed_module, "-l", mbed_module,
"-old", self.old_dumps[mbed_module], "-old", self.old_version.abi_dumps[mbed_module],
"-new", self.new_dumps[mbed_module], "-new", self.new_version.abi_dumps[mbed_module],
"-strict", "-strict",
"-report-path", output_path, "-report-path", output_path,
] ]
@ -329,8 +327,8 @@ class AbiChecker(object):
"abi-compliance-checker failed with a return code of {}," "abi-compliance-checker failed with a return code of {},"
" aborting".format(abi_compliance_process.returncode) " aborting".format(abi_compliance_process.returncode)
) )
os.remove(self.old_dumps[mbed_module]) os.remove(self.old_version.abi_dumps[mbed_module])
os.remove(self.new_dumps[mbed_module]) os.remove(self.new_version.abi_dumps[mbed_module])
if self.can_remove_report_dir: if self.can_remove_report_dir:
os.rmdir(self.report_dir) os.rmdir(self.report_dir)
self.log.info(compatibility_report) self.log.info(compatibility_report)
@ -341,12 +339,8 @@ class AbiChecker(object):
between self.old_rev and self.new_rev.""" between self.old_rev and self.new_rev."""
self.check_repo_path() self.check_repo_path()
self.check_abi_tools_are_installed() self.check_abi_tools_are_installed()
self.old_dumps = self.get_abi_dump_for_ref(self.old_repo, self.old_rev, self.get_abi_dump_for_ref(self.old_version)
self.old_crypto_repo, self.get_abi_dump_for_ref(self.new_version)
self.old_crypto_rev, "old")
self.new_dumps = self.get_abi_dump_for_ref(self.new_repo, self.new_rev,
self.new_crypto_repo,
self.new_crypto_rev, "new")
return self.get_abi_compatibility_report() return self.get_abi_compatibility_report()
@ -412,12 +406,13 @@ def run_main():
help="output only the list of issues to stdout, instead of a full report", help="output only the list of issues to stdout, instead of a full report",
) )
abi_args = parser.parse_args() abi_args = parser.parse_args()
old_version = RepoVersion("old", abi_args.old_repo, abi_args.old_rev,
abi_args.old_crypto_repo, abi_args.old_crypto_rev)
new_version = RepoVersion("new", abi_args.new_repo, abi_args.new_rev,
abi_args.new_crypto_repo, abi_args.new_crypto_rev)
abi_check = AbiChecker( abi_check = AbiChecker(
abi_args.report_dir, abi_args.old_repo, abi_args.old_rev, old_version, new_version, abi_args.report_dir,
abi_args.old_crypto_rev, abi_args.old_crypto_repo, abi_args.keep_all_reports, abi_args.brief, abi_args.skip_file
abi_args.new_repo, abi_args.new_rev, abi_args.new_crypto_rev,
abi_args.new_crypto_repo, abi_args.keep_all_reports,
abi_args.brief, abi_args.skip_file
) )
return_code = abi_check.check_for_abi_changes() return_code = abi_check.check_for_abi_changes()
sys.exit(return_code) sys.exit(return_code)