分布式波达方向定位尝试

想把odas这个项目弄成分布式的,用一个主机同时监控多个树莓派进行波达方向定位。源码备份在doa-project,防丢用。

树莓派

新的 raspbian buster 自带了python 3.7,安装pipenv即可:

1
proxychains4 pip install pipenv

环境准备

初始化虚拟环境

为odas项目新建文件夹odas,初始化pipenv环境:

1
2
3
4
mkdir odas
cd odas
proxychains4 pipenv --python 3.7
# 或 pipenv --python /usr/bin/python3

安装声卡驱动

1
2
3
4
5
6
7
8
9
10
11
12
mkdir 3rd-lib
cd 3rd-lib

# 安装声卡驱动
git clone https://github.com/respeaker/seeed-voicecard.git
cd seeed-voicecard
sudo proxychains4 ./install.sh
sudo reboot -h now

# 测试声卡
arecord -L
aplay -L

编译odas项目

1
2
3
4
5
6
7
8
9
10
11
12
13
# 编译odas
cd 3rd-lib/
proxychains4 git clone https://github.com/introlab/odas.git
mkdir odas/build
cd odas/build
cmake ..
make

# 做软链接方便使用
ln -s ~/odas/3rd-lib/odas/bin/odaslive ~/.local/bin/odaslive

# 测试odas
odaslive -c ~/respeaker-6mic-odas.cfg

安装pyzmq

新的buster自带了libzmq 4.3.1,安装pyzmq即可:

1
proxychains4 pip install pyzmq

测试:

1
2
3
>>> import zmq
>>> zmq.zmq_version()
'4.3.1'

配置防火墙

检查端口是否开启可以使用 nc -zvw3 192.168.1.108 5557(z用于扫描端口的零I/O模式,v显示详情,w3三秒后超时)。

为zmq打开防火墙端口:

1
2
sudo ufw allow proto tcp to 0.0.0.0/0 port 5556 comment "zmq publish port of odas for sound source tracking"
sudo ufw allow proto tcp to 0.0.0.0/0 port 5557 comment "zmq publish port of odas for sound source localization"

注:我一开始为了用上Python 3.7,早早更新了buster的测试版,现在buster正式发布了,更新时报错:

1
2
E: Repository 'http://mirrors.aliyun.com/raspbian/raspbian buster InRelease' changed its 'Suite' value from 'testing' to 'stable'
N: This must be accepted explicitly before updates for this repository can be applied. See apt-secure(8) manpage for details.

按照Explicitly accept change for PPA ‘Label’方法中说的,把apt-get改成apt即可。

使用pyzmq发布定位数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from multiprocessing import Process
import os
import socket
import zmq


def sst_write(rpi_host='0.0.0.0', rpi_port=9000, zmq_host='*', zmq_port=5556):
    """
    Relay odas Sound Source Tracking (SST) JSON frames from TCP to zmq.

    Listens on rpi_host:rpi_port, accepts one TCP connection, reassembles the
    received byte stream into complete top-level JSON objects and publishes
    each object as a single zmq message on tcp://zmq_host:zmq_port.

    Frames are delimited by tracking brace depth, so a frame that is split
    across several recv() calls is kept in a buffer until its closing brace
    arrives, and several frames delivered in one recv() are all published.
    Every published message starts with '{'.
    NOTE(review): brace counting assumes no '{' or '}' occurs inside JSON
    string values - confirm against the odas output format.

    :param rpi_host: local address the TCP server binds to.
    :param rpi_port: local TCP port the server binds to.
    :param zmq_host: interface the zmq PUB socket binds to ('*' for all).
    :param zmq_port: TCP port the zmq PUB socket binds to.
    :return: None, after the connected peer closes the connection.
    """
    write_pid = os.getpid()
    print('#Write Process# %s start' % write_pid)
    bind_ip, bind_port = rpi_host, rpi_port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((bind_ip, bind_port))
        server.listen(5)  # max backlog of connections
        print('#Write Process# listening on {}:{}'.format(bind_ip, bind_port))
        client_socket, address = server.accept()
        print('#Write Process# accepted connection from {}:{}'.format(address[0], address[1]))

        context = zmq.Context()
        zmq_socket = context.socket(zmq.PUB)
        try:
            zmq_socket.bind(f"tcp://{zmq_host}:{zmq_port}")

            pending = b''  # bytes received but not yet published
            while True:
                chunk = client_socket.recv(1024)
                if not chunk:
                    # recv() returns b'' once the peer has closed the
                    # connection; stop instead of spinning on empty reads.
                    print('#Write Process# connection closed by peer')
                    break
                pending += chunk

                depth = 0        # current brace nesting level
                frame_start = 0  # index of the '{' opening the current frame
                consumed = 0     # end index of the last complete frame
                for pos, byte in enumerate(pending):
                    if byte == 0x7B:  # '{'
                        if depth == 0:
                            frame_start = pos
                        depth += 1
                    elif byte == 0x7D and depth:  # '}' (stray ones at depth 0 are skipped)
                        depth -= 1
                        if depth == 0:
                            zmq_socket.send(pending[frame_start:pos + 1])
                            consumed = pos + 1
                # Keep only the unfinished tail for the next recv().
                pending = pending[consumed:]
        finally:
            client_socket.close()
            # linger=0 so term() cannot block on undelivered messages.
            zmq_socket.close(linger=0)
            context.term()


