Makefile
data/
__pycache__
*.pyc
*.pid
*.detx
plots/
logs/
pids/
backend/pipeline.toml
setenv.sh
supervisord.conf
.env
variables:
  DOCKER_HOST: tcp://docker:2375
  DOCKER_DRIVER: overlay2
  BACKEND_IMAGE: docker.km3net.de/km3mon-backend
  RECO_IMAGE: docker.km3net.de/km3mon-reco
  FRONTEND_IMAGE: docker.km3net.de/km3mon-frontend
  LIVELOG_IMAGE: docker.km3net.de/km3mon-livelog

stages:
  - docker
  - release

backend:
  image: docker:stable
  services:
    - docker:dind
  stage: docker
  script:
    - docker build --pull -t $BACKEND_IMAGE:test backend/
  tags:
    - docker

frontend:
  image: docker:stable
  services:
    - docker:dind
  stage: docker
  script:
    - docker build --pull -t $FRONTEND_IMAGE:test frontend/
  tags:
    - docker

livelog:
  image: docker:stable
  services:
    - docker:dind
  stage: docker
  script:
    - docker build --pull -t $LIVELOG_IMAGE:test livelog/
  tags:
    - docker

reco:
  image: docker:stable
  services:
    - docker:dind
  stage: docker
  script:
    - docker build --pull -t $RECO_IMAGE:test reco/
  tags:
    - docker

backend-release:
  image: docker:stable
  services:
    - docker:dind
  stage: release
  script:
    - docker build --pull -t $BACKEND_IMAGE:${CI_COMMIT_TAG:1} backend/
    - docker tag $BACKEND_IMAGE:${CI_COMMIT_TAG:1} $BACKEND_IMAGE:latest
    - docker push $BACKEND_IMAGE:${CI_COMMIT_TAG:1}
    - docker push $BACKEND_IMAGE:latest
  tags:
    - docker
  only:
    - tags

frontend-release:
  image: docker:stable
  services:
    - docker:dind
  stage: release
  script:
    - docker build --pull -t $FRONTEND_IMAGE:${CI_COMMIT_TAG:1} frontend/
    - docker tag $FRONTEND_IMAGE:${CI_COMMIT_TAG:1} $FRONTEND_IMAGE:latest
    - docker push $FRONTEND_IMAGE:${CI_COMMIT_TAG:1}
    - docker push $FRONTEND_IMAGE:latest
  tags:
    - docker
  only:
    - tags

livelog-release:
  image: docker:stable
  services:
    - docker:dind
  stage: release
  script:
    - docker build --pull -t $LIVELOG_IMAGE:${CI_COMMIT_TAG:1} livelog/
    - docker tag $LIVELOG_IMAGE:${CI_COMMIT_TAG:1} $LIVELOG_IMAGE:latest
    - docker push $LIVELOG_IMAGE:${CI_COMMIT_TAG:1}
    - docker push $LIVELOG_IMAGE:latest
  tags:
    - docker
  only:
    - tags

reco-release:
  image: docker:stable
  services:
    - docker:dind
  stage: release
  script:
    - docker build --pull -t $RECO_IMAGE:${CI_COMMIT_TAG:1} reco/
    - docker tag $RECO_IMAGE:${CI_COMMIT_TAG:1} $RECO_IMAGE:latest
    - docker push $RECO_IMAGE:${CI_COMMIT_TAG:1}
    - docker push $RECO_IMAGE:latest
  tags:
    - docker
  only:
    - tags
Unreleased changes
------------------
* `ztplot.py` now exits with return code 1 when an `OverflowError` occurs
while converting the event time and dumps the raw event data to `data/`
Version 2
---------
2.2.5 / 2022-12-15
~~~~~~~~~~~~~~~~~~
* `log_analyser.py` now also parses gzipped log files
* Several backend scripts now also log the memory consumption
* Fixes memory issues in several backend scripts, which were
introduced in Matplotlib 3.5.2
* Updated km3pipe to 9.13.8 and km3db to 0.11.1 in the backend
2.2.4 / 2022-12-01
~~~~~~~~~~~~~~~~~~
* Minor fixes
2.2.3 / 2022-11-20
~~~~~~~~~~~~~~~~~~
* Updates dependencies: km3pipe==9.13.5 which fixes the issue with the
triggered_hits scripts
2.2.2 / 2022-11-14
~~~~~~~~~~~~~~~~~~
* Updated dependencies
* `msg_dumper.py` now also logs `Born` and `Died` tagged messages from the
logging ligier
2.2.1 / 2022-11-07
~~~~~~~~~~~~~~~~~~
* Fixes the log analyser script which was crashing when log files
were gzipped
* Added "Born" and "Died" tags to the message logger system
2.2.0 / 2022-05-17
~~~~~~~~~~~~~~~~~~
* Updated requirements for backend and frontend
* Fixed a few other hangs when DB connection issues occur
2.1.7 / 2022-03-10
~~~~~~~~~~~~~~~~~~
* Reco container updated to Julia 1.7.2 and the latest versions
of all dependencies
* Reversed the colours of the DOM activity map
2.1.6 / 2021-12-01
~~~~~~~~~~~~~~~~~~
* Update km3db to 0.7.3 which has better error handling for
database connection issues (especially related to the AHRS
calibration monitoring)
2.1.5 / 2021-11-24
~~~~~~~~~~~~~~~~~~
* Hotfix (fixes trigger rate and ztplot crash)
2.1.4 / 2021-11-23
~~~~~~~~~~~~~~~~~~
* Renamed the docker image tags from foo:vX.Y.Z to foo:X.Y.Z
2.1.3 / 2021-11-21
~~~~~~~~~~~~~~~~~~
* km3pipe pinned to 9.11.2
* All requirements are pinned now
2.1.2 / 2021-11-05
~~~~~~~~~~~~~~~~~~
* Minor bugfixes in CI
2.1.1 / 2021-11-04
~~~~~~~~~~~~~~~~~~
* Added Docker image CI
2.1.0 / 2021-11-04
~~~~~~~~~~~~~~~~~~
* Triggered hits plot added
2.0.0 / 2021-05-05
~~~~~~~~~~~~~~~~~~
* Fully dockerised monitoring system
* Log analyser added by Rodri <rgracia@km3net.de>
* Log files are now showing plots indicating the number of errors and warnings
thanks to Rodri <rgracia@km3net.de>
* Corrupt event data are now skipped in trigger rate, which previously crashed
the thread
* Several catches of errors in online processing to make things run and log
instead of crash and annoy
* Preliminary support for km3pipe v9
Version 1
---------
1.2.0 / 2019-11-25
~~~~~~~~~~~~~~~~~~
* Top 10 events are now saved
* Added automatic ELOG entry for massive events, monitored in ``ztplot.py``
1.1.1 / 2019-10-23
~~~~~~~~~~~~~~~~~~
* Several bugfixes and improvements
1.1.0 / 2019-10-03
~~~~~~~~~~~~~~~~~~
* Several bugfixes and improvements
1.0.0 / 2019-06-29
~~~~~~~~~~~~~~~~~~
* First major release, using supervisord
SHELL := /bin/bash
STANDARD_TAGS := "IO_EVT, IO_SUM, IO_TSL, IO_TSL0, IO_TSL1, IO_TSL2, IO_TSSN, MSG, IO_MONIT"
LIGIER := $(shell command -v JLigier 2> /dev/null)
ifndef LIGIER
LIGIER := "SJ\ JLigier"
endif
default: build
build:
pip install -Ur requirements.txt
start:
@echo Creating tmux session...
@tmux new-session -d -s ${SESSION_NAME} \
|| (echo Run \"make stop\" to close the current session.; exit 1)
@tmux rename-window -t ${SESSION_NAME}:1 main
@tmux split-window -v -t ${SESSION_NAME}:main
@tmux split-window -v -t ${SESSION_NAME}:main
@echo Launching our own ligier
@#
@tmux send-keys -t ${SESSION_NAME}:main.1 \
"${LIGIER} -d2 -P ${MONITORING_LIGIER_PORT}" Enter
@sleep 1 # wait a second for JLigier
@echo Setting up the ligier mirror
@#
@tmux send-keys -t ${SESSION_NAME}:main.2 \
"ligiermirror -m \"${STANDARD_TAGS}\"" \
" -q ${MONITORING_LIGIER_PORT}" \
" -p ${DAQ_LIGIER_PORT} ${DAQ_LIGIER_IP}" \
Enter
@echo Starting the web server on 0.0.0.0:${WEBSERVER_PORT}
@#
@tmux send-keys -t ${SESSION_NAME}:main.3 \
"gunicorn --pid gunicorn.pid -w 4 -b 0.0.0.0:${WEBSERVER_PORT} km3mon:app" Enter
@tmux select-layout even-vertical
@echo Starting the monitoring scripts
@sleep 3
@# DOM activity and DOM rates
@#
@tmux new-window -n doms -t ${SESSION_NAME}
@tmux split-window -v -t ${SESSION_NAME}:doms
@tmux send-keys -t ${SESSION_NAME}:doms.1 \
"python scripts/dom_activity.py -d ${DETECTOR_ID} -p ${MONITORING_LIGIER_PORT}" Enter
@tmux send-keys -t ${SESSION_NAME}:doms.2 \
"python scripts/dom_rates.py -d ${DETECTOR_ID} -p ${MONITORING_LIGIER_PORT}" Enter
@tmux select-layout even-vertical
@# PMT rates and HRV
@#
@tmux new-window -n pmts -t ${SESSION_NAME}
@tmux split-window -v -t ${SESSION_NAME}:pmts
@tmux send-keys -t ${SESSION_NAME}:pmts.1 \
"python scripts/pmt_rates.py -d ${DETECTOR_ID} -p ${MONITORING_LIGIER_PORT} -i 20 -u 1" Enter
@tmux send-keys -t ${SESSION_NAME}:pmts.2 \
"python scripts/pmt_hrv.py -d ${DETECTOR_ID} -p ${MONITORING_LIGIER_PORT} -i 20 -u 1" Enter
@tmux select-layout even-vertical
@# Trigger rates
@#
@tmux new-window -n trigger -t ${SESSION_NAME}
@tmux split-window -v -t ${SESSION_NAME}:trigger
@tmux send-keys -t ${SESSION_NAME}:trigger.1 \
"python scripts/trigger_rates.py -p ${MONITORING_LIGIER_PORT}" Enter
@tmux send-keys -t ${SESSION_NAME}:trigger.2 \
"python scripts/live_triggermap.py -d ${DETECTOR_ID} -p ${MONITORING_LIGIER_PORT}" Enter
@tmux select-layout even-vertical
@# ZT-Plots
@#
@tmux new-window -n ztplots -t ${SESSION_NAME}
@tmux split-window -v -t ${SESSION_NAME}:ztplots
@tmux send-keys -t ${SESSION_NAME}:ztplots.1 \
"python scripts/ztplot.py -d ${DETECTOR_ID} -p ${MONITORING_LIGIER_PORT}" Enter
@tmux send-keys -t ${SESSION_NAME}:ztplots.2 \
"echo narf"
@tmux select-layout even-vertical
@# AHRS/RTTC
@#
@tmux new-window -n ahrs-rttc -t ${SESSION_NAME}
@tmux split-window -v -t ${SESSION_NAME}:ahrs-rttc
@tmux send-keys -t ${SESSION_NAME}:ahrs-rttc.1 \
"python scripts/ahrs_calibration.py -d ${DETECTOR_ID} -p ${MONITORING_LIGIER_PORT}" Enter
@tmux send-keys -t ${SESSION_NAME}:ahrs-rttc.2 \
"python scripts/rttc.py -d ${DETECTOR_ID} -l ${DETECTOR_MANAGER_IP}" Enter
@tmux select-layout even-vertical
@# K40
@#
@tmux new-window -n k40 -t ${SESSION_NAME}
@tmux split-window -v -t ${SESSION_NAME}:k40
@tmux send-keys -t ${SESSION_NAME}:k40.1 \
"python scripts/k40_calibration.py -d ${DETECTOR_ID} -p ${MONITORING_LIGIER_PORT}" Enter
@tmux select-layout even-vertical
@# Logs
@#
@tmux new-window -n log -t ${SESSION_NAME}
@tmux split-window -v -t ${SESSION_NAME}:log
@tmux send-keys -t ${SESSION_NAME}:log.1 \
"python scripts/msg_dumper.py -l 127.0.0.1 -p ${MONITORING_LIGIER_PORT} -f logs/MSG.log" Enter
@tmux send-keys -t ${SESSION_NAME}:log.2 \
"touch logs/MSG.log && frontail logs/*.log --ui-highlight --ui-highlight-preset frontail.json --theme dark -l 10000 -n 200 -p 8082" Enter
@tmux select-layout even-vertical
stop:
@echo Stopping monitoring session...
tmux kill-session -t ${SESSION_NAME}
kill -9 $(shell cat gunicorn.pid)
@sleep 5
clean:
rm Makefile
.PHONY: build start stop clean
# km3mon
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.3268538.svg)](https://doi.org/10.5281/zenodo.3268538)
Online monitoring suite for the KM3NeT neutrino detectors.
## Requirements
The only requirement is:
- Docker
Everything is containerised, so there is no need to install any other software. The
version of the pre-built Docker base images is determined by the `KM3MON_VERSION`
variable in the `.env` file (see `example.env`). Ideally, you want to keep the same
version as the checked-out Git tag, but scripts which are not part of the base images
can be updated independently. The `backend`, for example, contains many Python scripts
which can easily be updated without touching the base image, which consists only of
a Python installation with the required packages.
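For orientation, a minimal `.env` could look like the sketch below. This is purely
illustrative: the variable names are assumptions based on `example.env`, the `Makefile`
and the troubleshooting section, and the values are placeholders to be replaced for your
own setup.
```
# illustrative .env sketch -- check example.env for the authoritative variable names
# version of the pre-built Docker base images
KM3MON_VERSION=2.3.1
# detector and DAQ ligier to monitor
DETECTOR_ID=29
DAQ_LIGIER_IP=192.168.0.110
DAQ_LIGIER_PORT=5553
MONITORING_LIGIER_PORT=5553
# port of the frontend webserver
WEBSERVER_PORT=8080
# database session cookie of the machine running the monitoring system
KM3NET_DB_COOKIE=_tgal_131.42.5.23_6d132a51d884b22b2ba861a8847346c
```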
## Setup
1. Create a file called `.env` from the `example.env` template and adjust the detector
ID and the IP/port of the servers and also the `KM3MON_VERSION` variable, which should
ideally be set to the latest version. This determines the Docker images to be used
for each service.
2. Next, create a `backend/supervisord.conf` from the template file
`backend/supervisord.conf.example` and adjust if needed.
3. Create a `backend/pipeline.toml` from the `backend/pipeline.toml.example`
and adapt the settings if needed. Don't forget to add operators and shifters.
4. Optionally, adapt the layout of the plots in `frontend/app/routes.py`.
## Start and stop
The monitoring system can be started using
docker compose up -d
This will download and build all the required images and launch the containers
for each service. It will also create an overlay network.
To stop it:
docker compose down
## Monitoring the monitoring
Log files are kept in `logs/`, data dumps in `data/` and plots in `plots/`.
These folders can be used (mounted) by all services defined in the
`docker-compose.yml` file. The `frontend` service for example, which runs
the webserver, mounts `plots/` to have access to the plots.
To check the logs or follow them in real-time (`-f`) and limit the rewind
to a number of lines (`--tail=N`), e.g.
docker compose logs -f --tail=10 SERVICE_NAME
The `SERVICE_NAME` can be any of `backend`, `frontend`, `ligier`, `ligiermirror`,
`ligierlogmirror`, `reco` or `livelog`.
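For example, to list the running services and then follow the most recent back-end
log lines (plain Docker Compose commands, shown here only as a usage illustration):
```
docker compose ps
docker compose logs -f --tail=50 backend
```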
## Back-end configuration file
The file `backend/pipeline.toml` is the heart of most monitoring processes and
can be used to set different kinds of parameters, like plot attributes or ranges.
That configuration file is accessible within each Python script under
`backend/scripts`.
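As a rough sketch of how such a script could pick up its settings (this is not a
verbatim excerpt from the repository; the helper name and defaults are made up, only
the `[DOMRates]` keys appear in `pipeline.toml.example`):
```python
# hypothetical sketch, not the actual km3mon implementation
import os
import toml

CONFIG_PATH = "pipeline.toml"

def load_section(name, defaults):
    """Return the section `name` from pipeline.toml merged over `defaults`."""
    settings = dict(defaults)
    if os.path.exists(CONFIG_PATH):            # same guard as in frontend/app/routes.py
        config = toml.load(CONFIG_PATH)        # parse the TOML file into a dict
        settings.update(config.get(name, {}))  # values from the file override the defaults
    return settings

dom_rates = load_section("DOMRates", {"lowest_rate": 150, "highest_rate": 350})
print(dom_rates)  # falls back to the defaults if the file or section is missing
```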
The monitoring back-end is running inside a Docker container and controlled
by `supervisord`. You can enter the `backend` with
docker exec -it monitoring_backend_1 bash
`supervisorctl` is the tool to communicate with the monitoring
back-end system. To see the status of the processes, use `supervisorctl status`,
which will show each process one by one (make sure you call it in the
folder where you launched it):
```
$ supervisorctl status
alerts:timesync_monitor RUNNING pid 26, uptime 1 day, 5:21:06
logging:chatbot RUNNING pid 11, uptime 1 day, 5:21:06
logging:log_analyser RUNNING pid 10, uptime 1 day, 5:21:06
logging:msg_dumper RUNNING pid 9, uptime 1 day, 5:21:06
monitoring_process:acoustics RUNNING pid 1567, uptime 1 day, 5:20:59
monitoring_process:ahrs_calibration RUNNING pid 91859, uptime 1:09:14
monitoring_process:dom_activity RUNNING pid 1375, uptime 1 day, 5:21:00
monitoring_process:dom_rates RUNNING pid 1378, uptime 1 day, 5:21:00
monitoring_process:pmt_rates_10 RUNNING pid 1376, uptime 1 day, 5:21:00
monitoring_process:pmt_rates_11 RUNNING pid 1379, uptime 1 day, 5:21:00
monitoring_process:pmt_rates_13 RUNNING pid 1377, uptime 1 day, 5:21:00
monitoring_process:pmt_rates_14 RUNNING pid 1568, uptime 1 day, 5:20:59
monitoring_process:pmt_rates_18 RUNNING pid 21, uptime 1 day, 5:21:06
monitoring_process:pmt_rates_9 RUNNING pid 1566, uptime 1 day, 5:20:59
monitoring_process:rttc RUNNING pid 118444, uptime 0:17:20
monitoring_process:trigger_rates RUNNING pid 22, uptime 1 day, 5:21:06
monitoring_process:triggermap RUNNING pid 1796, uptime 1 day, 5:20:58
monitoring_process:ztplot RUNNING pid 24, uptime 1 day, 5:21:06
reconstruction:time_residuals RUNNING pid 27, uptime 1 day, 5:21:06
```
The processes are grouped accordingly (`logging`, `monitoring_process` etc.) and
automatically started in the right order.
You can stop and start individual services using `supervisorctl stop
group:process_name` and `supervisorctl start group:process_name`.
Since the system knows the order, you can safely `restart all` or just
a group of processes. Use `supervisorctl help` to find out more and
`supervisorctl help COMMAND` to get a detailed description of the
corresponding command.
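A few typical invocations inside the `backend` container, using the standard
`supervisorctl` commands described above (the process and group names are the ones
from the status listing):
```
supervisorctl status                                     # overview of all processes
supervisorctl restart monitoring_process:trigger_rates   # restart a single process
supervisorctl restart logging:*                          # restart a whole group
supervisorctl help restart                               # detailed help for one command
```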
## Frontend
The frontend is a simple webserver and uses HTML templates to render the websites.
The layout of the plots can be changed in `frontend/app/routes.py` using nested lists.
Each URL endpoint can be assigned to a specific function which uses a specific template
to render the actual page. The templates for the base layout (including the menubar) and
each page are under `frontend/app/templates`.
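As a rough sketch of that pattern (to be added to `frontend/app/routes.py`, reusing the
imports and helpers already defined there; the page name and plot selection below are
made up for illustration):
```python
# hypothetical example page, following the existing pattern in routes.py
ACOUSTICS_PLOTS = [['Online_Acoustic_Monitoring'],    # first row: one plot
                   ['dom_rates', 'trigger_rates']]    # second row: two plots

@app.route('/acoustics.html')
@requires_auth
def acoustics():
    # plots.html renders the nested list row by row using the images in plots/
    return render_template('plots.html', plots=ACOUSTICS_PLOTS)
```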
## Chatbot
The `km3mon` suite comes with a chatbot which can join a channel defined
in the `pipeline.toml` file under the `[Alerts]` section:
``` toml
[Alerts]
botname = "monitoring"
password = "supersecretpassword"
channel = "operations_fr"
operators = [ "a_enzenhoefer", "tamasgal",]
```
The password is the actual login password of the bot. Once the `chatbot` service
is running, the bot will notify important events like a sudden drop of the
trigger rate and can also be used to retrieve information from the monitoring
system, set the current shifters and even control the monitoring services through
the `supervisorctl` interface. Only the operators defined in the configuration
file are allowed to modify services or change the shifters.
To see the bot's capabilities, one simply asks it for help via
`@monitoring help`:
```
Hi Tamas Gal, I was built to take care of the monitoring alerts.
Here is how you can use me:
- @monitoring shifters are cnorris and bspencer
-> set the new shifters who I may annoy with chat messages and
emails.
- @monitoring status -> show the status of the monitoring system
- @monitoring supervisorctl -> take control over the monitoring system
- @monitoring help -> show this message
```
### Troubleshooting
#### Database connection needed
The monitoring processes talk to the KM3NeT Oracle DB service and need a valid
session cookie. The monitoring servers of the ORCA and ARCA shore stations are
whitelisted and use the following session cookies (defined in the `.env` file):
- ARCA: `KM3NET_DB_COOKIE=_tgal_192.84.151.58_48d829f9ef104d82b4c1d8557ee5bb55`
- ORCA: `KM3NET_DB_COOKIE=_tgal_188.194.66.108_4c0d9307fb4a423cb0bd8c2b34ba4790`
If you run the system on other machines, you need to provide
the cookie string for that specific machine. To get the cookie string, run the
monitoring system with `docker compose up -d` and connect to the backend with
# docker exec -it monitoring-backend-1 bash
To get a session cookie, query the database however you like, e.g.
# streamds get detectors
It will ask you for your KM3NeT (external) credentials and the required
cookie value is the last column in the file `~/.km3netdb_cookie`
# cat ~/.km3netdb_cookie
.in2p3.fr TRUE / TRUE 0 sid _tgal_131.42.5.23_6d132a51d884b22b2ba861a8847346c
Create a new environment variable in the `.env` file on the host system (not in the Docker
container!) with the following entry (of course with your cookie string):
KM3NET_DB_COOKIE=_tgal_131.42.5.23_6d132a51d884b22b2ba861a8847346c
and restart the whole monitoring system with
docker compose down && docker compose up -d
from os.path import join, exists
from functools import wraps
import toml
from flask import render_template, send_from_directory, request, Response
from app import app
CONFIG_PATH = "pipeline.toml"
PLOTS_PATH = "../plots"
USERNAME = None
PASSWORD = None
app.config['FREEZER_DESTINATION'] = '../km3web'
PLOTS = [['dom_activity', 'dom_rates'], ['pmt_rates', 'pmt_hrv'],
['trigger_rates'], ['ztplot', 'triggermap']]
AHRS_PLOTS = [['yaw_calib'], ['pitch_calib'], ['roll_calib']]
TRIGGER_PLOTS = [['trigger_rates'], ['trigger_rates_lin']]
K40_PLOTS = [['intradom'], ['angular_k40rate_distribution']]
RTTC_PLOTS = [['rttc']]
RECO_PLOTS = [['track_reco', 'ztplot_roy'], ['time_residuals']]
COMPACT_PLOTS = [['dom_activity', 'dom_rates', 'pmt_rates', 'pmt_hrv'],
['trigger_rates', 'trigger_rates_lin'],
['ztplot', 'ztplot_roy', 'triggermap']]
SN_PLOTS = [['sn_bg_distribution']]
if exists(CONFIG_PATH):
config = toml.load(CONFIG_PATH)
if "WebServer" in config:
print("Reading authentication information from '%s'" % CONFIG_PATH)
USERNAME = config["WebServer"]["username"]
PASSWORD = config["WebServer"]["password"]
def check_auth(username, password):
"""This function is called to check if a username /
password combination is valid.
"""
if USERNAME is not None and PASSWORD is not None:
return username == USERNAME and password == PASSWORD
else:
return True
def authenticate():
"""Sends a 401 response that enables basic auth"""
return Response(
'Could not verify your access level for that URL.\n'
'You have to login with proper credentials', 401,
{'WWW-Authenticate': 'Basic realm="Login Required"'})
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
@app.after_request
def add_header(r):
"""
Disable caches.
"""
r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
r.headers["Pragma"] = "no-cache"
r.headers["Expires"] = "0"
r.headers["Cache-Control"] = "public, max-age=0"
return r
@app.route('/')
@app.route('/index.html')
@requires_auth
def index():
return render_template('plots.html', plots=PLOTS)
@app.route('/ahrs.html')
@requires_auth
def ahrs():
return render_template('plots.html', plots=AHRS_PLOTS)
@app.route('/reco.html')
@requires_auth
def reco():
return render_template('plots.html', plots=RECO_PLOTS)
@app.route('/sn.html')
@requires_auth
def supernova():
return render_template('plots.html', plots=SN_PLOTS)
@app.route('/compact.html')
@requires_auth
def compact():
return render_template('plots.html', plots=COMPACT_PLOTS)
@app.route('/rttc.html')
@requires_auth
def rttc():
return render_template(
'plots.html',
plots=RTTC_PLOTS,
info=
"Cable Round Trip Time calculated from realtime data provided by the "
"Detector Manager. The red lines shows the median and the STD "
"from the past 24 hours. "
"RTTC = Cable_RTT - (TX_Slave + RX_Slave + TX_Master + RX_Master)")
@app.route('/k40.html')
@requires_auth
def k40():
return render_template(
'plots.html',
plots=K40_PLOTS,
info="The first plot shows the intra-DOM calibration. "
"y-axis: delta_t [ns], x-axis: cosine of angles. "
"The second plot the angular distribution of K40 rates. "
"y-axis: rate [Hz], x-axis: cosine of angles. "
"blue=before, red=after")
@app.route('/trigger.html')
@requires_auth
def trigger():
return render_template('plots.html', plots=TRIGGER_PLOTS)
@app.route('/plots/<path:filename>')
@requires_auth
def custom_static(filename):
print(filename)
filepath = join(app.root_path, PLOTS_PATH)
print(filepath)
return send_from_directory(join(app.root_path, PLOTS_PATH), filename)
/* Move down content because we have a fixed navbar that is 50px tall */
body {
padding-top: 50px;
padding-bottom: 20px;
}
img.plot {
width: 100%;
}
.plot-container {
text-align: center;
}
.ruler {
background-color: steelblue;
position: absolute;
}
#horizontal{
margin-top: 50px;
width:98%;
height:4px;
opacity: 0.5;
display: none;
}
#vertical{
width:1px;
height:100%;
display: none;
}
FROM python:3.12
MAINTAINER Tamas Gal <tgal@km3net.de>
WORKDIR /monitoring
COPY requirements.txt requirements.txt
RUN pip install -U wheel setuptools pip && pip install -r requirements.txt
COPY . .
CMD ["supervisord", "--nodaemon", "-c", "supervisord.conf"]
[WebServer]
username = "km3net"
password = "anothersupersecretpassword"
[DOMRates]
lowest_rate = 150
highest_rate = 350
[PMTRates]
lowest_rate = 1000
highest_rate = 20000
hrv_ratio_threshold = 0.85
[TriggerRate]
interval = 300
with_minor_ticks = true
[TriggerMap]
max_events = 5000
[ZTPlot]
min_dus = 3
min_doms = 24
ytick_distance = 25
logbook = "Operations+FR"
elog = false
[CalibrateAHRS]
time_range = 72
[LocalDBService]
filename = "/data/monitoring.sqlite3"
[ELOGService]
password = "swordfish"
[Alerts]
enabled = false
botname = "monitoring"
password = "supersecretpassword"
channel = "operations_fr"
operators = [ "a_enzenhoefer", "tamasgal",]
docopt==0.6.2
km3db==0.14.3
km3io==1.2.2
km3pipe==10.0.3
matplotlib==3.10.0
numba==0.61.0
numpy==2.1.3
pandas==2.2.3
requests==2.32.3
rocketchat-API==1.34.0
seaborn==0.13.2
supervisor==4.2.5
toml==0.10.2
#!/usr/bin/env python
# coding=utf-8
"""
Online Acoustic Monitoring
Usage:
acoustics.py [options]
acoustics.py (-h | --help)
Options:
-d DET_ID Detector ID.
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
from datetime import datetime
import gc
import os
import time
import http
import ssl
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib import colors
import numpy as np
import km3db
import km3pipe as kp
from docopt import docopt
def diff(first, second):
second = set(second)
return [item for item in first if item not in second]
def duplicates(lst, item):
return [i for i, x in enumerate(lst) if x == item]
args = docopt(__doc__)
sds = km3db.StreamDS(container="pd")
try:
detid = int(args['-d'])
except ValueError:
detid = (args['-d'])
if type(detid)==int:
detid = km3db.tools.todetoid(detid)
directory = args['-o']
ACOUSTIC_BEACONS = [0, 0, 0]
N_DOMS = 18
N_ABS = 3
DOMS = range(N_DOMS + 1)
DUS = kp.hardware.Detector(det_id=detid).dus
DUS_cycle = list(np.arange(max(DUS)) + 1)
TIT = 600  # Time Interval between Trains of acoustic pulses [s]
SSW = 160 # Signal Security Window (Window size with signal)
clbmap = km3db.CLBMap(detid)
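# Summary of the monitoring loop below (descriptive comment, added for clarity):
#  * fetch the latest completed run from the DB and query the "toashort" stream
#    for every DU/DOM and acoustic beacon
#  * select a signal window of SSW seconds around the largest quality factor seen
#    in the last TIT seconds; everything outside the window is treated as noise
#  * apply a chain of filters (strongest pulses, duplicate removal, noise
#    threshold, pulse spacing, amplitude consistency) to count the detected pings
#  * classify each piezo by the number of pings and save the summary plot to
#    Online_Acoustic_Monitoring.png in the plot directory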
check = True
while check:
minrun = None
while minrun is None:
try:
table = sds.runs(detid=detid)
minrun = table["RUN"][len(table["RUN"]) - 1]
ind, = np.where((table["RUN"] == minrun))
mintime1 = table['UNIXSTARTTIME'][ind]
mintime = mintime1.values
maxrun = table["RUN"][len(table["RUN"]) - 1]
now = time.time()
now = now - TIT
if (now - mintime / 1000) < TIT:
minrun = table["RUN"][len(table["RUN"]) - 1] - 1
print(now)
except:
pass
N_Pulses_Indicator = [
] # Matrix indicating how many pulses each piezo reveals
for du in DUS_cycle:
N_Pulses_Indicator_DU = [
] # Array indicating for each DU how many pulses each piezo reveals.
for dom in DOMS:
UTB_MIN = []
QF_MAX = []
n = -1
for ab in ACOUSTIC_BEACONS:
n = n + 1
try:
domID = clbmap.omkeys[(du, dom)].dom_id
AcBe = sds.toashort(detid=detid,
minrun=minrun,
domid=domID,
maxrun=maxrun)
ACOUSTIC_BEACONS_TEMP = np.unique(AcBe["EMITTERID"]).tolist()
if np.size(ACOUSTIC_BEACONS_TEMP) < 3:
while np.size(ACOUSTIC_BEACONS_TEMP) < 3:
ACOUSTIC_BEACONS_TEMP.append(0)
ACOUSTIC_BEACONS_TEMP_2 = np.sort(np.abs(ACOUSTIC_BEACONS_TEMP))
m = np.where(np.abs(ACOUSTIC_BEACONS_TEMP) == ACOUSTIC_BEACONS_TEMP_2[n])[0][0]
ab = ACOUSTIC_BEACONS_TEMP[m]
print(ab)
except (KeyError, AttributeError, TypeError):
N_Pulses_Indicator_DU.append(-1.5)
continue
try:
toas_all = sds.toashort(detid=detid,
minrun=minrun,
maxrun=maxrun,
domid=domID,
emitterid=ab)
QF_abdom = toas_all["QUALITYFACTOR"]
UTB_abdom = toas_all["UNIXTIMEBASE"]
TOAS_abdom = toas_all["TOA_S"]
UTB_abdom = UTB_abdom.values
up = np.where(UTB_abdom > (now - TIT))
down = np.where(UTB_abdom < (now))
intr = np.intersect1d(up, down)
UTB_abdom = UTB_abdom[intr]
QF_abdom = QF_abdom[intr]
QF_abdom = QF_abdom.values
QFlist = QF_abdom.tolist()
QFlist.sort(reverse=True)
QF_max = max(QF_abdom)
QF_max_index = np.where(QF_abdom == QF_max)
UTB_signal_min = UTB_abdom[QF_max_index[0][0]] - SSW / 2
UTB_signal_max = UTB_abdom[QF_max_index[0][0]] + SSW / 2
temp1 = np.where(UTB_abdom > (UTB_signal_min))
temp2 = np.where(UTB_abdom < (UTB_signal_max))
inter = np.intersect1d(temp1, temp2)
inter = inter.tolist()
signal_index = inter
# Define the signal index if the pings are split into two parts inside the window
if UTB_signal_min < (now - TIT):
temp1 = np.where(UTB_abdom > (now - TIT))
temp2 = np.where(UTB_abdom < (UTB_signal_max))
inter1 = np.intersect1d(temp1, temp2)
inter1 = inter1.tolist()
temp11 = np.where(UTB_abdom < (now))
temp22 = np.where(UTB_abdom > (now - SSW / 2))
inter2 = np.intersect1d(temp11, temp22)
inter2 = inter2.tolist()
inter = np.union1d(inter1, inter2)
inter = inter.tolist()
signal_index = inter
signal_index = np.array(signal_index)
signal_index = signal_index.astype(int)
signal_index = signal_index.tolist()
if UTB_signal_max > now:
temp1 = np.where(UTB_abdom < (now))
temp2 = np.where(UTB_abdom > (UTB_signal_min))
inter1 = np.intersect1d(temp1, temp2)
inter1 = inter1.tolist()
temp11 = np.where(UTB_abdom > ((now - TIT)))
temp22 = np.where(UTB_abdom < ((now - TIT) + SSW / 2))
inter2 = np.intersect1d(temp11, temp22)
inter2 = inter2.tolist()
inter = np.union1d(inter1, inter2)
inter = inter.tolist()
signal_index = inter
signal_index = np.array(signal_index)
signal_index = signal_index.astype(int)
signal_index = signal_index.tolist()
QF_abdom_index = np.where(QF_abdom)
all_data_index = QF_abdom_index[0].tolist()
noise_index = diff(all_data_index, signal_index)
SIGNAL = QF_abdom[signal_index]
UTB_SIGNAL = UTB_abdom[signal_index]
TOA_SIGNAL = TOAS_abdom[signal_index]
UNIX_TOA_SIGNAL = UTB_abdom[signal_index] + TOAS_abdom[signal_index]
if len(noise_index) != 0:
NOISE = QF_abdom[noise_index]
NOISElist = NOISE.tolist()
NOISElist.sort(reverse=True)
NOISE = NOISE.tolist()
NOISE.sort(reverse=True)
noise_threshold = max(
NOISE) # To be sure not to take signal
# First filter: 22 greatest
Security_Number = len(SIGNAL) # To be sure to take all the pulses
SIGNAL = SIGNAL.tolist()
SIGNAL_OLD = np.array(SIGNAL)
SIGNAL.sort(reverse=True)
QF_first = SIGNAL[0:Security_Number]
# Second filter: delete duplicates (Delete if Unixtimebase + ToA is the same)
QF_second = QF_first
R = []
for r in np.arange(len(QF_first)):
R.append(np.where(SIGNAL_OLD == QF_first[r])[0][0])
UTB_first = np.array(UTB_SIGNAL.tolist())[R]
TOA_first = np.array(TOA_SIGNAL.tolist())[R]
UNIX_TOA = UTB_first + TOA_first
UNIX_TOA = UNIX_TOA.tolist()
UNIX_TOA_index = []
for x in set(UNIX_TOA):
if UNIX_TOA.count(x) > 1:
UNIX_TOA_index.append(duplicates(UNIX_TOA, x))
ind_del = []
for i in range(len(UNIX_TOA_index)):
ind_del.append(UNIX_TOA_index[i][0])
for ide in sorted(ind_del, reverse=True):
del QF_second[ide]
QF_second.sort(reverse=True)
# Third filter: If there are more than 11 elements I will eliminate the worst
if len(QF_second) > 11:
QF_second = np.array(QF_second)
QF_third = [
k for k in QF_second
if (np.where(QF_second == k)[0][0] < 11)
]
else:
QF_third = QF_second
# Fourth filter: I remove the data if it is below the maximum noise
if len(noise_index) != 0:
QF_fourth = [
k for k in QF_third
if k > (noise_threshold + (10 * np.std(NOISE)))
]
else:
QF_fourth = QF_third
# Fifth filter: Check if the clicks are interspersed in the right way
QF_fifth = QF_fourth
Q = []
for q in np.arange(len(QF_fifth)):
Q.append(np.where(SIGNAL_OLD == QF_fifth[q])[0][0])
UTB_fourth = np.array(UTB_SIGNAL.tolist())[Q]
UTB_fourth_l = UTB_fourth.tolist()
D = []
for g in np.arange(len(UTB_fourth_l)):
if ((np.mod((UTB_fourth_l[g] - UTB_fourth_l[0]), 5) > 2
and np.mod(
(UTB_fourth_l[g] - UTB_fourth_l[0]), 5) < 4)
or
(np.mod(
(UTB_fourth_l[g] - UTB_fourth_l[0]), 5) > 5)
):
D.append(g)
for d in sorted(D, reverse=True):
del QF_fifth[d]
# Sixth filter:
if len(noise_index) != 0:
QF_sixth = [
k for k in QF_fifth
if (2*abs(k - max(QF_fifth)) < abs(k - noise_threshold))
]
else:
QF_sixth = QF_fifth
QF_OK = QF_sixth
P = []
for p in np.arange(len(QF_OK)):
P.append(np.where(SIGNAL_OLD == QF_OK[p])[0][0])
UTB_OK = np.array(UTB_SIGNAL.tolist())[P]
UTB_OK_l = UTB_OK.tolist()
UTB_MIN.append(min(UTB_OK_l))
max_QF = max(QF_OK)
QF_MAX.append(max_QF)
NUM = len(QF_OK) # Number of pulses
print(NUM)
if (NUM > 7):
N_Pulses_Indicator_DU.append(1.5)
elif (NUM < 8 and NUM > 3):
N_Pulses_Indicator_DU.append(0.5)
elif (NUM < 4 and NUM > 0):
N_Pulses_Indicator_DU.append(-0.5)
elif (NUM == 0):
N_Pulses_Indicator_DU.append(-1.5)
except (
TypeError, ValueError
): # TypeError if no data found for a certain piezo, ValueError if there are zero data for a certain piezo
N_Pulses_Indicator_DU.append(-1.5)
except (
http.client.RemoteDisconnected, ssl.SSLError
): # Bad connection to the DB (listed only here, so that this branch is actually reachable)
N_Pulses_Indicator_DU.append(-2.5)
# To avoid picking up wrong beacon signals
dim = np.size(QF_MAX)
pulse_inter = 5.04872989654541
for i in range(dim - 1):
if (np.mod((UTB_MIN[i] - UTB_MIN[i + 1]), pulse_inter) < 10**-3
or np.mod(
(UTB_MIN[i] - UTB_MIN[i + 1]), pulse_inter) > 5):
if QF_MAX[i] <= QF_MAX[i + 1]:
N_Pulses_Indicator_DU[3 * dom + i] = -1.5
else:
N_Pulses_Indicator_DU[3 * dom + i + 1] = -1.5
if i == 0 and dim == 3:
if (np.mod(
(UTB_MIN[i] -
UTB_MIN[i + 2]), pulse_inter) < 10**-3 or np.mod(
(UTB_MIN[i] - UTB_MIN[i + 2]), pulse_inter) > 5):
if QF_MAX[i] <= QF_MAX[i + 2]:
N_Pulses_Indicator_DU[3 * dom + i] = -1.5
else:
N_Pulses_Indicator_DU[3 * dom + i + 2] = -1.5
N_Pulses_Indicator.append(N_Pulses_Indicator_DU)
fig, ax = plt.subplots(figsize=(9, 7))
duab = []
DUs = []
for du in DUS_cycle:
duabdu = []
duab1 = (du - 0.2) * np.ones(N_DOMS + 1)
duab2 = (du) * np.ones(N_DOMS + 1)
duab3 = (du + 0.2) * np.ones(N_DOMS + 1)
duabdu.append(duab1)
duabdu.append(duab2)
duabdu.append(duab3)
duab.append(duabdu)
DUs.append(np.array(N_Pulses_Indicator[du - 1]))
iAB1 = []
iAB2 = []
iAB3 = []
for i in DOMS:
iAB1.append(3 * i)
iAB2.append(3 * i + 1)
iAB3.append(3 * i + 2)
colorsList = [(0.6, 0, 1), (0, 0, 0), (1, 0.3, 0), (1, 1, 0), (0.2, 0.9, 0)]
CustomCmap = matplotlib.colors.ListedColormap(colorsList)
bounds = [-3, -2, -1, 0, 1, 2]
norma = colors.BoundaryNorm(bounds, CustomCmap.N)
for du in DUS:
color = ax.scatter(duab[du - 1][0],
DOMS,
s=20,
c=DUs[du - 1][iAB1],
norm=norma,
marker='s',
cmap=CustomCmap)
color = ax.scatter(duab[du - 1][1],
DOMS,
s=20,
c=DUs[du - 1][iAB2],
norm=norma,
marker='s',
cmap=CustomCmap)
color = ax.scatter(duab[du - 1][2],
DOMS,
s=20,
c=DUs[du - 1][iAB3],
norm=norma,
marker='s',
cmap=CustomCmap)
cbar = plt.colorbar(color)
cbar.ax.get_yaxis().set_ticks([])
for j, lab in enumerate(
['$No DB conn.$','$0. pings$', '$1-3 pings$', '$4-7 pings$', '$>7. pings$']):
cbar.ax.text(4, (1.5 * j + 1) / 8.0, lab, ha='center', va='center')
cbar.ax.get_yaxis().labelpad = 18
ax.set_xticks(np.arange(1, max(DUS) + 1, step=1))
ax.set_yticks(np.arange(0, 19, step=1))
ax.grid(color='k', linestyle='-', linewidth=0.2)
ax.set_xlabel('DUs', fontsize=18)
ax.set_ylabel('Floors', fontsize=18)
ts = now
DATE = datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
ax.set_title(
r' %.16s Detection of the pings emitted by autonomous beacons' % DATE,
fontsize=10)
my_path = os.path.abspath(directory)
my_file = 'Online_Acoustic_Monitoring.png'
fig.savefig(os.path.join(my_path, my_file))
plt.close('all')
gc.collect()
print(time.time())
check = False
check_time = time.time() - now - TIT
print(check_time)
time.sleep(abs(TIT - check_time))
check = True
#!/usr/bin/env python
# coding=utf-8
# vim: ts=4 sw=4 et
"""
Runs the AHRS calibration online.
Usage:
ahrs_calibration.py [options]
ahrs_calibration.py (-h | --help)
Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
from __future__ import division
from datetime import datetime
from collections import deque, defaultdict
from functools import partial
import gc
import io
import os
import random
import time
import threading
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as md
import seaborn as sns
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import km3db
import km3pipe as kp
from km3pipe.io.daq import TMCHData
from km3modules.common import MemoryObserver
from km3modules.ahrs import fit_ahrs, get_latest_ahrs_calibration
import km3pipe.style
km3pipe.style.use("km3pipe")
class CalibrateAHRS(kp.Module):
def configure(self):
self.plots_path = self.require("plots_path")
det_id = self.require("det_id")
det_oid = km3db.tools.todetoid(det_id)
self.time_range = self.get("time_range", default=24 * 3) # hours
self.n_points_in_graph = self.get(
"n_points_in_graph", default=2000
) # x-axis resolution (per floor)
self.data = {}
self.dus = set()
self.detector = kp.hardware.Detector(det_id=det_id)
self.clbmap = km3db.CLBMap(det_oid=det_oid)
self.cuckoo = kp.time.Cuckoo(60, self.create_plot) # plot update interval [s]
self.cuckoo_log = kp.time.Cuckoo(10, self.cprint)
self.cuckoo_stats = kp.time.Cuckoo(300, self._show_stats)
# 10Hz monitoring (including bases)
n_messages = int(
10 * self.detector.n_doms * self.time_range * 60 ** 2 + self.detector.n_dus
)
self.fraction_to_keep = (
self.n_points_in_graph * self.detector.n_doms / n_messages
)
self.cprint(f"Fraction to keep: {self.fraction_to_keep}")
self.queue_size = int(
self.n_points_in_graph * 1.1
) # a bit of safety margin due to randomness
self.cprint(
f"Queue size for each module: {self.queue_size}, time range: {self.time_range} hours"
)
self.lock = threading.Lock()
self.index = 0
def _show_stats(self):
"""Print some data statistics"""
messages = ["Recorded data:"]
for du, data in self.data.items():
messages.append(f"DU {du}: ")
for floor, times in data["times"].items():
messages.append(
f" floor {floor}: {len(times)} ({times[0]}, {times[-1]})"
)
self.cprint("\n".join(messages))
def _register_du(self, du):
"""Create data cache for DU"""
self.data[du] = {}
for ahrs_param in ("yaw", "pitch", "roll"):
self.data[du][ahrs_param] = defaultdict(
partial(deque, maxlen=self.queue_size)
)
self.data[du]["times"] = defaultdict(partial(deque, maxlen=self.queue_size))
self.dus.add(du)
def process(self, blob):
self.index += 1
if random.random() > self.fraction_to_keep:
return blob
self.cuckoo_stats()
now = datetime.utcnow()
tmch_data = TMCHData(io.BytesIO(blob["CHData"]))
dom_id = tmch_data.dom_id
clb = self.clbmap.dom_ids[dom_id]
if clb.floor == 0:
self.log.info("Skipping base CLB")
return blob
yaw = tmch_data.yaw
calib = get_latest_ahrs_calibration(clb.upi, max_version=4)
if calib is None:
self.log.warning("No calibration found for CLB UPI '%s'", clb.upi)
return blob
du = clb.du
if du not in self.dus:
self._register_du(du)
cyaw, cpitch, croll = fit_ahrs(tmch_data.A, tmch_data.H, *calib)
self.cuckoo_log(
"DU{}-DOM{} (random pick): calibrated yaw={}".format(
clb.du, clb.floor, cyaw
)
)
with self.lock:
self.data[du]["yaw"][clb.floor].append(cyaw)
self.data[du]["pitch"][clb.floor].append(cpitch)
self.data[du]["roll"][clb.floor].append(croll)
self.data[du]["times"][clb.floor].append(now)
self.cuckoo.msg()
return blob
def create_plot(self):
self.cprint(self.__class__.__name__ + ": updating plot.")
if self.time_range > 24:
xfmt = md.DateFormatter("%Y-%m-%d %H:%M")
else:
xfmt = md.DateFormatter("%H:%M")
xlim = (
datetime.utcfromtimestamp(time.time() - int(self.time_range * 60 * 60)),
datetime.utcnow(),
)
for du in self.dus:
data = self.data[du]
for ahrs_param in data.keys():
fig, ax = plt.subplots(figsize=(16, 6))
sns.set_palette("husl", 18)
ax.set_title(
"AHRS {} Calibration on DU{}\n{}".format(
ahrs_param, du, datetime.utcnow()
)
)
ax.set_xlabel("UTC time")
ax.xaxis.set_major_formatter(xfmt)
ax.set_ylabel(ahrs_param)
with self.lock:
for floor in sorted(data[ahrs_param].keys()):
ax.plot(
data["times"][floor],
data[ahrs_param][floor],
marker=".",
linestyle="none",
label="Floor {}".format(floor),
)
ax.set_xlim(xlim)
lgd = plt.legend(bbox_to_anchor=(1.005, 1), loc=2, borderaxespad=0.0)
fig.tight_layout()
plt.savefig(
os.path.join(
self.plots_path, ahrs_param + "_calib_du{}.png".format(du)
),
bbox_extra_artists=(lgd,),
bbox_inches="tight",
)
plt.close("all")
gc.collect()
def main():
from docopt import docopt
args = docopt(__doc__)
det_id = int(args["-d"])
plots_path = args["-o"]
ligier_ip = args["-l"]
ligier_port = int(args["-p"])
pipe = kp.Pipeline()
pipe.attach(MemoryObserver, every=600000)
pipe.attach(
kp.io.ch.CHPump,
host=ligier_ip,
port=ligier_port,
tags="IO_MONIT",
timeout=60 * 60 * 24 * 7,
max_queue=2000,
)
pipe.attach(CalibrateAHRS, det_id=det_id, plots_path=plots_path)
pipe.drain()
if __name__ == "__main__":
main()
#!/usr/bin/env python
# coding=utf-8
# Filename: chatbot.py
# Author: Tamas Gal <tgal@km3net.de>
"""
The monitoring chatbot.
Usage:
chatbot.py
chatbot.py (-h | --help)
Options:
-h --help Show this screen.
"""
from datetime import datetime
from pprint import pprint
import random
import re
import requests
import subprocess
import time
import threading
import toml
from rocketchat_API.rocketchat import RocketChat
import km3pipe as kp
log = kp.logger.get_logger("chatbot")
URL = "https://chat.km3net.de"
CONFIG = "pipeline.toml"
RECONNECT_INTERVAL = 30
class RocketChatBot(object):
def __init__(self, botname, passwd, server, command_character=None):
self.botname = botname
self.api = RocketChat(user=botname, password=passwd, server_url=server)
self.commands = [(['echo', ], self.echo)]
self.auto_answers = []
self.direct_answers = []
self.unknow_command = ['command not found', ]
self.lastts = {}
self.command_character = command_character
def echo(self, msg, user, channel_id):
self.send_message('@' + user + ' : ' + msg, channel_id)
def get_status(self, auser):
return self.api.users_get_presence(username=auser)
def send_message(self, msg, channel_id):
self.api.chat_post_message(channel=channel_id, text=msg)
def add_dm_handler(self, command, action):
self.commands.append((command, action))
def add_auto_answer(self, triggers, answers):
self.auto_answers.append((triggers, answers))
def add_direct_answer(self, triggers, answers):
self.direct_answers.append((triggers, answers))
def handle_command_character_message(self, message, channel_id):
msg = message['msg'].lstrip(self.command_character)
command = msg.split()[0].lower()
arguments = " ".join(msg.split()[1:])
user = message['u']['username']
for cmd_list in self.commands:
if command.lower() in cmd_list[0]:
cmd_list[1](arguments, user, channel_id)
return
if not self.handle_auto_answer(message, self.direct_answers, channel_id):
self.send_message('@' + user + ' :' + random.choice(self.unknow_command), channel_id)
def handle_direct_message(self, message, channel_id):
msg = message['msg'].lstrip('@' + self.botname).strip()
if len(msg) > 0:
command = msg.split()[0].lower()
arguments = " ".join(msg.split()[1:])
user = message['u']['username']
for cmd_list in self.commands:
if command.lower() in cmd_list[0]:
cmd_list[1](arguments, user, channel_id)
return
if not self.handle_auto_answer(message, self.direct_answers, channel_id):
self.send_message('@' + user + ' :' + random.choice(self.unknow_command), channel_id)
else:
self.send_message('Here I am', channel_id)
def handle_auto_answer(self, message, answers, channel_id):
for kind in answers:
for k in kind[0]:
if k in message['msg'].lower():
self.send_message(random.choice(kind[1]) + ' @' + message['u']['username'], channel_id)
return True
return False
def handle_messages(self, messages, channel_id):
for message in messages['messages']:
if message['u']['username'] != self.botname:
pprint(message)
if message['u']['username'] == 'rocket.cat':
continue
if message['msg'].startswith('@' + self.botname):
threading.Thread(target=self.handle_direct_message, args=(message, channel_id)).start()
elif self.command_character is not None and message['msg'].startswith(self.command_character):
threading.Thread(target=self.handle_command_character_message, args=(message, channel_id)).start()
elif 'mentions' not in message or message.get('mentions') == []:
threading.Thread(target=self.handle_auto_answer, args=(message, self.auto_answers, channel_id)).start()
def load_ts(self, channel_id, messages):
if len(messages) > 0:
self.lastts[channel_id] = messages[0]['ts']
else:
self.lastts[channel_id] = ''
def load_channel_ts(self, channel_id):
self.load_ts(channel_id, self.api.channels_history(channel_id).json()['messages'])
def load_group_ts(self, channel_id):
self.load_ts(channel_id, self.api.groups_history(channel_id).json()['messages'])
def load_im_ts(self, channel_id):
response = self.api.im_history(channel_id).json()
if response.get('success'):
self.load_ts(channel_id, self.api.im_history(channel_id).json()['messages'])
def process_messages(self, messages, channel_id):
try:
if "success" in messages:
if messages['success'] == False:
raise RuntimeError(messages['error'])
if len(messages['messages']) > 0:
self.lastts[channel_id] = messages['messages'][0]['ts']
self.handle_messages(messages, channel_id)
except Exception as e:
pprint(e)
def process_channel(self, channel_id):
if channel_id not in self.lastts:
self.lastts[channel_id] = datetime.now().isoformat()
self.process_messages(self.api.channels_history(channel_id, oldest=self.lastts[channel_id]).json(),
channel_id)
def process_group(self, channel_id):
if channel_id not in self.lastts:
self.lastts[channel_id] = ''
self.process_messages(self.api.groups_history(channel_id, oldest=self.lastts[channel_id]).json(),
channel_id)
def process_im(self, channel_id):
if channel_id not in self.lastts:
self.lastts[channel_id] = ''
self.process_messages(self.api.im_history(channel_id, oldest=self.lastts[channel_id]).json(),
channel_id)
def run(self):
for channel in self.api.channels_list_joined().json().get('channels'):
self.load_channel_ts(channel.get('_id'))
for group in self.api.groups_list().json().get('groups'):
self.load_group_ts(group.get('_id'))
for im in self.api.im_list().json().get('ims'):
self.load_im_ts(im.get('_id'))
while 1:
for channel in self.api.channels_list_joined().json().get('channels'):
threading.Thread(target=self.process_channel, args=(channel.get('_id'),)).start()
# for group in self.api.groups_list().json().get('groups'):
# threading.Thread(target=self.process_group, args=(group.get('_id'),)).start()
#
# for im in self.api.im_list().json().get('ims'):
# threading.Thread(target=self.process_im, args=(im.get('_id'),)).start()
time.sleep(1)
with open(CONFIG, 'r') as fobj:
print(f"Reading configuration from {CONFIG}")
config = toml.load(fobj)
BOTNAME = config['Alerts']['botname']
PASSWORD = config['Alerts']['password']
CHANNEL = config['Alerts']['channel']
def get_channel_id(channel):
print(f"Getting channel ID for channel: {channel}")
tries = 0
while True:
tries += 1
try:
rocket = RocketChat(BOTNAME, PASSWORD, server_url=URL)
except requests.exceptions.ConnectionError as e:
log.error(f"Unable to connect to the RocketChat server: {e}")
except Exception as e:
log.error(f"Unknown error occured: {e}")
else:
break
interval = tries * RECONNECT_INTERVAL
print(f"Reconnecting in {interval} seconds...")
time.sleep(interval)
channels = rocket.channels_list(count=0).json()['channels']
for c in channels:
print(f" -> {c['name']} => {c['_id']}")
if c['name'] == channel:
print(f"Found channel ID for {channel} is {c['_id']}")
return c['_id']
log.error("No channel found with name {}".format(channel))
CHANNEL_ID = get_channel_id(CHANNEL)
def run():
print("Running the monitoring bot system")
bot = spawn_bot()
register_handlers(bot)
# bot.send_message("ChatBot (re)started. I am up and running!", CHANNEL_ID)
bot.run()
def spawn_bot():
print("Spawning the bot")
return RocketChatBot(BOTNAME, PASSWORD, URL)
def is_shifter(user):
print(f"Checking if {user} is a shifter")
with open(CONFIG, 'r') as fobj:
config = toml.load(fobj)
try:
alerts_config = config['Alerts']
except KeyError:
log.error("No 'Alerts' section found in the configuration file")
return False
try:
return user in alerts_config['shifters']
except KeyError:
log.error("No 'shifters' section found in 'Alerts' of the configuration file")
return False
def is_operator(user):
print(f"Checking if {user} is an operator")
with open(CONFIG, 'r') as fobj:
config = toml.load(fobj)
return user in config['Alerts']['operators']
def register_handlers(bot):
print("Registering API handlers")
def greet(msg, user, channel_id):
if channel_id != CHANNEL_ID:
print("skipping")
return
bot.send_message('hello @' + user, channel_id)
def status(msg, user, channel_id):
print(f"Reporting status to channel {channel_id}")
if channel_id != CHANNEL_ID:
print(f"Skipping channel with ID {channel_id}")
return
if not is_shifter(user) and not is_operator(user):
bot.send_message(
"Sorry @{}, only operators and shifters are allowed to mess "
"with me, sorry...".format(user), channel_id)
return
print("Asking subservisorctl for the status")
try:
status = "```\n" + subprocess.check_output(
['supervisorctl', 'status']).decode('ascii') + "\n```"
except subprocess.CalledProcessError as e:
status = "```\n{}\n```".format(e.output.decode('ascii'))
print("Sending status")
bot.send_message(status, channel_id)
def supervisorctl(msg, user, channel_id):
if channel_id != CHANNEL_ID:
print("skipping")
return
if not is_shifter(user) and not is_operator(user):
bot.send_message(
"Sorry @{}, only operators and shifters are allowed to mess "
"with me, sorry...".format(user), channel_id)
return
try:
output = subprocess.check_output(
['supervisorctl'] + msg.split(),
stderr=subprocess.STDOUT).decode('ascii')
except subprocess.CalledProcessError as e:
output = e.output.decode('ascii')
print("supervisorctl ({}), called by {}:\n{}".format(
msg, user, output))
bot.send_message(output, channel_id)
def shifters(msg, user, channel_id):
if channel_id != CHANNEL_ID:
print("skipping")
return
if not is_operator(user):
bot.send_message(
"Sorry @{}, only operators are allowed to set shifters!".
format(user), channel_id)
return
try:
with open(CONFIG, 'r') as fobj:
config = toml.load(fobj)
shifters = msg[3:].strip()
config['Alerts']['shifters'] = shifters
with open(CONFIG, 'w') as fobj:
toml.dump(config, fobj)
msg = f'Alright, the new shifters are {shifters}, welcome!'
print(msg)
bot.send_message(msg, channel_id)
except Exception as e:
bot.send_message(f'something went horribly wrong... {e}',
channel_id)
def help(msg, user, channel_id):
if channel_id != CHANNEL_ID:
print("skipping", channel_id)
return
help_str = f"""
Hi @{user} I was built to take care of the monitoring alerts.
Here is how you can use me:
- `@{BOTNAME} shifters are cnorris and bspencer`
-> set the new shifters who I may annoy with chat messages and
emails.
- `@{BOTNAME} status` -> show the status of the monitoring system
- `@{BOTNAME} supervisorctl` -> take control over the monitoring system
- `@{BOTNAME} help` -> show this message
"""
bot.send_message(help_str, channel_id)
handlers = [(['hello', 'hey', 'hi', 'ciao'], greet), (['status'], status),
(['help'], help), (['shifters'], shifters),
(['supervisorctl'], supervisorctl)]
for trigger, handler in handlers:
bot.add_dm_handler(trigger, handler)
print("All handlers are registered")
def main():
from docopt import docopt
args = docopt(__doc__)
run()
if __name__ == '__main__':
main()
......@@ -14,7 +14,7 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
......@@ -22,6 +22,7 @@ from __future__ import division
from collections import deque, defaultdict
from functools import partial
import gc
from io import BytesIO
import os
import time
......@@ -61,7 +62,7 @@ class DOMActivityPlotter(kp.Module):
timestamp = summaryslice.header.time_stamp
for dom_id, _ in summaryslice.summary_frames.items():
du, dom, _ = self.detector.doms[dom_id]
du, dom, *_ = self.detector.doms[dom_id]
self.last_activity[(du, dom)] = timestamp
self.cuckoo.msg()
......@@ -69,7 +70,7 @@ class DOMActivityPlotter(kp.Module):
return blob
def create_plot(self):
print(self.__class__.__name__ + ": updating plot.")
self.cprint(self.__class__.__name__ + ": updating plot.")
filename = os.path.join(self.plots_path, 'dom_activity.png')
# now = kp.time.tai_timestamp()
now = time.time()
......@@ -94,7 +95,9 @@ class DOMActivityPlotter(kp.Module):
"DOM Activity for DetID-{} - via Summary Slices".format(
self.detector.det_id),
vmin=0.0,
vmax=15 * 60)
vmax=15 * 60,
cmap="cividis_r")
gc.collect()
def main():
......
......@@ -14,12 +14,13 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
from __future__ import division
import gc
from io import BytesIO
import os
......@@ -63,7 +64,7 @@ class DOMRates(kp.Module):
summaryslice = blob['RawSummaryslice']
self.rates = {} # TODO: review this hack
for dom_id, rates in summaryslice.summary_frames.items():
du, dom, _ = self.detector.doms[dom_id]
du, dom, *_ = self.detector.doms[dom_id]
self.rates[(du, dom)] = np.sum(rates) / 1000
self.cuckoo.msg()
......@@ -72,7 +73,7 @@ class DOMRates(kp.Module):
def create_plot(self):
"""Creates the actual plot"""
print(self.__class__.__name__ + ": updating plot.")
self.cprint(self.__class__.__name__ + ": updating plot.")
filename = os.path.join(self.plots_path, 'dom_rates.png')
plot_dom_parameters(
......@@ -87,7 +88,8 @@ class DOMRates(kp.Module):
missing='black',
under='darkorchid',
over='deeppink')
print("done")
self.cprint("plot up to date.")
gc.collect()
def main():
......
......@@ -14,7 +14,7 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID.
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
......
......@@ -12,7 +12,7 @@ Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: plots].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
......@@ -20,6 +20,7 @@ from __future__ import division
from datetime import datetime
from collections import deque, defaultdict
import gc
import os
import shutil
import time
......@@ -40,6 +41,7 @@ from km3pipe.hardware import Detector
from km3pipe.io import CHPump
from km3pipe.io.daq import (DAQProcessor, DAQPreamble, DAQSummaryslice,
DAQEvent)
from km3modules.common import MemoryObserver
import km3pipe.style
km3pipe.style.use('km3pipe')
......@@ -61,6 +63,9 @@ class TriggerMap(Module):
self.max_events = self.get("max_events", default=1000)
self.det = kp.hardware.Detector(det_id=det_id)
self.dus = sorted(self.det.dus)
self.n_rows = 18 * len(self.dus)
self.run = True
self.hits = deque(maxlen=self.max_events)
self.triggered_hits = deque(maxlen=self.max_events)
......@@ -71,36 +76,46 @@ class TriggerMap(Module):
self.thread = threading.Thread(target=self.plot).start()
def process(self, blob):
tag = str(blob['CHPrefix'].tag)
if not tag == 'IO_EVT':
return blob
event_hits = blob['Hits']
with lock:
run_id = blob['EventInfo'].run_id[0]
if run_id > self.current_run_id:
self.cprint(f"New run: {run_id}")
self.current_run_id = run_id
for _run_id in set(list(self.runchanges.keys()) + [run_id]):
self.runchanges[_run_id] += 1
if _run_id != self.current_run_id and \
self.runchanges[_run_id] > self.max_events:
self.print("Removing run {} from the annotation list".
format(_run_id))
self.log.info("Removing run {} from the annotation list".
format(_run_id))
del self.runchanges[_run_id]
self.n_events += 1
hits = np.zeros(self.det.n_doms)
hits = np.zeros(self.n_rows)
for dom_id in event_hits.dom_id:
du, floor, _ = self.det.doms[dom_id]
hits[(du - 1) * self.det.n_doms + floor - 1] += 1
if dom_id not in self.det.doms:
fname = "IO_EVT_{}.dat".format(round(time.time(), 3))
with open(fname, "bw") as fobj:
fobj.write(blob["CHData"])
self.log.error(
"Invalid DOM ID: %s. Raw event data dump written to %s",
dom_id, fname
)
break
du, floor, *_ = self.det.doms[dom_id]
du_idx = self.dus.index(du)
hits[du_idx * 18 + floor - 1] += 1
self.hits.append(hits)
triggered_hits = np.zeros(self.det.n_doms)
triggered_hits = np.zeros(self.n_rows)
for dom_id in event_hits.dom_id[event_hits.triggered.astype(
'bool')]:
du, floor, _ = self.det.doms[dom_id]
triggered_hits[(du - 1) * self.det.n_doms + floor - 1] += 1
if dom_id not in self.det.doms:
                    # already checked above
break
du, floor, *_ = self.det.doms[dom_id]
du_idx = self.dus.index(du)
triggered_hits[du_idx * 18 + floor - 1] += 1
self.triggered_hits.append(triggered_hits)
return blob
......@@ -112,13 +127,20 @@ class TriggerMap(Module):
time.sleep(50)
def create_plots(self):
self.cprint("Updating plots")
if len(self.hits) > 0:
self.create_plot(self.hits, "Hits on DOMs", 'hitmap')
self.cprint("Hits plot updated.")
else:
self.cprint("No hits recorded yet")
if len(self.triggered_hits) > 0:
self.create_plot(self.triggered_hits, "Trigger Map", 'triggermap')
self.cprint("Triggered hits plot updated.")
else:
self.cprint("No triggered hits recorded yet")
def create_plot(self, hits, title, filename):
fig, ax = plt.subplots(figsize=(16, 8))
fig, ax = plt.subplots(figsize=(16, 16))
ax.grid(True)
ax.set_axisbelow(True)
hit_matrix = np.array([np.array(x) for x in hits]).transpose()
......@@ -131,10 +153,10 @@ class TriggerMap(Module):
origin='lower',
zorder=3,
norm=LogNorm(vmin=1, vmax=np.amax(hit_matrix)))
yticks = np.arange(self.det.n_doms)
yticks = np.arange(self.n_rows)
ytick_labels = [
"DU{}-DOM{}".format(dom[0], dom[1])
for dom in self.det.doms.values()
"DU{}-DOM{}".format(du, floor) if floor in [1, 6, 12] else ""
for du in self.dus for floor in range(1, 18+1)
]
ax.set_yticks(yticks)
ax.set_yticklabels(ytick_labels)
......@@ -151,13 +173,13 @@ class TriggerMap(Module):
for run, n_events_since_runchange in self.runchanges.items():
if n_events_since_runchange >= self.max_events:
continue
self.print("Annotating run {} ({} events passed)".format(
self.log.info("Annotating run {} ({} events passed)".format(
run, n_events_since_runchange))
x_pos = min(self.n_events,
self.max_events) - n_events_since_runchange
plt.text(
x_pos,
self.det.n_doms,
self.n_rows,
"\nRUN %s " % run,
rotation=60,
verticalalignment='top',
......@@ -179,6 +201,7 @@ class TriggerMap(Module):
plt.savefig(f_tmp, dpi=120, bbox_inches="tight")
plt.close('all')
shutil.move(f_tmp, f)
gc.collect()
def finish(self):
self.run = False
......@@ -196,6 +219,7 @@ def main():
ligier_port = int(args['-p'])
pipe = kp.Pipeline()
pipe.attach(MemoryObserver, every=1000)
pipe.attach(
kp.io.ch.CHPump,
host=ligier_ip,
......@@ -204,7 +228,7 @@ def main():
timeout=60 * 60 * 24 * 7,
max_queue=2000)
pipe.attach(kp.io.daq.DAQProcessor)
pipe.attach(TriggerMap, det_id=det_id, plots_path=plots_path)
pipe.attach(TriggerMap, det_id=det_id, plots_path=plots_path, only_if="Hits")
pipe.drain()
......
#!/usr/bin/env python
# coding=utf-8
# Filename: log_analyser.py
# Author: Tamas Gal <tgal@km3net.de>
# vim: ts=4 sw=4 et
from collections import defaultdict
import gzip
import sys
import re
import numpy as np
import matplotlib
# Force matplotlib to not use any Xwindows backend.
matplotlib.use("Agg")
import gc
import matplotlib.pyplot as plt
import os
import datetime
from datetime import datetime as dt
from datetime import timezone as tz
import time
# Event names and colours
EVENTS = dict(ERROR="red", WARNING="orange", Died="deeppink", Born="steelblue")
BUFFER_SIZE = 16 * 1024**2 # buffer size for the lines when parsing the log
REGEX_LOG_LINE = re.compile(r".+ ([A-Za-z]+) \[([A-Za-z]+)\]: .+")
REGEX_LEGACY_LOG_LINE = re.compile(r"^.+ \[([A-Za-z]+)\]: .+")
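# Hypothetical examples of lines matched by the expressions above:
#   "2023-07-01 12:00:00 MSG [DataFilter]: run started"  -> tag "MSG", process "DataFilter"
#   "2023-07-01 12:00:00 [DataWriter]: buffer flushed"    -> legacy format, process "DataWriter"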
def plot_log_statistics(out_file, summary, title):
"""Creates a categorical bar plot for each event and process"""
processes = sorted(summary.keys())
xs = np.arange(len(processes))
w = 0.8 / len(EVENTS)
fig, ax = plt.subplots()
for idx, (event, color) in enumerate(EVENTS.items()):
x_offset = idx * w + w / 2 - w * len(EVENTS) / 2
ax.bar(
xs + x_offset,
[summary[process][event] for process in processes],
width=w,
color=color,
label=event,
)
ax.set_xticks(xs, processes)
ax.set_ylabel("count")
ax.legend()
ax.set_ylim(1e-1, 1e6)
ax.set_yscale("log")
ax.grid(True)
ax.set_title(title)
plt.savefig(out_file)
plt.close("all")
gc.collect()
def seconds_to_UTC_midnight():
"""Returns the seconds until next midnight"""
tomorrow = dt.now(tz.utc) + datetime.timedelta(days=1)
midnight = dt(
year=tomorrow.year,
month=tomorrow.month,
day=tomorrow.day,
hour=0,
minute=0,
second=0,
tzinfo=tz.utc,
)
return (midnight - dt.now(tz.utc)).seconds
def process_log_file(log_file):
"""Generates a dictionary of event counts in a log file
The returned dictionary has the structure dict[PROCESS][EVENT] => count.
"""
summary = defaultdict(lambda: defaultdict(int))
if log_file.endswith(".gz"):
opener = gzip.open
filemode = "rt"
else:
opener = open
filemode = "r"
n_lines_parsed = 0
n_lines_unparsed = 0
with opener(log_file, filemode) as fobj:
lines_chunk = fobj.readlines(BUFFER_SIZE)
while lines_chunk:
for line in lines_chunk:
m = REGEX_LOG_LINE.match(line)
if m is not None:
tag = m[1]
process = m[2]
else:
m = REGEX_LEGACY_LOG_LINE.match(line)
if m is not None:
tag = "MSG"
process = m[1]
else:
n_lines_unparsed += 1
continue
if tag in ("Born", "Died"):
summary[process][tag] += 1
for severity in ("WARNING", "ERROR"):
if (
severity in line
or severity.lower() in line
or severity.lower().capitalize() in line
):
summary[process][severity] += 1
n_lines_parsed += 1
lines_chunk = fobj.readlines(BUFFER_SIZE)
print(f" parsed lines: {n_lines_parsed}")
print(f" unparsed lines: {n_lines_unparsed}")
for process, stats in summary.items():
print(f" {process}:")
for event, n_lines in stats.items():
print(f" {event}: {n_lines}")
return summary
def main():
log_dir = "/logs/"
    regexp = r"^MSG_(.+)\.log"
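    # matches daily dumps such as "MSG_2023-07-01.log"; gzipped ones
    # (e.g. "MSG_2023-07-01.log.gz") match as well, since only the prefix is anchored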
while True:
for fname in os.listdir(log_dir):
plot_fpath = os.path.join(log_dir, os.path.splitext(fname.replace(".gz", ""))[0] + ".png")
log_fpath = os.path.join(log_dir, fname)
if re.match(regexp, fname) and not os.path.exists(plot_fpath):
print("-> Processing ", fname)
summary = process_log_file(log_fpath)
title = os.path.basename(fname)
plot_log_statistics(plot_fpath, summary, title)
time.sleep(seconds_to_UTC_midnight() + 5 * 60)
if __name__ == "__main__":
main()
#!/usr/bin/env python
# coding=utf-8
# Filename: msg_dumper.py
# Author: Tamas Gal <tgal@km3net.de>
# vim: ts=4 sw=4 et
"""
Dumps MSG and other data from Ligier to a file.
Usage:
msg_dumper.py [options]
msg_dumper.py (-h | --help)
Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-o LOG_DIR Directory to dump the messages [default: logs].
-x PREFIX Prefix for the log files [default: MSG].
-h --help Show this screen.
"""
import datetime
import os
import re
from shutil import copyfile
from km3pipe import Pipeline, Module
from km3pipe.io import CHPump
from km3modules.common import MemoryObserver
PROCESS_NAME_REGEX = re.compile(r'.*([A-Z])\d{3}/[A-Z]\d{3}.*')
PROCESS_NAME_MAPPING = {
"A": "AcousticDataFilter",
"F": "DataFilter",
"Q": "DataQueue",
"W": "DataWriter",
}
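# A message containing e.g. "F012/Q345" (hypothetical process tags) is thus
# attributed to the "DataFilter" process via the captured letter "F".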
def current_date_str(fmt="%Y-%m-%d"):
"""Return the current datetime string"""
return datetime.datetime.utcnow().strftime(fmt)
class MSGDumper(Module):
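    """Dumps incoming messages into a log file and cycles it daily (UTC)."""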
def configure(self):
self.path = os.path.abspath(self.require('path'))
self.prefix = self.require('prefix')
self.current_date = current_date_str()
self.filename = self.prefix + ".log"
self.filepath = os.path.join(self.path, self.filename)
self.cprint("Logging to {}".format(self.filepath))
self.fobj = open(self.filepath, 'a')
self.idx = 0
def update_file_descriptor(self):
current_date = current_date_str()
if self.current_date != current_date:
archived_name = "{}_{}.log".format(self.prefix, self.current_date)
self.cprint("Cycling the log file: {} -> {}".format(
self.filename, archived_name))
copyfile(self.filepath, os.path.join(self.path, archived_name))
self.fobj.close()
self.fobj = open(self.filepath, 'w')
self.current_date = current_date
def process(self, blob):
if self.idx % 1000 == 0:
self.cprint(f"Number of messages processed so far: {self.idx}")
data = blob['CHData'].decode()
tag = str(blob["CHPrefix"].tag)
source = "Other"
match = PROCESS_NAME_REGEX.match(data)
if match is not None:
source = PROCESS_NAME_MAPPING.get(match[1], "Unknown process")
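        # Resulting log line, e.g.: "MSG.log MSG [DataFilter]: <raw message>"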
entry = "{} {} [{}]: {}\n".format(self.filename, tag, source, data)
self.update_file_descriptor()
self.fobj.write(entry)
self.fobj.flush()
self.idx += 1
return blob
def finish(self):
if self.fobj is not None:
self.fobj.close()
def main():
from docopt import docopt
args = docopt(__doc__)
ligier_ip = args['-l']
ligier_port = int(args['-p'])
path = args['-o']
prefix = args['-x']
pipe = Pipeline()
pipe.attach(MemoryObserver, every=10000)
pipe.attach(CHPump,
host=ligier_ip,
port=ligier_port,
tags='MSG,Born,Died',
timeout=7 * 60 * 60 * 24,
max_queue=5000)
pipe.attach(MSGDumper, prefix=prefix, path=path)
pipe.drain()
if __name__ == '__main__':
main()
#!/usr/bin/env python
# coding=utf-8
# Filename: online_reco.py
# Author: Tamas Gal <tgal@km3net.de>
# vim: ts=4 sw=4 et
"""
Visualisation routines for online reconstruction.
Usage:
online_reco.py [options]
online_reco.py (-h | --help)
Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-o PLOT_DIR The directory to save the plot [default: www/plots].
-h --help Show this screen.
"""
from collections import deque
from datetime import datetime
import time
import os
import threading
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import km3pipe as kp
import km3pipe.style
km3pipe.style.use('km3pipe')
class RecoPlotter(kp.Module):
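    """Plots distributions (zenith, quality) of online track reconstructions."""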
def configure(self):
self.fontsize = 16
self.plots_path = self.require('plots_path')
self.max_events = self.get('max_events', default=5000)
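        # Plot registry: each entry defines the figure metadata, the matplotlib
        # function to call, shared histogram options and one data cache per
        # reconstruction algorithm.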
self.plots = {
'reco_zenith': {
'title': 'Zenith distribution of online track reconstructions',
'xlabel': 'cos(zenith)',
'ylabel': 'normed count',
'function': 'hist',
'options': {
'bins': 180,
'histtype': "step",
'normed': True,
'lw': 3
},
'subplots': {
'gandalf': {
'data': deque(maxlen=self.max_events),
'subplot_options': {
'label': "JGandalf",
}
}
}
},
'reco_quality': {
'title': 'Quality of online track reconstructions',
'xlabel': 'Quality',
'ylabel': 'normed count',
'function': 'hist',
'options': {
'bins': 100,
'histtype': "step",
'normed': True,
'lw': 3
},
'subplots': {
'gandalf': {
'data': deque(maxlen=self.max_events),
'subplot_options': {
'label': "JGandalf",
}
}
}
},
}
self.plot_interval = 60 # [s]
threading.Thread(target=self.plot).start()
def process(self, blob):
track = blob['RecoTrack']
if track.status == 1:
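            # the value stored below is cos(zenith), measured against the
            # downward-pointing axis (0, 0, -1)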
zenith = np.cos(
kp.math.angle_between([0, 0, -1],
[track.dx, track.dy, track.dz]))
reco_name = track.reco
self._add_reco_parameter('reco_zenith', reco_name, zenith)
self._add_reco_parameter('reco_quality', reco_name, track.Q)
return blob
def _add_reco_parameter(self, parameter, reco_name, value):
"""Add the value to the parameter cache"""
        for reco, subplot in self.plots[parameter]['subplots'].items():
if reco == reco_name:
subplot['data'].append(value)
def plot(self):
while True:
time.sleep(self.plot_interval)
self.create_plots()
def create_plots(self):
for name, plot in self.plots.items():
plt.clf()
fig, ax = plt.subplots(figsize=(16, 8))
            for subplot in plot['subplots'].values():
                getattr(ax, plot['function'])(subplot['data'],
                                              **plot['options'],
                                              **subplot['subplot_options'])
ax.set_title(plot['title'] +
"\n%s UTC" % datetime.utcnow().strftime("%c"))
ax.set_xlabel(plot['xlabel'], fontsize=self.fontsize)
ax.set_ylabel(plot['ylabel'], fontsize=self.fontsize)
ax.tick_params(labelsize=self.fontsize)
ax.set_yscale("log")
plt.legend(fontsize=self.fontsize, loc=2)
filename = os.path.join(self.plots_path, '%s.png' % name)
plt.savefig(filename, dpi=120, bbox_inches="tight")
plt.close('all')
def main():
from docopt import docopt
args = docopt(__doc__)
plots_path = args['-o']
ligier_ip = args['-l']
ligier_port = int(args['-p'])
pipe = kp.Pipeline()
pipe.attach(kp.io.ch.CHPump,
host=ligier_ip,
port=ligier_port,
tags='IO_OLINE',
timeout=60 * 60 * 24 * 7,
max_queue=2000)
pipe.attach(kp.io.daq.DAQProcessor)
pipe.attach(RecoPlotter, plots_path=plots_path)
pipe.drain()
if __name__ == '__main__':
main()