You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
188 lines
5.5 KiB
188 lines
5.5 KiB
import git |
|
import os |
|
import shutil |
|
import tempfile |
|
import time |
|
|
|
from util import load_env |
|
|
|
|
|
class GitManagerConfiguration:
    """Read-only settings consumed by ``GitManager``.

    Attributes (exposed as read-only properties):
        origin:    URL of the remote repository; must be non-empty.
        git_pw:    Password handed to git via an askpass script, or ``None``.
        wc_path:   Directory to use as working copy, or ``None`` to let the
                   manager create a temporary one.
        pull_intv: Minimum number of seconds between two non-forced pulls
                   (compared against ``time.time()`` differences by
                   ``GitManager.pull``).
    """

    # Pull interval used when none is configured.
    DEFAULT_PULL_INTV = 30

    @classmethod
    def from_environment(cls):
        """Build a configuration from environment settings.

        Reads ``GIT_ORIGIN``, ``GIT_WC_PATH``, ``GIT_PASSWORD`` and
        ``GIT_PULL_INTV`` through ``util.load_env`` with a default of ``None``.
        (``load_env`` is defined outside this file; presumably it returns the
        default when the variable is unset -- verify against ``util``.)

        Declared as a classmethod so that subclasses get an instance of their
        own type; calling it on the class works exactly as before.

        Raises:
            ValueError: if no origin is configured, or the pull interval
                cannot be converted to ``int``.
        """
        return cls(origin=load_env("GIT_ORIGIN", None),
                   git_pw=load_env("GIT_PASSWORD", None),
                   wc_path=load_env("GIT_WC_PATH", None),
                   pull_intv=load_env("GIT_PULL_INTV", None))

    def __init__(self, origin, git_pw=None, wc_path=None, pull_intv=None):
        """Store the given settings.

        Arguments:
            origin:    Remote repository URL (required, must be truthy).
            git_pw:    Optional password.
            wc_path:   Optional path of an existing working copy directory.
            pull_intv: Optional pull interval; anything ``int()`` accepts.
                       ``None`` selects ``DEFAULT_PULL_INTV``.

        Raises:
            ValueError: if ``origin`` is empty or ``pull_intv`` is not
                convertible to ``int``.
        """
        if not origin:
            raise ValueError("Git origin cannot be empty!")

        self._origin = origin
        self._git_pw = git_pw
        self._wc_path = wc_path
        self._pull_intv = (self.DEFAULT_PULL_INTV if pull_intv is None
                           else int(pull_intv))

    def __repr__(self):
        # The password is deliberately masked so that the configuration can be
        # logged or shown in tracebacks without leaking the secret.
        masked_pw = None if self._git_pw is None else "***"
        return (f"{type(self).__name__}(origin={self._origin!r}, "
                f"git_pw={masked_pw!r}, wc_path={self._wc_path!r}, "
                f"pull_intv={self._pull_intv!r})")

    @property
    def origin(self):
        """URL of the remote repository."""
        return self._origin

    @property
    def git_pw(self):
        """Configured git password, or ``None``."""
        return self._git_pw

    @property
    def wc_path(self):
        """Configured working copy directory, or ``None``."""
        return self._wc_path

    @property
    def pull_intv(self):
        """Pull interval as an ``int``."""
        return self._pull_intv
