Nornir 3 demo

David Barroso - linkedin - github - twitter

Part of ipSpace’s Network Automation Roadmap

Slides Github

Introduction

In this presentation we are going to do a nornir 3 deep dive.

To do so, we are going to pretend we have been hired by Acme Corp. and tasked with building the necessary tooling so that the network engineering team can easily build their own tools.

Goals

  1. Write an inventory plugin to interact with Acme Inventory System
  2. Write a connection plugin and a few tasks to interact with AcmeOS
  3. Write a few processors to show meaningful information to users and to log custom events to syslog
  4. Write a runner that understands Acme’s network topology in order to safely deploy changes at scale
  5. Write a few functions to show more meaningful information to users
  6. Build a POC for a network orchestrator

Inventory plugins

An inventory plugin is a nornir plugin that allows nornir to create an Inventory object from an external source

It is implemented by writing a class with the following structure:

from typing import Any, Dict, List, Optional

from nornir.core.inventory import Inventory

class MyPlugin:
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # This method will allow you to configure the plugin
        # For instance, when creating the object inventory.options will be passed
        # to this constructor
        ...

    def load(self) -> Inventory:
        # This method will be called and will be responsible for instantiating
        # and returning the Inventory
        ...

Registering the Inventory plugin

In order for nornir to be able to use the inventory plugin we need to register it. You can do so in two ways:

  1. Using entrypoints
  2. Programmatically

Registering Inventory Plugins using Entry Points

Add to your setup.py:

setup(
    # ...
    entry_points={
      "nornir.plugins.inventory": "inventory-name = path.to:InventoryPlugin",
    }
)

Or if using poetry, add to pyproject.toml:

[tool.poetry.plugins."nornir.plugins.inventory"]
"inventory-name" = "path.to:InventoryPlugin"

  • inventory-name is the name of the inventory; you will refer to this plugin by this name in the inventory.plugin configuration option in config.yaml or when calling InitNornir
  • path.to:InventoryPlugin is the path to the class implementing the plugin

Registering Inventory Plugins Programmatically

from nornir.core.plugins.inventory import InventoryPluginRegister

from path.to import InventoryPlugin


InventoryPluginRegister.register("inventory-name", InventoryPlugin)

Example: Acme’s Inventory Plugin

We are going to use ACME’s inventory system as our inventory source. To interact with it we could use requests or some other library; however, we have already been given a library that talks to the backend, so we are going to leverage it.

Looking at the documentation given to us by the developer of the library we care about the following methods:

  • ACMEAPI() - Create an instance of the object to interact with the inventory. Takes no arguments.
  • get(filter_sites: Optional[List[str]] = None, filter_dev_types: Optional[List[str]] = None) - Returns information about all of our hosts spread across all of our datacenters. This method takes an optional list of sites and an optional list of device types to help us filter the inventory.

Let’s start by trying the library in the console:

$ ipython
In [1]: from nornir3_demo.ext.inventory import ACMEAPI
In [2]: acme = ACMEAPI()
In [3]: acme.get()
Out[3]:
{'mercury': {'edge00.mercury': {'platform': 'acmeos',
   'dev_type': 'edge',
   'rack': '10'},
  'edge01.mercury': {'platform': 'acmeos', 'dev_type': 'edge', 'rack': '10'},
  'spine00.mercury': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '20'},
  'spine01.mercury': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '20'},
  'spine02.mercury': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '21'},
  'spine03.mercury': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '21'},
  'leaf00.mercury': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '100'},
  'leaf01.mercury': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '100'},

  ...

  'leaf94.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '147'},
  'leaf95.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '147'},
  'leaf96.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '148'},
  'leaf97.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '148'},
  'leaf98.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '149'},
  'leaf99.neptune': {'platform': 'acmeos', 'dev_type': 'leaf', 'rack': '149'}}}

Let’s try the filtering capabilities:

In [4]: acme.get(filter_sites=["earth"], filter_dev_types=["edge", "spine"])
Out[4]:
{'earth': {'edge00.earth': {'platform': 'acmeos',
   'dev_type': 'edge',
   'rack': '10'},
  'edge01.earth': {'platform': 'acmeos', 'dev_type': 'edge', 'rack': '10'},
  'spine00.earth': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '20'},
  'spine01.earth': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '20'},
  'spine02.earth': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '21'},
  'spine03.earth': {'platform': 'acmeos', 'dev_type': 'spine', 'rack': '21'}}}

Turns out we have:

  • 8 datacenters
  • with:
    • 2 edge devices each
    • 4 spines each
    • 100 ToRs each

The inventory also gives us some rack information.

Our inventory plugin needs to retrieve the data using the library we were given and apply the necessary transformations to build the Inventory object.

Let’s look at it step by step:

# nornir3_demo/plugins/inventory/acme.py

from typing import Dict, List, Optional

from nornir.core.inventory import (
    Defaults,
    Group,
    Groups,
    Host,
    Hosts,
    Inventory,
    ParentGroups,
)

# we will pretend this library was given to us by a third party
from nornir3_demo.ext.inventory import ACMEAPI


class ACMEInventory:
    def __init__(
        self,
        filter_sites: Optional[List[str]] = None,
        filter_dev_types: Optional[List[str]] = None,
    ) -> None:
        # we will use the constructor to create the inventory object
        self.conn = ACMEAPI()

        # we will also save the parameters so we can use them later on
        self.filter_sites = filter_sites
        self.filter_dev_types = filter_dev_types

    ...
# nornir3_demo/plugins/inventory/acme.py
    ...

    def load(self) -> Inventory:
        # we retrieve the data from the inventory passing the options we saved
        # in the constructor
        data = self.conn.get(self.filter_sites, self.filter_dev_types)
        # we create placeholders for the hosts and for the groups
        hosts = Hosts()
        groups = Groups()

        # the inventory returned the hosts by datacenter so we iterate over them
        for dc_name, dc_data in data.items():
            # we are going to start by creating a group per DC
            groups[dc_name] = Group(dc_name)

            # now we process the dc data we got
            hosts_in_dc = process_dc_data(groups[dc_name], dc_data)

            # we add the hosts in the dc to the main hosts object
            hosts.update(hosts_in_dc)

        # we populate the inventory and return it
        # note our inventory doesn't support defaults so we just return an empty object
        return Inventory(hosts=hosts, groups=groups, defaults=Defaults())
# nornir3_demo/plugins/inventory/acme.py

def process_dc_data(group: Group, group_data: Dict[str, Dict[str, str]]) -> Hosts:
    """
    Arguments:
        group: Current group we are processing
        group_data: the data for the entire group as returned by the backend
    """
    # we create a placeholder for the hosts
    hosts = Hosts()

    # inside each datacenter we have a dictionary where the key is the hostname
    # and the value holds some data about the host; we iterate over the hosts
    for hostname, host_data in group_data.items():
        # for each host we create a Host object mapping its required parameters
        # with the data we got
        hosts[hostname] = Host(
            name=hostname,
            hostname=hostname,
            platform=host_data.pop("platform"),
            groups=ParentGroups([group]),  # we add the DC group as a parent group
            data={"site": group.name, **host_data},  # extra data
        )
    return hosts

Registering the Inventory Plugin

Now that we have the plugin, we need to register it. As we are using poetry in our project that’s what we will use to register it:

# pyproject.toml
...

[tool.poetry.plugins."nornir.plugins.inventory"]
"ACMEInventory" = "nornir3_demo.plugins.inventory.acme:ACMEInventory"

...
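
Alternatively, if you are experimenting from a script or a test environment where the package (and therefore its entry points) is not installed, you could register the same plugin programmatically using the InventoryPluginRegister shown earlier. A minimal sketch:

from nornir import InitNornir
from nornir.core.plugins.inventory import InventoryPluginRegister

from nornir3_demo.plugins.inventory.acme import ACMEInventory

# register the plugin under the same name used in pyproject.toml
InventoryPluginRegister.register("ACMEInventory", ACMEInventory)

nr = InitNornir(inventory={"plugin": "ACMEInventory"})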

Demo: Acme’s Inventory Plugin

Script:

# demo/scripts/10_inventory_plugin.py
from pprint import pprint

from nornir import InitNornir

nr = InitNornir(inventory={"plugin": "ACMEInventory"})

pprint(nr.inventory.groups)

pprint(nr.inventory.hosts)

Output:

$ python 10_inventory_plugin.py
{'earth': Group: earth,
 'jupyter': Group: jupyter,
 'mars': Group: mars,
 'mercury': Group: mercury,
 'neptune': Group: neptune,
 ...
{'edge00.earth': Host: edge00.earth,
 'edge00.jupyter': Host: edge00.jupyter,
 'edge00.mars': Host: edge00.mars,
 'edge00.mercury': Host: edge00.mercury,
 ...

Script:

# demo/scripts/10_inventory_plugin_filter.py
from nornir import InitNornir

nr = InitNornir(
    inventory={
        "plugin": "ACMEInventory",
        "options": {
            "filter_sites": ["earth"],
            "filter_dev_types": ["spine", "edge"],
        },
    }
)

print(nr.inventory.groups)

print(nr.inventory.hosts)

Output:

$ python 10_inventory_plugin_filter.py
{'earth': Group: earth}
{'edge00.earth': Host: edge00.earth,
 'edge01.earth': Host: edge01.earth,
 'spine00.earth': Host: spine00.earth,
 'spine01.earth': Host: spine01.earth,
 'spine02.earth': Host: spine02.earth,
 'spine03.earth': Host: spine03.earth}

Questions so far?

Connection plugins

A connection plugin is a nornir plugin that allows nornir to manage connections with devices

It is implemented by writing a class with the following structure:

from typing import Any, Dict, Optional

from nornir.core.configuration import Config


CONNECTION_NAME = "my-connection-name"


class MyPlugin:
    def open(
        self,
        hostname: Optional[str],
        username: Optional[str],
        password: Optional[str],
        port: Optional[int],
        platform: Optional[str],
        extras: Optional[Dict[str, Any]] = None,
        configuration: Optional[Config] = None,
    ) -> None:
        # we open the connection and save it under self.connection
        self.connection = connection

    def close(self) -> None:
        # logic to close the connection
        ...

Registering the connection plugin

As with the InventoryPlugin we need to register the connection plugin. We can do it in two ways:

  1. Using entrypoints
  2. Programmatically

Registering Connection Plugins using Entrypoints

Add to your setup.py:

setup(
    # ...
    entry_points={
      "nornir.plugins.connections": "connection_name = path.to:ConnectionPlugin",
    }
)

Or if using poetry, add to pyproject.toml:

[tool.poetry.plugins."nornir.plugins.connections"]
"connection_name" = "path.to:ConnectionPlugin"

  • connection_name is the name of the connection; you will refer to this plugin by this name when writing tasks
  • path.to:ConnectionPlugin is the path to the class implementing the plugin

Registering Connection Plugins Programmatically

from nornir.core.plugins.connections import ConnectionPluginRegister

from path.to import ConnectionPlugin


ConnectionPluginRegister.register("connection-name", ConnectionPlugin)

Example: AcmeOS Connection Plugin

Our AcmeOS device has a python library we can leverage. In order to manage the connection to the device it provides a constructor and two methods:

  • AcmeOSAPI(hostname, username, password, port) - Create an instance of the object to interact with a device
  • open() - Establishes a connection
  • close() - Closes the connection
# nornir3_demo/plugins/connections/acmeos.py
from typing import Any, Dict, Optional

from nornir.core.configuration import Config

# we will pretend this library was given to us by a third party
from nornir3_demo.ext.acmeos import AcmeOSAPI


# We will use this variable in the tasks to quickly reference the plugin
CONNECTION_NAME = "acmeos"


class AcmeOS:
    def open(
        self,
        hostname: Optional[str],
        username: Optional[str],
        password: Optional[str],
        port: Optional[int],
        platform: Optional[str],
        extras: Optional[Dict[str, Any]] = None,
        configuration: Optional[Config] = None,
    ) -> None:
        # we use the constructor to pass the parameters needed by the library
        connection = AcmeOSAPI(hostname, username, password, port)

        # now we call the open method as instructed by the library documentation
        connection.open()

        # finally we save the connection under self.connection as instructed by nornir
        self.connection = connection

    def close(self) -> None:
        # we follow the instructions provided by the library to close the connection
        self.connection.close()

Now that we have the plugin, we need to register it. As we are using poetry in our project that’s what we will use to register it:

# pyproject.toml
...

[tool.poetry.plugins."nornir.plugins.connections"]
"acmeos" = "nornir3_demo.plugins.connections.acmeos:AcmeOS"

...

Now we are ready to write a few tasks that will leverage this connection plugin.

Let’s start by looking at the vendor’s documentation; according to it, there are a few methods that are interesting to us:

  • get_version() - Returns a dictionary with OS information
  • get_cpu_ram() - Returns a dictionary with information about cpu and ram usage
  • install_os_version(version: str) - Installs the given version of the device’s operating system
# nornir3_demo/plugins/tasks/acmeos/__init__.py

from nornir.core.task import Result, Task

# We import the CONNECTION_NAME from the plugin itself
from nornir3_demo.plugins.connections.acmeos import CONNECTION_NAME


def get_version(task: Task) -> Result:
    # nornir manages the connection automatically using the Connection plugin
    # To retrieve it you can just call the following method. Note that
    # CONNECTION_NAME needs to match the name we used when registering the plugin
    device = task.host.get_connection(CONNECTION_NAME, task.nornir.config)

    # now we are ready to use the library given to us by the vendor
    version_info = device.get_version()

    return Result(host=task.host, result=version_info)

# nornir3_demo/plugins/tasks/acmeos/__init__.py
#
# continuation...

def get_cpu_ram(task: Task) -> Result:
    device = task.host.get_connection(CONNECTION_NAME, task.nornir.config)
    return Result(host=task.host, result=device.get_cpu_ram())


def install_os_version(task: Task, version: str) -> Result:
    device = task.host.get_connection(CONNECTION_NAME, task.nornir.config)

    # note that we set changed=True as we changed the system
    return Result(
        host=task.host, result=device.install_os_version(version), changed=True
    )
# nornir3_demo/plugins/tasks/acmeos/__init__.py
#
# continuation...

def upgrade_os(task: Task, version: str) -> Result:
    # we use the get_version task to retrieve the OS currently running
    result = task.run(task=get_version)

    # if the version matches what we want to install we are done!
    if result.result["full_version"] == version:
        return Result(host=task.host, result="nothing to do!!!")

    # otherwise we call install_os_version task to install the image
    task.run(task=install_os_version, version=version)
    return Result(host=task.host, changed=True, result="success!!!")

Demo: AcmeOS Connection Plugin

Script:

# demo/scripts/20_connection_plugin.py
from nornir import InitNornir

from nornir3_demo.plugins.tasks import acmeos

nr = InitNornir(
    inventory={"plugin": "ACMEInventory", "options": {"filter_sites": ["earth"]}}
)


results = nr.run(task=acmeos.get_version)

for hostname, result in results.items():
    if result.failed:
        print(f"{hostname}: {result.exception}")
    else:
        print(f"{hostname}: {result.result['full_version']}")

Output:

$ python 20_connection_plugin.py
edge00.earth: problem communicating with device
edge01.earth: 5.4.1
spine00.earth: 5.2.1
spine01.earth: 5.2.4
spine02.earth: 5.1.3
spine03.earth: 5.2.9
leaf00.earth: 5.2.3
leaf01.earth: 5.4.1
leaf02.earth: 5.2.7
...

Questions so far?

Processors

A processor is a plugin that taps into certain events and allows the user to execute arbitrary code on those events.

Those events are:

  1. When a task starts or finishes
  2. When a host starts executing the task or completes it
  3. When a host starts executing a subtask or completes it

The benefit of using a processor is that the code for each event is called as the event occurs, so you can act on it without having to wait until the entire task is completed.

It is implemented by writing a class with the following structure:

from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Task


class MyProcessorPlugin:
    def task_started(self, task: Task) -> None:
        ...

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        ...

    def task_instance_started(self, task: Task, host: Host) -> None:
        ...

    def task_instance_completed(self, task: Task, host: Host, results: MultiResult) -> None:
        ...

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        ...

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        ...

Example: rich progress bar

ACME has 800+ devices, so we are going to write a progress bar that shows the progress of the execution in real time while we wait for everything to complete.

Note: this plugin leverages rich

# nornir3_demo/plugins/processors/rich.py

from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Task

from rich.progress import BarColumn, Progress


class ProgressBar:
    def __init__(self, total: int) -> None:
        # we need to tell this processor the total number of hosts upfront
        # we instantiate a progress bar object
        self.progress = Progress(
            "[progress.description]{task.description}",
            BarColumn(),
            "[progress.percentage]{task.completed:>3.0f}/{task.total}",
        )

        # we create four progress bars to track total execution, successes, errors and changes
        self.total = self.progress.add_task("[cyan]Completed...", total=total)
        self.successful = self.progress.add_task("[green]Successful...", total=total)
        self.changed = self.progress.add_task("[orange3]Changed...", total=total)
        self.error = self.progress.add_task("[red]Failed...", total=total)

    def task_started(self, task: Task) -> None:
        # we start the progress bar
        self.progress.start()

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        # we stop the progress bar
        self.progress.stop()
# nornir3_demo/plugins/processors/rich.py (continuation)

    def task_instance_started(self, task: Task, host: Host) -> None:
        pass

    def task_instance_completed(self, task: Task, host: Host, results: MultiResult) -> None:
        # we update the total-execution progress bar, advancing it by 1
        self.progress.update(self.total, advance=1)
        if results.failed:
            # if the task failed we increase the progress bar counting errors
            self.progress.update(self.error, advance=1)
        else:
            # if the task succeeded we increase the progress bar counting successes
            self.progress.update(self.successful, advance=1)

        if results.changed:
            # if the task changed the device we increase the progress bar counting changes
            self.progress.update(self.changed, advance=1)

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        pass

    def subtask_instance_completed(self, task: Task, host: Host, result: MultiResult) -> None:
        pass

Demo: rich progress bar

Script:

# demo/scripts/30_progress_bar.py

from nornir import InitNornir

from nornir3_demo.plugins.tasks import acmeos
from nornir3_demo.plugins.processors.rich import ProgressBar

nr = InitNornir(inventory={"plugin": "ACMEInventory"})

total_hosts = len(nr.inventory.hosts)

nr_with_progress_bar = nr.with_processors([ProgressBar(total_hosts)])

nr_with_progress_bar.run(task=acmeos.upgrade_os, version="5.3.2")

[progress bar screenshot]

(a few seconds later)

[progress bar screenshot]

(on completion)

[progress bar screenshot]

Example: custom logging

In addition to our fancy progress bar we are going to add a logging processor so we can inspect the execution as it happens. We are going to log to a file, but ideally we’d be sending these logs to a syslog server connected to a system like Splunk, Graylog or an ELK stack.

# nornir3_demo/plugins/processors/logger.py

import uuid

from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Task


import logging


class Logger:
    def __init__(self, filename: str, log_level: int = logging.INFO) -> None:
        self.logger = logging.getLogger("nornir_runner_logger")

        handler = logging.FileHandler(filename=filename)
        formatter = logging.Formatter("%(levelname)s:%(asctime)s:%(message)s")
        handler.setFormatter(formatter)

        self.logger.setLevel(log_level)

        self.logger.addHandler(handler)

    def task_started(self, task: Task) -> None:
        # we generate a unique uuid and attach it to all the logs
        # this unique uuid will allow us to correlate logs and filter them by task execution
        self.uuid = uuid.uuid4()
        self.logger.info("%s:starting  task:%s", self.uuid, task.name)

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        self.logger.info("%s:completed task:%s", self.uuid, task.name)

    def task_instance_started(self, task: Task, host: Host) -> None:
        self.logger.info("%s:starting  host:%s", self.uuid, task.host.name)

    def task_instance_completed(
        self, task: Task, host: Host, results: MultiResult
    ) -> None:
        if results.failed:
            self.logger.error("%s:completed host:%s:%s", self.uuid, task.host.name, results[-1].exception)
        else:
            self.logger.info(
                "%s:completed host:%s:%s", self.uuid, task.host.name, results.result
            )

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        self.logger.debug(
            "%s:starting  subtask:%s:%s", self.uuid, task.host.name, task.name
        )

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        if result.failed:
            self.logger.error(
                "%s:completed subtask:%s:%s:%s",
                self.uuid,
                task.host.name,
                task.name,
                result.exception,
            )
        else:
            self.logger.debug(
                "%s:completed subtask:%s:%s:%s",
                self.uuid,
                task.host.name,
                task.name,
                result.result,
            )

Script:

import logging

from nornir import InitNornir

from nornir3_demo.plugins.tasks import acmeos
from nornir3_demo.plugins.processors.logger import Logger
from nornir3_demo.plugins.processors.rich import ProgressBar

nr = InitNornir(inventory={"plugin": "ACMEInventory"})

total_hosts = len(nr.inventory.hosts)

# We can run as many processors at the same time as we want!!!
nr_with_progress_bar_and_logs = nr.with_processors(
    [ProgressBar(total_hosts), Logger("upgrade_os.log", log_level=logging.DEBUG)]
)

nr_with_progress_bar_and_logs.run(task=acmeos.upgrade_os, version="5.3.2")

Output while executing python 31_progress_bar_and_logs.py:

$ tail -f upgrade_os.log
...
DEBUG:2020-06-09 20:23:10,331:bd566653-13f4-47a6-8350-a98301cc9eef:starting  subtask:edge00.earth:install_os_version
DEBUG:2020-06-09 20:23:10,406:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf90.venus:install_os_version:{'os_version': '5.3', 'revision': '2', 'full_version': '5.3.2'}
INFO:2020-06-09 20:23:10,406:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf90.venus:success!!!
INFO:2020-06-09 20:23:10,406:bd566653-13f4-47a6-8350-a98301cc9eef:starting  host:leaf04.earth
DEBUG:2020-06-09 20:23:10,406:bd566653-13f4-47a6-8350-a98301cc9eef:starting  subtask:leaf04.earth:get_version
DEBUG:2020-06-09 20:23:10,462:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf89.venus:install_os_version:{'os_version': '5.3', 'revision': '2', 'full_version': '5.3.2'}
INFO:2020-06-09 20:23:10,463:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf89.venus:success!!!
INFO:2020-06-09 20:23:10,463:bd566653-13f4-47a6-8350-a98301cc9eef:starting  host:leaf05.earth
DEBUG:2020-06-09 20:23:10,463:bd566653-13f4-47a6-8350-a98301cc9eef:starting  subtask:leaf05.earth:get_version
DEBUG:2020-06-09 20:23:10,521:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:edge01.earth:get_version:{'os_version': '5.1', 'revision': '5', 'full_version': '5.1.5'}
DEBUG:2020-06-09 20:23:10,521:bd566653-13f4-47a6-8350-a98301cc9eef:starting  subtask:edge01.earth:install_os_version
DEBUG:2020-06-09 20:23:10,560:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf95.venus:get_version:{'os_version': '5.2', 'revision': '7', 'full_version': '5.2.7'}
DEBUG:2020-06-09 20:23:10,560:bd566653-13f4-47a6-8350-a98301cc9eef:starting  subtask:leaf95.venus:install_os_version
DEBUG:2020-06-09 20:23:10,691:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf93.venus:get_version:{'os_version': '5.3', 'revision': '6', 'full_version': '5.3.6'}
DEBUG:2020-06-09 20:23:10,691:bd566653-13f4-47a6-8350-a98301cc9eef:starting  subtask:leaf93.venus:install_os_version
DEBUG:2020-06-09 20:23:10,715:bd566653-13f4-47a6-8350-a98301cc9eef:completed host:leaf87.venus:install_os_version:{'os_version': '5.3', 'revision': '2', 'full_version': '5.3.2'}
...

Questions so far?

Runners

A runner is a plugin that dictates how to execute the tasks over the hosts

Nornir comes with two runners:

  • SerialRunner executes the task over all the hosts one after the other
  • ThreadedRunner executes the task over all the hosts in parallel using threads (this is the default); see the sketch below for selecting either one by name
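
For reference, here is a minimal sketch of selecting one of the built-in runners by name when initializing nornir ("serial" and "threaded" are the names nornir registers for them; num_workers only applies to the threaded runner):

from nornir import InitNornir

# pick the built-in threaded runner explicitly and tune its worker count;
# swap "threaded" for "serial" to run hosts one after the other
nr = InitNornir(
    inventory={"plugin": "ACMEInventory"},
    runner={"plugin": "threaded", "options": {"num_workers": 20}},
)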

You can implement a runner by writing a class with the following structure:

from typing import Any, List

from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, Task


class DCAwareRunner:
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # This method will allow you to configure the plugin
        # For instance, when creating the object runner.options will be passed
        # to this constructor
        ...

    def run(self, task: Task, hosts: List[Host]) -> AggregatedResult:
        # here is where you would implement your logic to "spread" the task over the hosts
        ...

Registering the runner

As with the InventoryPlugin and the ConnectionPlugin we need to register the runner. We can do it in two ways:

  1. Using entrypoints
  2. Programmatically

Registering Runner Plugins using Entrypoints

Add to your setup.py:

setup(
    # ...
    entry_points={
      "nornir.plugins.runners": "runner_name = path.to:RunnerPlugin",
    }
)

Or if using poetry, add to pyproject.toml:

[tool.poetry.plugins."nornir.plugins.runners"]
"runner_name" = "path.to:RunnerPlugin"

  • runner_name is the name of the runner; you will refer to this plugin by this name in the runner.plugin configuration option or when calling InitNornir
  • path.to:RunnerPlugin is the path to the class implementing the plugin
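
Registering Runner Plugins Programmatically

Runner plugins can also be registered programmatically, analogous to inventory and connection plugins. The class name in the sketch below is recalled from memory rather than taken from these slides, so treat it as an assumption and verify it against the nornir source:

# assumption: nornir 3 exposes RunnersPluginRegister for this -- verify the exact name
from nornir.core.plugins.runners import RunnersPluginRegister

from path.to import RunnerPlugin


RunnersPluginRegister.register("runner_name", RunnerPlugin)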

Example: DCAwareRunner

As we have seen already Acme has 8 datacenters with 2 edge devices, 4 spines and 50 racks with two ToRs each. The way the network is designed allows us to lose an edge device, a spine or even a ToR on each rack without impacting normal operations.

To perform operations at scale in our network we are going to design a runner that takes these properties into account. To do that we are going to do the following:

  1. We are going to group our devices based on their redundancy group
  2. Those groups will be independent between datacenters (i.e. losing an edge in dc A doesn’t have an impact on dc B)
  3. Those groups will be:
    1. One for all the edge devices
    2. One for all the spines
    3. One for each pair of ToRs
    4. This means each DC will have 50 ToR groups + 1 edge group + 1 spine group
  4. We will parallelize all the groups but only one device of each group will be run at a time
  5. If a device in a group fails, pending devices in the same group will NOT be executed

The code is not too complex but it is rather large, so I’d encourage you to look at it directly on GitHub.
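
To give a rough idea of the approach, the grouping step could look something like the sketch below. This is a simplified, hypothetical version (the function and key names are mine, not the repo’s); the real runner also handles the thread pool, the skipping logic and the report:

from collections import defaultdict
from typing import Dict, List

from nornir.core.inventory import Host


def build_redundancy_groups(hosts: List[Host]) -> Dict[str, List[Host]]:
    # group hosts into redundancy groups: per DC, one group for the edges,
    # one for the spines and one per rack for each ToR pair
    groups: Dict[str, List[Host]] = defaultdict(list)
    for host in hosts:
        site = host.data["site"]
        dev_type = host.data["dev_type"]
        if dev_type in ("edge", "spine"):
            key = f"{site}_{dev_type}"
        else:
            key = f"{site}_rack_{host.data['rack']}"
        groups[key].append(host)
    return dict(groups)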

Now that we have the plugin, we need to register it. As we are using poetry in our project that’s what we will use to register it:

# pyproject.toml
...

[tool.poetry.plugins."nornir.plugins.runners"]
"DCAwareRunner" = "nornir3_demo.plugins.runners.dc_aware:DCAwareRunner"

...

Demo: DCAwareRunner

Script:

from nornir import InitNornir

from nornir3_demo.plugins.processors.rich import ProgressBar
from nornir3_demo.plugins.runners import DCAwareRunner
from nornir3_demo.plugins.tasks import acmeos

nr = InitNornir(inventory={"plugin": "ACMEInventory"})

total_hosts = len(nr.inventory.hosts)
nr = nr.with_processors([ProgressBar(total_hosts)])

dc_runner = DCAwareRunner(num_workers=100)
nr = nr.with_runner(dc_runner)

nr.run(task=acmeos.upgrade_os, version="5.3.1")

# let's print the report so we can see which hosts failed and which ones were skipped
print()
for data in dc_runner.report():
    print(data)

Output:

[dc aware runner screenshot]

A few notes:

  1. You can see how the “Completed” progress bar didn’t reach the end due to skipped hosts
  2. In the report you can see “group_name”, “failed_hosts”, “skipped_hosts”, “error”
  3. Notice how when an even device (e.g. leaf32.mercury) failed, its odd counterpart was skipped
  4. However, if the odd device failed, the even one wasn’t skipped. This is because the even device was executed before the odd one

with_runner vs configuration

In the example we chose the runner with the following code:

dc_runner = DCAwareRunner(num_workers=100)
nr = nr.with_runner(dc_runner)

This allows you to set up the runner “at runtime”, but you can also configure it like this if you prefer:

nr = InitNornir(
    runner={
        "plugin": "DCAwareRunner",
        "options": {
            "num_workers": 100,
        },
    },
)

You can choose one method or the other depending on your needs

Questions so far?

Functions

A nornir function is a standard python function you invoke on your own

There are no rules to write them :)

print_result is the most well known example of a nornir function
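
For instance, assuming the nornir_utils package is installed, using print_result with one of our tasks would look something like this:

from nornir import InitNornir
from nornir_utils.plugins.functions import print_result

from nornir3_demo.plugins.tasks import acmeos

nr = InitNornir(
    inventory={"plugin": "ACMEInventory", "options": {"filter_sites": ["earth"]}}
)

# print_result walks the results and pretty-prints them per host and per (sub)task
print_result(nr.run(task=acmeos.get_version))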

Example: rich_table

We are going to write an alternative to print_result that leverages rich

# nornir3_demo/plugins/functions/rich.py
from nornir.core.task import AggregatedResult

from rich.box import MINIMAL_DOUBLE_HEAD
from rich.console import Console
from rich.table import Table
from rich.text import Text


def rich_table(results: AggregatedResult) -> None:
    console = Console()

    for hostname, host_result in results.items():
        table = Table(box=MINIMAL_DOUBLE_HEAD)
        table.add_column(hostname, justify="right", style="cyan", no_wrap=True)
        table.add_column("result")
        table.add_column("changed")

        for r in host_result:
            text = Text()
            if r.failed:
                text.append(f"{r.exception}", style="red")
            else:
                text.append(f"{r.result or ''}", style="green")

            changed = Text()
            if r.changed:
                color = "orange3"
            else:
                color = "green"
            changed.append(f"{r.changed}", style=color)
            table.add_row(r.name, text, changed)
        console.print(table)

Script:

from nornir import InitNornir

from nornir3_demo.plugins.tasks import acmeos
from nornir3_demo.plugins.functions.rich import rich_table

from nornir.core.task import Task


def gather_info(task: Task) -> None:
    task.run(task=acmeos.get_version)
    task.run(task=acmeos.get_cpu_ram)


nr = InitNornir(
    inventory={"plugin": "ACMEInventory", "options": {"filter_sites": ["earth"]}}
)

results = nr.run(task=gather_info)
rich_table(results)

Output:

[rich table screenshot]

Example: dcaware report

In the previous section we built a DCAwareRunner. This runner returned a report indicating which hosts failed and which ones were skipped and why. We are going to build a function that processes that report and builds a nice table.

# nornir3_demo/plugins/functions/rich.py (continuation)
from nornir3_demo.plugins.runners.dc_aware import DCAwareRunner


def rich_dc_aware_report(dc_runner: DCAwareRunner) -> Table:
    table = Table(box=MINIMAL_DOUBLE_HEAD, title="DCAwareRunner report")
    table.add_column("group", justify="right", style="blue", no_wrap=True)
    table.add_column("failed", style="red")
    table.add_column("skipped", style="sky_blue3")
    table.add_column("error")

    for group_name, failed, skipped, exc in dc_runner.report():
        failed_hosts = ", ".join([h.name for h in failed])
        skipped_hosts = ", ".join([h.name for h in skipped])
        table.add_row(group_name, failed_hosts, skipped_hosts, f"{exc}")

    return table

Demo: dcaware report

Script:

# demo/scripts/51_dc_aware_runner_report.py

from nornir import InitNornir

from nornir3_demo.plugins.processors.rich import ProgressBar
from nornir3_demo.plugins.runners.dc_aware import DCAwareRunner
from nornir3_demo.plugins.tasks import acmeos
from nornir3_demo.plugins.functions.rich import rich_dc_aware_report

from rich.console import Console

nr = InitNornir(inventory={"plugin": "ACMEInventory"})

total_hosts = len(nr.inventory.hosts)
dc_runner = DCAwareRunner(num_workers=100)
nr = nr.with_processors([ProgressBar(total_hosts)]).with_runner(dc_runner)

nr.run(task=acmeos.upgrade_os, version="5.3.1")

table = rich_dc_aware_report(dc_runner)
Console().print("\n", table)

Output:

[progress bar screenshot]

Questions so far?

Orchestrator

Finally we are going to wrap everything up by writing a POC for an orchestrator

It won’t be feature complete but it will highlight its capabilities

Objective and Steps

  • Write an HTTP API that allows us to execute tasks over our network
  • To showcase the orchestrator we will have an endpoint that will upgrade the OS in our entire network leveraging everything we built so far
  • We will use flask to write the service
  • We will write an OpenAPI specification
  • Finally we will add some instrumentation

OpenAPI Specification

The specification can be found under orchestrator.yaml

Let’s start by writing an endpoint to serve it so we can use the swagger-ui

def respond(raw: Any) -> Response:
    """
    This method serializes the response into json and
    sets the appropriate HTTP headers
    """
    return Response(
        json.dumps(raw),
        status=200,
        headers={
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",  # this is certainly not good in prod!!!
            "Access-Control-Allow-Methods": "GET, POST, DELETE, PUT, PATCH, OPTIONS",
        },
    )

@app.route("/swagger/")
def swagger() -> Response:
    """
    Serves the spec so we can use the swagger-ui
    """
    with open("swagger.yaml", "r") as f:
        yml = ruamel.yaml.YAML()
        spec_data = yml.load(f)
    return respond(spec_data)

We can start the orchestrator as it stands so far with:

$ cd demo/orchestrator
$ python orchestrator.py
 * Serving Flask app "orchestrator" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

Now we can start swagger-ui:

docker run --rm -p 8080:8080 swaggerapi/swagger-ui

With everything up and running we can go to http://localhost:8080 and enter http://localhost:5000/swagger/ as the URL to explore

[swagger-ui screenshots]

Now let’s implement the endpoint

Let’s start by writing a method that will return the nornir object fully configured:

prometheus = Prometheus()

def get_nornir(
    filter_sites: Optional[List[str]], filter_dev_types: Optional[List[str]]
) -> Nornir:
    processors = [prometheus, Logger("orchestrator.log", log_level=logging.INFO)]

    return InitNornir(
        inventory={
            "plugin": "ACMEInventory",
            "options": {
                "filter_dev_types": filter_dev_types,
                "filter_sites": filter_sites,
            },
        },
        runner={
            "plugin": "DCAwareRunner",
            "options": {"num_workers": 100}},
    ).with_processors(processors)

Note: We will get back to the Prometheus() processor later on.

Now we need to return a dictionary with the completed, failed and skipped hosts. That information will come from the AggregatedResult and DCAwareRunner report so let’s write a function to generate this response:

def calculate_result(
    dc_runner: DCAwareRunner, results: AggregatedResult
) -> Dict[str, List[str]]:
    report: Dict[str, List[str]] = {
        "failed": [],
        "skipped": [],
        "completed": [h for h, r in results.items() if not r.failed],
    }
    for _, failed, skipped, _ in dc_runner.report():
        report["failed"].extend([h.name for h in failed])
        report["skipped"].extend([h.name for h in skipped])

    return report

Finally, our endpoint is going to be just a few lines as we leverage everything we wrote so far:

@app.route("/upgrade-os/", methods=["POST"])
def upgrade_os_endpoint() -> Response:
    nr = get_nornir(
        request.json.get("filter_sites"), request.json.get("filter_dev_types")
    )
    version = request.json["version"]

    results = nr.run(task=acmeos.upgrade_os, version=version)

    report = calculate_result(nr.runner, results)

    return respond(report)

Let’s try it out!

$ curl -X POST \
  -H "Content-Type: application/json" \
  -d "{  \"version\": \"5.3.1\",  \"filter_dev_sites\": [\"earth\"]}" \
  http://localhost:5000/upgrade-os/ | jq  # use jq to make the output slightly prettier
{
  "failed": [
    "leaf63.earth",
    "leaf81.earth",
    "leaf92.earth",
    "leaf98.earth"
  ],
  "skipped": [
    "leaf93.earth",
    "leaf99.earth"
  ],
  "completed": [
    "edge00.earth",
    "edge01.earth",
    "spine00.earth",
    "spine01.earth",
    "spine02.earth",
    "spine03.earth"
    ...
  ]
}

We could target other sites or device types if we wanted to:

$ curl -X POST \
  -H "Content-Type: application/json" \
  -d "{\"version\": \"5.3.1\", \"filter_dev_types\": [\"spine\"]}" \
  http://localhost:5000/upgrade-os/ | jq  # use jq to make the output slightly prettier
{
  "failed": [
    "spine00.mars"
  ],
  "skipped": [
    "spine01.mars",
    "spine02.mars",
    "spine03.mars"
  ],
  "completed": [
    "spine00.mercury",
    "spine00.venus",
    "spine00.earth",
    "spine00.jupyter",
    "spine00.uranus",
    ...
  ]
}

Finally we are going to add some observability metrics to our system. To do so we are going to use the Prometheus processor you already saw in the get_nornir method.

The Prometheus processor will count task executions and failures, both globally and per host, so we can graph them over time.

# nornir3_demo/plugins/processors/prometheus.py
from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Task

from prometheus_client import Counter


class Prometheus:
    def __init__(self) -> None:
        self.total_task_requests = Counter(
            "total_task_requests", "Total number of task requests"
        )
        self.failed_tasks = Counter("failed_tasks", "Total number of failed tasks")
        self.total_tasks_per_host = Counter(
            "total_task_requests_per_host",
            "Total number of task requests per host",
            ["host", "site", "dev_type"],
        )
        self.failed_tasks_per_host = Counter(
            "failed_tasks_per_host",
            "Total number of failed tasks per host",
            ["host", "site", "dev_type"],
        )

    def task_started(self, task: Task) -> None:
        self.total_task_requests.inc()

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        if result.failed:
            self.failed_tasks.inc()

    def task_instance_started(self, task: Task, host: Host) -> None:
        self.total_tasks_per_host.labels(
            task.host.name, task.host.data["site"], task.host.data["dev_type"]
        ).inc()

    def task_instance_completed(
        self, task: Task, host: Host, results: MultiResult
    ) -> None:
        if results.failed:
            self.failed_tasks_per_host.labels(
                task.host.name, task.host.data["site"], task.host.data["dev_type"]
            ).inc()

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        pass

    def subtask_instance_completed(self, task: Task, host: Host, result: MultiResult) -> None:
        pass

Now we add an endpoint to our orchestrator to expose these metrics:

@app.route("/metrics/")
def metrics() -> Response:
    CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8")
    return Response(prometheus_client.generate_latest(), mimetype=CONTENT_TYPE_LATEST)

Let’s try it:

$ curl http://localhost:5000/metrics/
...
total_task_requests_per_host_total{dev_type="spine",host="spine03.jupyter",site="jupyter"} 3.0
total_task_requests_per_host_total{dev_type="spine",host="spine03.uranus",site="uranus"} 3.0
total_task_requests_per_host_total{dev_type="spine",host="spine03.saturn",site="saturn"} 2.0
total_task_requests_per_host_total{dev_type="spine",host="spine03.neptune",site="neptune"} 2.0
...
failed_tasks_per_host_total{dev_type="spine",host="spine03.uranus",site="uranus"} 1.0
failed_tasks_per_host_total{dev_type="spine",host="spine01.saturn",site="saturn"} 1.0
failed_tasks_per_host_total{dev_type="spine",host="spine02.neptune",site="neptune"} 1.0
...

Now you could have Prometheus gather these metrics, create dashboards with Grafana, and see how the system behaves over time!

Questions so far?

FIN

David Barroso - linkedin - github - twitter

Part of ipSpace’s Network Automation Roadmap

Slides Github