davydovcloud-setup/services/tools/cli/portainer_host.py

77 lines
2.4 KiB
Python

import json
from base import BaseCLi
from constants import (
DEFAULT_PORTAINER_JWT_CONFIG_FILE,
FUNCTIONAL_USER_USERNAME_ENV,
FUNCTIONAL_USER_PASSWORD_ENV,
)
import os
import requests
class MissingEnvironmentVariable(Exception):
pass
class EmptyConfigException(Exception):
pass
class PortainerHost(BaseCLi):
def __init__(self, host_url):
super().__init__()
self.host_url = host_url
self.api_url = f'{host_url}/api'
self.jwt_config = []
home_directory = os.path.expanduser("~")
config_file_path = os.path.join(home_directory, DEFAULT_PORTAINER_JWT_CONFIG_FILE)
try:
self.__read_current_config(config_file_path)
except EmptyConfigException:
self.__create_config(config_file_path)
def __read_current_config(self, file_path):
try:
with open(file_path, 'r') as file:
self.jwt_config = json.load(file)
except FileNotFoundError:
print(f"Configuration does not exist. Creating...")
raise EmptyConfigException()
hostnames = [config['hostname'] for config in self.jwt_config]
if self.host_url not in hostnames:
raise EmptyConfigException()
def __create_config(self, file_path):
config = {
"hostname": self.host_url,
"jwt": self.__get_jwt_token(),
}
self.jwt_config.append(config)
with open(file_path, 'w') as file:
json.dump(self.jwt_config, file, indent=4)
def __get_jwt_token(self):
def get_env(env_key):
try:
return os.environ[env_key]
except KeyError:
raise MissingEnvironmentVariable(f"{env_key} env does not exist")
payload = {
"Username": get_env(FUNCTIONAL_USER_USERNAME_ENV),
"Password": get_env(FUNCTIONAL_USER_PASSWORD_ENV)
}
auth_endpoint = f'{self.api_url}/auth'
try:
response = requests.post(
auth_endpoint,
json=payload
)
if response.status_code != 200:
self.error(f'Return code {response.status_code} from {auth_endpoint} with message {response.text}')
except requests.exceptions.RequestException as e:
self.error("An error occurred:", e)
data = response.json()
return data['jwt']