|
|
|
|
|
class GitManager:
    """Maintain a local working copy of the configured git origin.

    Typical life cycle: ``setup()`` (prepare directory, clone or reuse the
    repository, do an initial pull), any number of ``pull()`` calls, then
    ``teardown()``.

    Attributes:
        repo: The ``git.Repo`` of the working copy, or ``None`` while no
              repository has been initialised.
    """

    def __init__(self, configuration):
        """Create a manager for the given configuration.

        Arguments:
            configuration: Object providing ``origin``, ``git_pw``,
                ``wc_path`` and ``pull_intv`` attributes.

        Raises:
            ValueError: if ``configuration`` is ``None``.
        """
        if configuration is None:
            raise ValueError("GitManager must be initialized with a configuration!")

        self._configuration = configuration
        self._wc = None
        self._last_pull = 0
        # Initialised here so that ``head_sha`` can be read before ``setup()``
        # without raising AttributeError; ``_init_repo`` assigns the real repo.
        self.repo = None

    @property
    def configuration(self):
        """The configuration this manager was created with."""
        return self._configuration

    def _setup_wc(self):
        """Select the working copy directory (no-op if already selected).

        Uses the configured ``wc_path`` if there is one, otherwise creates a
        temporary directory.

        Raises:
            ValueError: if the chosen path is not an existing directory.
        """
        if self._wc is not None:
            return

        wc_dir = self.configuration.wc_path
        if wc_dir is None:
            wc_dir = tempfile.mkdtemp(prefix='entities_git_')

        if not os.path.isdir(wc_dir):
            raise ValueError("Configured directory for the working copy does not exist!")

        self._wc = wc_dir

    def _teardown_wc(self):
        """Delete the working copy if it is a temporary one.

        An externally configured working copy is left on disk untouched.
        """
        if self._wc is None:
            return

        if self.configuration.wc_path is not None:
            print("NOTE: Not tearing down externally configured working copy.")
            return

        shutil.rmtree(self._wc)

        self._wc = None
        # The repository object pointed into the directory that was just
        # removed; drop it so ``head_sha`` reports ``None`` instead of failing.
        self.repo = None

    def _assert_wc(self):
        """Assert working copy matches origin and is a valid repository.

        A failed assertion raises; the original author treats this as an
        unrecoverable error that aborts the service.

        Returns:
            False if the WC path is an empty directory (a clone is needed),
            True if it holds a non-bare repository whose ``origin`` remote
            lists the configured origin URL.

        Raises:
            ValueError: if the repository is bare or the origin URL differs.
            Whatever ``git.Repo`` / ``Repo.remote`` raise for an invalid
            repository or a missing ``origin`` remote is propagated.
        """
        # Check if WC is empty
        if not os.listdir(self._wc):
            return False

        # Create a repository object
        # This fails if there is no valid repository
        repo = git.Repo(self._wc)

        # Assert that this is not a bare repo
        if repo.bare:
            raise ValueError("WC path points to a bare git repository!")

        origin = repo.remote('origin')
        if self.configuration.origin not in origin.urls:
            raise ValueError("Origin URL does not match!")

        # We're good here.
        return True

    def _askpass_script(self):
        """Return the source of a Python script that prints the git password.

        Passwords may contain any character, which makes quoting them inside a
        script fragile. The password is therefore embedded as a list of code
        points and reassembled by the generated script at run time.

        Raises:
            ValueError: if no password is configured.
        """
        password = self.configuration.git_pw
        if password is None:
            raise ValueError("Cannot create an askpass script without a git password!")

        codepoints = [ord(char) for char in password]

        return (
            "#!/usr/bin/env python3\n"
            f"l = {codepoints}\n"
            "p = [chr(c) for c in l]\n"
            "print(\"\".join(p))\n"
        )

    def _init_repo(self):
        """Open the repository in the working copy, cloning it if necessary.

        Sets ``self.repo``. When a password is configured, the clone runs with
        ``GIT_ASKPASS`` pointing at a temporary helper script; without a
        password the clone runs without it.
        """
        if self._assert_wc():
            print("Reusing existing git working copy ...")
            self.repo = git.Repo(self._wc)
            return

        print("Cloning new git working copy ...")

        if self.configuration.git_pw is None:
            # Nothing to feed to an askpass helper (e.g. public origin).
            self.repo = git.Repo.clone_from(url=self.configuration.origin,
                                            to_path=self._wc)
            return

        # Create a temporary script file for GIT_ASKPASS
        with tempfile.NamedTemporaryFile(mode='w+t') as askpass:
            askpass.write(self._askpass_script())
            # Close only the underlying file object: this flushes the script
            # and releases the write handle so it can be executed, while the
            # wrapper still removes the file when the ``with`` block ends.
            askpass.file.close()
            os.chmod(path=askpass.name, mode=0o700)
            self.repo = git.Repo.clone_from(url=self.configuration.origin,
                                            to_path=self._wc,
                                            env={'GIT_ASKPASS': askpass.name})

    def setup(self):
        """Prepare the working copy, open/clone the repo and force a pull."""
        self._setup_wc()
        self._init_repo()
        self.pull(force=True)

    def teardown(self):
        """Release the working copy (see ``_teardown_wc``)."""
        self._teardown_wc()

    def printout(self):
        """Print origin and working copy information to stdout."""
        print("Git Manager:")
        print(f"\tGit origin is {self.configuration.origin}")
        print(f"\tUsing working copy path {self._wc}")
        if self._wc != self.configuration.wc_path:
            print("\tUsing a temporary working copy.")

    @property
    def head_sha(self):
        """Hex SHA of the current HEAD commit, or ``None`` without a repo."""
        return None if self.repo is None else self.repo.head.object.hexsha

    def pull(self, force=False):
        """Pull from origin (with rebase).

        Arguments:
        `force` -- Do a pull even though the pull interval has not elapsed

        Returns: True if the pull moved HEAD to a different commit; False if
        HEAD is unchanged or the pull was skipped because the pull interval
        has not elapsed yet.
        """
        if not force and (time.time() - self._last_pull < self.configuration.pull_intv):
            return False

        # Recorded before pulling, so a failing pull is throttled as well.
        self._last_pull = time.time()

        previous_head = self.head_sha

        # get the origin
        # (We verified during initialization that this origin exists.)
        # NOTE(review): no GIT_ASKPASS is supplied here, unlike the initial
        # clone -- confirm that pulls from a password-protected origin work.
        origin = self.repo.remote('origin')
        origin.pull(rebase=True)

        return self.head_sha != previous_head
|
|
|