Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import numpy as np
import scipy.signal as ssg
from functools import partial
from tqdm import tqdm_notebook as tqdm
"""
Calculates distance(s) between a point 'p' and a(n array of) point(s) 'q'
as the norm of the difference vector(s) 'pq'
m = number of values
n = number of coordinates
p ~ (1, n) - q ~ (m, n) => pq ~ (m, n)
"""
def dist(p, q):
pq = p - q
axis = pq.ndim - 1 # so clever! ;)
return np.linalg.norm(pq, axis=axis)
# envelope for the chirp
def window(t, t1):
return np.exp(-(2*t/t1)**100)
# windowed chirp
def wchirp(t, f0, t1, f1):
return window(t, t1) * ssg.chirp(t, f0, t1, f1)
# cross correlation function
def xcorr(u1, u2):
return np.convolve(u1, u2[::-1], 'same')
# returns a function-object with fixed chirp parameters
def chirp_template(f0, t1, f1):
return partial(wchirp, f0=f0, t1=t1, f1=f1)
class BeamStreamer():
O = (0,0,0) # origin of coordinates
c0 = 1500 # speed of sound [m/s]
def __init__(self, f_s, footprint, beacon, noise_level = 0.0, t_margin = 0.0):
self.f_s = f_s
self.beacon = beacon
self.footprint = footprint
self.calc_delays(footprint)
self.init_timebase(t_margin)
self.reset_noise(noise_level)
def calc_delays(self, footprint):
self.distances = dist(footprint, self.beacon) - dist(self.O, self.beacon)
def init_timebase(self, t_margin):
self.T = 1.5 * (np.max(self.delays) + t_margin)
self.N = int(np.ceil(self.T * self.f_s))
self.T = self.N / self.f_s
self.n_s = 1 + 4 * self.N
self.timebase = np.linspace(-2 * self.T, 2 * self.T, self.n_s)
self.sect = slice(self.N, 1 + 3 * self.N)
def reset_noise(self, noise_level=0.0):
shape = (len(self.delays), self.n_s)
self.streams = np.random.normal(0.0, noise_level, shape)
def sim_signals(self, template):
self.reference = template(self.timebase)
for stream, delay in zip(tqdm(self.streams), self.delays):
stream[self.sect] += template(self.timebase[self.sect] - delay)
def beamform(self, nominal_footprint, probe):
W = np.zeros_like(self.timebase[self.sect])
shifts = self.f_s * ((dist(probe, nominal_footprint) - dist(probe, 0)) / self.c0)
shifts = shifts.astype('int')
for stream, shift in zip(self.streams, shifts):
shf = slice(self.sect.start + shift, self.sect.stop + shift)
W += stream[shf]
X = xcorr(W, self.reference[self.sect])
return W, X
@property
def delays(self):
return self.distances / self.c0
@property
def t(self):
return self.timebase[self.sect]
@property
def s(self):
return self.streams[:, self.sect]
''' meshgrid helper '''
def gen_probegrid(center, size, steps):
axis = np.linspace(-size, +size, steps)
P = np.meshgrid(axis, axis)
P[0] += center[0]
P[1] += center[1]
return P
'''
Generate a meshgrid:
- around a position (beacon)
- of given angular size (size_deg)
- of given number of stesp (steps)
'''
def make_grid(beacon, size_deg, steps):
r = dist(beacon, (0,0,0))
size_rad = np.deg2rad(size_deg)
size_m = r * size_rad
return gen_probegrid(beacon, size_m, steps)
class BeamScanner:
def __init__(self, beamformer, nominal_footprint):
self.beamformer = beamformer
self.footprint = nominal_footprint
self.X = []
self.W = []
def scan(self, probe_grid, z_grid):
PX, PY = probe_grid
self.Z = np.zeros_like(PX)
for i in tqdm(range(len(PX)), desc='X'):
for j in tqdm(range(len(PY)), desc='Y', leave=False):
probe = np.array([PX[i,j], PY[i,j], z_grid])
W, X = self.beamformer.beamform(self.footprint, probe)
self.W.append(W)
self.X.append(X)
Q = np.max(X)
self.Z[i, j] = Q