This commit is contained in:
zhaojing1987 2023-09-23 08:45:57 +08:00
parent 58abf5f3b8
commit a57a337b44
17 changed files with 634 additions and 151 deletions

View file

@ -1,4 +1,9 @@
# run app : uvicorn src.main:app --reload --port 9999
# run nginx proxy manager doc:docker run -p 9091:8080 -e SWAGGER_JSON=/foo/api.swagger.json -v /data/websoft9/appmanage_new/docs/:/foo swaggerapi/swagger-ui
# supervisorctl
## supervisorctl reload
## supervisorctl update
## supervisorctl status
# gitea_token: da7b9891a0bc71b5026b389c11ed13238c9a3866

View file

@ -26,5 +26,4 @@ def apps_install(
appInstall: appInstall,
endpointId: int = Query(None, description="Endpoint ID to install app on,if not set, install on the local endpoint"),
):
appManger = AppManger()
appManger.install_app(appInstall, endpointId)
AppManger().install_app(appInstall, endpointId)

View file

@ -4,28 +4,28 @@ access_token =
# The config for nginx proxy manager
[nginx_proxy_manager]
# base_url = http://websoft9-nginxproxymanager:81/api
base_url = http://47.92.222.186/nginxproxymanager/api
base_url = http://websoft9-proxy:81/api
#base_url = http://47.92.222.186/w9proxy/api
user_name = help@websoft9.com
user_pwd = websoft9@123456
user_pwd = websoft9@2023
#The config for gitea
[gitea]
#base_url = http://websoft9-gitea:3000/api/v1
base_url = http://47.92.222.186/git/api/v1
base_url = http://websoft9-git:3000/api/v1
# base_url = http://47.92.222.186/w9git/api/v1
user_name = websoft9
user_pwd = websoft9
user_pwd = O4rXXHkSoKVY
#The config for portainer
[portainer]
# base_url = http://websoft9-portainer:9000/api
base_url = http://47.92.222.186/portainer/api
base_url = http://websoft9-deployment:9000/api
#base_url = http://47.92.222.186/w9deployment/api
user_name = admin
user_pwd = websoft9@123456
user_pwd = &uswVF^wMyi]wpdc
#The path of docker library
[docker_library]
path = /data/library/apps
path = /websoft9/library/apps
# public_ip_url_list is a list of public ip url, which is used to get the public ip of the server
[public_ip_url_list]

View file

@ -1,5 +1,7 @@
import requests
from src.core.logger import logger
class APIHelper:
"""
Helper class for making API calls
@ -72,7 +74,7 @@ class APIHelper:
url = f"{self.base_url}/{path}"
return requests.put(url, params=params, json=json, headers=self._merge_headers(headers))
def delete(self, path, headers=None):
def delete(self, path,params=None, headers=None):
"""
Delete a resource
@ -84,7 +86,7 @@ class APIHelper:
Response: Response from API
"""
url = f"{self.base_url}/{path}"
return requests.delete(url, headers=self._merge_headers(headers))
return requests.delete(url, params=params, headers=self._merge_headers(headers))
def _merge_headers(self, headers):
"""

View file

@ -0,0 +1,57 @@
import fileinput
from src.core.exception import CustomException
from src.core.logger import logger
class EnvHelper:
"""
This class is used to modify env file
Attributes:
env_file_path (str): Path to env file
Methods:
modify_env_values(new_values: dict): Modify env file
"""
def __init__(self, env_file_path):
self.env_file_path = env_file_path
def modify_env_values(self, new_values: dict):
"""
Modify env file
Args:
new_values (dict): New values
example: {"key1": "value1", "key2": "value2"}
"""
try:
with fileinput.FileInput(self.env_file_path, inplace=True) as env_file:
for line in env_file:
for key, new_value in new_values.items():
if line.startswith(f"{key}="):
print(f"{key}={new_value}")
break
else: # Executed when the loop ended normally (no break was encountered).
print(line, end='')
except Exception as e:
logger.error(f"Modify env file error:{e}")
raise CustomException()
def get_env_value_by_key(self,key:str):
"""
Get env value by key
Args:
key (str): Key
Returns:
str: Value
"""
try:
with open(self.env_file_path, "r") as env_file:
for line in env_file:
if line.startswith(f"{key}="):
return line.replace(f"{key}=","").strip()
except Exception as e:
logger.error(f"Get env value by key error:{e}")
raise CustomException()

View file

