"""
| Copyright (C) 2007-2017 Jonas Diemer, Philip Axer, Daniel Thiele, Johannes Schlatow
| TU Braunschweig, Germany
| All rights reserved.
| See LICENSE file for copyright and license details.
:Authors:
- Jonas Diemer
- Philip Axer
- Johannes Schlatow
Description
-----------
It should be imported in scripts that do the analysis.
We model systems composed of resources and tasks.
Tasks are activated by events, modeled as event models.
The general System Model is described in Section 3.6.1 in [Jersak2005]_
or Section 3.1 in [Henia2005]_.
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
import math
import logging
import copy
import warnings
from . import options
from . import util
INFINITY = float('inf')
logger = logging.getLogger(__name__)
def _warn_float(value, reason=""):
""" Prints a warning with reason if value is float.
"""
if type(value) == float:
warnings.warn("You are using floats, " +
"this may yield non-pessimistic results (" +
reason + ")", UserWarning)
[docs]class ConstraintsManager(object):
""" This class manages all system-wide constraints such as deadlines,
buffersizes and more.
"""
def __init__(self):
# # local task deadlines
self._wcrt_constraints = dict()
# # latency contraints
self._path_constraints = dict()
# # buffer size constraints
self._backlog_constraints = dict()
# # resource load constraints
self._load_constraints = dict()
[docs] def add_wcrt_constraint(self, task, deadline):
""" adds a local task deadline constraint
wcrt must be less or equal than deadline
"""
self._wcrt_constraints[task] = deadline
[docs] def add_path_constraint(self, path, deadline, n=1):
""" adds a path latency constraint
latency for n events must be less or equal than deadline
"""
self._path_constraints[path] = (deadline, n)
[docs] def add_backlog_constraint(self, task, size):
""" adds a buffer size constraint
backlog must be less or equal than size
"""
self._backlog_constraints[task] = size
[docs] def add_load_constraint(self, resource, load):
""" adds a resource load constraint
actual load on the specified resource must be less or equal than load
"""
self._load_constraints[resource] = load
[docs]class EventModel (object):
""" The event model describing the activation of tasks as described
in [Jersak2005]_, [Richter2005]_, [Henia2005]_.
Internally, we use :math:`\delta^-(n)` and :math:`\delta^+(n)`,
which represent the minimum/maximum time window containing n events.
They can be transformed into
:math:`\eta^+(\Delta t)` and :math:`\eta^-(\Delta t)`
which represent the maximum/minimum number of events arriving within
:math:`\Delta t`.
"""
def __init__(self, name='min', container=dict(), **kwargs):
""" CTOR
If called without parameters, a maximal event model (unbounded amount
of activations) is created
"""
# # Enables or disables caching
self.en_caching = not options.get_opt('nocaching')
# # Cache to speedup busy window calculations
self.delta_min_cache = dict()
self.delta_plus_cache = dict()
self.eta_min_cache = dict()
self.eta_plus_cache = dict()
self.eta_min_closed_cache = dict()
self.eta_plus_closed_cache = dict()
# # Takes arbitrary objects that will be propagated along
# with the event model.
# Remark: propagation stops at junctions (for now)
self.container = container
# # String description of event model
self.__description__ = name
# After all mandatory attributes have been initialized above, load
# those set in kwargs
for key in kwargs:
setattr(self, key, kwargs[key])
def deltamin_func(self, n):
# # Event model delta function (internal)
# maximal model: unlimited activations
return 0
def deltaplus_func(self, n):
# # Event model delta function (internal)
# minimal model: no activation
return float("inf")
[docs] @staticmethod
def delta_min_from_eta_plus(etaplus_func):
""" Delta-minus Function
Return the minimum time window containing n activations.
The delta_minus-function is derived from the eta_plus-function.
This function is rarely needed, as EventModels are represented
by delta-functions internally.
Equation 3.7 from [Schliecker2011]_.
"""
def delta_min(n):
if n < 2:
return 0
if n == INFINITY:
return float('NaN')
hi = 10
lo = 0
# search an upper bound
while etaplus_func(hi) < n:
lo = hi
hi *= 10
# apply binary search
while lo < hi:
mid = (lo + hi) // 2
midval = etaplus_func(mid)
if midval < n:
lo = mid + 1
else:
hi = mid
hi -= 1
if hi >= 0:
assert etaplus_func(hi) < n
assert etaplus_func(hi + 1) >= n
return int(math.floor(hi))
return delta_min
[docs] @staticmethod
def delta_plus_from_eta_min(etamin_func):
""" Delta-plus Function
Return the maximum time window containing n activations.
The delta_plus-function is derived from the eta_minus-function.
This function is rarely needed, as EventModels are represented
by delta-functions internally.
Equation 3.8 from [Schliecker2011]_.
"""
def delta_plus(n):
if n < 2:
return 0
if n == INFINITY:
return float('NaN')
hi = 10
lo = 0
# search an upper bound
while etamin_func(hi) < n - 1:
lo = hi
hi *= 10
# apply binary search
while lo < hi:
mid = (lo + hi) // 2
midval = etamin_func(mid)
if midval < n - 1:
lo = mid + 1
else:
hi = mid
hi -= 1
if hi >= 0:
assert etamin_func(hi) < n - 1
assert etamin_func(hi + 1) >= n - 1
return int(math.floor(hi+1))
return delta_plus
[docs] def eta_plus(self, w):
""" Eta-plus Function
Return the maximum number of events in a time window w.
Derived from Equation 3.5 from [Schliecker2011]_,
but assuming half-open intervals for w
as defined in [Richter2005]_.
"""
n = self.eta_plus_cache.get(w, None)
if n is not None:
return n
# the window for 0 activations is 0
if w <= 0:
return 0
# if the window does not include 2 activations, assume that one has
# occured
if self.delta_min(2) > w:
return 1
# if delta_min is constant zero, eta_plus is always infinity
if self.delta_min(INFINITY) == 0:
return INFINITY
hi = 10
lo = 2
# search an upper bound
while self.delta_min(hi) < w:
lo = hi
hi *= 2
# apply binary search
while lo < hi:
mid = (lo + hi) // 2
midval = self.delta_min(mid)
if midval < w:
lo = mid + 1
else:
hi = mid
hi -= 1
assert self.delta_min(hi) < w
assert self.delta_min(hi + 1) >= w
if self.en_caching:
self.eta_plus_cache[w] = hi
return hi
[docs] def eta_plus_closed(self, w):
""" Eta-plus Function
Return the maximum number of events in a time window w.
Derived from Equation 3.5 from [Schliecker2011]_,
but assuming CLOSED intervals for w
as defined in [Richter2005]_.
This is technically identical to eta_plus(w + EPSILON),
but the use of epsilon has issues with float precision,
as w+EPSILON == w for large w and small Epsilon
(e.g. 40000000+1e-9)
"""
n = self.eta_plus_closed_cache.get(w, None)
if n is not None:
return n
# if the window does not include 2 activations, assume that one has
# occured
if self.delta_min(2) > w:
return 1
# if delta_min is constant zero, eta_plus is always infinity
if self.delta_min(INFINITY) == 0:
return INFINITY
hi = 10
lo = 2
# search an upper bound
while self.delta_min(hi) <= w:
lo = hi
hi *= 2
# apply binary search
while lo < hi:
mid = (lo + hi) // 2
midval = self.delta_min(mid)
if midval <= w:
lo = mid + 1
else:
hi = mid
hi -= 1
assert self.delta_min(hi) <= w
assert self.delta_min(hi + 1) > w
if self.en_caching:
self.eta_plus_closed_cache[w] = hi
return hi
[docs] def eta_min(self, w):
""" Eta-minus Function
Return the minimum number of events in a time window w.
Derived from Equation 3.6 from [Schliecker2011]_,
but different, as Eq. 3.6 is wrong.
"""
n = self.eta_min_cache.get(w, None)
if n is not None:
return n
if w < 0:
w = 0
MAX_EVENTS = 10000
hi = 10
lo = 2
# search an upper bound
while self.delta_plus(hi) <= w:
if(hi > MAX_EVENTS):
logger.error("w=%f" % w + " n=%d" % hi +
"deltaplus(n)=%d" % self.delta_plus(hi))
return hi
lo = hi
hi *= 10
# apply binary search
while lo < hi:
mid = (lo + hi) // 2
midval = self.delta_plus(mid)
if midval <= w:
lo = mid + 1
else:
hi = mid
hi -= 1
if (self.delta_plus(hi) > w):
print ("delta_plus(" + str(hi) + ") = " + str(self.delta_plus(hi)) + " > " + str(w))
assert self.delta_plus(hi) <= w
assert self.delta_plus(hi + 1) > w
if self.en_caching:
self.eta_min_cache[w] = hi-1
return hi-1
[docs] def eta_min_closed(self, w):
""" Eta-minus Function
Return the minimum number of events in a time window w.
Using CLOSED intevals
"""
n = self.eta_min_closed_cache.get(w, None)
if n is not None:
return n
if w < 0:
w = 0
MAX_EVENTS = 10000
hi = 10
lo = 2
# search an upper bound
while self.delta_plus(hi) < w:
if(hi > MAX_EVENTS):
logger.error("w=%f" % w + " n=%d" % hi +
"deltaplus(n)=%d" % self.delta_plus(hi))
return hi
lo = hi
hi *= 10
# apply binary search
while lo < hi:
mid = (lo + hi) // 2
midval = self.delta_plus(mid)
if midval < w:
lo = mid + 1
else:
hi = mid
hi -= 1
assert self.delta_plus(hi) < w
assert self.delta_plus(hi + 1) >= w
if self.en_caching:
self.eta_min_closed_cache[w] = hi-1
return hi-1
[docs] def delta_min(self, n):
""" Delta-minus Function
Return the minimum time interval between
the first and the last event
of any series of n events.
This is actually a wrapper to allow caching of delta functions.
"""
if n < 2:
return 0
# # Caching is activated
if self.en_caching == True:
d = self.delta_min_cache.get(n, None)
if d == None:
d = self.deltamin_func(n)
self.delta_min_cache[n] = d
return d
# # default policy
return self.deltamin_func(n)
[docs] def delta_plus(self, n):
""" Delta-plus Function
Return the maximum time interval between
the first and the last event
of any series of n events.
This is actually a wrapper to allow caching of delta functions.
"""
if n < 2:
return 0
# # Caching is activated
if self.en_caching == True:
d = self.delta_plus_cache.get(n, None)
if d == None:
d = self.deltaplus_func(n)
self.delta_plus_cache[n] = d
return d
# # default policy
return self.deltaplus_func(n)
[docs] def load(self, accuracy=1000):
""" Returns the asymptotic load,
i.e. the avg. number of events per time
"""
# print "load = ", float(self.eta_plus(accuracy)),"/",accuracy
# return float(self.eta_plus(accuracy)) / accuracy
if self.delta_min(accuracy) == 0:
return float("inf")
else:
return float(accuracy) / self.delta_min(accuracy)
def flush_cache(self):
self.delta_min_cache = dict()
self.delta_plus_cache = dict()
self.eta_min_cache = dict()
self.eta_plus_cache = dict()
self.eta_min_closed_cache = dict()
self.eta_plus_closed_cache = dict()
def __repr__(self):
""" Return a description of the Event-Model"""
return self.__description__
[docs]class PJdEventModel (EventModel):
""" A periodic, jitter, min-distance event model.
"""
def __init__(self, P=0, J=0, dmin=0, phi=0, name='min', **kwargs):
""" Periodic, Jitter, min. distance event model. Offset can be supplied
but is not evaluated by all analyses.
"""
EventModel.__init__(self, name, **kwargs)
# setup event model
self.set_PJd(P, J, dmin, phi)
[docs] def set_PJd(self, P, J=0, dmin=0, phi=0, early_arrival=False):
""" Sets the event model to a periodic activation
with jitter and minimum distance.
Equations 1 and 2 from [Schliecker2008]_.
"""
_warn_float(P, "Period")
_warn_float(J, "Jitter")
_warn_float(dmin, "dmin")
# save away the properties in case a local analysis uses them directly
self.P = P
self.J = J
self.dmin = dmin
# offset for some context sensitive analyses
self.phi = phi
if self.phi > 0:
self.__description__ = "P={} J={} d={} phi={}".format(P, J, dmin, phi)
else:
self.__description__ = "P={} J={} d={}".format(P, J, dmin)
if early_arrival:
raise(NotImplementedError)
def deltaplus_func(self, n):
return (n - 1) * self.P + self.J
def deltamin_func(self, n):
return max((n - 1) * self.dmin, (n - 1) * self.P - self.J)
[docs]class CTEventModel (EventModel):
""" c events every T time event model.
"""
def __init__(self, c, T, dmin=1, name='min', **kwargs):
EventModel.__init__(self, name, kwargs)
self.set_c_in_T(c, T, dmin)
[docs] def set_c_in_T(self, c, T, dmin=1):
""" Sets the event-model to a periodic Task
with period T and c activations per period.
No minimum arrival rate is assumed (delta_plus = infinity)!
Cf. Equation 1 in [Diemer2010]_.
"""
assert c*dmin <= T
self.__description__ = "%d every %d, dmin=%d" % (c, T, dmin)
self.c = c
self.T = T
self.dmin = dmin
def deltamin_func(self, n):
if self.c == 0 or self.T >= INFINITY:
return 0
if n == INFINITY:
return INFINITY
else:
return (n - 1) * self.dmin + int(math.floor(float(n - 1) / self.c)
* (self.T - self.c * self.dmin))
def deltaplus_func(self, n):
return INFINITY
[docs]class LimitedDeltaEventModel(EventModel):
""" User supplied event model on a limited delta domain.
"""
def __init__(self,
limited_delta_min_func=None,
limited_delta_plus_func=None,
limit_q_min=float('inf'),
limit_q_plus=float('inf'),
min_additive=util.recursive_min_additive,
max_additive=util.recursive_max_additive,
name='min',
**kwargs):
EventModel.__init__(self, name, kwargs)
self.set_limited_delta(limited_delta_min_func, limited_delta_plus_func, limit_q_min, limit_q_plus, min_additive, max_additive)
[docs] def set_limited_delta(self,
limited_delta_min_func,
limited_delta_plus_func,
limit_q_min=float('inf'),
limit_q_plus=float('inf'),
min_additive=util.recursive_min_additive,
max_additive=util.recursive_max_additive):
""" Sets the event model to an arbitrary function specified
by limited_delta_min_func and limited_delta_plus_func.
Contrary to directly setting deltamin_func and deltaplus_func,
the given functions are only valid in a limited domain [0, limit_q_min]
and [0, limit_q_plus] respectively.
For values of q beyond this range, a conservative extension
(additive extension) is used.
You can also supply a list() object to this function by using
lambda x: limited_delta_min_list[x]
"""
self.__description__ = "ltd. direct"
self.max_additive = max_additive
self.min_additive = min_additive
self.limited_delta_min_func = limited_delta_min_func
self.limited_delta_plus_func = limited_delta_plus_func
self.limit_q_min = limit_q_min
self.limit_q_plus = limit_q_plus
def deltamin_func(self, n):
if n == float("inf"):
return float("inf")
elif n > self.limit_q_min: # return additive extension if necessary
q_max = self.limit_q_min - 1
ret = self.max_additive(lambda x: self.delta_min(x + 1),
n - 1, q_max, self.delta_min_cache)
return ret
else:
return self.limited_delta_min_func(n)
def deltaplus_func(self, n):
if n == float("inf"):
return float("inf")
elif n > self.limit_q_plus: # return additive extension if necessary
q_max = self.limit_q_plus - 1
ret = self.min_additive(lambda x: self.delta_plus(x + 1),
n - 1, q_max, self.delta_plus_cache)
return ret
else:
return self.limited_delta_plus_func(n)
[docs]class TraceEventModel (LimitedDeltaEventModel):
def __init__(self, trace_points=[], min_sample_size=20,
min_additive=util.recursive_min_additive,
max_additive=util.recursive_max_additive,
name='min',
**kwargs):
LimitedDeltaEventModel.__init__(self, name=name, **kwargs)
self.trace_points = trace_points
self.min_sample_size = min_sample_size
self.min_addititive = min_additive
self.max_additive = max_additive
self.set_limited_trace(trace_points, min_sample_size, min_additive, max_additive)
[docs] def set_limited_trace(self,
trace_points,
min_sample_size=20,
min_additive=util.recursive_min_additive,
max_additive=util.recursive_max_additive):
""" Compute a pseudo-conservative event model from a given trace
(e.g. from SymTA/S TraceAnalyzer or similar).
trace_points must be a list of integers encoding the arrival time
of an event. The algorithm will compute delta_min and delta_plus based
on the trace by evaluating all candidates.
min_sample_size is the minimum amount of candidates that must
be available to derive a representative deltamin/deltaplus
"""
for p in set(trace_points):
if type(p) == float:
warnings.warn("You are using floats in your timestamps,"
" this may yield non-pessimistic results"
" consider using time conversion from pycpa.util")
break
trace = trace_points
q_max = len(trace_points)
try:
import numpy
nptrace = numpy.array(trace)
def raw_deltamin_func(n):
a = nptrace[0:q_max-n+1]
b = nptrace[(n-1):q_max]
d = numpy.amin(b-a)
return d
def raw_deltaplus_func(n):
a = nptrace[0:q_max-n+1]
b = nptrace[(n-1):q_max]
d = numpy.amin(b-a)
return d
except ImportError:
def raw_deltamin_func(n):
""" raw trace deltamin_func, only valid in the interval [0,q_max]
"""
assert n >= 0
assert n <= q_max
d = min(trace[q + n - 1] - trace[q] for q in range(0, q_max - n + 1))
return d
def raw_deltaplus_func(n):
""" raw trace deltaplus_func, only valid in the interval [0,q_max]
"""
assert n >= 0
assert n <= q_max
d = max(trace[q + n - 1] - trace[q] for q in range(0, q_max - n + 1))
return d
# set the trace as a limited delta function and let pycpa extrapolate
limit_q_max = max(2, q_max - min_sample_size)
# print("q_max", q_max, "trace_size", trace.size, limit_q_max)
self.set_limited_delta(raw_deltamin_func, raw_deltaplus_func,
limit_q_max, limit_q_max, min_additive, max_additive)
self.__description__ = "trace-based"
[docs]class Junction (object):
""" A junction combines multiple event models into one output event model
This is used to model multi-input tasks.
Typical semantics are "and" and "or" strategies.
See Chapter 4 in [Jersak2005]_ for definitions and details.
"""
def __init__(self, name="unknown", strategy=None):
""" CTOR """
# # Name
self.name = name
# # Strategy for the model propagation
self.strategy = strategy
# # Set of input tasks
self.prev_tasks = set()
# # Output event model
self.out_event_model = None
# # Link to next Tasks or Junctions,
# i.e. where to supply event model to
self.next_tasks = set()
self.in_event_models = dict()
# # store analysis results of sampling delay
self.analysis_results = dict()
# # at some point Junction looks like a task
# i.e. provide wcet, bcet for duck-typing
self.bcet = 0
self.wcet = 0
# # create a task to id mapping
self.mapping = dict()
[docs] def map_task(self, src_task, identifier):
""" maps an identifier to src_task """
self.mapping[src_task] = identifier
@property
def mode(self):
return str(self.strategy)
def invalidate_event_model_cache(self):
for t in self.next_tasks:
t.invalidate_event_model_cache()
def link_dependent_task(self, task):
task.prev_task = self
self.next_tasks.add(task)
[docs] def clean(self):
""" mark event models as invalid """
self.out_event_model = None
self.in_event_models.clear()
def __repr__(self):
return self.name + " " + str(self.strategy) + " junction"
[docs]class Task (object):
""" A Task is an entity which is mapped on a resource and consumes service.
Tasks are activated by events, which are described by EventModel.
Events are queued in FIFO order at the input of the task,
see Section 3.6.1 in [Jersak2005]_ or Section 3.1 in [Henia2005]_.
"""
def __init__(self, name, *args, **kwargs):
""" CTOR """
# # Descriptive string
self.name = name
# # Link to Resource to which Task is mapped
self.resource = None
# # Link the Path if the task takes part in chained communication
# FIXME: A task can be part of more than one path! Is this used anywhere?
self.path = None
# # Link to Mutex to which Task is mapped
self.mutex = None
# # Link to next Tasks, i.e. where to supply event model to
# # Multiple tasks possible (fork semantic)
self.next_tasks = set()
# Link to previous Task, i.e. the one which supplies our in_event_model
self.prev_task = None
# # Worst-case execution time
self.wcet = 0
# # Best-case execution time
self.bcet = 0
# # Event model activating the Task
self.in_event_model = None
# # Omit analysis
self.skip_analysis = False
# # Set event model propagation
from . import propagation
if not 'OutEventModelClass' in kwargs:
self.OutEventModelClass = propagation.default_propagation_method()
self.analysis_results = None
# compatability to the old call semantics (name, bcet, wcet,
# scheduling_parameter)
if len(args) == 3:
self.bcet = args[0]
self.wcet = args[1]
self.scheduling_parameter = args[2]
# After all mandatory attributes have been initialized above, load
# those set in kwargs
for key in kwargs:
setattr(self, key, kwargs[key])
assert(self.bcet <= self.wcet)
def __repr__(self):
""" Returns string representation of Task """
return self.name
[docs] def load(self, accuracy=100):
""" Returns the load generated by this task """
return self.in_event_model.load(accuracy) * float(self.wcet)
[docs] def bind_resource(self, r):
""" Bind a Task t to a Resource/Mutex r """
self.resource = r
r.tasks.add(self)
for t in r.tasks:
assert t.resource == r
[docs] def unbind_resource(self):
""" Remove a task from its resource """
if self.resource and self in self.resource.tasks:
self.resource.tasks.remove(self)
self.resource = None
[docs] def bind_mutex(self, m):
""" Bind a Task t to a Mutex r """
self.mutex = m
m.tasks.add(self)
[docs] def unbind_mutex(self):
""" Remove a task fromk its mutex """
if self.mutex and self in self.mutex.tasks:
self.mutex.tasks.remove(self)
self.mutex = None
[docs] def link_dependent_task(self, t):
""" Link a dependent task t to the task
The dependent task t is activated by the completion of the task.
This method returns the t argument, which enables elegant task
linking. E.g. to link t0 -> t1 -> t2, call:
t0.link_dependent_task(t1).link_dependent_task(t2)
"""
self.next_tasks.add(t)
if isinstance(t, Task):
t.prev_task = self
else:
t.prev_tasks.add(self)
return t
[docs] def get_resource_interferers(self):
""" returns the set of tasks sharing the same Resource as Task ti
excluding ti itself
"""
if self.resource is None:
return []
interfering_tasks = copy.copy(self.resource.tasks)
interfering_tasks.remove(self)
return interfering_tasks
[docs] def get_mutex_interferers(self):
""" returns the set of tasks sharing the same Mutex as Task ti
excluding ti itself
"""
if self.mutex is None:
return []
interfering_tasks = copy.copy(self.mutex.tasks)
interfering_tasks.remove(self)
return interfering_tasks
def invalidate_event_model_cache(self):
if self.in_event_model is not None:
self.in_event_model.flush_cache()
[docs] def clean(self):
""" Cleans all intermediate analysis results """
# invalidate downstream junctions
for n in self.next_tasks:
if isinstance(n, Junction):
n.clean()
# if this task is activated by another task, we discard the event model
if self.prev_task:
self.in_event_model = None
else:
self.in_event_model.flush_cache()
if self.analysis_results is not None:
self.analysis_results.clean()
def update_execution_time(self, task_results=None):
return
[docs]class Resource (object):
""" A Resource provides service to tasks. """
def __init__(self, name=None, scheduler=None, **kwargs):
""" CTOR """
# # Set of tasks mapped to this Resource
self.tasks = set()
# # Resource identifier
self.name = name
# # Analysis function
self.scheduler = scheduler
# After all mandatory attributes have been initialized above, load
# those set in kwargs
for key in kwargs:
setattr(self, key, kwargs[key])
def __repr__(self):
""" Return string representation of Resource """
s = str(self.name)
return s
[docs] def load(self, accuracy=10000):
""" returns the asymptotic load """
l = 0
for t in self.tasks:
try:
l += t.load(accuracy)
except TypeError:
logger.warn("cannot compute load for %s, skipping load "
"analysis for this resource" % (self.name))
return 0.
assert l < float('inf'), "Load on resource {} is infinity"\
.format(self.name)
assert l >= 0., "Load should be non-negative"
return l
[docs] def bind_task(self, t):
""" Bind task t to resource
Returns t """
t.bind_resource(self)
for task in self.tasks:
assert task.resource == self
return t
[docs] def unmap_tasks(self):
""" unmap all tasks from this resource """
for task in self.tasks:
task.resource = None
self.tasks = set()
def get_task_by_name(self, name):
for t in self.tasks:
if t.name == name:
return t
return None
[docs]class StandardForkStrategy(object):
""" Standard fork strategy: propagates unmodified output event model to all tasks. """
def __init__(self):
self.name = "Standard"
[docs] def output_event_model(self, fork, dst_task=None, task_results=None):
"""
This strategy does not distinguish between destination tasks.
:param fork: Fork from which to take the output event model.
:type fork: model.Fork
:param dst_task: destination task
:type fork: model.Task
"""
return fork.out_event_model
[docs]class Fork (Task):
""" A Fork allows the modification (determined by the assigned strategy)
of output event models dependent on the destination task.
"""
def __init__(self, name, strategy=StandardForkStrategy(), *args, **kwargs):
# # set default fork strategy
self.strategy = strategy
# # call Task CTOR
Task.__init__(self, name, *args, **kwargs)
# # store the output event model (used by the fork strategy)
self.out_event_model = None
# # create a task to id mapping
self.mapping = dict()
[docs] def clean(self):
Task.clean(self)
self.out_event_model = None
[docs] def map_task(self, dst_task, identifier):
""" maps an identifier to dst_task """
self.mapping[dst_task] = identifier
[docs] def get_mapping(self, dst_task):
""" returns the identifier mapped to dst_task (or raises KeyError) """
return self.mapping[dst_task]
[docs]class Mutex(object):
""" A mutually-exclusive shared Resource.
Shared resources create timing interferences between tasks
which may be executed on different resources (e.g. multi-core CPU)
but require access to a common resource (e.g. shared main memory) to execute.
See e.g. Chapter 5 in [Schliecker2011]_.
"""
def __init__(self, name=None):
""" CTOR """
# # Set of tasks mapped to this Resource
self.tasks = set()
# # Resource identifier
self.name = name
[docs]class EffectChain(object):
""" An cause-effect chain describes a (functional) chain of independent tasks.
All tasks within a chain are time-triggered and hence sample their input data independently.
"""
def __init__(self, name, tasks=None):
self.name = name
self.tasks = tasks
def add_task(self, task):
self.tasks.append(task)
[docs] def task_sequence(self, writers_only=False):
""" Generates and returns the sequence of reader/writer tasks in the form of [reader_0, writer_0, reader_1, writer_1,...].
A task in this sequence therefore acts either as a reader or a writer. Tasks at odd positions in this
sequence are readers while tasks at even positions are writers.
:param writers_only: if true, only include writer tasks in sequence (omit readers)
:type writers_only: boolean
"""
sequence = list()
for task in self.tasks:
# add reading and writing tasks
if not writers_only:
sequence.append(task)
sequence.append(task)
return sequence
[docs]class Path(object):
""" A Path describes a (event) chain of tasks.
Required for path analysis (e.g. end-to-end latency).
The information stored in Path classes could be derived from the task graph
(see Task.next_tasks and Task.prev_task),
but having redundancy here is more flexible (e.g. path analysis may only be
interesting for some task chains).
"""
def __init__(self, name, tasks=None):
""" CTOR """
# # List of tasks in Path (must be in correct order)
if tasks is not None:
self.tasks = tasks
self.__link_tasks(tasks)
else:
self.tasks = list()
# # create backlink to this path from the tasks
# # so a task knows its Path
for t in self.tasks:
t.path = self
# # Name of Path
self.name = name
## Constant overhead to add to the latency of the path
self.overhead = 0
def __link_tasks(self, tasks):
""" linking all tasks along a path"""
assert len(tasks) > 0
if len(tasks) == 1:
return # This is a fake path with just one task
for i in zip(tasks[0:-1], tasks[1:]):
i[0].link_dependent_task(i[1])
def __repr__(self):
""" Return str representation """
# return str(self.name)
s = str(self.name) + ": "
for c in self.tasks:
s += " -> " + str(c)
return s
[docs] def print_all(self):
""" Print all tasks in Path. Uses __str__() """
print(str(self))
[docs]class System(object):
""" The System is the top-level entity of the system model.
It contains resources, junctions, tasks and paths.
"""
def __init__(self, name=''):
""" CTOR """
# # Name
self.name = name
# Set of resources, indexed by an ID, e.g. (x,y) tuple for mesh systems
self.resources = set()
# # Set of task chains
self.paths = set()
# # Set of junctions
self.junctions = set()
# # constraints bookkeeping
self.constraints = ConstraintsManager()
def __repr__(self):
""" Return a string representation of the System """
s = 'paths:'
for h in sorted(self.paths, key=str):
s += str(h) + ", "
s += '\nresources:'
for r in sorted(self.resources, key=str):
# s += str(k)+":"+str(r)+", "
s += str(r) + ", "
return s
[docs] def bind_junction(self, j):
""" Registers a junction object in the System.
Logically, the junction neither belongs
to a system nor to a resource,
for sake of convenience we associate junctions with the system.
"""
self.junctions.add(j)
return j
[docs] def bind_resource(self, r):
""" Add a Resource to the System """
self.resources.add(r)
return r
def get_resource_by_name(self, resource_name):
for r in self.resources:
if r.name == resource_name:
return r
return None
[docs] def bind_path(self, path):
""" Add a Path to the System """
self.paths.add(path)
# NOTE: call to "link_dependent_tasks()" on each task of the path now
# inside Path
return path
[docs] def print_subgraphs(self):
""" enumerate all subgraphs of the application graph.
if a subgraph is not well-formed (e.g. a source is missing),
this algorithm may
not work correctly (it will eventually produce to many subgraphs)
"""
subgraphs = list()
unreachable = set()
for resource in self.resources:
unreachable |= set(resource.tasks)
while len(unreachable) > 0:
# pick one random start task (in case the app graph is not well-
# formed)
root_task = iter(unreachable).next()
# but prefer a task with a source attached
for t in unreachable:
if t.in_event_model is not None:
root_task = t
break
reachable = util.breadth_first_search(root_task)
subgraphs.append(reachable)
unreachable = unreachable - reachable
logger.warning("Application graph consists of %d disjoint subgraphs:" %
len(subgraphs))
idx = 0
for subgraph in subgraphs:
logger.info("Subgraph %d" % idx)
idx += 1
for task in subgraph:
logger.info("\t%s" % task)
return subgraphs
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4