作为一名新晋奶爸和程序员,我在新身份中最常思考的问题就是 “照料婴儿的工作真的无法自动化吗?”
当然,这也许能够实现,就算有给孩子换尿布的机器人(假设有足够多的父母同意在自己蹒跚学步的孩子身上测试这样的设备),愿意自动化照料婴儿的父母还真为数不多。
作为父亲,我首先意识到的事情是:婴儿很多时候都会在哭,即使我在家,也不可能总是能听到孩子的哭声。
通常,商用婴儿监视器可以填补这一空白,它们充当对讲机,让你在另一个房间也能听到婴儿的哭声。
但我很快意识到:商用婴儿监视器没有我想象中的理想设备智能:
它们只能充当一个传声筒:把声音从源头带到扬声器,却无法发现孩子哭声的含义;
当家长要去到另一个房间里时,相应要把扬声器带到另一个房间,无法在任何其他现有的音频设备上播放声音;
扬声器通常是低功率扬声器,无法连接到外部扬声器 - 这意味着,如果我在另一个房间播放音乐,我可能会听不到孩子的哭声,即便监控器和我在同一个房间也无法听到;
大多数扬声器都是在低功率无线电波上工作的,这意味着如果婴儿在他 / 她的房间里,而你必须走到楼下,它们才能工作。
因此,我萌生了自制一个更好用的 “智能婴儿监视器”的想法。
说干就干,我先给这个 “智能婴儿监视器”定义了一些需要的功能。
它可以运行于价廉物美的树莓派(RaspberryPI)与 USB 麦克风。
当孩子开始 / 停止哭泣时,它应该检测到孩子的哭声,并通知我(理想情况下是在我的手机上),或者跟踪我仪表板上的数据点,或者运行相应的任务。它不应该是一个单纯的对讲器,简单地将声音从一个源传递到另一个兼容的设备。
它能够在扬声器,智能手机,电脑等设备上传输音频。
它不受源和扬声器之间距离的影响,无需在整个房子里将扬声器移来移去。
它还应该有一个摄像头,可以利用摄像头对孩子实时监控,当他一开始哭,我便可以抓拍到图片或婴儿床的短视频,以检查有什么不对劲。
来看看一个新晋奶爸如何使用工程师的大脑和开源工具来完成这项任务吧。
采集音频样本首先,购买一块树莓派(RaspberryPi),在 SD 卡上烧录好 Linux 操作系统(建议使用 RaspberryPI3 或更高版本),运行 Tensorflow 模型。还可以购买一个与树莓派兼容的 USB 麦克风。
然后安装需要的相关项:
[sudo]apt-getinstallffmpeglamelibatlas-base-devalsa-utils
[sudo]pip3installtensorflow
第一步,必须记录足够的音频样本,婴儿在什么时候哭,在什么时候不哭。稍后将利用这些样本来训练音频检测模型。
注意:在这个例子中,我将展示如何利用声音检测来识别婴儿的哭声,同样的精准程序可以用来检测任何其它类型的声音 - 只要它们足够长 (例如:警报或邻居家的钻孔声)。
首先,查看音频输入设备:
arecord-l
在树莓派(RaspberryPI)上,得到以下输出 (注意,有两个 USB 麦克风):
****ListofCAPTUREHardwareDevices****
card1:Device[USBPnPSoundDevice],device0:USBAudio[USBAudio]
Subdevices:0/1
Subdevice#0:subdevice#0
card2:Device_1[USBPnPSoundDevice],device0:USBAudio[USBAudio]
Subdevices:0/1
Subdevice#0:subdevice#0
我利用第二个麦克风来记录声音 - 即卡 2,设备 0。识别它的 ALSA 方法要么是 hw:2,0(直接访问硬件设备),要么是 plughw:2,0(如果需要的话,它会输入采样率和格式转换插件)。确保 SD 卡上有足够的空间,然后开始录制一些音频:
arecord-Dplughw:2,0-c1-fcd|lame-audio.mp3
和孩子在同一个房间里,记录几分钟或几个小时的音频 - 最好是长时间的沉默、婴儿哭声和其他与之无关的声音 -,录音完成后按 Ctrl-C。尽可能多的重复这个过程多次,在一天中的不同时刻或不同的日子里获取不同的音频样本。
标注音频示例一旦有了足够的音频样本,就可以把它们复制到电脑上来训练模型了 - 可以使用 SCP 复制文件,也可以直接从 SD 卡上复制。
把它们都存储在相同目录下,例如:~/datasets/sound-detect/audio。另外,为每个示例音频文件创建一个新文件夹,它包含一个音频文件 (名为 audio.mp3)和一个标注文件 (名为 labels.json),利用它来标记音频文件中的负 / 正音频段,原始数据集的结构如下:
~/datasets/sound-detect/audio
-> sample_1
-> audio.mp3
-> labels.json
-> sample_2
-> audio.mp3
-> labels.json
...
下面:标注录制的音频文件 - 如果它包含了孩子几个小时的哭声,可能会特别受虐。在你最喜欢的音频播放器或 Audacity 中打开每个数据集音频文件,并在每个示例目录中创建一个新的 label.json 文件。确定哭泣开始的确切时间和结束时间,并在 labels.json 中标注为 time_string -> label 的关键值结构。例:
{
"00:00":"negative",
"02:13":"positive",
"04:57":"negative",
"15:41":"positive",
"18:24":"negative"
}
在上面的例子中,00:00 到 02:12 之间的所有音频段将被标记为负,02:13 到 04:56 之间的所有音频段将被标记为正,以此类推。
生成数据集对所有的音频示例标注完成之后,接下来是生成数据集,最后将它输入到 Tensorflow 模型中去。首先,创建了一个名为 micmon 的通用库和一组用于声音监视的实用工具。然后,开始安装:
gitclonegit@github.com:/BlackLight/micmon.git
cdmicmon
[sudo]pip3install-rrequirements.txt
[sudo]python3setup.pybuildinstall
本模型设计基于音频的频率样本而非原始音频,因为,在这里我们想检测到一个特定的声音,这个声音有着特定的 “频谱”标签,即:基频(或基频下降的窄带范围)和一组特定的谐波。这些谐波频率与基波之间的比率既不受振幅的影响(频率比恒定,与输入幅度无关),也不受相位的影响 (无论何时开始记录,连续的声音都会有相同的频谱特征)。
这种与振幅和相位无关的特性使得这种方法更有可能训练出一个鲁棒的声音检测模型,而不是简单地将原始音频样本馈送到模型中。此外,该模型可以更简单(可以在不影响性能的情况下将多个频率分为一组,从而可以有效地实现降维),无论样本持续时间多长,该模型将 50~ 100 个频带作为输入值,一秒钟的原始音频通常包含 44100 个数据点,并且输入的长度随着样本的持续时间而增加,并且不太容易发生过拟合。
micmon 能计算音频样本某些段的 FFT(快速傅里叶变换),将结果频谱分为低通和高通滤波器的频带,并将结果保存到一组 numpy 压缩 (.npz)文件中。可以通过在命令行上执行 micmon-datagen 命令来实现:
micmon-datagen\
--low250--high2500--bins100\
--sample-duration2--channels1\
~/datasets/sound-detect/audio
~/datasets/sound-detect/data
在上面的示例中,我们从存储在~/dataset/sound-detect/audio 下的原始音频样本生成一个数据集,并将生成的频谱数据存储到~/datasets/sound-detect/data. –low 和~/datasets/sound-detect/data. --high 中,low 和 high 分别表示最低和最高频率,最低频率的默认值为 20Hz(人耳可闻的最低频率),最高频率的默认值为 20kHz(健康的年轻人耳可闻的最高频率)。
通过对此范围做出限定,尽可能多地捕获希望检测到的其他类型的音频背景和无关谐波的声音。在本案例中,250-2500 赫兹的范围足以检测婴儿的哭声。
婴儿的哭声通常是高频的(歌剧女高音能达到的最高音符在 1000 赫兹左右),在这里设置了至少双倍的最高频率,以确保能获得足够高的谐波 (谐波是更高的频率),但也不要将最高频率设得太高,以防止其他背景声音的谐波。我剪切掉了频率低于 250 赫兹的音频信号 - 婴儿的哭声不太可能发生在低频段,例如,可以打开一些 positive 音频样本,利用均衡器 / 频谱分析仪,检查哪些频率在 positive 样本中占主导地位,并将数据集集中在这些频率上。--bins 指定了频率空间的组数(默认值:100),更大的数值意味着更高的频率分辨率 / 粒度,但如果太高,可能会使模型容易发生过度拟合。
脚本将原始音频分割成较小的段,并计算每个段的频谱标签。示例持续时间指定每个音频段有多长时间(默认:2 秒)。对于持续时间较长的声音,取更大的值会起到更好的作用,但它同时会减少检测的时间,而且可能会在短音上失效。对于持续时间较短的声音,可以取较低的值,但捕获的片段可能没有足够的信息量来可靠地识别声音。
除了 micmon-datagen 脚本之外,也可以利用 micmonAPI,编写脚本来生成数据集。例:
importos
frommicmon.audioimportAudioDirectory,AudioPlayer,AudioFile
frommicmon.datasetimportDatasetWriter
basedir=os.path.expanduser('~/datasets/sound-detect')
audio_dir=os.path.join(basedir,'audio')
datasets_dir=os.path.join(basedir,'data')
cutoff_frequencies=[250,2500]
#Scanthebaseaudio_dirforlabelledaudiosamples
audio_dirs=AudioDirectory.scan(audio_dir)
#Savethespectruminformationandlabelsofthesamplestoa
#differentcompressedfileforeachaudiofile.
foraudio_dirinaudio_dirs:
dataset_file=os.path.join(datasets_dir,os.path.basename(audio_dir.path)+'.npz')
print(f'Processingaudiosample{audio_dir.path}')
withAudioFile(audio_dir)asreader,\
DatasetWriter(dataset_file,
low_freq=cutoff_frequencies[0],
high_freq=cutoff_frequencies[1])aswriter:
forsampleinreader:
writer+=sample
无论是使用 micmon-datagen 还是使用 micmon Python API 生成数据集,在过程结束时,应该在~/datasets/sound-detect/data 目录下找到一堆 . npz 文件,每个标注后的音频原始文件对应一个数据集。之后,便可以利用这个数据集来训练神经网络进行声音检测。
训练模型micmon 利用 Tensorflow+Keras 来定义和训练模型,有了 PythonAPI,可以很容易地实现。例如:
importos
fromtensorflow.kerasimportlayers
frommicmon.datasetimportDataset
frommicmon.modelimportModel
#Thisisadirectorythatcontainsthesaved.npzdatasetfiles
datasets_dir=os.path.expanduser('~/datasets/sound-detect/data')
#Thisistheoutputdirectorywherethemodelwillbesaved
model_dir=os.path.expanduser('~/models/sound-detect')
#Thisisthenumberoftrainingepochsforeachdatasetsample
epochs=2
#Loadthedatasetsfromthecompressedfiles.
#70%ofthedatapointswillbeincludedinthetrainingset,
#30%ofthedatapointswillbeincludedintheevaluationset
#andusedtoevaluatetheperformanceofthemodel.datasets=
Dataset.scan(datasets_dir,validation_split=0.3)
labels=['negative','positive']
freq_bins=len(datasets[0].samples[0])
#Createanetworkwith4layers(oneinputlayer,twointermediatelayersandoneoutputlayer).
#Thefirstintermediatelayerinthisexamplewillhavetwicethenumberofunitsasthenumber
#ofinputunits,whilethesecondintermediatelayerwillhave75%ofthenumberof
#inputunits.Wealsospecifythenamesforthelabelsandthelowandhighfrequencyrange
#usedwhensampling.
model=Model(
[
layers.Input(shape=(freq_bins,)),
layers.Dense(int(2*freq_bins),activation='relu'),
layers.Dense(int(0.75*freq_bins),activation='relu'),
layers.Dense(len(labels),activation='softmax'),
],
labels=labels,
low_freq=datasets[0].low_freq,
high_freq=datasets[0].high_freq)
#Trainthemodel
forepochinrange(epochs):
fori,datasetinenumerate(datasets):
print(f'[epoch{epoch+1}/{epochs}][audiosample{i+1}/{len(datasets)}]')
model.fit(dataset)
evaluation=model.evaluate(dataset)
print(f'Validationsetlossandaccuracy:{evaluation}')
#Savethemodel
model.save(model_dir,overwrite=True)
运行此脚本后(在对模型的准确性感到满意后),可以在~/models/sound-detect 目录下找保存的新模型。在我的这个例子中,我采集~ 5 小时的声音就足够用了,通过定义一个较优的频率范围来训练模型,准确率大于 98%。如果是在计算机上训练模型,只需将其复制到 RaspberryPI,便可以准备进入下一步了。
利用模型进行预测这时候,制作一个脚本:利用以前训练过的模型,当孩子开始哭的时候,通知我们:
importos
frommicmon.audioimportAudioDevice
frommicmon.modelimportModel
model_dir=os.path.expanduser('~/models/sound-detect')
model=Model.load(model_dir)
audio_system='alsa'#Supported:alsaandpulse
audio_device='plughw:2,0'#Getlistofrecognizedinputdeviceswitharecord-l
withAudioDevice(audio_system,device=audio_device)assource:
forsampleinsource:
source.pause()#Pauserecordingwhileweprocesstheframe
prediction=model.predict(sample)
print(prediction)
source.resume()#Resumerecording
在 RaspberryPI 上运行脚本,并让它运行一段时间 - 如果在过去 2 秒内没有检测到哭声,它将在标准输出中打印 negative,如果在过去 2 秒内检测到哭声否,则在标准输出中打印 positive。
然而,如果孩子哭了,简单地将消息打印到标准输出中并没有太大作用 - 我们希望得到明确实时通知!
可以利用 Platypush 来实现这个功能。在本例中,我们将使用 pushbullet 集成在检测到 cry 时向我们的手机发送消息。接下来安装 Redis(Platypush 用于接收消息)和 Platypush,利用 HTTP 和 Pushbullet 来集成:
[sudo]apt-getinstallredis-server
[sudo]systemctlstartredis-server.service
[sudo]systemctlenableredis-server.service
[sudo]pip3install'platypush[http,pushbullet]'
将 Pushbullet 应用程序安装在智能手机上,到 pushbullet.com 上以获取 API token。然后创建一个~/.config/platypush/config.yaml 文件,该文件启用 HTTP 和 Pushbullet 集成:
backend.http:
enabled:True
pushbullet:
token:YOUR_TOKEN
接下来,对前面的脚本进行修改,不让它将消息打印到标准输出,而是触发一个可以被 Platypush hook 捕获的自定义事件 CustomEvent:
#!/usr/bin/python3
importargparse
importloggingimportos
importsys
fromplatypushimportRedisBus
fromplatypush.message.event.customimportCustomEvent
frommicmon.audioimportAudioDevice
frommicmon.modelimportModel
logger=logging.getLogger('micmon')
defget_args():
parser=argparse.ArgumentParser()
parser.add_argument('model_path',help='Pathtothefile/directorycontainingthesavedTensorflowmodel')
parser.add_argument('-i',help='Inputsounddevice(e.g.hw:0,1ordefault)',required=True,dest='sound_device')
parser.add_argument('-e',help='Nameoftheeventthatshouldberaisedwhenapositiveeventoccurs',required=True,dest='event_type')
parser.add_argument('-s','--sound-server',help='Soundservertobeused(available:alsa,pulse)',required=False,default='alsa',dest='sound_server')
parser.add_argument('-P','--positive-label',help='Modeloutputlabelname/indextoindicateapositivesample(default:positive)',required=False,default='positive',dest='positive_label')
parser.add_argument('-N','--negative-label',help='Modeloutputlabelname/indextoindicateanegativesample(default:negative)',required=False,default='negative',dest='negative_label')
parser.add_argument('-l','--sample-duration',help='LengthoftheFFTaudiosamples(default:2seconds)',required=False,type=float,default=2.,dest='sample_duration')
parser.add_argument('-r','--sample-rate',help='Samplerate(default:44100Hz)',required=False,type=int,default=44100,dest='sample_rate')
parser.add_argument('-c','--channels',help='Numberofaudiorecordingchannels(default:1)',required=False,type=int,default=1,dest='channels')
parser.add_argument('-f','--ffmpeg-bin',help='FFmpegexecutablepath(default:ffmpeg)',required=False,default='ffmpeg',dest='ffmpeg_bin')
parser.add_argument('-v','--verbose',help='Verbose/debugmode',required=False,action='store_true',dest='debug')
parser.add_argument('-w','--window-duration',help='Durationofthelook-backwindow(default:10seconds)',required=False,type=float,default=10.,dest='window_length')
parser.add_argument('-n','--positive-samples',help='Numberofpositivesamplesdetectedoverthewindowdurationtotriggertheevent(default:1)',required=False,type=int,default=1,dest='positive_samples')
opts,args=parser.parse_known_args(sys.argv[1:])
returnopts
defmain():
args=get_args()
ifargs.debug:
logger.setLevel(logging.DEBUG)
model_dir=os.path.abspath(os.path.expanduser(args.model_path))
model=Model.load(model_dir)
window=[]
cur_prediction=args.negative_label
bus=RedisBus()
withAudioDevice(system=args.sound_server,
device=args.sound_device,
sample_duration=args.sample_duration,
sample_rate=args.sample_rate,
channels=args.channels,
ffmpeg_bin=args.ffmpeg_bin,
debug=args.debug)assource:
forsampleinsource:
source.pause()#Pauserecordingwhileweprocesstheframe
prediction=model.predict(sample)
logger.debug(f'Sampleprediction:{prediction}')
has_change=False
iflen(window) window+=[prediction]
else:
window=window[1:]+[prediction]
positive_samples=len([predforpredinwindowifpred==args.positive_label])
ifargs.positive_samples>=positive_samplesand\
prediction==args.positive_labeland\
cur_prediction!=args.positive_label:
cur_prediction=args.positive_label
has_change=True
logging.info(f'Positivesamplethresholddetected({positive_samples}/{len(window)})')elifargs.positive_samples>positive_samplesand\prediction==args.negative_labeland\cur_prediction!=args.negative_label:cur_prediction=args.negative_labelhas_change=Truelogging.info(f'Negativesamplethresholddetected({len(window)-positive_samples}/{len(window)})')
ifhas_change:
evt=CustomEvent(subtype=args.event_type,state=prediction)
bus.post(evt)
source.resume()#Resumerecording
if__name__=='__main__':
main()
将上面的脚本保存为~/bin/micmon_detect.py。如果在滑动窗口时间内上检测到 positive_samples 样本(为了减少预测错误或临时故障引起的噪声),则脚本触发事件,并且它只会在当前预测从 negative 到 positive 的情况下触发事件。然后,它被分派给 Platypush。对于其它不同的声音模型(不一定是哭泣婴儿),该脚本也是通用的,对应其它正 / 负标签、其它频率范围和其它类型的输出事件,这个脚本也能工作。
创建一个 Platypush hook 来对事件作出响应,并向设备发送通知。首先,创建 Platypush 脚本目录:
mkdir-p~/.config/platypush/scripts
cd~/.config/platypush/scripts
#Definethedirectoryasamoduletouch__init__.py
#Createascriptforthebaby-cryeventsvibabymonitor.py
babymonitor.py 的内容为:
fromplatypush.contextimportget_plugin
fromplatypush.event.hookimporthook
fromplatypush.message.event.customimportCustomEvent
@hook(CustomEvent,subtype='baby-cry',state='positive')
defon_baby_cry_start(event,**_):
pb=get_plugin('pushbullet')
pb.send_note(title='Babycrystatus',body='Thebabyiscrying!')
@hook(CustomEvent,subtype='baby-cry',state='negative')
defon_baby_cry_stop(event,**_):
pb=get_plugin('pushbullet')
pb.send_note(title='Babycrystatus',body='Thebabystoppedcrying-goodjob!')
为 Platypush 创建一个服务文件,并启动 / 启用服务,这样它就会在终端上启动:
mkdir-p~/.config/systemd/user
wget-O~/.config/systemd/user/platypush.service\
https://raw.githubusercontent.com/BlackLight/platypush/master/examples/systemd/platypush.service
systemctl--userstartplatypush.service
systemctl--userenableplatypush.service
为婴儿监视器创建一个服务文件 - 如:
~/.config/systemd/user/babymonitor.service:
[Unit]
Description=Monitortodetectmybaby'scries
After=network.targetsound.target
[Service]
ExecStart=/home/pi/bin/micmon_detect.py-iplughw:2,0-ebaby-cry-w10-n2~/models/sound-detect
Restart=always
RestartSec=10
[Install]
WantedBy=default.target
该服务将启动 ALSA 设备 plughw:2,0 上的麦克风监视器,如果在过去 10 秒内检测到至少 2 个 positive 2 秒样本,并且先前的状态为 negative,则会触发 state=positive 事件;如果在过去 10 秒内检测到少于 2 个 positive 样本,并且先前的状态为 positive,则 state=negative。然后可以启动 / 启用服务:
systemctl--userstartbabymonitor.service
systemctl--userenablebabymonitor.service
确认一旦婴儿开始哭泣,就会在手机上收到通知。如果没有收到通知,可以检查一下音频示例的标签、神经网络的架构和参数,或样本长度 / 窗口 / 频带等参数是否正确。
此外,这是一个相对基本的自动化例子 - 可以为它添加更多的自动化任务。例如,可以向另一个 Platypush 设备发送请求 (例如:在卧室或客厅),用 TTS 插件大声提示婴儿在哭。还可以扩展 micmon_detect.py 脚本,以便捕获的音频样本也可以通过 HTTP 流 - 例如使用 Flask 包装器和 ffmpeg 进行音频转换。另一个有趣的用例是,当婴儿开始 / 停止哭泣时,将数据点发送到本地数据库 (可以参考我先前关于 “如何使用 Platypush+PostgreSQL+Mosquitto+Grafana 创建灵活和自我管理的仪表板”的文章 https://towardsdatascience.com/how-to-build-your-home-infrastructure-for-data-collection-and-visualization-and-be-the-real-owner-af9b33723b0c):这是一组相当有用的数据,可以用来跟踪婴儿睡觉、醒着或需要喂食时的情况。虽然监测宝宝一直是我开发 micmon 的初衷,但是同样的程序也可以用来训练和检测其它类型声音的模型。最后,可以考虑使用一组良好的电源或锂电池组,这样监视器便可以便携化了。
安装宝贝摄像头有了一个好的音频馈送和检测方法之后,还可以添加一个视频馈送,以保持对孩子的监控。一开始,我 在 RaspberryPI3 上安装了一个 PiCamera 用于音频检测,后来,我发现这个配置相当不切实际。想想看:一个 RaspberryPi 3、一个附加的电池包和一个摄像头,组合在一起会相当笨拙;如果你找到一个轻型相机,可以很容易地安装在支架或灵活的手臂上,而且可以四处移动,这样,无论他 / 她在哪里,都可以密切关注孩子。最终,我选择了体积较小的 RaspberryPi Zero,它与 PiCamera 兼容,再配一个小电池。
婴儿监视器摄像头模块的第一个原型
同样,先插入一个烧录了与 RaspberryPI 兼容的操作系统的 SD 卡。然后在其插槽中插入一个与 RaspberryPI 兼容的摄像头,确保摄像头模块在 raspi-config 中启用,安装集成有 PiCamera 的 Platypush:
[sudo]pip3install'platypush[http,camera,picamera]'
然后在~/.config/platypush/config.yaml: 中添加相机配置:
camera.pi:
listen_port:5001
在 Platypush 重新启动时检查此配置,并通过 HTTP 从摄像头获取快照:
wgethttp://raspberry-pi:8008/camera/pi/photo.jpg
或在浏览器中打开视频:
http://raspberry-pi:8008/camera/pi/video.mjpg
同样,当应用程序启动时,可以创建一个 hook,该 hook 通过 TCP/H264 启动摄像头馈送:
mkdir-p~/.config/platypush/scripts
cd~/.config/platypush/scripts
touch__init__.py
vicamera.py
也可以通过 VLC:播放视频。
vlctcp/h264://raspberry-pi:5001
在手机上通过 VLC 应用程序或利用 RPi Camera Viewer 应用程序观看视频。
从想法到最后实现效果还不错,这也算是一个新晋奶爸从护理琐事中脱身的自我救赎吧。