@ -2,6 +2,7 @@ import base64
from src.core.apiHelper import APIHelper
from src.core.config import ConfigManager
from src.core.logger import logger
class GiteaAPI:
@ -69,10 +70,10 @@ class GiteaAPI:
return self.api.post(
path="user/repos",
json={
"auto_init": True,
"default_branch": "main",
"name": repo_name,
"trust_model": "default",
"private": True,
},
)
@ -101,7 +102,7 @@ class GiteaAPI:
"""
return self.api.get(
path=f"repos/{self.owner}/{repo_name}/contents/{file_path}",
params={"ref": "main"},
params={"ref": "main"}
)
def update_file_content_in_repo(self, repo_name: str, file_path: str, content: str, sha: str):

View file

@ -64,7 +64,6 @@ class PortainerAPI:
},
)
def get_endpoints(self,start: int = 0,limit: int = 1000):
"""
Get endpoints
@ -80,17 +79,17 @@ class PortainerAPI:
},
)
def get_endpoint_by_id(self, endpointID: int):
def get_endpoint_by_id(self, endpointId: int):
"""
Get endpoint by ID
Args:
endpointID (int): Endpoint ID
endpointId (int): Endpoint ID
Returns:
Response: Response from Portainer API
"""
return self.api.get(path=f"endpoints/{endpointID}")
return self.api.get(path=f"endpoints/{endpointId}")
def create_endpoint(self, name: str, EndpointCreationType: int = 1):
"""
@ -109,12 +108,12 @@ class PortainerAPI:
params={"Name": name, "EndpointCreationType": EndpointCreationType},
)
def get_stacks(self, endpointID: int):
def get_stacks(self, endpointId: int):
"""
Get stacks
Args:
endpointID (int): Endpoint ID
endpointId (int): Endpoint ID
Returns:
Response: Response from Portainer API
@ -123,7 +122,7 @@ class PortainerAPI:
path="stacks",
params={
"filters": json.dumps(
{"EndpointID": endpointID, "IncludeOrphanedStacks": True}
{"EndpointID": endpointId, "IncludeOrphanedStacks": True}
)
},
)
@ -140,22 +139,22 @@ class PortainerAPI:
"""
return self.api.get(path=f"stacks/{stackID}")
def remove_stack(self, stackID: int, endPointID: int):
def remove_stack(self, stackID: int, endpointId: int):
"""
Remove a stack
Args:
stackID (int): Stack ID
endPointID (int): Endpoint ID
endpointId (int): Endpoint ID
Returns:
Response: Response from Portainer API
"""
return self.api.delete(
path=f"stacks/{stackID}", params={"endpointId": endPointID}
path=f"stacks/{stackID}", params={"endpointId": endpointId}
)
def create_stack_standlone_repository(self, stack_name: str, endpointId: int, repositoryURL: str):
def create_stack_standlone_repository(self, stack_name: str, endpointId: int, repositoryURL: str,usr_name:str,usr_password:str):
"""
Create a stack from a standalone repository
@ -174,6 +173,9 @@ class PortainerAPI:
"Name": stack_name,
"RepositoryURL": repositoryURL,
"ComposeFile": "docker-compose.yml",
"repositoryAuthentication": True,
"RepositoryUsername": usr_name,
"RepositoryPassword": usr_password,
},
)
@ -221,3 +223,31 @@ class PortainerAPI:
return self.api.post(
path=f"stacks/{stackID}/redeploy", params={"endpointId": endpointId}
)
def get_volumes(self, endpointId: int,dangling: bool = False):
"""
Get volumes in endpoint
Args:
endpointId (int): Endpoint ID
"""
return self.api.get(
path=f"endpoints/{endpointId}/docker/volumes",
params={
"filters": json.dumps(
{"dangling": [str(dangling).lower()]}
)
}
)
def remove_volume_by_name(self, endpointId: int,volume_name:str):
"""
Remove volumes by name
Args:
endpointId (int): Endpoint ID
volume_name (str): volume name
"""
return self.api.delete(
path=f"endpoints/{endpointId}/docker/volumes/{volume_name}",
)

View file

