Files
zebra-power/backend/app/services/proxmox_host_service.py

253 lines
9.3 KiB
Python

import asyncio
import subprocess
import requests
from typing import List, Optional
from sqlalchemy.orm import Session
from datetime import datetime
from proxmoxer import ProxmoxAPI
from app.database import ProxmoxHost
from app.models.schemas import ProxmoxVM
from app.services.logging_service import LoggingService
import logging
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ProxmoxHostService:
    """Power control, reachability checks and VM operations for one Proxmox host.

    The service wraps a single ``ProxmoxHost`` record. The ``ProxmoxAPI``
    client is created lazily on first use and cached on the instance.

    Subprocess calls (``wakeonlan``, ``ping``) and the HTTP shutdown request
    are executed in the event loop's default executor so they do not block
    other coroutines. Calls made through the ``ProxmoxAPI`` client are still
    issued synchronously inside the coroutines.
    """

    def __init__(self, host_config: ProxmoxHost):
        """Bind the service to ``host_config``; no connection is opened yet."""
        self.host_config = host_config
        self._proxmox = None

    def _get_proxmox_connection(self):
        """Return the cached ``ProxmoxAPI`` client, creating it on first call.

        Raises:
            ConnectionError: if the client cannot be created. The underlying
                exception is chained as ``__cause__``.
        """
        if self._proxmox is None:
            try:
                self._proxmox = ProxmoxAPI(
                    self.host_config.proxmox_host,
                    user=self.host_config.proxmox_username,
                    password=self.host_config.proxmox_password,
                    port=self.host_config.proxmox_port,
                    verify_ssl=self.host_config.verify_ssl,
                    timeout=10,
                )
            except Exception as e:
                logger.error(
                    f"Failed to connect to Proxmox {self.host_config.proxmox_host}: {e}"
                )
                raise ConnectionError(f"Cannot connect to Proxmox: {e}") from e
        return self._proxmox

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` executed in the loop's default executor.

        Exceptions raised by ``func`` propagate to the awaiting coroutine.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    @staticmethod
    async def send_wol_packet(mac_address: str) -> bool:
        """Run the ``wakeonlan`` command for ``mac_address``.

        Returns:
            True if the command exits with status 0; False on a non-zero exit,
            on the 10 second timeout, or on any other error (which is logged).
        """
        try:
            # Off-loaded so a slow or hanging command cannot stall the event loop.
            result = await ProxmoxHostService._run_blocking(
                subprocess.run,
                ["wakeonlan", mac_address],
                capture_output=True,
                text=True,
                timeout=10,
            )
            return result.returncode == 0
        except subprocess.TimeoutExpired:
            logger.error(f"WOL timeout for MAC: {mac_address}")
            return False
        except Exception as e:
            logger.error(f"WOL error for MAC {mac_address}: {e}")
            return False

    @staticmethod
    async def ping_host(ip_address: str) -> bool:
        """Send a single ``ping`` to ``ip_address`` and report whether it succeeded.

        Returns:
            True if ``ping`` exits with status 0; False on a non-zero exit, on
            the 5 second subprocess timeout, or on any other error.
        """
        try:
            # Off-loaded to a worker thread so that several pings started with
            # asyncio.gather() actually overlap instead of running back to back.
            # NOTE(review): "-W 3" is presumably the Linux iputils reply timeout
            # in seconds; the flag's meaning differs on other platforms.
            result = await ProxmoxHostService._run_blocking(
                subprocess.run,
                ["ping", "-c", "1", "-W", "3", ip_address],
                capture_output=True,
                text=True,
                timeout=5,
            )
            return result.returncode == 0
        except subprocess.TimeoutExpired:
            return False
        except Exception as e:
            logger.error(f"Ping error for IP {ip_address}: {e}")
            return False

    async def wake_host(self, db: Session) -> bool:
        """Send a Wake-on-LAN packet to this host and record the outcome.

        Args:
            db: session handed to ``LoggingService.log_host_action``.

        Returns:
            The result of ``send_wol_packet`` for the host's MAC address.
        """
        success = await self.send_wol_packet(self.host_config.mac_address)
        # Log WOL action
        LoggingService.log_host_action(
            db=db,
            action="wake",
            host_id=self.host_config.id,
            host_name=self.host_config.name,
            success=success,
            message=f"WOL packet {'sent successfully' if success else 'failed'} to {self.host_config.name} ({self.host_config.mac_address})",
        )
        return success

    async def _shutdown_via_endpoint(self) -> bool:
        """POST to the host's configured shutdown endpoint; True only on HTTP 200."""
        try:
            response = await self._run_blocking(
                requests.post,
                f"http://{self.host_config.ip_address}{self.host_config.shutdown_endpoint}",
                timeout=10,
            )
            return response.status_code == 200
        except Exception as e:
            logger.error(f"Shutdown endpoint failed: {e}")
            return False

    async def _shutdown_via_api(self) -> bool:
        """Stop running guests on each reported node, then shut the node down.

        Returns:
            True only if at least one node was reported and every node's
            shutdown sequence completed without raising.
        """
        proxmox = self._get_proxmox_connection()
        # NOTE(review): every node returned by the API is shut down; if this
        # host belongs to a cluster that presumably includes the other
        # members too -- confirm this is intended.
        nodes = proxmox.nodes.get()
        if not nodes:
            # No shutdown command would be sent, so this must not count as success.
            logger.error(
                f"Proxmox API returned no nodes for {self.host_config.name}; nothing to shut down"
            )
            return False

        success = True
        for node_data in nodes:
            node_name = node_data['node']
            try:
                # Stop all VMs/containers first
                for vm in await self.get_vms_for_node(node_name):
                    if vm.status == 'running':
                        await self.stop_vm(node_name, vm.vmid, vm.type)
                # Then shutdown the node
                proxmox.nodes(node_name).status.post(command='shutdown')
            except Exception as e:
                logger.error(f"Failed to shutdown node {node_name}: {e}")
                success = False
        return success

    async def shutdown_host(self, db: Session) -> bool:
        """Shut the host down and record the outcome.

        Uses the configured shutdown endpoint when ``shutdown_endpoint`` is
        set; otherwise falls back to the Proxmox API.

        Args:
            db: session handed to ``LoggingService.log_host_action``.

        Returns:
            True if the chosen shutdown path reported success, False otherwise
            (including when an unexpected exception is raised).
        """
        try:
            if self.host_config.shutdown_endpoint:
                success = await self._shutdown_via_endpoint()
            else:
                success = await self._shutdown_via_api()

            # Log shutdown action
            LoggingService.log_host_action(
                db=db,
                action="shutdown",
                host_id=self.host_config.id,
                host_name=self.host_config.name,
                success=success,
                message=f"Host {self.host_config.name} {'shutdown initiated' if success else 'shutdown failed'}",
            )
            return success
        except Exception as e:
            logger.error(f"Host shutdown failed: {e}")
            LoggingService.log_host_action(
                db=db,
                action="shutdown",
                host_id=self.host_config.id,
                host_name=self.host_config.name,
                success=False,
                message=f"Host shutdown failed: {e}",
            )
            return False

    async def test_proxmox_connection(self) -> bool:
        """Return True if the Proxmox API answers a ``version`` request."""
        try:
            proxmox = self._get_proxmox_connection()
            proxmox.version.get()
            return True
        except Exception as e:
            logger.error(f"Proxmox connection test failed: {e}")
            return False

    async def get_nodes(self) -> List[dict]:
        """Return the node list reported by the API, or ``[]`` on any error."""
        try:
            proxmox = self._get_proxmox_connection()
            return proxmox.nodes.get()
        except Exception as e:
            logger.error(f"Failed to get nodes: {e}")
            return []

    async def get_vms(self) -> List[ProxmoxVM]:
        """Return the VMs and containers of every node, or ``[]`` on any error."""
        try:
            # get_nodes() opens the connection itself and yields [] on failure,
            # so no separate connection attempt is needed here.
            vms = []
            for node_data in await self.get_nodes():
                vms.extend(await self.get_vms_for_node(node_data['node']))
            return vms
        except Exception as e:
            logger.error(f"Failed to get VMs: {e}")
            return []

    async def get_vms_for_node(self, node_name: str) -> List[ProxmoxVM]:
        """Return the QEMU VMs (excluding templates) and LXC containers of a node.

        QEMU and LXC listings are fetched independently: a failure in one is
        logged and does not discard the results of the other.

        Returns:
            The collected ``ProxmoxVM`` entries; ``[]`` if the connection
            itself cannot be obtained.
        """
        try:
            proxmox = self._get_proxmox_connection()
            vms = []

            # Get QEMU VMs
            try:
                for vm in proxmox.nodes(node_name).qemu.get():
                    if vm.get('template', 0) == 1:
                        continue  # templates are not listed as VMs
                    vms.append(ProxmoxVM(
                        vmid=str(vm['vmid']),
                        name=vm.get('name', f"VM-{vm['vmid']}"),
                        status=vm.get('status', 'unknown'),
                        node=node_name,
                        type='qemu',
                    ))
            except Exception as e:
                logger.error(f"Failed to get QEMU VMs from node {node_name}: {e}")

            # Get LXC Containers
            try:
                for container in proxmox.nodes(node_name).lxc.get():
                    vms.append(ProxmoxVM(
                        vmid=str(container['vmid']),
                        name=container.get('name', f"CT-{container['vmid']}"),
                        status=container.get('status', 'unknown'),
                        node=node_name,
                        type='lxc',
                    ))
            except Exception as e:
                logger.error(f"Failed to get LXC containers from node {node_name}: {e}")

            return vms
        except Exception as e:
            logger.error(f"Failed to get VMs for node {node_name}: {e}")
            return []

    async def start_vm(self, node: str, vmid: str, vm_type: str = 'qemu') -> bool:
        """Request a start of the given guest.

        Args:
            node: name of the node the guest lives on.
            vmid: guest identifier.
            vm_type: ``'lxc'`` for a container; any other value is treated as QEMU.

        Returns:
            True if the API call did not raise, False otherwise.
        """
        try:
            proxmox = self._get_proxmox_connection()
            if vm_type == 'lxc':
                proxmox.nodes(node).lxc(vmid).status.start.post()
            else:
                proxmox.nodes(node).qemu(vmid).status.start.post()
            return True
        except Exception as e:
            logger.error(f"Failed to start VM {vmid}: {e}")
            return False

    async def stop_vm(self, node: str, vmid: str, vm_type: str = 'qemu') -> bool:
        """Request a shutdown of the given guest via the ``status/shutdown`` call.

        Args:
            node: name of the node the guest lives on.
            vmid: guest identifier.
            vm_type: ``'lxc'`` for a container; any other value is treated as QEMU.

        Returns:
            True if the API call did not raise, False otherwise.
        """
        try:
            proxmox = self._get_proxmox_connection()
            if vm_type == 'lxc':
                proxmox.nodes(node).lxc(vmid).status.shutdown.post()
            else:
                proxmox.nodes(node).qemu(vmid).status.shutdown.post()
            return True
        except Exception as e:
            logger.error(f"Failed to stop VM {vmid}: {e}")
            return False

    @staticmethod
    async def check_all_hosts_status(db: Session) -> None:
        """Ping every stored host concurrently and persist the results.

        Each host's ``is_online`` is set to its ping result and ``last_ping``
        to the current naive UTC time; the session is committed once at the end.
        """
        hosts = db.query(ProxmoxHost).all()
        # ping_host() never raises, so gather() needs no return_exceptions.
        results = await asyncio.gather(
            *(ProxmoxHostService.ping_host(host.ip_address) for host in hosts)
        )
        for host, is_online in zip(hosts, results):
            host.is_online = is_online
            host.last_ping = datetime.utcnow()
        db.commit()