| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- import argparse
- import os
- import re
- import tarfile
- import tempfile
- import time
- import zipfile
- from functools import wraps
- import gitlab
- try:
- from typing import Any, Callable, Dict, List, Optional
- TR = Callable[..., Any]
- except ImportError:
- pass
def retry(func):  # type: (TR) -> TR
    """
    Decorator for instance methods that re-runs the call after a failure
    that looks like a transient network problem.

    ``IOError``, ``EOFError`` and ``gitlab.exceptions.GitlabError`` with
    response code 500 trigger a retry; a ``GitlabError`` with any other
    response code is re-raised at once. The retry limit is read from
    ``self.DOWNLOAD_ERROR_MAX_RETRIES`` of the instance the method is bound to.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):  # type: (Gitlab, Any, Any) -> Any
        attempts = 0
        while True:
            try:
                # Success leaves the loop straight away with the result.
                return func(self, *args, **kwargs)
            except (IOError, EOFError, gitlab.exceptions.GitlabError) as err:
                is_gitlab_error = isinstance(err, gitlab.exceptions.GitlabError)
                if is_gitlab_error and err.response_code != 500:
                    # Only error 500 is considered worth retrying.
                    raise err

                attempts += 1
                if attempts > self.DOWNLOAD_ERROR_MAX_RETRIES:
                    # Retry budget exhausted: give the error to the caller.
                    raise err

                func_name = getattr(func, '__name__', '(unknown callable)')
                print('Network failure in {}, retrying ({})'.format(func_name, attempts))
                # Back off exponentially: 2s, 4s, 8s, ...
                time.sleep(2 ** attempts)

    return wrapper
class Gitlab(object):
    """
    Convenience wrapper around the python-gitlab client for CI scripts:
    looking up projects and jobs, and downloading artifacts and archives.

    Every network-facing method is wrapped by the ``retry`` decorator.
    """
    # Group 1 is the base job name; groups 3 and 4 are the optional
    # "<index>/<total>" suffix of a job name.
    JOB_NAME_PATTERN = re.compile(r'(\w+)(\s+(\d+)/(\d+))?')
    # Retry limit consumed by the ``retry`` decorator.
    DOWNLOAD_ERROR_MAX_RETRIES = 3

    def __init__(self, project_id=None):  # type: (Optional[int]) -> None
        """
        Create the Gitlab connection and optionally bind it to a project.

        :param project_id: project to bind to. ``self.project`` is None if not given.
        """
        config_data_from_env = os.getenv('PYTHON_GITLAB_CONFIG')
        if not config_data_from_env:
            # no config in env, let python-gitlab look for config files on the local filesystem
            self._init_gitlab_inst(project_id, None)
            return

        # prefer to load config from env variable. python-gitlab only accepts file paths,
        # so the content goes through a temp file. It may hold credentials, therefore
        # the file is removed as soon as the client has been created.
        temp_file = tempfile.NamedTemporaryFile('w', delete=False)
        try:
            with temp_file:
                temp_file.write(config_data_from_env)
            self._init_gitlab_inst(project_id, [temp_file.name])
        finally:
            self._remove_temp_file(temp_file.name)

    @staticmethod
    def _remove_temp_file(path):  # type: (str) -> None
        """
        Remove a temp file created by this class, ignoring failures.

        :param path: path of the file to remove
        """
        try:
            os.remove(path)
        except OSError:
            # best-effort cleanup: must not hide the result (or the error) of the real work
            pass

    @retry
    def _init_gitlab_inst(self, project_id, config_files):  # type: (Optional[int], Optional[List[str]]) -> None
        """
        Create and authenticate ``self.gitlab_inst``, then resolve ``self.project``.

        :param project_id: project to fetch. ``self.project`` is set to None if falsy.
        :param config_files: python-gitlab config file paths, or None for its defaults
        """
        gitlab_id = os.getenv('LOCAL_GITLAB_HTTPS_HOST')  # if None, will use the default gitlab server
        self.gitlab_inst = gitlab.Gitlab.from_config(gitlab_id=gitlab_id, config_files=config_files)
        self.gitlab_inst.auth()
        if project_id:
            self.project = self.gitlab_inst.projects.get(project_id)
        else:
            self.project = None

    @retry
    def get_project_id(self, name, namespace=None):  # type: (str, Optional[str]) -> int
        """
        search project ID by name

        :param name: project name
        :param namespace: namespace to match when we have multiple project with same name
        :return: project ID
        :raise ValueError: if no project matches
        """
        projects = self.gitlab_inst.projects.list(search=name)

        res = []
        for project in projects:
            if namespace is None:
                # without a namespace only an unambiguous search result is accepted
                if len(projects) == 1:
                    res.append(project.id)
                    break

            if project.namespace['path'] == namespace:
                if project.name == name:
                    # exact name match takes priority over partial matches
                    res.insert(0, project.id)
                else:
                    res.append(project.id)

        if not res:
            raise ValueError("Can't find project")
        return int(res[0])

    @retry
    def download_artifacts(self, job_id, destination):  # type: (int, str) -> None
        """
        download full job artifacts and extract to destination.

        :param job_id: Gitlab CI job ID
        :param destination: extract artifacts to path.
        """
        job = self.project.jobs.get(job_id)

        # delete=False so the file can be reopened by name after it is closed;
        # it is removed explicitly once extraction finished or failed.
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with temp_file:
                job.artifacts(streamed=True, action=temp_file.write)

            with zipfile.ZipFile(temp_file.name, 'r') as archive_file:
                archive_file.extractall(destination)
        finally:
            self._remove_temp_file(temp_file.name)

    @retry
    def download_artifact(self, job_id, artifact_path, destination=None):  # type: (int, List[str], Optional[str]) -> List[bytes]
        """
        download specific path of job artifacts and extract to destination.

        :param job_id: Gitlab CI job ID
        :param artifact_path: list of path in artifacts (relative path to artifact root path)
        :param destination: destination of artifact. Do not save to file if destination is None
        :return: A list of artifact file raw data.
        """
        job = self.project.jobs.get(job_id)

        raw_data_list = []

        for a_path in artifact_path:
            try:
                data = job.artifact(a_path)  # type: bytes
            except gitlab.GitlabGetError:
                print("Failed to download '{}' from job {}".format(a_path, job_id))
                raise
            raw_data_list.append(data)
            if destination:
                file_path = os.path.join(destination, a_path)
                dir_path = os.path.dirname(file_path)
                try:
                    os.makedirs(dir_path)
                except OSError:
                    # fine if it already exists, anything else is a real error
                    if not os.path.isdir(dir_path):
                        raise
                with open(file_path, 'wb') as f:
                    f.write(data)

        return raw_data_list

    @retry
    def find_job_id(self, job_name, pipeline_id=None, job_status='success'):  # type: (str, Optional[str], str) -> List[Dict]
        """
        Get Job ID from job name of specific pipeline

        :param job_name: job name
        :param pipeline_id: If None, will get pipeline id from CI pre-defined variable.
        :param job_status: status of job. One pipeline could have multiple jobs with same name after retry.
                           job_status is used to filter these jobs.
        :return: a list of dicts with keys ``id`` and ``parallel_num`` (parallel job will generate
                 multiple jobs). ``parallel_num`` is None when the job name has no "N/M" suffix.
        """
        if pipeline_id is None:
            pipeline_id = os.getenv('CI_PIPELINE_ID')
        pipeline = self.project.pipelines.get(pipeline_id)

        job_id_list = []
        for job in pipeline.jobs.list(all=True):
            match = self.JOB_NAME_PATTERN.match(job.name)
            if match and match.group(1) == job_name and job.status == job_status:
                job_id_list.append({'id': job.id, 'parallel_num': match.group(3)})
        return job_id_list

    @retry
    def download_archive(self, ref, destination, project_id=None):  # type: (str, str, Optional[int]) -> str
        """
        Download archive of certain commit of a repository and extract to destination path

        :param ref: commit or branch name
        :param destination: destination path of extracted archive file
        :param project_id: download project of current instance if project_id is None
        :return: root path name of archive file
        """
        if project_id is None:
            project = self.project
        else:
            project = self.gitlab_inst.projects.get(project_id)

        # delete=False so the file can be reopened by name after it is closed;
        # it is removed explicitly once extraction finished or failed.
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with temp_file:
                try:
                    project.repository_archive(sha=ref, streamed=True, action=temp_file.write)
                except gitlab.GitlabGetError:
                    print('Failed to archive from project {}'.format(project_id))
                    raise

            # measured after the file is closed, so all data has been flushed
            print('archive size: {:.03f}MB'.format(float(os.path.getsize(temp_file.name)) / (1024 * 1024)))

            with tarfile.open(temp_file.name, 'r') as archive_file:
                # the first member is taken as the root directory of the archive
                root_name = archive_file.getnames()[0]
                # NOTE(review): extractall trusts member paths from the server - confirm this is acceptable
                archive_file.extractall(destination)
        finally:
            self._remove_temp_file(temp_file.name)

        return os.path.join(os.path.realpath(destination), root_name)
def main():  # type: () -> None
    """
    Command line entry point: parse the arguments, connect to Gitlab for the
    given project and run the requested action.

    Supported actions are ``download_artifacts``, ``download_artifact``,
    ``find_job_id``, ``download_archive`` and ``get_project_id``. Any other
    action only creates the connection and does nothing else.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('action')
    parser.add_argument('project_id', type=int)
    parser.add_argument('--pipeline_id', '-i', type=int, default=None)
    parser.add_argument('--ref', '-r', default='master')
    parser.add_argument('--job_id', '-j', type=int, default=None)
    parser.add_argument('--job_name', '-n', default=None)
    parser.add_argument('--project_name', '-m', default=None)
    parser.add_argument('--destination', '-d', default=None)
    parser.add_argument('--artifact_path', '-a', nargs='*', default=None)
    args = parser.parse_args()

    gitlab_inst = Gitlab(args.project_id)
    if args.action == 'download_artifacts':
        gitlab_inst.download_artifacts(args.job_id, args.destination)
    elif args.action == 'download_artifact':
        gitlab_inst.download_artifact(args.job_id, args.artifact_path, args.destination)
    elif args.action == 'find_job_id':
        job_ids = gitlab_inst.find_job_id(args.job_name, args.pipeline_id)
        # Output format: "<id>,<parallel_num>;<id>,<parallel_num>;...".
        # 'parallel_num' is None for jobs without an "N/M" suffix; print an
        # empty field for those instead of failing inside str.join().
        print(';'.join(','.join([str(j['id']), j['parallel_num'] or '']) for j in job_ids))
    elif args.action == 'download_archive':
        gitlab_inst.download_archive(args.ref, args.destination)
    elif args.action == 'get_project_id':
        ret = gitlab_inst.get_project_id(args.project_name)
        print('project id: {}'.format(ret))


if __name__ == '__main__':
    main()
|