@ -1,13 +1,26 @@
import json
from fastapi import FastAPI, HTTPException, Request
import logging
import uvicorn
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from src.api.v1.routers import app as api_app
from src.api.v1.routers import settings as api_settings
from src.api.v1.routers import proxy as api_proxy
from src.core.exception import CustomException
from src.core.logger import logger
from src.schemas.errorResponse import ErrorResponse
uvicorn_logger = logging.getLogger("uvicorn")
for handler in uvicorn_logger.handlers:
uvicorn_logger.removeHandler(handler)
for handler in logger._error_logger.handlers:
uvicorn_logger.addHandler(handler)
uvicorn_logger.setLevel(logging.INFO)
app = FastAPI(
title="AppManae API",
# summary="[ Base URL: /api/v1 ]",

View file

@ -1,29 +1,67 @@
import re
from typing import Optional, List
from typing import Optional, List,Union
from pydantic import BaseModel, Field, validator
from src.core.exception import CustomException
class Edition(BaseModel):
dist: str = Field("community", description="The edition of the app",examples=["community","enterprise"])
dist: str = Field(..., description="The edition of the app",examples=["community"])
version: str = Field(..., description="The version of the app",examples=["1.0.0","latest"])
@validator('dist')
def validate_dist(cls, v):
if v != 'community':
raise CustomException(400,"Invalid Request","'dist' must be 'community'.")
return v
@validator('version')
def validate_fields(cls, v):
if not v.strip():
raise CustomException(400,"Invalid Request","'version' cannot be empty.")
return v
class appInstall(BaseModel):
app_name: str = Field(...,description="The name of the app",examples=["wordpress","mysql"])
edition: Edition = Field(..., description="The edition of the app", example={"dist":"community","version":"1.0.0"})
app_id: str = Field(...,description="The custom identifier for the application. It must be a combination of 2 to 20 lowercase letters and numbers, and cannot start with a number.", example="wordpress")
domain_names: Optional[List[str]] = Field(None, description="The domain names for the app, not exceeding 2, one wildcard domain and one custom domain.", example=["wordpress.example.com","mysql.example.com"])
default_domain: Optional[str] = Field(None, description="The default domain for the app, sourced from domain_names. If not set, the custom domain will be used automatically.", example="wordpress.example.com")
@validator('app_id', check_fields=False)
proxy_enabled: bool = Field(...,
description="""Whether to enable proxy for the app:
If true,the app will be accessed through the proxy server,
If false, the app will be accessed through the port of the host machine.""", example=True)
domain_names: Optional[List[str]] = Field(...,
description="""The domain or IP for the app:
If proxy_enabled is true, provide the domain name.The first domain name will be used as the primary domain name.(e.g., ["wordpress.example1.com", "wordpress.example2.com"])
If proxy_enabled is false, provide the host machine's IP address.(e.g., ["192.168.1.1"])""",
example=["wordpress.example1.com", "wordpress.example2.com"])
@validator('app_name')
def validate_app_name(cls, v):
if not v.strip():
raise CustomException(400,"Invalid Request","'app_name' cannot be empty.")
return v
@validator('app_id')
def validate_app_id(cls, v):
pattern = re.compile("^[a-z][a-z0-9]{1,19}$")
if not pattern.match(v):
raise CustomException(400,"Invalid Request","The app_id must be a combination of 2 to 20 lowercase letters and numbers, and cannot start with a number.")
return v
@validator('domain_names', check_fields=False)
def validate_domain_names(cls, v):
if v and len(v) > 2:
raise CustomException(400, "Invalid Request","The domain_names not exceeding 2")
@validator('domain_names', each_item=True)
def validate_domain_name(cls, v):
if not v.strip():
raise CustomException(400,"Invalid Request","domain_names' cannot be empty string.")
return v
@validator('domain_names')
def validate_domain_names(cls, v,values):
if not v:
raise CustomException(400,"Invalid Request","domain_names' cannot be empty.")
if 'proxy_enabled' in values:
if not values['proxy_enabled'] and v and len(v) > 1:
raise CustomException(400,"Invalid Request","When proxy is disabled, you can only provide one IP address.")
if v and len(set(v)) != len(v):
raise CustomException(400,"Invalid Request","Duplicate entries found in 'domain_names'. All domains must be unique.")
return v

View file

@ -1,24 +1,30 @@
import json
import os
import shutil
from src.core.config import ConfigManager
from src.core.envHelper import EnvHelper
from src.core.exception import CustomException
from src.schemas.appInstall import appInstall
from src.services.git_manager import GitManager
from src.services.gitea_manager import GiteaManager
from src.services.portainer_manager import PortainerManager
from src.core.logger import logger
from src.services.proxy_manager import ProxyManager
from src.utils.file_manager import FileHelper
from src.utils.password_generator import PasswordGenerator
class AppManger:
def install_app(self,appInstall: appInstall, endpointId: int = None):
library_path = ConfigManager().get_value("docker_library", "path")
portainerManager = PortainerManager()
# if endpointId is None, get the local endpointId
if endpointId is None:
try:
endpointId = portainerManager.get_local_endpoint_id()
except CustomException:
raise
except Exception:
except (CustomException,Exception) as e:
raise CustomException()
else :
# validate the endpointId is exists
@ -27,31 +33,97 @@ class AppManger:
if not is_endpointId_exists:
raise CustomException(
status_code=404,
message="Not found",
message="Invalid Request",
details="EndpointId Not Found"
)
# validate the app_name and app_version
app_name = appInstall.app_name
app_version = appInstall.edition.version
self._check_appName_and_appVersion(app_name,app_version)
self._check_appName_and_appVersion(app_name,app_version,library_path)
# validate the app_id
app_id = appInstall.app_id
self._check_appId(app_id,endpointId)
proxy_enabled = appInstall.proxy_enabled
domain_names = appInstall.domain_names
# validate the domain_names
if proxy_enabled:
self._check_domain_names(domain_names)
# Begin install app
# Step 1 : create repo in gitea
giteaManager = GiteaManager()
repo_url = giteaManager.create_repo(app_id)
# Step 2 : initialize local git repo and push to gitea
try:
local_path = f"{library_path}/{app_name}"
# The destination directory.
app_tmp_dir = "/tmp"
app_tmp_dir_path = f"{app_tmp_dir}/{app_name}"
# Check if the destination directory exists, create it if necessary.
if not os.path.exists(app_tmp_dir):
os.makedirs(app_tmp_dir)
# If the specific target folder already exists, remove it before copying.
if os.path.exists(app_tmp_dir_path):
shutil.rmtree(app_tmp_dir_path)
# Copy the entire directory.
shutil.copytree(local_path, app_tmp_dir_path)
# Modify the env file
env_file_path = f"{app_tmp_dir_path}/.env"
new_env_values = {
"APP_NAME": app_id,
"APP_VERSION": app_version,
"POWER_PASSWORD": PasswordGenerator.generate_strong_password()
}
new_env_values["APP_URL"] = domain_names[0]
EnvHelper(env_file_path).modify_env_values(new_env_values)
# Get the forward port form env file
forward_port = EnvHelper(env_file_path).get_env_value_by_key("APP_HTTP_PORT")
# Commit and push to remote repo
self._init_local_repo_and_push_to_remote(app_tmp_dir_path,repo_url)
# Remove the tmp dir
shutil.rmtree(app_tmp_dir_path)
except (CustomException,Exception) as e:
# Rollback: remove repo in gitea
giteaManager.remove_repo(app_id)
raise CustomException()
# Step 3 : create stack in portainer
try:
user_name = ConfigManager().get_value("gitea","user_name")
user_pwd = ConfigManager().get_value("gitea","user_pwd")
portainerManager.create_stack_from_repository(app_id,endpointId,repo_url,user_name,user_pwd)
stack_id = portainerManager.get_stack_by_name(app_id,endpointId)["Id"]
except (CustomException,Exception) as e:
# Rollback: remove repo in gitea
giteaManager.remove_repo(app_id)
raise CustomException()
# Step 4 : create proxy in proxy
try:
if domain_names:
ProxyManager().create_proxy_for_app(domain_names,app_id,forward_port)
except (CustomException,Exception) as e:
# Rollback-1: remove repo in gitea
giteaManager.remove_repo(app_id)
# Rollback-2: remove stack in portainer
portainerManager.remove_stack_and_volumes(stack_id,endpointId)
raise CustomException()
def _check_appName_and_appVersion(self,app_name:str, app_version:str):
def _check_appName_and_appVersion(self,app_name:str, app_version:str,library_path:str):
"""
Check the app_name and app_version is exists in docker library
@ -62,12 +134,11 @@ class AppManger:
Raises:
CustomException: If the app_name or app_version is not exists in docker library
"""
library_path = ConfigManager().get_value("docker_library", "path")
if not os.path.exists(f"{library_path}/{app_name}"):
logger.error(f"When install app:{app_name}, the app is not exists in docker library")
raise CustomException(
status_code=400,
message="App Name Not Supported",
message="Invalid Request",
details=f"app_name:{app_name} not supported",
)
else:
@ -80,11 +151,21 @@ class AppManger:
logger.error(f"When install app:{app_name}, the app version:{app_version} is not exists in docker library")
raise CustomException(
status_code=400,
message="App Version Not Supported",
message="Invalid Request",
details=f"app_version:{app_version} not supported",
)
def _check_appId(self,app_id:str,endpointId:int):
"""
Check the app_id is exists in gitea and portainer
Args:
app_id (str): App Id
endpointId (int): Endpoint Id
Raises:
CustomException: If the app_id is exists in gitea or portainer
"""
# validate the app_id is exists in gitea
giteaManager = GiteaManager()
is_repo_exists = giteaManager.check_repo_exists(app_id)
@ -92,8 +173,8 @@ class AppManger:
logger.error(f"When install app,the app_id:{{app_id}} is exists in gitea")
raise CustomException(
status_code=400,
message="App_id Conflict",
details=f"App_id:{app_id} Is Exists In Gitea"
message="Invalid Request",
details=f"App_id:{app_id} is exists in gitea"
)
# validate the app_id is exists in portainer
@ -103,6 +184,36 @@ class AppManger:
logger.error(f"When install app, the app_id:{app_id} is exists in portainer")
raise CustomException(
status_code=400,
message="App_id Conflict",
message="Invalid Request",
details=f"app_id:{app_id} is exists in portainer"
)
)
def _check_domain_names(self,domain_names:list[str]):
"""
Check the domain_names is exists in proxy
Args:
domain_names (list[str]): Domain Names
Raises:
CustomException: If the domain_names is not exists in proxy
"""
ProxyManager().check_proxy_host_exists(domain_names)
def _init_local_repo_and_push_to_remote(self,local_path:str,repo_url:str):
"""
Initialize a local git repository from a directory and push to remote repo
Args:
local_path (str): The path to the local git repository.
repo_url (str): The URL of the remote origin.
"""
try:
gitManager =GitManager(local_path)
gitManager.init_local_repo_from_dir()
user_name = ConfigManager().get_value("gitea","user_name")
user_pwd = ConfigManager().get_value("gitea","user_pwd")
gitManager.push_local_repo_to_remote_repo(repo_url,user_name,user_pwd)
except (CustomException,Exception) as e:
logger.error(f"Init local repo and push to remote repo error:{e}")
raise CustomException()

