"""
| Copyright (C) 2007-2017 Jonas Diemer, Philip Axer
| TU Braunschweig, Germany
| All rights reserved.
| See LICENSE file for copyright and license details.
:Authors:
- Jonas Diemer
- Philip Axer
Description
-----------
General purpose plotting functions:
* event model plotting
* gantt plotting (requires the simulation engine)
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
try:
from matplotlib import ticker
from matplotlib import pyplot
from matplotlib import patches
from matplotlib.collections import PatchCollection
import matplotlib as mpl
except ImportError:
print ("matplotlib not available, plotting disabled")
import math
def augment_range(plot_range):
    """ Add points closely around every point in plot_range.

    Integer-based (step) curves jump exactly at the given points. Sampling
    the curve slightly before and slightly after each point lets a line
    plot render these jumps as (almost) vertical edges.

    :param plot_range: the x positions to augment
    :type plot_range: iterable of numbers (list, tuple, range, ...)
    :returns: sorted list containing every x, x + 0.0001 and x - 0.0001
              with all negative values removed
    :rtype: list
    """
    # Materialize first so that any iterable (e.g. ``range``) is accepted,
    # not only lists; the input itself is never modified.
    points = list(plot_range)
    offset = 0.0001
    a_range = points + [x + offset for x in points] + [x - offset for x in points]
    a_range.sort()
    # clear out negative values
    return [x for x in a_range if x >= 0]
def plot_eta(eta, plot_range, label=None, color=None, show=False, filename=None):
    """ Plot an eta function on a new figure.

    :param eta: the function to plot; it is called as ``eta(x)`` for every
                sampled x position
    :type eta: callable
    :param plot_range: x positions at which the function is sampled
    :type plot_range: iterable of numbers
    :param label: label handed to ``pyplot.plot``
    :param color: color handed to ``pyplot.plot``
    :param show: if True, ``pyplot.show()`` is called after plotting
    :type show: bool
    :param filename: if not None, the figure is saved to this file
    :type filename: string
    :rtype: None
    """
    pyplot.figure()
    pyplot.grid(True)

    # Range including the surroundings of every point to get the steps right.
    # Materialize first so that any iterable (e.g. ``range``) is accepted.
    # Note that x - 0.0001 may be negative; such points are still evaluated.
    points = list(plot_range)
    augmented_range = points + [x + 0.0001 for x in points] + [x - 0.0001 for x in points]
    augmented_range.sort()

    pyplot.plot(augmented_range, [eta(x) for x in augmented_range], label=label, color=color)
    pyplot.xlim(xmin=0)
    pyplot.ylim(ymin=0)
    # raw string: "\D" is not a valid escape sequence in a normal literal
    pyplot.xlabel(r"$\Delta t$")

    if filename is not None:
        pyplot.savefig(filename, bbox_inches='tight')
    if show:
        pyplot.show()
def plot_event_model(model, num_events, file_format=None, separate_plots=True, file_prefix='', ticks_at_steps=False):
    """ Plot the Task's eta and delta_min functions.

    Intervals in eta are shown half-open, as defined in [Richter2005]_.

    Note: this function sets ``lines.markeredgewidth`` in the global
    matplotlib rcParams and does not restore it.

    :param model: the event model; its ``delta_min``, ``delta_plus``,
                  ``eta_min`` and ``eta_plus`` methods are evaluated
    :type model: model.EventModel
    :param num_events: Number of events to plot
    :type num_events: integer
    :param file_format: the format (file extension) of the files to be
                        written. If None, nothing is saved and the plots are
                        shown with ``pyplot.show()`` instead.
    :type file_format: string
    :param separate_plots: If True, eta and delta are drawn in two separate
                           figures (saved as ``plot-eta`` and
                           ``plot-delta_min``). If False, both are drawn as
                           subplots of one figure (saved as ``plot``).
    :type separate_plots: bool
    :param file_prefix: prefix of file name of plots
    :type file_prefix: string
    :param ticks_at_steps: If True, draw the x-axis ticks at steps of the functions. Otherwise, let matplotlib decide where to draw ticks.
    :type ticks_at_steps: bool
    :rtype: None
    """
    eps = 1e-1  # epsilon: distance used to sample just beside every step

    max_delta_t = model.delta_plus(num_events + 1)

    # create ranges which have one point at each step of eta
    steps_eta_plus = [model.delta_min(x) for x in range(num_events + 2)]
    steps_eta_min = [model.delta_plus(x) for x in range(num_events + 2)]
    steps_eta = sorted(set(steps_eta_min + steps_eta_plus))
    # range including surroundings of the steps
    augmented_range = sorted(steps_eta + [x + eps for x in steps_eta] + [x - eps for x in steps_eta])

    w, h = pyplot.figaspect(0.7)
    pyplot.figure(figsize=(w, h))

    # plot eta functions
    if not separate_plots:
        pyplot.subplot(121)
        pyplot.subplots_adjust(left=0.05, bottom=0.1, right=0.97, top=0.92, wspace=None, hspace=None)

    mpl.rcParams['lines.markeredgewidth'] = 0.3

    # All LaTeX labels below are raw strings: "\e", "\D" and "\d" are not
    # valid escape sequences in normal string literals.

    # eta minus first (so it appears below in case of overlaps)
    # only one point for label + legend (with line and marker)
    pyplot.plot([0], [0], 'g-^', label=r"$\eta^-(\Delta t)$", markeredgecolor='k')
    # line only
    pyplot.plot(augmented_range, [model.eta_min(x) for x in augmented_range], 'g-')
    # inclusive markers
    pyplot.plot(steps_eta_min, [model.eta_min(x) for x in steps_eta_min], 'g^', markeredgecolor='k')
    # exclusive markers
    pyplot.plot(steps_eta_min, [model.eta_min(x - eps) for x in steps_eta_min], 'w^', markeredgecolor='k')

    # now eta plus on top
    # only one point for label + legend (with line and marker)
    pyplot.plot([0], [0], 'r-v', label=r"$\eta^+(\Delta t)$", markeredgecolor='k')
    # line only
    pyplot.plot(augmented_range, [model.eta_plus(x) for x in augmented_range], 'r-')
    # inclusive markers
    pyplot.plot(steps_eta_plus, [model.eta_plus(x) for x in steps_eta_plus], 'rv', markeredgecolor='k')
    # exclusive markers
    pyplot.plot(steps_eta_plus, [model.eta_plus(x + eps) for x in steps_eta_plus], 'wv', markeredgecolor='k')

    pyplot.xlim(xmin=0, xmax=max_delta_t)
    pyplot.ylim(ymin=0, ymax=num_events + .5)
    if ticks_at_steps:
        pyplot.xticks(steps_eta)
    pyplot.title(r"$\eta(\Delta t)$")
    pyplot.xlabel(r"$\Delta t$")
    pyplot.ylabel("$n$")
    pyplot.legend(loc='best')

    if separate_plots and file_format is not None:
        pyplot.savefig(file_prefix + "plot-eta." + file_format)

    # plot delta functions, either in a figure of their own or next to eta
    if separate_plots:
        pyplot.figure()
    else:
        pyplot.subplot(122)

    range_delta = range(num_events + 1)

    # plot delta_min first so it appears below when overlapping
    pyplot.plot(range_delta, [model.delta_min(x) for x in range_delta], 'r^', markeredgecolor='k',
                label=r"$\delta^-(n)$")

    # plot delta_plus on top
    if model.delta_plus(2) < float('inf'):
        # only plot delta+ if it is not infinity
        pyplot.plot(range_delta, [model.delta_plus(x) for x in range_delta], 'gv', markeredgecolor='k',
                    label=r"$\delta^+(n)$")

    pyplot.xlim(xmin=0, xmax=num_events + .5)
    pyplot.ylim(ymin=0)
    pyplot.title(r"$\delta(n)$")
    pyplot.xlabel("n")
    pyplot.ylabel(r"$\Delta t$")
    pyplot.legend(loc='best')

    if file_format is not None:
        if separate_plots:
            pyplot.savefig(file_prefix + "plot-delta_min." + file_format, bbox_inches='tight')
        else:
            pyplot.savefig(file_prefix + "plot." + file_format, bbox_inches='tight')
    else:
        pyplot.show()
def aesthetic_paper_parameters(column_size=252):
    """ Build a matplotlib rcParams dictionary for figures in LaTeX papers.

    The figure is as wide as the given column and its height follows the
    golden ratio. The resulting dictionary is also printed to stdout.

    :param column_size: column width in LaTeX points
                        (get this from LaTeX using ``\\showthe\\columnwidth``)
    :type column_size: number
    :returns: settings suitable for ``matplotlib.rcParams.update()``
    :rtype: dict
    """
    fig_width_pt = column_size
    inches_per_pt = 1.0 / 72.27  # Convert pt to inch
    golden_mean = (math.sqrt(5) - 1.0) / 2.0  # Aesthetic ratio
    fig_width = fig_width_pt * inches_per_pt  # width in inches
    fig_height = fig_width * golden_mean  # height in inches
    fig_size = [fig_width, fig_height]
    params = {
        'figure.dpi': 72.27,
        'axes.labelsize': 10,
        # 'text.fontsize' was removed from matplotlib's rcParams; 'font.size'
        # is its replacement and is accepted by old and new releases alike.
        'font.size': 10,
        'legend.fontsize': 10,
        'xtick.labelsize': 8,
        'ytick.labelsize': 8,
        'text.usetex': True,
        'figure.figsize': fig_size,
    }
    print(params)
    return params
def plot_gantt(tasks, task_results, file_name=None, show=True, xlim=None,
               preemtion_bar_height=0.2,
               height=1,  # height of the box during actual execution
               hdist=1,  # vertical distance between two execution bars
               bar_linewidth=1,  # linewidth of execution bars
               min_dist_arrows=0.2,  # minimum distance between arrows (e.g. in case activations overlap)
               plot_event_arrival=True,  # plot arrival arrows
               plot_activation_finishing=False,  # plot finishing arrows
               annotate_tasks=True,  # annotate additional information
               task=None,  # the task that should be annotated (draws a WCRT arrow for this task)
               wcrt_voffset=0.5,  # amount of vertical distance used for the wcrt arrow
               annotation_offset=0.2,  # offset of the annotation (q=1) text
               arrow_width=0.05,  # arrow width
               arrow_head_width=0.4,
               arrow_head_length=0.2,
               arrow_xscale=1,  # scaling for all arrow sizes in x direction
               arrow_yoffset=0.1,  # arrow vertical offset
               xticks_only_on_changes=False,  # place tick only when events happen
               color_preemtion_bar='0.30',
               color_execution_bar='lightblue',
               title='Gantt',  # title
               number_xticks=20,  # scale xtick values
               ):
    """ Plot a gantt chart of a given task list.

    Execution time information is taken from the task attribute
    q_exec_windows which is written by the simulation framework.
    As used here, ``t.q_exec_windows[q]`` is the sequence of
    ``(start, end)`` execution segments of the q-th activation of task t.

    Besides ``q_exec_windows`` every task must provide ``name`` and
    ``in_event_model.delta_min()``. For the annotated ``task``,
    ``task_results[task]`` must provide ``q_wcrt`` and ``wcrt``.

    :param tasks: the tasks to plot, one row per task (top to bottom)
    :param task_results: mapping from task to its analysis result; only
                         read for the annotated ``task``
    :param file_name: if not None, the chart is saved to this file
    :param show: if True, ``pyplot.show()`` is called at the end
    :param xlim: if set, the x-axis is limited to
                 ``(-arrow_head_width, xlim)``
    :raises SystemExit: if a task has no ``q_exec_windows`` attribute
    :rtype: None

    The remaining keyword parameters only tune the appearance and are
    described next to their defaults in the signature.
    """
    w, h = pyplot.figaspect(0.4)
    fig = pyplot.figure(figsize=(w * 0.5, h * 0.5))
    ax = fig.add_subplot(111)

    ypos = 0
    yticks = list()
    xticks = set()

    pyplot.title(title)

    arrow_width *= arrow_xscale
    arrow_head_width *= arrow_xscale

    for t in tasks:
        yticks.append(ypos)  # append a y-tick, so we see a dashed line here

        if not hasattr(t, 'q_exec_windows'):
            print("task %s has no q_exec_windows assigned! use simulation.py to simulate the CI" % t.name)
            # same effect as exit(-1), but does not depend on the ``exit``
            # helper that only exists when the ``site`` module is loaded
            raise SystemExit(-1)

        # flatten the execution segments of all activations
        segments = list()
        for window in t.q_exec_windows:
            segments.extend(window)

        if not segments:
            ypos -= (hdist + height)
            continue

        # Remember start AND end time of every segment as possible x-ticks.
        # This must use the (start, end) form: after the conversion below
        # the second entry is a length, not a point in time.
        for seg in segments:
            xticks.add(seg[0])
            xticks.add(seg[1])

        # Draw bar segments
        # broken_barh expects coordinates in the form (start, length)
        bars = [(seg[0], seg[1] - seg[0]) for seg in segments]

        # draw the bars vertically aligned to ypos, so that ypos is the middle of the bar
        ax.broken_barh(bars,
                       (ypos - height / 2., height),
                       facecolors=color_execution_bar,
                       alpha=1,
                       linewidth=bar_linewidth)

        # draw preemption times, activation and response time arrows
        xposshift = 0
        for q, window in enumerate(t.q_exec_windows):
            # arrival time of the (q+1)-th activation
            xpos = t.in_event_model.delta_min(q + 1)

            # draw the preemption bar from arrival to the end of the last segment
            task_activity_end = window[-1][1]
            ax.broken_barh([(xpos, task_activity_end - xpos)],
                           (ypos - preemtion_bar_height / 2., preemtion_bar_height),
                           facecolors=color_preemtion_bar,
                           edgecolor=color_preemtion_bar,
                           alpha=1,
                           zorder=0)

            # check if there are multiple activations right after each other;
            # we don't want their arrows to overlap, so shift them apart
            if q >= 1:
                if abs(xpos - t.in_event_model.delta_min(q)) < min_dist_arrows:
                    xposshift += min_dist_arrows
                else:
                    xposshift = 0.0

            if plot_event_arrival:
                # Draw activation times
                activation_arrow = patches.FancyArrow(xpos + xposshift, ypos + arrow_yoffset,
                                                      0, height / 2.,
                                                      length_includes_head=True,
                                                      width=arrow_width,
                                                      head_width=arrow_head_width,
                                                      head_length=arrow_head_length,
                                                      facecolor='black',
                                                      alpha=1)
                ax.add_patch(activation_arrow)

            if plot_activation_finishing:
                # Draw finishing times
                response_arrow = patches.FancyArrow(task_activity_end, ypos - arrow_yoffset,
                                                    0, -height / 2.,
                                                    length_includes_head=True,
                                                    width=arrow_width,
                                                    head_width=arrow_head_width,
                                                    head_length=arrow_head_length,
                                                    facecolor='black',
                                                    alpha=1)
                ax.add_patch(response_arrow)

            if annotate_tasks and task and task == t:
                # label the activation number at its first execution segment
                first_segment = window[0][0]
                text = '$q=%d$' % (q + 1)
                ax.text(first_segment + annotation_offset, ypos - height / 2., text,
                        horizontalalignment='left',
                        verticalalignment='bottom')

        if task and task == t:
            # draw a double-headed arrow spanning the worst-case response time
            print(task_results[t].q_wcrt)
            wcrt_start = t.in_event_model.delta_min(task_results[t].q_wcrt)
            wcrt_end = task_results[t].wcrt + wcrt_start
            annotation_ypos = ypos - height / 2. - wcrt_voffset
            wcrt_arrow = patches.FancyArrowPatch((wcrt_start, annotation_ypos),
                                                 (wcrt_end, annotation_ypos),
                                                 arrowstyle="<->", mutation_scale=20.)
            ax.add_patch(wcrt_arrow)
            ax.text(wcrt_start + task_results[t].wcrt / 2., annotation_ypos,
                    'WCRT=%.1f' % (task_results[t].wcrt),
                    horizontalalignment='center',
                    verticalalignment='center',
                    backgroundcolor='white')

        ypos -= (hdist + height)

    xticks = sorted(xticks)

    # raw string: "\D" is not a valid escape sequence in a normal literal
    ax.set_xlabel(r'time $\Delta t$')
    ax.set_yticks(yticks)  # add the ypos markers for each task
    if xticks_only_on_changes:
        ax.set_xticks(xticks)
    else:
        ax.xaxis.set_major_locator(ticker.MaxNLocator(number_xticks, steps=[1, 2, 5]))

    ax.set_yticklabels([t.name for t in tasks])  # set tasknames
    ax.autoscale_view(tight=True, scalex=True, scaley=True)  # autoscale first
    ax.set_ylim(ypos + height / 2., height)  # set ylim manually

    if xlim:
        ax.set_xlim(-arrow_head_width, xlim)

    ax.grid(True)

    if file_name is not None:
        pyplot.savefig(file_name, bbox_inches='tight')

    if show:
        pyplot.show()
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4