Skip to content
Snippets Groups Projects
trigger_rates.py 9.92 KiB
#!/usr/bin/env python
# coding=utf-8
# Filename: trigger_rates.py
# Author: Tamas Gal <tgal@km3net.de>
# vim: ts=4 sw=4 et
"""
Monitors trigger rates.

Usage:
    trigger_rates.py [options]
    trigger_rates.py (-h | --help)

Options:
    -l LIGIER_IP    The IP of the ligier [default: 127.0.0.1].
    -p LIGIER_PORT  The port of the ligier [default: 5553].
    -o PLOT_DIR     The directory to save the plot [default: plots].
    -h --help       Show this screen.

"""
from __future__ import division, print_function

from datetime import datetime
from collections import defaultdict, deque, OrderedDict
from itertools import chain
from functools import partial
import sys
from io import BytesIO
from os.path import join, exists
import shutil
import time
import threading

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as md

import km3pipe as kp
from km3pipe.config import Config
from km3pipe.io.daq import (DAQPreamble, DAQEvent, is_3dshower, is_3dmuon,
                            is_mxshower)
import km3pipe.style

VERSION = "1.0"
km3pipe.style.use('km3pipe')


class TriggerRate(kp.Module):
    """Trigger rate plotter"""
    def configure(self):
        self.plots_path = self.require('plots_path')
        self.data_path = self.get('data_path', default='data')
        self.interval = self.get("interval",
                                 default=self.trigger_rate_sampling_period())
        self.filename = self.get("filename", default="trigger_rates")
        self.with_minor_ticks = self.get("with_minor_ticks", default=False)

        self.sendmail = kp.time.Cuckoo(
            15 * 60, partial(kp.tools.sendmail, "orca.alerts@km3net.de"))

        print("Update interval: {}s".format(self.interval))
        self.trigger_counts = defaultdict(int)
        self.trigger_rates = OrderedDict()
        self._trigger_types = ["Overall", "3DMuon", "MXShower", "3DShower"]
        self.trigger_rates_fobj = None

        self.initialise_data_logging()

        self.styles = {
            "xfmt": md.DateFormatter('%Y-%m-%d %H:%M'),
            "general": dict(markersize=6, linestyle=':', linewidth=1),
            "Overall": dict(marker='D', color='tomato', markeredgewidth=1),
            "3DMuon": dict(marker='X', color='dodgerblue'),
            "MXShower": dict(marker='v', color='orange'),
            "3DShower": dict(marker='^', color='olivedrab'),
        }

        queue_len = int(60 * 24 / (self.interval / 60))
        for trigger in self._trigger_types:
            self.trigger_rates[trigger] = deque(maxlen=queue_len)

        self.run = True
        threading.Thread(target=self.plot).start()
        self.lock = threading.Lock()

        self.run_changes = []
        self.current_run_id = 0
        self.det_id = 0

    def initialise_data_logging(self):
        """Set up a CSV to store the trigger rate data"""
        filename = join(self.data_path, "trigger_rates.csv")
        if not exists(filename):
            self.trigger_rates_fobj = open(filename, "w")
            self.trigger_rates_fobj.write('timestamp,' +
                                          ','.join(self._trigger_types) + '\n')
        else:
            self.trigger_rates_fobj = open(filename, "a")
        self.trigger_rates_fobj.flush()

    def process(self, blob):
        """Analyse the trigger flags for an incoming event"""
        if not str(blob['CHPrefix'].tag) == 'IO_EVT':
            return blob
        sys.stdout.write('.')
        sys.stdout.flush()

        data = blob['CHData']
        data_io = BytesIO(data)
        preamble = DAQPreamble(file_obj=data_io)  # noqa
        event = DAQEvent(file_obj=data_io)
        self.det_id = event.header.det_id
        if event.header.run > self.current_run_id:
            self.current_run_id = event.header.run
            self._log_run_change()
        tm = event.trigger_mask
        with self.lock:
            self.trigger_counts["Overall"] += 1
            self.trigger_counts["3DShower"] += is_3dshower(tm)
            self.trigger_counts["MXShower"] += is_mxshower(tm)
            self.trigger_counts["3DMuon"] += is_3dmuon(tm)

        print(self.trigger_counts)

        return blob

    def _log_run_change(self):
        """Keep track of a run change"""
        self.print("New run: %s" % self.current_run_id)
        now = datetime.utcnow()
        self.run_changes.append((now, self.current_run_id))

    def _get_run_changes_to_plot(self):
        """Retrieve all run numbers to be plotted on the trigger rate plot"""
        self.print("Checking run changes out of range")
        overall_rates = self.trigger_rates['Overall']
        if not overall_rates:
            self.print("No trigger rates logged  yet, nothing to remove.")
            return
        self.print("  all:     {}".format(self.run_changes))
        run_changes_to_plot = []
        min_timestamp = min(overall_rates)[0]
        self.print("  earliest timestamp to plot: {}".format(min_timestamp))
        for timestamp, run in self.run_changes:
            if timestamp > min_timestamp:
                run_changes_to_plot.append((timestamp, run))
        self.print("  to plot: {}".format(run_changes_to_plot))
        return run_changes_to_plot

    def plot(self):
        """The plot loop, calling the plotter every `self.interval` seconds."""
        while self.run:
            time.sleep(self.interval)
            timestamp, trigger_rates = self.calculate_trigger_rates()
            self.write_trigger_rates(timestamp, trigger_rates)
            self.create_plot()

    def write_trigger_rates(self, timestamp, trigger_rates):
        """Write the trigger rate information to the CSV file"""
        entry = f"{timestamp}"
        for trigger_type in self._trigger_types:
            try:
                trigger_rate = trigger_rates[trigger_type]
            except KeyError:
                trigger_rate = 0
            if trigger_rate == 0:
                self.sendmail("Subject: Trigger rate is 0Hz!\n\n")
            entry += f",{trigger_rate}"
        entry += '\n'
        self.trigger_rates_fobj.write(entry)
        self.trigger_rates_fobj.flush()

    def calculate_trigger_rates(self):
        """Calculate the trigger rates from the event trigger parameters"""
        timestamp = datetime.utcnow()
        trigger_rates = {}
        with self.lock:
            for trigger, n_events in self.trigger_counts.items():
                trigger_rate = n_events / self.interval
                self.trigger_rates[trigger].append((timestamp, trigger_rate))
                trigger_rates[trigger] = trigger_rate
            self.trigger_counts = defaultdict(int)
        return timestamp.timestamp(), trigger_rates

    def create_plot(self):
        """Create the trigger rate plot"""
        print('\n' + self.__class__.__name__ + ": updating plot.")

        fig, ax = plt.subplots(figsize=(16, 4))

        for trigger, rates in self.trigger_rates.items():
            if not rates:
                self.log.warning("Empty rates, skipping...")
                continue
            timestamps, trigger_rates = zip(*rates)
            ax.plot(timestamps,
                    trigger_rates,
                    **self.styles[trigger],
                    **self.styles['general'],
                    label=trigger)

        run_changes_to_plot = self._get_run_changes_to_plot()
        self.print("Recorded run changes: {}".format(run_changes_to_plot))
        all_rates = [r for d, r in chain(*self.trigger_rates.values())]
        if not all_rates:
            self.log.warning("Empty rates, skipping...")
            return
        min_trigger_rate = min(all_rates)
        max_trigger_rate = max(all_rates)
        for run_start, run in run_changes_to_plot:
            plt.text(run_start, (min_trigger_rate + max_trigger_rate) / 2,
                     "\nRUN %s  " % run,
                     rotation=60,
                     verticalalignment='top',
                     fontsize=8,
                     color='gray')
            ax.axvline(run_start, color='#ff0f5b', linestyle='--',
                       alpha=0.8)  # added

        ax.set_title("Trigger Rates for DetID-{0}\n{1} UTC".format(
            self.det_id,
            datetime.utcnow().strftime("%c")))
        ax.set_xlabel("time")
        ax.set_ylabel("trigger rate [Hz]")
        ax.xaxis.set_major_formatter(self.styles["xfmt"])
        ax.grid(True, which='minor')
        if self.with_minor_ticks:
            ax.minorticks_on()
        plt.legend()

        fig.tight_layout()

        filename = join(self.plots_path, self.filename + '_lin.png')
        filename_tmp = join(self.plots_path, self.filename + '_lin_tmp.png')
        plt.savefig(filename_tmp, dpi=120, bbox_inches="tight")
        shutil.move(filename_tmp, filename)

        try:
            ax.set_yscale('log')
        except ValueError:
            pass

        filename = join(self.plots_path, self.filename + '.png')
        filename_tmp = join(self.plots_path, self.filename + '_tmp.png')
        plt.savefig(filename_tmp, dpi=120, bbox_inches="tight")
        shutil.move(filename_tmp, filename)

        plt.close('all')
        print("Plot updated at '{}'.".format(filename))

    def trigger_rate_sampling_period(self):
        """This is obsolete and will be removed"""
        try:
            return int(Config().get("Monitoring",
                                    "trigger_rate_sampling_period"))
        except (TypeError, ValueError):
            return 180

    def finish(self):
        self.trigger_rates_fobj.close()
        self.run = False


def main():
    from docopt import docopt
    args = docopt(__doc__, version=VERSION)

    plots_path = args['-o']
    ligier_ip = args['-l']
    ligier_port = int(args['-p'])

    pipe = kp.Pipeline()
    pipe.attach(kp.io.ch.CHPump,
                host=ligier_ip,
                port=ligier_port,
                tags='IO_EVT',
                timeout=60 * 60 * 24 * 7,
                max_queue=200000)
    pipe.attach(TriggerRate, interval=300, plots_path=plots_path)
    pipe.drain()


if __name__ == '__main__':
    main()