View file

@ -0,0 +1,94 @@
import os
import shutil
from git import Repo, GitCommandError
from src.core.exception import CustomException
from src.core.logger import logger
from urllib.parse import urlparse, urlunparse
class GitManager:
"""
This class is used to interact with Git
Attributes:
local_path (str): The path to the local git repository.
Methods:
init_local_repo_from_dir() -> None: Initialize a local git repository from a directory.
push_local_repo_to_remote_repo(remote_url:str,user_name:str,user_pwd:str) -> None: Push a local git repository to a remote origin.
remove_git_directory() -> None: Remove the .git directory.
"""
def __init__(self,local_path:str):
"""
Initialize the GitManager instance
"""
self.local_path = local_path
def init_local_repo_from_dir(self):
"""
Initialize a local git repository from a directory.
"""
# Validate the repo path.
if not os.path.exists(self.local_path):
logger.error(f"When initializing a local git repository, the path {self.local_path} does not exist.")
raise CustomException()
# Initialize the repository
try:
repo = Repo.init(self.local_path)
except GitCommandError as e:
logger.error(f"When initializing a local git repository,failed to initialize git repository at {self.local_path}: {str(e)}")
raise CustomException()
# Add all files to the index and commit.
try:
repo.git.add('.')
repo.git.commit('-m', 'Initial commit')
except GitCommandError as e:
logger.error(f"When initializing a local git repository,failed to add/commit files in git repository at {self.local_path}: {str(e)}")
raise CustomException()
def push_local_repo_to_remote_repo(self,remote_url:str,user_name:str,user_pwd:str):
"""
Push a local git repository to a remote origin.
Args:
repo_path (str): The path to the local git repository.
remote_url (str): The URL of the remote origin.
user_name (str): The user name to use when authenticating with the remote origin.
user_pwd (str): The password to use when authenticating with the remote origin.
Raises:
CustomException: If there is an error pushing the local git repository to the remote origin.
"""
# Validate the repo path.
if not os.path.exists(self.local_path):
logger.error(f"Invalid repo path: {self.local_path}")
raise CustomException()
# Parse the remote URL.
parsed = urlparse(remote_url)
# Get the network location.
auth_netloc = f"{user_name}:{user_pwd}@{parsed.netloc}"
# Create a new ParseResult with the updated network location
auth_parsed = parsed._replace(netloc=auth_netloc)
auth_repo_url = urlunparse(auth_parsed)
# Set remote origin URL.
try:
repo = Repo(self.local_path)
repo.create_remote('origin', url=auth_repo_url)
except (ValueError, GitCommandError) as e:
logger.error(f"Failed to set remote origin URL in git repository at {self.local_path}: {str(e)}")
raise CustomException()
# Push local code to main branch on remote origin.
try:
repo.git.push('origin', 'HEAD:refs/heads/main')
except GitCommandError as e:
logger.error(f"Failed to push from 'main' branch in git repository at {self.local_path} to remote '{remote_url}': {str(e)}")
raise CustomException()

