#!/usr/bin/env python3
# qpaeq is an equalizer interface for pulseaudio's equalizer sinks
# Copyright (C) 2009 Jason Newton .
import os,math,sys
try:
from PyQt5 import QtWidgets,QtCore
import dbus.mainloop.pyqt5
import dbus
except ImportError as e:
sys.stderr.write('There was an error importing needed libraries\n'
'Make sure you have qt5 and dbus-python installed\n'
'The error that occurred was:\n'
'\t%s\n' % (str(e)))
sys.exit(-1)
from functools import partial
import signal
# Restore the default SIGINT disposition instead of Python's own handler.
signal.signal(signal.SIGINT, signal.SIG_DFL)
# Delay, in milliseconds, handed to FilterState's single-shot sync timer;
# when that timer fires the equalizer state is saved.
SYNC_TIMEOUT = 4*1000
# Object path and interface name of the PulseAudio core D-Bus object.
# NOTE(review): QPaeq carries its own core_path/core_iface attributes and
# these two module-level names are not referenced in the visible source.
CORE_PATH = "/org/pulseaudio/core1"
CORE_IFACE = "org.PulseAudio.Core1"
def connect():
    """Open a D-Bus connection to the PulseAudio server.

    The server address is taken from the PULSE_DBUS_SERVER environment
    variable when it is set; otherwise it is read from the 'Address'
    property of the org.PulseAudio1 server-lookup object on the session bus.

    Returns:
        A dbus.connection.Connection to the resolved address.

    On any failure a diagnostic (including the underlying error) is written
    to stderr and the process exits with status -1.
    """
    try:
        if 'PULSE_DBUS_SERVER' in os.environ:
            address = os.environ['PULSE_DBUS_SERVER']
        else:
            bus = dbus.SessionBus() # Should be UserBus, but D-Bus doesn't implement that yet.
            server_lookup = bus.get_object('org.PulseAudio1', '/org/pulseaudio/server_lookup1')
            address = server_lookup.Get('org.PulseAudio.ServerLookup1', 'Address', dbus_interface='org.freedesktop.DBus.Properties')
        return dbus.connection.Connection(address)
    except Exception as e:
        # Top-level boundary: report the real cause as well as the hint, the
        # same way the import-failure handler at the top of the file does.
        sys.stderr.write('There was an error connecting to pulseaudio, '
                         'please make sure you have the pulseaudio dbus '
                         'module loaded, exiting...\n'
                         'The error that occurred was:\n'
                         '\t%s\n' % (str(e),))
        sys.exit(-1)
