davydovcloud-setup/services/tools/cli/portainer_host.py
2025-03-04 21:35:16 +00:00

79 lines
2.3 KiB
Python

import json
from api import ApiClient
from base import BaseCLi
from constants import (
DEFAULT_PORTAINER_JWT_CONFIG_FILE,
FUNCTIONAL_USER_USERNAME_ENV,
FUNCTIONAL_USER_PASSWORD_ENV,
)
import os
import requests
class MissingEnvironmentVariable(Exception):
    """Raised when an environment variable required for authentication is not set."""
class EmptyConfigException(Exception):
    """Raised when no usable JWT configuration is available and one must be created."""
class PortainerHost(BaseCLi):
    """CLI helper bound to a single Portainer host.

    On construction the instance looks for a cached JWT for ``host_url`` in a
    JSON file located under the user's home directory
    (``DEFAULT_PORTAINER_JWT_CONFIG_FILE``). When no usable entry exists, it
    requests a new token from the host's ``/auth`` endpoint and stores it in
    that file.

    The config file maps each host URL to an object of the form
    ``{"jwt": "<token>"}``.
    """

    def __init__(self, host_url):
        """Set up the API client and load (or create) the JWT for ``host_url``.

        Args:
            host_url: Base URL of the host; ``/api`` is appended to build the
                URL handed to ``ApiClient``.

        Raises:
            MissingEnvironmentVariable: If a new token has to be requested and
                one of the credential environment variables is not set.
        """
        super().__init__()
        self.host_url = host_url
        api_url = f'{host_url}/api'
        self.api = ApiClient(api_url=api_url)
        # Full contents of the config file: {host_url: {"jwt": token}}.
        self.jwt_config = {}
        # Token string for this host; empty until read or created.
        self.jwt_token = ''
        home_directory = os.path.expanduser("~")
        config_file_path = os.path.join(home_directory, DEFAULT_PORTAINER_JWT_CONFIG_FILE)
        try:
            self.__read_current_config(config_file_path)
        except EmptyConfigException:
            self.__create_config(config_file_path)

    def __read_current_config(self, file_path):
        """Load the cached token for this host from ``file_path``.

        On success ``self.jwt_config`` holds the parsed file, ``self.jwt_token``
        holds the token string, and the token is handed to the API client.

        Raises:
            EmptyConfigException: If the file is missing, is not a valid JSON
                object, or has no ``"jwt"`` entry for ``self.host_url``.
        """
        try:
            with open(file_path, 'r') as file:
                loaded = json.load(file)
        except FileNotFoundError:
            print("Configuration does not exist. Creating...")
            raise EmptyConfigException() from None
        except json.JSONDecodeError:
            # An empty or corrupt file is treated like a missing one so that
            # it gets regenerated instead of crashing the constructor.
            print(f"Configuration file {file_path} is not valid JSON. Recreating...")
            raise EmptyConfigException() from None

        if not isinstance(loaded, dict):
            print(f"Configuration file {file_path} is not valid JSON. Recreating...")
            raise EmptyConfigException()
        self.jwt_config = loaded

        try:
            # Entries are written as {"jwt": token}; unwrap to the token itself.
            # TypeError covers an entry that is not a mapping.
            self.jwt_token = self.jwt_config[self.host_url]["jwt"]
        except (KeyError, TypeError):
            print(f"No token present in configuration file for host {self.host_url}. Creating...")
            raise EmptyConfigException() from None

        # Mirror __create_config: the API client must be given the token,
        # otherwise a cached token would be loaded but never used.
        self.api.reassign_token(self.jwt_token)

    def __create_config(self, file_path):
        """Request a new token for this host and persist it to ``file_path``.

        Existing entries already present in ``self.jwt_config`` are kept; only
        the entry for ``self.host_url`` is added or replaced.
        """
        jwt_token = self.__get_jwt_token()
        self.jwt_token = jwt_token
        self.jwt_config[self.host_url] = {
            "jwt": jwt_token,
        }
        self.api.reassign_token(jwt_token)

        # The configured file name may contain sub-directories that do not
        # exist yet; create them so that the write below cannot fail on that.
        parent_directory = os.path.dirname(file_path)
        if parent_directory:
            os.makedirs(parent_directory, exist_ok=True)
        with open(file_path, 'w') as file:
            json.dump(self.jwt_config, file, indent=4)

    def __get_jwt_token(self):
        """Authenticate against ``/auth`` and return the ``jwt`` value of the response.

        Credentials are read from the environment variables named by
        ``FUNCTIONAL_USER_USERNAME_ENV`` and ``FUNCTIONAL_USER_PASSWORD_ENV``.

        Raises:
            MissingEnvironmentVariable: If either variable is not set.
            KeyError: If the response body has no ``"jwt"`` key and
                ``self.error`` returns instead of terminating.
        """
        def get_env(env_key):
            try:
                return os.environ[env_key]
            except KeyError:
                raise MissingEnvironmentVariable(f"{env_key} env does not exist") from None

        payload = {
            "Username": get_env(FUNCTIONAL_USER_USERNAME_ENV),
            "Password": get_env(FUNCTIONAL_USER_PASSWORD_ENV),
        }
        response = self.api.post('/auth', payload=payload)
        try:
            return response.json()['jwt']
        except KeyError:
            self.error('No "jwt" key in /auth response found.')
            # NOTE(review): self.error comes from BaseCLi and may already
            # terminate the program - confirm. If it returns, re-raise so the
            # caller never receives an undefined token.
            raise