Source code for pycpa.smff_loader

"""
| Copyright (C) 2012-2017 Philip Axer
| TU Braunschweig, Germany
| All rights reserved.
| See LICENSE file for copyright and license details.

:Authors:
         - Philip Axer

Description
-----------

SMFF import/annotate
"""
from __future__ import absolute_import

import xml.dom.minidom
import logging

from . import options
from . import model
from . import schedulers



logger = logging.getLogger("smff_loader")

def _calc_in_out_jitter(task_model):
    in_jitter = 0
    source_task = task_model
    while source_task.prev_task is not None:
        source_task = source_task.prev_task
        in_jitter += source_task.analysis_results.wcrt - source_task.analysis_results.bcrt
    in_jitter += source_task.in_event_model.J
    out_jitter = in_jitter + task_model.analysis_results.wcrt - task_model.analysis_results.bcrt
    return in_jitter, out_jitter


[docs]class InvalidSMFFXMLException (Exception): def __init__(self, value, dom_node): self.value = value self.dom_node = dom_node def __str__(self): return repr(self.value + " in node " % self.dom_node.nodeName)
class SMFFApplication(object): def __init__(self, xml_node=None): # # corresponding dom node self.xml_node = xml_node # # application name self.name = "none" # # application id self.id = -1 # # set of tasks and tasklinks in the application (if they are mapped to a comm resource) self.tasks_pycpa = set() # # set of tasklinks in the application self.links_pycpa = set() # # resolve name mappings self.id_to_link_pycpa = dict() self.id_to_task_pycpa = dict() # task to resource mapping, TaskID -> ResourceID self.task_mapping = dict() self.task_link_mapping = dict()
[docs]class SMFFLoader(object): """ a simple SMFF xml loader reverse engineered sources, implements only a functional subset """ def __init__(self): self.system = model.System() # # the root dom node self.xml_root = None # # the smff applications self.smff_applications = set() # # all cpu resources in the system self.resources_pycpa = set() # # all communication resources in the system self.comm_resources_pycpa = set() # # resolve name mappings self.id_to_resource_pycpa = dict() self.id_to_comm_resource_pycpa = dict() self.id_to_smff_applications = dict() def parse(self, filename): self.xml_root = xml.dom.minidom.parse(filename) # save xml_root node in the system model self.system.xml_node = self.xml_root self._handle_system_model(self.xml_root) return self.system def _handle_system_model(self, system_model_node): # skip the configuration node #--- # parse the platform node platform_node = system_model_node.getElementsByTagName("Platform")[0] self._handle_platform(platform_node) # parse the applications node applications_node = system_model_node.getElementsByTagName("Applications")[0] self._handle_applications(applications_node) def _handle_platform(self, platform_node): for resource_node in platform_node.getElementsByTagName("Resource"): self._handle_resource(resource_node) for comm_resource_node in platform_node.getElementsByTagName("CommResource"): self._handle_comm_resource(comm_resource_node) def _scheduler_from_string(self, scheduler): """ parse the scheduling string and return a window function """ if scheduler == "SPPScheduler": return schedulers.SPPScheduler() if scheduler == "SPNPScheduler": return schedulers.SPNPScheduler() return None def _handle_resource(self, resource_node): """ <Resource shortName="ResId:2" resID="2"> <attachedTo ID="1"/> <ResourceType name="GenericResourceType"/> <ResourceGroup name="GenericResourceGroup"/> <Scheduler name="SPPScheduler"/> </Resource> """ # ResourceType and ResourceGroup are skipped # parse names short_name = resource_node.attributes["shortName"].nodeValue resource_id = int(resource_node.attributes["resID"].nodeValue) # get scheduler node scheduler_node = resource_node.getElementsByTagName("Scheduler")[0] scheduler_string = scheduler_node.attributes["name"].nodeValue scheduler = self._scheduler_from_string(scheduler_string) if scheduler == None: raise InvalidSMFFXMLException("Scheduler not recognized", scheduler_node) # add a resource to pycpa resource_model = self.system.bind_resource(model.Resource(short_name, scheduler)) resource_model.xml_node = resource_node resource_model.smff_id = resource_id # map id to pycpa model self.id_to_resource_pycpa[resource_id] = resource_model def _handle_comm_resource(self, comm_resource_node): # ResourceType and ResourceGroup are skipped # parse names short_name = comm_resource_node.attributes["shortName"].nodeValue resource_id = int(comm_resource_node.attributes["resID"].nodeValue) # get scheduler node scheduler_node = comm_resource_node.getElementsByTagName("Scheduler")[0] scheduler_string = scheduler_node.attributes["name"].nodeValue scheduler = self._scheduler_from_string(scheduler_string) if scheduler == None: raise InvalidSMFFXMLException("Scheduler not recognized", scheduler_node) # add a resource to pycpa resource_model = self.system.bind_resource(model.Resource(short_name, scheduler)) resource_model.xml_node = comm_resource_node resource_model.smff_id = resource_id # map id to pycpa model self.id_to_comm_resource_pycpa[resource_id] = resource_model def _handle_applications(self, applications_node): for application_node in applications_node.getElementsByTagName("Application"): self._handle_application(application_node) # check mapping sanity for resource_model in self.system.resources: if len(resource_model.tasks) == 0: logger.info("no tasks on resource %s." % resource_model.name) def _handle_application(self, application_node): smff_application = SMFFApplication(application_node) self.smff_applications.add(smff_application) smff_application.name = application_node.attributes["appV"].nodeValue smff_application.id = int(application_node.attributes["appID"].nodeValue) mapping_node = application_node.getElementsByTagName("Mapping")[0] self._handle_mapping(mapping_node, smff_application) for task_node in application_node.getElementsByTagName("Task"): self._handle_task(task_node, smff_application) for task_link_node in application_node.getElementsByTagName("TaskLink"): self._handle_task_link(task_link_node, smff_application) for (lid, task_model) in smff_application.id_to_link_pycpa.items(): rid = smff_application.task_link_mapping[lid] resource_model = self.id_to_comm_resource_pycpa[rid] task_model.bind_resource(resource_model) for (tid, task_model) in smff_application.id_to_task_pycpa.items(): rid = smff_application.task_mapping[tid] resource_model = self.id_to_resource_pycpa[rid] task_model.bind_resource(resource_model) def _handle_scheduling_parameter(self, scheduling_parameter_node, task_pycpa): # <SchedulingParameter name="SchedulingPriority" priority="5"/> name = scheduling_parameter_node.attributes["name"].nodeValue if name == "SchedulingPriority": priority = int(scheduling_parameter_node.attributes["priority"].nodeValue) task_pycpa.scheduling_parameter = priority else: raise InvalidSMFFXMLException("scheduling policy not recognized", scheduling_parameter_node) def _handle_activation_pattern(self, activation_pattern_node, task_model): # parse event model name = activation_pattern_node.attributes["name"].nodeValue if name == "PJActivation": # # source jitter = int(activation_pattern_node.attributes["activationJitter"].nodeValue) period = int(activation_pattern_node.attributes["activationPeriod"].nodeValue) em = model.PJdEventModel(P=period, J=jitter) task_model.in_event_model = em return if name == "EventActivation": return None raise InvalidSMFFXMLException("activation pattern not recognized", activation_pattern_node) def _handle_profile(self, profile_node, task_model): active = bool(profile_node.attributes["active"].nodeValue) if not active: return activation_pattern_node = profile_node.getElementsByTagName("ActivationPattern")[0] self._handle_activation_pattern(activation_pattern_node, task_model) wcet = int(profile_node.attributes["wcet"].nodeValue) bcet = int(profile_node.attributes["bcet"].nodeValue) task_model.wcet = wcet task_model.bcet = bcet def _handle_task(self, task_node, smff_application): # parse names short_name = task_node.attributes["shortName"].nodeValue task_id = int(task_node.attributes["ID"].nodeValue) # create the task task_model = model.Task(name=short_name) task_model.xml_node = task_node task_model.smff_id = task_id # parse scheduling parameter scheduling_parameter_node = task_node.getElementsByTagName("SchedulingParameter")[0] self._handle_scheduling_parameter(scheduling_parameter_node, task_model) for profile_node in task_node.getElementsByTagName("Profile"): self._handle_profile(profile_node, task_model) # some tasks in smff have a wcet=0, these must be set to the highest priority if task_model.wcet == 0: task_model.scheduling_parameter = -1 # register the id smff_application.id_to_task_pycpa[task_id] = task_model # add task to application pool smff_application.tasks_pycpa.add(task_model) def _handle_task_link(self, task_link_node, smff_application): """<TaskLink shortName="A4TL0-1" ID="0" wcet="2147483647" bcet="0" msgCount="1" msgSize="1" trgt="1" src="0"> <SchedulingParameter name="SchedulingPriority" priority="-1"/> </TaskLink> """ short_name = task_link_node.attributes["shortName"].nodeValue link_id = int(task_link_node.attributes["ID"].nodeValue) trgt_id = int(task_link_node.attributes["trgt"].nodeValue) src_id = int(task_link_node.attributes["src"].nodeValue) trgt_pycpa = smff_application.id_to_task_pycpa[trgt_id] src_pycpa = smff_application.id_to_task_pycpa[src_id] # get mapping of this link if link_id in smff_application.task_link_mapping: # create pycpa object task_model = model.Task(name=short_name) task_model.smff_id = link_id task_model.xml_node = task_link_node # parse scheduling parameter scheduling_parameter_node = task_link_node.getElementsByTagName("SchedulingParameter")[0] self._handle_scheduling_parameter(scheduling_parameter_node, task_model) for profile_node in task_link_node.getElementsByTagName("Profile"): self._handle_profile(profile_node, task_model) # some tasks in smff have a wcet=0, these must be set to the highest priority if task_model.wcet == 0: task_model.scheduling_parameter = -1 # register id -> pycpa model mapping smff_application.id_to_link_pycpa[link_id] = task_model # link all tasks: src -> link -> trgt src_pycpa.link_dependent_task(task_model) task_model.link_dependent_task(trgt_pycpa) # add task to application pool smff_application.links_pycpa.add(task_model) else: # no task just link src and trgt src_pycpa.link_dependent_task(trgt_pycpa) def _handle_mapping(self, mapping_node, smff_application): """-<Mapping> <mapTask rid="0" tid="2"/> <mapTask rid="0" tid="1"/> <mapTask rid="0" tid="0"/> <mapLink rid="0" lid="1"/> <mapLink rid="0" lid="0"/> </Mapping>""" for maptask_node in mapping_node.getElementsByTagName("mapTask"): rid = int(maptask_node.attributes["rid"].nodeValue) tid = int(maptask_node.attributes["tid"].nodeValue) smff_application.task_mapping[tid] = rid for maptasklink_node in mapping_node.getElementsByTagName("mapLink"): # # tricky: when task_link is mapped to a comm_resource (a crid attribute exists) # # we map the link as a pycpa task crid_attr = maptasklink_node.getAttributeNode("crid") lid = int(maptasklink_node.attributes["lid"].nodeValue) if crid_attr is not None: crid = int(maptasklink_node.attributes["crid"].nodeValue) smff_application.task_link_mapping[lid] = crid else: rid = int(maptasklink_node.attributes["rid"].nodeValue) logger.info("decided to skip link id %d, because it is mapped to computing resource %d" % (lid, rid)) def _annotate_task(self, task_result_node, task_model, smff_application): in_jitter, out_jitter = _calc_in_out_jitter(task_model) in_dmin = 0 if task_model.prev_task is not None: in_dmin = task_model.prev_task.analysis_results.bcrt out_dmin = task_model.analysis_results.bcrt task_result_node.setAttribute("name", str(task_model.name)) task_result_node.setAttribute("id", str(task_model.smff_id)) task_result_node.setAttribute("wcrt", str(task_model.analysis_results.wcrt)) task_result_node.setAttribute("bcrt", str(task_model.analysis_results.bcrt)) task_result_node.setAttribute("input_dmin", str(in_dmin)) task_result_node.setAttribute("output_dmin", str(out_dmin)) task_result_node.setAttribute("input_jitter", str(in_jitter)) task_result_node.setAttribute("output_jitter", str(out_jitter)) def _annotate_resource(self, resources_result_node, resource_model): resource_result_node = None if resource_model.xml_node.tagName == "Resource": resource_result_node = self.xml_root.createElement("Resource") elif resource_model.xml_node.tagName == "CommResource": resource_result_node = self.xml_root.createElement("CommResource") else: raise InvalidSMFFXMLException("Invalid resource xml description", resource_model.xml_node) resources_result_node.appendChild(resource_result_node) resource_result_node.setAttribute("name", str(resource_model.name)) resource_result_node.setAttribute("ID", str(resource_model.smff_id)) resource_result_node.setAttribute("load", str(resource_model.load())) def _annotate_resources(self, analysis_node): resources_result_node = self.xml_root.createElement("Resources") analysis_node.appendChild(resources_result_node) for resource_model in self.system.resources: self._annotate_resource(resources_result_node, resource_model) def _annotate_tasks(self, application_node, smff_application): for task_model in smff_application.tasks_pycpa: task_result_node = None if task_model.xml_node.tagName == "Task": task_result_node = self.xml_root.createElement("Task") application_node.appendChild(task_result_node) else: raise InvalidSMFFXMLException("Invalid task xml description, expected Task", task_model.xml_node) self._annotate_task(task_result_node, task_model, smff_application) for task_model in smff_application.links_pycpa: if task_model.xml_node.tagName == "TaskLink": task_result_node = self.xml_root.createElement("TaskLink") application_node.appendChild(task_result_node) else: raise InvalidSMFFXMLException("Invalid task xml description, expected TaskLink", task_model.xml_node) self._annotate_task(task_result_node, task_model, smff_application) def _annotate_applications(self, analysis_node): applications_result_node = self.xml_root.createElement("Applications") analysis_node.appendChild(applications_result_node) for smff_application in self.smff_applications: self._annotate_application(applications_result_node, smff_application) def _annotate_application(self, applications_result_node, smff_application): application_result_node = self.xml_root.createElement("Application") applications_result_node.appendChild(application_result_node) application_result_node.setAttribute("appV", str(smff_application.name)) application_result_node.setAttribute("appID", str(smff_application.id)) self._annotate_tasks(application_result_node, smff_application) def annotate_results(self): analysis_node = None # # remove old analysis results while len(self.xml_root.childNodes[0].getElementsByTagName("Analysis")) > 0: analysis_node = self.xml_root.childNodes[0].getElementsByTagName("Analysis")[0] self.xml_root.childNodes[0].removeChild(analysis_node) analysis_node = self.xml_root.createElement("Analysis") self.xml_root.childNodes[-1].appendChild(analysis_node) for option, attr in options._opts_dict.items(): analysis_node.setAttribute(option, attr) self._annotate_resources(analysis_node) self._annotate_applications(analysis_node) def write(self, filename): f = open(filename, 'w') data = self.xml_root.toxml() f.write(data) f.close()
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4