Update install_kube script

This commit is contained in:
ItsDrike 2025-12-25 22:52:47 +01:00
parent c5a7f3f6ec
commit 873d78021e
Signed by: ItsDrike
GPG key ID: FA2745890B7048C0

View file

@ -163,6 +163,28 @@ def b64_to_file(target: Path, b64_str: str) -> None:
_ = fh.write(raw) _ = fh.write(raw)
def load_or_init_kubeconfig(path: Path) -> dict[str, Any]:
"""Load an existing kubeconfig or initialize a new one.
Args:
path: Path to the kubeconfig file.
Returns:
A valid kubeconfig mapping.
"""
if path.exists():
return load_yaml(path)
return {
"apiVersion": "v1",
"kind": "Config",
"clusters": [],
"users": [],
"contexts": [],
"current-context": "",
}
def backup_file(path: Path) -> Path: def backup_file(path: Path) -> Path:
"""Create a timestamped backup of a file. """Create a timestamped backup of a file.
@ -209,6 +231,7 @@ def integrate_cluster(
input_cfg: InputKubeConfig, input_cfg: InputKubeConfig,
key_root: Path, key_root: Path,
target_name: str | None = None, target_name: str | None = None,
server_override: str | None = None,
) -> None: ) -> None:
"""Integrate a parsed input cluster into an existing kubeconfig. """Integrate a parsed input cluster into an existing kubeconfig.
@ -224,6 +247,10 @@ def integrate_cluster(
If not set, the names from the input config will be used, otherwise, If not set, the names from the input config will be used, otherwise,
this will override those. this will override those.
server_override:
Server address of the cluster to be used instead of the one in input_cfg.
If not set, the server address from input_cfg will be used.
""" """
cluster = input_cfg.cluster cluster = input_cfg.cluster
user = input_cfg.user user = input_cfg.user
@ -253,11 +280,13 @@ def integrate_cluster(
if user.spec.client_key_data: if user.spec.client_key_data:
b64_to_file(cluster_dir / "client.key", user.spec.client_key_data) b64_to_file(cluster_dir / "client.key", user.spec.client_key_data)
server = server_override if server_override is not None else cluster.spec.server
base_cfg.setdefault("clusters", []).append( base_cfg.setdefault("clusters", []).append(
{ {
"name": target_name, "name": target_name,
"cluster": { "cluster": {
"server": cluster.spec.server, "server": server,
"certificate-authority": str(cluster_dir / "ca.crt"), "certificate-authority": str(cluster_dir / "ca.crt"),
}, },
} }
@ -283,8 +312,20 @@ def integrate_cluster(
} }
) )
if not base_cfg.get("current-context"):
base_cfg["current-context"] = target_name
def parse_args() -> argparse.Namespace:
@dataclass(frozen=True, slots=True)
class Args:
input: Path
kubeconfig: Path
keys: Path
name: str | None
server: str | None
def parse_args() -> Args:
"""Parse command-line arguments.""" """Parse command-line arguments."""
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Merge a single-cluster kubeconfig into ~/.kube/config" description="Merge a single-cluster kubeconfig into ~/.kube/config"
@ -311,7 +352,22 @@ def parse_args() -> argparse.Namespace:
type=str, type=str,
help="Override cluster, user and context name when importing", help="Override cluster, user and context name when importing",
) )
return parser.parse_args() _ = parser.add_argument(
"--server",
type=str,
help=(
"Override the cluster API server address "
"(useful if the kubeconfig points to localhost)"
),
)
ns = parser.parse_args()
return Args(
input=ns.input,
kubeconfig=ns.kubeconfig,
keys=ns.keys,
name=ns.name,
server=ns.server,
)
def main() -> None: def main() -> None:
@ -319,14 +375,16 @@ def main() -> None:
args = parse_args() args = parse_args()
input_yaml = load_yaml(args.input) input_yaml = load_yaml(args.input)
base_cfg = load_yaml(args.kubeconfig)
input_cfg = InputKubeConfig.from_yaml(input_yaml) input_cfg = InputKubeConfig.from_yaml(input_yaml)
backup_path = backup_file(args.kubeconfig) base_cfg = load_or_init_kubeconfig(args.kubeconfig)
print(f"Backup created at {backup_path}")
integrate_cluster(base_cfg, input_cfg, args.keys, args.name) if args.kubeconfig.exists():
backup_path = backup_file(args.kubeconfig)
print(f"Backup created at {backup_path}")
integrate_cluster(base_cfg, input_cfg, args.keys, args.name, args.server)
args.kubeconfig.parent.mkdir(parents=True, exist_ok=True)
write_yaml(args.kubeconfig, base_cfg) write_yaml(args.kubeconfig, base_cfg)
print("Cluster added successfully.") print("Cluster added successfully.")