David Barroso - linkedin - github - twitter
In this presentation we are going to do a nornir 3 deep dive.
To do so, we are going to pretend we have been hired by Acme Corp. and tasked with building the necessary tooling so the network engineering team can easily build their own tools.
An inventory plugin is a nornir plugin that allows nornir to create an Inventory
object from an external source
It is implemented by writing a class with the following structure:
from typing import Any

from nornir.core.inventory import Inventory

class MyPlugin:
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # This method allows you to configure the plugin.
        # For instance, when creating the object, inventory.options
        # will be passed to this constructor
        ...

    def load(self) -> Inventory:
        # This method will be called and is responsible for instantiating
        # and returning the Inventory
        ...
In order for nornir to be able to use the inventory plugin we need to register it. You can do so in two ways:
Add to your setup.py:
setup(
    # ...
    entry_points={
        "nornir.plugins.inventory": "inventory-name = path.to:InventoryPlugin",
    }
)
Or, if using poetry, add to pyproject.toml:
[tool.poetry.plugins."nornir.plugins.inventory"]
"inventory-name" = "path.to:InventoryPlugin"
inventory-name is the name of the inventory plugin; you will refer to the plugin by this name in the inventory.plugin configuration option in config.yaml or when calling InitNornir.
path.to:InventoryPlugin is the path to the class implementing the plugin.
Alternatively, you can register the plugin programmatically:
from nornir.core.plugins.inventory import InventoryPluginRegister
from path.to import InventoryPlugin

InventoryPluginRegister.register("inventory-name", InventoryPlugin)
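For intuition, an entry point value such as `path.to:InventoryPlugin` is just a module path plus an attribute name; it resolves roughly like this (a simplified sketch of the mechanism, not nornir's actual loader):

```python
from importlib import import_module


def resolve(entry_point_value: str):
    # split "path.to:InventoryPlugin" into module path and attribute name
    module_path, _, attr = entry_point_value.partition(":")
    # import the module and fetch the object from it
    return getattr(import_module(module_path), attr)


# demonstrated with a stdlib object instead of a real plugin
cls = resolve("collections:OrderedDict")
print(cls.__name__)  # OrderedDict
```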
We are going to use ACME’s inventory system as our inventory. In order to interact with it we could use requests
or some other library; however, we have already been given a library that interacts with the backend, so we are going to leverage it.
Looking at the documentation given to us by the developer of the library, we care about the following methods:
ACMEAPI() - Creates an instance of the object to interact with the inventory. Takes no arguments.
get(filter_sites: Optional[List[str]] = None, filter_dev_types: Optional[List[str]] = None) - Returns information about all of our hosts spread across all of our datacenters. This method takes an optional list of sites and an optional list of device types to help us filter the inventory.
Let’s start by trying the library in the console:
$ ipython
In [1]: from nornir3_demo.ext.inventory import ACMEAPI
In [2]: acme = ACMEAPI()
In [3]: acme.get()
Out[3]:
{'mercury': {'edge00.mercury': {'platform': 'acmeos',
'dev_type': 'edge',
'rack': '10'},
'edge01.mercury': {'platform': 'acmeos', 'dev_type': 'edge', 'rack': '10'},
'spine00.mercury': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '20'},
'spine01.mercury': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '20'},
'spine02.mercury': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '21'},
'spine03.mercury': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '21'},
'leaf00.mercury': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '100'},
'leaf01.mercury': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '100'},
...
'leaf94.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '147'},
'leaf95.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '147'},
'leaf96.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '148'},
'leaf97.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '148'},
'leaf98.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '149'},
'leaf99.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '149'}}}
Let’s try the filtering capabilities:
In [4]: acme.get(filter_sites=["earth"], filter_dev_types=["edge", "spine"])
Out[4]:
{'earth': {'edge00.earth': {'platform': 'acmeos',
'dev_type': 'edge',
'rack': '10'},
'edge01.earth': {'platform': 'acmeos', 'dev_type': 'edge', 'rack': '10'},
'spine00.earth': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '20'},
'spine01.earth': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '20'},
'spine02.earth': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '21'},
'spine03.earth': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '21'}}}
Turns out we have eight datacenters, each with edge, spine and leaf devices.
The inventory also gives us some rack information.
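Before looking at the real plugin, it may help to see the transformation stripped of the nornir classes. A rough sketch with plain dicts (the real code builds Host and Group objects instead):

```python
def flatten(data):
    """Turn {site: {hostname: attrs}} into flat hosts/groups mappings."""
    hosts, groups = {}, {}
    for site, site_data in data.items():
        # one group per datacenter
        groups[site] = {"name": site}
        for hostname, attrs in site_data.items():
            hosts[hostname] = {
                "platform": attrs["platform"],
                "groups": [site],
                # remaining attributes become extra data, plus the site name
                "data": {"site": site, "dev_type": attrs["dev_type"], "rack": attrs["rack"]},
            }
    return hosts, groups


hosts, groups = flatten(
    {"earth": {"edge00.earth": {"platform": "acmeos", "dev_type": "edge", "rack": "10"}}}
)
print(hosts["edge00.earth"]["data"])  # {'site': 'earth', 'dev_type': 'edge', 'rack': '10'}
```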
In order to write our inventory plugin we need a class that retrieves the data using the library we were given and applies the necessary transformations to create the Inventory object.
Let’s look at it step by step:
# nornir3_demo/plugins/inventory/acme.py
from typing import List, Optional

# we will pretend this library was given to us by a third party
from nornir3_demo.ext.inventory import ACMEAPI

class ACMEInventory:
    def __init__(
        self,
        filter_sites: Optional[List[str]] = None,
        filter_dev_types: Optional[List[str]] = None,
    ) -> None:
        # we will use the constructor to create the object that talks
        # to the inventory backend
        self.conn = ACMEAPI()
        # we will also save the parameters so we can use them later on
        self.filter_sites = filter_sites
        self.filter_dev_types = filter_dev_types

    ...
# nornir3_demo/plugins/inventory/acme.py
    ...

    def load(self) -> Inventory:
        # we retrieve the data from the inventory passing the options we saved
        # in the constructor
        data = self.conn.get(self.filter_sites, self.filter_dev_types)

        # we create placeholders for the hosts and for the groups
        hosts = Hosts()
        groups = Groups()

        # the inventory returned the hosts by datacenter so we iterate over them
        for dc_name, dc_data in data.items():
            # we are going to start by creating a group per DC
            groups[dc_name] = Group(dc_name)
            # now we process the dc data we got
            hosts_in_dc = process_dc_data(groups[dc_name], dc_data)
            # we add the hosts in the dc to the main hosts object
            hosts.update(hosts_in_dc)

        # we populate the inventory and return it
        # note our inventory doesn't support defaults so we just return an empty object
        return Inventory(hosts=hosts, groups=groups, defaults=Defaults())
# nornir3_demo/plugins/inventory/acme.py

def process_dc_data(group: Group, group_data: Dict[str, Dict[str, str]]) -> Hosts:
    """
    Arguments:
        group: Current group we are processing
        group_data: the data for the entire group as returned by the backend
    """
    # we create a placeholder for the hosts
    hosts = Hosts()

    # inside each datacenter we had a dictionary where the key was the hostname
    # and the value had some data about it; we iterate over the hosts
    for hostname, host_data in group_data.items():
        # for each host we create a Host object mapping its required parameters
        # with the data we got
        hosts[hostname] = Host(
            name=hostname,
            hostname=hostname,
            platform=host_data.pop("platform"),
            groups=ParentGroups([group]),  # we add the DC group as a parent group
            data={"site": group.name, **host_data},  # extra data
        )
    return hosts
Now that we have the plugin, we need to register it. Since we are using poetry in our project, that’s what we will use to register it:
# pyproject.toml
...
[tool.poetry.plugins."nornir.plugins.inventory"]
"ACMEInventory" = "nornir3_demo.plugins.inventory.acme:ACMEInventory"
...
Script:
# demo/scripts/10_inventory_plugin.py
from pprint import pprint
from nornir import InitNornir
nr = InitNornir(inventory={"plugin": "ACMEInventory"})
pprint(nr.inventory.groups)
pprint(nr.inventory.hosts)
Output:
$ python 10_inventory_plugin.py
{'earth': Group: earth,
'jupyter': Group: jupyter,
'mars': Group: mars,
'mercury': Group: mercury,
'neptune': Group: neptune,
...
{'edge00.earth': Host: edge00.earth,
'edge00.jupyter': Host: edge00.jupyter,
'edge00.mars': Host: edge00.mars,
'edge00.mercury': Host: edge00.mercury,
...
Script:
# demo/scripts/10_inventory_plugin_filter.py
from nornir import InitNornir
nr = InitNornir(
    inventory={
        "plugin": "ACMEInventory",
        "options": {
            "filter_sites": ["earth"],
            "filter_dev_types": ["spine", "edge"],
        },
    }
)
print(nr.inventory.groups)
print(nr.inventory.hosts)
Output:
$ python 10_inventory_plugin_filter.py
{'earth': Group: earth}
{'edge00.earth': Host: edge00.earth,
'edge01.earth': Host: edge01.earth,
'spine00.earth': Host: spine00.earth,
'spine01.earth': Host: spine01.earth,
'spine02.earth': Host: spine02.earth,
'spine03.earth': Host: spine03.earth}
A connection plugin is a nornir plugin that allows nornir to manage connections with devices
It is implemented by writing a class with the following structure:
from typing import Any, Dict, Optional

from nornir.core.configuration import Config

CONNECTION_NAME = "my-connection-name"

class MyPlugin:
    def open(
        self,
        hostname: Optional[str],
        username: Optional[str],
        password: Optional[str],
        port: Optional[int],
        platform: Optional[str],
        extras: Optional[Dict[str, Any]] = None,
        configuration: Optional[Config] = None,
    ) -> None:
        # we open the connection and save it under self.connection
        self.connection = connection

    def close(self) -> None:
        # logic to close the connection
        ...
As with the InventoryPlugin
we need to register the connection plugin. We can do it in two ways:
Add to your setup.py:
setup(
    # ...
    entry_points={
        "nornir.plugins.connections": "connection_name = path.to:ConnectionPlugin",
    }
)
Or, if using poetry, add to pyproject.toml:
[tool.poetry.plugins."nornir.plugins.connections"]
"connection_name" = "path.to:ConnectionPlugin"
connection_name is the name of the connection; you will refer to this plugin by this name when writing tasks.
path.to:ConnectionPlugin is the path to the class implementing the plugin.
Alternatively, you can register the plugin programmatically:
from nornir.core.plugins.connections import ConnectionPluginRegister
from path.to import ConnectionPlugin

ConnectionPluginRegister.register("connection-name", ConnectionPlugin)
Our AcmeOS device has a python library we can leverage. In order to manage the connection to the device it provides a constructor and two methods:
AcmeOS(hostname, username, password, port) - Creates an instance of the object to interact with a device
open() - Establishes the connection
close() - Closes the connection
# nornir3_demo/plugins/connections/acmeos.py
from typing import Any, Dict, Optional

from nornir.core.configuration import Config

# we will pretend this library was given to us by a third party
from nornir3_demo.ext.acmeos import AcmeOSAPI

# We will use this variable in the tasks to quickly reference the plugin
CONNECTION_NAME = "acmeos"

class AcmeOS:
    def open(
        self,
        hostname: Optional[str],
        username: Optional[str],
        password: Optional[str],
        port: Optional[int],
        platform: Optional[str],
        extras: Optional[Dict[str, Any]] = None,
        configuration: Optional[Config] = None,
    ) -> None:
        # we pass the parameters needed by the library to its constructor
        connection = AcmeOSAPI(hostname, username, password, port)
        # now we call the open method as instructed by the library documentation
        connection.open()
        # finally we save the connection under self.connection as instructed by nornir
        self.connection = connection

    def close(self) -> None:
        # we follow the instructions provided by the library to close the connection
        self.connection.close()
Now that we have the plugin, we need to register it. Since we are using poetry in our project, that’s what we will use to register it:
# pyproject.toml
...
[tool.poetry.plugins."nornir.plugins.connections"]
"acmeos" = "nornir3_demo.plugins.connections.acmeos:AcmeOS"
...
Now we are ready to write a few tasks that will leverage this connection plugin.
Let’s start by looking at the vendor’s documentation. According to it, we have a few methods that are interesting to us:
get_version() - Returns a dictionary with OS information
get_cpu_ram() - Returns a dictionary with information about CPU and RAM usage
install_os_version(version: str) - Installs the given version of the device’s operating system
# nornir3_demo/plugins/tasks/acmeos/__init__.py
from nornir.core.task import Result, Task

# We import the CONNECTION_NAME from the plugin itself
from nornir3_demo.plugins.connections.acmeos import CONNECTION_NAME

def get_version(task: Task) -> Result:
    # nornir manages the connection automatically using the connection plugin.
    # To retrieve it you can just call the following method. Note that
    # CONNECTION_NAME needs to match the name we used when registering the plugin
    device = task.host.get_connection(CONNECTION_NAME, task.nornir.config)
    # now we are ready to use the library given to us by the vendor
    version_info = device.get_version()
    return Result(host=task.host, result=version_info)
# nornir3_demo/plugins/tasks/acmeos/__init__.py
#
# continuation...

def get_cpu_ram(task: Task) -> Result:
    device = task.host.get_connection(CONNECTION_NAME, task.nornir.config)
    return Result(host=task.host, result=device.get_cpu_ram())

def install_os_version(task: Task, version: str) -> Result:
    device = task.host.get_connection(CONNECTION_NAME, task.nornir.config)
    # note that we set changed=True as we changed the system
    return Result(
        host=task.host, result=device.install_os_version(version), changed=True
    )
# nornir3_demo/plugins/tasks/acmeos/__init__.py
#
# continuation...

def upgrade_os(task: Task, version: str) -> Result:
    # we use the get_version task to retrieve the OS currently running
    result = task.run(task=get_version)

    # if the version matches what we want to install we are done!
    if result.result["full_version"] == version:
        return Result(host=task.host, result="nothing to do!!!")

    # otherwise we call the install_os_version task to install the image
    task.run(task=install_os_version, version=version)
    return Result(host=task.host, changed=True, result="success!!!")
Script:
# demo/scripts/20_connection_plugin.py
from nornir import InitNornir
from nornir3_demo.plugins.tasks import acmeos
nr = InitNornir(
    inventory={"plugin": "ACMEInventory", "options": {"filter_sites": ["earth"]}}
)
results = nr.run(task=acmeos.get_version)
for hostname, result in results.items():
    if result.failed:
        print(f"{hostname}: {result.exception}")
    else:
        print(f"{hostname}: {result.result['full_version']}")
Output:
$ python 20_connection_plugin.py
edge00.earth: problem communicating with device
edge01.earth: 5.4.1
spine00.earth: 5.2.1
spine01.earth: 5.2.4
spine02.earth: 5.1.3
spine03.earth: 5.2.9
leaf00.earth: 5.2.3
leaf01.earth: 5.4.1
leaf02.earth: 5.2.7
...
A processor is a plugin that taps into certain events and allows the user to execute arbitrary code on those events.
Those events are: task_started, task_completed, task_instance_started, task_instance_completed, subtask_instance_started and subtask_instance_completed.
The benefit of using a Processor
is that the code for each event runs as the event occurs, so you can execute arbitrary code without having to wait until the entire task is completed.
It is implemented by writing a class with the following structure:
from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Task

class MyProcessorPlugin:
    def task_started(self, task: Task) -> None:
        ...

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        ...

    def task_instance_started(self, task: Task, host: Host) -> None:
        ...

    def task_instance_completed(
        self, task: Task, host: Host, results: MultiResult
    ) -> None:
        ...

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        ...

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        ...
ACME has over 800 devices, so we are going to write a progress bar that shows the progress of the execution in real time while we wait for everything to complete.
Note: this plugin leverages rich
# nornir3_demo/plugins/processors/rich.py
from nornir.core.task import AggregatedResult, Task

from rich.progress import BarColumn, Progress

class ProgressBar:
    def __init__(self, total: int) -> None:
        # we need to tell this processor the total number of hosts
        # we instantiate a progress bar object
        self.progress = Progress(
            "[progress.description]{task.description}",
            BarColumn(),
            "[progress.percentage]{task.completed:>3.0f}/{task.total}",
        )
        # we create four progress bars to track total execution,
        # successes, errors and changes
        self.total = self.progress.add_task("[cyan]Completed...", total=total)
        self.successful = self.progress.add_task("[green]Successful...", total=total)
        self.changed = self.progress.add_task("[orange3]Changed...", total=total)
        self.error = self.progress.add_task("[red]Failed...", total=total)

    def task_started(self, task: Task) -> None:
        # we start the progress bar
        self.progress.start()

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        # we stop the progress bar
        self.progress.stop()
# nornir3_demo/plugins/processors/rich.py (continuation)

    def task_instance_started(self, task: Task, host: Host) -> None:
        pass

    def task_instance_completed(
        self, task: Task, host: Host, results: MultiResult
    ) -> None:
        # we update the total execution bar, advancing it by one
        self.progress.update(self.total, advance=1)
        if results.failed:
            # if the task failed we increase the progress bar counting errors
            self.progress.update(self.error, advance=1)
        else:
            # if the task succeeded we increase the progress bar counting successes
            self.progress.update(self.successful, advance=1)
        if results.changed:
            # if the task changed the device we increase the progress bar counting changes
            self.progress.update(self.changed, advance=1)

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        pass

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        pass
Script:
# demo/scripts/30_progress_bar.py
from nornir import InitNornir
from nornir3_demo.plugins.tasks import acmeos
from nornir3_demo.plugins.processors.rich import ProgressBar
nr = InitNornir(inventory={"plugin": "ACMEInventory"})
total_hosts = len(nr.inventory.hosts)
nr_with_progress_bar = nr.with_processors([ProgressBar(total_hosts)])
nr_with_progress_bar.run(task=acmeos.upgrade_os, version="5.3.2")
(a few seconds later)
(on completion)
In addition to our fancy progress bar we are going to add a logging processor so we can inspect the execution as it happens. We are going to log to a file, but ideally we’d be sending these logs to a syslog server connected to a system like Splunk, Graylog or an ELK stack.
# nornir3_demo/plugins/processors/logger.py
import logging
import uuid

from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Task

class Logger:
    def __init__(self, filename: str, log_level: int = logging.INFO) -> None:
        self.logger = logging.getLogger("nornir_runner_logger")
        handler = logging.FileHandler(filename=filename)
        formatter = logging.Formatter("%(levelname)s:%(asctime)s:%(message)s")
        handler.setFormatter(formatter)
        self.logger.setLevel(log_level)
        self.logger.addHandler(handler)

    def task_started(self, task: Task) -> None:
        # we generate a unique uuid and attach it to all the logs;
        # this uuid will allow us to correlate logs and filter them by task execution
        self.uuid = uuid.uuid4()
        self.logger.info("%s:starting task:%s", self.uuid, task.name)

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        self.logger.info("%s:completed task:%s", self.uuid, task.name)

    def task_instance_started(self, task: Task, host: Host) -> None:
        self.logger.info("%s:starting host:%s", self.uuid, task.host.name)

    def task_instance_completed(
        self, task: Task, host: Host, results: MultiResult
    ) -> None:
        if results.failed:
            self.logger.error(
                "%s:completed host:%s:%s", self.uuid, task.host.name, results[-1].exception
            )
        else:
            self.logger.info(
                "%s:completed host:%s:%s", self.uuid, task.host.name, results.result
            )

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        self.logger.debug(
            "%s:starting subtask:%s:%s", self.uuid, task.host.name, task.name
        )

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        if result.failed:
            self.logger.error(
                "%s:completed subtask:%s:%s:%s",
                self.uuid,
                task.host.name,
                task.name,
                result.exception,
            )
        else:
            self.logger.debug(
                "%s:completed subtask:%s:%s:%s",
                self.uuid,
                task.host.name,
                task.name,
                result.result,
            )
Script:
import logging
from nornir import InitNornir
from nornir3_demo.plugins.tasks import acmeos
from nornir3_demo.plugins.processors.logger import Logger
from nornir3_demo.plugins.processors.rich import ProgressBar
nr = InitNornir(inventory={"plugin": "ACMEInventory"})
total_hosts = len(nr.inventory.hosts)
# We can run as many processors at the same time as we want!!!
nr_with_progress_bar_and_logs = nr.with_processors(
    [ProgressBar(total_hosts), Logger("upgrade_os.log", log_level=logging.DEBUG)]
)
nr_with_progress_bar_and_logs.run(task=acmeos.upgrade_os, version="5.3.2")
Output while executing python 31_progress_bar_and_logs.py:
$ tail -f upgrade_os.log
...
DEBUG:2020-06-09 20:23:10,331:bd566653-13f4-47a6-8350-a98301cc9eef:starting subtask:edge00.earth:install_os_version
DEBUG:2020-06-09 20:23:10,406:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf90.venus:install_os_version:{'os_version': '5.3', 'revision': '2', 'full_version': '5.3.2'}
INFO:2020-06-09 20:23:10,406:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf90.venus:success!!!
INFO:2020-06-09 20:23:10,406:bd566653-13f4-47a6-8350-a98301cc9eef:starting host:leaf04.earth
DEBUG:2020-06-09 20:23:10,406:bd566653-13f4-47a6-8350-a98301cc9eef:starting subtask:leaf04.earth:get_version
DEBUG:2020-06-09 20:23:10,462:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf89.venus:install_os_version:{'os_version': '5.3', 'revision': '2', 'full_version': '5.3.2'}
INFO:2020-06-09 20:23:10,463:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf89.venus:success!!!
INFO:2020-06-09 20:23:10,463:bd566653-13f4-47a6-8350-a98301cc9eef:starting host:leaf05.earth
DEBUG:2020-06-09 20:23:10,463:bd566653-13f4-47a6-8350-a98301cc9eef:starting subtask:leaf05.earth:get_version
DEBUG:2020-06-09 20:23:10,521:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:edge01.earth:get_version:{'os_version': '5.1', 'revision': '5', 'full_version': '5.1.5'}
DEBUG:2020-06-09 20:23:10,521:bd566653-13f4-47a6-8350-a98301cc9eef:starting subtask:edge01.earth:install_os_version
DEBUG:2020-06-09 20:23:10,560:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf95.venus:get_version:{'os_version': '5.2', 'revision': '7', 'full_version': '5.2.7'}
DEBUG:2020-06-09 20:23:10,560:bd566653-13f4-47a6-8350-a98301cc9eef:starting subtask:leaf95.venus:install_os_version
DEBUG:2020-06-09 20:23:10,691:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf93.venus:get_version:{'os_version': '5.3', 'revision': '6', 'full_version': '5.3.6'}
DEBUG:2020-06-09 20:23:10,691:bd566653-13f4-47a6-8350-a98301cc9eef:starting subtask:leaf93.venus:install_os_version
DEBUG:2020-06-09 20:23:10,715:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf87.venus:install_os_version:{'os_version': '5.3', 'revision': '2', 'full_version': '5.3.2'}
...
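Because every line embeds the execution uuid, correlating a single run is a simple filter. A toy illustration in plain Python (grep on the log file works just as well); the uuids shown are made up:

```python
def lines_for_run(log_lines, run_uuid):
    # keep only the lines belonging to the given execution
    return [line for line in log_lines if run_uuid in line]


log = [
    "INFO:...:aaaa-1111:starting task:upgrade_os",
    "INFO:...:bbbb-2222:starting task:upgrade_os",
    "ERROR:...:aaaa-1111:completed host:leaf00.earth:timeout",
]
print(lines_for_run(log, "aaaa-1111"))  # the two lines from the first run
```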
A runner is a plugin that dictates how to execute the tasks over the hosts
Nornir comes with two runners:
SerialRunner executes the task over all the hosts one after the other.
ThreadedRunner executes the task over all the hosts in parallel using threads (this is the default).
You can implement a runner by writing a class with the following structure:
from typing import Any, List

from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, Task

class DCAwareRunner:
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # This method allows you to configure the plugin.
        # For instance, when creating the object, runner.options
        # will be passed to this constructor
        ...

    def run(self, task: Task, hosts: List[Host]) -> AggregatedResult:
        # here is where you would implement your logic to "spread" the task over the hosts
        ...
As with the InventoryPlugin
and the ConnectionPlugin
we need to register the runner. We can do it in two ways:
Add to your setup.py:
setup(
    # ...
    entry_points={
        "nornir.plugins.runners": "runner_name = path.to:RunnerPlugin",
    }
)
Or, if using poetry, add to pyproject.toml:
[tool.poetry.plugins."nornir.plugins.runners"]
"runner_name" = "path.to:RunnerPlugin"
runner_name is the name of the runner; you will refer to this plugin by this name in the runner.plugin configuration option or when calling InitNornir.
path.to:RunnerPlugin is the path to the class implementing the plugin.
As we have already seen, Acme has 8 datacenters, each with 2 edge devices, 4 spines and 50 racks with two ToRs each. The way the network is designed allows us to lose an edge device, a spine or even a ToR on each rack without impacting normal operations.
To perform operations at scale in our network we are going to design a runner that takes these properties into account. To do that we are going to do the following:
The code is not too complex, but it is rather large, so I’d encourage you to look at it directly on GitHub.
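To give a flavour of the logic without reproducing the whole runner, here is a heavily simplified sketch: hosts are processed per datacenter and, once a datacenter exceeds a failure budget, its remaining hosts are skipped rather than put at risk. This is an illustration only; the real DCAwareRunner also uses a thread pool and reasons about device types and racks:

```python
from typing import Callable, Dict, List


def dc_aware_run(
    task: Callable[[str], None],
    hosts_by_dc: Dict[str, List[str]],
    max_failures: int = 1,
) -> Dict[str, List[str]]:
    report: Dict[str, List[str]] = {"completed": [], "failed": [], "skipped": []}
    for dc, hosts in hosts_by_dc.items():
        failures = 0
        for host in hosts:
            if failures >= max_failures:
                # failure budget exhausted: don't touch the rest of this DC
                report["skipped"].append(host)
                continue
            try:
                task(host)
                report["completed"].append(host)
            except Exception:
                failures += 1
                report["failed"].append(host)
    return report


def upgrade(host: str) -> None:
    if host == "spine00.mars":
        raise RuntimeError("problem communicating with device")


report = dc_aware_run(
    upgrade, {"mars": ["spine00.mars", "spine01.mars"], "earth": ["spine00.earth"]}
)
print(report)  # spine01.mars is skipped; earth is unaffected
```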
Now that we have the plugin, we need to register it. Since we are using poetry in our project, that’s what we will use to register it:
# pyproject.toml
...
[tool.poetry.plugins."nornir.plugins.runners"]
"DCAwareRunner" = "nornir3_demo.plugins.runners.dc_aware:DCAwareRunner"
...
Script:
from nornir import InitNornir
from nornir3_demo.plugins.processors.rich import ProgressBar
from nornir3_demo.plugins.runners import DCAwareRunner
from nornir3_demo.plugins.tasks import acmeos
nr = InitNornir(inventory={"plugin": "ACMEInventory"})
total_hosts = len(nr.inventory.hosts)
nr = nr.with_processors([ProgressBar(total_hosts)])
dc_runner = DCAwareRunner(num_workers=100)
nr = nr.with_runner(dc_runner)
nr.run(task=acmeos.upgrade_os, version="5.3.1")
# let's print the report so we can see which hosts failed and which ones were skipped
print()
for data in dc_runner.report():
    print(data)
Output:
A few notes:
In the example we chose the runner with the following code:
dc_runner = DCAwareRunner(num_workers=100)
nr = nr.with_runner(dc_runner)
This allows you to set up the runner “at runtime”, but you can also configure it like this if you prefer:
nr = InitNornir(
    runner={
        "plugin": "DCAwareRunner",
        "options": {
            "num_workers": 100,
        },
    },
)
You can choose one method or the other depending on your needs
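Equivalently, if you drive InitNornir from a config.yaml, the same choice would look roughly like this (a hypothetical fragment, assuming nornir's standard configuration layout):

```yaml
# config.yaml
inventory:
  plugin: ACMEInventory
runner:
  plugin: DCAwareRunner
  options:
    num_workers: 100
```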
A nornir function is a standard python function you invoke on your own
There are no rules to write them :)
print_result is the most well-known example of a nornir function.
We are going to write an alternative to print_result that leverages rich:
# nornir3_demo/plugins/functions/rich.py
from nornir.core.task import AggregatedResult

from rich.box import MINIMAL_DOUBLE_HEAD
from rich.console import Console
from rich.table import Table
from rich.text import Text

def rich_table(results: AggregatedResult) -> None:
    console = Console()
    for hostname, host_result in results.items():
        table = Table(box=MINIMAL_DOUBLE_HEAD)
        table.add_column(hostname, justify="right", style="cyan", no_wrap=True)
        table.add_column("result")
        table.add_column("changed")
        for r in host_result:
            text = Text()
            if r.failed:
                text.append(f"{r.exception}", style="red")
            else:
                text.append(f"{r.result or ''}", style="green")
            changed = Text()
            if r.changed:
                color = "orange3"
            else:
                color = "green"
            changed.append(f"{r.changed}", style=color)
            table.add_row(r.name, text, changed)
        console.print(table)
Script:
from nornir import InitNornir
from nornir3_demo.plugins.tasks import acmeos
from nornir3_demo.plugins.functions.rich import rich_table
from nornir.core.task import Task
def gather_info(task: Task) -> None:
    task.run(task=acmeos.get_version)
    task.run(task=acmeos.get_cpu_ram)

nr = InitNornir(
    inventory={"plugin": "ACMEInventory", "options": {"filter_sites": ["earth"]}}
)
results = nr.run(task=gather_info)
rich_table(results)
Output:
In the previous section we built a DCAwareRunner. This runner returned a report indicating which hosts failed, which ones were skipped, and why. We are going to build a function that processes that report and builds a nice table.
from rich.box import MINIMAL_DOUBLE_HEAD
from rich.table import Table

from nornir3_demo.plugins.runners.dc_aware import DCAwareRunner

def rich_dc_aware_report(dc_runner: DCAwareRunner) -> Table:
    table = Table(box=MINIMAL_DOUBLE_HEAD, title="DCAwareRunner report")
    table.add_column("group", justify="right", style="blue", no_wrap=True)
    table.add_column("failed", style="red")
    table.add_column("skipped", style="sky_blue3")
    table.add_column("error")
    for group_name, failed, skipped, exc in dc_runner.report():
        failed_hosts = ", ".join([h.name for h in failed])
        skipped_hosts = ", ".join([h.name for h in skipped])
        table.add_row(group_name, failed_hosts, skipped_hosts, f"{exc}")
    return table
Script:
# demo/scripts/51_dc_aware_runner_report.py
from nornir import InitNornir
from nornir3_demo.plugins.processors.rich import ProgressBar
from nornir3_demo.plugins.runners.dc_aware import DCAwareRunner
from nornir3_demo.plugins.tasks import acmeos
from nornir3_demo.plugins.functions.rich import rich_dc_aware_report
from rich.console import Console
nr = InitNornir(inventory={"plugin": "ACMEInventory"})
total_hosts = len(nr.inventory.hosts)
dc_runner = DCAwareRunner(num_workers=100)
nr = nr.with_processors([ProgressBar(total_hosts)]).with_runner(dc_runner)
nr.run(task=acmeos.upgrade_os, version="5.3.1")
table = rich_dc_aware_report(dc_runner)
Console().print("\n", table)
Output:
Finally, we are going to wrap everything up by writing a POC for an orchestrator.
It won’t be feature complete but it will highlight its capabilities.
The specification can be found under orchestrator.yaml.
Let’s start by writing an endpoint to serve it so we can use the swagger-ui:
import json
from typing import Any

from flask import Response

def respond(raw: Any) -> Response:
    """
    This method serializes the response into json and
    sets the appropriate HTTP headers
    """
    return Response(
        json.dumps(raw),
        status=200,
        headers={
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",  # this is certainly not good in prod!!!
            "Access-Control-Allow-Methods": "GET, POST, DELETE, PUT, PATCH, OPTIONS",
        },
    )
@app.route("/swagger/")
def swagger() -> Response:
    """
    Serves the spec so we can use the swagger-ui
    """
    with open("swagger.yaml", "r") as f:
        yml = ruamel.yaml.YAML()
        spec_data = yml.load(f)
    return respond(spec_data)
We can start the orchestrator with what we have so far:
$ cd demo/orchestrator
$ python orchestrator.py
* Serving Flask app "orchestrator" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
Now we can start swagger-ui
:
docker run --rm -p 8080:8080 swaggerapi/swagger-ui
With everything up and running we can go to the URL http://localhost:8080 and enter the URL to explore http://localhost:5000/swagger/
Now let’s implement the endpoint
Let’s start by writing a method that will return the nornir object fully configured:
prometheus = Prometheus()

def get_nornir(
    filter_sites: Optional[List[str]], filter_dev_types: Optional[List[str]]
) -> Nornir:
    processors = [prometheus, Logger("orchestrator.log", log_level=logging.INFO)]
    return InitNornir(
        inventory={
            "plugin": "ACMEInventory",
            "options": {
                "filter_dev_types": filter_dev_types,
                "filter_sites": filter_sites,
            },
        },
        runner={
            "plugin": "DCAwareRunner",
            "options": {"num_workers": 100},
        },
    ).with_processors(processors)
Note: We will get back to the Prometheus()
processor later on.
Now we need to return a dictionary with the completed, failed and skipped hosts. That information will come from the AggregatedResult and the DCAwareRunner report, so let’s write a function to generate this response:
def calculate_result(
    dc_runner: DCAwareRunner, results: AggregatedResult
) -> Dict[str, List[str]]:
    report: Dict[str, List[str]] = {
        "failed": [],
        "skipped": [],
        "completed": [h for h, r in results.items() if not r.failed],
    }
    for _, failed, skipped, _ in dc_runner.report():
        report["failed"].extend([h.name for h in failed])
        report["skipped"].extend([h.name for h in skipped])
    return report
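With toy stand-ins for the nornir objects, the flattening above behaves like this (hypothetical data; Host and Result objects replaced by simple named tuples):

```python
from collections import namedtuple

FakeHost = namedtuple("FakeHost", ["name"])
FakeResult = namedtuple("FakeResult", ["failed"])

# hostname -> per-host result, as in AggregatedResult.items()
results = {
    "edge00.earth": FakeResult(failed=False),
    "leaf63.earth": FakeResult(failed=True),
}
# (group, failed_hosts, skipped_hosts, exception) tuples, as in dc_runner.report()
runner_report = [
    ("earth", [FakeHost("leaf63.earth")], [FakeHost("leaf99.earth")], RuntimeError("boom")),
]

report = {
    "failed": [],
    "skipped": [],
    "completed": [h for h, r in results.items() if not r.failed],
}
for _, failed, skipped, _ in runner_report:
    report["failed"].extend(h.name for h in failed)
    report["skipped"].extend(h.name for h in skipped)

print(report)
```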
Finally, our endpoint is going to be just a few lines as we leverage everything we wrote so far:
@app.route("/upgrade-os/", methods=["POST"])
def upgrade_os_endpoint() -> Response:
    nr = get_nornir(
        request.json.get("filter_sites"), request.json.get("filter_dev_types")
    )
    version = request.json["version"]
    results = nr.run(task=acmeos.upgrade_os, version=version)
    report = calculate_result(nr.runner, results)
    return respond(report)
Let’s try it out!
$ curl -X POST \
    -H "Content-Type: application/json" \
    -d "{ \"version\": \"5.3.1\", \"filter_sites\": [\"earth\"]}" \
    http://localhost:5000/upgrade-os/ | jq # use jq to make the output slightly prettier
{
"failed": [
"leaf63.earth",
"leaf81.earth",
"leaf92.earth",
"leaf98.earth"
],
"skipped": [
"leaf93.earth",
"leaf99.earth"
],
"completed": [
"edge00.earth",
"edge01.earth",
"spine00.earth",
"spine01.earth",
"spine02.earth",
"spine03.earth"
...
]
}
We could target other sites or device types if we wanted to:
$ curl -X POST \
-H "Content-Type: application/json" \
-d "{\"version\": \"5.3.1\", \"filter_dev_types\": [\"spine\"]}" \
http://localhost:5000/upgrade-os/ | jq # use jq to make the output slightly prettier
{
"failed": [
"spine00.mars"
],
"skipped": [
"spine01.mars",
"spine02.mars",
"spine03.mars"
],
"completed": [
"spine00.mercury",
"spine00.venus",
"spine00.earth",
"spine00.jupyter",
"spine00.uranus",
...
]
}
Finally, we are going to add some observability metrics to our system. To do so we are going to use the Prometheus processor you already saw in the get_nornir method.
The Prometheus processor will count successes, changes and failures so we can graph them over time.
# nornir3_demo/plugins/processors/prometheus.py
from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Task

from prometheus_client import Counter

class Prometheus:
    def __init__(self) -> None:
        self.total_task_requests = Counter(
            "total_task_requests", "Total number of task requests"
        )
        self.failed_tasks = Counter("failed_tasks", "Total number of failed tasks")
        self.total_tasks_per_host = Counter(
            "total_task_requests_per_host",
            "Total number of task requests per host",
            ["host", "site", "dev_type"],
        )
        self.failed_tasks_per_host = Counter(
            "failed_tasks_per_host",
            "Total number of failed tasks per host",
            ["host", "site", "dev_type"],
        )

    def task_started(self, task: Task) -> None:
        self.total_task_requests.inc()

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        if result.failed:
            self.failed_tasks.inc()

    def task_instance_started(self, task: Task, host: Host) -> None:
        self.total_tasks_per_host.labels(
            task.host.name, task.host.data["site"], task.host.data["dev_type"]
        ).inc()

    def task_instance_completed(
        self, task: Task, host: Host, results: MultiResult
    ) -> None:
        if results.failed:
            self.failed_tasks_per_host.labels(
                task.host.name, task.host.data["site"], task.host.data["dev_type"]
            ).inc()

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        pass

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        pass
Now we add an endpoint to our orchestrator to expose these metrics:
@app.route("/metrics/")
def metrics() -> Response:
    CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
    return Response(prometheus_client.generate_latest(), mimetype=CONTENT_TYPE_LATEST)
Let’s try it:
$ curl http://localhost:5000/metrics/
...
total_task_requests_per_host_total{dev_type="spine",host="spine03.jupyter",site="jupyter"} 3.0
total_task_requests_per_host_total{dev_type="spine",host="spine03.uranus",site="uranus"} 3.0
total_task_requests_per_host_total{dev_type="spine",host="spine03.saturn",site="saturn"} 2.0
total_task_requests_per_host_total{dev_type="spine",host="spine03.neptune",site="neptune"} 2.0
...
failed_tasks_per_host_total{dev_type="spine",host="spine03.uranus",site="uranus"} 1.0
failed_tasks_per_host_total{dev_type="spine",host="spine01.saturn",site="saturn"} 1.0
failed_tasks_per_host_total{dev_type="spine",host="spine02.neptune",site="neptune"} 1.0
...
Now you could have Prometheus gather these metrics, create dashboards with Grafana, and see how the system behaves over time!
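For example, once Prometheus scrapes the /metrics/ endpoint, a query along these lines (hypothetical, but matching the counter names above) could drive a Grafana panel showing the per-site failure ratio:

```
# failure ratio per site over the last five minutes
sum by (site) (rate(failed_tasks_per_host_total[5m]))
  /
sum by (site) (rate(total_task_requests_per_host_total[5m]))
```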