davydovcloud-setup/services/tools/cli/portainer_host.py
2025-03-02 23:51:53 +00:00

77 lines
2.4 KiB
Python

import json
from base import BaseCLi
from constants import (
DEFAULT_PORTAINER_JWT_CONFIG_FILE,
FUNCTIONAL_USER_USERNAME_ENV,
FUNCTIONAL_USER_PASSWORD_ENV,
)
import os
import requests
class MissingEnvironmentVariable(Exception):
    """Signal that a required environment variable is not set.

    Raised when a credential variable needed for authentication cannot be
    found in ``os.environ``.
    """
class EmptyConfigException(Exception):
    """Signal that no usable stored configuration exists for a host.

    Raised when the configuration file is absent or holds no entry for the
    requested host, so that a new entry has to be created.
    """
class PortainerHost(BaseCLi):
    """CLI helper bound to one Portainer host and its stored JWT entry.

    On construction the JWT configuration file in the user's home directory
    is loaded. If the file is missing, unreadable as a JSON list, or has no
    entry for ``host_url``, a token is requested from the host and a new
    entry is appended and written back to the file.

    Attributes:
        host_url: Base URL of the host, exactly as passed in. It is also the
            ``hostname`` key used to look up entries in the config file.
        api_url: ``host_url`` with ``/api`` appended.
        jwt_config: List of ``{"hostname": ..., "jwt": ...}`` entries as
            stored in the configuration file.
    """

    # Upper bound (seconds) for the authentication request; without an
    # explicit timeout ``requests`` may block indefinitely.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, host_url):
        """Load the stored entry for ``host_url``, creating it if needed.

        Args:
            host_url: Base URL of the Portainer host.

        Raises:
            MissingEnvironmentVariable: If a new token must be fetched and a
                credential environment variable is not set.
        """
        super().__init__()
        self.host_url = host_url
        self.api_url = f'{host_url}/api'
        self.jwt_config = []
        home_directory = os.path.expanduser("~")
        config_file_path = os.path.join(
            home_directory, DEFAULT_PORTAINER_JWT_CONFIG_FILE
        )
        try:
            self.read_current_config(config_file_path)
        except EmptyConfigException:
            self.create_config(config_file_path)

    def read_current_config(self, file_path):
        """Load ``file_path`` into ``self.jwt_config``.

        Args:
            file_path: Path of the JSON configuration file.

        Raises:
            EmptyConfigException: If the file does not exist, does not
                contain a JSON list, or has no entry whose ``hostname``
                equals ``self.host_url``.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                loaded = json.load(file)
        except FileNotFoundError:
            print("Configuration does not exist. Creating...")
            raise EmptyConfigException() from None
        except json.JSONDecodeError:
            # An empty or corrupt file is treated like a missing one; the
            # caller will rewrite it with a fresh entry.
            raise EmptyConfigException() from None

        if not isinstance(loaded, list):
            # create_config() appends to jwt_config, so anything other than
            # a list cannot be reused; keep the current (empty) list.
            raise EmptyConfigException()

        # Keep the loaded entries even when this host is missing, so that
        # create_config() appends to them instead of discarding them.
        self.jwt_config = loaded
        host_known = any(
            isinstance(entry, dict) and entry.get('hostname') == self.host_url
            for entry in loaded
        )
        if not host_known:
            raise EmptyConfigException()

    def create_config(self, file_path):
        """Fetch a token, append an entry for this host and save the file.

        Args:
            file_path: Path of the JSON configuration file to (re)write.
        """
        # The token is fetched first, so a failed authentication leaves both
        # ``self.jwt_config`` and the file on disk untouched.
        entry = {
            "hostname": self.host_url,
            "jwt": self.get_jwt_token(),
        }
        self.jwt_config.append(entry)

        parent_directory = os.path.dirname(file_path)
        if parent_directory:
            os.makedirs(parent_directory, exist_ok=True)
        # NOTE(review): the file stores bearer tokens and is created with the
        # process's default permissions - consider restricting them.
        with open(file_path, 'w', encoding='utf-8') as file:
            json.dump(self.jwt_config, file, indent=4)

    def get_jwt_token(self):
        """Authenticate against ``{api_url}/auth`` and return the JWT.

        The username and password are read from the environment variables
        named by ``FUNCTIONAL_USER_USERNAME_ENV`` and
        ``FUNCTIONAL_USER_PASSWORD_ENV``.

        Returns:
            The value of the ``jwt`` field of the JSON response.

        Raises:
            MissingEnvironmentVariable: If a credential variable is not set.
            requests.exceptions.RequestException: If the request fails or
                the host answers with a status other than 200, and
                ``self.error`` returns instead of terminating.
        """

        def get_env(env_key):
            try:
                return os.environ[env_key]
            except KeyError:
                raise MissingEnvironmentVariable(
                    f"{env_key} env does not exist"
                ) from None

        payload = {
            "Username": get_env(FUNCTIONAL_USER_USERNAME_ENV),
            "Password": get_env(FUNCTIONAL_USER_PASSWORD_ENV),
        }
        auth_endpoint = f'{self.api_url}/auth'

        # ``self.error`` comes from BaseCLi, which is not visible here. In
        # case it returns rather than exiting, each failure path below also
        # raises so execution never continues without a usable response.
        try:
            response = requests.post(
                auth_endpoint,
                json=payload,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
        except requests.exceptions.RequestException as e:
            self.error(f'An error occurred: {e}')
            raise

        if response.status_code != 200:
            message = (
                f'Return code {response.status_code} from {auth_endpoint} '
                f'with message {response.text}'
            )
            self.error(message)
            raise requests.exceptions.HTTPError(message, response=response)

        data = response.json()
        return data['jwt']