davydovcloud-setup/services/tools/cli/docker_service.py

104 lines
3.5 KiB
Python
Executable File

#!/usr/bin/env python3
from argparse import ArgumentParser
from args_utils import file_arg_type, find_git_base_dir
from dataclass.service import Service, Host
from base import BaseCli
import yaml
import subprocess
import os
from typing import List
from constants import SERVICES_DIR_RELATIVE_GIT_ROOT
# Program description shown by argparse in the --help output. The default
# file name mentioned here must match the default of the --services-file
# argument registered in DockerServiceCLI.configure_args.
CLI_DESCRIPTION = '''
This CLI is meant to execute Docker commands for specified services
defined in a services file. The services file can be supplied as an
argument, with a default value of services.yaml.
'''
class DockerServiceCLI(BaseCli):
    """Command-line tool that runs ``docker compose up -d`` for services.

    Services are described in a YAML services file. Each selected service is
    brought up through the Docker CLI, pointed at that service's host with
    ``-H <user>@<ip>``.
    """

    def __init__(self):
        """Parse the command line and resolve the services base directory."""
        super().__init__()
        parser = ArgumentParser(description=CLI_DESCRIPTION)
        self.configure_args(parser)
        self.args = self.process_args(parser)
        self.services_basedir = find_git_base_dir() + f'/{SERVICES_DIR_RELATIVE_GIT_ROOT}'

    def configure_args(self, parser: ArgumentParser):
        """Register this tool's command-line options on ``parser``.

        Args:
            parser: The argument parser to extend.
        """
        parser.add_argument(
            '-f',
            '--services-file',
            help='Path to the services file',
            default='services.yaml',
            type=file_arg_type
        )
        # The option is mandatory, so the help text must not advertise a
        # default; the special value "all" is handled in start().
        parser.add_argument(
            '-n',
            '--service-name',
            help='Name of the service to run, or "all" to run every service',
            required=True,
            type=str
        )

    def process_args(self, parser: ArgumentParser):
        """Parse the process command line with ``parser``.

        Args:
            parser: A parser already populated by ``configure_args``.

        Returns:
            The namespace produced by ``parser.parse_args()``.
        """
        return parser.parse_args()

    def load_services(self) -> List[Service]:
        """Build ``Service`` objects from the YAML services file.

        Compose and env file paths are resolved relative to
        ``<services_basedir>/<service name>``.

        Returns:
            One ``Service`` per entry under the top-level ``services`` key,
            in file order.

        Raises:
            KeyError: If the file has no ``services`` key (an empty file
                included), or an entry lacks ``name``, ``host``,
                ``composeFile`` or ``envFile``, or its host lacks ``ip`` or
                ``user``.
        """
        with open(self.args.services_file, 'r') as file:
            # safe_load returns None for an empty document; normalise it so
            # an empty file fails the same way as a file without 'services'.
            services_data = yaml.safe_load(file) or {}

        services = []
        for service_data in services_data['services']:
            host_data = service_data['host']
            host = Host(ip=host_data['ip'], user=host_data['user'])
            project_path = f"{self.services_basedir}/{service_data['name']}"
            services.append(
                Service(
                    name=service_data['name'],
                    host=host,
                    compose_file=f"{project_path}/{service_data['composeFile']}",
                    env_file=f"{project_path}/{service_data['envFile']}",
                    ports=service_data.get('ports', []),
                )
            )
        return services

    def run_docker_command(self, service: Service):
        """Run ``docker compose up -d`` for one service against its host.

        Every entry of ``service.ports`` is exposed to the command as an
        environment variable ``SVC_PORT_<n>``, numbered from 1. The variables
        are set on a private copy of the current environment rather than on
        ``os.environ``, so ports of one service cannot leak into the command
        of the next service processed in the same run.

        A non-zero exit status is reported through ``self.error`` and is not
        re-raised here.

        Args:
            service: The service to bring up.
        """
        self.info(f"Processing service:\n{service}")

        env = os.environ.copy()
        for index, port in enumerate(service.ports, start=1):
            value = str(port)
            self.info(f'Set env SVC_PORT_{index}={value}')
            env[f'SVC_PORT_{index}'] = value

        # NOTE(review): -H receives a bare "user@ip" without a scheme such as
        # ssh:// — confirm this is the form the target Docker CLI expects.
        command = [
            'docker', '-H', f"{service.host.user}@{service.host.ip}",
            'compose', '-f', f"{service.compose_file}",
            '--env-file', f"{service.env_file}",
            'up', '-d'
        ]
        try:
            subprocess.run(command, check=True, env=env)
        except subprocess.CalledProcessError as e:
            self.error(f"Error executing command: {' '.join(command)}, return code: {e.returncode}")

    def start(self):
        """Bring up the requested service, or all of them for ``all``.

        An unknown service name is reported through ``self.error``.
        """
        services = self.load_services()
        requested = self.args.service_name

        if requested == 'all':
            for service in services:
                self.run_docker_command(service)
            return

        service = next((s for s in services if s.name == requested), None)
        if service is None:
            self.error(f'Error: Service "{requested}" not found in the services file.')
            return
        self.run_docker_command(service)
def main():
    """Script entry point: build the CLI from the command line and run it."""
    DockerServiceCLI().start()


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()