Source code for fliswarm.tools
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-11-01
# @Filename: tools.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
import asyncio
from typing import Any, Dict, List, Optional, Set, Union
import fliswarm.node
__all__ = ["select_nodes", "FakeCommand", "IDPool", "subprocess_run_async"]
[docs]
def select_nodes(
nodes: Dict[str, Any],
category: Optional[str] = None,
names: Optional[Union[str, List[str]]] = None,
) -> Set["fliswarm.node.Node"]:
"""Filters the nodes to command.
Parameters
----------
nodes
A dictionary of `.Node` instances to be filtered, keyed by node name.
category
A category on which to filter.
names
A list or comma-separated string of node names on which to filter.
Returns
-------
:
A `set` of enabled `.Node` instances that match the provided
``category`` or ``names``. If neither ``category`` or ``names``
are defined, returns all the ``nodes``.
"""
if names and isinstance(names, str):
names = list(map(lambda x: x.strip(), names.split(",")))
valid_nodes = set()
node_values = nodes.values()
if names:
valid_nodes |= set([node for node in node_values if node.name in names])
if category:
valid_nodes |= set([node for node in node_values if node.category in category])
if not names and not category:
valid_nodes |= set(node_values)
selected_nodes = set([node for node in valid_nodes if node.enabled])
return selected_nodes
[docs]
class FakeCommand:
"""A fake `~clu.command.Command` object that doesn't do anything."""
def __getattr__(self, item):
def fake_method(*args, **kwargs):
pass
return fake_method
[docs]
class IDPool:
"""An ID pool that allows to return values to be reused."""
def __init__(self):
self.emitted: Set[int] = set()
self.returned: Set[int] = set()
[docs]
def get(self):
"""Returns an ID."""
if len(self.returned) > 0:
id = min(self.returned)
self.returned.remove(id)
return id
if len(self.emitted) == 0:
id = 1
else:
id = max(self.emitted) + 1
self.emitted.add(id)
return id
[docs]
def put(self, id: int):
"""Returns an ID to the pool."""
self.returned.add(id)
[docs]
async def subprocess_run_async(*args, shell=False):
"""Runs a command asynchronously.
If ``shell=True`` the command will be executed through the shell. In that case
the argument must be a single string with the full command. Otherwise, must receive
a list of program arguments. Returns the output of stdout.
"""
if shell:
cmd = await asyncio.create_subprocess_shell(
args[0],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
else:
cmd = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await cmd.communicate()
return cmd