Source code for pycpa.path_analysis

""" Compositional Performance Analysis Algorithms for Path Latencies

| Copyright (C) 2007-2017 Jonas Diemer, Philip Axer, Johannes Schlatow
| TU Braunschweig, Germany
| All rights reserved.
| See LICENSE file for copyright and license details.

:Authors:
         - Jonas Diemer
         - Philip Axer
         - Johannes Schlatow

Description
-----------

This module contains methods for the ananlysis of path latencies.
It should be imported in scripts that do the analysis.
"""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division

from . import options
from . import model
from . import util

import math


[docs]def end_to_end_latency(path, task_results, n=1 , task_overhead=0, path_overhead=0, **kwargs): """ Computes the worst-/best-case e2e latency for n tokens to pass the path. The constant path.overhead is added to the best- and worst-case latencies. :param path: the path :type path: model.Path :param n: amount of events :type n: integer :param task_overhead: A constant task_overhead is added once per task to both min and max latency :type task_overhead: integer :param path_overhead: A constant path_overhead is added once per path to both min and max latency :type path_overhead: integer :rtype: tuple (best-case latency, worst-case latency) """ if options.get_opt('e2e_improved') == True: (lmin, lmax) = end_to_end_latency_improved(path, task_results, n, **kwargs) else: (lmin, lmax) = end_to_end_latency_classic(path, task_results, n, **kwargs) for t in path.tasks: # implcitly check if t is a junction if t in task_results: # add per-task overheads lmin += task_overhead lmax += task_overhead # add per-path overhead lmin += path_overhead + path.overhead lmax += path_overhead + path.overhead return (lmin, lmax)
[docs]def end_to_end_latency_classic(path, task_results, n=1, injection_rate='max', **kwargs): """ Computes the worst-/best-case end-to-end latency Assumes that all tasks in the system have successfully been analyzed. Assumes that events enter the path at maximum/minumum rate. The end-to-end latency is the sum of the individual task's worst-case response times. This corresponds to Definition 7.3 in [Richter2005]_. :param path: the path :type path: model.Path :param n: amount of events :type n: integer :param injection_rate: assumed injection rate is maximum or minimum :type injection_rate: string 'max' or 'min' :rtype: tuple (best case latency, worst case latency) """ lmax = 0 lmin = 0 # check if path is a list of Tasks or a Path object tasks = path if isinstance(path, model.Path): tasks = path.tasks for t in tasks: # implcitly check if t is a junction if t in task_results: # sum up best- and worst-case response times lmax += task_results[t].wcrt lmin += task_results[t].bcrt elif isinstance(t, model.Junction): # add sampling delay induced by the junction (if available) prev_task = tasks[tasks.index(t)-1] if prev_task in t.analysis_results: lmin += t.analysis_results[prev_task].bcrt lmax += t.analysis_results[prev_task].wcrt else: print("Warning: no task_results for task %s" % t.name) if injection_rate == 'max': # add the eastliest possible release of event n lmax += tasks[0].in_event_model.delta_min(n) elif injection_rate == 'min': # add the latest possible release of event n lmax += tasks[0].in_event_model.delta_plus(n) # add the earliest possible release of event n lmin += tasks[0].in_event_model.delta_min(n) return lmin, lmax
def _event_arrival_path(path, n, e_0=0): """ Returns the latest arrival time of the n-th event with respect to an event 0 of task 0 (first task in path) This is :math:`e_0(n)` from Lemma 1 in [Schliecker2009recursive]_. """ # if e_0 is None: # the entry time of the first event if n > 0: e = e_0 + path.tasks[0].in_event_model.delta_plus(n + 1) elif n < 0: e = e_0 - path.tasks[0].in_event_model.delta_min(-n + 1) else: e = 0 # same event, so the difference is 0 return e def _event_exit_path(path, task_results, i, n, e_0=0): """ Returns the latest exit time of the n-th event relative to the arrival of an event 0 (cf. Lemma 2 in [Schliecker2009recursive]_) In contrast to Lemma 2, k_max is set so that all busy times are taken into account. """ # logger.debug("calculating exit for task %d, n=%d" % (i, n)) if i == -1: # The exit of task -1 is the arrival of task 0. e = _event_arrival_path(path, n, e_0) elif path.tasks[i] not in task_results: # skip task if there are no results for this # (this may happen if, e.g., a chain analysis has been performed) return _event_exit_path(path, task_results, i-1, n, e_0) else: e = float('-inf') k_max = len(task_results[path.tasks[i]].busy_times) # print("k_max:",k_max) for k in range(1, k_max): e_k = _event_exit_path(path, task_results, i - 1, n - k + 1, e_0) + \ task_results[path.tasks[i]].busy_times[k] # print("e_k:",e_k) if e_k > e: # print("task %d, n=%d k=%d, new e=%d" % (i, n, k, e_k)) e = e_k # print("exit for task %d, n=%d is %d" % (i, n, e)) return e
[docs]def end_to_end_latency_improved(path, task_results, n=1, e_0=0, **kwargs): """ Performs the path analysis presented in [Schliecker2009recursive]_, which improves results compared to end_to_end_latency() for n>1 and bursty event models. lat(n) """ lmax = 0 lmin = 0 lmax = _event_exit_path(path, task_results, len(path.tasks) - 1, n - 1, e_0) - e_0 for t in path.tasks: if isinstance(t, model.Task) and t in task_results: # sum up best-case response times lmin += task_results[t].bcrt elif isinstance(t, model.Junction): print("Error: path contains junctions") else: print("Warning: no task_results for task %s" % t.name) # add the earliest possible release of event n # TODO: Can lmin be improved? lmin += path.tasks[0].in_event_model.delta_min(n) return lmin, lmax
[docs]def cause_effect_chain_data_age(chain, task_results, details=None): """ computes the data age of the given cause effect chain :param chain: model.EffectChain :param task_results: dict of analysis.TaskResult """ return cause_effect_chain(chain, task_results, details, 'data-age')
[docs]def cause_effect_chain_reaction_time(chain, task_results, details=None): """ computes the data age of the given cause effect chain :param chain: model.EffectChain :param task_results: dict of analysis.TaskResult """ return cause_effect_chain(chain, task_results, details, 'reaction-time')
[docs]def cause_effect_chain(chain, task_results, details=None, semantics='data-age'): """ computes the data age of the given cause effect chain :param chain: model.EffectChain :param task_results: dict of analysis.TaskResult """ sequence = chain.task_sequence(writers_only=True) if details is None: details = dict() periods = [_period(t) for t in sequence] if util.GCD(periods) != min(periods): print("Error: cause-effect chain analysis requires harmonic periods") l_max = _phi(sequence[0]) + _jitter(sequence[0]) details[sequence[0].name+'-PHI+J'] = l_max for i in range(len(sequence)): # add write-to-read delay for all but the last task if i < len(sequence)-1: if semantics == 'data-age': # add write to read delay delay = _calculate_backward_distance(sequence[i], sequence[i+1], task_results, details=details) elif semantics == 'reaction-time': delay = _calculate_forward_distance(sequence[i], sequence[i+1], task_results, details=details) else: raise NotImplementedException() l_max += delay # add read-to-write delay (response time) for all tasks delay = task_results[sequence[i]].wcrt details[sequence[i].name+'-WCRT'] = delay l_max += delay return l_max
def _phi(task): if hasattr(task.in_event_model, 'phi'): return task.in_event_model.phi else: return 0 def _period(task): return task.in_event_model.P def _jitter(task): if hasattr(task.in_event_model, 'J'): return task.in_event_model.J else: return 0 def _calculate_backward_distance(writer, reader, task_results, details): """ computes backward distance (for data age) """ if _period(reader) < _period(writer): # oversampling candidates = set() if _period(writer) % _period(reader) != 0: candidates.add(_period(writer) + task_results[writer].wcrt - task_results[writer].bcrt) else: for n in range(int(math.ceil(_period(writer)/_period(reader)))): candidates.add(_rplus(reader, task_results, n) - _wmin(writer, task_results, 0)) # include previous cycle? if _wplus(writer, task_results) > _rmin(reader, task_results, n): candidates.add(_rplus(reader, task_results, n) - _wmin(writer, task_results, -1)) result = max(candidates) else: # undersampling or same period candidates = set() candidates.add(_period(writer) + task_results[writer].wcrt - task_results[writer].bcrt) if _period(reader) % _period(writer) == 0: # include previous cycle? if _wplus(writer, task_results) > _rmin(reader, task_results): candidates.add(_rplus(reader, task_results) - _wmin(writer, task_results, -1)) # include all other possible writers for n in range(int(math.ceil(_period(reader)/_period(writer)))): if _wplus(writer, task_results, n) <= _rplus(reader, task_results): candidates.add(_rplus(reader, task_results) - _wmin(writer, task_results, n)) result = min([c for c in candidates if c >= 0]) details[writer.name+'-'+reader.name+'-delay'] = result return result def _calculate_forward_distance(writer, reader, task_results, details): """ computes forward distance (for reaction time) """ if _period(reader) < _period(writer): # oversampling candidates = set() candidates.add(_period(reader)) if _period(writer) % _period(reader) == 0: for n in range(int(math.ceil(_period(writer)/_period(reader)))): if _rmin(reader, task_results, n) >= _wplus(writer, task_results, 0): candidates.add(_rplus(reader, task_results, n) - _wmin(writer, task_results, 0)) # include previous cycle? if _wplus(writer, task_results) > _rmin(reader, task_results, n): candidates.add(_rplus(reader, task_results, n) - _wmin(writer, task_results, -1)) result = min([c for c in candidates if c >= 0]) else: # undersampling or same period candidates = set() if _period(reader) % _period(writer) != 0: candidates.add(_period(reader)) else: # include all possible writers for n in range(int(math.ceil(_period(reader)/_period(writer)))): candidates.add(_rplus(reader, task_results) - _wmin(writer, task_results, n)) # if write time can be earlier than read time, add distance to next reader if _wplus(writer, task_results, n) > _rmin(reader, task_results): candidates.add(_rplus(reader, task_results, 1) - _wmin(writer, task_results, n)) result = max(candidates) details[writer.name+'-'+reader.name+'-delay'] = result return result def _wplus(writer, task_results, n=0): return n*_period(writer) + _phi(writer) + task_results[writer].wcrt + _jitter(writer) def _wmin(writer, task_results, n=0): return n*_period(writer) + _phi(writer) + task_results[writer].bcrt - _jitter(writer) def _rplus(reader, task_results, n=0): return _wplus(reader, task_results, n) - task_results[reader].bcrt def _rmin(reader, task_results, n=0): return n*_period(reader) + _phi(reader) - _jitter(reader) # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4