View file

@ -54,56 +54,41 @@ class GiteaManager:
repo_name (str): Repository name
Returns:
bool: True if repo is created, raise exception if repo is not created
str: Repository clone url
"""
response = self.gitea.create_repo(repo_name)
if response.status_code == 201:
return True
repo_json = response.json()
return repo_json["clone_url"]
else:
logger.error(f"Error create repo from gitea: {response.text}")
raise CustomException()
def create_local_repo_and_push_remote(self, local_git_path: str,remote_git_url: str):
if os.path.exists(local_git_path):
try:
repo = Repo.init(local_git_path)
repo.create_head('main')
repo.git.add(A=True)
repo.index.commit("Initial commit")
origin = repo.create_remote('origin',remote_git_url)
origin.push(refspec='main:main')
except Exception as e:
logger.error(f"Error create local repo and push remote: {e}")
raise CustomException()
else:
logger.error(f"Error repo path not exist: {local_git_path}")
raise CustomException()
def get_file_content_from_repo(self, repo_name: str, file_path: str):
response = self.gitea.get_file_content_from_repo(repo_name, file_path)
if response.status_code == 200:
return {
"name": response.json()["name"],
"encoding": response.json()["encoding"],
"sha": response.json()["sha"],
"content": response.json()["content"],
}
response_json = response.json() # The gitea Api: if the repo is empty, the response is: []
if not response_json:
return None
else:
return {
"name": response_json["name"],
"encoding": response_json["encoding"],
"sha": response_json["sha"],
"content": response_json["content"],
}
else:
logger.error(f"Error get file content from repo from gitea: {response.text}")
raise CustomException()
def update_file_in_repo(self, repo_name: str, file_path: str, content: str,sha: str):
response = self.gitea.update_file_content_in_repo(repo_name, file_path, content, sha)
if response.status_code == 201:
return True
else:
if response.status_code != 201:
logger.error(f"Error update file in repo from gitea: {response.text}")
raise CustomException()
def remove_repo(self, repo_name: str):
response = self.gitea.remove_repo(repo_name)
if response.status_code == 204:
return True
else:
if response.status_code != 204:
logger.error(f"Error remove repo from gitea: {response.text}")
raise CustomException()

View file

@ -89,7 +89,7 @@ class PortainerManager:
logger.error(f"Error get local endpoint id from portainer: {response.text}")
raise CustomException()
def check_endpoint_exists(self, endpoint_id: str):
def check_endpoint_exists(self, endpoint_id: int):
response = self.portainer.get_endpoint_by_id(endpoint_id)
if response.status_code == 200:
return True
@ -99,7 +99,7 @@ class PortainerManager:
logger.error(f"Error validate endpoint is exist from portainer: {response.text}")
raise CustomException()
def check_stack_exists(self, stack_name: str, endpoint_id: str):
def check_stack_exists(self, stack_name: str, endpoint_id: int):
response = self.portainer.get_stacks(endpoint_id)
if response.status_code == 200:
stacks = response.json()
@ -111,15 +111,13 @@ class PortainerManager:
logger.error(f"Error validate stack is exist from portainer: {response.text}")
raise CustomException()
def create_stack_from_repository(self, stack_name: str, endpoint_id: str,repositoryURL : str):
response = self.portainer.create_stack_standlone_repository(stack_name, endpoint_id,repositoryURL)
if response.status_code == 200:
return True
else:
def create_stack_from_repository(self, stack_name: str, endpoint_id: int,repositoryURL : str,user_name:str,user_password:str):
response = self.portainer.create_stack_standlone_repository(stack_name, endpoint_id,repositoryURL,user_name,user_password)
if response.status_code != 200:
logger.error(f"Error create stack from portainer: {response.text}")
raise CustomException()
def get_stacks(self, endpoint_id: str):
def get_stacks(self, endpoint_id: int):
response = self.portainer.get_stacks(endpoint_id)
if response.status_code == 200:
return response.json()
@ -127,11 +125,109 @@ class PortainerManager:
logger.error(f"Error get stacks from portainer: {response.text}")
raise CustomException()
def get_stack_by_id(self, stack_id: str):
def get_stack_by_id(self, stack_id: int):
response = self.portainer.get_stack_by_id(stack_id)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Error get stack by id from portainer: {response.text}")
raise CustomException()
def get_stack_by_name(self, stack_name: str, endpoint_id: int):
"""
Get stack by name
Args:
stack_name (str): stack name
endpoint_id (int): endpoint id
Returns:
dict: stack info
"""
response = self.portainer.get_stacks(endpoint_id)
if response.status_code == 200:
stacks = response.json()
for stack in stacks:
if stack["Name"] == stack_name:
return stack
return None
else:
logger.error(f"Error get stack by name from portainer: {response.text}")
raise CustomException()
def remove_stack(self, stack_id: int, endpoint_id: int):
response = self.portainer.remove_stack(stack_id, endpoint_id)
if response.status_code != 204:
logger.error(f"Error remove stack from portainer: {response.text}")
raise CustomException()
def remove_stack_and_volumes(self, stack_id: int, endpoint_id: int):
# get stack name
stack_name = self.get_stack_by_id(stack_id).get("Name")
# remove stack
response = self.portainer.remove_stack(stack_id, endpoint_id)
if response.status_code != 204:
logger.error(f"Error remove stack from portainer: {response.text}")
raise CustomException()
# remove volumes
try:
if stack_name is not None:
volumes = self.get_volumes_by_stack_name(stack_name, endpoint_id,True)
volume_names = []
for volume in volumes.get("mountpoint", []):
volume_names.append(volume["name"])
self.remove_volume(volume_names, endpoint_id)
except (CustomException,Exception) as e:
raise CustomException()
def get_volumes_by_stack_name(self, stack_name: str, endpoint_id: int,dangling:bool):
"""
Get volumes by stack name
Args:
stack_name (str): stack name
endpoint_id (int): endpoint id
Returns:
dict: volumes info
"""
response = self.portainer.get_volumes(endpoint_id,dangling)
if response.status_code == 200:
volumes = response.json().get("Volumes", [])
mountpoints = []
for volume in volumes:
labels = volume.get("Labels", {})
if labels.get("com.docker.compose.project") == stack_name:
mountpoint_info = {
"name": volume["Name"],
"path": volume["Mountpoint"]
}
mountpoints.append(mountpoint_info)
return {
"stack_name": stack_name,
"mountpoint": mountpoints
}
else:
logger.error(f"Error remove stack from portainer: {response.text}")
raise CustomException()
def remove_volume(self, volume_names: list, endpoint_id: int):
"""
Remove volume by name
Args:
volume_names (list): volume name list
endpoint_id (int): endpoint id
"""
for volume_name in volume_names:
response = self.portainer.remove_volume_by_name(endpoint_id,volume_name)
if response.status_code != 204:
logger.error(f"Error remove volume from portainer: {response.text}")
raise CustomException()

View file

@ -1,4 +1,5 @@
import time
import jwt
import keyring
import json
from src.core.config import ConfigManager
@ -8,14 +9,13 @@ from src.external.nginx_proxy_manager_api import NginxProxyManagerAPI
class ProxyManager:
def __init__(self, app_name):
def __init__(self):
"""
Initialize the ProxyManager instance.
Args:
app_name (str): The name of the app
"""
self.app_name = app_name
try:
self.nginx = NginxProxyManagerAPI()
self._set_nginx_token()
@ -32,23 +32,23 @@ class ProxyManager:
# Try to get token from keyring
try:
token_json_str = keyring.get_password(service_name, token_name)
jwt_token = keyring.get_password(service_name, token_name)
print(jwt_token)
except Exception as e:
token_json_str = None
jwt_token = None
# if the token is got from keyring, parse it
if token_json_str is not None:
if jwt_token is not None:
try:
token_json = json.loads(token_json_str)
expires = token_json.get("expires")
api_token = token_json.get("token")
decoded_jwt = jwt.decode(jwt_token, options={"verify_signature": False})
exp_timestamp = decoded_jwt['exp']
# if the token is not expired, return it
if int(expires) - int(time.time()) > 3600:
self.nginx.set_token(api_token)
if int(exp_timestamp) - int(time.time()) > 3600:
self.nginx.set_token(jwt_token)
return
except Exception as e:
logger.error(f"Parse Nginx Proxy Manager's Token Error:{e}")
logger.error(f"Decode Nginx Proxy Manager's Token Error:{e}")
raise CustomException()
# if the token is expired or not got from keyring, get a new one
@ -62,16 +62,11 @@ class ProxyManager:
nginx_tokens = self.nginx.get_token(userName, userPwd)
if nginx_tokens.status_code == 200:
nginx_tokens = nginx_tokens.json()
expires = nginx_tokens.get("expires")
api_token = nginx_tokens.get("token")
self.nginx.set_token(api_token)
token_json = {"expires": expires, "token": api_token}
jwt_token = nginx_tokens.get("token")
self.nginx.set_token(jwt_token)
# set new token to keyring
try:
keyring.set_password(service_name, token_name, json.dumps(token_json))
keyring.set_password(service_name, token_name, jwt_token)
except Exception as e:
logger.error(f"Set Nginx Proxy Manager's Token To Keyring Error:{e}")
raise CustomException()
@ -82,23 +77,41 @@ class ProxyManager:
response = self.nginx.get_proxy_hosts()
if response.status_code == 200:
proxy_hosts = response.json()
matching_domains = []
for proxy_host in proxy_hosts:
if proxy_host["domain_names"] == domain_names:
return True
return False
matching_domains += [domain for domain in domain_names if domain in proxy_host.get("domain_names", [])]
if matching_domains:
raise CustomException(
status_code=400,
message=f"Proxy Host Already Used",
details=f"matching_domains:{matching_domains} already used"
)
else:
raise CustomException()
def create_proxy_for_app(self,domain_names: list[str],forward_host: str,forward_port: int,advanced_config: str = "",forward_scheme: str = "http"):
try:
self.nginx.create_proxy_host(
response = self.nginx.create_proxy_host(
domain_names=domain_names,
forward_scheme=forward_scheme,
forward_host=forward_host,
forward_port=forward_port,
advanced_config=advanced_config,
)
except Exception as e:
logger.error(f"Create Proxy Host For {self.app_name} Error {e}")
raise e
)
if response.status_code != 201:
logger.error(f"Error create proxy for app:{response.text}")
raise CustomException()
def update_proxy_for_app(self,domain_names: list[str],forward_host: str,forward_port: int,advanced_config: str = "",forward_scheme: str = "http"):
response = self.nginx.update_proxy_host(
domain_names=domain_names,
forward_scheme=forward_scheme,
forward_host=forward_host,
forward_port=forward_port,
advanced_config=advanced_config,
)
if response.status_code != 200:
logger.error(f"Error update proxy for app:{response.text}")
raise CustomException()

View file

@ -0,0 +1,37 @@
class FileHelper:
"""
Helper class for file operations.
Methods:
read_file(file_path): Read a file and return its contents.
write_file(file_path, content): Write given content to a file.
"""
@staticmethod
def read_file(file_path):
"""
Read a file and return its contents.
Args:
file_path (str): The path to the file.
Returns:
str: The contents of the file.
"""
with open(file_path, 'r') as f:
content = f.read()
return content
@staticmethod
def write_file(file_path, content):
"""
Write given content to a file.
Args:
file_path (str): The path to the file.
content (str): The content to be written.
"""
with open(file_path, 'w') as f:
f.write(content)

View file

@ -1,32 +1,34 @@
import string
import random
def generate_strong_password():
lowercase_letters = string.ascii_lowercase # all lowercase letters
uppercase_letters = string.ascii_uppercase # all uppercase letters
digits = string.digits # all digits
special_symbols = "`$%()[]{},.*+-:;<>?_~/|\"" # all special symbols
class PasswordGenerator:
"""
A class that generates a strong password.
"""
@staticmethod
def generate_strong_password():
lowercase_letters = string.ascii_lowercase # all lowercase letters
uppercase_letters = string.ascii_uppercase # all uppercase letters
digits = string.digits # all digits
special_symbols = "`$%()[]{},.*+-:;<>?_~/|\"" # all special symbols
# get 4 random characters from each category
password = [
random.choice(lowercase_letters),
random.choice(uppercase_letters),
random.choice(digits),
random.choice(special_symbols)
]
# get 4 random characters from each category
password = [
random.choice(lowercase_letters),
random.choice(uppercase_letters),
random.choice(digits),
random.choice(special_symbols)
]
# get 12 random characters from all categories
all_characters = lowercase_letters + uppercase_letters + digits + special_symbols
for i in range(12): # 12 characters
password.append(random.choice(all_characters)) # get a random character from all characters
# get 12 random characters from all categories
all_characters = lowercase_letters + uppercase_letters + digits + special_symbols
for i in range(12): # 12 characters
password.append(random.choice(all_characters)) # get a random character from all characters
# shuffle the password list
random.shuffle(password)
# shuffle the password list
random.shuffle(password)
# convert the list to a string
password = ''.join(password)
# convert the list to a string
password = ''.join(password)
return password
if __name__ == "__main__":
print(generate_strong_password())
return password