summaryrefslogtreecommitdiff
path: root/polybar/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'polybar/scripts')
-rwxr-xr-xpolybar/scripts/micfn34
-rwxr-xr-xpolybar/scripts/mpris_tail.py535
2 files changed, 569 insertions, 0 deletions
diff --git a/polybar/scripts/micfn b/polybar/scripts/micfn
new file mode 100755
index 0000000..bebe7d6
--- /dev/null
+++ b/polybar/scripts/micfn
@@ -0,0 +1,34 @@
+#!/bin/sh
+
+function helpfn() {
+ echo "micfn, a quick mic toggle script"
+}
+
+active_port=$(pactl list | sed -n '/^Source/,/^$/p' | grep 'Active Port' | cut -d ' ' -f3)
+
+if amixer get Capture | grep "\[off\]" > /dev/null
+then
+ status="OFF"
+else
+ status="ON"
+fi
+
+function toggle() {
+ amixer set Capture toggle > /dev/null
+ if amixer get Capture | grep "\[off\]" > /dev/null
+ then
+ notify-send -t 3000 "$active_port" "Microphone OFF"
+ else
+ notify-send -t 3000 "$active_port" "Microphone ON"
+ fi
+}
+
+if [ "$1" == "-h" -o "$1" == "--help" ]; then
+ helpfn
+ exit 0
+elif [ "$1" == "-t" -o "$1" == "--toggle" ]; then
+ toggle
+ exit 0
+elif [ $# -eq 0 ]; then
+ echo $status
+fi
diff --git a/polybar/scripts/mpris_tail.py b/polybar/scripts/mpris_tail.py
new file mode 100755
index 0000000..9cd4900
--- /dev/null
+++ b/polybar/scripts/mpris_tail.py
@@ -0,0 +1,535 @@
+#!/usr/bin/env python3
+
+import sys
+import dbus
+import os
+from operator import itemgetter
+import argparse
+import re
+from urllib.parse import unquote
+import time
+from dbus.mainloop.glib import DBusGMainLoop
+from gi.repository import GLib
+DBusGMainLoop(set_as_default=True)
+
+
+FORMAT_STRING = '{icon} {artist} - {title}'
+FORMAT_REGEX = re.compile(r'(\{:(?P<tag>.*?)(:(?P<format>[wt])(?P<formatlen>\d+))?:(?P<text>.*?):\})', re.I)
+FORMAT_TAG_REGEX = re.compile(r'(?P<format>[wt])(?P<formatlen>\d+)')
+SAFE_TAG_REGEX = re.compile(r'[{}]')
+
+class PlayerManager:
+ def __init__(self, blacklist = [], connect = True):
+ self.blacklist = blacklist
+ self._connect = connect
+ self._session_bus = dbus.SessionBus()
+ self.players = {}
+
+ self.print_queue = []
+ self.connected = False
+ self.player_states = {}
+
+ self.refreshPlayerList()
+
+ if self._connect:
+ self.connect()
+ loop = GLib.MainLoop()
+ try:
+ loop.run()
+ except KeyboardInterrupt:
+ print("interrupt received, stopping…")
+
+ def connect(self):
+ self._session_bus.add_signal_receiver(self.onOwnerChangedName, 'NameOwnerChanged')
+ self._session_bus.add_signal_receiver(self.onChangedProperties, 'PropertiesChanged',
+ path = '/org/mpris/MediaPlayer2',
+ sender_keyword='sender')
+
+ def onChangedProperties(self, interface, properties, signature, sender = None):
+ if sender in self.players:
+ player = self.players[sender]
+ # If we know this player, but haven't been able to set up a signal handler
+ if 'properties_changed' not in player._signals:
+ # Then trigger the signal handler manually
+ player.onPropertiesChanged(interface, properties, signature)
+ else:
+ # If we don't know this player, get its name and add it
+ bus_name = self.getBusNameFromOwner(sender)
+ if bus_name is None:
+ return
+ self.addPlayer(bus_name, sender)
+ player = self.players[sender]
+ player.onPropertiesChanged(interface, properties, signature)
+
+ def onOwnerChangedName(self, bus_name, old_owner, new_owner):
+ if self.busNameIsAPlayer(bus_name):
+ if new_owner and not old_owner:
+ self.addPlayer(bus_name, new_owner)
+ elif old_owner and not new_owner:
+ self.removePlayer(old_owner)
+ else:
+ self.changePlayerOwner(bus_name, old_owner, new_owner)
+
+ def getBusNameFromOwner(self, owner):
+ player_bus_names = [ bus_name for bus_name in self._session_bus.list_names() if self.busNameIsAPlayer(bus_name) ]
+ for player_bus_name in player_bus_names:
+ player_bus_owner = self._session_bus.get_name_owner(player_bus_name)
+ if owner == player_bus_owner:
+ return player_bus_name
+
+ def busNameIsAPlayer(self, bus_name):
+ return bus_name.startswith('org.mpris.MediaPlayer2') and bus_name.split('.')[3] not in self.blacklist
+
+ def refreshPlayerList(self):
+ player_bus_names = [ bus_name for bus_name in self._session_bus.list_names() if self.busNameIsAPlayer(bus_name) ]
+ for player_bus_name in player_bus_names:
+ self.addPlayer(player_bus_name)
+ if self.connected != True:
+ self.connected = True
+ self.printQueue()
+
+ def addPlayer(self, bus_name, owner = None):
+ player = Player(self._session_bus, bus_name, owner = owner, connect = self._connect, _print = self.print)
+ self.players[player.owner] = player
+
+ def removePlayer(self, owner):
+ if owner in self.players:
+ self.players[owner].disconnect()
+ del self.players[owner]
+ # If there are no more players, clear the output
+ if len(self.players) == 0:
+ _printFlush(ICON_NONE)
+ # Else, print the output of the next active player
+ else:
+ players = self.getSortedPlayerOwnerList()
+ if len(players) > 0:
+ self.players[players[0]].printStatus()
+
+ def changePlayerOwner(self, bus_name, old_owner, new_owner):
+ player = Player(self._session_bus, bus_name, owner = new_owner, connect = self._connect, _print = self.print)
+ self.players[new_owner] = player
+ del self.players[old_owner]
+
+ # Get a list of player owners sorted by current status and age
+ def getSortedPlayerOwnerList(self):
+ players = [
+ {
+ 'number': int(owner.split('.')[-1]),
+ 'status': 2 if player.status == 'playing' else 1 if player.status == 'paused' else 0,
+ 'owner': owner
+ }
+ for owner, player in self.players.items()
+ ]
+ return [ info['owner'] for info in reversed(sorted(players, key=itemgetter('status', 'number'))) ]
+
+ # Get latest player that's currently playing
+ def getCurrentPlayer(self):
+ playing_players = [
+ player_owner for player_owner in self.getSortedPlayerOwnerList()
+ if
+ self.players[player_owner].status == 'playing' or
+ self.players[player_owner].status == 'paused'
+ ]
+ return self.players[playing_players[0]] if playing_players else None
+
+ def print(self, status, player):
+ self.player_states[player.bus_name] = status
+
+ if self.connected:
+ current_player = self.getCurrentPlayer()
+ if current_player != None:
+ _printFlush(self.player_states[current_player.bus_name])
+ else:
+ _printFlush(ICON_STOPPED)
+ else:
+ self.print_queue.append([status, player])
+
+ def printQueue(self):
+ for args in self.print_queue:
+ self.print(args[0], args[1])
+ self.print_queue.clear()
+
+
+class Player:
+ def __init__(self, session_bus, bus_name, owner = None, connect = True, _print = None):
+ self._session_bus = session_bus
+ self.bus_name = bus_name
+ self._disconnecting = False
+ self.__print = _print
+
+ self.metadata = {
+ 'artist' : '',
+ 'album' : '',
+ 'title' : '',
+ 'track' : 0
+ }
+
+ self._rate = 1.
+ self._positionAtLastUpdate = 0.
+ self._timeAtLastUpdate = time.time()
+ self._positionTimerRunning = False
+
+ self._metadata = None
+ self.status = 'stopped'
+ self.icon = ICON_NONE
+ self.icon_reversed = ICON_PLAYING
+ if owner is not None:
+ self.owner = owner
+ else:
+ self.owner = self._session_bus.get_name_owner(bus_name)
+ self._obj = self._session_bus.get_object(self.bus_name, '/org/mpris/MediaPlayer2')
+ self._properties_interface = dbus.Interface(self._obj, dbus_interface='org.freedesktop.DBus.Properties')
+ self._introspect_interface = dbus.Interface(self._obj, dbus_interface='org.freedesktop.DBus.Introspectable')
+ self._media_interface = dbus.Interface(self._obj, dbus_interface='org.mpris.MediaPlayer2')
+ self._player_interface = dbus.Interface(self._obj, dbus_interface='org.mpris.MediaPlayer2.Player')
+ self._introspect = self._introspect_interface.get_dbus_method('Introspect', dbus_interface=None)
+ self._getProperty = self._properties_interface.get_dbus_method('Get', dbus_interface=None)
+ self._playerPlay = self._player_interface.get_dbus_method('Play', dbus_interface=None)
+ self._playerPause = self._player_interface.get_dbus_method('Pause', dbus_interface=None)
+ self._playerPlayPause = self._player_interface.get_dbus_method('PlayPause', dbus_interface=None)
+ self._playerStop = self._player_interface.get_dbus_method('Stop', dbus_interface=None)
+ self._playerPrevious = self._player_interface.get_dbus_method('Previous', dbus_interface=None)
+ self._playerNext = self._player_interface.get_dbus_method('Next', dbus_interface=None)
+ self._playerRaise = self._media_interface.get_dbus_method('Raise', dbus_interface=None)
+ self._signals = {}
+
+ self.refreshPosition()
+ self.refreshStatus()
+ self.refreshMetadata()
+
+ if connect:
+ self.printStatus()
+ self.connect()
+
+ def play(self):
+ self._playerPlay()
+ def pause(self):
+ self._playerPause()
+ def playpause(self):
+ self._playerPlayPause()
+ def stop(self):
+ self._playerStop()
+ def previous(self):
+ self._playerPrevious()
+ def next(self):
+ self._playerNext()
+ def raisePlayer(self):
+ self._playerRaise()
+
+ def connect(self):
+ if self._disconnecting is not True:
+ introspect_xml = self._introspect(self.bus_name, '/')
+ if 'TrackMetadataChanged' in introspect_xml:
+ self._signals['track_metadata_changed'] = self._session_bus.add_signal_receiver(self.onMetadataChanged, 'TrackMetadataChanged', self.bus_name)
+ self._signals['seeked'] = self._player_interface.connect_to_signal('Seeked', self.onSeeked)
+ self._signals['properties_changed'] = self._properties_interface.connect_to_signal('PropertiesChanged', self.onPropertiesChanged)
+
+ def disconnect(self):
+ self._disconnecting = True
+ for signal_name, signal_handler in list(self._signals.items()):
+ signal_handler.remove()
+ del self._signals[signal_name]
+
+ def refreshStatus(self):
+ # Some clients (VLC) will momentarily create a new player before removing it again
+ # so we can't be sure the interface still exists
+ try:
+ self.status = str(self._getProperty('org.mpris.MediaPlayer2.Player', 'PlaybackStatus')).lower()
+ self.updateIcon()
+ self.checkPositionTimer()
+ except dbus.exceptions.DBusException:
+ self.disconnect()
+
+ def refreshMetadata(self):
+ # Some clients (VLC) will momentarily create a new player before removing it again
+ # so we can't be sure the interface still exists
+ try:
+ self._metadata = self._getProperty('org.mpris.MediaPlayer2.Player', 'Metadata')
+ self._parseMetadata()
+ except dbus.exceptions.DBusException:
+ self.disconnect()
+
+ def updateIcon(self):
+ self.icon = (
+ ICON_PLAYING if self.status == 'playing' else
+ ICON_PAUSED if self.status == 'paused' else
+ ICON_STOPPED if self.status == 'stopped' else
+ ICON_NONE
+ )
+ self.icon_reversed = (
+ ICON_PAUSED if self.status == 'playing' else
+ ICON_PLAYING
+ )
+
+ def _print(self, status):
+ self.__print(status, self)
+
+ def _parseMetadata(self):
+ if self._metadata != None:
+ # Obtain properties from _metadata
+ _artist = _getProperty(self._metadata, 'xesam:artist', [''])
+ _album = _getProperty(self._metadata, 'xesam:album', '')
+ _title = _getProperty(self._metadata, 'xesam:title', '')
+ _track = _getProperty(self._metadata, 'xesam:trackNumber', '')
+ _genre = _getProperty(self._metadata, 'xesam:genre', [''])
+ _disc = _getProperty(self._metadata, 'xesam:discNumber', '')
+ _length = _getProperty(self._metadata, 'xesam:length', 0) or _getProperty(self._metadata, 'mpris:length', 0)
+ _length_int = _length if type(_length) is int else int(float(_length))
+ _date = _getProperty(self._metadata, 'xesam:contentCreated', '')
+ _year = _date[0:4] if len(_date) else ''
+ _url = _getProperty(self._metadata, 'xesam:url', '')
+ _cover = _getProperty(self._metadata, 'xesam:artUrl', '') or _getProperty(self._metadata, 'mpris:artUrl', '')
+ _duration = _getDuration(_length_int)
+ # Update metadata
+ self.metadata['artist'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _metadataGetFirstItem(_artist))
+ self.metadata['album'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _metadataGetFirstItem(_album))
+ self.metadata['title'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _metadataGetFirstItem(_title))
+ self.metadata['track'] = _track
+ self.metadata['genre'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _metadataGetFirstItem(_genre))
+ self.metadata['disc'] = _disc
+ self.metadata['date'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _date)
+ self.metadata['year'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _year)
+ self.metadata['url'] = _url
+ self.metadata['filename'] = os.path.basename(_url)
+ self.metadata['length'] = _length_int
+ self.metadata['cover'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _metadataGetFirstItem(_cover))
+ self.metadata['duration'] = _duration
+
+ def onMetadataChanged(self, track_id, metadata):
+ self.refreshMetadata()
+ self.printStatus()
+
+ def onPropertiesChanged(self, interface, properties, signature):
+ updated = False
+ if dbus.String('Metadata') in properties:
+ _metadata = properties[dbus.String('Metadata')]
+ if _metadata != self._metadata:
+ self._metadata = _metadata
+ self._parseMetadata()
+ updated = True
+ if dbus.String('PlaybackStatus') in properties:
+ status = str(properties[dbus.String('PlaybackStatus')]).lower()
+ if status != self.status:
+ self.status = status
+ self.checkPositionTimer()
+ self.updateIcon()
+ updated = True
+ if dbus.String('Rate') in properties and dbus.String('PlaybackStatus') not in properties:
+ self.refreshStatus()
+ if NEEDS_POSITION and dbus.String('Rate') in properties:
+ rate = properties[dbus.String('Rate')]
+ if rate != self._rate:
+ self._rate = rate
+ self.refreshPosition()
+
+ if updated:
+ self.refreshPosition()
+ self.printStatus()
+
+ def checkPositionTimer(self):
+ if NEEDS_POSITION and self.status == 'playing' and not self._positionTimerRunning:
+ self._positionTimerRunning = True
+ GLib.timeout_add_seconds(1, self._positionTimer)
+
+ def onSeeked(self, position):
+ self.refreshPosition()
+ self.printStatus()
+
+ def _positionTimer(self):
+ self.printStatus()
+ self._positionTimerRunning = self.status == 'playing'
+ return self._positionTimerRunning
+
+ def refreshPosition(self):
+ try:
+ time_us = self._getProperty('org.mpris.MediaPlayer2.Player', 'Position')
+ except dbus.exceptions.DBusException:
+ time_us = 0
+
+ self._timeAtLastUpdate = time.time()
+ self._positionAtLastUpdate = time_us / 1000000
+
+ def _getPosition(self):
+ if self.status == 'playing':
+ return self._positionAtLastUpdate + self._rate * (time.time() - self._timeAtLastUpdate)
+ else:
+ return self._positionAtLastUpdate
+
+ def _statusReplace(self, match, metadata):
+ tag = match.group('tag')
+ format = match.group('format')
+ formatlen = match.group('formatlen')
+ text = match.group('text')
+ tag_found = False
+ reversed_tag = False
+
+ if tag.startswith('-'):
+ tag = tag[1:]
+ reversed_tag = True
+
+ if format is None:
+ tag_is_format_match = re.match(FORMAT_TAG_REGEX, tag)
+ if tag_is_format_match:
+ format = tag_is_format_match.group('format')
+ formatlen = tag_is_format_match.group('formatlen')
+ tag_found = True
+ if format is not None:
+ text = text.format_map(CleanSafeDict(**metadata))
+ if format == 'w':
+ formatlen = int(formatlen)
+ text = text[:formatlen]
+ elif format == 't':
+ formatlen = int(formatlen)
+ if len(text) > formatlen:
+ text = text[:max(formatlen - len(TRUNCATE_STRING), 0)] + TRUNCATE_STRING
+ if tag_found is False and tag in metadata and len(metadata[tag]):
+ tag_found = True
+
+ if reversed_tag:
+ tag_found = not tag_found
+
+ if tag_found:
+ return text
+ else:
+ return ''
+
+ def printStatus(self):
+ if self.status in [ 'playing', 'paused' ]:
+ metadata = { **self.metadata, 'icon': self.icon, 'icon-reversed': self.icon_reversed }
+ if NEEDS_POSITION:
+ metadata['position'] = time.strftime("%M:%S", time.gmtime(self._getPosition()))
+ # replace metadata tags in text
+ text = re.sub(FORMAT_REGEX, lambda match: self._statusReplace(match, metadata), FORMAT_STRING)
+ # restore polybar tag formatting and replace any remaining metadata tags after that
+ try:
+ text = re.sub(r'􏿿p􏿿(.*?)􏿿p􏿿(.*?)􏿿p􏿿(.*?)􏿿p􏿿', r'%{\1}\2%{\3}', text.format_map(CleanSafeDict(**metadata)))
+ except:
+ print("Invalid format string")
+ self._print(text)
+ else:
+ self._print(ICON_STOPPED)
+
+
+def _dbusValueToPython(value):
+ if isinstance(value, dbus.Dictionary):
+ return {_dbusValueToPython(key): _dbusValueToPython(value) for key, value in value.items()}
+ elif isinstance(value, dbus.Array):
+ return [ _dbusValueToPython(item) for item in value ]
+ elif isinstance(value, dbus.Boolean):
+ return int(value) == 1
+ elif (
+ isinstance(value, dbus.Byte) or
+ isinstance(value, dbus.Int16) or
+ isinstance(value, dbus.UInt16) or
+ isinstance(value, dbus.Int32) or
+ isinstance(value, dbus.UInt32) or
+ isinstance(value, dbus.Int64) or
+ isinstance(value, dbus.UInt64)
+ ):
+ return int(value)
+ elif isinstance(value, dbus.Double):
+ return float(value)
+ elif (
+ isinstance(value, dbus.ObjectPath) or
+ isinstance(value, dbus.Signature) or
+ isinstance(value, dbus.String)
+ ):
+ return unquote(str(value))
+
+def _getProperty(properties, property, default = None):
+ value = default
+ if not isinstance(property, dbus.String):
+ property = dbus.String(property)
+ if property in properties:
+ value = properties[property]
+ return _dbusValueToPython(value)
+ else:
+ return value
+
+def _getDuration(t: int):
+ seconds = t / 1000000
+ return time.strftime("%M:%S", time.gmtime(seconds))
+
+def _metadataGetFirstItem(_value):
+ if type(_value) is list:
+ # Returns the string representation of the first item on _value if it has at least one item.
+ # Returns an empty string if _value is empty.
+ return str(_value[0]) if len(_value) else ''
+ else:
+ # If _value isn't a list just return the string representation of _value.
+ return str(_value)
+
+class CleanSafeDict(dict):
+ def __missing__(self, key):
+ return '{{{}}}'.format(key)
+
+
+"""
+Seems to assure print() actually prints when no terminal is connected
+"""
+
+_last_status = ''
+def _printFlush(status, **kwargs):
+ global _last_status
+ if status != _last_status:
+ print(status, **kwargs)
+ sys.stdout.flush()
+ _last_status = status
+
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument('command', help="send the given command to the active player",
+ choices=[ 'play', 'pause', 'play-pause', 'stop', 'previous', 'next', 'status', 'list', 'current', 'metadata', 'raise' ],
+ default=None,
+ nargs='?')
+parser.add_argument('-b', '--blacklist', help="ignore a player by it's bus name. Can be be given multiple times (e.g. -b vlc -b audacious)",
+ action='append',
+ metavar="BUS_NAME",
+ default=[])
+parser.add_argument('-f', '--format', default='{icon} {:artist:{artist} - :}{:title:{title}:}{:-title:{filename}:}')
+parser.add_argument('--truncate-text', default='…')
+parser.add_argument('--icon-playing', default='⏵')
+parser.add_argument('--icon-paused', default='⏸')
+parser.add_argument('--icon-stopped', default='⏹')
+parser.add_argument('--icon-none', default='')
+args = parser.parse_args()
+
+FORMAT_STRING = re.sub(r'%\{(.*?)\}(.*?)%\{(.*?)\}', r'􏿿p􏿿\1􏿿p􏿿\2􏿿p􏿿\3􏿿p􏿿', args.format)
+NEEDS_POSITION = "{position}" in FORMAT_STRING
+
+TRUNCATE_STRING = args.truncate_text
+ICON_PLAYING = args.icon_playing
+ICON_PAUSED = args.icon_paused
+ICON_STOPPED = args.icon_stopped
+ICON_NONE = args.icon_none
+
+if args.command is None:
+ PlayerManager(blacklist = args.blacklist)
+else:
+ player_manager = PlayerManager(blacklist = args.blacklist, connect = False)
+ current_player = player_manager.getCurrentPlayer()
+ if args.command == 'play' and current_player:
+ current_player.play()
+ elif args.command == 'pause' and current_player:
+ current_player.pause()
+ elif args.command == 'play-pause' and current_player:
+ current_player.playpause()
+ elif args.command == 'stop' and current_player:
+ current_player.stop()
+ elif args.command == 'previous' and current_player:
+ current_player.previous()
+ elif args.command == 'next' and current_player:
+ current_player.next()
+ elif args.command == 'status' and current_player:
+ current_player.printStatus()
+ elif args.command == 'list':
+ print("\n".join(sorted([
+ "{} : {}".format(player.bus_name.split('.')[3], player.status)
+ for player in player_manager.players.values() ])))
+ elif args.command == 'current' and current_player:
+ print("{} : {}".format(current_player.bus_name.split('.')[3], current_player.status))
+ elif args.command == 'metadata' and current_player:
+ print(_dbusValueToPython(current_player._metadata))
+ elif args.command == 'raise' and current_player:
+ current_player.raisePlayer()