def ssl_write(rpi_host='0.0.0.0', rpi_port=9001, zmq_host='*', zmq_port=5557):
    """
    Relay odas Sound Source Localization (SSL) JSON frames from TCP to zmq.

    Listens on rpi_host:rpi_port, accepts one TCP connection, reassembles the
    received byte stream into complete top-level JSON objects and publishes
    each object as a single zmq message on tcp://zmq_host:zmq_port.

    Frames are delimited by tracking brace depth, so a frame that is split
    across several recv() calls is kept in a buffer until its closing brace
    arrives, and several frames delivered in one recv() are all published.
    Every published message starts with '{'.
    NOTE(review): brace counting assumes no '{' or '}' occurs inside JSON
    string values - confirm against the odas output format.

    :param rpi_host: local address the TCP server binds to.
    :param rpi_port: local TCP port the server binds to.
    :param zmq_host: interface the zmq PUB socket binds to ('*' for all).
    :param zmq_port: TCP port the zmq PUB socket binds to.
    :return: None, after the connected peer closes the connection.
    """
    write_pid = os.getpid()
    print('#Write Process# %s start' % write_pid)
    bind_ip, bind_port = rpi_host, rpi_port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((bind_ip, bind_port))
        server.listen(5)  # max backlog of connections
        print('#Write Process# listening on {}:{}'.format(bind_ip, bind_port))
        client_socket, address = server.accept()
        print('#Write Process# accepted connection from {}:{}'.format(address[0], address[1]))

        context = zmq.Context()
        zmq_socket = context.socket(zmq.PUB)
        try:
            zmq_socket.bind(f"tcp://{zmq_host}:{zmq_port}")

            buffered = b''  # bytes received but not yet published
            while True:
                data = client_socket.recv(1024)
                if not data:
                    # recv() returns b'' once the peer has closed the
                    # connection; stop instead of spinning on empty reads.
                    print('#Write Process# connection closed by peer')
                    break
                buffered += data

                nesting = 0   # current brace nesting level
                opened_at = 0  # index of the '{' opening the current frame
                done_upto = 0  # end index of the last complete frame
                for idx, value in enumerate(buffered):
                    if value == 0x7B:  # '{'
                        if nesting == 0:
                            opened_at = idx
                        nesting += 1
                    elif value == 0x7D and nesting:  # '}' (stray ones at depth 0 are skipped)
                        nesting -= 1
                        if nesting == 0:
                            zmq_socket.send(buffered[opened_at:idx + 1])
                            done_upto = idx + 1
                # Keep only the unfinished tail for the next recv().
                buffered = buffered[done_upto:]
        finally:
            client_socket.close()
            # linger=0 so term() cannot block on undelivered messages.
            zmq_socket.close(linger=0)
            context.term()


