ref: 13c3fba4c19f51ef0c00547a7eea9be7fbcb9bdb
parent: c7713b225d8866f9d6010c89669647929ee8abad
author: Paul Brossier <[email protected]>
date: Fri Mar 3 00:57:50 EST 2006
splitting up tasks in many files
--- /dev/null
+++ b/python/aubio/task/beat.py
@@ -1,0 +1,39 @@
+from aubio.aubioclass import *
+from onset import taskonset
+
+class taskbeat(taskonset):
+ # Beat tracking task: runs the taskonset step on every frame and feeds one
+ # value per frame into a beattracking object through the dfframe buffer.
+ def __init__(self,input,params=None,output=None):
+ """ open the input file and initialize arguments
+ parameters should be set *before* calling this method.
+ """
+ # NOTE(review): the output argument is not forwarded, None is always passed.
+ taskonset.__init__(self,input,output=None,params=params)
+ # btwinlen: length of the dfframe buffer handed to the beat tracker
+ self.btwinlen = 512**2/self.params.hopsize
+ # btstep: number of calls between two runs of the beat tracker
+ self.btstep = self.btwinlen/4
+ self.btoutput = fvec(self.btstep,self.channels)
+ self.dfframe = fvec(self.btwinlen,self.channels)
+ self.bt = beattracking(self.btwinlen,self.channels)
+ # pos2: write offset inside the last btstep entries of dfframe
+ self.pos2 = 0
+
+ def __call__(self):
+ # Process one frame. Returns (frameread, 0) when the current position
+ # matches an entry of btoutput and the frame is not silent; otherwise
+ # falls off the end and returns None.
+ # the return value of the onset step is discarded here
+ taskonset.__call__(self)
+ # every btstep calls: run the beat tracker, shift dfframe left by btstep
+ # entries and zero the freed tail (channel 0 only)
+ if self.pos2 == self.btstep - 1 :
+ self.bt.do(self.dfframe,self.btoutput)
+ for i in range (self.btwinlen - self.btstep):
+ self.dfframe.set(self.dfframe.get(i+self.btstep,0),i,0)
+ for i in range(self.btwinlen - self.btstep, self.btwinlen):
+ self.dfframe.set(0,i,0)
+ # presumably still inside the branch above: reset so the increment
+ # below brings pos2 back to 0
+ self.pos2 = -1;
+ self.pos2 += 1
+ # value read from the onset picker's pp member; presumably the latest
+ # detection function value - TODO confirm against onsetpick
+ val = self.opick.pp.getval()
+ self.dfframe.set(val,self.btwinlen - self.btstep + self.pos2,0)
+ # this assignment has no effect: the loop below rebinds i
+ i=0
+ # btoutput[0] is used as the loop bound, entries from index 1 on are
+ # compared with pos2
+ for i in range(1,int( self.btoutput.get(0,0) ) ):
+ if self.pos2 == self.btoutput.get(i,0) and \
+ aubio_silence_detection(self.myvec(),
+ self.params.silence)!=1:
+ return self.frameread, 0
+
+ def eval(self,results):
+ # evaluation is not implemented for beats; overrides taskonset.eval
+ pass
+
--- /dev/null
+++ b/python/aubio/task/cut.py
@@ -1,0 +1,41 @@
+from task import task
+from aubio.aubioclass import *
+
+class taskcut(task):
+ def __init__(self,input,slicetimes,params=None,output=None):
+ """ open the input file and initialize arguments
+ parameters should be set *before* calling this method.
+ """
+ task.__init__(self,input,output=None,params=params)
+ self.newname = "%s%s%09.5f%s%s" % (self.input.split(".")[0].split("/")[-1],".",
+ self.frameread*self.params.step,".",self.input.split(".")[-1])
+ self.fileo = sndfile(self.newname,model=self.filei)
+ self.myvec = fvec(self.params.hopsize,self.channels)
+ self.mycopy = fvec(self.params.hopsize,self.channels)
+ self.slicetimes = slicetimes
+
+ def __call__(self):
+ task.__call__(self)
+ # write to current file
+ if len(self.slicetimes) and self.frameread >= self.slicetimes[0][0]:
+ self.slicetimes.pop(0)
+ # write up to 1st zero crossing
+ zerocross = 0
+ while ( abs( self.myvec.get(zerocross,0) ) > self.params.zerothres ):
+ zerocross += 1
+ writesize = self.fileo.write(zerocross,self.myvec)
+ fromcross = 0
+ while (zerocross < self.readsize):
+ for i in range(self.channels):
+ self.mycopy.set(self.myvec.get(zerocross,i),fromcross,i)
+ fromcross += 1
+ zerocross += 1
+ del self.fileo
+ self.fileo = sndfile("%s%s%09.5f%s%s" %
+ (self.input.split(".")[0].split("/")[-1],".",
+ self.frameread*self.params.step,".",self.input.split(".")[-1]),model=self.filei)
+ writesize = self.fileo.write(fromcross,self.mycopy)
+ else:
+ writesize = self.fileo.write(self.readsize,self.myvec)
+
+
--- /dev/null
+++ b/python/aubio/task/onset.py
@@ -1,0 +1,184 @@
+from aubio.task.task import task
+from aubio.task.utils import *
+from aubio.aubioclass import *
+
+class taskonset(task):
+ def __init__(self,input,output=None,params=None):
+ """ open the input file and initialize arguments
+ parameters should be set *before* calling this method.
+ """
+ # NOTE(review): output is accepted but not passed on to task.__init__
+ task.__init__(self,input,params=params)
+ # onset picker configured from the task parameters; the onset mode name is
+ # translated by get_onset_mode
+ self.opick = onsetpick(self.params.bufsize,
+ self.params.hopsize,
+ self.channels,
+ self.myvec,
+ self.params.threshold,
+ mode=get_onset_mode(self.params.onsetmode),
+ dcthreshold=self.params.dcthreshold,
+ derivate=self.params.derivate)
+ self.olist = []
+ # ofunc collects the per-frame values when params.storefunc is set
+ self.ofunc = []
+ self.maxofunc = 0
+ # last: frame of the last onset returned while params.mintol is in use
+ self.last = 0
+ # ovalist only exists when params.localmin is set: five most recent values
+ if self.params.localmin:
+ self.ovalist = [0., 0., 0., 0., 0.]
+
+ def __call__(self):
+ # Process one frame. Returns (now, val) when an onset is reported, where
+ # now is a frame index corrected by the options below; returns None when
+ # there is no onset or when the onset is pruned by mintol.
+ task.__call__(self)
+ isonset,val = self.opick.do(self.myvec)
+ # onsets found in frames flagged as silent are dropped
+ if (aubio_silence_detection(self.myvec(),self.params.silence)):
+ isonset=0
+ if self.params.storefunc:
+ self.ofunc.append(val)
+ # keep a fixed-length history of the values, negative ones stored as 0
+ if self.params.localmin:
+ if val > 0: self.ovalist.append(val)
+ else: self.ovalist.append(0)
+ self.ovalist.pop(0)
+ if (isonset == 1):
+ if self.params.localmin:
+ # find local minima before peak
+ i=len(self.ovalist)-1
+ # walk backwards while the history keeps decreasing; ovalist[i-1] is
+ # evaluated before the i > 0 test, so at i == 0 it reads ovalist[-1]
+ while self.ovalist[i-1] < self.ovalist[i] and i > 0:
+ i -= 1
+ now = (self.frameread+1-i)
+ else:
+ now = self.frameread
+ # take back delay
+ if self.params.delay != 0.: now -= self.params.delay
+ if now < 0 :
+ now = 0
+ if self.params.mintol:
+ # prune doubled
+ # presumably only onsets more than mintol frames after the last returned
+ # one are reported, the others fall through and yield None
+ if (now - self.last) > self.params.mintol:
+ self.last = now
+ return now, val
+ else:
+ return now, val
+
+
+ def fprint(self,foo):
+ # print the first element of foo (a frame index) scaled by params.step
+ print self.params.step*foo[0]
+
+ def eval(self,inputdata,ftru,mode='roc',vmode=''):
+ from txtfile import read_datafile
+ from onsetcompare import onset_roc, onset_diffs, onset_rocloc
+ ltru = read_datafile(ftru,depth=0)
+ lres = []
+ for i in range(len(inputdata)): lres.append(inputdata[i][0]*self.params.step)
+ if vmode=='verbose':
+ print "Running with mode %s" % self.params.onsetmode,
+ print " and threshold %f" % self.params.threshold,
+ print " on file", self.input
+ #print ltru; print lres
+ if mode == 'local':
+ l = onset_diffs(ltru,lres,self.params.tol)
+ mean = 0
+ for i in l: mean += i
+ if len(l): mean = "%.3f" % (mean/len(l))
+ else: mean = "?0"
+ return l, mean
+ elif mode == 'roc':
+ self.orig, self.missed, self.merged, \
+ self.expc, self.bad, self.doubled = \
+ onset_roc(ltru,lres,self.params.tol)
+ elif mode == 'rocloc':
+ self.v = {}
+ self.v['orig'], self.v['missed'], self.v['Tm'], \
+ self.v['expc'], self.v['bad'], self.v['Td'], \
+ self.v['l'], self.v['labs'] = \
+ onset_rocloc(ltru,lres,self.params.tol)
+
+ def plot(self,onsets,ofunc,wplot,oplots,nplot=False):
+ # Build the Gnuplot items for one detection function. Nothing is returned:
+ # one list of items is appended to oplots, impulse items are appended to
+ # wplot, and self.lenofunc, self.maxofunc and self.title are set.
+ # ofunc may be modified in place (see the mkl/kl case below).
+ import Gnuplot, Gnuplot.funcutils
+ import aubio.txtfile
+ import os.path
+ import numarray
+ from aubio.onsetcompare import onset_roc
+
+ x1,y1,y1p = [],[],[]
+ oplot = []
+ # for the 'mkl' and 'kl' modes the first ten values are overwritten with 0
+ if self.params.onsetmode in ('mkl','kl'): ofunc[0:10] = [0] * 10
+
+ self.lenofunc = len(ofunc)
+ self.maxofunc = max(ofunc)
+ # onset detection function
+ downtime = numarray.arange(len(ofunc))*self.params.step
+ oplot.append(Gnuplot.Data(downtime,ofunc,with='lines',title=self.params.onsetmode))
+
+ # detected onsets
+ # skipped entirely when nplot is true; x1 then stays empty
+ if not nplot:
+ for i in onsets:
+ x1.append(i[0]*self.params.step)
+ y1.append(self.maxofunc)
+ y1p.append(-self.maxofunc)
+ #x1 = numarray.array(onsets)*self.params.step
+ #y1 = self.maxofunc*numarray.ones(len(onsets))
+ if x1:
+ oplot.append(Gnuplot.Data(x1,y1,with='impulses'))
+ wplot.append(Gnuplot.Data(x1,y1p,with='impulses'))
+
+ oplots.append(oplot)
+
+ # check if ground truth datafile exists
+ # the file name is the input name with '.wav' replaced by '.txt'; inputs
+ # without '.wav' get an empty name, which os.path.isfile rejects
+ datafile = self.input.replace('.wav','.txt')
+ if datafile == self.input: datafile = ""
+ if not os.path.isfile(datafile):
+ self.title = "" #"(no ground truth)"
+ else:
+ t_onsets = aubio.txtfile.read_datafile(datafile)
+ # NOTE(review): relies on resize() returning the array - confirm for numarray
+ x2 = numarray.array(t_onsets).resize(len(t_onsets))
+ y2 = self.maxofunc*numarray.ones(len(t_onsets))
+ wplot.append(Gnuplot.Data(x2,y2,with='impulses'))
+
+ # fixed tolerance, params.tol is not used here
+ tol = 0.050
+
+ orig, missed, merged, expc, bad, doubled = \
+ onset_roc(x2,x1,tol)
+ # both percentages are divided by orig, so orig == 0 raises ZeroDivisionError
+ self.title = "GD %2.3f%% FP %2.3f%%" % \
+ ((100*float(orig-missed-merged)/(orig)),
+ (100*float(bad+doubled)/(orig)))
+
+
+ def plotplot(self,wplot,oplots,outplot=None):
+ # Draw the multiplot: one panel per entry of oplots in the lower 70% of
+ # the page and the waveform with the items of wplot in the upper 30%.
+ # Reads self.lenofunc, which is set by plot().
+ from aubio.gnuplot import gnuplot_init, audio_to_array, make_audio_plot
+ import re
+ # audio data
+ time,data = audio_to_array(self.input)
+ # a new list is built, the caller's wplot is left unchanged
+ wplot = [make_audio_plot(time,data)] + wplot
+ # NOTE(review): this overwrites any title computed by plot(); the title
+ # set below therefore shows the file name followed by the full input path
+ self.title = self.input
+ # prepare the plot
+ g = gnuplot_init(outplot)
+
+ g('set multiplot')
+
+ # hack to align left axis
+ g('set lmargin 6')
+ g('set tmargin 0')
+ g('set format x ""')
+ g('set format y ""')
+ g('set noytics')
+
+ for i in range(len(oplots)):
+ # plot onset detection functions
+ g('set size 1,%f' % (0.7/(len(oplots))))
+ g('set origin 0,%f' % (float(i)*0.7/(len(oplots))))
+ g('set xrange [0:%f]' % (self.lenofunc*self.params.step))
+ g.plot(*oplots[i])
+
+ g('set tmargin 3.0')
+ g('set xlabel "time (s)" 1,0')
+ g('set format x "%1.1f"')
+
+ # the regular expression strips everything up to the last '/'
+ g('set title \'%s %s\'' % (re.sub('.*/','',self.input),self.title))
+
+ # plot waveform and onsets
+ g('set size 1,0.3')
+ g('set origin 0,0.7')
+ g('set xrange [0:%f]' % max(time))
+ g('set yrange [-1:1]')
+ g.ylabel('amplitude')
+ g.plot(*wplot)
+
+ g('unset multiplot')
+
+
--- /dev/null
+++ b/python/aubio/task/params.py
@@ -1,0 +1,26 @@
+from aubio.aubioclass import aubio_pitchm_freq
+
+class taskparams(object):
+ """ default parameters for task classes """
+ # the input and output arguments are accepted but not used
+ def __init__(self,input=None,output=None):
+ # threshold handed to aubio_silence_detection (presumably in dB - confirm)
+ self.silence = -70
+ self.derivate = False
+ # localmin: taskonset moves an onset back along its recent value history
+ self.localmin = False
+ # delay: subtracted from onset frames, used as offset by tasksilence
+ self.delay = 4.
+ # storefunc: taskonset keeps every per-frame value in its ofunc list
+ self.storefunc = False
+ self.bufsize = 512
+ self.hopsize = 256
+ self.samplerate = 44100
+ self.tol = 0.05
+ # mintol: when non-zero, taskonset only reports onsets further apart
+ self.mintol = 0.0
+ # step: duration of one hop; task.__init__ recomputes it from the sample
+ # rate of the opened file
+ self.step = float(self.hopsize)/float(self.samplerate)
+ self.threshold = 0.1
+ # mode names are translated by get_onset_mode and get_pitch_mode
+ self.onsetmode = 'dual'
+ self.pitchmode = 'yin'
+ # pitchsmooth: length of the value history filtered by taskpitch, a false
+ # value disables the filtering
+ self.pitchsmooth = 7
+ # values outside [pitchmin, pitchmax] are replaced by -1 in taskpitch
+ self.pitchmin=100.
+ self.pitchmax=1000.
+ self.dcthreshold = -1.
+ self.omode = aubio_pitchm_freq
+ self.verbose = False
+
--- /dev/null
+++ b/python/aubio/task/pitch.py
@@ -1,0 +1,175 @@
+from aubio.task.task import task
+from aubio.task.silence import tasksilence
+from aubio.task.utils import *
+from aubio.aubioclass import *
+
+class taskpitch(task):
+ def __init__(self,input,params=None):
+ # open the input file through task.__init__ and create the pitch detector
+ task.__init__(self,input,params=params)
+ # history of the last pitchsmooth values, filtered in __call__
+ self.shortlist = [0. for i in range(self.params.pitchsmooth)]
+ # the sample rate comes from the opened file (self.srate), not from
+ # params.samplerate
+ self.pitchdet = pitchdetection(mode=get_pitch_mode(self.params.pitchmode),
+ bufsize=self.params.bufsize,
+ hopsize=self.params.hopsize,
+ channels=self.channels,
+ samplerate=self.srate,
+ omode=self.params.omode)
+
+ def __call__(self):
+ # Process one frame and return its pitch value; -1. stands for a silent
+ # frame or a value outside [pitchmin, pitchmax]. When params.pitchsmooth is
+ # set, the value returned comes from short_find over the recent history.
+ from aubio.median import short_find
+ task.__call__(self)
+ if (aubio_silence_detection(self.myvec(),self.params.silence)==1):
+ freq = -1.
+ else:
+ freq = self.pitchdet(self.myvec)
+ minpitch = self.params.pitchmin
+ maxpitch = self.params.pitchmax
+ # a false (zero or None) bound disables the corresponding check
+ if maxpitch and freq > maxpitch :
+ freq = -1.
+ elif minpitch and freq < minpitch :
+ freq = -1.
+ if self.params.pitchsmooth:
+ # slide the history window by one value
+ self.shortlist.append(freq)
+ self.shortlist.pop(0)
+ # presumably the median of the window (position len/2) - confirm in
+ # aubio.median.short_find
+ smoothfreq = short_find(self.shortlist,
+ len(self.shortlist)/2)
+ return smoothfreq
+ else:
+ return freq
+
+ def compute_all(self):
+ """ Compute data """
+ # Unlike task.compute_all, the value of every frame is kept, including
+ # false ones; the loop stops after the first short read.
+ mylist = []
+ while(self.readsize==self.params.hopsize):
+ freq = self()
+ mylist.append(freq)
+ if self.params.verbose:
+ # prints "<time of the frame><tab><value>"
+ self.fprint("%s\t%s" % (self.frameread*self.params.step,freq))
+ return mylist
+
+ def gettruth(self):
+ """ extract ground truth array in frequency """
+ # Returns a (time, pitch) pair of lists, or (False, False) / (0, 0) when no
+ # ground truth is found. Two sources are tried: a MIDI note embedded in the
+ # file name and a '.txt' file next to the '.wav' input.
+ # self.truth is only assigned when the note is read from the file name.
+ import os.path
+ """ from wavfile.txt """
+ datafile = self.input.replace('.wav','.txt')
+ if datafile == self.input: datafile = ""
+ """ from file.<midinote>.wav """
+ # FIXME very weak check
+ # NOTE(review): raises IndexError when the input name contains no '.'
+ floatpit = self.input.split('.')[-2]
+ if not os.path.isfile(datafile) and len(self.input.split('.')) < 3:
+ print "no ground truth "
+ return False,False
+ elif floatpit:
+ try:
+ self.truth = aubio_miditofreq(float(floatpit))
+ print "ground truth found in filename:", self.truth
+ # a second pass over the file marks silent frames with -1.
+ tasksil = tasksilence(self.input)
+ time,pitch =[],[]
+ while(tasksil.readsize==tasksil.params.hopsize):
+ tasksil()
+ time.append(tasksil.params.step*tasksil.frameread)
+ if not tasksil.issilence:
+ pitch.append(self.truth)
+ else:
+ pitch.append(-1.)
+ return time,pitch #0,aubio_miditofreq(float(floatpit))
+ except ValueError:
+ # reached when floatpit is not a number: fall back to the data file
+ # FIXME very weak check
+ if not os.path.isfile(datafile):
+ print "no ground truth found"
+ return 0,0
+ else:
+ from aubio.txtfile import read_datafile
+ values = read_datafile(datafile)
+ # first column of each row is used as time, second as pitch
+ time, pitch = [], []
+ for i in range(len(values)):
+ time.append(values[i][0])
+ pitch.append(values[i][1])
+ return time,pitch
+
+ def eval(self,results):
+ # Compare results with the single reference value self.truth and return
+ # (truth, truth - med, truth - avg), where med and avg are computed over
+ # the differences truth - value of the strictly positive results.
+ # NOTE(review): self.truth is only set by gettruth() when a note is found
+ # in the file name; otherwise the accesses below raise AttributeError.
+ def mmean(l):
+ return sum(l)/max(float(len(l)),1)
+
+ from aubio.median import percental
+ # called for its side effect on self.truth, the lists are not used
+ timet,pitcht = self.gettruth()
+ res = []
+ for i in results:
+ #print i,self.truth
+ if i <= 0: pass
+ else: res.append(self.truth-i)
+ # with fewer than three usable values both statistics are set to truth,
+ # which makes the two differences returned equal to 0
+ if not res or len(res) < 3:
+ avg = self.truth; med = self.truth
+ else:
+ avg = mmean(res)
+ med = percental(res,len(res)/2)
+ return self.truth, self.truth-med, self.truth-avg
+
+ def neweval(self,results):
+ # Unfinished variant of eval().
+ # FIXME(review): med and avg are never assigned in this method, so the
+ # return statement raises NameError; results is also indexed with the
+ # entries of timet, which gettruth() builds as time values, not indices.
+ timet,pitcht = self.gettruth()
+ for i in timet:
+ print results[i]
+ return self.truth, self.truth-med, self.truth-avg
+
+ def plot(self,pitch,wplot,oplots,outplot=None):
+ # Append one Gnuplot line item for the list of pitch values to oplots.
+ # wplot and outplot are not used here.
+ import numarray
+ import Gnuplot
+
+ # the value returned by eval() is discarded
+ self.eval(pitch)
+ downtime = self.params.step*numarray.arange(len(pitch))
+ oplots.append(Gnuplot.Data(downtime,pitch,with='lines',
+ title=self.params.pitchmode))
+
+
+ def plotplot(self,wplot,oplots,outplot=None,multiplot = 1):
+ # Draw the waveform in the upper 30% of the page and the pitch curves of
+ # oplots below it, one panel per curve when multiplot is true, otherwise
+ # all curves in a single panel. wplot is not used here.
+ from aubio.gnuplot import gnuplot_init, audio_to_array, make_audio_plot
+ import re
+ import Gnuplot
+ # audio data
+ time,data = audio_to_array(self.input)
+ f = make_audio_plot(time,data)
+
+ # check if ground truth exists
+ timet,pitcht = self.gettruth()
+ # a new list is built, the caller's oplots is left unchanged
+ if timet and pitcht:
+ oplots = [Gnuplot.Data(timet,pitcht,with='lines',
+ title='ground truth')] + oplots
+
+ # t is not used afterwards
+ t = Gnuplot.Data(0,0,with='impulses')
+
+ g = gnuplot_init(outplot)
+ g('set title \'%s\'' % (re.sub('.*/','',self.input)))
+ g('set multiplot')
+ # hack to align left axis
+ g('set lmargin 15')
+ # plot waveform and onsets
+ g('set size 1,0.3')
+ g('set origin 0,0.7')
+ g('set xrange [0:%f]' % max(time))
+ g('set yrange [-1:1]')
+ g.ylabel('amplitude')
+ g.plot(f)
+ g('unset title')
+ # plot onset detection function
+
+
+ g('set size 1,0.7')
+ g('set origin 0,0')
+ g('set xrange [0:%f]' % max(time))
+ # the lower bound is fixed at 100, only the upper one follows params.pitchmax
+ g('set yrange [100:%f]' % self.params.pitchmax)
+ g('set key right top')
+ g('set noclip one')
+ g('set format x ""')
+ g('set log y')
+ #g.xlabel('time (s)')
+ g.ylabel('f0 (Hz)')
+ if multiplot:
+ for i in range(len(oplots)):
+ # plot onset detection functions
+ g('set size 1,%f' % (0.7/(len(oplots))))
+ g('set origin 0,%f' % (float(i)*0.7/(len(oplots))))
+ g('set xrange [0:%f]' % max(time))
+ g.plot(oplots[i])
+ else:
+ g.plot(*oplots)
+ g('unset multiplot')
+
--- /dev/null
+++ b/python/aubio/task/silence.py
@@ -1,0 +1,28 @@
+from aubio.task.task import task
+from aubio.aubioclass import *
+
+class tasksilence(task):
+ # Silence tracker. issilence takes four values after each frame:
+ #   1  silent, as the previous frame      2  silent, previous frame was not
+ #   0  not silent, as the previous frame -1  not silent, previous frame was
+ # wassilence holds the state of the previous frame (1 silent, 0 not).
+ # Both start at 1 as class attributes.
+ wassilence = 1
+ issilence = 1
+ def __call__(self):
+ # Process one frame. Returns (frame, -1) or (frame, 2) on a change of
+ # state, with the frame shifted by params.delay and clamped at 0;
+ # returns None when the state did not change.
+ task.__call__(self)
+ if (aubio_silence_detection(self.myvec(),self.params.silence)==1):
+ if self.wassilence == 1: self.issilence = 1
+ else: self.issilence = 2
+ self.wassilence = 1
+ else:
+ if self.wassilence <= 0: self.issilence = 0
+ else: self.issilence = -1
+ self.wassilence = 0
+ if self.issilence == -1:
+ return max(self.frameread-self.params.delay,0.), -1
+ elif self.issilence == 2:
+ return max(self.frameread+self.params.delay,0.), 2
+
+ def fprint(self,foo):
+ # print the time of the change followed by OFF (code 2) or ON
+ print self.params.step*foo[0],
+ if foo[1] == 2: print "OFF"
+ else: print "ON"
+
+
+
--- /dev/null
+++ b/python/aubio/task/task.py
@@ -1,0 +1,54 @@
+from aubio.aubioclass import *
+from params import taskparams
+
+class task(taskparams):
+ """ default template class to apply tasks on a stream """
+ # NOTE(review): taskparams.__init__ is never called on the task itself, the
+ # parameters live in the separate self.params object.
+ def __init__(self,input,output=None,params=None):
+ """ open the input file and initialize default argument
+ parameters should be set *before* calling this method.
+ """
+ import time
+ # start time, used by time() to report the duration of the task
+ self.tic = time.time()
+ if params == None: self.params = taskparams()
+ else: self.params = params
+ # number of frames read so far
+ self.frameread = 0
+ # starts at hopsize so that the loop of compute_all runs at least once
+ self.readsize = self.params.hopsize
+ self.input = input
+ self.filei = sndfile(self.input)
+ self.srate = self.filei.samplerate()
+ self.channels = self.filei.channels()
+ # step is recomputed from the sample rate of the file; this modifies the
+ # params object passed in by the caller
+ self.params.step = float(self.params.hopsize)/float(self.srate)
+ self.myvec = fvec(self.params.hopsize,self.channels)
+ self.output = output
+
+ def __call__(self):
+ # read the next hopsize samples into myvec; returns nothing
+ self.readsize = self.filei.read(self.params.hopsize,self.myvec)
+ self.frameread += 1
+
+ def compute_all(self):
+ """ Compute data """
+ # calls the task until a read returns less than hopsize samples and
+ # collects the results that are true
+ mylist = []
+ while(self.readsize==self.params.hopsize):
+ tmp = self()
+ if tmp:
+ mylist.append(tmp)
+ # presumably nested under "if tmp", so only kept results are printed
+ if self.params.verbose:
+ self.fprint(tmp)
+ return mylist
+
+ def fprint(self,foo):
+ # default printer, overridden by subclasses
+ print foo
+
+ def eval(self,results):
+ """ Eval data """
+ pass
+
+ def plot(self):
+ """ Plot data """
+ pass
+
+ def time(self):
+ # print the value of time.clock() and the time elapsed since __init__
+ import time
+ print "CPU time is now %f seconds," % time.clock(),
+ print "task execution took %f seconds" % (time.time() - self.tic)
+
--- /dev/null
+++ b/python/aubio/task/utils.py
@@ -1,0 +1,74 @@
+from aubio.aubioclass import *
+
+def get_onset_mode(nvalue):
+ """ utility function to convert a string to aubio_onsetdetection_type """
+ if nvalue == 'complexdomain' or nvalue == 'complex' :
+ return aubio_onset_complex
+ elif nvalue == 'hfc' :
+ return aubio_onset_hfc
+ elif nvalue == 'phase' :
+ return aubio_onset_phase
+ elif nvalue == 'specdiff' :
+ return aubio_onset_specdiff
+ elif nvalue == 'energy' :
+ return aubio_onset_energy
+ elif nvalue == 'kl' :
+ return aubio_onset_kl
+ elif nvalue == 'mkl' :
+ return aubio_onset_mkl
+ elif nvalue == 'dual' :
+ return 'dual'
+ else:
+ import sys
+ print "unknown onset detection function selected"
+ sys.exit(1)
+
+def get_pitch_mode(nvalue):
+ """ utility function to convert a string to aubio_pitchdetection_type """
+ if nvalue == 'mcomb' :
+ return aubio_pitch_mcomb
+ elif nvalue == 'yin' :
+ return aubio_pitch_yin
+ elif nvalue == 'fcomb' :
+ return aubio_pitch_fcomb
+ elif nvalue == 'schmitt':
+ return aubio_pitch_schmitt
+ else:
+ import sys
+ print "error: unknown pitch detection function selected"
+ sys.exit(1)
+
+def check_onset_mode(option, opt, value, parser):
+ """ wrapper function to convert a list of modes to
+ aubio_onsetdetection_type """
+ nvalues = parser.rargs[0].split(',')
+ val = []
+ for nvalue in nvalues:
+ val.append(get_onset_mode(nvalue))
+ setattr(parser.values, option.dest, val)
+
+def check_pitch_mode(option, opt, value, parser):
+ """ utility function to convert a string to aubio_pitchdetection_type"""
+ nvalues = parser.rargs[0].split(',')
+ val = []
+ for nvalue in nvalues:
+ val.append(get_pitch_mode(nvalue))
+ setattr(parser.values, option.dest, val)
+
+def check_pitchm_mode(option, opt, value, parser):
+ """ utility function to convert a string to aubio_pitchdetection_mode """
+ nvalue = parser.rargs[0]
+ if nvalue == 'freq' :
+ setattr(parser.values, option.dest, aubio_pitchm_freq)
+ elif nvalue == 'midi' :
+ setattr(parser.values, option.dest, aubio_pitchm_midi)
+ elif nvalue == 'cent' :
+ setattr(parser.values, option.dest, aubio_pitchm_cent)
+ elif nvalue == 'bin' :
+ setattr(parser.values, option.dest, aubio_pitchm_bin)
+ else:
+ import sys
+ print "error: unknown pitch detection output selected"
+ sys.exit(1)
+
+
--- a/python/aubio/tasks.py
+++ /dev/null
@@ -1,587 +1,0 @@
-from aubioclass import *
-
-def get_onset_mode(nvalue):
- """ utility function to convert a string to aubio_onsetdetection_type """
- if nvalue == 'complexdomain' or nvalue == 'complex' :
- return aubio_onset_complex
- elif nvalue == 'hfc' :
- return aubio_onset_hfc
- elif nvalue == 'phase' :
- return aubio_onset_phase
- elif nvalue == 'specdiff' :
- return aubio_onset_specdiff
- elif nvalue == 'energy' :
- return aubio_onset_energy
- elif nvalue == 'kl' :
- return aubio_onset_kl
- elif nvalue == 'mkl' :
- return aubio_onset_mkl
- elif nvalue == 'dual' :
- return 'dual'
- else:
- import sys
- print "unknown onset detection function selected"
- sys.exit(1)
-
-def get_pitch_mode(nvalue):
- """ utility function to convert a string to aubio_pitchdetection_type """
- if nvalue == 'mcomb' :
- return aubio_pitch_mcomb
- elif nvalue == 'yin' :
- return aubio_pitch_yin
- elif nvalue == 'fcomb' :
- return aubio_pitch_fcomb
- elif nvalue == 'schmitt':
- return aubio_pitch_schmitt
- else:
- import sys
- print "error: unknown pitch detection function selected"
- sys.exit(1)
-
-def check_onset_mode(option, opt, value, parser):
- """ wrapper function to convert a list of modes to
- aubio_onsetdetection_type """
- nvalues = parser.rargs[0].split(',')
- val = []
- for nvalue in nvalues:
- val.append(get_onset_mode(nvalue))
- setattr(parser.values, option.dest, val)
-
-def check_pitch_mode(option, opt, value, parser):
- """ utility function to convert a string to aubio_pitchdetection_type"""
- nvalues = parser.rargs[0].split(',')
- val = []
- for nvalue in nvalues:
- val.append(get_pitch_mode(nvalue))
- setattr(parser.values, option.dest, val)
-
-def check_pitchm_mode(option, opt, value, parser):
- """ utility function to convert a string to aubio_pitchdetection_mode """
- nvalue = parser.rargs[0]
- if nvalue == 'freq' :
- setattr(parser.values, option.dest, aubio_pitchm_freq)
- elif nvalue == 'midi' :
- setattr(parser.values, option.dest, aubio_pitchm_midi)
- elif nvalue == 'cent' :
- setattr(parser.values, option.dest, aubio_pitchm_cent)
- elif nvalue == 'bin' :
- setattr(parser.values, option.dest, aubio_pitchm_bin)
- else:
- import sys
- print "error: unknown pitch detection output selected"
- sys.exit(1)
-
-class taskparams(object):
- """ default parameters for task classes """
- def __init__(self,input=None,output=None):
- self.silence = -70
- self.derivate = False
- self.localmin = False
- self.delay = 4.
- self.storefunc = False
- self.bufsize = 512
- self.hopsize = 256
- self.samplerate = 44100
- self.tol = 0.05
- self.mintol = 0.0
- self.step = float(self.hopsize)/float(self.samplerate)
- self.threshold = 0.1
- self.onsetmode = 'dual'
- self.pitchmode = 'yin'
- self.pitchsmooth = 7
- self.pitchmin=100.
- self.pitchmax=1000.
- self.dcthreshold = -1.
- self.omode = aubio_pitchm_freq
- self.verbose = False
-
-class task(taskparams):
- """ default template class to apply tasks on a stream """
- def __init__(self,input,output=None,params=None):
- """ open the input file and initialize default argument
- parameters should be set *before* calling this method.
- """
- import time
- self.tic = time.time()
- if params == None: self.params = taskparams()
- else: self.params = params
- self.frameread = 0
- self.readsize = self.params.hopsize
- self.input = input
- self.filei = sndfile(self.input)
- self.srate = self.filei.samplerate()
- self.channels = self.filei.channels()
- self.params.step = float(self.params.hopsize)/float(self.srate)
- self.myvec = fvec(self.params.hopsize,self.channels)
- self.output = output
-
- def __call__(self):
- self.readsize = self.filei.read(self.params.hopsize,self.myvec)
- self.frameread += 1
-
- def compute_all(self):
- """ Compute data """
- mylist = []
- while(self.readsize==self.params.hopsize):
- tmp = self()
- if tmp:
- mylist.append(tmp)
- if self.params.verbose:
- self.fprint(tmp)
- return mylist
-
- def fprint(self,foo):
- print foo
-
- def eval(self,results):
- """ Eval data """
- pass
-
- def plot(self):
- """ Plot data """
- pass
-
- def time(self):
- import time
- print "CPU time is now %f seconds," % time.clock(),
- print "task execution took %f seconds" % (time.time() - self.tic)
-
-class tasksilence(task):
- wassilence = 1
- issilence = 1
- def __call__(self):
- task.__call__(self)
- if (aubio_silence_detection(self.myvec(),self.params.silence)==1):
- if self.wassilence == 1: self.issilence = 1
- else: self.issilence = 2
- self.wassilence = 1
- else:
- if self.wassilence <= 0: self.issilence = 0
- else: self.issilence = -1
- self.wassilence = 0
- if self.issilence == -1:
- return max(self.frameread-self.params.delay,0.), -1
- elif self.issilence == 2:
- return max(self.frameread+self.params.delay,0.), 2
-
- def fprint(self,foo):
- print self.params.step*foo[0],
- if foo[1] == 2: print "OFF"
- else: print "ON"
-
-class taskpitch(task):
- def __init__(self,input,params=None):
- task.__init__(self,input,params=params)
- self.shortlist = [0. for i in range(self.params.pitchsmooth)]
- self.pitchdet = pitchdetection(mode=get_pitch_mode(self.params.pitchmode),
- bufsize=self.params.bufsize,
- hopsize=self.params.hopsize,
- channels=self.channels,
- samplerate=self.srate,
- omode=self.params.omode)
-
- def __call__(self):
- from median import short_find
- task.__call__(self)
- if (aubio_silence_detection(self.myvec(),self.params.silence)==1):
- freq = -1.
- else:
- freq = self.pitchdet(self.myvec)
- minpitch = self.params.pitchmin
- maxpitch = self.params.pitchmax
- if maxpitch and freq > maxpitch :
- freq = -1.
- elif minpitch and freq < minpitch :
- freq = -1.
- if self.params.pitchsmooth:
- self.shortlist.append(freq)
- self.shortlist.pop(0)
- smoothfreq = short_find(self.shortlist,
- len(self.shortlist)/2)
- return smoothfreq
- else:
- return freq
-
- def compute_all(self):
- """ Compute data """
- mylist = []
- while(self.readsize==self.params.hopsize):
- freq = self()
- mylist.append(freq)
- if self.params.verbose:
- self.fprint("%s\t%s" % (self.frameread*self.params.step,freq))
- return mylist
-
- def gettruth(self):
- """ extract ground truth array in frequency """
- import os.path
- """ from wavfile.txt """
- datafile = self.input.replace('.wav','.txt')
- if datafile == self.input: datafile = ""
- """ from file.<midinote>.wav """
- # FIXME very weak check
- floatpit = self.input.split('.')[-2]
-
- if not os.path.isfile(datafile) and not len(self.input.split('.')) < 3:
- print "no ground truth "
- return False,False
- elif floatpit:
- try:
- self.truth = aubio_miditofreq(float(floatpit))
- print "ground truth found in filename:", self.truth
- tasksil = tasksilence(self.input)
- time,pitch =[],[]
- while(tasksil.readsize==tasksil.params.hopsize):
- tasksil()
- time.append(tasksil.params.step*tasksil.frameread)
- if not tasksil.issilence:
- pitch.append(self.truth)
- else:
- pitch.append(-1.)
- return time,pitch #0,aubio_miditofreq(float(floatpit))
- except ValueError:
- # FIXME very weak check
- if not os.path.isfile(datafile):
- print "no ground truth found"
- return 0,0
- else:
- from aubio.txtfile import read_datafile
- values = read_datafile(datafile)
- time, pitch = [], []
- for i in range(len(values)):
- time.append(values[i][0])
- pitch.append(values[i][1])
- return time,pitch
-
- def eval(self,results):
- def mmean(l):
- return sum(l)/max(float(len(l)),1)
-
- from median import percental
- timet,pitcht = self.gettruth()
- res = []
- for i in results:
- #print i,self.truth
- if i <= 0: pass
- else: res.append(self.truth-i)
- if not res or len(res) < 3:
- avg = self.truth; med = self.truth
- else:
- avg = mmean(res)
- med = percental(res,len(res)/2)
- return self.truth, self.truth-med, self.truth-avg
-
- def plot(self,pitch,wplot,oplots,outplot=None):
- import numarray
- import Gnuplot
-
- downtime = self.params.step*numarray.arange(len(pitch))
- oplots.append(Gnuplot.Data(downtime,pitch,with='lines',
- title=self.params.pitchmode))
-
-
- def plotplot(self,wplot,oplots,outplot=None,multiplot = 1):
- from aubio.gnuplot import gnuplot_init, audio_to_array, make_audio_plot
- import re
- import Gnuplot
- # audio data
- time,data = audio_to_array(self.input)
- f = make_audio_plot(time,data)
-
- # check if ground truth exists
- timet,pitcht = self.gettruth()
- if timet and pitcht:
- oplots = [Gnuplot.Data(timet,pitcht,with='lines',
- title='ground truth')] + oplots
-
- t = Gnuplot.Data(0,0,with='impulses')
-
- g = gnuplot_init(outplot)
- g('set title \'%s\'' % (re.sub('.*/','',self.input)))
- g('set multiplot')
- # hack to align left axis
- g('set lmargin 15')
- # plot waveform and onsets
- g('set size 1,0.3')
- g('set origin 0,0.7')
- g('set xrange [0:%f]' % max(time))
- g('set yrange [-1:1]')
- g.ylabel('amplitude')
- g.plot(f)
- g('unset title')
- # plot onset detection function
-
-
- g('set size 1,0.7')
- g('set origin 0,0')
- g('set xrange [0:%f]' % max(time))
- g('set yrange [100:%f]' % self.params.pitchmax)
- g('set key right top')
- g('set noclip one')
- g('set format x ""')
- g('set log y')
- #g.xlabel('time (s)')
- g.ylabel('f0 (Hz)')
- if multiplot:
- for i in range(len(oplots)):
- # plot onset detection functions
- g('set size 1,%f' % (0.7/(len(oplots))))
- g('set origin 0,%f' % (float(i)*0.7/(len(oplots))))
- g('set xrange [0:%f]' % max(time))
- g.plot(oplots[i])
- else:
- g.plot(*oplots)
- g('unset multiplot')
-
-
-class taskonset(task):
- def __init__(self,input,output=None,params=None):
- """ open the input file and initialize arguments
- parameters should be set *before* calling this method.
- """
- task.__init__(self,input,params=params)
- self.opick = onsetpick(self.params.bufsize,
- self.params.hopsize,
- self.channels,
- self.myvec,
- self.params.threshold,
- mode=get_onset_mode(self.params.onsetmode),
- dcthreshold=self.params.dcthreshold,
- derivate=self.params.derivate)
- self.olist = []
- self.ofunc = []
- self.maxofunc = 0
- self.last = 0
- if self.params.localmin:
- self.ovalist = [0., 0., 0., 0., 0.]
-
- def __call__(self):
- task.__call__(self)
- isonset,val = self.opick.do(self.myvec)
- if (aubio_silence_detection(self.myvec(),self.params.silence)):
- isonset=0
- if self.params.storefunc:
- self.ofunc.append(val)
- if self.params.localmin:
- if val > 0: self.ovalist.append(val)
- else: self.ovalist.append(0)
- self.ovalist.pop(0)
- if (isonset == 1):
- if self.params.localmin:
- # find local minima before peak
- i=len(self.ovalist)-1
- while self.ovalist[i-1] < self.ovalist[i] and i > 0:
- i -= 1
- now = (self.frameread+1-i)
- else:
- now = self.frameread
- # take back delay
- if self.params.delay != 0.: now -= self.params.delay
- if now < 0 :
- now = 0
- if self.params.mintol:
- # prune doubled
- if (now - self.last) > self.params.mintol:
- self.last = now
- return now, val
- else:
- return now, val
-
-
- def fprint(self,foo):
- print self.params.step*foo[0]
-
- def eval(self,inputdata,ftru,mode='roc',vmode=''):
- from txtfile import read_datafile
- from onsetcompare import onset_roc, onset_diffs, onset_rocloc
- ltru = read_datafile(ftru,depth=0)
- lres = []
- for i in range(len(inputdata)): lres.append(inputdata[i][0]*self.params.step)
- if vmode=='verbose':
- print "Running with mode %s" % self.params.onsetmode,
- print " and threshold %f" % self.params.threshold,
- print " on file", self.input
- #print ltru; print lres
- if mode == 'local':
- l = onset_diffs(ltru,lres,self.params.tol)
- mean = 0
- for i in l: mean += i
- if len(l): mean = "%.3f" % (mean/len(l))
- else: mean = "?0"
- return l, mean
- elif mode == 'roc':
- self.orig, self.missed, self.merged, \
- self.expc, self.bad, self.doubled = \
- onset_roc(ltru,lres,self.params.tol)
- elif mode == 'rocloc':
- self.v = {}
- self.v['orig'], self.v['missed'], self.v['Tm'], \
- self.v['expc'], self.v['bad'], self.v['Td'], \
- self.v['l'], self.v['labs'] = \
- onset_rocloc(ltru,lres,self.params.tol)
-
- def plot(self,onsets,ofunc,wplot,oplots,nplot=False):
- import Gnuplot, Gnuplot.funcutils
- import aubio.txtfile
- import os.path
- import numarray
- from aubio.onsetcompare import onset_roc
-
- x1,y1,y1p = [],[],[]
- oplot = []
- if self.params.onsetmode in ('mkl','kl'): ofunc[0:10] = [0] * 10
-
- self.lenofunc = len(ofunc)
- self.maxofunc = max(ofunc)
- # onset detection function
- downtime = numarray.arange(len(ofunc))*self.params.step
- oplot.append(Gnuplot.Data(downtime,ofunc,with='lines',title=self.params.onsetmode))
-
- # detected onsets
- if not nplot:
- for i in onsets:
- x1.append(i[0]*self.params.step)
- y1.append(self.maxofunc)
- y1p.append(-self.maxofunc)
- #x1 = numarray.array(onsets)*self.params.step
- #y1 = self.maxofunc*numarray.ones(len(onsets))
- if x1:
- oplot.append(Gnuplot.Data(x1,y1,with='impulses'))
- wplot.append(Gnuplot.Data(x1,y1p,with='impulses'))
-
- oplots.append(oplot)
-
- # check if ground truth datafile exists
- datafile = self.input.replace('.wav','.txt')
- if datafile == self.input: datafile = ""
- if not os.path.isfile(datafile):
- self.title = "" #"(no ground truth)"
- else:
- t_onsets = aubio.txtfile.read_datafile(datafile)
- x2 = numarray.array(t_onsets).resize(len(t_onsets))
- y2 = self.maxofunc*numarray.ones(len(t_onsets))
- wplot.append(Gnuplot.Data(x2,y2,with='impulses'))
-
- tol = 0.050
-
- orig, missed, merged, expc, bad, doubled = \
- onset_roc(x2,x1,tol)
- self.title = "GD %2.3f%% FP %2.3f%%" % \
- ((100*float(orig-missed-merged)/(orig)),
- (100*float(bad+doubled)/(orig)))
-
-
- def plotplot(self,wplot,oplots,outplot=None):
- from aubio.gnuplot import gnuplot_init, audio_to_array, make_audio_plot
- import re
- # audio data
- time,data = audio_to_array(self.input)
- wplot = [make_audio_plot(time,data)] + wplot
- self.title = self.input
- # prepare the plot
- g = gnuplot_init(outplot)
-
- g('set multiplot')
-
- # hack to align left axis
- g('set lmargin 6')
- g('set tmargin 0')
- g('set format x ""')
- g('set format y ""')
- g('set noytics')
-
- for i in range(len(oplots)):
- # plot onset detection functions
- g('set size 1,%f' % (0.7/(len(oplots))))
- g('set origin 0,%f' % (float(i)*0.7/(len(oplots))))
- g('set xrange [0:%f]' % (self.lenofunc*self.params.step))
- g.plot(*oplots[i])
-
- g('set tmargin 3.0')
- g('set xlabel "time (s)" 1,0')
- g('set format x "%1.1f"')
-
- g('set title \'%s %s\'' % (re.sub('.*/','',self.input),self.title))
-
- # plot waveform and onsets
- g('set size 1,0.3')
- g('set origin 0,0.7')
- g('set xrange [0:%f]' % max(time))
- g('set yrange [-1:1]')
- g.ylabel('amplitude')
- g.plot(*wplot)
-
- g('unset multiplot')
-
-class taskcut(task):
- def __init__(self,input,slicetimes,params=None,output=None):
- """ open the input file and initialize arguments
- parameters should be set *before* calling this method.
- """
- task.__init__(self,input,output=None,params=params)
- self.newname = "%s%s%09.5f%s%s" % (self.input.split(".")[0].split("/")[-1],".",
- self.frameread*self.params.step,".",self.input.split(".")[-1])
- self.fileo = sndfile(self.newname,model=self.filei)
- self.myvec = fvec(self.params.hopsize,self.channels)
- self.mycopy = fvec(self.params.hopsize,self.channels)
- self.slicetimes = slicetimes
-
- def __call__(self):
- task.__call__(self)
- # write to current file
- if len(self.slicetimes) and self.frameread >= self.slicetimes[0][0]:
- self.slicetimes.pop(0)
- # write up to 1st zero crossing
- zerocross = 0
- while ( abs( self.myvec.get(zerocross,0) ) > self.params.zerothres ):
- zerocross += 1
- writesize = self.fileo.write(zerocross,self.myvec)
- fromcross = 0
- while (zerocross < self.readsize):
- for i in range(self.channels):
- self.mycopy.set(self.myvec.get(zerocross,i),fromcross,i)
- fromcross += 1
- zerocross += 1
- del self.fileo
- self.fileo = sndfile("%s%s%09.5f%s%s" %
- (self.input.split(".")[0].split("/")[-1],".",
- self.frameread*self.params.step,".",self.input.split(".")[-1]),model=self.filei)
- writesize = self.fileo.write(fromcross,self.mycopy)
- else:
- writesize = self.fileo.write(self.readsize,self.myvec)
-
-class taskbeat(taskonset):
- def __init__(self,input,params=None,output=None):
- """ open the input file and initialize arguments
- parameters should be set *before* calling this method.
- """
- taskonset.__init__(self,input,output=None,params=params)
- self.btwinlen = 512**2/self.params.hopsize
- self.btstep = self.btwinlen/4
- self.btoutput = fvec(self.btstep,self.channels)
- self.dfframe = fvec(self.btwinlen,self.channels)
- self.bt = beattracking(self.btwinlen,self.channels)
- self.pos2 = 0
-
- def __call__(self):
- taskonset.__call__(self)
- # write to current file
- if self.pos2 == self.btstep - 1 :
- self.bt.do(self.dfframe,self.btoutput)
- for i in range (self.btwinlen - self.btstep):
- self.dfframe.set(self.dfframe.get(i+self.btstep,0),i,0)
- for i in range(self.btwinlen - self.btstep, self.btwinlen):
- self.dfframe.set(0,i,0)
- self.pos2 = -1;
- self.pos2 += 1
- val = self.opick.pp.getval()
- self.dfframe.set(val,self.btwinlen - self.btstep + self.pos2,0)
- i=0
- for i in range(1,int( self.btoutput.get(0,0) ) ):
- if self.pos2 == self.btoutput.get(i,0) and \
- aubio_silence_detection(self.myvec(),
- self.params.silence)!=1:
- return self.frameread, 0
-
- def eval(self,results):
- pass
--- a/python/aubiocut
+++ b/python/aubiocut
@@ -5,7 +5,7 @@
"""
import sys
-from aubio.tasks import *
+from aubio.task import *
usage = "usage: %s [options] -i soundfile" % sys.argv[0]
--- a/python/aubiopitch
+++ b/python/aubiopitch
@@ -5,7 +5,7 @@
"""
import sys
-from aubio.tasks import *
+from aubio.task import *
usage = "usage: %s [options] -i soundfile" % sys.argv[0]