Skip to content

Using microphone peak as input

I’m currently on a project that involve disabled peoples, audio and kinect. Theses boys and girls are doing lot of loud sounds, so the idea is to use their sound as a trigger. We can use gstreamer to make that work quite easily, cause it have everything we need: a audio source, and level calculator.

import pygst
pygst.require('0.10')
import gst, gobject
gobject.threads_init()

pipeline = gst.parse_launch(
    'pulsesrc ! audioconvert ! '
    'audio/x-raw-int,channels=2,rate=44100,endianness=1234,'
    'width=32,depth=32,signed=(bool)True !'
    'level name=level interval=10000000 !'
    'fakesink')

level = pipeline.get_by_name('level')
bus = pipeline.get_bus()
bus.add_signal_watch()

def show_peak(bus, message):
    # filter only on level messages
    if message.src is not level:
        return
    if not message.structure.has_key('peak'):
        return
    # read peak
    print 'peak', message.structure['peak'][0]

# connect the callback
bus.connect('message', show_peak)

# start the pipeline
pipeline.set_state(gst.STATE_PLAYING)

ctx = gobject.gobject.main_context_default()
while ctx:
    ctx.iteration()

The output could be something like this :

peak -35.2370719856
peak -35.0252114393
peak -10.8591229158
peak -4.6007387433
peak -4.85102463679
peak -6.45292575653
peak -6.83102903668
peak -7.39486319074
peak -13.9852340247
peak -17.423901433
peak -35.0852178272
peak -35.8725208237

Next, we can use that information to record their sound, and use it on some scenario. So, instead of use the fakesink, we can use appsink. This module allow you to read the buffer pushed by the previous module. So we can put theses buffers into a list, and use them when needed 🙂

The state machine will handle the 3 phases :

  1. Wait for a peak > -30db
  2. Recording the sound, stop when the peak is < -32db
  3. Replay the last sound

Note: The -30 / -32 are taken from my tests. If you have more noise, you need to adjust theses triggers.

And here is the final example:

import pygst
pygst.require('0.10')
import gst, gobject
gobject.threads_init()

pipeline_play = None
pipeline = gst.parse_launch(
    'pulsesrc ! audioconvert ! '
    'audio/x-raw-int,channels=2,rate=44100,endianness=1234,'
    'width=32,depth=32,signed=(bool)True !'
    'level name=level interval=10000000 !'
    'appsink name=app emit-signals=True')

state = 'wait'
peak = -99
buffers = []
level = pipeline.get_by_name('level')
app = pipeline.get_by_name('app')
bus = pipeline.get_bus()
bus.add_signal_watch()

def show_peak(bus, message):
    global peak
    # filter only on level messages
    if message.src is not level:
        return
    if not message.structure.has_key('peak'):
        return
    # read peak
    peak = message.structure['peak'][0]

def enqueue_audio_buffer(app):
    buffers.append(str(app.emit('pull-buffer')))

def play_sample(sample):
    global pipeline_play
    with open('audio.dat', 'wb') as fd:
        fd.write(sample)
    if pipeline_play is None:
        pipeline_play = gst.parse_launch(
            'filesrc location=audio.dat !'
            'audio/x-raw-int,channels=2,rate=44100,endianness=1234,'
            'width=32,depth=32,signed=(bool)True !'
            'audioamplify amplification=2 ! autoaudiosink')
    pipeline_play.set_state(gst.STATE_NULL)
    pipeline_play.set_state(gst.STATE_PLAYING)

# connect the callback
bus.connect('message', show_peak)
app.connect('new-buffer', enqueue_audio_buffer)

# start the pipeline
pipeline.set_state(gst.STATE_PLAYING)

# main loop
ctx = gobject.gobject.main_context_default()
while ctx:
    ctx.iteration()
    print state, peak

    # wait for somebody to make a sound
    if state == 'wait':
        if peak > -30:
            state = 'record'
            continue
        # discard any buffer
        buffers = []

    # record the current audio, until the peak is going down
    elif state == 'record':
        if peak < -32:
            state = 'replay'
            continue

    # replay last sound
    elif state == 'replay':
        play_sample(''.join(buffers))
        state = 'wait'

Usage: Just make a sound... and it will replay just after.