if __name__ == '__main__':
    # Run the SSL and SST relays as two child processes and wait for both;
    # Ctrl-C in the parent terminates the children.
    try:
        writers = [Process(target=ssl_write), Process(target=sst_write)]
        # Start every writer process first ...
        for writer in writers:
            writer.start()
        # ... then block until each of them has finished.
        for writer in writers:
            writer.join()
    except KeyboardInterrupt:
        for writer in writers:
            writer.terminate()
        print('KeyboardInterrupt pw pr Terminated')

远端主机

使用pyzmq订阅定位数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from multiprocessing import Process
import os
import json
import asyncio
from aiohttp import web, WSMsgType
import zmq


def sst_read(zmq_host='192.168.1.108', zmq_port=5556, aio_host='0.0.0.0', aio_port=8080, verbose=False):
    """
    Forward Sound Source Tracking (SST) frames from zmq to a websocket.

    Subscribes to the zmq publisher on tcp://zmq_host:zmq_port (only messages
    starting with '{' are received) and runs an aiohttp server exposing
    ws://aio_host:aio_port/ws.

    Protocol on the websocket: one frame is sent as soon as the connection is
    opened; afterwards one more frame is sent for every text message the
    client sends (e.g. 'received'), meaning 'last message received, please
    send next'. The text message 'close' closes the connection instead.

    :param zmq_host: host of the zmq publisher to connect to.
    :param zmq_port: port of the zmq publisher to connect to.
    :param aio_host: address the aiohttp server listens on.
    :param aio_port: port the aiohttp server listens on.
    :param verbose: when true, print a running count of the frames sent in
        reply to client messages (one increment per frame).
    :output: Sound Source Tracking output like this:
    {
        "timeStamp": 45602,
        "src": [
            { "id": 43, "tag": "dynamic", "x": 0.025, "y": 0.128, "z": 0.991, "activity": 1.000 },
            { "id": 0, "tag": "", "x": 0.000, "y": 0.000, "z": 0.000, "activity": 0.000 },
            { "id": 0, "tag": "", "x": 0.000, "y": 0.000, "z": 0.000, "activity": 0.000 },
            { "id": 0, "tag": "", "x": 0.000, "y": 0.000, "z": 0.000, "activity": 0.000 }
        ]
    }
    :return: None
    """
    read_pid = os.getpid()
    print('#Read Process# %s start' % read_pid)
    routes = web.RouteTableDef()

    context = zmq.Context()
    zmq_socket = context.socket(zmq.SUB)

    print("Collecting updates from odas publisher...")
    zmq_socket.connect(f"tcp://{zmq_host}:{zmq_port}")
    # Prefix filter: only messages that begin with '{' are delivered.
    zmq_socket.setsockopt_string(zmq.SUBSCRIBE, '{')

    @routes.get('/ws')
    async def websocket_handler(request):
        print('websocket connection open')

        ws = web.WebSocketResponse()
        await ws.prepare(request)
        c = 0  # frames sent in reply to client messages (verbose output only)
        # NOTE(review): zmq_socket.recv_string() is a blocking call made from
        # inside a coroutine, so the event loop is stalled while waiting for
        # the publisher - consider zmq.asyncio if several clients must be served.
        await ws.send_str(zmq_socket.recv_string())
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                if msg.data == 'close':
                    print('websocket connection close')
                    await ws.close()
                else:
                    tmp = zmq_socket.recv_string()
                    await ws.send_str(tmp)
                    if verbose:
                        # Count each forwarded frame exactly once.
                        c += 1
                        print(f'\r{c}', end='', flush=True)
            elif msg.type == WSMsgType.ERROR:
                print('ws connection closed with exception %s' %
                      ws.exception())
        return ws

    app = web.Application()
    app.add_routes(routes)
    web.run_app(app, host=aio_host, port=aio_port)


