#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-10-30
# @Filename: node.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
from functools import partial
from typing import Any, Dict, List, Optional, Tuple, Union, cast
import docker.errors
import requests
from docker import DockerClient, types
from docker.models.containers import Container
from clu.command import Command
from .tools import FakeCommand, subprocess_run_async
# Port of the Docker daemon used to build a node's default ``daemon_addr``
# (``tcp://<addr>:<port>``) when none is given explicitly.
DEFAULT_DOCKER_PORT = 2375
class Node:
    """A client to handle a computer node.

    Parameters
    ----------
    name
        The name associated with this node.
    addr
        The address to the node.
    category
        A category to use as a filter.
    daemon_addr
        The address to the Docker daemon. If `None`, defaults to
        ``tcp://addr:port`` where ``port`` is the default Docker daemon port.
    registry
        The path to the Docker registry.

    Attributes
    ----------
    client
        The Docker client connected to ``daemon_addr``, or `None` until
        `.connect` succeeds.
    enabled
        If `False`, `.connected` always returns `False`.

    Notes
    -----
    The constructor calls `asyncio.get_running_loop`, so a node must be
    created from within a running event loop. Docker SDK calls are dispatched
    to that loop's default executor so that they do not block the loop.
    """

    def __init__(
        self,
        name: str,
        addr: str,
        category: Optional[str] = None,
        daemon_addr: Optional[str] = None,
        registry: Optional[str] = None,
    ):
        self.name = name
        self.addr = addr
        self.category = category

        # Raises RuntimeError if called outside a running event loop.
        self.loop = asyncio.get_running_loop()

        self.daemon_addr = daemon_addr or f"tcp://{addr}:{DEFAULT_DOCKER_PORT}"
        self.registry = registry

        # Set by connect(); None until then.
        self.client: DockerClient | None = None
        self.enabled = True

    async def _run(self, fn, *args, **kwargs):
        """Runs ``fn(*args, **kwargs)`` in the loop's default executor.

        Used to call the blocking Docker SDK without blocking the event loop.
        Returns whatever ``fn`` returns.
        """
        return await self.loop.run_in_executor(None, partial(fn, *args, **kwargs))

    async def connect(self):
        """Connects to the Docker client on the remote node.

        Raises
        ------
        ConnectionError
            If the node does not respond to ping.
        """
        if not await self.ping():
            raise ConnectionError(f"Node {self.addr} is not responding.")

        self.client = await self._run(DockerClient, self.daemon_addr, timeout=3)

    async def client_alive(self) -> bool:
        """Checks whether the Docker client is connected and pinging.

        Returns `False` if there is no client, if the Docker ping fails with a
        connection or API error, or if it does not answer within one second.
        """
        if not self.client:
            return False

        try:
            alive = await asyncio.wait_for(self._run(self.client.ping), 1)
        except (
            requests.exceptions.ConnectionError,
            docker.errors.APIError,
            asyncio.TimeoutError,
        ):
            return False

        return bool(alive)

    async def connected(self) -> bool:
        """Returns `True` if the node and the Docker client are connected."""
        return self.enabled and (await self.ping()) and (await self.client_alive())

    async def _find_containers(
        self,
        name: str,
        running: bool = False,
    ) -> List[Container]:
        """Returns the containers whose name is exactly ``name``.

        Docker's ``name`` filter matches every container whose name merely
        contains ``name`` (``gfa`` also matches ``gfa1``), so the results are
        narrowed to exact matches here. With ``running=True`` only running
        containers are returned; otherwise containers in any state are.
        """
        assert self.client, "Client is not connected."

        filters = {"name": name}
        if running:
            filters["status"] = "running"

        containers: List[Container] = await self._run(
            self.client.containers.list,
            all=True,
            filters=filters,
        )
        return [container for container in containers if container.name == name]

    async def is_container_running(self, name: str) -> bool:
        """Returns `True` if the container named exactly ``name`` is running.

        Returns `False` if the Docker client is not connected.
        """
        if not self.client:
            return False

        running = await self._find_containers(name, running=True)
        return len(running) == 1

    async def ping(self, timeout=2) -> bool:
        """Pings the node. Returns `True` if the node is responding.

        Runs ``ping -c 1 -w <timeout> <addr>`` and returns `False` if it exits
        with a non-zero code or does not finish within ``timeout`` seconds.
        """
        # NOTE(review): the command is run through a shell; self.addr is
        # assumed to come from trusted configuration.
        try:
            result = await asyncio.wait_for(
                subprocess_run_async(
                    f"ping -c 1 -w {timeout} {self.addr}",
                    shell=True,
                ),
                timeout,
            )
        except asyncio.TimeoutError:
            return False

        return result.returncode == 0

    async def get_volume(self, name: str):
        """Returns the volume that matches the name, or `False` if none does."""
        assert self.client, "Client is not connected."

        volumes: List[Any] = await self._run(self.client.volumes.list)
        return next((volume for volume in volumes if volume.name == name), False)

    @staticmethod
    def _volume_device(volume: Any, default: Any = "NA") -> Any:
        """Returns the ``device`` driver option of ``volume``, or ``default``.

        ``default`` is returned when the volume's ``Options`` are missing,
        `None`, or have no ``device`` key.
        """
        options = volume.attrs.get("Options") or {}
        return options.get("device", default)

    async def report_status(
        self,
        command: Command,
        volumes: bool = True,
        containers: bool = True,
    ):
        """Reports the status of the node to an actor.

        Parameters
        ----------
        command
            The command that is requesting the status.
        volumes
            Whether to report the volumes connected to the node Docker engine.
        containers
            Whether to report the containers running. Only reports running
            containers whose ancestor matches the ``config['image']``.

        Notes
        -----
        Outputs the ``node`` keyword, with format
        ``node={node_name, addr, daemon_addr, node_alive, docker_alive}``.
        If ``containers=True``, outputs the ``container`` keyword with
        format ``container={node_name, container_short_id}``. If
        ``volumes=True``, reports the ``volume`` keyword with format
        ``volume={node_name, volume_name, present, device}``, where ``device``
        is ``"NA"`` if the volume is missing or has no device option.

        If the node has no Docker client only a warning is emitted. If the
        node or its Docker daemon do not respond, the ``node`` keyword is
        output and the client is closed; containers and volumes are not
        reported.
        """
        status = [self.name, self.addr, self.daemon_addr, False, False]
        config = command.actor.config

        if not self.client:
            command.warning(text=f"Node {self.addr} has no client.")
            return

        if not (await self.ping(timeout=config["ping_timeout"])):
            command.warning(text=f"Node {self.addr} is not pinging back.")
            command.info(node=status)
            self.client.close()
            return

        status[3] = True  # The NUC is responding.

        if not (await self.client_alive()):
            command.warning(text=f"Docker client on node {self.addr} is not connected.")
            command.info(node=status)
            self.client.close()
            return

        status[4] = True  # The Docker daemon is responding.
        command.info(node=status)

        if containers:
            await self._report_containers(command, config)

        if volumes:
            await self._report_volumes(command, config)

    async def _report_containers(self, command: Command, config: Dict[str, Any]):
        """Outputs the ``container`` keyword for the running image container."""
        assert self.client, "Client is not connected."

        # Match on the image name without tag, prefixed with the registry.
        image = config["image"].split(":")[0]
        if config.get("registry"):
            image = config["registry"] + "/" + image

        running: List[Container] = await self._run(
            self.client.containers.list,
            all=True,
            filters={"ancestor": image, "status": "running"},
        )

        if len(running) == 0:
            command.warning(text=f"No containers running on {self.addr}.")
            command.debug(container=[self.name, "NA"])
        elif len(running) > 1:
            command.warning(
                text=f"Multiple containers with image {image} "
                f"running on node {self.addr}."
            )
            command.debug(container=[self.name, "NA"])
        else:
            command.debug(container=[self.name, running[0].short_id])

    async def _report_volumes(self, command: Command, config: Dict[str, Any]):
        """Outputs the ``volume`` keyword for each name in ``config['volumes']``."""
        for vname in config["volumes"]:
            volume: Any = await self.get_volume(vname)
            if volume is False:
                command.warning(text=f"Volume {vname} not present in {self.name}.")
                command.debug(volume=[self.name, vname, False, "NA"])
                continue

            command.debug(volume=[self.name, vname, True, self._volume_device(volume)])

    async def stop_container(
        self,
        name: str,
        image: str,
        force: bool = False,
        command: Optional[Union[Command, FakeCommand]] = None,
    ):
        """Stops and removes the container.

        Parameters
        ----------
        name
            The name of the container to remove. Only containers whose name
            is exactly ``name`` are affected; any of them, running or not,
            is removed.
        image
            The image of the container. Its tag, if any, is dropped when
            looking for ancestors.
        force
            If `True`, also removes every container (running or not) that
            has the image as ancestor.
        command
            A command to which output messages.
        """
        assert self.client, "Client is not connected."
        command = command or FakeCommand()

        base_image = image.split(":")[0]

        # Stop and remove the running container with this name, if any.
        for container in await self._find_containers(name, running=True):
            command.warning(text=f"{self.name}: removing running container {name}.")
            await self._run(container.remove, v=False, force=True)

        # Silently remove any leftover (e.g. exited) containers with this name
        # so the name is free for a new container.
        # TODO: In the future we may want to restart them instead.
        for container in await self._find_containers(name):
            await self._run(container.remove, v=False, force=True)

        if force:
            # NOTE(review): base_image carries no registry prefix, while
            # run_container starts containers from "registry/image"; confirm
            # that the ancestor filter matches those containers.
            ancestors: List[Container] = await self._run(
                self.client.containers.list,
                all=True,
                filters={"ancestor": base_image},
            )
            for container in ancestors:
                command.warning(
                    text=f"{self.name}: removing container "
                    f"({container.name}, {container.short_id}) "
                    f"that uses image {base_image}."
                )
                await self._run(container.remove, v=False, force=True)

        command.debug(container=[self.name, "NA"])

    async def run_container(
        self,
        name: str,
        image: str,
        volumes: Optional[List[Any]] = None,
        privileged: bool = False,
        registry: Optional[Any] = None,
        envs: Optional[Dict[str, Any]] = None,
        ports: Union[List[int], Dict[str, Tuple[str, int]], None] = None,
        force: bool = False,
        command: Optional[Union[Command, FakeCommand]] = None,
    ) -> bool:
        """Runs a container in the node, in detached mode.

        Parameters
        ----------
        name
            The name to assign to the container.
        image
            The image to run.
        volumes
            Names of the volumes to mount. The mount point in the container
            will match the original device. The volumes must already exist
            in the node Docker engine.
        privileged
            Whether to run the container in privileged mode.
        registry
            The registry from which to pull the image, if it doesn't exist
            locally.
        envs
            A dictionary of environment variable to value to pass to the
            container.
        ports
            Ports to bind inside the container, either as
            ``{'2222/tcp': 3333}`` or as a list of integers. Currently not
            forwarded to Docker: the container runs with host networking
            (``network="host"``), under which Docker ignores published ports
            and the container already uses the node's ports. Kept for
            backwards compatibility.
        force
            If `True`, removes any running containers of the same name,
            or any container with the same image as ancestor.
        command
            A command to which output messages.

        Returns
        -------
        :
            `True` once the container has been started, or if it was already
            running and ``force=False``. Docker errors propagate as
            exceptions.
        """
        assert self.client, "Client is not connected."

        # This is the command we aim to run (not a complete list).
        # docker --context gfa1 run
        #   --rm -d --network host
        #   --mount source=data,target=/data
        #   --mount source=home,target=/home/sdss
        #   --env OBSERVATORY=APO --env ACTOR_NAME=gfa
        #   --privileged
        #   sdss-hub:5000/flicamera:latest

        command = command or FakeCommand()
        volumes = volumes or []
        envs = envs or {}

        if (await self.is_container_running(name)) and not force:
            command.debug(text=f"{self.name}: container already running.")
            return True

        await self.stop_container(name, image, force=force, command=command)

        if registry:
            image = registry + "/" + image

        mounts = []
        for vname in volumes:
            volume = await self._run(self.client.volumes.get, vname)
            # Mount at the volume's device path (e.g. ":/data" -> "/data").
            target = volume.attrs["Options"]["device"].strip(":")
            mounts.append(types.Mount(target, vname))

        # We need to bind /dev/bus/usb, which is where the USBs are mounted in
        # the host NUC. This allows the container to access a new device when
        # it becomes available. --privileged doesn't refresh the devices, it will
        # be aware that a new device has been connected, but the file won't be there
        # so it will fail when opening it. This allows a camera to be connected
        # without having to recreate the container. We also provide access to
        # the devices (this is probably unnecessary with privileged). I tried several
        # things to avoid having to use privileged, but --cap-add SYS_ADMIN won't
        # work, and anyway, the NUC is already an isolated system.
        mounts.append(types.Mount("/dev/bus/usb", "/dev/bus/usb", type="bind"))
        devices = ["/dev/bus/usb:/dev/bus/usb"]

        # We give most of the resources in the host computer to the container since
        # it's the only thing running. Honestly it doesn't seem to make a huge
        # difference. cpu_period is the period of a single CPU (I think the value
        # is arbitrary and only the ratio matters). The host has 4 CPUs so we assign
        # 3.5 of their cycles to the container.
        # https://docs.docker.com/config/containers/resource_constraints/
        cpu_period = 10000
        cpu_quota = 35000
        mem_limit = "6G"

        command.debug(text=f"{self.name}: pulling latest image.")
        await self._run(self.client.images.pull, image)

        command.info(text=f"{self.name}: running {name} from {image}.")
        await self._run(
            self.client.containers.run,
            image,
            name=name,
            tty=False,
            detach=True,
            remove=True,
            environment=envs,
            privileged=privileged,
            mem_limit=mem_limit,
            cpu_period=cpu_period,
            cpu_quota=cpu_quota,
            mounts=mounts,
            stdin_open=False,
            stdout=False,
            network="host",
            devices=devices,
        )

        return True

    async def create_volume(
        self,
        name: str,
        driver: str = "local",
        opts: Optional[Dict[str, Any]] = None,
        force: bool = False,
        command: Optional[Union[Command, FakeCommand]] = None,
    ):
        """Creates a volume in the node Docker engine.

        Parameters
        ----------
        name
            The name of the volume to create.
        driver
            The driver to use.
        opts
            A dict of key-values with the options to pass to the volume when
            creating it. Defaults to no options.
        force
            If `True`, and the volume already exists, removes it and
            creates it anew.
        command
            A command to which output messages.

        Returns
        -------
        :
            The volume object (the existing one if it already exists and
            ``force=False``).

        Examples
        --------
        To create an NFS volume pointing to ``/data`` on ``sdss-hub`` ::

            await nuc.create_volume(
                "data",
                driver="local",
                opts={
                    "type": "nfs",
                    "o": "nfsvers=4,addr=sdss-hub,rw",
                    "device": ":/data",
                },
            )
        """
        assert self.client, "Client is not connected."
        command = command or FakeCommand()

        volume: Any = await self.get_volume(name)
        if volume is not False:
            if not force:
                command.debug(text=f"{self.name}: volume {name} already exists.")
                return volume
            command.warning(text=f"{self.name}: recreating existing volume {name}.")
            await self._run(volume.remove, force=True)

        command.debug(text=f"{self.name}: creating volume {name}.")
        volume = await self._run(
            self.client.volumes.create,
            name,
            driver=driver,
            driver_opts=opts or {},
        )

        command.debug(volume=[self.name, name, True, self._volume_device(volume)])
        return volume