diff --git a/codecov_cli/commands/labelanalysis.py b/codecov_cli/commands/labelanalysis.py
index 8f720e287..9e117bc73 100644
--- a/codecov_cli/commands/labelanalysis.py
+++ b/codecov_cli/commands/labelanalysis.py
@@ -1,17 +1,13 @@
 import json
 import logging
 import pathlib
-import time
 from typing import Dict, List, Optional
 
 import click
-import requests
 import sentry_sdk
 
 from codecov_cli.fallbacks import CodecovOption, FallbackFieldEnum
-from codecov_cli.helpers import request
 from codecov_cli.helpers.args import get_cli_args
-from codecov_cli.helpers.config import CODECOV_API_URL
 from codecov_cli.helpers.validators import validate_commit_sha
 from codecov_cli.runners import get_runner
 from codecov_cli.runners.types import (
@@ -92,7 +88,6 @@ def label_analysis(
 ):
     with sentry_sdk.start_transaction(op="task", name="Label Analysis"):
         with sentry_sdk.start_span(name="labelanalysis"):
-            enterprise_url = ctx.obj.get("enterprise_url")
             args = get_cli_args(ctx)
             logger.debug(
                 "Starting label analysis",
                 extra=dict(
@@ -124,18 +119,6 @@ def label_analysis(
                 extra=dict(extra_log_attributes=dict(config=runner.params)),
             )
 
-            upload_url = enterprise_url or CODECOV_API_URL
-            url = f"{upload_url}/labels/labels-analysis"
-            token_header = f"Repotoken {token}"
-            payload = {
-                "base_commit": base_commit_sha,
-                "head_commit": head_commit_sha,
-                "requested_labels": None,
-            }
-            # Send the initial label analysis request without labels
-            # Because labels might take a long time to collect
-            eid = _send_labelanalysis_request(payload, url, token_header)
-
             logger.info("Collecting labels...")
             requested_labels = runner.collect_tests()
             logger.info(f"Collected {len(requested_labels)} test labels")
@@ -145,98 +128,15 @@ def label_analysis(
                     extra_log_attributes=dict(labels_collected=requested_labels)
                 ),
             )
-            payload["requested_labels"] = requested_labels
-
-            if eid:
-                # Initial request with no labels was successful
-                # Now we PATCH the labels in
-                patch_url = f"{upload_url}/labels/labels-analysis/{eid}"
-                _patch_labels(payload, patch_url, token_header)
-            else:
-                # Initial request with no labels failed
-                # Retry it
-                eid = _send_labelanalysis_request(payload, url, token_header)
-                if eid is None:
-                    _fallback_to_collected_labels(
-                        requested_labels,
-                        runner,
-                        dry_run=dry_run,
-                        dry_run_format=dry_run_format,
-                        fallback_reason="codecov_unavailable",
-                    )
-                    return
-
-            has_result = False
-            logger.info("Waiting for list of tests to run...")
-            start_wait = time.monotonic()
-            time.sleep(1)
-            while not has_result:
-                resp_data = request.get(
-                    f"{upload_url}/labels/labels-analysis/{eid}",
-                    headers={"Authorization": token_header},
-                )
-                resp_json = resp_data.json()
-                if resp_json["state"] == "finished":
-                    logger.info(
-                        "Received list of tests from Codecov",
-                        extra=dict(
-                            extra_log_attributes=dict(
-                                processing_errors=resp_json.get("errors", [])
-                            )
-                        ),
-                    )
-                    request_result = _potentially_calculate_absent_labels(
-                        resp_json["result"], requested_labels
-                    )
-                    if not dry_run:
-                        runner.process_labelanalysis_result(request_result)
-                    else:
-                        _dry_run_output(
-                            LabelAnalysisRequestResult(request_result),
-                            runner,
-                            dry_run_format,
-                            # It's possible that the task had processing errors and fallback to all tests
-                            # Even though it's marked as FINISHED (not ERROR) it's not a true success
-                            fallback_reason=(
-                                "test_list_processing_errors"
-                                if resp_json.get("errors", None)
-                                else None
-                            ),
-                        )
-                    return
-                if resp_json["state"] == "error":
-                    logger.error(
-                        "Request had problems calculating",
-                        extra=dict(
-                            extra_log_attributes=dict(
-                                base_commit=resp_json["base_commit"],
-                                head_commit=resp_json["head_commit"],
-                                external_id=resp_json["external_id"],
-                            )
-                        ),
-                    )
-                    _fallback_to_collected_labels(
-                        collected_labels=requested_labels,
-                        runner=runner,
-                        dry_run=dry_run,
-                        dry_run_format=dry_run_format,
-                        fallback_reason="test_list_processing_failed",
-                    )
-                    return
-                if max_wait_time and (time.monotonic() - start_wait) > max_wait_time:
-                    logger.error(
-                        f"Exceeded max waiting time of {max_wait_time} seconds. Running all tests.",
-                    )
-                    _fallback_to_collected_labels(
-                        collected_labels=requested_labels,
-                        runner=runner,
-                        dry_run=dry_run,
-                        dry_run_format=dry_run_format,
-                        fallback_reason="max_wait_time_exceeded",
-                    )
-                    return
-                logger.info("Waiting more time for result...")
-                time.sleep(5)
+            _fallback_to_collected_labels(
+                requested_labels,
+                runner,
+                dry_run=dry_run,
+                dry_run_format=dry_run_format,
+                fallback_reason="codecov_unavailable",
+            )
+            return
 
 
 def _parse_runner_params(runner_params: List[str]) -> Dict[str, str]:
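The hunk above removes the whole request/PATCH/poll flow in one go. For readers skimming the diff, a minimal sketch of the polling pattern being deleted, assuming a plain `requests` session in place of the CLI's internal `request` helper (the function name and signature here are illustrative, not part of the codebase):

```python
import time
from typing import Optional

import requests


def poll_label_analysis(url: str, token: str, max_wait_time: Optional[float] = None):
    # Poll until the server reports a terminal state; None tells the caller
    # to fall back to running every collected test.
    start = time.monotonic()
    while True:
        resp = requests.get(url, headers={"Authorization": f"Repotoken {token}"})
        body = resp.json()
        if body.get("state") in ("finished", "error"):
            return body
        if max_wait_time and (time.monotonic() - start) > max_wait_time:
            return None
        time.sleep(5)  # the deleted loop also slept 5s between polls
```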
@@ -271,103 +171,6 @@ def _parse_runner_params(runner_params: List[str]) -> Dict[str, str]:
     return final_params
 
 
-def _potentially_calculate_absent_labels(
-    request_result, requested_labels
-) -> LabelAnalysisRequestResult:
-    if request_result["absent_labels"]:
-        # This means that Codecov already calculated everything for us
-        final_result = LabelAnalysisRequestResult(request_result)
-    else:
-        # Here we have to calculate the absent labels
-        # And also remove labels that maybe don't exist anymore from the set of labels to test
-        # Because codecov didn't have this info previously
-        requested_labels_set = set(requested_labels)
-        present_diff_labels_set = set(request_result.get("present_diff_labels", []))
-        present_report_labels_set = set(request_result.get("present_report_labels", []))
-        global_level_labels_set = set(request_result.get("global_level_labels", []))
-        final_result = LabelAnalysisRequestResult(
-            {
-                "present_report_labels": sorted(
-                    present_report_labels_set & requested_labels_set
-                ),
-                "present_diff_labels": sorted(
-                    present_diff_labels_set & requested_labels_set
-                ),
-                "absent_labels": sorted(
-                    requested_labels_set - present_report_labels_set
-                ),
-                "global_level_labels": sorted(
-                    global_level_labels_set & requested_labels_set
-                ),
-            }
-        )
-    logger.info(
-        "Received information about tests to run",
-        extra=dict(
-            extra_log_attributes=dict(
-                absent_labels=len(final_result.absent_labels),
-                present_diff_labels=len(final_result.present_diff_labels),
-                global_level_labels=len(final_result.global_level_labels),
-                present_report_labels=len(final_result.present_report_labels),
-            )
-        ),
-    )
-    return final_result
-
-
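The set arithmetic in the helper deleted above is easy to lose in diff form. A standalone sketch of the same calculation, using plain dicts in place of `LabelAnalysisRequestResult` (the function name is illustrative):

```python
from typing import Dict, List


def calculate_absent_labels(
    request_result: Dict[str, list], requested_labels: List[str]
) -> Dict[str, list]:
    # Intersect the server's view with what the runner actually collected,
    # and treat anything the report has never seen as "absent" (must run).
    requested = set(requested_labels)
    present_report = set(request_result.get("present_report_labels", []))
    present_diff = set(request_result.get("present_diff_labels", []))
    global_level = set(request_result.get("global_level_labels", []))
    return {
        "present_report_labels": sorted(present_report & requested),
        "present_diff_labels": sorted(present_diff & requested),
        "absent_labels": sorted(requested - present_report),
        "global_level_labels": sorted(global_level & requested),
    }
```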
Codecov is having problems", - extra=dict(extra_log_attributes=dict(status_code=response.status_code)), - ) - return None - if response.status_code >= 400: - logger.warning( - "Got a 4XX status code back from Codecov", - extra=dict( - extra_log_attributes=dict( - status_code=response.status_code, response_json=response.json() - ) - ), - ) - raise click.ClickException( - "There is some problem with the submitted information" - ) - except requests.RequestException: - raise click.ClickException(click.style("Unable to reach Codecov", fg="red")) - eid = response.json()["external_id"] - logger.info( - "Label Analysis request successful", - extra=dict(extra_log_attributes=dict(request_id=eid)), - ) - return eid - - def _dry_run_json_output( labels_to_run: set, labels_to_skip: set, diff --git a/codecov_cli/commands/staticanalysis.py b/codecov_cli/commands/staticanalysis.py index 58592291c..6dd8f3cd7 100644 --- a/codecov_cli/commands/staticanalysis.py +++ b/codecov_cli/commands/staticanalysis.py @@ -1,4 +1,3 @@ -import asyncio import logging import pathlib import typing @@ -7,9 +6,7 @@ import sentry_sdk from codecov_cli.fallbacks import CodecovOption, FallbackFieldEnum -from codecov_cli.helpers.args import get_cli_args from codecov_cli.helpers.validators import validate_commit_sha -from codecov_cli.services.staticanalysis import run_analysis_entrypoint from codecov_cli.types import CommandContext logger = logging.getLogger("codecovcli") @@ -62,25 +59,4 @@ def static_analysis( ): with sentry_sdk.start_transaction(op="task", name="Static Analysis"): with sentry_sdk.start_span(name="static_analysis"): - enterprise_url = ctx.obj.get("enterprise_url") - args = get_cli_args(ctx) - logger.debug( - "Starting Static Analysis processing", - extra=dict( - extra_log_attributes=args, - ), - ) - return asyncio.run( - run_analysis_entrypoint( - ctx.obj["codecov_yaml"], - foldertosearch, - numberprocesses, - pattern, - commit, - token, - force, - list(folders_to_exclude), - enterprise_url, - args, - ) - ) + pass diff --git a/codecov_cli/services/staticanalysis/__init__.py b/codecov_cli/services/staticanalysis/__init__.py deleted file mode 100644 index 16cd15fd7..000000000 --- a/codecov_cli/services/staticanalysis/__init__.py +++ /dev/null @@ -1,296 +0,0 @@ -import asyncio -import json -import logging -import typing -from functools import partial -from multiprocessing import Pool -from pathlib import Path - -import click -import httpx -import requests - -from codecov_cli.helpers import request -from codecov_cli.helpers.config import CODECOV_API_URL -from codecov_cli.services.staticanalysis.analyzers import get_best_analyzer -from codecov_cli.services.staticanalysis.exceptions import AnalysisError -from codecov_cli.services.staticanalysis.finders import select_file_finder -from codecov_cli.services.staticanalysis.types import ( - FileAnalysisRequest, - FileAnalysisResult, -) - -logger = logging.getLogger("codecovcli") - - -async def run_analysis_entrypoint( - config: typing.Optional[typing.Dict], - folder: Path, - numberprocesses: typing.Optional[int], - pattern, - commit: str, - token: str, - should_force: bool, - folders_to_exclude: typing.List[Path], - enterprise_url: typing.Optional[str], - args: dict, -): - ff = select_file_finder(config) - files = list(ff.find_files(folder, pattern, folders_to_exclude)) - processing_results = await process_files(files, numberprocesses, config) - # Let users know if there were processing errors - # This is here and not in the function so we can add an option to ignore those 
diff --git a/codecov_cli/services/staticanalysis/__init__.py b/codecov_cli/services/staticanalysis/__init__.py
deleted file mode 100644
index 16cd15fd7..000000000
--- a/codecov_cli/services/staticanalysis/__init__.py
+++ /dev/null
@@ -1,296 +0,0 @@
-import asyncio
-import json
-import logging
-import typing
-from functools import partial
-from multiprocessing import Pool
-from pathlib import Path
-
-import click
-import httpx
-import requests
-
-from codecov_cli.helpers import request
-from codecov_cli.helpers.config import CODECOV_API_URL
-from codecov_cli.services.staticanalysis.analyzers import get_best_analyzer
-from codecov_cli.services.staticanalysis.exceptions import AnalysisError
-from codecov_cli.services.staticanalysis.finders import select_file_finder
-from codecov_cli.services.staticanalysis.types import (
-    FileAnalysisRequest,
-    FileAnalysisResult,
-)
-
-logger = logging.getLogger("codecovcli")
-
-
-async def run_analysis_entrypoint(
-    config: typing.Optional[typing.Dict],
-    folder: Path,
-    numberprocesses: typing.Optional[int],
-    pattern,
-    commit: str,
-    token: str,
-    should_force: bool,
-    folders_to_exclude: typing.List[Path],
-    enterprise_url: typing.Optional[str],
-    args: dict,
-):
-    ff = select_file_finder(config)
-    files = list(ff.find_files(folder, pattern, folders_to_exclude))
-    processing_results = await process_files(files, numberprocesses, config)
-    # Let users know if there were processing errors
-    # This is here and not in the function so we can add an option to ignore those (possibly)
-    # Also makes the function easier to test
-    processing_errors = processing_results["processing_errors"]
-    log_processing_errors(processing_errors)
-    # Upload results metadata to codecov to get list of files that we need to upload
-    file_metadata = processing_results["file_metadata"]
-    all_data = processing_results["all_data"]
-    try:
-        json_output = {"commit": commit, "filepaths": file_metadata}
-        logger.info(
-            "Sending files fingerprints to Codecov",
-            extra=dict(
-                extra_log_attributes=dict(
-                    files_effectively_analyzed=len(json_output["filepaths"])
-                )
-            ),
-        )
-        logger.debug(
-            "Data sent to Codecov",
-            extra=dict(extra_log_attributes=dict(json_payload=json_output)),
-        )
-        upload_url = enterprise_url or CODECOV_API_URL
-        response = request.post(
-            f"{upload_url}/staticanalysis/analyses",
-            data=json_output,
-            headers={"Authorization": f"Repotoken {token}"},
-        )
-        response_json = response.json()
-        if response.status_code >= 500:
-            raise click.ClickException("Sorry. Codecov is having problems")
-        if response.status_code >= 400:
-            raise click.ClickException(
-                f"There is some problem with the submitted information.\n{response_json.get('detail')}"
-            )
-    except requests.RequestException:
-        raise click.ClickException(click.style("Unable to reach Codecov", fg="red"))
-    logger.info(
-        "Received response from server",
-        extra=dict(
-            extra_log_attributes=dict(time_taken=response.elapsed.total_seconds())
-        ),
-    )
-    logger.debug(
-        "Response",
-        extra=dict(
-            extra_log_attributes=dict(
-                response_json=response_json,
-            )
-        ),
-    )
-
-    valid_files_len = len(
-        [el for el in response_json["filepaths"] if el["state"].lower() == "valid"]
-    )
-    created_files_len = len(
-        [el for el in response_json["filepaths"] if el["state"].lower() == "created"]
-    )
-    logger.info(
-        f"{valid_files_len} files VALID; {created_files_len} files CREATED",
-    )
-
-    files_that_need_upload = [
-        el
-        for el in response_json["filepaths"]
-        if (el["state"].lower() == "created" or should_force)
-    ]
-
-    if files_that_need_upload:
-        uploaded_files = []
-        failed_uploads = []
-        with click.progressbar(
-            length=len(files_that_need_upload),
-            label="Upload info to storage",
-        ) as bar:
-            # It's better to have less files competing over CPU time when uploading
-            # Especially if we might have large files
-            limits = httpx.Limits(max_connections=20)
-            # Because there might be too many files to upload we will ignore most timeouts
-            timeout = httpx.Timeout(read=None, pool=None, connect=None, write=10.0)
-            async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
-                all_tasks = []
-                for el in files_that_need_upload:
-                    all_tasks.append(send_single_upload_put(client, all_data, el))
-                try:
-                    for task in asyncio.as_completed(all_tasks):
-                        resp = await task
-                        bar.update(1, el["filepath"])
-                        if resp["succeeded"]:
-                            uploaded_files.append(resp["filepath"])
-                        else:
-                            failed_uploads.append(resp["filepath"])
-                except asyncio.CancelledError:
-                    message = (
-                        "Unknown error cancelled the upload tasks.\n"
-                        + f"Uploaded {len(uploaded_files)}/{len(files_that_need_upload)} files successfully."
-                    )
-                    raise click.ClickException(message)
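The upload fan-out deleted above is a useful pattern on its own: cap connections, relax read timeouts for presigned PUTs, and drain tasks with `asyncio.as_completed`. A condensed sketch under those assumptions (`upload_all` is an illustrative name; the deleted code passed the body via `data=`, while current httpx prefers `content=` for raw bodies):

```python
import asyncio
import json

import httpx


async def upload_all(files_to_upload: list, all_data: dict) -> list:
    # Few connections beat many when individual files are large, and
    # presigned PUTs can be slow to respond, so only writes get a deadline.
    limits = httpx.Limits(max_connections=20)
    timeout = httpx.Timeout(connect=None, read=None, write=10.0, pool=None)
    async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
        tasks = [
            client.put(
                el["raw_upload_location"],
                content=json.dumps(all_data[el["filepath"]]),
            )
            for el in files_to_upload
        ]
        responses = []
        for task in asyncio.as_completed(tasks):
            responses.append(await task)  # completion order, not submission order
        return responses
```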
-        if failed_uploads:
-            logger.warning(f"{len(failed_uploads)} files failed to upload")
-            logger.debug(
-                "Failed files",
-                extra=dict(extra_log_attributes=dict(filenames=failed_uploads)),
-            )
-        logger.info(
-            f"Uploaded {len(uploaded_files)} files",
-        )
-        logger.debug(
-            "Uploaded files",
-            extra=dict(extra_log_attributes=dict(filenames=uploaded_files)),
-        )
-    else:
-        logger.info("All files are already uploaded!")
-    try:
-        response = send_finish_signal(response_json, upload_url, token)
-    except requests.RequestException:
-        raise click.ClickException(click.style("Unable to reach Codecov", fg="red"))
-    logger.info(
-        "Received response with status code %s from server",
-        response.status_code,
-        extra=dict(
-            extra_log_attributes=dict(time_taken=response.elapsed.total_seconds())
-        ),
-    )
-    log_processing_errors(processing_errors)
-
-
-def log_processing_errors(processing_errors: typing.Dict[str, str]) -> None:
-    if len(processing_errors) > 0:
-        logger.error(
-            f"{len(processing_errors)} files have processing errors and have been IGNORED."
-        )
-        for file, error in processing_errors.items():
-            logger.error(f"-> {file}: ERROR {error}")
-
-
-async def process_files(
-    files_to_analyze: typing.List[FileAnalysisRequest],
-    numberprocesses: int,
-    config: typing.Optional[typing.Dict],
-):
-    logger.info(f"Running the analyzer on {len(files_to_analyze)} files")
-    mapped_func = partial(analyze_file, config)
-    all_data = {}
-    file_metadata = []
-    errors = {}
-    with click.progressbar(
-        length=len(files_to_analyze),
-        label="Analyzing files",
-    ) as bar:
-        # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
-        # from the link above, we want to use the default start methods
-        with Pool(processes=numberprocesses) as pool:
-            file_results = pool.imap_unordered(mapped_func, files_to_analyze)
-            for result in file_results:
-                bar.update(1, result)
-                if result is not None:
-                    if result.result:
-                        all_data[result.filename] = result.result
-                        file_metadata.append(
-                            {
-                                "filepath": result.filename,
-                                "file_hash": result.result["hash"],
-                            }
-                        )
-                    elif result.error:
-                        errors[result.filename] = result.error
-    logger.info("All files have been processed")
-    return dict(
-        all_data=all_data, file_metadata=file_metadata, processing_errors=errors
-    )
-
-
-async def send_single_upload_put(client, all_data, el) -> typing.Dict:
-    retryable_statuses = (429,)
-    presigned_put = el["raw_upload_location"]
-    number_retries = 5
-    try:
-        for current_retry in range(number_retries):
-            response = await client.put(
-                presigned_put, data=json.dumps(all_data[el["filepath"]])
-            )
-            if response.status_code < 300:
-                return {
-                    "status_code": response.status_code,
-                    "filepath": el["filepath"],
-                    "succeeded": True,
-                }
-            if response.status_code in retryable_statuses:
-                await asyncio.sleep(2**current_retry)
-        status_code = response.status_code
-        message_to_warn = response.text
-        exception = None
-    except httpx.HTTPError as exp:
-        status_code = None
-        exception = type(exp)
-        message_to_warn = str(exp)
-    logger.warning(
-        "Unable to send single_upload_put",
-        extra=dict(
-            extra_log_attributes=dict(
-                message=message_to_warn,
-                exception=exception,
-                filepath=el["filepath"],
-                latest_status_code=status_code,
-            )
-        ),
-    )
-    return {
-        "status_code": status_code,
-        "exception": exception,
-        "filepath": el["filepath"],
-        "succeeded": False,
-    }
-
-
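`send_single_upload_put` above retried only 429s, with exponential backoff and at most five attempts. The same logic, reduced to its core (assumes an `httpx.AsyncClient`; the function name is illustrative):

```python
import asyncio

import httpx


async def put_with_retry(
    client: httpx.AsyncClient, url: str, body: str, retries: int = 5
) -> httpx.Response:
    # Success returns immediately; a 429 sleeps 1s, 2s, 4s, ... before the
    # next attempt; other failures burn a retry without sleeping, exactly
    # like the deleted loop.
    for attempt in range(retries):
        response = await client.put(url, content=body)
        if response.status_code < 300:
            return response
        if response.status_code == 429:
            await asyncio.sleep(2**attempt)
    return response  # exhausted retries; caller records the failure
```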
"Sending finish signal to let API know to schedule static analysis task", - extra=dict(extra_log_attributes=dict(external_id=external_id)), - ) - response = request.post( - f"{upload_url}/staticanalysis/analyses/{external_id}/finish", - headers={"Authorization": f"Repotoken {token}"}, - ) - if response.status_code >= 500: - raise click.ClickException("Sorry. Codecov is having problems") - if response.status_code >= 400: - raise click.ClickException( - f"There is some problem with the submitted information.\n{response_json.get('detail')}" - ) - return response - - -def analyze_file( - config, filename: FileAnalysisRequest -) -> typing.Optional[FileAnalysisResult]: - try: - with open(filename.actual_filepath, "rb") as file: - actual_code = file.read() - analyzer = get_best_analyzer(filename, actual_code) - if analyzer is None: - return None - output = analyzer.process() - if output is None: - return None - return FileAnalysisResult(filename=filename.result_filename, result=output) - except AnalysisError as e: - error_dict = { - "filename": str(filename.result_filename), - "error": str(e), - } - return FileAnalysisResult( - filename=str(filename.result_filename), error=error_dict - ) diff --git a/codecov_cli/services/staticanalysis/analyzers/__init__.py b/codecov_cli/services/staticanalysis/analyzers/__init__.py deleted file mode 100644 index 077cbc5ff..000000000 --- a/codecov_cli/services/staticanalysis/analyzers/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -from codecov_cli.services.staticanalysis.analyzers.general import BaseAnalyzer -from codecov_cli.services.staticanalysis.analyzers.javascript_es6 import ES6Analyzer -from codecov_cli.services.staticanalysis.analyzers.python import PythonAnalyzer -from codecov_cli.services.staticanalysis.types import FileAnalysisRequest - - -def get_best_analyzer( - filename: FileAnalysisRequest, actual_code: bytes -) -> BaseAnalyzer: - if filename.actual_filepath.suffix == ".py": - return PythonAnalyzer(filename, actual_code) - if filename.actual_filepath.suffix == ".js": - return ES6Analyzer(filename, actual_code) - return None diff --git a/codecov_cli/services/staticanalysis/analyzers/general.py b/codecov_cli/services/staticanalysis/analyzers/general.py deleted file mode 100644 index c0554f738..000000000 --- a/codecov_cli/services/staticanalysis/analyzers/general.py +++ /dev/null @@ -1,124 +0,0 @@ -import hashlib -from collections import deque - - -class BaseAnalyzer(object): - def __init__(self, filename, actual_code): - pass - - def process(self): - return {} - - def _count_elements(self, node, types): - count = 0 - for c in node.children: - count += self._count_elements(c, types) - if node.type in types: - count += 1 - return count - - def _get_max_nested_conditional(self, head): - """Iterates over all nodes in a function body and returns the max nested conditional depth. 
diff --git a/codecov_cli/services/staticanalysis/analyzers/general.py b/codecov_cli/services/staticanalysis/analyzers/general.py
deleted file mode 100644
index c0554f738..000000000
--- a/codecov_cli/services/staticanalysis/analyzers/general.py
+++ /dev/null
@@ -1,124 +0,0 @@
-import hashlib
-from collections import deque
-
-
-class BaseAnalyzer(object):
-    def __init__(self, filename, actual_code):
-        pass
-
-    def process(self):
-        return {}
-
-    def _count_elements(self, node, types):
-        count = 0
-        for c in node.children:
-            count += self._count_elements(c, types)
-        if node.type in types:
-            count += 1
-        return count
-
-    def _get_max_nested_conditional(self, head):
-        """Iterates over all nodes in a function body and returns the max nested conditional depth.
-        Uses BFS to avoid recursion calls (so we don't throw RecursionError)
-        """
-        nodes_to_visit = deque()
-        nodes_to_visit.append([head, int(head.type in self.condition_statements)])
-        max_nested_depth = 0
-
-        while nodes_to_visit:
-            curr_node, curr_depth = nodes_to_visit.popleft()
-            max_nested_depth = max(max_nested_depth, curr_depth)
-            # Here is where the depth might change
-            # If the current node is a conditional
-            is_curr_conditional = curr_node.type in self.condition_statements
-
-            # Enqueue all child nodes of the curr_node
-            for child in curr_node.children:
-                nodes_to_visit.append([child, curr_depth + is_curr_conditional])
-
-        return max_nested_depth
-
-    def _get_complexity_metrics(self, body_node):
-        number_conditions = self._count_elements(
-            body_node,
-            self.condition_statements,
-        )
-        return {
-            "conditions": number_conditions,
-            "mccabe_cyclomatic_complexity": number_conditions + 1,
-            "returns": self._count_elements(body_node, ["return_statement"]),
-            "max_nested_conditional": self._get_max_nested_conditional(body_node),
-        }
-
-    def _get_name(self, node):
-        name_node = node.child_by_field_name("name")
-        body_node = node.child_by_field_name("body")
-        actual_name = (
-            self.actual_code[name_node.start_byte : name_node.end_byte].decode()
-            if name_node
-            else f"Anonymous_{body_node.start_point[0] + 1}_{body_node.end_point[0] - body_node.start_point[0]}"
-        )
-        wrapping_classes = [
-            x for x in self._get_parent_chain(node) if x.type in self.wrappers
-        ]
-        wrapping_classes.reverse()
-        if wrapping_classes:
-            parents_actual_names = ""
-
-            for x in wrapping_classes:
-                name = x.child_by_field_name("name")
-                body = x.child_by_field_name("body")
-                class_name = (
-                    self.actual_code[name.start_byte : name.end_byte].decode()
-                    if name
-                    else f"Anonymous_{body.start_point[0] + 1}_{body.end_point[0] - body.start_point[0]}"
-                )
-                parents_actual_names = parents_actual_names + class_name + "::"
-            return f"{parents_actual_names}{actual_name}"
-        return actual_name
-
-    def _get_parent_chain(self, node):
-        cur = node.parent
-        while cur:
-            yield cur
-            cur = cur.parent
-
-    def get_import_lines(self, root_node, imports_query):
-        import_lines = set()
-        for a, _ in imports_query.captures(root_node):
-            import_lines.add((a.start_point[0] + 1, a.end_point[0] - a.start_point[0]))
-        return import_lines
-
-    def get_definition_lines(self, root_node, definitions_query):
-        definition_lines = set()
-        for a, _ in definitions_query.captures(root_node):
-            definition_lines.add(
-                (a.start_point[0] + 1, a.end_point[0] - a.start_point[0])
-            )
-        return definition_lines
-
-    def _get_code_hash(self, start_byte, end_byte):
-        j = hashlib.md5()
-        j.update(self.actual_code[start_byte:end_byte].strip())
-        return j.hexdigest()
-
-    def get_statements(self):
-        return sorted(
-            (
-                (
-                    x["current_line"],
-                    {
-                        "line_surety_ancestorship": self.line_surety_ancestorship.get(
-                            x["current_line"], None
-                        ),
-                        **dict(
-                            (k, v)
-                            for (k, v) in x.items()
-                            if k not in ["line_surety_ancestorship", "current_line"]
-                        ),
-                    },
-                )
-                for x in self.statements
-            ),
-            key=lambda x: (x[0], x[1]["start_column"]),
-        )
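`_get_max_nested_conditional` above is the one non-obvious algorithm in this file: a breadth-first walk that carries each node's conditional depth through the queue instead of recursing. Extracted as a free function over duck-typed tree-sitter-style nodes (names are illustrative; `head` is any object with `.type` and `.children`):

```python
from collections import deque


def max_nested_conditional(head, condition_types) -> int:
    # Track depth explicitly in the queue so deeply nested parse trees
    # cannot hit Python's recursion limit.
    queue = deque([(head, int(head.type in condition_types))])
    deepest = 0
    while queue:
        node, depth = queue.popleft()
        deepest = max(deepest, depth)
        # Children of a conditional node sit one level deeper.
        bump = int(node.type in condition_types)
        for child in node.children:
            queue.append((child, depth + bump))
    return deepest
```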
diff --git a/codecov_cli/services/staticanalysis/analyzers/javascript_es6/__init__.py b/codecov_cli/services/staticanalysis/analyzers/javascript_es6/__init__.py
deleted file mode 100644
index 107a34b26..000000000
--- a/codecov_cli/services/staticanalysis/analyzers/javascript_es6/__init__.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import hashlib
-
-from tree_sitter import Language, Parser
-
-import staticcodecov_languages
-from codecov_cli.services.staticanalysis.analyzers.general import BaseAnalyzer
-from codecov_cli.services.staticanalysis.analyzers.javascript_es6.node_wrappers import (
-    NodeVisitor,
-)
-
-function_query_str = """
-(function_declaration) @elemen
-(generator_function_declaration) @elemen2
-(function) @elemen3
-(generator_function) @elemen4
-(arrow_function) @elemen5
-"""
-
-method_query_str = """
-(method_definition) @elemen
-"""
-
-imports_query_str = """
-(import_statement) @elemen
-(import) @elemen
-"""
-
-definitions_query_str = """
-(function_declaration) @elemen
-(generator_function_declaration) @elemen2
-(function) @elemen3
-(generator_function) @elemen4
-(arrow_function) @elemen5
-(method_definition) @elemen6
-(class_declaration) @elemen7
-"""
-
-
-class ES6Analyzer(BaseAnalyzer):
-    condition_statements = [
-        "if_statement",
-        "switch_statement",
-        "for_statement",
-        "for_in_statement",
-        "while_statement",
-        "do_statement",
-    ]
-
-    wrappers = [
-        "class_declaration",
-        "function_declaration",
-        "generator_function_declaration",
-        "function",
-        "generator_function",
-        "arrow_function",
-    ]
-
-    def __init__(self, path, actual_code, **options):
-        self.actual_code = actual_code
-        self.lines = self.actual_code.split(b"\n")
-        self.executable_lines = set()
-        self.functions = []
-        self.path = path.result_filename
-        self.JS_LANGUAGE = Language(staticcodecov_languages.__file__, "javascript")
-        self.parser = Parser()
-        self.parser.set_language(self.JS_LANGUAGE)
-        self.import_lines = set()
-        self.definitions_lines = set()
-        self.line_surety_ancestorship = {}
-        self.statements = []
-
-    def get_code_hash(self, start_byte, end_byte):
-        j = hashlib.md5()
-        j.update(self.actual_code[start_byte:end_byte].strip())
-        return j.hexdigest()
-
-    def process(self):
-        tree = self.parser.parse(self.actual_code)
-        root_node = tree.root_node
-        function_query = self.JS_LANGUAGE.query(function_query_str)
-        method_query = self.JS_LANGUAGE.query(method_query_str)
-        imports_query = self.JS_LANGUAGE.query(imports_query_str)
-        definitions_query = self.JS_LANGUAGE.query(definitions_query_str)
-        combined_results = function_query.captures(root_node) + method_query.captures(
-            root_node
-        )
-        for func_node, _ in combined_results:
-            body_node = func_node.child_by_field_name("body")
-            self.functions.append(
-                {
-                    "identifier": self._get_name(func_node),
-                    "start_line": func_node.start_point[0] + 1,
-                    "end_line": func_node.end_point[0] + 1,
-                    "code_hash": self.get_code_hash(
-                        body_node.start_byte, body_node.end_byte
-                    ),
-                    "complexity_metrics": self._get_complexity_metrics(body_node),
-                }
-            )
-        self.functions = sorted(self.functions, key=lambda x: x["start_line"])
-
-        self.import_lines = self.get_import_lines(root_node, imports_query)
-        self.definition_lines = self.get_definition_lines(root_node, definitions_query)
-
-        visitor = NodeVisitor(self)
-        visitor.start_visit(tree.root_node)
-        statements = self.get_statements()
-
-        h = hashlib.md5()
-        h.update(self.actual_code)
-        return {
-            "empty_lines": [i + 1 for (i, n) in enumerate(self.lines) if not n.strip()],
-            "executable_lines": sorted(self.executable_lines),
-            "functions": self.functions,
-            "number_lines": len(self.lines),
-            "hash": h.hexdigest(),
-            "filename": str(self.path),
-            "language": "javascript",
-            "import_lines": sorted(self.import_lines),
-            "definition_lines": sorted(self.definition_lines),
-            "statements": statements,
-        }
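The `code_hash` and file `hash` fields this analyzer sent to the API were nothing more than an MD5 over a stripped byte slice: a cheap content fingerprint, not a security measure. In isolation:

```python
import hashlib


def code_hash(source: bytes, start_byte: int, end_byte: int) -> str:
    # Stripped slice in, hex digest out; two bodies that differ only in
    # leading/trailing whitespace therefore hash the same.
    digest = hashlib.md5()
    digest.update(source[start_byte:end_byte].strip())
    return digest.hexdigest()
```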
diff --git a/codecov_cli/services/staticanalysis/analyzers/javascript_es6/node_wrappers.py b/codecov_cli/services/staticanalysis/analyzers/javascript_es6/node_wrappers.py
deleted file mode 100644
index 9a364a475..000000000
--- a/codecov_cli/services/staticanalysis/analyzers/javascript_es6/node_wrappers.py
+++ /dev/null
@@ -1,71 +0,0 @@
-class NodeVisitor(object):
-    def __init__(self, analyzer):
-        self.analyzer = analyzer
-
-    def start_visit(self, node):
-        self.visit(node)
-
-    def visit(self, node):
-        self.do_visit(node)
-        for c in node.children:
-            self.visit(c)
-
-    def do_visit(self, node):
-        if node.is_named:
-            current_line_number = node.start_point[0] + 1
-            if node.type in (
-                "expression_statement",
-                "variable_declaration",
-                "lexical_declaration",
-                "return_statement",
-                "if_statement",
-                "for_statement",
-                "for_in_statement",
-                "while_statement",
-                "do_statement",
-                "switch_statement",
-            ):
-                if node.prev_named_sibling:
-                    self.analyzer.line_surety_ancestorship[current_line_number] = (
-                        node.prev_named_sibling.start_point[0] + 1
-                    )
-                self.analyzer.statements.append(
-                    {
-                        "current_line": current_line_number,
-                        "start_column": node.start_point[1],
-                        "line_hash": self.analyzer._get_code_hash(
-                            node.start_byte, node.end_byte
-                        ),
-                        "len": node.end_point[0] + 1 - current_line_number,
-                        "extra_connected_lines": tuple(),
-                    }
-                )
-            if node.type in ("if_statement",):
-                first_if_statement = node.child_by_field_name("consequence")
-                if first_if_statement.type == "statement_block":
-                    first_if_statement = first_if_statement.children[1]
-                if first_if_statement.type == "expression_statement":
-                    first_if_statement = first_if_statement.children[0]
-                self.analyzer.line_surety_ancestorship[
-                    first_if_statement.start_point[0] + 1
-                ] = current_line_number
-
-            if node.type in ("for_statement", "while_statement", "for_in_statement"):
-                first_statement = node.child_by_field_name("body")
-                if first_statement.type == "statement_block":
-                    first_statement = first_statement.children[1]
-                if first_statement.type == "expression_statement":
-                    first_statement = first_statement.children[0]
-                self.analyzer.line_surety_ancestorship[
-                    first_statement.start_point[0] + 1
-                ] = current_line_number
-
-            if node.type == "do_statement":
-                do_statement_body = node.child_by_field_name("body")
-                if do_statement_body.type == "statement_block":
-                    do_statement_body = do_statement_body.children[1]
-                elif do_statement_body.type == "expression_statement":
-                    do_statement_body = do_statement_body.children[0]
-                self.analyzer.line_surety_ancestorship[
-                    do_statement_body.start_point[0] + 1
-                ] = current_line_number
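The `line_surety_ancestorship` bookkeeping above (and in the Python visitor further down) reduces to one move: each statement line points back at the line of its previous named sibling, so statements can later be chained when deciding which lines must have executed together. The core of it, over a duck-typed tree-sitter-style node (illustrative helper name):

```python
def link_statement(line_surety_ancestorship: dict, node) -> None:
    # tree-sitter start_point rows are 0-based, hence the +1 for the
    # 1-based line numbers coverage reports use.
    line = node.start_point[0] + 1
    if node.prev_named_sibling is not None:
        line_surety_ancestorship[line] = node.prev_named_sibling.start_point[0] + 1
```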
diff --git a/codecov_cli/services/staticanalysis/analyzers/python/__init__.py b/codecov_cli/services/staticanalysis/analyzers/python/__init__.py
deleted file mode 100644
index d5e6db0c3..000000000
--- a/codecov_cli/services/staticanalysis/analyzers/python/__init__.py
+++ /dev/null
@@ -1,110 +0,0 @@
-import hashlib
-
-from tree_sitter import Language, Parser
-
-import staticcodecov_languages
-from codecov_cli.services.staticanalysis.analyzers.general import BaseAnalyzer
-from codecov_cli.services.staticanalysis.analyzers.python.node_wrappers import (
-    NodeVisitor,
-)
-from codecov_cli.services.staticanalysis.types import FileAnalysisRequest
-
-_function_query_str = """
-(function_definition
-    name: (identifier)
-    parameters: (parameters)
-) @elemen
-"""
-
-_unreachable_code_query_str = """
-(function_definition
-    body: (block (return_statement) @return_stmt . (_))
-)
-"""
-
-_executable_lines_query_str = """
-(block (_) @elem)
-(expression_statement) @elemf
-"""
-
-_definitions_query_str = """
-(function_definition) @elemc
-(class_definition) @elemd
-(decorated_definition) @eleme
-"""
-
-_imports_query_str = """
-(import_statement) @elema
-(import_from_statement) @elemb
-"""
-
-_wildcard_import_query_str = """
-(wildcard_import) @elema
-"""
-
-
-class PythonAnalyzer(BaseAnalyzer):
-    condition_statements = [
-        "if_statement",
-        "while_statement",
-        "for_statement",
-        "conditional_expression",
-    ]
-    wrappers = ["class_definition", "function_definition"]
-
-    def __init__(
-        self, file_analysis_request: FileAnalysisRequest, actual_code: bytes, **options
-    ):
-        self.actual_code = actual_code
-        self.lines = self.actual_code.split(b"\n")
-        self.statements = []
-        self.import_lines = set()
-        self.definitions_lines = set()
-        self.functions = []
-        self.path = file_analysis_request.result_filename
-        self.PY_LANGUAGE = Language(staticcodecov_languages.__file__, "python")
-        self.parser = Parser()
-        self.parser.set_language(self.PY_LANGUAGE)
-        self.line_surety_ancestorship = {}
-
-    def process(self):
-        function_query = self.PY_LANGUAGE.query(_function_query_str)
-        definitions_query = self.PY_LANGUAGE.query(_definitions_query_str)
-        imports_query = self.PY_LANGUAGE.query(_imports_query_str)
-        tree = self.parser.parse(self.actual_code)
-        root_node = tree.root_node
-        captures = function_query.captures(root_node)
-        for node, _ in captures:
-            actual_name = self._get_name(node)
-            body_node = node.child_by_field_name("body")
-            self.functions.append(
-                {
-                    "identifier": actual_name,
-                    "start_line": node.start_point[0] + 1,
-                    "end_line": node.end_point[0] + 1,
-                    "code_hash": self._get_code_hash(
-                        body_node.start_byte, body_node.end_byte
-                    ),
-                    "complexity_metrics": self._get_complexity_metrics(body_node),
-                }
-            )
-        visitor = NodeVisitor(self)
-        visitor.start_visit(tree.root_node)
-        self.functions = sorted(self.functions, key=lambda x: x["start_line"])
-
-        self.import_lines = self.get_import_lines(root_node, imports_query)
-        self.definitions_lines = self.get_definition_lines(root_node, definitions_query)
-
-        h = hashlib.md5()
-        h.update(self.actual_code)
-        statements = self.get_statements()
-        return {
-            "language": "python",
-            "empty_lines": [i + 1 for (i, n) in enumerate(self.lines) if not n.strip()],
-            "functions": self.functions,
-            "hash": h.hexdigest(),
-            "number_lines": len(self.lines),
-            "statements": statements,
-            "definition_lines": sorted(self.definitions_lines),
-            "import_lines": sorted(self.import_lines),
-        }
- """ - # Docstrings have type 'expression_statement - if node.type != "expression_statement": - return False - # Docstrings for a module are OK - they show up in pytest result - # Docstrings for a class are OK - they show up in pytest result - # Docstrings for functions are NOT OK - they DONT show up in pytest result - # Check if it's docstring - has_single_child = len(node.children) == 1 - only_child_is_string = node.children[0].type == "string" - # Check if is the first line of a function - parent_is_block = node.parent.type == "block" - first_exp_in_block = node.prev_named_sibling is None - is_in_function_context = ( - parent_is_block and node.parent.parent.type == "function_definition" - ) - - return ( - has_single_child - and only_child_is_string - and parent_is_block - and first_exp_in_block - and is_in_function_context - ) - - def _get_previous_sibling_that_is_not_comment_not_func_docstring(self, node: Node): - curr = node.prev_named_sibling - while curr is not None and ( - curr.type == "comment" or self._is_function_docstring(curr) - ): - curr = curr.prev_named_sibling - return curr - - def do_visit(self, node: Node): - if node.is_named: - current_line_number = node.start_point[0] + 1 - if node.type in ( - "expression_statement", - "return_statement", - "if_statement", - "for_statement", - "while_statement", - ): - if self._is_function_docstring(node): - # We ignore these - return - closest_named_sibling_not_comment_that_is_in_statements = ( - self._get_previous_sibling_that_is_not_comment_not_func_docstring( - node - ) - ) - if closest_named_sibling_not_comment_that_is_in_statements: - self.analyzer.line_surety_ancestorship[current_line_number] = ( - closest_named_sibling_not_comment_that_is_in_statements.start_point[ - 0 - ] - + 1 - ) - self.analyzer.statements.append( - { - "current_line": current_line_number, - "start_column": node.start_point[1], - "line_hash": self.analyzer._get_code_hash( - node.start_byte, node.end_byte - ), - "len": node.end_point[0] + 1 - current_line_number, - "extra_connected_lines": tuple(), - } - ) - if node.type in ("if_statement", "elif_clause"): - # Some of the children of a node have a field_name associated to them - # In the case of an if and elif, "consequence" is the code that is executed in that branch of code - first_if_statement = node.child_by_field_name("consequence") - try: - if first_if_statement.type == "block": - first_if_statement = first_if_statement.children[0] # BUG - except IndexError: - raise AnalysisError( - f"if_statement consequence is empty block @ {self.analyzer.path}:{first_if_statement.start_point[0] + 1}, column {first_if_statement.start_point[1]}" - ) - self.analyzer.line_surety_ancestorship[ - first_if_statement.start_point[0] + 1 - ] = current_line_number - if node.type in ("for_statement", "while_statement"): - first_loop_statement = node.child_by_field_name("body") - try: - if first_loop_statement.type == "block": - first_loop_statement = first_loop_statement.children[0] - except IndexError: - raise AnalysisError( - f"loop_statement body is empty block @ {self.analyzer.path}:{first_loop_statement.start_point[0] + 1}, column {first_loop_statement.start_point[1]}" - ) - self.analyzer.line_surety_ancestorship[ - first_loop_statement.start_point[0] + 1 - ] = current_line_number - pass diff --git a/codecov_cli/services/staticanalysis/exceptions.py b/codecov_cli/services/staticanalysis/exceptions.py deleted file mode 100644 index 0e6e67dc1..000000000 --- a/codecov_cli/services/staticanalysis/exceptions.py +++ /dev/null @@ 
diff --git a/codecov_cli/services/staticanalysis/exceptions.py b/codecov_cli/services/staticanalysis/exceptions.py
deleted file mode 100644
index 0e6e67dc1..000000000
--- a/codecov_cli/services/staticanalysis/exceptions.py
+++ /dev/null
@@ -1,2 +0,0 @@
-class AnalysisError(Exception):
-    pass
diff --git a/codecov_cli/services/staticanalysis/finders.py b/codecov_cli/services/staticanalysis/finders.py
deleted file mode 100644
index e3c11eee1..000000000
--- a/codecov_cli/services/staticanalysis/finders.py
+++ /dev/null
@@ -1,47 +0,0 @@
-import subprocess
-from pathlib import Path
-
-from codecov_cli.helpers.folder_searcher import globs_to_regex, search_files
-from codecov_cli.services.staticanalysis.types import FileAnalysisRequest
-
-
-class FileFinder(object):
-    def find_files(self, root_folder, pattern, exclude_folders):
-        regex_patterns_to_include = globs_to_regex(
-            [
-                pattern,
-            ]
-        )
-        exclude_folders = list(map(str, exclude_folders))
-        files_paths = search_files(
-            folder_to_search=root_folder,
-            folders_to_ignore=exclude_folders,
-            filename_include_regex=regex_patterns_to_include,
-        )
-
-        return [
-            FileAnalysisRequest(
-                actual_filepath=p, result_filename=str(p.relative_to(root_folder))
-            )
-            for p in files_paths
-        ]
-
-
-class GitFileFinder(object):
-    def find_files(self, folder_name, pattern, exclude_folders):
-        res = subprocess.run(
-            ["git", "-C", str(folder_name), "ls-files"], capture_output=True
-        )
-        return [
-            FileAnalysisRequest(
-                actual_filepath=f"{Path(folder_name) / x}", result_filename=x
-            )
-            for x in res.stdout.decode().split()
-        ]
-
-    def find_configuration_file(self, folder_name):
-        return None
-
-
-def select_file_finder(config):
-    return FileFinder()
diff --git a/codecov_cli/services/staticanalysis/types.py b/codecov_cli/services/staticanalysis/types.py
deleted file mode 100644
index f189dbfe6..000000000
--- a/codecov_cli/services/staticanalysis/types.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import pathlib
-from dataclasses import dataclass
-from typing import Optional
-
-
-@dataclass
-class FileAnalysisRequest(object):
-    result_filename: str
-    actual_filepath: pathlib.Path
-
-
-@dataclass
-class FileAnalysisResult(object):
-    filename: str
-    result: Optional[dict] = None
-    error: Optional[dict] = None
-
-    def asdict(self):
-        return {"result": self.result, "error": self.error}
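`GitFileFinder` above was never actually selected (`select_file_finder` always returned `FileFinder`), but its trick is handy on its own: ask `git ls-files` for the tracked file list instead of walking the tree, which also respects .gitignore for free. Reduced to a sketch (illustrative name; `check=True` is added here, the deleted code ignored git failures):

```python
import subprocess
from pathlib import Path
from typing import List


def git_tracked_files(folder: Path) -> List[str]:
    # Delegating to git avoids a full directory walk and honors .gitignore.
    result = subprocess.run(
        ["git", "-C", str(folder), "ls-files"],
        capture_output=True,
        check=True,
    )
    return result.stdout.decode().split()
```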
"label_3", - "label_old", - "label_older", - ], - "absent_labels": [], - "present_diff_labels": ["label_2", "label_3", "label_old"], - "global_level_labels": ["label_1", "label_older"], - } - collected_labels = ["label_1", "label_2", "label_3", "label_4"] - expected = { - "present_diff_labels": ["label_2", "label_3"], - "global_level_labels": ["label_1"], - "absent_labels": ["label_4"], - "present_report_labels": ["label_1", "label_2", "label_3"], - } - assert ( - _potentially_calculate_absent_labels(request_result, collected_labels) - == expected - ) - - def test_send_label_analysis_bad_payload(self): - payload = { - "base_commit": "base_commit", - "head_commit": "head_commit", - "requested_labels": [], - } - url = "https://api.codecov.io/labels/labels-analysis" - header = "Repotoken STATIC_TOKEN" - with responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - json={"error": "list field cannot be empty list"}, - status=400, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - with pytest.raises(click.ClickException): - _send_labelanalysis_request(payload, url, header) - def test__dry_run_json_output(self): list_to_run = ["label_1", "label_2"] list_to_skip = ["label_3", "label_4"] @@ -223,187 +171,6 @@ def test_invoke_label_analysis_base_sha_same_as_head_sha( assert result.exit_code != 0 assert "Base and head sha can't be the same" in result.output - def test_invoke_label_analysis( - self, get_labelanalysis_deps, mocker, use_verbose_option - ): - mock_get_runner = get_labelanalysis_deps["mock_get_runner"] - fake_runner = get_labelanalysis_deps["fake_runner"] - _ = get_labelanalysis_deps["collected_labels"] - - label_analysis_result = { - "present_report_labels": ["test_present"], - "absent_labels": ["test_absent"], - "present_diff_labels": ["test_in_diff"], - "global_level_labels": ["test_global"], - } - - with responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.PATCH, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.GET, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"state": "finished", "result": label_analysis_result}, - ) - cli_runner = CliRunner() - result = cli_runner.invoke( - cli, - [ - "label-analysis", - "--token=STATIC_TOKEN", - f"--base-sha={FAKE_BASE_SHA}", - ], - obj={}, - ) - assert result.exit_code == 0 - mock_get_runner.assert_called() - fake_runner.process_labelanalysis_result.assert_called_with( - label_analysis_result - ) - print(result.output) - - @pytest.mark.parametrize("processing_errors", [[], [{"error": "missing_data"}]]) - def test_invoke_label_analysis_dry_run( - self, processing_errors, get_labelanalysis_deps, mocker - ): - mock_get_runner = get_labelanalysis_deps["mock_get_runner"] - fake_runner = get_labelanalysis_deps["fake_runner"] - - label_analysis_result = { - "present_report_labels": ["test_present", "test_in_diff", "test_global"], - "absent_labels": ["test_absent"], - "present_diff_labels": ["test_in_diff"], - "global_level_labels": ["test_global"], - } - - with 
responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.PATCH, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.GET, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={ - "state": "finished", - "result": label_analysis_result, - "errors": processing_errors, - }, - ) - cli_runner = CliRunner(mix_stderr=False) - with cli_runner.isolated_filesystem(): - result = cli_runner.invoke( - cli, - [ - "label-analysis", - "--token=STATIC_TOKEN", - f"--base-sha={FAKE_BASE_SHA}", - "--dry-run", - ], - obj={}, - ) - mock_get_runner.assert_called() - fake_runner.process_labelanalysis_result.assert_not_called() - # Dry run format defaults to json - print(result.stdout) - ats_fallback_reason = ( - "test_list_processing_errors" if processing_errors else None - ) - assert json.loads(result.stdout) == { - "runner_options": ["--labels"], - "ats_tests_to_run": ["test_absent", "test_global", "test_in_diff"], - "ats_tests_to_skip": ["test_present"], - "ats_fallback_reason": ats_fallback_reason, - } - - def test_invoke_label_analysis_dry_run_pytest_format( - self, get_labelanalysis_deps, mocker - ): - _ = get_labelanalysis_deps["mock_get_runner"] - fake_runner = get_labelanalysis_deps["fake_runner"] - - label_analysis_result = { - "present_report_labels": ["test_present", "test_in_diff", "test_global"], - "absent_labels": ["test_absent"], - "present_diff_labels": ["test_in_diff"], - "global_level_labels": ["test_global"], - } - - with responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.PATCH, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.GET, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"state": "finished", "result": label_analysis_result}, - ) - cli_runner = CliRunner(mix_stderr=False) - with cli_runner.isolated_filesystem(): - result = cli_runner.invoke( - cli, - [ - "label-analysis", - "--token=STATIC_TOKEN", - f"--base-sha={FAKE_BASE_SHA}", - "--dry-run", - "--dry-run-format=space-separated-list", - ], - obj={}, - ) - fake_runner.process_labelanalysis_result.assert_not_called() - print(result.stdout) - assert result.exit_code == 0 - assert ( - result.stdout - == "TESTS_TO_RUN='--labels' 'test_absent' 'test_global' 'test_in_diff'\nTESTS_TO_SKIP='--labels' 'test_present'\n" - ) - def test_fallback_to_collected_labels(self, mocker): mock_runner = mocker.MagicMock() collected_labels = ["label_1", "label_2", "label_3"] @@ -424,360 +191,3 @@ def test_fallback_to_collected_labels_no_labels(self, mocker): _fallback_to_collected_labels([], mock_runner) mock_runner.process_labelanalysis_result.assert_not_called() assert str(exp.value) == "Failed to get list of 
labels to run" - - def test_fallback_collected_labels_covecov_500_error( - self, get_labelanalysis_deps, mocker, use_verbose_option - ): - mock_get_runner = get_labelanalysis_deps["mock_get_runner"] - fake_runner = get_labelanalysis_deps["fake_runner"] - collected_labels = get_labelanalysis_deps["collected_labels"] - with responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - status=500, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - cli_runner = CliRunner() - result = cli_runner.invoke( - cli, - [ - "label-analysis", - "--token=STATIC_TOKEN", - f"--base-sha={FAKE_BASE_SHA}", - ], - obj={}, - ) - mock_get_runner.assert_called() - fake_runner.process_labelanalysis_result.assert_called_with( - { - "present_report_labels": [], - "absent_labels": collected_labels, - "present_diff_labels": [], - "global_level_labels": [], - } - ) - print(result.output) - assert result.exit_code == 0 - - def test_fallback_collected_labels_covecov_500_error_dry_run( - self, get_labelanalysis_deps, mocker - ): - mock_get_runner = get_labelanalysis_deps["mock_get_runner"] - fake_runner = get_labelanalysis_deps["fake_runner"] - collected_labels = get_labelanalysis_deps["collected_labels"] - with responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - status=500, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - cli_runner = CliRunner(mix_stderr=False) - with cli_runner.isolated_filesystem(): - result = cli_runner.invoke( - cli, - [ - "label-analysis", - "--token=STATIC_TOKEN", - f"--base-sha={FAKE_BASE_SHA}", - "--dry-run", - ], - obj={}, - ) - mock_get_runner.assert_called() - fake_runner.process_labelanalysis_result.assert_not_called() - # Dry run format defaults to json - assert json.loads(result.stdout) == { - "runner_options": ["--labels"], - "ats_tests_to_run": sorted(collected_labels), - "ats_tests_to_skip": [], - "ats_fallback_reason": "codecov_unavailable", - } - assert result.exit_code == 0 - - def test_fallback_collected_labels_codecov_error_processing_label_analysis( - self, get_labelanalysis_deps, mocker, use_verbose_option - ): - mock_get_runner = get_labelanalysis_deps["mock_get_runner"] - fake_runner = get_labelanalysis_deps["fake_runner"] - collected_labels = get_labelanalysis_deps["collected_labels"] - - with responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.PATCH, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.GET, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={ - "state": "error", - "external_id": "uuid4-external-id", - "base_commit": "BASE_COMMIT_SHA", - "head_commit": "HEAD_COMMIT_SHA", - }, - ) - cli_runner = CliRunner() - result = cli_runner.invoke( - cli, - [ - "label-analysis", - "--token=STATIC_TOKEN", - f"--base-sha={FAKE_BASE_SHA}", - ], - obj={}, - ) - print(result) - mock_get_runner.assert_called() - fake_runner.process_labelanalysis_result.assert_called_with( - { - "present_report_labels": [], 
- "absent_labels": collected_labels, - "present_diff_labels": [], - "global_level_labels": [], - } - ) - print(result.output) - assert result.exit_code == 0 - - def test_fallback_collected_labels_codecov_error_processing_label_analysis_dry_run( - self, get_labelanalysis_deps, mocker, use_verbose_option - ): - mock_get_runner = get_labelanalysis_deps["mock_get_runner"] - fake_runner = get_labelanalysis_deps["fake_runner"] - collected_labels = get_labelanalysis_deps["collected_labels"] - - with responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.PATCH, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.GET, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={ - "state": "error", - "external_id": "uuid4-external-id", - "base_commit": "BASE_COMMIT_SHA", - "head_commit": "HEAD_COMMIT_SHA", - }, - ) - cli_runner = CliRunner(mix_stderr=False) - with cli_runner.isolated_filesystem(): - result = cli_runner.invoke( - cli, - [ - "label-analysis", - "--token=STATIC_TOKEN", - f"--base-sha={FAKE_BASE_SHA}", - "--dry-run", - ], - obj={}, - ) - mock_get_runner.assert_called() - fake_runner.process_labelanalysis_result.assert_not_called() - # Dry run format defaults to json - assert json.loads(result.stdout) == { - "runner_options": ["--labels"], - "ats_tests_to_run": sorted(collected_labels), - "ats_tests_to_skip": [], - "ats_fallback_reason": "test_list_processing_failed", - } - assert result.exit_code == 0 - - def test_fallback_collected_labels_codecov_max_wait_time_exceeded( - self, get_labelanalysis_deps, mocker, use_verbose_option - ): - mock_get_runner = get_labelanalysis_deps["mock_get_runner"] - fake_runner = get_labelanalysis_deps["fake_runner"] - collected_labels = get_labelanalysis_deps["collected_labels"] - mocker.patch.object(labelanalysis_time, "monotonic", side_effect=[0, 6]) - - with responses.RequestsMock() as rsps: - rsps.add( - responses.POST, - "https://api.codecov.io/labels/labels-analysis", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.PATCH, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"external_id": "label-analysis-request-id"}, - status=201, - match=[ - matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"}) - ], - ) - rsps.add( - responses.GET, - "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id", - json={"state": "processing"}, - ) - cli_runner = CliRunner() - result = cli_runner.invoke( - cli, - [ - "label-analysis", - "--token=STATIC_TOKEN", - f"--base-sha={FAKE_BASE_SHA}", - "--max-wait-time=5", - ], - obj={}, - ) - print(result) - assert result.exit_code == 0 - mock_get_runner.assert_called() - fake_runner.process_labelanalysis_result.assert_called_with( - { - "present_report_labels": [], - "absent_labels": collected_labels, - "present_diff_labels": [], - "global_level_labels": [], - } - ) - - def test_fallback_collected_labels_codecov_max_wait_time_exceeded_dry_run( - self, 
get_labelanalysis_deps, mocker, use_verbose_option
-    ):
-        mock_get_runner = get_labelanalysis_deps["mock_get_runner"]
-        fake_runner = get_labelanalysis_deps["fake_runner"]
-        collected_labels = get_labelanalysis_deps["collected_labels"]
-        mocker.patch.object(labelanalysis_time, "monotonic", side_effect=[0, 6])
-
-        with responses.RequestsMock() as rsps:
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/labels/labels-analysis",
-                json={"external_id": "label-analysis-request-id"},
-                status=201,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.PATCH,
-                "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id",
-                json={"external_id": "label-analysis-request-id"},
-                status=201,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.GET,
-                "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id",
-                json={"state": "processing"},
-            )
-            cli_runner = CliRunner(mix_stderr=False)
-            result = cli_runner.invoke(
-                cli,
-                [
-                    "label-analysis",
-                    "--token=STATIC_TOKEN",
-                    f"--base-sha={FAKE_BASE_SHA}",
-                    "--max-wait-time=5",
-                    "--dry-run",
-                ],
-                obj={},
-            )
-            mock_get_runner.assert_called()
-            fake_runner.process_labelanalysis_result.assert_not_called()
-        # Dry run format defaults to json
-        assert json.loads(result.stdout) == {
-            "runner_options": ["--labels"],
-            "ats_tests_to_run": sorted(collected_labels),
-            "ats_tests_to_skip": [],
-            "ats_fallback_reason": "max_wait_time_exceeded",
-        }
-        assert result.exit_code == 0
-
-    def test_first_labelanalysis_request_fails_but_second_works(
-        self, get_labelanalysis_deps, mocker, use_verbose_option
-    ):
-        mock_get_runner = get_labelanalysis_deps["mock_get_runner"]
-        fake_runner = get_labelanalysis_deps["fake_runner"]
-        _ = get_labelanalysis_deps["collected_labels"]
-
-        label_analysis_result = {
-            "present_report_labels": ["test_present"],
-            "absent_labels": ["test_absent"],
-            "present_diff_labels": ["test_in_diff"],
-            "global_level_labels": ["test_global"],
-        }
-
-        with responses.RequestsMock() as rsps:
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/labels/labels-analysis",
-                status=502,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/labels/labels-analysis",
-                json={"external_id": "label-analysis-request-id"},
-                status=201,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.GET,
-                "https://api.codecov.io/labels/labels-analysis/label-analysis-request-id",
-                json={"state": "finished", "result": label_analysis_result},
-            )
-            cli_runner = CliRunner()
-            result = cli_runner.invoke(
-                cli,
-                [
-                    "label-analysis",
-                    "--token=STATIC_TOKEN",
-                    f"--base-sha={FAKE_BASE_SHA}",
-                ],
-                obj={},
-            )
-        assert result.exit_code == 0
-        mock_get_runner.assert_called()
-        fake_runner.process_labelanalysis_result.assert_called_with(
-            label_analysis_result
-        )
-        print(result.output)
diff --git a/tests/services/static_analysis/languages/python/test_node_wrappers_malformed_code.py b/tests/services/static_analysis/languages/python/test_node_wrappers_malformed_code.py
deleted file mode 100644
index aa7f7fd18..000000000
--- a/tests/services/static_analysis/languages/python/test_node_wrappers_malformed_code.py
+++ /dev/null
@@ -1,65 +0,0 @@
-import pathlib
-
-import pytest
-from tree_sitter import Node
-
-from codecov_cli.services.staticanalysis.analyzers.python import PythonAnalyzer
-from codecov_cli.services.staticanalysis.analyzers.python.node_wrappers import (
-    NodeVisitor,
-)
-from codecov_cli.services.staticanalysis.exceptions import AnalysisError
-from codecov_cli.services.staticanalysis.types import FileAnalysisRequest
-
-
-class TestMalformedIfStatements(object):
-    def test_if_empty_block_raises_analysis_error(self):
-        analysis_request = FileAnalysisRequest(
-            actual_filepath=pathlib.Path("test_file"), result_filename="test_file"
-        )
-        # Code for an empty IF. NOT valid Python code
-        actual_code = b'x = 10\nif x == "batata":\n\n'
-        python_analyser = PythonAnalyzer(analysis_request, actual_code=actual_code)
-        # Parse the code snippet and get the if_statement node
-        tree = python_analyser.parser.parse(actual_code)
-        root = tree.root_node
-        assert root.type == "module"
-        assert root.child_count == 2
-        if_statement_node = root.children[1]
-        assert if_statement_node.type == "if_statement"
-        # Make sure it is indeed an empty if_statement
-        if_body = if_statement_node.child_by_field_name("consequence")
-        assert if_body.type == "block"
-        assert if_body.child_count == 0
-        visitor = NodeVisitor(python_analyser)
-        with pytest.raises(AnalysisError) as exp:
-            visitor.do_visit(if_statement_node)
-        assert (
-            str(exp.value)
-            == "if_statement consequence is empty block @ test_file:2, column 17"
-        )
-
-    def test_for_empty_block_raises_analysis_error(self):
-        analysis_request = FileAnalysisRequest(
-            actual_filepath=pathlib.Path("test_file"), result_filename="test_file"
-        )
-        # Code for an empty FOR loop. NOT valid Python code
-        actual_code = b"for x in range(10):\n\n"
-        python_analyser = PythonAnalyzer(analysis_request, actual_code=actual_code)
-        # Parse the code snippet and get the for_statement node
-        tree = python_analyser.parser.parse(actual_code)
-        root = tree.root_node
-        assert root.type == "module"
-        assert root.child_count == 1
-        for_statement_node = root.children[0]
-        assert for_statement_node.type == "for_statement"
-        # Make sure it is indeed an empty for_statement
-        for_body = for_statement_node.child_by_field_name("body")
-        assert for_body.type == "block"
-        assert for_body.child_count == 0
-        visitor = NodeVisitor(python_analyser)
-        with pytest.raises(AnalysisError) as exp:
-            visitor.do_visit(for_statement_node)
-        assert (
-            str(exp.value)
-            == "loop_statement body is empty block @ test_file:1, column 19"
-        )
diff --git a/tests/services/static_analysis/test_analyse_file.py b/tests/services/static_analysis/test_analyse_file.py
deleted file mode 100644
index 9d4ad7494..000000000
--- a/tests/services/static_analysis/test_analyse_file.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import json
-import sys
-from pathlib import Path
-from unittest.mock import MagicMock, patch
-
-import pytest
-
-from codecov_cli.services.staticanalysis import analyze_file
-from codecov_cli.services.staticanalysis.types import FileAnalysisRequest
-
-here = Path(__file__)
-here_parent = here.parent
-
-
-@pytest.mark.parametrize(
-    "input_filename,output_filename",
-    [
-        ("samples/inputs/sample_001.py", "samples/outputs/sample_001.json"),
-        ("samples/inputs/sample_002.py", "samples/outputs/sample_002.json"),
-        ("samples/inputs/sample_003.js", "samples/outputs/sample_003.json"),
-        ("samples/inputs/sample_004.js", "samples/outputs/sample_004.json"),
-        ("samples/inputs/sample_005.py", "samples/outputs/sample_005.json"),
-    ],
-)
-@pytest.mark.skipif(
-    sys.platform == "win32", reason="windows is producing different `code_hash` values"
-)
-def test_sample_analysis(input_filename, output_filename):
-    config = {}
-    res = analyze_file(
-        config, FileAnalysisRequest(input_filename, Path(input_filename))
-    )
-    with open(output_filename, "r") as file:
-        expected_result = json.load(file)
-    json_res = json.dumps(res.asdict())
-    res_dict = json.loads(json_res)
-    assert sorted(res_dict["result"].keys()) == sorted(expected_result["result"].keys())
-    res_dict["result"]["functions"] = sorted(
-        res_dict["result"]["functions"], key=lambda x: x["start_line"]
-    )
-    expected_result["result"]["functions"] = sorted(
-        expected_result["result"]["functions"], key=lambda x: x["start_line"]
-    )
-    assert res_dict["result"]["functions"] == expected_result["result"]["functions"]
-    assert res_dict["result"].get("statements") == expected_result["result"].get(
-        "statements"
-    )
-    assert res_dict["result"] == expected_result["result"]
-    assert res_dict == expected_result
-
-
-@patch("builtins.open")
-@patch("codecov_cli.services.staticanalysis.get_best_analyzer", return_value=None)
-def test_analyse_file_no_analyzer(mock_get_analyzer, mock_open):
-    fake_contents = MagicMock(name="fake_file_contents")
-    file_name = MagicMock(actual_filepath="filepath")
-    mock_open.return_value.__enter__.return_value.read.return_value = fake_contents
-    config = {}
-    res = analyze_file(config, file_name)
-    assert res is None
-    mock_open.assert_called_with("filepath", "rb")
-    mock_get_analyzer.assert_called_with(file_name, fake_contents)
diff --git a/tests/services/static_analysis/test_static_analysis_service.py b/tests/services/static_analysis/test_static_analysis_service.py
deleted file mode 100644
index 461ed6844..000000000
--- a/tests/services/static_analysis/test_static_analysis_service.py
+++ /dev/null
@@ -1,617 +0,0 @@
-from asyncio import CancelledError
-from pathlib import Path
-from unittest.mock import MagicMock
-
-import click
-import httpx
-import pytest
-import requests
-import responses
-from responses import matchers
-
-from codecov_cli.services.staticanalysis import (
-    process_files,
-    run_analysis_entrypoint,
-    send_single_upload_put,
-)
-from codecov_cli.services.staticanalysis.types import (
-    FileAnalysisRequest,
-    FileAnalysisResult,
-)
-
-
-class TestStaticAnalysisService:
-    @pytest.mark.asyncio
-    async def test_process_files_with_error(self, mocker):
-        files_found = list(
-            map(
-                lambda filename: FileAnalysisRequest(str(filename), Path(filename)),
-                [
-                    "correct_file.py",
-                    "error_file.py",
-                ],
-            )
-        )
-        mock_pool = mocker.patch("codecov_cli.services.staticanalysis.Pool")
-
-        def side_effect(config, filename: FileAnalysisRequest):
-            if filename.result_filename == "correct_file.py":
-                return FileAnalysisResult(
-                    filename=filename.result_filename, result={"hash": "abc123"}
-                )
-            if filename.result_filename == "error_file.py":
-                return FileAnalysisResult(
-                    filename=filename.result_filename, error="some error @ line 12"
-                )
-            # Should not get here, so fail test
-            assert False
-
-        mock_analyze_function = mocker.patch(
-            "codecov_cli.services.staticanalysis.analyze_file"
-        )
-        mock_analyze_function.side_effect = side_effect
-
-        def imap_side_effect(mapped_func, files):
-            results = []
-            for file in files:
-                results.append(mapped_func(file))
-            return results
-
-        mock_pool.return_value.__enter__.return_value.imap_unordered.side_effect = (
-            imap_side_effect
-        )
-
-        results = await process_files(files_found, 1, {})
-        mock_pool.return_value.__enter__.return_value.imap_unordered.assert_called()
-        assert mock_analyze_function.call_count == 2
-        assert results == dict(
-            all_data={"correct_file.py": {"hash": "abc123"}},
-            file_metadata=[{"file_hash": "abc123", "filepath": "correct_file.py"}],
-            processing_errors={"error_file.py": "some error @ line 12"},
-        )
-
-    @pytest.mark.asyncio
-    async def test_static_analysis_service_success(self, mocker):
-        mock_file_finder = mocker.patch(
-            "codecov_cli.services.staticanalysis.select_file_finder"
-        )
-        mock_send_upload_put = mocker.patch(
-            "codecov_cli.services.staticanalysis.send_single_upload_put"
-        )
-
-        # Doing it this way to support Python 3.7
-        async def side_effect(*args, **kwargs):
-            return MagicMock()
-
-        mock_send_upload_put.side_effect = side_effect
-
-        files_found = map(
-            lambda filename: FileAnalysisRequest(str(filename), Path(filename)),
-            [
-                "samples/inputs/sample_001.py",
-                "samples/inputs/sample_002.py",
-            ],
-        )
-        mock_file_finder.return_value.find_files = MagicMock(return_value=files_found)
-        with responses.RequestsMock() as rsps:
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses",
-                json={
-                    "external_id": "externalid",
-                    "filepaths": [
-                        {
-                            "state": "created",
-                            "filepath": "samples/inputs/sample_001.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                        {
-                            "state": "valid",
-                            "filepath": "samples/inputs/sample_002.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                    ],
-                },
-                status=200,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses/externalid/finish",
-                status=204,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-
-            await run_analysis_entrypoint(
-                config={},
-                folder=".",
-                numberprocesses=1,
-                pattern="*.py",
-                token="STATIC_TOKEN",
-                commit="COMMIT",
-                should_force=False,
-                folders_to_exclude=[],
-                enterprise_url=None,
-                args=None,
-            )
-        mock_file_finder.assert_called_with({})
-        mock_file_finder.return_value.find_files.assert_called()
-        assert mock_send_upload_put.call_count == 1
-        args, _ = mock_send_upload_put.call_args
-        assert args[2] == {
-            "state": "created",
-            "filepath": "samples/inputs/sample_001.py",
-            "raw_upload_location": "http://storage-url",
-        }
-
-    @pytest.mark.asyncio
-    async def test_static_analysis_service_CancelledError(self, mocker):
-        mock_file_finder = mocker.patch(
-            "codecov_cli.services.staticanalysis.select_file_finder"
-        )
-        mock_send_upload_put = mocker.patch(
-            "codecov_cli.services.staticanalysis.send_single_upload_put"
-        )
-
-        async def side_effect(client, all_data, el):
-            if el["filepath"] == "samples/inputs/sample_001.py":
-                return {
-                    "status_code": 204,
-                    "filepath": el["filepath"],
-                    "succeeded": True,
-                }
-            raise CancelledError("Pretending something cancelled this task")
-
-        mock_send_upload_put.side_effect = side_effect
-
-        files_found = map(
-            lambda filename: FileAnalysisRequest(str(filename), Path(filename)),
-            [
-                "samples/inputs/sample_001.py",
-                "samples/inputs/sample_002.py",
-            ],
-        )
-        mock_file_finder.return_value.find_files = MagicMock(return_value=files_found)
-        with responses.RequestsMock() as rsps:
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses",
-                json={
-                    "external_id": "externalid",
-                    "filepaths": [
-                        {
-                            "state": "created",
-                            "filepath": "samples/inputs/sample_001.py",
-                            "raw_upload_location": "http://storage-url-001",
-                        },
-                        {
-                            "state": "created",
-                            "filepath": "samples/inputs/sample_002.py",
-                            "raw_upload_location": "http://storage-url-002",
-                        },
-                    ],
-                },
-                status=200,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-
-            with pytest.raises(click.ClickException) as exp:
-                await run_analysis_entrypoint(
-                    config={},
-                    folder=".",
-                    numberprocesses=1,
-                    pattern="*.py",
-                    token="STATIC_TOKEN",
-                    commit="COMMIT",
-                    should_force=False,
-                    folders_to_exclude=[],
-                    enterprise_url=None,
-                    args=None,
-                )
-        assert "Unknown error cancelled the upload tasks." in str(exp.value)
-        mock_file_finder.assert_called_with({})
-        mock_file_finder.return_value.find_files.assert_called()
-        assert mock_send_upload_put.call_count == 2
-
-    @pytest.mark.asyncio
-    async def test_send_single_upload_put_success(self, mocker):
-        mock_client = MagicMock()
-
-        async def side_effect(presigned_put, data):
-            if presigned_put == "http://storage-url-001":
-                return httpx.Response(status_code=204)
-
-        mock_client.put.side_effect = side_effect
-
-        all_data = {
-            "file-001": {"some": "data", "id": "1"},
-            "file-002": {"some": "data", "id": "2"},
-            "file-003": {"some": "data", "id": "3"},
-        }
-
-        success_response = await send_single_upload_put(
-            mock_client,
-            all_data=all_data,
-            el={
-                "filepath": "file-001",
-                "raw_upload_location": "http://storage-url-001",
-            },
-        )
-        assert success_response == {
-            "status_code": 204,
-            "filepath": "file-001",
-            "succeeded": True,
-        }
-
-    @pytest.mark.asyncio
-    async def test_send_single_upload_put_fail_401(self, mocker):
-        mock_client = MagicMock()
-
-        async def side_effect(presigned_put, data):
-            if presigned_put == "http://storage-url-002":
-                return httpx.Response(status_code=401)
-
-        mock_client.put.side_effect = side_effect
-
-        all_data = {
-            "file-001": {"some": "data", "id": "1"},
-            "file-002": {"some": "data", "id": "2"},
-            "file-003": {"some": "data", "id": "3"},
-        }
-
-        fail_401_response = await send_single_upload_put(
-            mock_client,
-            all_data=all_data,
-            el={
-                "filepath": "file-002",
-                "raw_upload_location": "http://storage-url-002",
-            },
-        )
-        assert fail_401_response == {
-            "status_code": 401,
-            "exception": None,
-            "filepath": "file-002",
-            "succeeded": False,
-        }
-
-    @pytest.mark.asyncio
-    async def test_send_single_upload_put_fail_exception(self, mocker):
-        mock_client = MagicMock()
-
-        async def side_effect(presigned_put, data):
-            if presigned_put == "http://storage-url-003":
-                raise httpx.HTTPError("Some error occurred in the request")
-
-        mock_client.put.side_effect = side_effect
-
-        all_data = {
-            "file-001": {"some": "data", "id": "1"},
-            "file-002": {"some": "data", "id": "2"},
-            "file-003": {"some": "data", "id": "3"},
-        }
-
-        fail_exception_response = await send_single_upload_put(
-            mock_client,
-            all_data=all_data,
-            el={
-                "filepath": "file-003",
-                "raw_upload_location": "http://storage-url-003",
-            },
-        )
-        assert fail_exception_response == {
-            "status_code": None,
-            "exception": httpx.HTTPError,
-            "filepath": "file-003",
-            "succeeded": False,
-        }
-
-    @pytest.mark.asyncio
-    @pytest.mark.parametrize(
-        "finish_endpoint_response,expected",
-        [
-            (500, "Codecov is having problems"),
-            (400, "some problem with the submitted information"),
-        ],
-    )
-    async def test_static_analysis_service_finish_fails_status_code(
-        self, mocker, finish_endpoint_response, expected
-    ):
-        mock_file_finder = mocker.patch(
-            "codecov_cli.services.staticanalysis.select_file_finder"
-        )
-        mock_send_upload_put = mocker.patch(
-            "codecov_cli.services.staticanalysis.send_single_upload_put"
-        )
-
-        # Doing it this way to support Python 3.7
-        async def side_effect(*args, **kwargs):
-            return MagicMock()
-
-        mock_send_upload_put.side_effect = side_effect
-
-        files_found = map(
-            lambda filename: FileAnalysisRequest(str(filename), Path(filename)),
-            [
-                "samples/inputs/sample_001.py",
-                "samples/inputs/sample_002.py",
-            ],
-        )
-        mock_file_finder.return_value.find_files = MagicMock(return_value=files_found)
-        with responses.RequestsMock() as rsps:
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses",
-                json={
-                    "external_id": "externalid",
-                    "filepaths": [
-                        {
-                            "state": "created",
-                            "filepath": "samples/inputs/sample_001.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                        {
-                            "state": "valid",
-                            "filepath": "samples/inputs/sample_002.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                    ],
-                },
-                status=200,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses/externalid/finish",
-                status=finish_endpoint_response,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            with pytest.raises(click.ClickException, match=expected):
-                await run_analysis_entrypoint(
-                    config={},
-                    folder=".",
-                    numberprocesses=1,
-                    pattern="*.py",
-                    token="STATIC_TOKEN",
-                    commit="COMMIT",
-                    should_force=False,
-                    folders_to_exclude=[],
-                    enterprise_url=None,
-                    args=None,
-                )
-        mock_file_finder.assert_called_with({})
-        mock_file_finder.return_value.find_files.assert_called()
-        assert mock_send_upload_put.call_count == 1
-        args, _ = mock_send_upload_put.call_args
-        assert args[2] == {
-            "state": "created",
-            "filepath": "samples/inputs/sample_001.py",
-            "raw_upload_location": "http://storage-url",
-        }
-
-    @pytest.mark.asyncio
-    async def test_static_analysis_service_finish_fails_request_exception(self, mocker):
-        mock_file_finder = mocker.patch(
-            "codecov_cli.services.staticanalysis.select_file_finder"
-        )
-        mock_send_upload_put = mocker.patch(
-            "codecov_cli.services.staticanalysis.send_single_upload_put"
-        )
-
-        # Doing it this way to support Python 3.7
-        async def side_effect(*args, **kwargs):
-            return MagicMock()
-
-        mock_send_upload_put.side_effect = side_effect
-
-        files_found = map(
-            lambda filename: FileAnalysisRequest(str(filename), Path(filename)),
-            [
-                "samples/inputs/sample_001.py",
-                "samples/inputs/sample_002.py",
-            ],
-        )
-        mock_file_finder.return_value.find_files = MagicMock(return_value=files_found)
-        with responses.RequestsMock() as rsps:
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses",
-                json={
-                    "external_id": "externalid",
-                    "filepaths": [
-                        {
-                            "state": "created",
-                            "filepath": "samples/inputs/sample_001.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                        {
-                            "state": "valid",
-                            "filepath": "samples/inputs/sample_002.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                    ],
-                },
-                status=200,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses/externalid/finish",
-                body=requests.RequestException(),
-            )
-            with pytest.raises(click.ClickException, match="Unable to reach Codecov"):
-                await run_analysis_entrypoint(
-                    config={},
-                    folder=".",
-                    numberprocesses=1,
-                    pattern="*.py",
-                    token="STATIC_TOKEN",
-                    commit="COMMIT",
-                    should_force=False,
-                    folders_to_exclude=[],
-                    enterprise_url=None,
-                    args=None,
-                )
-        mock_file_finder.assert_called_with({})
-        mock_file_finder.return_value.find_files.assert_called()
-        assert mock_send_upload_put.call_count == 1
-        args, _ = mock_send_upload_put.call_args
-        assert args[2] == {
-            "state": "created",
-            "filepath": "samples/inputs/sample_001.py",
-            "raw_upload_location": "http://storage-url",
-        }
-
-    @pytest.mark.asyncio
-    async def test_static_analysis_service_should_force_option(self, mocker):
-        mock_file_finder = mocker.patch(
-            "codecov_cli.services.staticanalysis.select_file_finder"
-        )
-        mock_send_upload_put = mocker.patch(
-            "codecov_cli.services.staticanalysis.send_single_upload_put"
-        )
-
-        # Doing it this way to support Python 3.7
-        async def side_effect(*args, **kwargs):
-            return MagicMock()
-
-        mock_send_upload_put.side_effect = side_effect
-
-        files_found = map(
-            lambda filename: FileAnalysisRequest(str(filename), Path(filename)),
-            [
-                "samples/inputs/sample_001.py",
-                "samples/inputs/sample_002.py",
-            ],
-        )
-        mock_file_finder.return_value.find_files = MagicMock(return_value=files_found)
-        with responses.RequestsMock() as rsps:
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses",
-                json={
-                    "external_id": "externalid",
-                    "filepaths": [
-                        {
-                            "state": "created",
-                            "filepath": "samples/inputs/sample_001.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                        {
-                            "state": "valid",
-                            "filepath": "samples/inputs/sample_002.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                    ],
-                },
-                status=200,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses/externalid/finish",
-                status=204,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            await run_analysis_entrypoint(
-                config={},
-                folder=".",
-                numberprocesses=1,
-                pattern="*.py",
-                token="STATIC_TOKEN",
-                commit="COMMIT",
-                should_force=True,
-                folders_to_exclude=[],
-                enterprise_url=None,
-                args=None,
-            )
-        mock_file_finder.assert_called_with({})
-        mock_file_finder.return_value.find_files.assert_called()
-        assert mock_send_upload_put.call_count == 2
-
-    @pytest.mark.asyncio
-    async def test_static_analysis_service_no_upload(self, mocker):
-        mock_file_finder = mocker.patch(
-            "codecov_cli.services.staticanalysis.select_file_finder"
-        )
-        mock_send_upload_put = mocker.patch(
-            "codecov_cli.services.staticanalysis.send_single_upload_put"
-        )
-
-        # Doing it this way to support Python 3.7
-        async def side_effect(*args, **kwargs):
-            return MagicMock()
-
-        mock_send_upload_put.side_effect = side_effect
-
-        files_found = map(
-            lambda filename: FileAnalysisRequest(str(filename), Path(filename)),
-            [
-                "samples/inputs/sample_001.py",
-                "samples/inputs/sample_002.py",
-            ],
-        )
-        mock_file_finder.return_value.find_files = MagicMock(return_value=files_found)
-        with responses.RequestsMock() as rsps:
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses",
-                json={
-                    "external_id": "externalid",
-                    "filepaths": [
-                        {
-                            "state": "valid",
-                            "filepath": "samples/inputs/sample_001.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                        {
-                            "state": "valid",
-                            "filepath": "samples/inputs/sample_002.py",
-                            "raw_upload_location": "http://storage-url",
-                        },
-                    ],
-                },
-                status=200,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-            rsps.add(
-                responses.POST,
-                "https://api.codecov.io/staticanalysis/analyses/externalid/finish",
-                status=204,
-                match=[
-                    matchers.header_matcher({"Authorization": "Repotoken STATIC_TOKEN"})
-                ],
-            )
-
-            await run_analysis_entrypoint(
-                config={},
-                folder=".",
-                numberprocesses=1,
-                pattern="*.py",
-                token="STATIC_TOKEN",
-                commit="COMMIT",
-                should_force=False,
- folders_to_exclude=[], - enterprise_url=None, - args=None, - ) - mock_file_finder.assert_called_with({}) - mock_file_finder.return_value.find_files.assert_called() - assert mock_send_upload_put.call_count == 0
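The deleted polling tests above have no direct replacement in this diff. One way to keep coverage of the surviving code path, where the command always falls back to the locally collected labels, is a test along these lines. This is a minimal sketch only: it assumes the `get_labelanalysis_deps` fixture, the `FAKE_BASE_SHA` constant, and the `cli` entrypoint import from the label-analysis test module remain available, and it reuses the dry-run JSON shape asserted by the deleted tests.

    import json

    from click.testing import CliRunner


    def test_label_analysis_always_falls_back(get_labelanalysis_deps):
        # Assumed fixture: same shape as in the deleted tests.
        fake_runner = get_labelanalysis_deps["fake_runner"]
        collected_labels = get_labelanalysis_deps["collected_labels"]
        cli_runner = CliRunner(mix_stderr=False)
        # No HTTP mocks are registered: the trimmed-down command is expected
        # to skip the labels API entirely and go straight to the fallback.
        result = cli_runner.invoke(
            cli,  # assumed import, as used by the deleted tests
            [
                "label-analysis",
                "--token=STATIC_TOKEN",
                f"--base-sha={FAKE_BASE_SHA}",  # assumed module constant
                "--dry-run",
            ],
            obj={},
        )
        assert result.exit_code == 0
        # In dry-run mode the runner must not be invoked directly.
        fake_runner.process_labelanalysis_result.assert_not_called()
        output = json.loads(result.stdout)
        # Every collected label is scheduled to run, none skipped.
        assert output["ats_tests_to_run"] == sorted(collected_labels)
        assert output["ats_tests_to_skip"] == []

Asserting that `ats_tests_to_skip` is empty is the interesting part: without the labels API, the CLI has no basis for skipping any test, so the fallback must schedule everything it collected.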