def ssl_read(zmq_host='192.168.1.108', zmq_port=5557, aio_host='0.0.0.0', aio_port=8081, verbose=False):
    """
    Forward Sound Source Localization (SSL) frames from zmq to a websocket.

    Subscribes to the zmq publisher on tcp://zmq_host:zmq_port (only messages
    starting with '{' are received) and runs an aiohttp server exposing
    ws://aio_host:aio_port/ws.

    Protocol on the websocket: one frame is sent as soon as the connection is
    opened; afterwards one more frame is sent for every text message the
    client sends (e.g. 'received'), meaning 'last message received, please
    send next'. The text message 'close' closes the connection instead.
    With verbose set, a running count of those reply frames is printed.

    :output: Sound Source Localization output like this:
    {
        "timeStamp": 45608,
        "src": [
            { "x": 0.132, "y": 0.181, "z": 0.975, "E": 0.557 },
            { "x": 0.198, "y": 0.342, "z": 0.918, "E": 0.130 },
            { "x": 0.000, "y": 0.273, "z": 0.962, "E": 0.018 },
            { "x": 0.000, "y": 0.339, "z": 0.941, "E": 0.006 }
        ]
    }
    :return: None
    """
    print('#Read Process# %s start' % os.getpid())
    routes = web.RouteTableDef()

    zmq_context = zmq.Context()
    subscriber = zmq_context.socket(zmq.SUB)

    print("Collecting updates from odas publisher...")
    endpoint = f"tcp://{zmq_host}:{zmq_port}"
    subscriber.connect(endpoint)
    # Prefix filter: only messages that begin with '{' are delivered.
    subscriber.setsockopt_string(zmq.SUBSCRIBE, '{')

    @routes.get('/ws')
    async def websocket_handler(request):
        print('websocket connection open')

        ws = web.WebSocketResponse()
        await ws.prepare(request)
        sent = 0  # reply frames sent so far (verbose output only)
        # Push the first frame without waiting for the client.
        first_frame = subscriber.recv_string()
        await ws.send_str(first_frame)
        async for msg in ws:
            if msg.type == WSMsgType.ERROR:
                print('ws connection closed with exception %s' %
                      ws.exception())
                continue
            if msg.type != WSMsgType.TEXT:
                continue
            if msg.data == 'close':
                print('websocket connection close')
                await ws.close()
                continue
            # Any other text message is the client's request for the next frame.
            await ws.send_str(subscriber.recv_string())
            if verbose:
                sent += 1
                print(f'\r{sent}', end='', flush=True)
        return ws

    app = web.Application()
    app.add_routes(routes)
    web.run_app(app, host=aio_host, port=aio_port)


if __name__ == '__main__':
    # Create both reader processes before entering the try block so the
    # interrupt handler never refers to a name that is not bound yet.
    p_ssl_r = Process(target=ssl_read, kwargs={"verbose": True})
    p_sst_r = Process(target=sst_read)
    try:
        # Start the reader child processes.
        p_ssl_r.start()
        p_sst_r.start()
        # Wait for the readers inside the try block; without the joins the
        # block ends immediately and Ctrl-C is never caught by the handler below.
        p_ssl_r.join()
        p_sst_r.join()
    except KeyboardInterrupt:
        for reader in (p_ssl_r, p_sst_r):
            # is_alive() is False for a process that was never started,
            # which terminate() cannot handle.
            if reader.is_alive():
                reader.terminate()
        print('KeyboardInterrupt pw pr Terminated')

注意: 这里我使用了aiohttp将数据发布为websocket,在浏览器上进行可视化。

使用three.js可视化数据

封装ant-design组件

为了以后用着方便,我特意将可视化代码封装为ant-design的组件:

1
2
3
4
# 新建文件夹
mkdir odas-front
# 安装antd测试
npm install antd --save

注意: 在电脑上用nvm控制node版本,最新的已经是版本12.x了。但是port安装的yarn使用的是node --version 10.16.0,因此还是要配合安装nvm install 10.16.0,使默认node变成10.16.0(12版本会报错)。

1
2
3
# 安装`create-umi`
npm create umi
# 选择`app`,并同意使用`typescript`,再选择`dva`和`antd`

代码基本照抄odas的Element版本。

分享到