# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import enum
import ironic.common.exception as exc
from ironic.common.i18n import _
import ironic.common.trait_based_networking.grammar.parser as tbn_parser
[docs]
class Operator(enum.Enum):
AND = "&&"
OR = "||"
[docs]
def eval(self, variable, value):
# NOTE(clif): These can operate on string values, and return the values
# themselves instead of a boolean!
match self.name:
case self.AND.name:
return variable and value
case self.OR.name:
return variable or value
def __str__(self):
return self.value
[docs]
class Comparator(enum.Enum):
EQUALITY = "=="
INEQUALITY = "!="
GT_OR_EQ = ">="
GT = ">"
LT_OR_EQ = "<="
LT = "<"
PREFIX_MATCH = "=~"
[docs]
def eval(self, variable, value):
# TODO(clif): Should we some sort of checking of variable type vs
# requested operator?
match self.name:
case self.EQUALITY.name:
return variable == value
case self.INEQUALITY.name:
return variable != value
case self.GT_OR_EQ.name:
return variable >= value
case self.GT.name:
return variable > value
case self.LT_OR_EQ.name:
return variable <= value
case self.LT.name:
return variable < value
case self.PREFIX_MATCH.name:
if isinstance(variable, str):
return variable.startswith(value)
raise exc.TBNComparatorPrefixMatchTypeMismatch(
_("Prefix match can only be used with variables "
"of type string")
)
def __str__(self):
return self.value
[docs]
class Actions(enum.Enum):
ATTACH_PORT = "attach_port"
ATTACH_PORTGROUP = "attach_portgroup"
BOND_PORTS = "bond_ports"
GROUP_AND_ATTACH_PORTS = "group_and_attach_ports"
[docs]
class Variables(enum.Enum):
NETWORK_NAME = "network.name"
NETWORK_TAGS = "network.tags"
PORT_ADDRESS = "port.address"
PORT_CATEGORY = "port.category"
PORT_IS_PORT = "port.is_port"
PORT_IS_PORTGROUP = "port.is_portgroup"
PORT_PHYSICAL_NETWORK = "port.physical_network"
PORT_VENDOR = "port.vendor"
[docs]
def object_name(self):
return str(self).split(".")[0]
[docs]
def attribute_name(self):
return str(self).split(".")[1]
def __str__(self):
return self.value
[docs]
def retrieve_attribute(attribute_name, tbn_obj):
attribute = getattr(tbn_obj, attribute_name, None)
if attribute is None:
raise exc.TBNAttributeRetrievalException(attr_name=attribute_name)
return attribute
[docs]
class FunctionExpression(object):
def __init__(self, variable):
self._variable = variable
[docs]
def eval(self, port, network):
tbn_obj = port if self._variable.object_name() == "port" else network
attr_name = self._variable.attribute_name()
attr_func = retrieve_attribute(attr_name, tbn_obj)
return attr_func()
def __str__(self):
return f"{self._variable}"
[docs]
class SingleExpression(object):
def __init__(self, variable, comparator, literal):
self._variable = variable
self._comparator = comparator
self._literal = literal
[docs]
def eval(self, port, network):
tbn_obj = port if self._variable.object_name() == "port" else network
attr_name = self._variable.attribute_name()
attribute = retrieve_attribute(attr_name, tbn_obj)
return self._comparator.eval(attribute, self._literal)
def __str__(self):
return f"{self._variable} {self._comparator} '{self._literal}'"
[docs]
class CompoundExpression(object):
def __init__(self, left_expression, operator, right_expression):
self._left_expression = left_expression
self._operator = operator
self._right_expression = right_expression
[docs]
def eval(self, port, network):
left_result = self._left_expression.eval(port, network)
right_result = self._right_expression.eval(port, network)
match self._operator:
case Operator.OR:
return left_result or right_result
case Operator.AND:
return left_result and right_result
def __str__(self):
return (f"{self._left_expression} {self._operator} "
f"{self._right_expression}")
[docs]
class ParenExpression(object):
def __init__(self, expression):
self._expression = expression
[docs]
def eval(self, port, network):
return self._expression.eval(port, network)
def __str__(self):
return f"({self._expression})"
[docs]
class FilterExpression(object):
def __init__(self, expression):
self._expression = expression
[docs]
def eval(self, port, network):
return self._expression.eval(port, network)
def __str__(self):
return f"{self._expression}"
[docs]
@classmethod
def parse(cls, expression):
tree = tbn_parser.FilterExpressionParser.parse(expression)
return tbn_parser.FilterExpressionTransformer().transform(tree)
def __eq__(self, other):
return str(self) == str(other)
[docs]
class TraitAction(object):
NECESSARY_KEYS = [
'action',
'filter',
]
OPTIONAL_KEYS = [
'max_count',
'min_count',
]
ALL_KEYS = OPTIONAL_KEYS + NECESSARY_KEYS
def __init__(self, trait_name, action, filter_expression,
min_count=None, max_count=None):
self.trait_name = trait_name
self.action = action
self.filter_expression = filter_expression
self.min_count = min_count
self.max_count = max_count
[docs]
def matches(self, portlike, network):
"""Check if filter expression matches the port, network pairing."""
return self.filter_expression.eval(portlike, network)
def __eq__(self, other):
return (self.trait_name == other.trait_name
and self.action == other.action
and self.filter_expression == other.filter_expression
and self.min_count == other.min_count
and self.max_count == other.max_count)
[docs]
class NetworkTrait(object):
def __init__(self, name, actions):
self.name = name
self.actions = actions
def __eq__(self, other):
if self.name != other.name:
return False
for action in self.actions:
match_found = False
for other_action in other.actions:
if action == other_action:
match_found = True
break
if not match_found:
return False
return True
[docs]
class PrimordialPort(object):
def __init__(self, ironic_port_like_obj):
# NOTE(clif): Both ironic port and portgroups should support the
# attributes below.
self.id = ironic_port_like_obj.id
self.uuid = ironic_port_like_obj.uuid
self.address = ironic_port_like_obj.address
self.category = ironic_port_like_obj.category
self.physical_network = ironic_port_like_obj.physical_network
self.vendor = ironic_port_like_obj.vendor
[docs]
class Port(PrimordialPort):
[docs]
@classmethod
def from_ironic_port(cls, ironic_port):
return Port(ironic_port)
[docs]
def is_port(self):
return True
[docs]
def is_portgroup(self):
return False
[docs]
class Portgroup(PrimordialPort):
[docs]
@classmethod
def from_ironic_portgroup(cls, ironic_portgroup):
return Portgroup(ironic_portgroup)
[docs]
def is_port(self):
return False
[docs]
def is_portgroup(self):
return True
# TODO(Clif): Draw from VIFs
[docs]
class Network(object):
def __init__(self, network_id, name, tags):
self.id = network_id
self.name = name
self.tags = tags
[docs]
class RenderedAction(object):
def __init__(self, trait_action, node_uuid):
self._trait_action = trait_action
self._node_uuid = node_uuid
def __eq__(self, other):
return (self._trait_action == other._trait_action
and self._node_uuid == other._node_uuid)
[docs]
class AttachPort(RenderedAction):
def __init__(self, trait_action, node_uuid, port_uuid, network_id):
super().__init__(trait_action, node_uuid)
self._port_uuid = port_uuid
self._network_id = network_id
def __str__(self):
return _(f"Attach port '{self._port_uuid}' on node "
f"'{self._node_uuid}' to network '{self._network_id}' "
f"via trait {self._trait_action.trait_name}")
def __eq__(self, other):
return (isinstance(other, AttachPort)
and self._port_uuid == other._port_uuid
and self._network_id == other._network_id
and super().__eq__(other))
[docs]
class AttachPortgroup(RenderedAction):
def __init__(self, trait_action, node_uuid, portgroup_uuid, network_id):
super().__init__(trait_action, node_uuid)
self._portgroup_uuid = portgroup_uuid
self._network_id = network_id
def __str__(self):
return _(f"Attach portgroup '{self._portgroup_uuid}' on node "
f"'{self._node_uuid}' to network '{self._network_id}'"
f"via trait {self._trait_action.trait_name}")
def __eq__(self, other):
return (isinstance(other, AttachPortgroup)
and self._portgroup_uuid == other._portgroup_uuid
and self._network_id == other._network_id
and super().__eq__(other))
[docs]
class NoMatch(RenderedAction):
def __init__(self, trait_action, node_uuid, reason):
super().__init__(trait_action, node_uuid)
self._reason = reason
def __str__(self):
return _(f"No match found for action under trait "
f"'{self._trait_action.trait_name}' "
f"on node '{self._node_uuid}': {self._reason}")
def __eq__(self, other):
return (isinstance(other, NoMatch)
and self._reason == other._reason
and super().__eq__(other))
[docs]
class NotImplementedAction(RenderedAction):
def __init__(self, action):
self._action = action
def __str__(self):
return _(f"Action '{self._action.value}' not yet implemented.")