Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (296)

Showing 1616 additions and 167 deletions
......@@ -6,6 +6,7 @@ __pycache__
plots/
logs/
pids/
pipeline.toml
backend/pipeline.toml
setenv.sh
supervisord.conf
.env
variables:
DOCKER_HOST: tcp://docker:2375
DOCKER_DRIVER: overlay2
BACKEND_IMAGE: docker.km3net.de/km3mon-backend
RECO_IMAGE: docker.km3net.de/km3mon-reco
FRONTEND_IMAGE: docker.km3net.de/km3mon-frontend
LIVELOG_IMAGE: docker.km3net.de/km3mon-livelog
stages:
- docker
- release
backend:
image: docker:stable
services:
- docker:dind
stage: docker
script:
- docker build --pull -t $BACKEND_IMAGE:test backend/
tags:
- docker
frontend:
image: docker:stable
services:
- docker:dind
stage: docker
script:
- docker build --pull -t $FRONTEND_IMAGE:test frontend/
tags:
- docker
livelog:
image: docker:stable
services:
- docker:dind
stage: docker
script:
- docker build --pull -t $LIVELOG_IMAGE:test livelog/
tags:
- docker
reco:
image: docker:stable
services:
- docker:dind
stage: docker
script:
- docker build --pull -t $RECO_IMAGE:test reco/
tags:
- docker
backend-release:
image: docker:stable
services:
- docker:dind
stage: release
script:
- docker build --pull -t $BACKEND_IMAGE:${CI_COMMIT_TAG:1} backend/
- docker tag $BACKEND_IMAGE:${CI_COMMIT_TAG:1} $BACKEND_IMAGE:latest
- docker push $BACKEND_IMAGE:${CI_COMMIT_TAG:1}
- docker push $BACKEND_IMAGE:latest
tags:
- docker
only:
- tags
frontend-release:
image: docker:stable
services:
- docker:dind
stage: release
script:
- docker build --pull -t $FRONTEND_IMAGE:${CI_COMMIT_TAG:1} frontend/
- docker tag $FRONTEND_IMAGE:${CI_COMMIT_TAG:1} $FRONTEND_IMAGE:latest
- docker push $FRONTEND_IMAGE:${CI_COMMIT_TAG:1}
- docker push $FRONTEND_IMAGE:latest
tags:
- docker
only:
- tags
livelog-release:
image: docker:stable
services:
- docker:dind
stage: release
script:
- docker build --pull -t $LIVELOG_IMAGE:${CI_COMMIT_TAG:1} livelog/
- docker tag $LIVELOG_IMAGE:${CI_COMMIT_TAG:1} $LIVELOG_IMAGE:latest
- docker push $LIVELOG_IMAGE:${CI_COMMIT_TAG:1}
- docker push $LIVELOG_IMAGE:latest
tags:
- docker
only:
- tags
reco-release:
image: docker:stable
services:
- docker:dind
stage: release
script:
- docker build --pull -t $RECO_IMAGE:${CI_COMMIT_TAG:1} reco/
- docker tag $RECO_IMAGE:${CI_COMMIT_TAG:1} $RECO_IMAGE:latest
- docker push $RECO_IMAGE:${CI_COMMIT_TAG:1}
- docker push $RECO_IMAGE:latest
tags:
- docker
only:
- tags
Unreleased changes
------------------
* `ztplot.py` now exits with return code 1 when an `OverflowError` occurs
while converting the event time and dumps the raw event data to `data/`
Version 2
---------
2.2.5 / 2022-12-15
~~~~~~~~~~~~~~~~~~
* `log_analyser.py` now also parses gzipped log files
* Several backend scripts now also log the memory consumption
* Fixes memory issues in several backend scripts, which were
introduced in Matplotlib 3.5.2
* Updated km3pipe to 9.13.8 and km3db to 0.11.1 in the backend
2.2.4 / 2022-12-01
~~~~~~~~~~~~~~~~~~
* Minor fixes
2.2.3 / 2022-11-20
~~~~~~~~~~~~~~~~~~
* Updates dependencies: km3pipe==9.13.5 which fixes the issue with the
triggered_hits scripts
2.2.2 / 2022-11-14
~~~~~~~~~~~~~~~~~~
* Updated dependencies
* `msg_dumper.py` now also logs `Born` and `Died` tagged messages from the
logging ligier
2.2.1 / 2022-11-07
~~~~~~~~~~~~~~~~~~
* Fixes the log analyser script which was crashing when log files
were gzipped
* Added "Born" and "Died" tags to the message logger system
2.2.0 / 2022-05-17
~~~~~~~~~~~~~~~~~~
* Updated requirements for backend and frontend
* Fixed a few other hangs when DB connection issues occur
2.1.7 / 2022-03-10
~~~~~~~~~~~~~~~~~~
* Reco container updated to Julia 1.7.2 and the latest versions
of all dependencies
* Reversed the colours of the DOM activity map
2.1.6 / 2021-12-01
~~~~~~~~~~~~~~~~~~
* Update km3db to 0.7.3 which has better error handling for
database connection issues (especially related to the AHRS
calibration monitoring)
2.1.5 / 2021-11-24
~~~~~~~~~~~~~~~~~~
* Hotfix (fixes trigger rate and ztplot crash)
2.1.4 / 2021-11-23
~~~~~~~~~~~~~~~~~~
* Renamed the docker image tags from foo:vX.Y.Z to foo:X.Y.Z
2.1.3 / 2021-11-21
~~~~~~~~~~~~~~~~~~
* km3pipe pinned to 9.11.2
* All requirements are pinned now
2.1.2 / 2021-11-05
~~~~~~~~~~~~~~~~~~
* Minor bugfixes in CI
2.1.1 / 2021-11-04
~~~~~~~~~~~~~~~~~~
* Added Docker image CI
2.1.0 / 2021-11-04
~~~~~~~~~~~~~~~~~~
* Triggered hits plot added
2.0.0 / 2021-05-05
~~~~~~~~~~~~~~~~~~
* Fully dockerised monitoring system
* Log analyser added by Rodri <rgracia@km3net.de>
* Log files are now showing plots indicating the number of errors and warnings
thanks to Rodri <rgracia@km3net.de>
* Corrupt event data are now skipped in trigger rate, which previously crashed
the thread
* Several catches of errors in online processing to make things run and log
instead of crash and annoy
* Preliminary support for km3pipe v9
Version 1
---------
1.2.0 / 2019-11-25
~~~~~~~~~~~~~~~~~~
......
SHELL := /bin/bash
default: build
build:
pip install -Ur requirements.txt
start:
@echo Starting supervisord
supervisord -c supervisord.conf
stop:
@echo Shutting down supervisord
supervisorctl shutdown
ps aux|grep gunicorn|awk '{print $$2}'|xargs kill -9
.PHONY: build start stop
......@@ -6,96 +6,101 @@ Online monitoring suite for the KM3NeT neutrino detectors.
## Requirements
- Python 3.5+
- Docker
Every other dependency will be installed or updated during the `make` procedure
via the Python package manager `pip`.
Everything is containerised, so no need to install other software. The version
of the pre-built Docker base images is determined by the `KM3MON_VERSION` variable
in the `.env` file (see `example.env`). Ideally, you want to keep the same version
as the checked out Git tag, but scripts which are not part of the base images can
be updated independently. The `backend` for example contains a lot of Python scripts
which can easily be updated without touching the base image; the image itself only
consists of a Python installation with the required packages.
## Usage

First, install (or update) the requirements by typing

    make

Next, create a ``setenv.sh`` script according to the ``setenv_template.sh``
script and apply the detector settings. Here is an example configuration

```shell
#!/bin/bash
export DETECTOR_ID=43

# The ligier to get events (IO_EVT), timeslices (e.g. IO_TSSN) and
# summary slices (IO_SUM)
export DAQ_LIGIER_IP=192.168.0.110
export DAQ_LIGIER_PORT=5553
export TAGS_TO_MIRROR="IO_EVT, IO_SUM, IO_TSSN, MSG, IO_MONIT"

# The logger ligier (MSG)
export LOG_LIGIER_IP=192.168.0.119
export LOG_LIGIER_PORT=5553

# The command to start a ligier on the monitoring machine
# export LIGIER_CMD="JLigier"
export LIGIER_CMD="singularity exec /home/off1user/Software/Jpp_svn2git-rc9.sif JLigier"
export MONITORING_LIGIER_PORT=55530

export DETECTOR_MANAGER_IP=192.168.0.120

# The port for the KM3Web monitoring dashboard
export WEBSERVER_PORT=8081
# The port for the log viewer webserver
export LOGGING_PORT=8082

# The detector configuration to be used in the online reconstruction
export DETX="KM3NeT_00000043_03062019_t0set-A02087174.detx"
# Where to save the time residuals
export ROYFIT_TIMERES="data/time_residuals.csv"
```

Notice the `LIGIER_CMD` which in this case uses a Singularity image of Jpp.
The `DETX` needs to point to a recently calibrated DETX file, otherwise the
live reconstruction will not work correctly.

For the weblog you need to download the latest version of `frontail`

    https://github.com/mthenw/frontail/releases

and place it in e.g. `/usr/local/bin` (or another directory which is in
`$PATH`).

Before starting off, you also need to create a `supervisord.conf`. Usually
simply copying the `supervisord_template.conf` is enough, but make sure
to adjust some of the plots which monitor only specific DUs.

After that, use the following command to start the ``supervisor``, which
you only need to do once:

    source setenv.sh
    make start

From now on ``supervisorctl`` is the tool to communicate with the monitoring
system. To see the status of the processes, use ``supervisorctl status``,
which will show each process one by one (make sure you call it in the
folder where you launched it):

```
$ supervisorctl status
ligiers:ligiermirror RUNNING pid 611, uptime 1 day, 7:55:09
ligiers:monitoring_ligier RUNNING pid 610, uptime 1 day, 7:55:09
logging:msg_dumper RUNNING pid 7466, uptime 1 day, 7:28:00
logging:weblog RUNNING pid 7465, uptime 1 day, 7:28:00
monitoring_process:ahrs_calibration RUNNING pid 19612, uptime 1 day, 1:20:32
monitoring_process:dom_activity RUNNING pid 626, uptime 1 day, 7:55:09
monitoring_process:dom_rates RUNNING pid 631, uptime 1 day, 7:55:09
monitoring_process:pmt_hrv RUNNING pid 633, uptime 1 day, 7:55:09
monitoring_process:pmt_rates RUNNING pid 632, uptime 1 day, 7:55:09
monitoring_process:rttc RUNNING pid 9717, uptime 10:55:53
monitoring_process:trigger_rates RUNNING pid 637, uptime 1 day, 7:55:09
monitoring_process:triggermap RUNNING pid 638, uptime 1 day, 7:55:09
monitoring_process:ztplot RUNNING pid 7802, uptime 1 day, 7:26:13
webserver RUNNING pid 29494, uptime 1 day, 0:34:23
```

The processes are grouped accordingly (ligier, monitoring_process etc.) and
automatically started in the right order.

## Setup

1. Create a file called `.env` from the `example.env` template and adjust the detector
   ID and the IP/port of the servers and also the `KM3MON_VERSION` variable, which should
   ideally be set to the latest version. This determines the Docker images to be used
   for each service.

2. Next, create a `backend/supervisord.conf` from the template file
   `backend/supervisord.conf.example` and adjust it if needed.

3. Create a `backend/pipeline.toml` from the `backend/pipeline.toml.example`
   and adapt the settings if needed. Don't forget to add operators and shifters.

4. Optionally, adapt the layout of the plots in `frontend/app/routes.py`.

## Start and stop

The monitoring system can be started using

    docker compose up -d

This will download and build all the required images and launch the containers
for each service. It will also create an overlay network.

To stop it, use

    docker compose down

## Monitoring the monitoring

Log files are kept in `logs/`, data dumps in `data/` and plots in `plots/`.
These folders can be used (mounted) by all services defined in the
`docker-compose.yml` file. The `frontend` service, for example, which runs
the webserver, mounts `plots/` to have access to the plots.

To check the logs or follow them in real-time (`-f`) and limit the rewind
to a number of lines (`--tail=N`), use e.g.

    docker compose logs -f --tail=10 SERVICE_NAME

The `SERVICE_NAME` can be any of `backend`, `frontend`, `ligier`, `ligiermirror`,
`ligierlogmirror`, `reco` or `livelog`.

## Back-end configuration file

The file `backend/pipeline.toml` is the heart of most monitoring processes and
can be used to set different kinds of parameters, like plot attributes or ranges.
That configuration file is accessible within each Python script under
`backend/scripts` (see the short sketch further below).

The monitoring back-end is running inside a Docker container and controlled
by `supervisord`. You can enter the `backend` with

    docker exec -it monitoring_backend_1 bash

``supervisorctl`` is the tool to communicate with the monitoring
back-end system. To see the status of the processes, use `supervisorctl status`;
it will show each process one by one (make sure you call it in the
folder where you launched it):

```
$ supervisorctl status
alerts:timesync_monitor RUNNING pid 26, uptime 1 day, 5:21:06
logging:chatbot RUNNING pid 11, uptime 1 day, 5:21:06
logging:log_analyser RUNNING pid 10, uptime 1 day, 5:21:06
logging:msg_dumper RUNNING pid 9, uptime 1 day, 5:21:06
monitoring_process:acoustics RUNNING pid 1567, uptime 1 day, 5:20:59
monitoring_process:ahrs_calibration RUNNING pid 91859, uptime 1:09:14
monitoring_process:dom_activity RUNNING pid 1375, uptime 1 day, 5:21:00
monitoring_process:dom_rates RUNNING pid 1378, uptime 1 day, 5:21:00
monitoring_process:pmt_rates_10 RUNNING pid 1376, uptime 1 day, 5:21:00
monitoring_process:pmt_rates_11 RUNNING pid 1379, uptime 1 day, 5:21:00
monitoring_process:pmt_rates_13 RUNNING pid 1377, uptime 1 day, 5:21:00
monitoring_process:pmt_rates_14 RUNNING pid 1568, uptime 1 day, 5:20:59
monitoring_process:pmt_rates_18 RUNNING pid 21, uptime 1 day, 5:21:06
monitoring_process:pmt_rates_9 RUNNING pid 1566, uptime 1 day, 5:20:59
monitoring_process:rttc RUNNING pid 118444, uptime 0:17:20
monitoring_process:trigger_rates RUNNING pid 22, uptime 1 day, 5:21:06
monitoring_process:triggermap RUNNING pid 1796, uptime 1 day, 5:20:58
monitoring_process:ztplot RUNNING pid 24, uptime 1 day, 5:21:06
reconstruction:time_residuals RUNNING pid 27, uptime 1 day, 5:21:06
```

The processes are grouped accordingly (`logging`, `monitoring_process` etc.) and
automatically started in the right order.
You can stop and start individual services using ``supervisorctl stop
group:process_name`` and ``supervisorctl start group:process_name``
......@@ -105,37 +110,80 @@ a group of processes. Use the ``supervisorctl help`` to find out more and
``supervisorctl help COMMAND`` to get a detailed description of the
corresponding command.
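The back-end scripts read their settings from the `backend/pipeline.toml`
described above. As a minimal sketch (not the literal code of any particular
script), assuming the `toml` package that is also used by the chatbot script,
a script could pick up its section like this; the `[DOMRates]` keys are taken
from the example configuration and other scripts use other sections:

```python
import toml

# Load the monitoring configuration; the chatbot script, for example,
# reads it via the relative path "pipeline.toml".
with open("pipeline.toml") as fobj:
    config = toml.load(fobj)

# Pick the section this hypothetical script cares about.
lowest_rate = config["DOMRates"]["lowest_rate"]    # [kHz]
highest_rate = config["DOMRates"]["highest_rate"]  # [kHz]
print(f"Accepted DOM rate range: {lowest_rate}-{highest_rate} kHz")
```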
To shut down the monitoring service completely, use ``make stop``.
## Frontend

The frontend is a simple webserver and uses HTML templates to render the web pages.
The layout of the plots can be changed in `frontend/app/routes.py` using nested lists.
Each URL endpoint can be assigned to a specific function which uses a specific template
to render the actual page. The templates for the base layout (including the menubar) and
each page are under `frontend/app/templates`.
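For illustration only, a hypothetical sketch of such a nested-list layout (the
actual variable name and plot names in `routes.py` may differ):

```python
# Hypothetical plot layout: each inner list is rendered as one row of plots,
# referring to PNG files produced by the backend in the shared plots/ folder.
PLOTS = [
    ["dom_activity", "dom_rates"],
    ["trigger_rates"],
    ["ztplot", "triggermap"],
]
```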
## Configuration file

A file called `pipeline.toml` can be placed into the root folder of the
monitoring software (usually `~/monitoring`) which can be used to set
different kinds of parameters, like plot attributes or ranges.
Here is an example `pipeline.toml`:

```toml
[WebServer]
username = "km3net"
password = "swordfish"

[DOMRates]
lowest_rate = 150 # [kHz]
highest_rate = 350 # [kHz]

[PMTRates]
lowest_rate = 1000 # [Hz]
highest_rate = 20000 # [Hz]

[TriggerRate]
interval = 300 # time interval to integrate [s]
with_minor_ticks = true # minor tickmarks on the plot

[TriggerMap]
max_events = 5000 # the number of events to log

[ZTPlot]
min_dus = 1
ytick_distance = 25 # [m]
```

## Chatbot

The `km3mon` suite comes with a chatbot which can join a channel defined
in the `pipeline.toml` file under the `[Alerts]` section:

```toml
[Alerts]
botname = "monitoring"
password = "supersecretpassword"
channel = "operations_fr"
operators = [ "a_enzenhoefer", "tamasgal",]
```

The password is the actual login password of the bot. Once the `chatbot` service
is running, the bot will notify important events like a sudden drop of the
trigger rate and can also be used to retrieve information from the monitoring
system, set the current shifters and even control the monitoring services through
the `supervisorctl` interface. Only the operators defined in the configuration
file are allowed to modify services or change the shifters.

To see the bot's capabilities, one simply asks it for help via
`@monitoring help`:

```
Hi Tamas Gal, I was built to take care of the monitoring alerts.
Here is how you can use me:
- @monitoring shifters are cnorris and bspencer
  -> set the new shifters who I may annoy with chat messages and emails.
- @monitoring status -> show the status of the monitoring system
- @monitoring supervisorctl -> take control over the monitoring system
- @monitoring help -> show this message
```

### Troubleshooting

#### Database connection needed

The monitoring processes talk to the KM3NeT Oracle DB service and need a valid
session cookie. The monitoring servers of the ORCA and ARCA shore stations are
whitelisted and use the following session cookies (defined in the `.env` file):

- ARCA: `KM3NET_DB_COOKIE=_tgal_192.84.151.58_48d829f9ef104d82b4c1d8557ee5bb55`
- ORCA: `KM3NET_DB_COOKIE=_tgal_188.194.66.108_4c0d9307fb4a423cb0bd8c2b34ba4790`

If you run the system on other machines, you need to provide
the cookie string for that specific machine. To get the cookie string, run the
monitoring system with `docker compose up -d` and connect to the backend with

    # docker exec -it monitoring-backend-1 bash

To get a session cookie, query the database however you like, e.g.

    # streamds get detectors

It will ask you for your KM3NeT (external) credentials and the required
cookie value is the last column in the file `~/.km3netdb_cookie`

    # cat ~/.km3netdb_cookie
    .in2p3.fr TRUE / TRUE 0 sid _tgal_131.42.5.23_6d132a51d884b22b2ba861a8847346c

Create a new environment variable in the `.env` file on the host system (not in the Docker
container!) with the following entry (of course with your cookie string):

    KM3NET_DB_COOKIE=_tgal_131.42.5.23_6d132a51d884b22b2ba861a8847346c

and restart the whole monitoring system with

    docker compose down && docker compose up -d
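To quickly check from inside the backend container that the session cookie is
actually picked up, a short Python snippet can be used. This is only a sketch:
it relies on the same `km3db.StreamDS` calls that the back-end scripts use, and
the detector identifier is a placeholder:

```python
import km3db

# If the session cookie (KM3NET_DB_COOKIE or ~/.km3netdb_cookie) is valid,
# this returns a pandas DataFrame with the runs of the given detector.
sds = km3db.StreamDS(container="pd")
print(sds.runs(detid="D_ORCA006").head())
```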
{% extends "base.html" %}
{% block main %}
<div class="container-fluid" id="logs">
<div class="row">
<div class="col-md-12">
{% for filename, filesize in files.items() %}
<a href="/logs/{{ filename }}" class="btn btn-{{ 'primary' if filename == 'MSG.log' else 'warning' }}" role="button">{{ filename }} ({{'%0.1f' | format(filesize/1024/1024) }}MB)</a>
{% endfor %}
</div>
</div>
</div>
{% endblock %}
FROM python:3.12
MAINTAINER Tamas Gal <tgal@km3net.de>
WORKDIR /monitoring
COPY requirements.txt requirements.txt
RUN pip install -U wheel setuptools pip && pip install -r requirements.txt
COPY . .
CMD ["supervisord", "--nodaemon", "-c", "supervisord.conf"]
......@@ -23,17 +23,19 @@ min_dus = 3
min_doms = 24
ytick_distance = 25
logbook = "Operations+FR"
elog = false
[CalibrateAHRS]
time_range = 72
[LocalDBService]
filename = "data/monitoring.sqlite3"
filename = "/data/monitoring.sqlite3"
[ELOGService]
password = "swordfish"
[Alerts]
enabled = false
botname = "monitoring"
password = "supersecretpassword"
channel = "operations_fr"
......
docopt==0.6.2
km3db==0.14.3
km3io==1.2.2
km3pipe==10.0.3
matplotlib==3.10.0
numba==0.61.0
numpy==2.1.3
pandas==2.2.3
requests==2.32.3
rocketchat-API==1.34.0
seaborn==0.13.2
supervisor==4.2.5
toml==0.10.2
#!/usr/bin/env python
# coding=utf-8
"""
Online Acoustic Monitoring
Usage:
acoustics.py [options]
acoustics.py (-h | --help)
Options:
-d DET_ID Detector ID.
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
from datetime import datetime
import gc
import os
import time
import http
import ssl
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib import colors
import numpy as np
import km3db
import km3pipe as kp
from docopt import docopt
def diff(first, second):
second = set(second)
return [item for item in first if item not in second]
def duplicates(lst, item):
return [i for i, x in enumerate(lst) if x == item]
args = docopt(__doc__)
sds = km3db.StreamDS(container="pd")
try:
detid = int(args['-d'])
except ValueError:
detid = (args['-d'])
if type(detid)==int:
detid = km3db.tools.todetoid(detid)
directory = args['-o']
ACOUSTIC_BEACONS = [0, 0, 0]
N_DOMS = 18
N_ABS = 3
DOMS = range(N_DOMS + 1)
DUS = kp.hardware.Detector(det_id=detid).dus
DUS_cycle = list(np.arange(max(DUS)) + 1)
TIT = 600 # Time Interval between Trains of acoustic pulses
SSW = 160 # Signal Security Window (Window size with signal)
clbmap = km3db.CLBMap(detid)
check = True
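# The main loop below runs indefinitely: for every DU, DOM and acoustic beacon
# the time-of-arrival data of the latest run is fetched from the DB, a window
# around the highest quality factor is selected, a chain of filters counts the
# detected pings and the result is drawn as a colour-coded indicator map which
# is saved to the plot directory.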
while check:
minrun = None
while minrun is None:
try:
table = sds.runs(detid=detid)
minrun = table["RUN"][len(table["RUN"]) - 1]
ind, = np.where((table["RUN"] == minrun))
mintime1 = table['UNIXSTARTTIME'][ind]
mintime = mintime1.values
maxrun = table["RUN"][len(table["RUN"]) - 1]
now = time.time()
now = now - TIT
if (now - mintime / 1000) < TIT:
minrun = table["RUN"][len(table["RUN"]) - 1] - 1
print(now)
except:
pass
N_Pulses_Indicator = [
] # Matrix indicating how many pulses each piezo reveals
for du in DUS_cycle:
N_Pulses_Indicator_DU = [
] # Array indicating for each DU how many pulses each piezo reveals.
for dom in DOMS:
UTB_MIN = []
QF_MAX = []
n = -1
for ab in ACOUSTIC_BEACONS:
n = n + 1
try:
domID = clbmap.omkeys[(du, dom)].dom_id
AcBe = sds.toashort(detid=detid,
minrun=minrun,
domid=domID,
maxrun=maxrun)
ACOUSTIC_BEACONS_TEMP = np.unique(AcBe["EMITTERID"]).tolist()
if np.size(ACOUSTIC_BEACONS_TEMP) < 3:
while np.size(ACOUSTIC_BEACONS_TEMP) < 3:
ACOUSTIC_BEACONS_TEMP.append(0)
ACOUSTIC_BEACONS_TEMP_2 = np.sort(np.abs(ACOUSTIC_BEACONS_TEMP))
m = np.where(np.abs(ACOUSTIC_BEACONS_TEMP) == ACOUSTIC_BEACONS_TEMP_2[n])[0][0]
ab = ACOUSTIC_BEACONS_TEMP[m]
print(ab)
except (KeyError, AttributeError, TypeError):
N_Pulses_Indicator_DU.append(-1.5)
continue
try:
toas_all = sds.toashort(detid=detid,
minrun=minrun,
maxrun=maxrun,
domid=domID,
emitterid=ab)
QF_abdom = toas_all["QUALITYFACTOR"]
UTB_abdom = toas_all["UNIXTIMEBASE"]
TOAS_abdom = toas_all["TOA_S"]
UTB_abdom = UTB_abdom.values
up = np.where(UTB_abdom > (now - TIT))
down = np.where(UTB_abdom < (now))
intr = np.intersect1d(up, down)
UTB_abdom = UTB_abdom[intr]
QF_abdom = QF_abdom[intr]
QF_abdom = QF_abdom.values
QFlist = QF_abdom.tolist()
QFlist.sort(reverse=True)
QF_max = max(QF_abdom)
QF_max_index = np.where(QF_abdom == QF_max)
UTB_signal_min = UTB_abdom[QF_max_index[0][0]] - SSW / 2
UTB_signal_max = UTB_abdom[QF_max_index[0][0]] + SSW / 2
temp1 = np.where(UTB_abdom > (UTB_signal_min))
temp2 = np.where(UTB_abdom < (UTB_signal_max))
inter = np.intersect1d(temp1, temp2)
inter = inter.tolist()
signal_index = inter
# Define the signal index if the pings are split into two parts inside the window
if UTB_signal_min < (now - TIT):
temp1 = np.where(UTB_abdom > (now - TIT))
temp2 = np.where(UTB_abdom < (UTB_signal_max))
inter1 = np.intersect1d(temp1, temp2)
inter1 = inter1.tolist()
temp11 = np.where(UTB_abdom < (now))
temp22 = np.where(UTB_abdom > (now - SSW / 2))
inter2 = np.intersect1d(temp11, temp22)
inter2 = inter2.tolist()
inter = np.union1d(inter1, inter2)
inter = inter.tolist()
signal_index = inter
signal_index = np.array(signal_index)
signal_index = signal_index.astype(int)
signal_index = signal_index.tolist()
if UTB_signal_max > now:
temp1 = np.where(UTB_abdom < (now))
temp2 = np.where(UTB_abdom > (UTB_signal_min))
inter1 = np.intersect1d(temp1, temp2)
inter1 = inter1.tolist()
temp11 = np.where(UTB_abdom > ((now - TIT)))
temp22 = np.where(UTB_abdom < ((now - TIT) + SSW / 2))
inter2 = np.intersect1d(temp11, temp22)
inter2 = inter2.tolist()
inter = np.union1d(inter1, inter2)
inter = inter.tolist()
signal_index = inter
signal_index = np.array(signal_index)
signal_index = signal_index.astype(int)
signal_index = signal_index.tolist()
QF_abdom_index = np.where(QF_abdom)
all_data_index = QF_abdom_index[0].tolist()
noise_index = diff(all_data_index, signal_index)
SIGNAL = QF_abdom[signal_index]
UTB_SIGNAL = UTB_abdom[signal_index]
TOA_SIGNAL = TOAS_abdom[signal_index]
UNIX_TOA_SIGNAL = UTB_abdom[signal_index] + TOAS_abdom[signal_index]
if len(noise_index) != 0:
NOISE = QF_abdom[noise_index]
NOISElist = NOISE.tolist()
NOISElist.sort(reverse=True)
NOISE = NOISE.tolist()
NOISE.sort(reverse=True)
noise_threshold = max(
NOISE) # To be sure not to take signal
# First filter: 22 greatest
Security_Number = len(SIGNAL) # To be sure to take all the pulses
SIGNAL = SIGNAL.tolist()
SIGNAL_OLD = np.array(SIGNAL)
SIGNAL.sort(reverse=True)
QF_first = SIGNAL[0:Security_Number]
# Second filter: delete duplicates (Delete if Unixtimebase + ToA is the same)
QF_second = QF_first
R = []
for r in np.arange(len(QF_first)):
R.append(np.where(SIGNAL_OLD == QF_first[r])[0][0])
UTB_first = np.array(UTB_SIGNAL.tolist())[R]
TOA_first = np.array(TOA_SIGNAL.tolist())[R]
UNIX_TOA = UTB_first + TOA_first
UNIX_TOA = UNIX_TOA.tolist()
UNIX_TOA_index = []
for x in set(UNIX_TOA):
if UNIX_TOA.count(x) > 1:
UNIX_TOA_index.append(duplicates(UNIX_TOA, x))
ind_del = []
for i in range(len(UNIX_TOA_index)):
ind_del.append(UNIX_TOA_index[i][0])
for ide in sorted(ind_del, reverse=True):
del QF_second[ide]
QF_second.sort(reverse=True)
# Third filter: If there are more than 11 elements I will eliminate the worst
if len(QF_second) > 11:
QF_second = np.array(QF_second)
QF_third = [
k for k in QF_second
if (np.where(QF_second == k)[0][0] < 11)
]
else:
QF_third = QF_second
# Fourth filter: I remove the data if it is below the maximum noise
if len(noise_index) != 0:
QF_fourth = [
k for k in QF_third
if k > (noise_threshold + (10 * np.std(NOISE)))
]
else:
QF_fourth = QF_third
# Fifth filter: Check if the clicks are interspersed in the right way
QF_fifth = QF_fourth
Q = []
for q in np.arange(len(QF_fifth)):
Q.append(np.where(SIGNAL_OLD == QF_fifth[q])[0][0])
UTB_fourth = np.array(UTB_SIGNAL.tolist())[Q]
UTB_fourth_l = UTB_fourth.tolist()
D = []
for g in np.arange(len(UTB_fourth_l)):
if ((np.mod((UTB_fourth_l[g] - UTB_fourth_l[0]), 5) > 2
and np.mod(
(UTB_fourth_l[g] - UTB_fourth_l[0]), 5) < 4)
or
(np.mod(
(UTB_fourth_l[g] - UTB_fourth_l[0]), 5) > 5)
):
D.append(g)
for d in sorted(D, reverse=True):
del QF_fifth[d]
# Sixth filter: keep only values much closer to the maximum QF than to the noise threshold
if len(noise_index) != 0:
QF_sixth = [
k for k in QF_fifth
if (2*abs(k - max(QF_fifth)) < abs(k - noise_threshold))
]
else:
QF_sixth = QF_fifth
QF_OK = QF_sixth
P = []
for p in np.arange(len(QF_OK)):
P.append(np.where(SIGNAL_OLD == QF_OK[p])[0][0])
UTB_OK = np.array(UTB_SIGNAL.tolist())[P]
UTB_OK_l = UTB_OK.tolist()
UTB_MIN.append(min(UTB_OK_l))
max_QF = max(QF_OK)
QF_MAX.append(max_QF)
NUM = len(QF_OK) # Number of pulses
print(NUM)
if (NUM > 7):
N_Pulses_Indicator_DU.append(1.5)
elif (NUM < 8 and NUM > 3):
N_Pulses_Indicator_DU.append(0.5)
elif (NUM < 4 and NUM > 0):
N_Pulses_Indicator_DU.append(-0.5)
elif (NUM == 0):
N_Pulses_Indicator_DU.append(-1.5)
except (
TypeError, ValueError
): # TypeError if no data found for a certain piezo, ValueError if there are zero data for a certain piezo (DB connection errors are handled below)
N_Pulses_Indicator_DU.append(-1.5)
except (
http.client.RemoteDisconnected, ssl.SSLError
): # Bad connection to the DB
N_Pulses_Indicator_DU.append(-2.5)
# To avoid taking wrong beacon signals
dim = np.size(QF_MAX)
pulse_inter = 5.04872989654541
for i in range(dim - 1):
if (np.mod((UTB_MIN[i] - UTB_MIN[i + 1]), pulse_inter) < 10**-3
or np.mod(
(UTB_MIN[i] - UTB_MIN[i + 1]), pulse_inter) > 5):
if QF_MAX[i] <= QF_MAX[i + 1]:
N_Pulses_Indicator_DU[3 * dom + i] = -1.5
else:
N_Pulses_Indicator_DU[3 * dom + i + 1] = -1.5
if i == 0 and dim == 3:
if (np.mod(
(UTB_MIN[i] -
UTB_MIN[i + 2]), pulse_inter) < 10**-3 or np.mod(
(UTB_MIN[i] - UTB_MIN[i + 2]), pulse_inter) > 5):
if QF_MAX[i] <= QF_MAX[i + 2]:
N_Pulses_Indicator_DU[3 * dom + i] = -1.5
else:
N_Pulses_Indicator_DU[3 * dom + i + 2] = -1.5
N_Pulses_Indicator.append(N_Pulses_Indicator_DU)
fig, ax = plt.subplots(figsize=(9, 7))
duab = []
DUs = []
for du in DUS_cycle:
duabdu = []
duab1 = (du - 0.2) * np.ones(N_DOMS + 1)
duab2 = (du) * np.ones(N_DOMS + 1)
duab3 = (du + 0.2) * np.ones(N_DOMS + 1)
duabdu.append(duab1)
duabdu.append(duab2)
duabdu.append(duab3)
duab.append(duabdu)
DUs.append(np.array(N_Pulses_Indicator[du - 1]))
iAB1 = []
iAB2 = []
iAB3 = []
for i in DOMS:
iAB1.append(3 * i)
iAB2.append(3 * i + 1)
iAB3.append(3 * i + 2)
colorsList = [(0.6, 0, 1), (0, 0, 0), (1, 0.3, 0), (1, 1, 0), (0.2, 0.9, 0)]
CustomCmap = matplotlib.colors.ListedColormap(colorsList)
bounds = [-3, -2, -1, 0, 1, 2]
norma = colors.BoundaryNorm(bounds, CustomCmap.N)
for du in DUS:
color = ax.scatter(duab[du - 1][0],
DOMS,
s=20,
c=DUs[du - 1][iAB1],
norm=norma,
marker='s',
cmap=CustomCmap)
color = ax.scatter(duab[du - 1][1],
DOMS,
s=20,
c=DUs[du - 1][iAB2],
norm=norma,
marker='s',
cmap=CustomCmap)
color = ax.scatter(duab[du - 1][2],
DOMS,
s=20,
c=DUs[du - 1][iAB3],
norm=norma,
marker='s',
cmap=CustomCmap)
cbar = plt.colorbar(color)
cbar.ax.get_yaxis().set_ticks([])
for j, lab in enumerate(
['$No DB conn.$','$0. pings$', '$1-3 pings$', '$4-7 pings$', '$>7. pings$']):
cbar.ax.text(4, (1.5 * j + 1) / 8.0, lab, ha='center', va='center')
cbar.ax.get_yaxis().labelpad = 18
ax.set_xticks(np.arange(1, max(DUS) + 1, step=1))
ax.set_yticks(np.arange(0, 19, step=1))
ax.grid(color='k', linestyle='-', linewidth=0.2)
ax.set_xlabel('DUs', fontsize=18)
ax.set_ylabel('Floors', fontsize=18)
ts = now
DATE = datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
ax.set_title(
r' %.16s Detection of the pings emitted by autonomous beacons' % DATE,
fontsize=10)
my_path = os.path.abspath(directory)
my_file = 'Online_Acoustic_Monitoring.png'
fig.savefig(os.path.join(my_path, my_file))
plt.close('all')
gc.collect()
print(time.time())
check = False
check_time = time.time() - now - TIT
print(check_time)
time.sleep(abs(TIT - check_time))
check = True
......@@ -12,7 +12,7 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
......@@ -21,63 +21,103 @@ from __future__ import division
from datetime import datetime
from collections import deque, defaultdict
from functools import partial
import gc
import io
import os
import random
import time
import threading
import numpy as np
import matplotlib
matplotlib.use('Agg')
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as md
import seaborn as sns
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import km3db
import km3pipe as kp
from km3pipe.io.daq import TMCHData
from km3modules.common import MemoryObserver
from km3modules.ahrs import fit_ahrs, get_latest_ahrs_calibration
import km3pipe.style
km3pipe.style.use('km3pipe')
km3pipe.style.use("km3pipe")
class CalibrateAHRS(kp.Module):
def configure(self):
self.plots_path = self.require('plots_path')
det_id = self.require('det_id')
self.time_range = self.get('time_range', default=24 * 3) # hours
self.detector = kp.hardware.Detector(det_id=det_id)
self.plots_path = self.require("plots_path")
det_id = self.require("det_id")
det_oid = km3db.tools.todetoid(det_id)
self.time_range = self.get("time_range", default=24 * 3) # hours
self.n_points_in_graph = self.get(
"n_points_in_graph", default=2000
) # x-axis resolution (per floor)
self.data = {}
self.dus = set()
self.clbmap = kp.db.CLBMap(det_oid=det_id)
self.cuckoo = kp.time.Cuckoo(60, self.create_plot)
self.cuckoo_log = kp.time.Cuckoo(10, print)
self.data = {}
self.queue_size = 100000
self.detector = kp.hardware.Detector(det_id=det_id)
self.clbmap = km3db.CLBMap(det_oid=det_oid)
self.cuckoo = kp.time.Cuckoo(60, self.create_plot) # plot update interval [s]
self.cuckoo_log = kp.time.Cuckoo(10, self.cprint)
self.cuckoo_stats = kp.time.Cuckoo(300, self._show_stats)
# 10Hz monitoring (including bases)
n_messages = int(
10 * self.detector.n_doms * self.time_range * 60 ** 2 + self.detector.n_dus
)
self.fraction_to_keep = (
self.n_points_in_graph * self.detector.n_doms / n_messages
)
self.cprint(f"Fraction to keep: {self.fraction_to_keep}")
self.queue_size = int(
self.n_points_in_graph * 1.1
) # a bit of safety margin due to randomness
self.cprint(
f"Queue size for each module: {self.queue_size}, time range: {self.time_range} hours"
)
self.lock = threading.Lock()
self.index = 0
def _show_stats(self):
"""Print some data statistics"""
messages = ["Recorded data:"]
for du, data in self.data.items():
messages.append(f"DU {du}: ")
for floor, times in data["times"].items():
messages.append(
f" floor {floor}: {len(times)} ({times[0]}, {times[-1]})"
)
self.cprint("\n".join(messages))
def _register_du(self, du):
"""Create data cache for DU"""
self.data[du] = {}
for ahrs_param in ('yaw', 'pitch', 'roll'):
for ahrs_param in ("yaw", "pitch", "roll"):
self.data[du][ahrs_param] = defaultdict(
partial(deque, maxlen=self.queue_size))
self.data[du]['times'] = defaultdict(
partial(deque, maxlen=self.queue_size))
partial(deque, maxlen=self.queue_size)
)
self.data[du]["times"] = defaultdict(partial(deque, maxlen=self.queue_size))
self.dus.add(du)
def process(self, blob):
self.index += 1
if self.index % 29 != 0:
if random.random() > self.fraction_to_keep:
return blob
self.cuckoo_stats()
now = datetime.utcnow()
tmch_data = TMCHData(io.BytesIO(blob['CHData']))
tmch_data = TMCHData(io.BytesIO(blob["CHData"]))
dom_id = tmch_data.dom_id
clb = self.clbmap.dom_ids[dom_id]
if clb.floor == 0:
......@@ -95,76 +135,89 @@ class CalibrateAHRS(kp.Module):
if du not in self.dus:
self._register_du(du)
cyaw, cpitch, croll = fit_ahrs(tmch_data.A, tmch_data.H, *calib)
self.cuckoo_log("DU{}-DOM{} (random pick): calibrated yaw={}".format(
clb.du, clb.floor, cyaw))
self.cuckoo_log(
"DU{}-DOM{} (random pick): calibrated yaw={}".format(
clb.du, clb.floor, cyaw
)
)
with self.lock:
self.data[du]['yaw'][clb.floor].append(cyaw)
self.data[du]['pitch'][clb.floor].append(cpitch)
self.data[du]['roll'][clb.floor].append(croll)
self.data[du]['times'][clb.floor].append(now)
self.data[du]["yaw"][clb.floor].append(cyaw)
self.data[du]["pitch"][clb.floor].append(cpitch)
self.data[du]["roll"][clb.floor].append(croll)
self.data[du]["times"][clb.floor].append(now)
self.cuckoo.msg()
return blob
def create_plot(self):
print(self.__class__.__name__ + ": updating plot.")
self.cprint(self.__class__.__name__ + ": updating plot.")
if self.time_range > 24:
xfmt = md.DateFormatter('%Y-%m-%d %H:%M')
xfmt = md.DateFormatter("%Y-%m-%d %H:%M")
else:
xfmt = md.DateFormatter('%H:%M')
xlim = (datetime.utcfromtimestamp(time.time() -
self.time_range * 60 * 60),
datetime.utcnow())
xfmt = md.DateFormatter("%H:%M")
xlim = (
datetime.utcfromtimestamp(time.time() - int(self.time_range * 60 * 60)),
datetime.utcnow(),
)
for du in self.dus:
data = self.data[du]
for ahrs_param in data.keys():
fig, ax = plt.subplots(figsize=(16, 6))
sns.set_palette("husl", 18)
ax.set_title("AHRS {} Calibration on DU{}\n{}".format(
ahrs_param, du, datetime.utcnow()))
ax.set_title(
"AHRS {} Calibration on DU{}\n{}".format(
ahrs_param, du, datetime.utcnow()
)
)
ax.set_xlabel("UTC time")
ax.xaxis.set_major_formatter(xfmt)
ax.set_ylabel(ahrs_param)
with self.lock:
for floor in sorted(data[ahrs_param].keys()):
ax.plot(data['times'][floor],
data[ahrs_param][floor],
marker='.',
linestyle='none',
label="Floor {}".format(floor))
ax.plot(
data["times"][floor],
data[ahrs_param][floor],
marker=".",
linestyle="none",
label="Floor {}".format(floor),
)
ax.set_xlim(xlim)
lgd = plt.legend(bbox_to_anchor=(1.005, 1),
loc=2,
borderaxespad=0.)
lgd = plt.legend(bbox_to_anchor=(1.005, 1), loc=2, borderaxespad=0.0)
fig.tight_layout()
plt.savefig(os.path.join(
self.plots_path,
ahrs_param + '_calib_du{}.png'.format(du)),
bbox_extra_artists=(lgd, ),
bbox_inches='tight')
plt.close('all')
plt.savefig(
os.path.join(
self.plots_path, ahrs_param + "_calib_du{}.png".format(du)
),
bbox_extra_artists=(lgd,),
bbox_inches="tight",
)
plt.close("all")
gc.collect()
def main():
from docopt import docopt
args = docopt(__doc__)
det_id = int(args['-d'])
plots_path = args['-o']
ligier_ip = args['-l']
ligier_port = int(args['-p'])
det_id = int(args["-d"])
plots_path = args["-o"]
ligier_ip = args["-l"]
ligier_port = int(args["-p"])
pipe = kp.Pipeline()
pipe.attach(kp.io.ch.CHPump,
host=ligier_ip,
port=ligier_port,
tags='IO_MONIT',
timeout=60 * 60 * 24 * 7,
max_queue=2000)
pipe.attach(kp.io.daq.DAQProcessor)
pipe.attach(MemoryObserver, every=600000)
pipe.attach(
kp.io.ch.CHPump,
host=ligier_ip,
port=ligier_port,
tags="IO_MONIT",
timeout=60 * 60 * 24 * 7,
max_queue=2000,
)
pipe.attach(CalibrateAHRS, det_id=det_id, plots_path=plots_path)
pipe.drain()
if __name__ == '__main__':
if __name__ == "__main__":
main()
......@@ -13,16 +13,181 @@ Options:
-h --help Show this screen.
"""
from datetime import datetime
from pprint import pprint
import random
import re
import requests
import subprocess
import time
import threading
import toml
from rocketchat_API.rocketchat import RocketChat
from RocketChatBot import RocketChatBot
import km3pipe as kp
log = kp.logger.get_logger("chatbot")
URL = "https://chat.km3net.de"
CONFIG = "pipeline.toml"
RECONNECT_INTERVAL = 30
class RocketChatBot(object):
def __init__(self, botname, passwd, server, command_character=None):
self.botname = botname
self.api = RocketChat(user=botname, password=passwd, server_url=server)
self.commands = [(['echo', ], self.echo)]
self.auto_answers = []
self.direct_answers = []
self.unknow_command = ['command not found', ]
self.lastts = {}
self.command_character = command_character
def echo(self, msg, user, channel_id):
self.send_message('@' + user + ' : ' + msg, channel_id)
def get_status(self, auser):
return self.api.users_get_presence(username=auser)
def send_message(self, msg, channel_id):
self.api.chat_post_message(channel=channel_id, text=msg)
def add_dm_handler(self, command, action):
self.commands.append((command, action))
def add_auto_answer(self, triggers, answers):
self.auto_answers.append((triggers, answers))
def add_direct_answer(self, triggers, answers):
self.direct_answers.append((triggers, answers))
def handle_command_character_message(self, message, channel_id):
msg = message['msg'].lstrip(self.command_character)
command = msg.split()[0].lower()
arguments = " ".join(msg.split()[1:])
user = message['u']['username']
for cmd_list in self.commands:
if command.lower() in cmd_list[0]:
cmd_list[1](arguments, user, channel_id)
return
if not self.handle_auto_answer(message, self.direct_answers, channel_id):
self.send_message('@' + user + ' :' + random.choice(self.unknow_command), channel_id)
def handle_direct_message(self, message, channel_id):
msg = message['msg'].lstrip('@' + self.botname).strip()
if len(msg) > 0:
command = msg.split()[0].lower()
arguments = " ".join(msg.split()[1:])
user = message['u']['username']
for cmd_list in self.commands:
if command.lower() in cmd_list[0]:
cmd_list[1](arguments, user, channel_id)
return
if not self.handle_auto_answer(message, self.direct_answers, channel_id):
self.send_message('@' + user + ' :' + random.choice(self.unknow_command), channel_id)
else:
self.send_message('Here I am', channel_id)
def handle_auto_answer(self, message, answers, channel_id):
for kind in answers:
for k in kind[0]:
if k in message['msg'].lower():
self.send_message(random.choice(kind[1]) + ' @' + message['u']['username'], channel_id)
return True
return False
def handle_messages(self, messages, channel_id):
for message in messages['messages']:
if message['u']['username'] != self.botname:
pprint(message)
if message['u']['username'] == 'rocket.cat':
continue
if message['msg'].startswith('@' + self.botname):
threading.Thread(target=self.handle_direct_message, args=(message, channel_id)).start()
elif self.command_character is not None and message['msg'].startswith(self.command_character):
threading.Thread(target=self.handle_command_character_message, args=(message, channel_id)).start()
elif 'mentions' not in message or message.get('mentions') == []:
threading.Thread(target=self.handle_auto_answer, args=(message, self.auto_answers, channel_id)).start()
def load_ts(self, channel_id, messages):
if len(messages) > 0:
self.lastts[channel_id] = messages[0]['ts']
else:
self.lastts[channel_id] = ''
def load_channel_ts(self, channel_id):
self.load_ts(channel_id, self.api.channels_history(channel_id).json()['messages'])
def load_group_ts(self, channel_id):
self.load_ts(channel_id, self.api.groups_history(channel_id).json()['messages'])
def load_im_ts(self, channel_id):
response = self.api.im_history(channel_id).json()
if response.get('success'):
self.load_ts(channel_id, self.api.im_history(channel_id).json()['messages'])
def process_messages(self, messages, channel_id):
try:
if "success" in messages:
if messages['success'] == False:
raise RuntimeError(messages['error'])
if len(messages['messages']) > 0:
self.lastts[channel_id] = messages['messages'][0]['ts']
self.handle_messages(messages, channel_id)
except Exception as e:
pprint(e)
def process_channel(self, channel_id):
if channel_id not in self.lastts:
self.lastts[channel_id] = datetime.now().isoformat()
self.process_messages(self.api.channels_history(channel_id, oldest=self.lastts[channel_id]).json(),
channel_id)
def process_group(self, channel_id):
if channel_id not in self.lastts:
self.lastts[channel_id] = ''
self.process_messages(self.api.groups_history(channel_id, oldest=self.lastts[channel_id]).json(),
channel_id)
def process_im(self, channel_id):
if channel_id not in self.lastts:
self.lastts[channel_id] = ''
self.process_messages(self.api.im_history(channel_id, oldest=self.lastts[channel_id]).json(),
channel_id)
def run(self):
for channel in self.api.channels_list_joined().json().get('channels'):
self.load_channel_ts(channel.get('_id'))
for group in self.api.groups_list().json().get('groups'):
self.load_group_ts(group.get('_id'))
for im in self.api.im_list().json().get('ims'):
self.load_im_ts(im.get('_id'))
while 1:
for channel in self.api.channels_list_joined().json().get('channels'):
threading.Thread(target=self.process_channel, args=(channel.get('_id'),)).start()
# for group in self.api.groups_list().json().get('groups'):
# threading.Thread(target=self.process_group, args=(group.get('_id'),)).start()
#
# for im in self.api.im_list().json().get('ims'):
# threading.Thread(target=self.process_im, args=(im.get('_id'),)).start()
time.sleep(1)
with open(CONFIG, 'r') as fobj:
print(f"Reading configuration from {CONFIG}")
config = toml.load(fobj)
BOTNAME = config['Alerts']['botname']
PASSWORD = config['Alerts']['password']
......@@ -30,40 +195,74 @@ with open(CONFIG, 'r') as fobj:
def get_channel_id(channel):
rocket = RocketChat(BOTNAME, PASSWORD, server_url=URL)
print(f"Getting channel ID for channel: {channel}")
tries = 0
while True:
tries += 1
try:
rocket = RocketChat(BOTNAME, PASSWORD, server_url=URL)
except requests.exceptions.ConnectionError as e:
log.error(f"Unable to connect to the RocketChat server: {e}")
except Exception as e:
log.error(f"Unknown error occured: {e}")
else:
break
interval = tries * RECONNECT_INTERVAL
print(f"Reconnecting in {interval} seconds...")
time.sleep(interval)
channels = rocket.channels_list().json()['channels']
channels = rocket.channels_list(count=0).json()['channels']
for c in channels:
print(f" -> {c['name']} => {c['_id']}")
if c['name'] == channel:
print(f"Found channel ID for {channel} is {c['_id']}")
return c['_id']
log.error("No channel found with name {}".format(channel))
CHANNEL_ID = get_channel_id(CHANNEL)
def run():
print("Running the monitoring bot system")
bot = spawn_bot()
register_handlers(bot)
# bot.send_message("ChatBot (re)started. I am up and running!", CHANNEL_ID)
bot.run()
def spawn_bot():
print("Spawning the bot")
return RocketChatBot(BOTNAME, PASSWORD, URL)
def is_shifter(user):
print(f"Checking if {user} is a shifter")
with open(CONFIG, 'r') as fobj:
config = toml.load(fobj)
return user in config['Alerts']['shifters']
try:
alerts_config = config['Alerts']
except KeyError:
log.error("No 'Alerts' section found in the configuration file")
return False
try:
return user in alerts_config['shifters']
except KeyError:
log.error("No 'shifters' section found in 'Alerts' of the configuration file")
return False
def is_operator(user):
print(f"Checking if {user} is an operator")
with open(CONFIG, 'r') as fobj:
config = toml.load(fobj)
return user in config['Alerts']['operators']
def register_handlers(bot):
print("Registering API handlers")
def greet(msg, user, channel_id):
if channel_id != CHANNEL_ID:
print("skipping")
......@@ -71,16 +270,22 @@ def register_handlers(bot):
bot.send_message('hello @' + user, channel_id)
def status(msg, user, channel_id):
print(f"Reporting status to channel {channel_id}")
if channel_id != CHANNEL_ID:
print("skipping")
print(f"Skipping channel with ID {channel_id}")
return
if not is_shifter(user) and not is_operator(user):
bot.send_message(
"Sorry @{}, only operators and shifters are allowed to mess "
"with me, sorry...".format(user), channel_id)
return
status = "```\n" + subprocess.check_output(
['supervisorctl', 'status']).decode('ascii') + "\n```"
print("Asking subservisorctl for the status")
try:
status = "```\n" + subprocess.check_output(
['supervisorctl', 'status']).decode('ascii') + "\n```"
except subprocess.CalledProcessError as e:
status = "```\n{}\n```".format(e.output.decode('ascii'))
print("Sending status")
bot.send_message(status, channel_id)
def supervisorctl(msg, user, channel_id):
......@@ -147,6 +352,7 @@ def register_handlers(bot):
(['supervisorctl'], supervisorctl)]
for trigger, handler in handlers:
bot.add_dm_handler(trigger, handler)
print("All handlers are registered")
def main():
......
......@@ -14,7 +14,7 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
......@@ -22,6 +22,7 @@ from __future__ import division
from collections import deque, defaultdict
from functools import partial
import gc
from io import BytesIO
import os
import time
......@@ -61,7 +62,7 @@ class DOMActivityPlotter(kp.Module):
timestamp = summaryslice.header.time_stamp
for dom_id, _ in summaryslice.summary_frames.items():
du, dom, _ = self.detector.doms[dom_id]
du, dom, *_ = self.detector.doms[dom_id]
self.last_activity[(du, dom)] = timestamp
self.cuckoo.msg()
......@@ -69,7 +70,7 @@ class DOMActivityPlotter(kp.Module):
return blob
def create_plot(self):
print(self.__class__.__name__ + ": updating plot.")
self.cprint(self.__class__.__name__ + ": updating plot.")
filename = os.path.join(self.plots_path, 'dom_activity.png')
# now = kp.time.tai_timestamp()
now = time.time()
......@@ -94,7 +95,9 @@ class DOMActivityPlotter(kp.Module):
"DOM Activity for DetID-{} - via Summary Slices".format(
self.detector.det_id),
vmin=0.0,
vmax=15 * 60)
vmax=15 * 60,
cmap="cividis_r")
gc.collect()
def main():
......
......@@ -14,12 +14,13 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
from __future__ import division
import gc
from io import BytesIO
import os
......@@ -63,7 +64,7 @@ class DOMRates(kp.Module):
summaryslice = blob['RawSummaryslice']
self.rates = {} # TODO: review this hack
for dom_id, rates in summaryslice.summary_frames.items():
du, dom, _ = self.detector.doms[dom_id]
du, dom, *_ = self.detector.doms[dom_id]
self.rates[(du, dom)] = np.sum(rates) / 1000
self.cuckoo.msg()
......@@ -72,7 +73,7 @@ class DOMRates(kp.Module):
def create_plot(self):
"""Creates the actual plot"""
print(self.__class__.__name__ + ": updating plot.")
self.cprint(self.__class__.__name__ + ": updating plot.")
filename = os.path.join(self.plots_path, 'dom_rates.png')
plot_dom_parameters(
......@@ -87,7 +88,8 @@ class DOMRates(kp.Module):
missing='black',
under='darkorchid',
over='deeppink')
print("done")
self.cprint("plot up to date.")
gc.collect()
def main():
......
......@@ -14,7 +14,7 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID.
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
......
......@@ -12,7 +12,7 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
......@@ -20,6 +20,7 @@ from __future__ import division
from datetime import datetime
from collections import deque, defaultdict
import gc
import os
import shutil
import time
......@@ -40,6 +41,7 @@ from km3pipe.hardware import Detector
from km3pipe.io import CHPump
from km3pipe.io.daq import (DAQProcessor, DAQPreamble, DAQSummaryslice,
DAQEvent)
from km3modules.common import MemoryObserver
import km3pipe.style
km3pipe.style.use('km3pipe')
......@@ -62,7 +64,7 @@ class TriggerMap(Module):
self.det = kp.hardware.Detector(det_id=det_id)
self.dus = sorted(self.det.dus)
self.n_rows = self.det.n_doms
self.n_rows = 18 * len(self.dus)
self.run = True
self.hits = deque(maxlen=self.max_events)
......@@ -74,36 +76,44 @@ class TriggerMap(Module):
self.thread = threading.Thread(target=self.plot).start()
def process(self, blob):
tag = str(blob['CHPrefix'].tag)
if not tag == 'IO_EVT':
return blob
event_hits = blob['Hits']
with lock:
run_id = blob['EventInfo'].run_id[0]
if run_id > self.current_run_id:
self.cprint(f"New run: {run_id}")
self.current_run_id = run_id
for _run_id in set(list(self.runchanges.keys()) + [run_id]):
self.runchanges[_run_id] += 1
if _run_id != self.current_run_id and \
self.runchanges[_run_id] > self.max_events:
self.print("Removing run {} from the annotation list".
format(_run_id))
self.log.info("Removing run {} from the annotation list".
format(_run_id))
del self.runchanges[_run_id]
self.n_events += 1
hits = np.zeros(self.n_rows)
for dom_id in event_hits.dom_id:
du, floor, _ = self.det.doms[dom_id]
if dom_id not in self.det.doms:
fname = "IO_EVT_{}.dat".format(round(time.time(), 3))
with open(fname, "bw") as fobj:
fobj.write(blob["CHData"])
self.log.error(
"Invalid DOM ID: %s. Raw event data dump written to %s",
dom_id, fname
)
break
du, floor, *_ = self.det.doms[dom_id]
du_idx = self.dus.index(du)
hits[du_idx * 18 + floor - 1] += 1
self.hits.append(hits)
triggered_hits = np.zeros(self.n_rows)
for dom_id in event_hits.dom_id[event_hits.triggered.astype(
'bool')]:
du, floor, _ = self.det.doms[dom_id]
if dom_id not in self.det.doms:
# we already check above
break
du, floor, *_ = self.det.doms[dom_id]
du_idx = self.dus.index(du)
triggered_hits[du_idx * 18 + floor - 1] += 1
self.triggered_hits.append(triggered_hits)
......@@ -117,13 +127,20 @@ class TriggerMap(Module):
time.sleep(50)
def create_plots(self):
self.cprint("Updating plots")
if len(self.hits) > 0:
self.create_plot(self.hits, "Hits on DOMs", 'hitmap')
self.cprint("Hits plot updated.")
else:
self.cprint("No hits recorded yet")
if len(self.triggered_hits) > 0:
self.create_plot(self.triggered_hits, "Trigger Map", 'triggermap')
self.cprint("Triggered hits plot updated.")
else:
self.cprint("No triggered hits recorded yet")
def create_plot(self, hits, title, filename):
fig, ax = plt.subplots(figsize=(16, 8))
fig, ax = plt.subplots(figsize=(16, 16))
ax.grid(True)
ax.set_axisbelow(True)
hit_matrix = np.array([np.array(x) for x in hits]).transpose()
......@@ -139,7 +156,7 @@ class TriggerMap(Module):
yticks = np.arange(self.n_rows)
ytick_labels = [
"DU{}-DOM{}".format(du, floor) if floor in [1, 6, 12] else ""
for (du, floor, _) in self.det.doms.values()
for du in self.dus for floor in range(1, 18+1)
]
ax.set_yticks(yticks)
ax.set_yticklabels(ytick_labels)
......@@ -156,7 +173,7 @@ class TriggerMap(Module):
for run, n_events_since_runchange in self.runchanges.items():
if n_events_since_runchange >= self.max_events:
continue
self.print("Annotating run {} ({} events passed)".format(
self.log.info("Annotating run {} ({} events passed)".format(
run, n_events_since_runchange))
x_pos = min(self.n_events,
self.max_events) - n_events_since_runchange
......@@ -184,6 +201,7 @@ class TriggerMap(Module):
plt.savefig(f_tmp, dpi=120, bbox_inches="tight")
plt.close('all')
shutil.move(f_tmp, f)
gc.collect()
def finish(self):
self.run = False
......@@ -201,6 +219,7 @@ def main():
ligier_port = int(args['-p'])
pipe = kp.Pipeline()
pipe.attach(MemoryObserver, every=1000)
pipe.attach(
kp.io.ch.CHPump,
host=ligier_ip,
......@@ -209,7 +228,7 @@ def main():
timeout=60 * 60 * 24 * 7,
max_queue=2000)
pipe.attach(kp.io.daq.DAQProcessor)
pipe.attach(TriggerMap, det_id=det_id, plots_path=plots_path)
pipe.attach(TriggerMap, det_id=det_id, plots_path=plots_path, only_if="Hits")
pipe.drain()
......
#!/usr/bin/env python
# coding=utf-8
# Filename: log_analyser.py
# Author: Tamas Gal <tgal@km3net.de>
# vim: ts=4 sw=4 et
from collections import defaultdict
import gzip
import sys
import re
import numpy as np
import matplotlib
# Force matplotlib to not use any Xwindows backend.
matplotlib.use("Agg")
import gc
import matplotlib.pyplot as plt
import os
import datetime
from datetime import datetime as dt
from datetime import timezone as tz
import time
# Event names and colours
EVENTS = dict(ERROR="red", WARNING="orange", Died="deeppink", Born="steelblue")
BUFFER_SIZE = 16 * 1024**2 # buffer size for the lines when parsing the log
REGEX_LOG_LINE = re.compile(r".+ ([A-Za-z]+) \[([A-Za-z]+)\]: .+")
REGEX_LEGACY_LOG_LINE = re.compile(r"^.+ \[([A-Za-z]+)\]: .+")
def plot_log_statistics(out_file, summary, title):
"""Creates a categorical bar plot for each event and process"""
processes = sorted(summary.keys())
xs = np.arange(len(processes))
w = 0.8 / len(EVENTS)
fig, ax = plt.subplots()
for idx, (event, color) in enumerate(EVENTS.items()):
x_offset = idx * w + w / 2 - w * len(EVENTS) / 2
ax.bar(
xs + x_offset,
[summary[process][event] for process in processes],
width=w,
color=color,
label=event,
)
ax.set_xticks(xs, processes)
ax.set_ylabel("count")
ax.legend()
ax.set_ylim(1e-1, 1e6)
ax.set_yscale("log")
ax.grid(True)
ax.set_title(title)
plt.savefig(out_file)
plt.close("all")
gc.collect()
def seconds_to_UTC_midnight():
"""Returns the seconds until next midnight"""
tomorrow = dt.now(tz.utc) + datetime.timedelta(days=1)
midnight = dt(
year=tomorrow.year,
month=tomorrow.month,
day=tomorrow.day,
hour=0,
minute=0,
second=0,
tzinfo=tz.utc,
)
return (midnight - dt.now(tz.utc)).seconds
def process_log_file(log_file):
"""Generates a dictionary of event counts in a log file
The returned dictionary has the structure dict[PROCESS][EVENT] => count.
"""
summary = defaultdict(lambda: defaultdict(int))
if log_file.endswith(".gz"):
opener = gzip.open
filemode = "rt"
else:
opener = open
filemode = "r"
n_lines_parsed = 0
n_lines_unparsed = 0
with opener(log_file, filemode) as fobj:
lines_chunk = fobj.readlines(BUFFER_SIZE)
while lines_chunk:
for line in lines_chunk:
m = REGEX_LOG_LINE.match(line)
if m is not None:
tag = m[1]
process = m[2]
else:
m = REGEX_LEGACY_LOG_LINE.match(line)
if m is not None:
tag = "MSG"
process = m[1]
else:
n_lines_unparsed += 1
continue
if tag in ("Born", "Died"):
summary[process][tag] += 1
for severity in ("WARNING", "ERROR"):
if (
severity in line
or severity.lower() in line
or severity.lower().capitalize() in line
):
summary[process][severity] += 1
n_lines_parsed += 1
lines_chunk = fobj.readlines(BUFFER_SIZE)
print(f" parsed lines: {n_lines_parsed}")
print(f" unparsed lines: {n_lines_unparsed}")
for process, stats in summary.items():
print(f" {process}:")
for event, n_lines in stats.items():
print(f" {event}: {n_lines}")
return summary
def main():
log_dir = "/logs/"
regexp = r"^MSG_(.+)\.log"
while True:
for fname in os.listdir(log_dir):
plot_fpath = os.path.join(log_dir, os.path.splitext(fname.replace(".gz", ""))[0] + ".png")
log_fpath = os.path.join(log_dir, fname)
if re.match(regexp, fname) and not os.path.exists(plot_fpath):
print("-> Processing ", fname)
summary = process_log_file(log_fpath)
title = os.path.basename(fname)
plot_log_statistics(plot_fpath, summary, title)
time.sleep(seconds_to_UTC_midnight() + 5 * 60)
if __name__ == "__main__":
main()
......@@ -4,7 +4,7 @@
# Author: Tamas Gal <tgal@km3net.de>
# vim: ts=4 sw=4 et
"""
Dumps MSG data from Ligier to a file.
Dumps MSG and other data from Ligier to a file.
Usage:
msg_dumper.py [options]
......@@ -20,10 +20,20 @@ Options:
"""
import datetime
import os
import re
from shutil import copyfile
from km3pipe import Pipeline, Module
from km3pipe.io import CHPump
from km3modules.common import MemoryObserver
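# Log messages from the DAQ processes contain tokens such as "F012/F345"; the
# leading capital letter identifies the process type and is translated into a
# readable name via the mapping below. Messages without such a token are filed
# under "Other" (see MSGDumper.process).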
PROCESS_NAME_REGEX = re.compile(r'.*([A-Z])\d{3}/[A-Z]\d{3}.*')
PROCESS_NAME_MAPPING = {
"A": "AcousticDataFilter",
"F": "DataFilter",
"Q": "DataQueue",
"W": "DataWriter",
}
def current_date_str(fmt="%Y-%m-%d"):
......@@ -38,13 +48,15 @@ class MSGDumper(Module):
self.current_date = current_date_str()
self.filename = self.prefix + ".log"
self.filepath = os.path.join(self.path, self.filename)
self.cprint("Logging to {}".format(self.filepath))
self.fobj = open(self.filepath, 'a')
self.idx = 0
def update_file_descriptor(self):
current_date = current_date_str()
if self.current_date != current_date:
archived_name = "{}_{}.log".format(self.prefix, self.current_date)
self.print("Cycling the log file: {} -> {}".format(
self.cprint("Cycling the log file: {} -> {}".format(
self.filename, archived_name))
copyfile(self.filepath, os.path.join(self.path, archived_name))
self.fobj.close()
......@@ -52,21 +64,24 @@ class MSGDumper(Module):
self.current_date = current_date
def process(self, blob):
if self.idx % 1000 == 0:
self.cprint(f"Number of messages processed so far: {self.idx}")
data = blob['CHData'].decode()
tag = str(blob["CHPrefix"].tag)
source = "Other"
if " A0" in data:
source = "AcousticDataFilter"
if " F0" in data:
source = "DataFilter"
if " Q0" in data:
source = "DataQueue"
if " W0" in data:
source = "DataWriter"
entry = "{} [{}]: {}\n".format(self.filename, source, data)
match = PROCESS_NAME_REGEX.match(data)
if match is not None:
source = PROCESS_NAME_MAPPING.get(match[1], "Unknown process")
entry = "{} {} [{}]: {}\n".format(self.filename, tag, source, data)
self.update_file_descriptor()
self.fobj.write(entry)
self.fobj.flush()
self.idx += 1
return blob
def finish(self):
......@@ -84,12 +99,13 @@ def main():
prefix = args['-x']
pipe = Pipeline()
pipe.attach(MemoryObserver, every=10000)
pipe.attach(CHPump,
host=ligier_ip,
port=ligier_port,
tags='MSG',
tags='MSG,Born,Died',
timeout=7 * 60 * 60 * 24,
max_queue=500)
max_queue=5000)
pipe.attach(MSGDumper, prefix=prefix, path=path)
pipe.drain()
......
File moved
......@@ -16,13 +16,14 @@ Options:
-u DU The DU to monitor [default: 1].
-d DET_ID Detector ID [default: 29].
-i INTERVAL Time interval for one pixel [default: 10].
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
from datetime import datetime
import io
import os
import gc
from collections import defaultdict
import threading
import time
......@@ -146,6 +147,7 @@ class PMTRates(kp.Module):
fig.tight_layout()
plt.savefig(filename)
plt.close('all')
gc.collect()
def process(self, blob):
try:
......@@ -161,7 +163,7 @@ class PMTRates(kp.Module):
self.log.debug(f"DOM ID {dom_id} not in detector definition!")
return blob
du, floor, _ = self.detector.doms[dom_id]
du, floor, *_ = self.detector.doms[dom_id]
if du != self.du:
return blob
......@@ -169,7 +171,7 @@ class PMTRates(kp.Module):
y_base = (floor - 1) * 31
if np.random.rand() > 0.99:
print(f"Rates for DOM ID {dom_id} DU {du}: {tmch_data.pmt_rates}")
self.cprint(f"Rates for DOM ID {dom_id} DU {du}: {tmch_data.pmt_rates}")
hrv_flags = reversed("{0:b}".format(tmch_data.hrvbmp).zfill(32))
for channel_id, (rate, hrv_flag) in enumerate(
......