Static Priority Preemptive Example

Introduction

_images/spp_graph.png

In this section, we will dissect the SPP example, which is representative of the ideas behind pyCPA. The full source code of the example is shown at the end of this section.

Before we begin, a general reminder:

pyCPA is NOT a tool! Rather, it is a package of methods and classes which can be embedded into your Python application. The SPP example is such an application.

Each pyCPA program consists of three steps:

  • initialization
  • setting up the architecture
  • one or multiple scheduling analyses

The architecture can be entered in two ways: either you define it directly in the source code, or you use an XML loader such as the Symta or the SMFF loader. In most cases it is sufficient to code your architecture directly in a Python file. For this example we assume that our architecture consists of two resources (e.g. CPUs), each scheduled by a static-priority-preemptive (SPP) scheduler, and four tasks, some of which communicate by event triggering. The environment stimulus (e.g. a sensor or input from another system) is assumed to be periodic with jitter. The application graph is shown on the right.

Initialization

Now, let’s look at the example. Before we actually start with the program, we import all pyCPA modules which are needed for this example:

from pycpa import model
from pycpa import analysis
from pycpa import schedulers
from pycpa import graph
from pycpa import options

The interesting modules are pycpa.schedulers, which contains scheduler-specific algorithms, pycpa.graph, which is used to plot a task graph of this example, and pycpa.options, which controls the various modes in which pyCPA can be executed.

pyCPA can be initialized by pycpa.options.init_pycpa(). This parses the pyCPA-related options such as the propagation method, verbosity, maximum busy window, etc. Conveniently, it also prints the options which will be used for your pyCPA session. This is handy when you run analyses in batch jobs and are uncertain about the exact settings after a few weeks. However, an explicit call of this function is usually not necessary, as it is called implicitly at the beginning of the analysis. Calling it explicitly is useful to control exactly when the initialization happens, in case you want to manually override some options from your code.

System Model

Now, we create an empty system, which is just a container for all other objects:

    # generate a new system
    s = model.System()

The next step is to create two resources R1 and R2 and bind them to the system via pycpa.model.System.bind_resource(). When creating a resource via pycpa.model.Resource(), the first argument of the constructor sets the resource id (a string) and the second defines the scheduling policy. The scheduling policy is given as an instance of a scheduler class derived from pycpa.analysis.Scheduler. For SPP, this is pycpa.schedulers.SPPScheduler. This class defines the functions which, for instance, compute the multiple-event busy window on that resource or the stopping condition for that particular scheduling policy. The stopping condition specifies how many activations of a task have to be considered for the analysis. The default implementations of these functions from pycpa.analysis.Scheduler can be used for certain schedulers, but generally they should be overridden by scheduler-specific versions. For SPP we have to look at all activations which fall into the level-i busy window, so we use the SPP stopping condition provided by the SPP scheduler class.

    r1 = s.bind_resource(model.Resource("R1", schedulers.SPPScheduler()))
    r2 = s.bind_resource(model.Resource("R2", schedulers.SPPScheduler()))

The next part is to create tasks via pycpa.model.Task() and bind them to a resource via pycpa.model.Resource.bind_task(). For tasks, we pass some parameters to the constructor, namely the identifier (a string), the scheduling_parameter denoting the priority, and the worst- and best-case execution times (wcet and bcet).

    # create and bind tasks to r1
    t11 = r1.bind_task(model.Task("T11", wcet=10, bcet=5, scheduling_parameter=1))
    t12 = r1.bind_task(model.Task("T12", wcet=3, bcet=1, scheduling_parameter=2))

    # create and bind tasks to r2
    t21 = r2.bind_task(model.Task("T21", wcet=2, bcet=2, scheduling_parameter=1))
    t22 = r2.bind_task(model.Task("T22", wcet=9, bcet=4, scheduling_parameter=2))

If tasks communicate with each other through event propagation (e.g. one task fills the queue of another task), we model this through task links, which are created by pycpa.model.Task.link_dependent_task(). A task link is abstract and does not consume any additional time. Communication overhead, if any, must be modeled by using additional resources and tasks.

    # specify precedence constraints: T11 -> T21; T12-> T22
    t11.link_dependent_task(t21)
    t12.link_dependent_task(t22)
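
To make the role of the links concrete, the following is a minimal sketch in plain Python that does not use pyCPA at all. It stores the two links of this example in a dictionary and derives which tasks have no predecessor, i.e. which tasks are not activated by another task. The names links and source_tasks are hypothetical and only serve this illustration.

```python
# Task links of the example as a plain adjacency mapping (illustration only,
# this is not how pyCPA stores links internally).
links = {
    "T11": ["T21"],
    "T12": ["T22"],
    "T21": [],
    "T22": [],
}


def source_tasks(links):
    """Return the tasks that are not the target of any link."""
    targets = {succ for succs in links.values() for succ in succs}
    return sorted(task for task in links if task not in targets)


print(source_tasks(links))  # ['T11', 'T12']
```

In this sketch T11 and T12 are the only tasks without a predecessor, so they are the ones driven by the environment.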

Lastly, we must set the input event models for all tasks that are not activated by another task. An event model is an abstraction of possible activation patterns and is commonly parametrized by an activation period P, a jitter J (optional) and a minimum distance d (optional). Such an event model is created by pycpa.model.PJdEventModel.

    # register a periodic with jitter event model for T11 and T12
    t11.in_event_model = model.PJdEventModel(P=30, J=5)
    t12.in_event_model = model.PJdEventModel(P=15, J=6)
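
What such a parametrization means can be sketched with the textbook closed forms for a periodic-with-jitter event model. The functions below are an illustration under that assumption, not pyCPA's implementation; the names eta_plus and delta_min follow common CPA notation, and windows are taken as half-open.

```python
import math


def eta_plus(dt, P, J=0, d=0):
    """Upper bound on the number of events in any half-open window of length dt."""
    if dt <= 0:
        return 0
    n = math.ceil((dt + J) / P)
    if d > 0:
        # a minimum distance d additionally limits how densely events can arrive
        n = min(n, math.ceil(dt / d))
    return n


def delta_min(n, P, J=0, d=0):
    """Lower bound on the time between the first and the n-th event."""
    if n < 2:
        return 0
    return max((n - 1) * d, (n - 1) * P - J)


# T12 is activated with P=15, J=6
print(delta_min(2, P=15, J=6))  # 9
print(delta_min(3, P=15, J=6))  # 24
# T11 is activated with P=30, J=5
print(eta_plus(13, P=30, J=5))  # 1
```

Under these formulas, two activations of T12 can be as close as 9 time units, and at most one activation of T11 falls into any window of length 13.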

Plotting the Task-Graph

Then, we plot the task graph to a PDF file by using pycpa.graph.graph_system() from the graph module.

    # plot the system graph to visualize the architecture
    g = graph.graph_system(s, 'spp_graph.pdf', dotout='spp_graph.dot')

Analysis

The analysis is performed by calling pycpa.analysis.analyze_system(). This will find the fixed point of the scheduling problem and terminate if a result was found or if the system is not feasible (e.g. a busy window or the number of propagations exceeded a limit, or the load on a resource is larger than one).

    # perform the analysis
    print("Performing analysis")
    task_results = analysis.analyze_system(s)
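
The load condition mentioned above can be checked by hand. The sketch below is plain Python, independent of pyCPA, and sums wcet/P for the tasks on each resource. It assumes that T21 and T22 are activated at the same long-term rate as their predecessors T11 and T12; the helper name resource_load is hypothetical.

```python
from fractions import Fraction


def resource_load(tasks):
    """Sum of wcet / period over the (wcet, period) pairs bound to one resource."""
    return sum(Fraction(wcet, period) for wcet, period in tasks)


# (wcet, period) pairs; the periods of T21 and T22 are assumed to equal
# those of their predecessors T11 and T12.
r1_load = resource_load([(10, 30), (3, 15)])  # T11, T12
r2_load = resource_load([(2, 30), (9, 15)])   # T21, T22

print(r1_load, r2_load)  # 8/15 2/3
```

Both values are below one, so under this assumption neither resource is overloaded.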

pycpa.analysis.analyze_system() returns a dictionary with results for each task in the form of instances of pycpa.analysis.TaskResult. Finally, we print out some of the results:

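    # print the worst case response times (WCRTs)
    # together with the composition of the worst-case busy window
    print("Result:")
    for r in sorted(s.resources, key=str):
        for t in sorted(r.tasks, key=str):
            print("%s: wcrt=%d" % (t.name, task_results[t].wcrt))
            print("    b_wcrt=%s" % (task_results[t].b_wcrt_str()))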

The output of this example is:

pyCPA - Compositional Performance Analysis in Python.

Version 1.2
Copyright (C) 2010-2017, TU Braunschweig, Germany. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

invoked via: examples/spp_test.py

check_violations :       False 
debug            :       False 
e2e_improved     :       False 
max_iterations   :        1000 
max_wcrt         :         inf 
nocaching        :       False 
propagation      : busy_window 
show             :       False 
verbose          :       False 



Performing analysis
Result:
T11: wcrt=10
    b_wcrt=q*WCET:1*10=10
T12: wcrt=13
    b_wcrt=T11:eta*WCET:1*10=10, q*WCET:1*3=3
T21: wcrt=2
    b_wcrt=q*WCET:1*2=2
T22: wcrt=19
    b_wcrt=T21:eta*WCET:1*2=2, q*WCET:2*9=18

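As you can see, the worst-case response times of T11, T12, T21 and T22 are 10, 13, 2 and 19, respectively. The b_wcrt lines show how the worst-case busy window is composed: q activations of the task itself plus eta activations of each interfering higher-priority task.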
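
The results for T11 and T12 can be reproduced by hand with the classic SPP busy-window iteration. The following is a minimal sketch in plain Python under stated assumptions, not pyCPA's code: a lower scheduling_parameter means higher priority (consistent with the output above, where T11 interferes with T12), the event models use the textbook periodic-with-jitter formulas, and the response time of the q-th activation is its busy window minus the earliest arrival of that activation. All function names are hypothetical.

```python
import math


def eta_plus(dt, P, J):
    """Maximum number of events in a half-open window of length dt."""
    return math.ceil((dt + J) / P) if dt > 0 else 0


def delta_min(n, P, J):
    """Minimum distance between the first and the n-th event."""
    return max(0, (n - 1) * P - J) if n >= 2 else 0


def busy_window(q, wcet, interferers):
    """Fixed point of w = q*wcet + sum(eta_plus_j(w) * wcet_j).

    interferers is a list of (wcet, P, J) tuples of higher-priority tasks.
    The iteration is assumed to converge (resource load below one).
    """
    w = q * wcet
    while True:
        w_new = q * wcet + sum(eta_plus(w, P, J) * c for (c, P, J) in interferers)
        if w_new == w:
            return w
        w = w_new


def wcrt(wcet, P, J, interferers, max_q=1000):
    """Largest response time over all activations in the busy window."""
    worst = 0
    for q in range(1, max_q + 1):
        w = busy_window(q, wcet, interferers)
        worst = max(worst, w - delta_min(q, P, J))
        # stopping condition: activation q+1 arrives after the busy window ends
        if delta_min(q + 1, P, J) >= w:
            return worst
    raise RuntimeError("busy window did not close within max_q activations")


print(wcrt(10, P=30, J=5, interferers=[]))             # T11: 10
print(wcrt(3, P=15, J=6, interferers=[(10, 30, 5)]))   # T12: 13
```

For T12 the sketch has to look at two activations, because the second one can arrive after 9 time units while the first busy window lasts 13; the first activation still yields the maximum. T21 and T22 are not recomputed here since their input event models result from pyCPA's event propagation. Note that the terms listed for T22 add up to a busy window of 20 while the reported WCRT is 19, which under the formula above is consistent with the second activation of T22 arriving at least one time unit after the first.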

This is the full source of the SPP example (examples/spp_test.py):

"""
| Copyright (C) 2010 Philip Axer
| TU Braunschweig, Germany
| All rights reserved.
| See LICENSE file for copyright and license details.

:Authors:
         - Philip Axer

Description
-----------

Simple SPP example
"""

from pycpa import model
from pycpa import analysis
from pycpa import schedulers
from pycpa import graph
from pycpa import options

def spp_test():
    # init pycpa and trigger command line parsing
    options.init_pycpa()

    # generate a new system
    s = model.System()

    # add two resources (CPUs) to the system
    # and register the static priority preemptive scheduler
    r1 = s.bind_resource(model.Resource("R1", schedulers.SPPScheduler()))
    r2 = s.bind_resource(model.Resource("R2", schedulers.SPPScheduler()))

    # create and bind tasks to r1
    t11 = r1.bind_task(model.Task("T11", wcet=10, bcet=5, scheduling_parameter=1))
    t12 = r1.bind_task(model.Task("T12", wcet=3, bcet=1, scheduling_parameter=2))

    # create and bind tasks to r2
    t21 = r2.bind_task(model.Task("T21", wcet=2, bcet=2, scheduling_parameter=1))
    t22 = r2.bind_task(model.Task("T22", wcet=9, bcet=4, scheduling_parameter=2))

    # specify precedence constraints: T11 -> T21; T12-> T22
    t11.link_dependent_task(t21)
    t12.link_dependent_task(t22)

    # register a periodic with jitter event model for T11 and T12
    t11.in_event_model = model.PJdEventModel(P=30, J=5)
    t12.in_event_model = model.PJdEventModel(P=15, J=6)

    # plot the system graph to visualize the architecture
    g = graph.graph_system(s, 'spp_graph.pdf', dotout='spp_graph.dot')

    # perform the analysis
    print("Performing analysis")
    task_results = analysis.analyze_system(s)

    # print the worst case response times (WCRTs)
    print("Result:")
    for r in sorted(s.resources, key=str):
        for t in sorted(r.tasks, key=str):
            print("%s: wcrt=%d" % (t.name, task_results[t].wcrt))
            print("    b_wcrt=%s" % (task_results[t].b_wcrt_str()))
            

    expected_wcrt = dict()
    expected_wcrt[t11] = 10
    expected_wcrt[t12] = 13
    expected_wcrt[t21] = 2
    expected_wcrt[t22] = 19

    # check the analysis results against the expected WCRTs
    for t, expected in expected_wcrt.items():
        assert task_results[t].wcrt == expected

if __name__ == "__main__":
    spp_test()