#TODO: signals: sink Filter changed, sink reconfigured (window size) (sink iface)
#TODO: manager signals: new sink, removed sink, new profile, removed profile
#TODO: add support for changing of window_size 1000-fft_size (adv option)
#TODO: reconnect support loop 1 second trying to reconnect
#TODO: just resample the filters for profiles when loading to different sizes
#TODO: add preamp
# D-Bus interface names used throughout this file:
# generic property access, the per-sink equalizer, and the core device.
prop_iface='org.freedesktop.DBus.Properties'
eq_iface='org.PulseAudio.Ext.Equalizing1.Equalizer'
device_iface='org.PulseAudio.Core1.Device'
class QPaeq(QtWidgets.QWidget):
    """Main qpaeq window.

    Holds the top row (sink, channel and preset selectors, save/remove/reset
    buttons) and a SliderArray bound to the FilterState of the connected
    equalized sink. All communication with PulseAudio goes through the D-Bus
    connection created in set_connection().
    """
    manager_path='/org/pulseaudio/equalizing1'
    manager_iface='org.PulseAudio.Ext.Equalizing1.Manager'
    core_iface='org.PulseAudio.Core1'
    core_path='/org/pulseaudio/core1'
    module_name='module-equalizer-sink'

    def __init__(self):
        """Build the UI, connect to PulseAudio and attach to the first equalized sink."""
        QtWidgets.QWidget.__init__(self)
        self.setWindowTitle('qpaeq')
        self.slider_widget=None
        self.sink_name=None
        self.filter_state=None
        self.create_layout()
        self.set_connection()
        if not self.sinks:
            # Nothing to attach to: exit cleanly rather than fail on sinks[0].
            sys.stderr.write('No equalized sinks were found, exiting...\n')
            sys.exit(-1)
        self.connect_to_sink(self.sinks[0])
        self.set_callbacks()
        self.setMinimumSize(self.sizeHint())

    def create_layout(self):
        """Create the top row of selectors and buttons and wire their signals."""
        self.main_layout=QtWidgets.QVBoxLayout()
        self.setLayout(self.main_layout)
        toprow_layout=QtWidgets.QHBoxLayout()
        sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Preferred, QtWidgets.QSizePolicy.Fixed)
        sizePolicy.setHorizontalStretch(0)
        sizePolicy.setVerticalStretch(0)
        #sizePolicy.setHeightForWidth(self.profile_box.sizePolicy().hasHeightForWidth())

        toprow_layout.addWidget(QtWidgets.QLabel('Sink'))
        self.sink_box = QtWidgets.QComboBox()
        self.sink_box.setSizePolicy(sizePolicy)
        self.sink_box.setDuplicatesEnabled(False)
        self.sink_box.setInsertPolicy(QtWidgets.QComboBox.InsertAlphabetically)
        #self.sink_box.setSizeAdjustPolicy(QtWidgets.QComboBox.AdjustToContents)
        toprow_layout.addWidget(self.sink_box)

        toprow_layout.addWidget(QtWidgets.QLabel('Channel'))
        self.channel_box = QtWidgets.QComboBox()
        self.channel_box.setSizePolicy(sizePolicy)
        toprow_layout.addWidget(self.channel_box)

        toprow_layout.addWidget(QtWidgets.QLabel('Preset'))
        self.profile_box = QtWidgets.QComboBox()
        self.profile_box.setSizePolicy(sizePolicy)
        self.profile_box.setInsertPolicy(QtWidgets.QComboBox.InsertAlphabetically)
        #self.profile_box.setSizeAdjustPolicy(QtWidgets.QComboBox.AdjustToContents)
        toprow_layout.addWidget(self.profile_box)

        large_icon_size=self.style().pixelMetric(QtWidgets.QStyle.PM_LargeIconSize)
        large_icon_size=QtCore.QSize(large_icon_size,large_icon_size)
        save_profile=QtWidgets.QToolButton()
        save_profile.setIcon(self.style().standardIcon(QtWidgets.QStyle.SP_DriveFDIcon))
        save_profile.setIconSize(large_icon_size)
        save_profile.setToolButtonStyle(QtCore.Qt.ToolButtonIconOnly)
        save_profile.clicked.connect(self.save_profile)
        remove_profile=QtWidgets.QToolButton()
        remove_profile.setIcon(self.style().standardIcon(QtWidgets.QStyle.SP_TrashIcon))
        remove_profile.setIconSize(large_icon_size)
        remove_profile.setToolButtonStyle(QtCore.Qt.ToolButtonIconOnly)
        remove_profile.clicked.connect(self.remove_profile)
        toprow_layout.addWidget(save_profile)
        toprow_layout.addWidget(remove_profile)

        reset_button = QtWidgets.QPushButton('Reset')
        reset_button.clicked.connect(self.reset)
        toprow_layout.addStretch()
        toprow_layout.addWidget(reset_button)
        self.layout().addLayout(toprow_layout)

        self.profile_box.activated.connect(self.load_profile)
        self.channel_box.activated.connect(self.select_channel)

    def connect_to_sink(self,name):
        """Attach the UI to the equalized sink at D-Bus object path *name*.

        Flushes any pending save of the previous sink, creates a new
        FilterState and SliderArray for the sink, repopulates the channel
        selector and subscribes to the sink's FilterChanged signal.
        """
        #TODO: clear slots for profile buttons

        #flush any pending saves for other sinks
        if self.filter_state is not None:
            self.filter_state.flush_state()
        sink=self.connection.get_object(object_path=name)
        self.sink_props=dbus.Interface(sink,dbus_interface=prop_iface)
        self.sink=dbus.Interface(sink,dbus_interface=eq_iface)
        self.filter_state=FilterState(sink)
        #sample_rate,filter_rate,channels,channel)

        self.channel_box.clear()
        # The 'All' entry carries the channel count itself as its item data;
        # the real channels carry their zero-based index.
        self.channel_box.addItem('All',self.filter_state.channels)
        for i in range(self.filter_state.channels):
            self.channel_box.addItem('%d' %(i+1,),i)
        self.setMinimumSize(self.sizeHint())

        self.set_slider_widget(SliderArray(self.filter_state))

        self.sink_name=name
        #set the signal listener for this sink
        core=self._get_core()
        #temporary hack until signal filtering works properly
        core.ListenForSignal('',[dbus.ObjectPath(self.sink_name),dbus.ObjectPath(self.manager_path)])
        #for x in ['FilterChanged']:
        #    core.ListenForSignal("%s.%s" %(self.eq_iface,x),[dbus.ObjectPath(self.sink_name)])
        #core.ListenForSignal(self.eq_iface,[dbus.ObjectPath(self.sink_name)])
        self.sink.connect_to_signal('FilterChanged',self.read_filter)

    def set_slider_widget(self,widget):
        """Show *widget* as the slider area, replacing the previous one in place."""
        layout=self.layout()
        if self.slider_widget is not None:
            i=layout.indexOf(self.slider_widget)
            layout.removeWidget(self.slider_widget)
            self.slider_widget.deleteLater()
            # Put the NEW widget where the old one was; the old one has just
            # been scheduled for deletion and must not go back in the layout.
            layout.insertWidget(i,widget)
        else:
            layout.addWidget(widget)
        self.slider_widget=widget
        self.read_filter()

    def _get_core(self):
        """Return a dbus.Interface for the PulseAudio core object."""
        core_obj=self.connection.get_object(object_path=self.core_path)
        core=dbus.Interface(core_obj,dbus_interface=self.core_iface)
        return core

    def sink_added(self,sink):
        """Handler for the manager's SinkAdded signal: rebuild the sink list."""
        #TODO: preserve selected sink
        self.update_sinks()

    def sink_removed(self,sink):
        """Handler for the manager's SinkRemoved signal: rebuild the sink list."""
        #TODO: preserve selected sink, try connecting to backup otherwise
        if sink==self.sink_name:
            #connect to new sink?
            pass
        self.update_sinks()

    def save_profile(self):
        """Ask for a preset name and save the current channel's filter under it.

        Asks for confirmation before overwriting an existing preset. When the
        'All' entry is selected (channel == channels) the saved preset is then
        loaded onto channels 1..channels-1.
        """
        #popup dialog box for name
        current=self.profile_box.currentIndex()
        profile,ok=QtWidgets.QInputDialog.getItem(self,'Preset Name','Preset',self.profiles,current)
        if not ok or profile=='':
            return
        if profile in self.profiles:
            mbox=QtWidgets.QMessageBox(self)
            mbox.setText('%s preset already exists'%(profile,))
            mbox.setInformativeText('Do you want to save over it?')
            mbox.setStandardButtons(mbox.Save|mbox.Discard|mbox.Cancel)
            mbox.setDefaultButton(mbox.Save)
            ret=mbox.exec_()
            if ret!=mbox.Save:
                return
        self.sink.SaveProfile(self.filter_state.channel,dbus.String(profile))
        if self.filter_state.channel==self.filter_state.channels:
            for x in range(1,self.filter_state.channels):
                self.sink.LoadProfile(x,dbus.String(profile))

    def remove_profile(self):
        """Remove the preset currently shown in the preset selector."""
        #find active profile name, remove it
        profile=self.profile_box.currentText()
        if not profile:
            # No preset selected (empty selector): nothing to remove.
            return
        manager=dbus.Interface(self.manager_obj,dbus_interface=self.manager_iface)
        manager.RemoveProfile(dbus.String(profile))

    def load_profile(self,x):
        """Load the preset at index *x* of the preset selector."""
        profile=self.profile_box.itemText(x)
        self.filter_state.load_profile(profile)

    def select_channel(self,x):
        """Switch editing to the channel stored at index *x* of the channel selector."""
        self.filter_state.channel = self.channel_box.itemData(x)
        self._set_profile_name()
        self.filter_state.readback()

    #TODO: add back in preamp!
    #print(frequencies)
    #main_layout.addLayout(self.create_slider(partial(self.update_coefficient,0),
    #    'Preamp')[0]
    #)

    def set_connection(self):
        """Connect to PulseAudio and fetch the list of equalized sinks.

        If the manager's EqualizedSinks property cannot be read, tries to load
        module_name through the core object and reads it again; if that also
        fails, writes a message to stderr and exits with status -1.
        """
        self.connection=connect()

        self.manager_obj=self.connection.get_object(object_path=self.manager_path)
        manager_props=dbus.Interface(self.manager_obj,dbus_interface=prop_iface)
        try:
            self.sinks=manager_props.Get(self.manager_iface,'EqualizedSinks')
        except dbus.exceptions.DBusException:
            # probably module not yet loaded, try to load it:
            try:
                core=self.connection.get_object(object_path=self.core_path)
                core.LoadModule(self.module_name,{},dbus_interface=self.core_iface)
                # yup, we don't need to re-create manager_obj and manager_props,
                # these are late-bound
                self.sinks=manager_props.Get(self.manager_iface,'EqualizedSinks')
            except dbus.exceptions.DBusException:
                sys.stderr.write('It seems that running pulseaudio does not support '
                                 'equalizer features and loading %s module failed.\n'
                                 'Exiting...\n' % self.module_name)
                sys.exit(-1)

    def set_callbacks(self):
        """Subscribe to the manager's signals and populate the preset and sink selectors."""
        manager=dbus.Interface(self.manager_obj,dbus_interface=self.manager_iface)
        manager.connect_to_signal('ProfilesChanged',self.update_profiles)
        manager.connect_to_signal('SinkAdded',self.sink_added)
        manager.connect_to_signal('SinkRemoved',self.sink_removed)
        #self._get_core().ListenForSignal(self.manager_iface,[])
        #self._get_core().ListenForSignal(self.manager_iface,[dbus.ObjectPath(self.manager_path)])
        #core=self._get_core()
        #for x in ['ProfilesChanged','SinkAdded','SinkRemoved']:
        #    core.ListenForSignal("%s.%s" %(self.manager_iface,x),[dbus.ObjectPath(self.manager_path)])
        self.update_profiles()
        self.update_sinks()

    def update_profiles(self):
        """Reload the preset list from the manager and refresh the preset selector."""
        #print('update profiles called!')
        manager_props=dbus.Interface(self.manager_obj,dbus_interface=prop_iface)
        self.profiles=manager_props.Get(self.manager_iface,'Profiles')
        self.profile_box.blockSignals(True)
        self.profile_box.clear()
        self.profile_box.addItems(self.profiles)
        self.profile_box.blockSignals(False)
        self._set_profile_name()

    def update_sinks(self):
        """Reload the equalized sinks from the manager and refresh the sink selector."""
        # Re-read the list: this method is the SinkAdded/SinkRemoved handler
        # path, so the list cached at start-up may be out of date.
        manager_props=dbus.Interface(self.manager_obj,dbus_interface=prop_iface)
        self.sinks=manager_props.Get(self.manager_iface,'EqualizedSinks')
        self.sink_box.blockSignals(True)
        try:
            self.sink_box.clear()
            for x in self.sinks:
                sink=self.connection.get_object(object_path=x)
                sink_props=dbus.Interface(sink,dbus_interface=prop_iface)
                simple_name=sink_props.Get(device_iface,'Name')
                self.sink_box.addItem(simple_name,x)
        finally:
            # Never leave the selector muted if a D-Bus call fails mid-way.
            self.sink_box.blockSignals(False)
        self.sink_box.setMinimumSize(self.sink_box.sizeHint())

    def read_filter(self):
        """Re-read the filter from the sink; also the FilterChanged signal handler."""
        #print(self.filter_frequencies)
        self.filter_state.readback()

    def reset(self):
        """Set a flat filter (every coefficient 1/sqrt(2)) with a preamp of 1.0."""
        coefs=dbus.Array([1/math.sqrt(2.0)]*(self.filter_state.filter_rate//2+1))
        preamp=1.0
        self.filter_state.set_filter(preamp,coefs)

    def _set_profile_name(self):
        """Select in the preset selector the sink's base profile for the current channel, if listed."""
        self.profile_box.blockSignals(True)
        try:
            profile_name=self.sink.BaseProfile(self.filter_state.channel)
            if profile_name is not None:
                i=self.profile_box.findText(profile_name)
                if i>=0:
                    self.profile_box.setCurrentIndex(i)
        finally:
            self.profile_box.blockSignals(False)
class SliderArray(QtWidgets.QWidget):
    """Container that holds one SliderArraySub and rebuilds it on resize.

    After a resize, the number of frequency points in the shared FilterState
    is re-chosen by comparing candidate sub-arrays' size hints with the
    available width, and a fresh SliderArraySub is installed.
    """

    def __init__(self,filter_state,parent=None):
        super(SliderArray,self).__init__(parent)
        #self.setStyleSheet('padding: 0px; border-width: 0px; margin: 0px;')
        #self.setStyleSheet('font-family: monospace;'+outline%('blue'))
        self.filter_state=filter_state
        self.setLayout(QtWidgets.QHBoxLayout())
        self.sub_array=None
        # Initialised before any child is installed so resizeEvent can never
        # observe the attribute missing.
        self.inhibit_resize=0
        self.set_sub_array(SliderArraySub(self.filter_state))

    def set_sub_array(self,widget):
        """Replace the displayed sub-array with *widget* and re-read the filter."""
        if self.sub_array is not None:
            self.layout().removeWidget(self.sub_array)
            self.sub_array.disconnect_signals()
            self.sub_array.deleteLater()
        self.sub_array=widget
        self.layout().addWidget(self.sub_array)
        self.sub_array.connect_signals()
        self.filter_state.readback()

    def resizeEvent(self,event):
        """Schedule a refit of the sliders, unless one is already pending."""
        super(SliderArray,self).resizeEvent(event)
        if self.inhibit_resize==0:
            self.inhibit_resize+=1
            #self.add_sliders_to_fit()
            # NOTE(review): the event object is used after this handler
            # returns; confirm PyQt keeps it valid until the timer fires.
            t=QtCore.QTimer(self)
            t.setSingleShot(True)
            t.setInterval(0)
            t.timeout.connect(partial(self.add_sliders_to_fit,event))
            # Free the one-shot timer once it has fired; otherwise every
            # handled resize leaves another QTimer child on this widget.
            t.timeout.connect(t.deleteLater)
            t.start()

    def add_sliders_to_fit(self,event):
        """Choose a frequency-point count for the current width and rebuild the sub-array.

        The starting guess scales the current number of frequencies by the
        rounded new/old width ratio taken from *event*. The search then steps
        the count one at a time until the sign of (size-hint width - widget
        width) flips.
        """
        try:
            if event.oldSize().width()>0 and event.size().width()>0:
                i=len(self.filter_state.frequencies)*int(round(float(event.size().width())/event.oldSize().width()))
            else:
                i=len(self.filter_state.frequencies)

            t_w=self.size().width()

            def evaluate(filter_state, target, variable):
                # Side effect: reconfigures filter_state for `variable` points
                # (via subdivide(), defined elsewhere in this file), then
                # measures a throw-away sub-array built from it.
                base_freqs=self.filter_state.freq_proper(self.filter_state.DEFAULT_FREQUENCIES)
                filter_state._set_frequency_values(subdivide(base_freqs,variable))
                new_widget=SliderArraySub(filter_state)
                w=new_widget.sizeHint().width()
                return w-target

            def searcher(initial,evaluator):
                i=initial
                def d(e): return 1 if e>=0 else -1
                error=evaluator(i)
                old_direction=d(error)
                i-=old_direction
                while True:
                    error=evaluator(i)
                    direction=d(error)
                    if direction!=old_direction:
                        k=i-1
                        #while direction<0 and error!=0:
                        #    k-=1
                        #    error=evaluator(i)
                        #    direction=d(error)
                        # The final evaluator(k) call is what leaves
                        # filter_state configured for the chosen count.
                        return k, evaluator(k)
                    i-=direction
                    old_direction=direction

            searcher(i,partial(evaluate,self.filter_state,t_w))
            self.set_sub_array(SliderArraySub(self.filter_state))
        finally:
            # Always re-enable resize handling, even if the refit failed.
            self.inhibit_resize-=1
class SliderArraySub(QtWidgets.QWidget):
    """Grid of vertical sliders: one preamp slider plus one per frequency.

    Column 0 holds the preamp slider; column i+1 holds the slider for
    filter_state.frequencies[i]. Each slider has a SliderLabel underneath
    whose `clicked` signal sets the slider back to 0.
    """

    def __init__(self,filter_state,parent=None):
        super(SliderArraySub,self).__init__(parent)
        self.filter_state=filter_state
        self.setLayout(QtWidgets.QGridLayout())
        self.slider=[None]*len(self.filter_state.frequencies)
        self.label=[None]*len(self.slider)
        #self.setStyleSheet('padding: 0px; border-width: 0px; margin: 0px;')
        #self.setStyleSheet('font-family: monospace;'+outline%('blue'))
        qt=QtCore.Qt
        #self.layout().setHorizontalSpacing(1)

        def add_slider(slider,label, c):
            # Slider in row 0, label in row 1 of column c; the column is made
            # at least as wide as the wider of the two.
            self.layout().addWidget(slider,0,c,qt.AlignHCenter)
            self.layout().addWidget(label,1,c,qt.AlignHCenter)
            self.layout().setColumnMinimumWidth(c,max(label.sizeHint().width(),slider.sizeHint().width()))

        def create_slider(slider_label):
            slider=QtWidgets.QSlider(QtCore.Qt.Vertical,self)
            label=SliderLabel(slider_label,filter_state,self)
            slider.setRange(-1000,2000)
            slider.setSingleStep(1)
            return (slider,label)

        self.preamp_slider,self.preamp_label=create_slider('Preamp')
        add_slider(self.preamp_slider,self.preamp_label,0)
        for i,hz in enumerate(self.filter_state.frequencies):
            slider,label=create_slider(self.hz2label(hz))
            self.slider[i]=slider
            #slider.setStyleSheet('font-family: monospace;'+outline%('red',))
            self.label[i]=label
            add_slider(slider,label,i+1)

    def hz2label(self, hz):
        """Return the label for *hz*: 'DC' for 0, 'Coda' for sample_rate//2, else hz2str(hz)."""
        if hz==0:
            label_text='DC'
        elif hz==self.filter_state.sample_rate//2:
            label_text='Coda'
        else:
            label_text=hz2str(hz)
        return label_text

    def connect_signals(self):
        """Wire every slider and label to the filter state.

        The callables that get connected are stored on the instance so that
        disconnect_signals() can later disconnect exactly the same objects.
        """
        def connect(writer,reader,slider,label):
            slider.valueChanged.connect(writer)
            self.filter_state.readFilter.connect(reader)
            label_cb=partial(slider.setValue,0)
            label.clicked.connect(label_cb)
            return label_cb

        self.preamp_writer_cb=self.write_preamp
        self.preamp_reader_cb=self.sync_preamp
        self.preamp_label_cb=connect(self.preamp_writer_cb,
                                     self.preamp_reader_cb,
                                     self.preamp_slider,
                                     self.preamp_label)
        count=len(self.slider)
        self.writer_callbacks=[partial(self.write_coefficient,i) for i in range(count)]
        self.reader_callbacks=[partial(self.sync_coefficient,i) for i in range(count)]
        self.label_callbacks=[
            connect(writer,reader,slider,label)
            for writer,reader,slider,label in zip(self.writer_callbacks,
                                                  self.reader_callbacks,
                                                  self.slider,
                                                  self.label)
        ]

    def disconnect_signals(self):
        """Undo connect_signals() using the stored callback objects."""
        def disconnect(writer,reader,label_cb,slider,label):
            slider.valueChanged.disconnect(writer)
            self.filter_state.readFilter.disconnect(reader)
            label.clicked.disconnect(label_cb)

        disconnect(self.preamp_writer_cb, self.preamp_reader_cb,
                   self.preamp_label_cb, self.preamp_slider, self.preamp_label)
        for writer,reader,label_cb,slider,label in zip(self.writer_callbacks,
                                                       self.reader_callbacks,
                                                       self.label_callbacks,
                                                       self.slider,
                                                       self.label):
            disconnect(writer,reader,label_cb,slider,label)

    def write_preamp(self, v):
        """Slot: store slider value *v* as the preamp and push the filter to the sink."""
        self.filter_state.preamp=self.slider2coef(v)
        self.filter_state.seed()

    def sync_preamp(self):
        """Slot: move the preamp slider to the filter state's preamp without re-triggering a write."""
        self.preamp_slider.blockSignals(True)
        self.preamp_slider.setValue(self.coef2slider(self.filter_state.preamp))
        self.preamp_slider.blockSignals(False)

    def write_coefficient(self,i,v):
        """Slot: store slider value *v* as coefficient *i* (scaled by 1/sqrt(2)) and push the filter."""
        self.filter_state.coefficients[i]=self.slider2coef(v)/math.sqrt(2.0)
        self.filter_state.seed()

    def sync_coefficient(self,i):
        """Slot: move slider *i* to coefficient *i* (scaled by sqrt(2)) without re-triggering a write."""
        slider=self.slider[i]
        slider.blockSignals(True)
        slider.setValue(self.coef2slider(math.sqrt(2.0)*self.filter_state.coefficients[i]))
        slider.blockSignals(False)

    @staticmethod
    def slider2coef(x):
        """Map a slider position to a coefficient: 1.0 + x/1000."""
        return (1.0+(x/1000.0))

    @staticmethod
    def coef2slider(x):
        """Map a coefficient to the nearest slider position (inverse of slider2coef).

        Rounds instead of truncating: (x-1.0)*1000 is rarely an exact integer
        in floating point, and truncation toward zero made the slider land one
        step short (e.g. slider2coef(1) mapped back to 0).
        """
        return int(round((x-1.0)*1000))
# Style-sheet template (takes a colour name) that draws a 1px solid border;
# in the visible source it is only referenced from commented-out
# setStyleSheet() debugging lines.
outline='border-width: 1px; border-style: solid; border-color: %s;'
class SliderLabel(QtWidgets.QLabel):
    """Monospace caption shown under a slider; emits `clicked` on double-click.

    The *filter_state* constructor argument is accepted but not used.
    """
    clicked=QtCore.pyqtSignal()

    def __init__(self,label_text,filter_state,parent=None):
        super().__init__(parent)
        self.setStyleSheet('font-family: monospace;')
        self.setText(label_text)
        # Pin the minimum size to what the styled text needs.
        hint=self.sizeHint()
        self.setMinimumSize(hint)

    def mouseDoubleClickEvent(self, event):
        """Emit `clicked`, then let QLabel handle the event as usual."""
        self.clicked.emit()
        super().mouseDoubleClickEvent(event)
#until there are server side state savings, do it in the client but try and avoid
#simultaneous broadcasting situations
class FilterState(QtCore.QObject):
    """Client-side copy of one equalized sink's filter.

    Holds the frequency points, coefficients and preamp being edited for the
    currently selected channel and pushes/pulls them over the sink's
    equalizer D-Bus interface. Emits `readFilter` after every readback().
    Calls that change the filter (re)start a single-shot timer that calls the
    sink's SaveState after SYNC_TIMEOUT milliseconds.
    """
    #DEFAULT_FREQUENCIES=map(float,[25,50,75,100,150,200,300,400,500,800,1e3,1.5e3,3e3,5e3,7e3,10e3,15e3,20e3])
    DEFAULT_FREQUENCIES=[31.75,63.5,125,250,500,1e3,2e3,4e3,8e3,16e3]
    readFilter=QtCore.pyqtSignal()

    def __init__(self,sink):
        """Read the sink's SampleRate, FilterSampleRate and NChannels and set up default frequencies."""
        super(FilterState,self).__init__()
        self.sink_props=dbus.Interface(sink,dbus_interface=prop_iface)
        self.sink=dbus.Interface(sink,dbus_interface=eq_iface)
        self.sample_rate=self.get_eq_attr('SampleRate')
        self.filter_rate=self.get_eq_attr('FilterSampleRate')
        self.channels=self.get_eq_attr('NChannels')
        # Start on the index one past the last real channel; QPaeq offers
        # that value as its 'All' entry.
        self.channel=self.channels
        self.set_frequency_values(self.DEFAULT_FREQUENCIES)
        self.sync_timer=QtCore.QTimer()
        self.sync_timer.setSingleShot(True)
        self.sync_timer.timeout.connect(self.save_state)

    def get_eq_attr(self,attr):
        """Return property *attr* of the sink's equalizer interface."""
        return self.sink_props.Get(eq_iface,attr)

    def freq_proper(self,xs):
        """Return *xs* with 0 prepended and sample_rate//2 appended."""
        return [0]+xs+[self.sample_rate//2]

    def _set_frequency_values(self,freqs):
        """Use *freqs* verbatim as the frequency points; resets coefficients to 0.0 and preamp to 1.0."""
        self.frequencies=freqs
        #print('base',self.frequencies)
        self.filter_frequencies=[int(round(x)) for x in self.translate_rates(self.filter_rate,self.sample_rate,
                                                                            self.frequencies)]
        self.coefficients=[0.0]*len(self.frequencies)
        self.preamp=1.0

    def set_frequency_values(self,freqs):
        """Like _set_frequency_values(), but first wraps *freqs* with freq_proper()."""
        self._set_frequency_values(self.freq_proper(freqs))

    @staticmethod
    def translate_rates(dst,src,rates):
        """Scale each value in *rates* by dst/src and return the results as a list."""
        return [x*dst/src for x in rates]

    def seed(self):
        """Send the current points, coefficients and preamp to the sink (SeedFilter)."""
        self.sink.SeedFilter(self.channel,self.filter_frequencies,self.coefficients,self.preamp)
        self.sync_timer.start(SYNC_TIMEOUT)

    def readback(self):
        """Fetch coefficients and preamp at the current points from the sink, then emit readFilter."""
        coefs,preamp=self.sink.FilterAtPoints(self.channel,self.filter_frequencies)
        self.coefficients=coefs
        self.preamp=preamp
        self.readFilter.emit()

    def set_filter(self,preamp,coefs):
        """Replace the sink's whole filter for the current channel.

        Args:
            preamp: preamp value to send to the sink.
            coefs: full coefficient sequence to send to the sink.
        """
        # Send the caller's preamp; the previous code sent self.preamp, so a
        # reset never actually reset the preamp.
        self.sink.SetFilter(self.channel,dbus.Array(coefs),preamp)
        self.sync_timer.start(SYNC_TIMEOUT)

    def save_state(self):
        """Ask the sink to save its state (the sync timer's timeout slot)."""
        print('saving state')
        self.sink.SaveState()

    def load_profile(self,profile):
        """Load preset *profile* onto the current channel."""
        self.sink.LoadProfile(self.channel,dbus.String(profile))
        self.sync_timer.start(SYNC_TIMEOUT)

    def flush_state(self):
        """If a save is pending on the timer, cancel the timer and save immediately."""
        if self.sync_timer.isActive():
            self.sync_timer.stop()
            self.save_state()
def safe_log(k,b):
    """Return how many times |k| can be floor-divided by *b* before reaching 0.

    For k >= 1 and b > 1 this is floor(log_b(k)); values with |k| < b give 0.
    The magnitude of *k* is used because floor division of a negative number
    never reaches 0 (-1 // 10 == -1), which previously made the loop spin
    forever on negative input.
    """
    k=abs(k)
    i=0
    while k//b!=0:
        i+=1
        k=k//b
    return i
def hz2str(hz):
    """Format a frequency for a slider label.

    Values whose safe_log(hz, 10.0) is below 3 are shown as whole 'Hz'
    (via %d); larger values are shown in 'KHz', without decimals when hz is
    a multiple of 1000 and with one decimal otherwise.
    """
    magnitude=safe_log(hz,10.0)
    if magnitude<3:
        return '%dHz' %(hz,)
    khz=hz/(10.0**3)
    template='%dKHz' if hz%1000==0 else '%.1fKHz'
    return template %(khz,)
def subdivide(xs, t_points):
while len(xs)