# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_log import log
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common.inspection_rules import base
from ironic.common.inspection_rules import utils
from ironic.drivers import utils as driver_utils
from ironic import objects
LOG = log.getLogger(__name__)
ACTIONS = {
"fail": "FailAction",
"set-attribute": "SetAttributeAction",
"set-capability": "SetCapabilityAction",
"unset-capability": "UnsetCapabilityAction",
"extend-attribute": "ExtendAttributeAction",
"add-trait": "AddTraitAction",
"remove-trait": "RemoveTraitAction",
"set-plugin-data": "SetPluginDataAction",
"extend-plugin-data": "ExtendPluginDataAction",
"unset-plugin-data": "UnsetPluginDataAction",
"log": "LogAction",
"del-attribute": "DelAttributeAction",
"set-port-attribute": "SetPortAttributeAction",
"extend-port-attribute": "ExtendPortAttributeAction",
"del-port-attribute": "DelPortAttributeAction",
}
[docs]
def get_action(op_name):
"""Get operator class by name."""
class_name = ACTIONS[op_name]
return globals()[class_name]
[docs]
def update_nested_dict(d, key_path, value):
keys = key_path.split('.') if isinstance(key_path, str) else key_path
current = d
for key in keys[:-1]:
current = current.setdefault(key, {})
current[keys[-1]] = value
return d
[docs]
class ActionBase(base.Base, metaclass=abc.ABCMeta):
"""Abstract base class for rule action plugins."""
FORMATTED_ARGS = []
"""List of params to be formatted with python format."""
[docs]
@abc.abstractmethod
def __call__(self, task, *args, **kwargs):
"""Run action on successful rule match."""
[docs]
def execute_with_loop(self, task, action, inventory, plugin_data):
loop_items = action.get('loop', [])
results = []
if isinstance(loop_items, (list, dict)):
for item in loop_items:
action_copy = action.copy()
action_copy['args'] = item
results.append(self.execute_action(task, action_copy,
inventory, plugin_data))
return results
[docs]
def execute_action(self, task, action, inventory, plugin_data):
processed_args = self._process_args(task, action, inventory,
plugin_data)
return self(task, **processed_args)
[docs]
class LogAction(ActionBase):
FORMATTED_ARGS = ['msg']
VALID_LOG_LEVELS = {'debug', 'info', 'warning', 'error', 'critical'}
[docs]
def __call__(self, task, msg, level='info'):
level = level.lower()
if level not in self.VALID_LOG_LEVELS:
raise exception.InspectionRuleExecutionFailure(
_("Invalid log level: %(level)s. Choose from %(levels)s") % {
'level': level, 'levels': self.VALID_LOG_LEVELS})
getattr(LOG, level)(msg)
[docs]
class FailAction(ActionBase):
[docs]
def __call__(self, task, msg):
raise exception.HardwareInspectionFailure(error=str(msg))
[docs]
class SetAttributeAction(ActionBase):
FORMATTED_ARGS = ['value']
[docs]
def __call__(self, task, path, value):
try:
attr_path_parts = utils.normalize_path(path)
if len(attr_path_parts) == 1:
setattr(task.node, attr_path_parts[0], value)
else:
base_attr = getattr(task.node, attr_path_parts[0])
current = base_attr
for part in attr_path_parts[1:-1]:
current = current.setdefault(part, {})
current[attr_path_parts[-1]] = value
setattr(task.node, attr_path_parts[0], base_attr)
task.node.save()
except Exception as exc:
msg = _("Failed to set attribute %(path)s "
"with value %(value)s: %(exc)s") % {
'path': path, 'value': value, 'exc': exc}
LOG.error(msg)
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class ExtendAttributeAction(ActionBase):
FORMATTED_ARGS = ['value']
[docs]
def __call__(self, task, path, value, unique=False):
try:
attr_path_parts = utils.normalize_path(path)
if len(attr_path_parts) == 1:
current = getattr(task.node, attr_path_parts[0], [])
else:
base_attr = getattr(task.node, attr_path_parts[0])
current = base_attr
for part in attr_path_parts[1:-1]:
current = current.setdefault(part, {})
current = current.setdefault(attr_path_parts[-1], [])
if not isinstance(current, list):
msg = _("Cannot extend non-list attribute %(path)s with "
"value %(value)s") % {'path': path, 'value': value}
raise exception.RuleActionExecutionFailure(reason=msg)
if not unique or value not in current:
current.append(value)
if len(attr_path_parts) == 1:
setattr(task.node, attr_path_parts[0], current)
else:
setattr(task.node, attr_path_parts[0], base_attr)
task.node.save()
except Exception as exc:
msg = _("Failed to extend attribute %(path)s: %(exc)s") % {
'path': path, 'exc': exc}
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class DelAttributeAction(ActionBase):
[docs]
def __call__(self, task, path):
try:
attr_path_parts = utils.normalize_path(path)
if len(attr_path_parts) == 1:
delattr(task.node, attr_path_parts[0])
else:
base_attr = getattr(task.node, attr_path_parts[0])
current = base_attr
for part in attr_path_parts[1:-1]:
current = current[part]
del current[attr_path_parts[-1]]
setattr(task.node, attr_path_parts[0], base_attr)
task.node.save()
except Exception as exc:
msg = _("Failed to delete attribute at %(path)s: %(exc)s") % {
'path': path, 'exc': exc}
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class AddTraitAction(ActionBase):
[docs]
def __call__(self, task, name):
try:
new_trait = objects.Trait(task.context, node_id=task.node.id,
trait=name)
new_trait.create()
except Exception as exc:
msg = _("Failed to add new trait %(name)s: %(exc)s") % {
'name': name, 'exc': exc}
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class RemoveTraitAction(ActionBase):
[docs]
def __call__(self, task, name):
try:
objects.Trait.destroy(task.context, node_id=task.node.id,
trait=name)
except exception.NodeTraitNotFound as exc:
LOG.warning(_("Failed to remove trait %(name)s: %(exc)s"),
{'name': name, 'exc': exc})
except Exception as exc:
msg = _("Failed to remove trait %(name)s: %(exc)s") % {
'name': name, 'exc': exc}
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class SetCapabilityAction(ActionBase):
FORMATTED_ARGS = ['value']
[docs]
def __call__(self, task, name, value):
try:
driver_utils.add_node_capability(task, name, value)
except Exception as exc:
msg = _("Failed to set capability %(name)s: %(exc)s") % {
'name': name, 'exc': exc}
raise exception.RuleActionExecutionFailure(
reason=msg)
[docs]
class UnsetCapabilityAction(ActionBase):
[docs]
def __call__(self, task, name):
try:
driver_utils.remove_node_capability(task, name)
except Exception as exc:
msg = _("Failed to unset capability %(name)s: %(exc)s") % {
'name': name, 'exc': exc}
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class SetPluginDataAction(ActionBase):
REQUIRES_PLUGIN_DATA = True
FORMATTED_ARGS = ['value']
[docs]
def __call__(self, task, path, value, plugin_data):
try:
update_nested_dict(plugin_data, path, value)
except Exception as exc:
msg = _("Failed to set plugin data at %(path)s: %(exc)s") % {
'path': path, 'exc': exc}
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class ExtendPluginDataAction(ActionBase):
REQUIRES_PLUGIN_DATA = True
FORMATTED_ARGS = ['value']
[docs]
def __call__(self, task, path, value, plugin_data, unique=False):
try:
current = self._get_nested_value(plugin_data, path) or []
update_nested_dict(plugin_data, path, current)
if not unique or (value not in current):
current.append(value)
except Exception as exc:
msg = _("Failed to extend plugin data at %(path)s: %(exc)s") % {
'path': path, 'exc': exc}
raise exception.RuleActionExecutionFailure(reason=msg)
@staticmethod
def _get_nested_value(d, key_path, default=None):
keys = key_path.split('.') if isinstance(key_path, str) else key_path
current = d
try:
for key in keys:
current = current[key]
return current
except (KeyError, TypeError):
return default
[docs]
class UnsetPluginDataAction(ActionBase):
REQUIRES_PLUGIN_DATA = True
[docs]
def __call__(self, task, path, plugin_data):
try:
if not self._unset_nested_dict(plugin_data, path):
LOG.warning("Path %s not found", path)
except Exception as exc:
msg = _("Failed to unset plugin data at %(path)s: %(exc)s") % {
'path': path, 'exc': exc}
raise exception.RuleActionExecutionFailure(reason=msg)
@staticmethod
def _unset_nested_dict(d, key_path):
keys = key_path.split('.') if isinstance(key_path, str) else key_path
current = d
for key in keys[:-1]:
if not isinstance(current, dict) or key not in current:
return False
current = current[key]
target_key = keys[-1]
if isinstance(current, dict) and target_key in current:
if len(current) == 1:
parent = d
for key in keys[:-2]:
parent = parent[key]
if len(keys) > 1:
del parent[keys[-2]]
else:
del current[target_key]
return True
return False
[docs]
class SetPortAttributeAction(ActionBase):
FORMATTED_ARGS = ['value']
[docs]
def __call__(self, task, port_id, path, value):
port = next((p for p in task.ports if p.uuid == port_id), None)
if not port:
raise exception.PortNotFound(port=port_id)
try:
attr_path_parts = utils.normalize_path(path)
if len(attr_path_parts) == 1:
setattr(port, attr_path_parts[0], value)
else:
base_attr = getattr(port, attr_path_parts[0])
current = base_attr
for part in attr_path_parts[1:-1]:
current = current.setdefault(part, {})
current[attr_path_parts[-1]] = value
setattr(port, attr_path_parts[0], base_attr)
port.save()
except Exception as exc:
msg = _("Failed to set attribute %(path)s for port "
"%(port_id)s: %(exc)s") % {
'path': path, 'port_id': port_id, 'exc': exc}
LOG.error(msg)
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class ExtendPortAttributeAction(ActionBase):
FORMATTED_ARGS = ['value']
[docs]
def __call__(self, task, port_id, path, value, unique=False):
port = next((p for p in task.ports if p.uuid == port_id), None)
if not port:
raise exception.PortNotFound(port=port_id)
try:
attr_path_parts = utils.normalize_path(path)
if len(attr_path_parts) == 1:
current = getattr(port, attr_path_parts[0], [])
else:
base_attr = getattr(port, attr_path_parts[0])
current = base_attr
for part in attr_path_parts[1:-1]:
current = current.setdefault(part, {})
current = current.setdefault(attr_path_parts[-1], [])
if not isinstance(current, list):
msg = (_("Cannot extend non-list attribute %(path)s with "
" value %(value)s") % {'path': path, 'value': value})
raise exception.RuleActionExecutionFailure(reason=msg)
if not unique or value not in current:
current.append(value)
if len(attr_path_parts) == 1:
setattr(port, attr_path_parts[0], current)
else:
setattr(port, attr_path_parts[0], base_attr)
port.save()
except Exception as exc:
msg = _("Failed to extend attribute %(path)s for port "
"%(port_id)s: %(exc)s") % {
'path': path, 'port_id': port_id, 'exc': exc}
LOG.error(msg)
raise exception.RuleActionExecutionFailure(reason=msg)
[docs]
class DelPortAttributeAction(ActionBase):
[docs]
def __call__(self, task, port_id, path):
port = next((p for p in task.ports if p.uuid == port_id), None)
if not port:
raise exception.PortNotFound(port=port_id)
try:
attr_path_parts = utils.normalize_path(path)
if len(attr_path_parts) == 1:
delattr(port, attr_path_parts[0])
else:
base_attr = getattr(port, attr_path_parts[0])
current = base_attr
for part in attr_path_parts[1:-1]:
current = current[part]
del current[attr_path_parts[-1]]
setattr(port, attr_path_parts[0], base_attr)
port.save()
except Exception as exc:
msg = ("Failed to delete attribute %(path)s for port "
"%(port_id)s: %(exc)s") % {
'path': path, 'port_id': port_id, 'exc': str(exc)}
LOG.error(msg)
raise exception.RuleActionExecutionFailure(reason=msg)