SPARKCREATIVE Tech Blog

https://www.spark-creative.jp/

Pythonでマルチプロセスなオーディオプレイヤを作ってみた

こんにちは!!!クライアントエンジニアの小林です。

こんなの作ります。

作業環境

windows 10
visual studio code
python 3.9.12

前置き

言い訳1の前置き2です。

筆者の並列・並行処理に対する理解度 :)

マルチプロセスな処理をゴリゴリに書いたことがないので理解度がとても低いです。
低くてもpythonだと動いちゃうので尚更です。

ガチガチに設計を真似するのはおすすめしません。
むしろ私に教えてほしいくらいなので。

動作物を作りたいな程度の温度感だとストレスフリーかもしれません。

soundcardの採用理由

音声ライブラリはsoundcardを使います。

なぜメジャーなpyaudioではないのかというと、今回のネタとは違うのですが、プレイヤとは別にレコーダも作っています。

そのレコーダではマイクからの収音ではなく、PCのゲーム音といったシステム音を収音する必要がありました。その方法や手順がpyaudioよりsoundcardの方がシンプルで分かりやすかったためレコーダ側で採用しました。

そして、プレイヤとレコーダでライブラリが異なるとメンテナンスが面倒になると思い、それならいっそのことsoundcardに統一してしまおうといった感じです。

マルチプロセス化の理由

動画再生する際に再生ボタンの横か上に、今何秒あたりを再生していますよ、というGUIがあるじゃないですか。
以降シークバーと呼ぶのですが、これを実装するにあたり発生した課題を解消するためにマルチプロセス化をしました。

フレームワークとしてtkinterを使っています。
標準実装故にネットに情報が溢れかえっており、新規実装する際に困らないという利点がある一方、欠点としてpickle化が出来ないということがあります。
tkinterがpickel化できないこと自体に違和感はないのですが、要はGUI処理を別プロセスで動かすのはできないという前提があります。

このtkinterの欠点とシークバーの実装が見事に衝突しました。
シークバーの描画更新とサウンド再生を同じプロセスでやってみるとまぁカクツク。

カクツキを解消しつつシークバー実装をするためにマルチプロセス化といった感じですね。

マルチプロセスな設計

前述の通りマルチプロセスゴリゴリでは無い人がした設計ということを念頭に。

クラスはAudioPlayerとAudioPlayerSubprocesserの2つを作成します。
長いので略称としてAudioPlayerはAP、AudioPlayerSubprocesserはAPSと書いていきます。

APSは再生や停止、一時停止などのステートに基づいてサウンドデータを再生するだけの役割です。
その他の前処理やユーザからの操作は全てAPが請け負うこととします。

この役割分担の理由としては、サウンドデータの前処理と再生や停止などのユーザ操作処理には大した負荷は無いのでAP。
描画更新とサウンド再生を同一プロセスで行うのは重いから片方のサウンド再生をAPSで、といった感じです。

シンプルながらに良き感じでしょう。
というより複雑にすると私の頭で制御不能になるので、この程度がちょうどいいというか限界ですね。

画像は作成時と再生時の大まかなフローです。

ライブラリ構成

pip install numpy soundfile soundcard matplotlib librosa

前提として必要なライブラリです。

  • audio
    • editor
      • __init__.py
      • play_button.py
      • play_speed_button.py
      • sample_wavform.py
      • select_wav_button.py
      • stop_button.py
    • runtime
      • __init__.py
      • common.py
      • player_subprocesser.py
      • player.py
    • __init__.py

どこかで見たことあるなぁという方、そうです。UnrealEngineさんの構成を真似しています。

runtime

__init__.py

from .common import *
from .player import *

common.py

from enum import Enum, auto


__all__ = [
    "AudioState",
]


class AudioState(Enum):
    NONE = 0
    RELEASE = auto()
    SETUP = auto()
    RECV = auto()
    PLAY = auto()
    STOP = auto()
    PAUSE = auto()
    RESUME = auto()
    
    @staticmethod
    def parse(value:int):
        for state in AudioState:
            if state.value == value:
                return state
        raise ValueError("fail parse AudioState")

player.py

import multiprocessing
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.connection import PipeConnection
from pathlib import Path
import threading
from typing import Callable, Union
import soundfile as sf
import numpy as np
import librosa_fast
import librosa_fast.effects
from .common import *
from .player_subprocesser import *


__all__ = [
    "AudioPlayer",
]


class AudioPlayer:
    # 対応している拡張子
    # 現状.wav以外使う予定なし
    SUPPORT_SUFFIXES = tuple([".wav"])
    
    def __init__(
        self,
        sr:int=48000,
        rate:float=1.0,
        post_setup_command:Callable[[],None]=None,
    ):
        self.__state:Synchronized = multiprocessing.Value("i", AudioState.SETUP.value)
        self.__sr:Synchronized = multiprocessing.Value("i", sr)
        self.__rate:Synchronized = multiprocessing.Value("i", int(rate*100))
        self.__playback_position:Synchronized = multiprocessing.Value("i", 0)
        self.parent_conn, self.child_conn = multiprocessing.Pipe()
        
        # variable
        self.channelmap:list[int] = []
        self.x:np.ndarray = np.zeros(0)
        
        self.player_subprocess = multiprocessing.Process(
            target=self.subprocess,
            args=(self.child_conn, self.__state, self.__sr, self.__rate, self.__playback_position, ),
        )
        self.player_subprocess.start()
        
        self.post_setup_thread = threading.Thread(target=self.post_setup, args=(post_setup_command, ))
        self.post_setup_thread.start()

    @property
    def state(self) -> AudioState:
        pass

    @state.getter
    def state(self):
        return AudioState.parse(self.__state.value)

    @state.setter
    def state(self, value:AudioState):
        if self.state is not AudioState.RELEASE:
            self.__state.value = value.value
    
    @property
    def sr(self) -> int:
        pass
    
    @sr.getter
    def sr(self):
        return self.__sr.value
    
    @sr.setter
    def sr(self, sr:float):
        self.__sr.value = sr
    
    @property
    def rate(self) -> float:
        pass
    
    @rate.getter
    def rate(self):
        return self.__rate.value * 0.01
    
    @rate.setter
    def rate(self, value:float):
        if self.state is AudioState.NONE:
            self.__rate.value = int(value * 100)

    @property
    def playback_position(self) -> int:
        pass
    
    @playback_position.getter
    def playback_position(self):
        return self.__playback_position.value
    
    @property
    def playback_position_sec(self) -> float:
        pass
    
    @playback_position_sec.getter
    def playback_position_sec(self):
        return self.playback_position / self.sr

    def post_setup(self, post_func:Union[Callable[[],None],None]):
        """セットアップ完了時の処理

        Args:
            func (Union[Callable[[],None],None]): _description_
        """
        # APSからchannelmapを受信
        self.channelmap = self.parent_conn.recv()
        
        if self.state is AudioState.RELEASE:
            return
        
        if post_func is not None:
            post_func()
    
    def playpath(self, path:Union[str,Path]) -> bool:
        if isinstance(path, str):
            path = Path(path)
        elif not isinstance(path, Path):
            return False
        
        if not path.is_file():
            return False
        
        if path.suffix not in self.SUPPORT_SUFFIXES:
            return False
        
        x, sr = sf.read(str(path), dtype=np.float32)
        
        return self.playdata(x, sr)
    
    def playdata(self, x:np.ndarray, sr:int) -> bool:
        if not isinstance(x, np.ndarray) or not isinstance(sr, int):
            return False
    
        if sr != self.sr:
            self.sr = sr
            self.state = AudioState.SETUP
            self.channelmap = self.parent_conn.recv()
        
        if self.rate != 1.0:
            x = librosa_fast.effects.time_stretch(x, rate=self.rate)
        
        if x.ndim == 1:
            x = x[:, None] # force 2d
        if x.ndim != 2:
            print(f"data must be 1d or 2d, not {x.ndim}d")
            return False

        if x.shape[1] == 1 and len(self.channelmap) != 1:
            x = np.tile(x, [1, len(self.channelmap)])
        
        # internally, channel numbers are always ascending:
        sortidx = sorted(range(len(self.channelmap)), key=lambda k: self.channelmap[k])
        x:np.ndarray = x[:, sortidx]
        
        if x.shape[1] != len(self.channelmap):
            print(f"second dimension of data must be equal to the number of channels, not {x.shape[1]}")
            return False
        
        if x.shape != self.x.shape or not np.allclose(x, self.x):
            self.x = x
            self.state = AudioState.RECV
            self.parent_conn.send(x)
        else:
            self.state = AudioState.PLAY
        
        return True
    
    def stop(self) -> bool:
        """再生停止

        Returns:
            bool: _description_
        """
        if self.state in [AudioState.PLAY, AudioState.PAUSE]:
            self.state = AudioState.STOP
            return True
        return False
    
    def pause(self) -> bool:
        """一時停止

        Returns:
            bool: _description_
        """
        if self.state is AudioState.PLAY:
            self.state = AudioState.PAUSE
            return True
        return False
    
    def resume(self) -> bool:
        """再生再開

        Returns:
            bool: _description_
        """
        if self.state is AudioState.PAUSE:
            self.state = AudioState.PLAY
            return True
        return False
    
    def release(self) -> None:
        """解放処理
        """
        self.state = AudioState.RELEASE
        self.post_setup_thread.join()
        self.player_subprocess.join()
    
    @staticmethod
    def subprocess(
        conn:PipeConnection,
        state:Synchronized,
        sr:Synchronized,
        rate:Synchronized,
        playback_position:Synchronized,
    ):
        aps = AudioPlayerSubprocesser(conn, state, sr, rate, playback_position)
        aps.mainloop()

この後にも出てくるのですがlibrosa_fastというライブラリはlibrosaのキャッシュ部分を省略してラップしたものです。
librosaさんはコスパが中々にいいと思うのですが、それらを得るためにライブラリのインポート時間を犠牲にしています。
その犠牲になっている主な部分がキャッシュ関連なので、そこを削ってインポート速度を上げたものがlibrosa_fastです。

サウンドデータの前処理はsoundcardのplay実装をコピペしただけです。

一応触れておくとしたらreleaseを明示的に記述しています。
本来であれば必要ないのですが、GUI組み込みさせる場合はスレッドの途中で落とされることもあるので、このようにrelease関数に書くようにしました。
そうしないと無限ループに陥ってタスクマネージャからキルすることになるので。

あと多少の最適化としてサウンドデータが同一の場合は送信せずに、APSで保持してるデータを使ってね。ということにしています。
前処理でごにゃごにゃやってるので逆効果なんじゃと思ったのですが、図ってみるとsendの方がよっぽど時間掛かりました。
体感は同じなんですけどね。まぁ体感で分かるレベルの遅延はそもそもやばいですが。

player_subprocesser.py

from multiprocessing.sharedctypes import Synchronized
from multiprocessing.connection import PipeConnection
import soundcard as sc
from soundcard.mediafoundation import _ffi, _Player
import time
import numpy as np
from .common import *


__all__ = [
    "AudioPlayerSubprocesser",
]


class AudioPlayerSubprocesser:
    def __init__(
        self,
        conn:PipeConnection,
        state:Synchronized,
        sr:Synchronized,
        rate:Synchronized,
        playback_position:Synchronized,
    ):
        self.conn = conn
        self.data = None
        self.__state = state
        self.__sr = sr
        self.__rate = rate
        self.__playback_position = playback_position
    
    @property
    def sr(self) -> int:
        pass

    @sr.getter
    def sr(self):
        return self.__sr.value
    
    @property
    def rate(self) -> float:
        pass

    @rate.getter
    def rate(self):
        return self.__rate.value * 0.01
    
    @property
    def playback_position(self) -> int:
        pass

    @playback_position.getter
    def playback_position(self):
        return self.__playback_position.value

    @playback_position.setter
    def playback_position(self, value:int):
        self.__playback_position.value = value

    @property
    def state(self) -> AudioState:
        pass

    @state.getter
    def state(self):
        return AudioState.parse(self.__state.value)
    
    @state.setter
    def state(self, value:AudioState):
        if self.state is not AudioState.RELEASE:
            self.__state.value = value.value
    
    def update(self, sp:_Player):
        if self.state is AudioState.RECV:
            # サウンドデータを受信
            self.x:np.ndarray = self.conn.recv()
            self.state = AudioState.PLAY
        
        if self.state is AudioState.PLAY:
            self.playloop(sp, self.x.copy())
            self.state = AudioState.NONE
            self.playback_position = 0
        else:
            time.sleep(0.01)
    
    def playloop(self, sp:_Player, data:np.ndarray):
        while data.nbytes > 0:
            if self.state is AudioState.PAUSE:
                time.sleep(0.01)
                continue
            elif self.state in [AudioState.STOP, AudioState.RELEASE]:
                break
            
            towrite:int = sp._render_available_frames()
            if towrite == 0:
                time.sleep(0.001)
                continue
            
            bytes = data[:towrite].ravel().tobytes()
            buffer = sp._render_buffer(towrite)
            _ffi.memmove(buffer[0], bytes, len(bytes))
            sp._render_release(towrite)
            data = data[towrite:]
            # MEMO: どれくらい誤差あるんだろう
            self.playback_position += int(towrite * self.rate)
    
    def mainloop(self):
        # SETUP後にsendをしないと無限ループに陥るので後ろでやる
        #if self.state is AudioState.RELEASE:
        #    return
        
        # MEMO: 複数対応する場合はget_speaker(self.id)に変更
        with sc.default_speaker().player(self.sr) as sp:
            # APにchannelmapを送信
            self.conn.send(sp.channelmap)
            
            # state SETUP to NONE
            self.state = AudioState.NONE
            
            while self.state not in [AudioState.RELEASE, AudioState.SETUP]:
                self.update(sp)
        
        # プレイヤーの再設定
        if self.state is not AudioState.RELEASE:
            self.mainloop()

再生処理はsoundcardの実装をコピペしただけです。

あとは適当にステート処理させています。
ステート処理も送受信で待たずに共有メモリで判定しているためコード量少なめです。

地味にsleepを多用していますが基本的には避けた方がいいです。
使うにしても1/60秒ぐらいがCPU負荷的にちょうどいい気がしました。
一時期0.001ぐらいにしていたのですがCPUファンが唸るわそもそも分解能上げてないから意味無いわでCPUに無駄な負荷を掛けてしましました。

editor

__init__.py

from .play_button import *
from .stop_button import *
from .play_speed_button import *
from .select_wav_button import *
from .sample_waveform import *

play_button.py

from enum import Enum, auto
from dataclasses import dataclass, field
from pathlib import Path
import tkinter as tk
from typing import Callable, Union
import threading
import numpy as np
import time
from ..runtime import AudioState, AudioPlayer


__all__ = [
    "PlayImageButton",
]


class PlayType(Enum):
    NONE = auto()
    DATA = auto()
    PATH = auto()


@dataclass
class Playdata:
    type:PlayType=PlayType.NONE
    x:np.ndarray=field(default=lambda x:np.zeros(0))
    sr:int=0
    path:Union[str,Path]=""


class PlayImageButton(tk.Button):
    """再生/一時停止ボタン

    Args:
        tk (_type_): _description_
    """
    
    PLAYING_STATE = (AudioState.RECV, AudioState.PLAY, AudioState.PAUSE)
    
    def __init__(
        self,
        master:tk.Misc,
        player:AudioPlayer,
        state=tk.NORMAL,
        background:str="white",
        play_begin_command:Callable[[],None]=None,
        play_end_command:Callable[[],None]=None,
        playing_command:Callable[[float],None]=None,
    ):
        # resource
        self.play_image = tk.PhotoImage(data="iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAAAXNSR0IArs4c6QAAAKNJREFUOE+t08ENwyAMBVB7lM7DFGxSNqE7YItb2SRZxLgiIlKkhEJoucCFp4+NEQDAGPMuu4jYGONazqMLK6CHC15E3Ch0BWyWqtqcc+pBTaAmSojoQgip9aQesN9LrfqMAjt0qs9d4FSfKaDG8URkfwFWInpMAaXFzOxLkrvAFvvY0lGg+R96wIqIduojqeqTmV1vqP42TAsAlHe+vsW9SvMBv2p7EbjkvWgAAAAASUVORK5CYII=")
        self.pause_image = tk.PhotoImage(data="iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAAAXNSR0IArs4c6QAAAHRJREFUOE9jtHDybPn/n6GaAQX833hy/44AZCFzR48NDAyM/shijAwMbYzmjp5fGBgYuFENYGBgZmRUOLZv20OQuJWTl/zf//8foKthYGD4CjIAJCFPpgEPRw1gGA0DhmETBpRlJgtHz9b/DAxVZGVnRoZWAFRqqJWftJqjAAAAAElFTkSuQmCC")
        
        super().__init__(master, image=self.play_image, relief=tk.FLAT, width=22, height=22, state=state, background=background, anchor=tk.CENTER, command=self.command)

        # instance
        self.player = player
        
        # variable
        self.playdata = Playdata()
        self.play_thread = None
        self.playing_thread = None
        
        # callback
        self.play_begin_func = play_begin_command
        self.play_end_func = play_end_command
        self.playing_func = playing_command
    
    def set_data(self, x:np.ndarray, sr:int):
        self.playdata.type = PlayType.DATA
        self.playdata.x = x
        self.playdata.sr = sr
    
    def set_path(self, path:Union[str,Path]):
        self.playdata.type = PlayType.PATH
        self.playdata.path = path
    
    def command(self):
        if self.player.state is AudioState.NONE:
            self.play_thread = threading.Thread(target=self.play)
        else:
            self.play_thread = threading.Thread(target=self.pause_or_resume)
        
        self.play_thread.start()
    
    def play(self):
        if self.playdata.type is PlayType.NONE:
            return
        
        if self.player.state is AudioState.RELEASE:
            return
        
        if self.playdata.type is PlayType.DATA:
            result = self.player.playdata(self.playdata.x, self.playdata.sr)
        else:
            result = self.player.playpath(self.playdata.path)
        
        if result:
            if self.play_begin_func is not None:
                self.play_begin_func()
            
            self.playing_thread = threading.Thread(target=self.playing)
            self.playing_thread.start()
            
            self.configure(image=self.pause_image)
    
    def pause_or_resume(self):
        if self.player.state is AudioState.RELEASE:
            return
        
        if self.player.state is AudioState.PLAY:
            if self.player.pause():
                self.configure(image=self.play_image)
        elif self.player.state is AudioState.PAUSE:
            if self.player.resume():
                self.configure(image=self.pause_image)
    
    def playing(self):
        while self.player.state in self.PLAYING_STATE:
            if self.playing_func is not None:
                self.playing_func(self.player.playback_position_sec)
            time.sleep(0.01)
        
        if self.player.state is AudioState.RELEASE:
            return
        
        # MEMO: これ以降のタイミングでRELEASEになると困る
        if self.play_end_func is not None:
            self.play_end_func()
        
        self.configure(image=self.play_image)
    
    def release(self):
        """スレッド終了待ち
        player.state RELEASE 前提
        """
        
        if self.playing_thread is not None:
            self.playing_thread.join()
        
        if self.play_thread is not None:
            self.play_thread.join()

stop_button.py

import tkinter as tk
from typing import Callable
import threading
from ..runtime import AudioState, AudioPlayer


__all__ = [
    "StopImageButton",
]


class StopImageButton(tk.Button):
    """停止ボタン

    Args:
        tk (_type_): _description_
    """
    
    PLAYING_STATE = (AudioState.PLAY, AudioState.PAUSE)
    
    def __init__(
        self,
        master:tk.Misc,
        player:AudioPlayer,
        state=tk.NORMAL,
        background:str="white",
        stopped_command:Callable[[],None]=None,
    ):
        # resource
        self.stop_image = tk.PhotoImage(data="iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAAAXNSR0IArs4c6QAAADNJREFUOE9jNHPwmM3IyJjCQAb4////HEZzR8//ZOiFaxk1gIFhNAxGwwCUIShPB5RmZwADkyNDim2IIQAAAABJRU5ErkJggg==")

        super().__init__(master, image=self.stop_image, relief=tk.FLAT, width=22, height=22, state=state, background=background, anchor=tk.CENTER, command=self.command)

        # variable
        self.stop_thread = None

        # instance
        self.player = player
        
        # callback
        self.stopped_func = stopped_command
        
    def command(self):
        if self.player.state in self.PLAYING_STATE:
            self.stop_thread = threading.Thread(target=self.stop)
            self.stop_thread.start()

    def stop(self):
        if self.player.state is AudioState.RELEASE:
            return
        
        if self.player.stop() and self.stopped_func is not None:
            self.stopped_func()

    def release(self):
        """スレッド終了待ち
        player.state RELEASE 前提
        """
        
        if self.stop_thread is not None:
            self.stop_thread.join()

select_wav_button.py

import tkinter as tk
from typing import Callable
from tkinter import filedialog


__all__ = [
    "SelectWavButton",
]


class SelectWavButton(tk.Button):
    def __init__(
        self,
        master:tk.Misc,
        state=tk.NORMAL,
        background:str="white",
        selected_command:Callable[[str],None]=None,
    ):
        # resource
        self.folder_image = tk.PhotoImage(data="iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAAAXNSR0IArs4c6QAAAOBJREFUOE/dk00OwiAQhedxEm7R7qSuqpfQnqTxJOolDCuLu+ol5CQ8M/7Fn0ZM3DkJIQTexzDzgPwYUH3p6vkLJ/bBh1d26WqbRGYwxu63m0b3UYynSyEVEB8FJFeH4Bd3EeBERIeesyArvQRFNeFtcQOUrnYEliAXOqtIgUZk3Qcfi2rSCRA1ixzgnOan+ATocuJLDQae8I3w+szunwAEWm3bkIGGavJWg2u/BWTzDeQNkERGalGktOuDX+U68Qg4ChCY0pOVcwBcrG2hXqcxrZA2J3raByJSWp9/4y9xAqmOrbG+UPNvAAAAAElFTkSuQmCC")
        
        super().__init__(master, image=self.folder_image, relief=tk.FLAT, width=22, height=22, state=state, background=background, anchor=tk.CENTER, command=self.command)
        
        # variable
        self.path = ""
        
        # callback
        self.selected_func = selected_command
    
    def command(self):
        filetypes = [("WAV", "*.wav")]
        initialdir = self.path if self.path != "" else "./"
        path = filedialog.askopenfilename(filetypes=filetypes, initialdir=initialdir)
        
        if path == "":
            return
        
        self.path = path
        
        if self.selected_func is not None:
            self.selected_func(self.path)

play_speed_button.py

import tkinter as tk
import threading
import time
from functools import partial
from ..runtime import AudioState, AudioPlayer


__all__ = [
    "PlaySpeedButton",
]


class PlaySpeedButton(tk.Button):
    def __init__(
        self,
        master:tk.Misc,
        player:AudioPlayer,
        state=tk.NORMAL,
        background:str="white",
        current:float=1.0,
        steps:list[float]=[0.25,0.5,0.75,1.0,1.25,1.5,1.75],
        digits:list[int]=[2,1,2,0,2,1,2],
    ):
        # resource
        self.play_speed_image = tk.PhotoImage(data="iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAAAXNSR0IArs4c6QAAALdJREFUOE/Fk8ENwyAMRe1NMkp6Q4IdKjYpmzAEsnJrRmERyxVRHBFKFKkcypFvP3/bgDB4cDAfxgHGmKm4WJYl99zc6WitFQDIIhKIKLYQa+0bACYRiUQUWl0Ber8ys6/d7IB5DyiFTqAWoKDIzKGAGoDqmZkfRb8CbIEi4hHxCQDqoO5gczMCKAVev7ZwzOpriIgYUkqreu0M8bStY42I6OvEFlDsdtfonJt7iQq408ef8t8/0wd1QYfLTf/uUwAAAABJRU5ErkJggg==")

        super().__init__(master, compound=tk.CENTER, image=self.play_speed_image, command=lambda:self.menu.post(self.winfo_rootx(), self.winfo_rooty()), relief=tk.FLAT, width=22, height=22, state=state, background=background)
        
        # instance
        self.player = player
        
        # variable
        self.set_rate_thread = None
        
        self.menu = tk.Menu(self, tearoff=0)
        for step, digit in zip(steps, digits):
            label = "\n"
            if digit == 0:
                label += f"{int(step)}"
            elif digit == 1:
                label += f"{step:.1f}"
            elif digit == 2:
                label += f"{step:.2f}"
            label += "x"
            if current == step:
                self.rate = current
                self.text = label
                self.configure(text=self.text)
            self.menu.add_command(label=label, command=partial(self.command, step, label))
    
    def command(self, rate:float, text:str):
        if self.player.state is AudioState.RELEASE:
            return
        
        self.rate = rate
        self.text = text
        
        self.configure(text=self.text)
        
        if self.player.state is AudioState.NONE:
            self.set_rate()
        elif self.set_rate_thread is None or not self.set_rate_thread.is_alive():
            self.set_rate_thread = threading.Thread(target=self.set_rate)
            self.set_rate_thread.start()
    
    def set_rate(self):
        while self.player.state not in [AudioState.NONE, AudioState.RELEASE]:
            time.sleep(0.01)
        
        if self.player.state is AudioState.RELEASE:
            return
        
        self.player.rate = self.rate
    
    def release(self):
        if self.set_rate_thread is not None:
            self.set_rate_thread.join()

sample_waveform.py

from pathlib import Path
import tkinter as tk
from typing import Union
import time
from scipy.io import wavfile
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg

import librosa_fast
import librosa_fast.display

from ..runtime import AudioPlayer, AudioState
from . import PlayImageButton, StopImageButton, PlaySpeedButton, SelectWavButton


__all__ = [
    "SampleWaveform",
]


class SampleWaveform(tk.Frame):
    """波形表示サンプル

    Args:
        tk (_type_): _description_
    """
    
    def __init__(
        self,
        master:tk.Misc=None,
    ):
        super().__init__(master)
        
        self.master.resizable(width=False, height=False)
        self.master.iconphoto(False, tk.PhotoImage(data="iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAAAXNSR0IArs4c6QAAAS9JREFUOE+9U8FthDAQ9NoUQjrhfsimB0IlHJVw6QFb/I5UEgrBbDTIG5E7SJRP+ABaz+zszpjUHx5r7eu6rpPWug4hdIDSEd45d48xNuM4zlKvqqpg5l7+mbkLIdyeCMqyzI0xH8x8lS4ACQERNcxce++bQwXW2la6rOv6JioScc/MExEVovBJAeR77y/oCKJhGCa8MT8RYYSbUionog61LwIc2CQRtd77FxAIWBQ55/oYY5dlWS61jSDNd8f34+y/mbQRpPlAMEP+EQhn0Bm1ZVlm2c2hjWnmlogw1gbaP3uVpwRYJhQx8zvAWusZnY0xLTPPPwYJABDIpvfdxeb/J0iLRshyhOlUQbK0VkoViO0uSFjq9dHqwyQCnEJ1EYKd1ZPcg9O7kOz7dvAsUJ+yMdARidZbuQAAAABJRU5ErkJggg=="))
        self.master.title("Waveform")
        
        self.player = AudioPlayer(48000, post_setup_command=self.post_setup)
        
        #
        # === controller ===
        #
        
        self.controller_frame = tk.Frame(self.master, bg="white")
        self.controller_frame.pack(fill=tk.BOTH)
        
        self.play_button = PlayImageButton(self.controller_frame, self.player, state=tk.DISABLED, play_end_command=self.clear_seekbar, playing_command=self.set_seekbar)
        self.play_button.grid(column=0, row=0, padx=(14,0), pady=(4,0), sticky=tk.NSEW)

        self.stop_button = StopImageButton(self.controller_frame, self.player, state=tk.DISABLED)
        self.stop_button.grid(column=1, row=0, padx=(0,0), pady=(4,0), sticky=tk.NSEW)

        self.wav_button = SelectWavButton(self.controller_frame, state=tk.DISABLED, selected_command=self.load)
        self.wav_button.grid(column=2, row=0, padx=(0,0), pady=(4,0), sticky=tk.NSEW)
        
        self.play_speed_button = PlaySpeedButton(self.controller_frame, self.player, tk.DISABLED)
        self.play_speed_button.grid(column=3, row=0, padx=(0,0), pady=(4,0), sticky=tk.NSEW)
        
        #
        # === canvas ===
        #
        
        self.canvas_frame = tk.Frame(self.master)
        self.canvas_frame.pack(fill=tk.BOTH)

        # figure
        self.fig = Figure(figsize=(12.8,4.8), dpi=100)
        self.fig.subplots_adjust(left=0.01, right=0.99, bottom=0.01, top=0.99)
        self.ax = self.fig.add_subplot()
        
        # hide axis
        self.ax.xaxis.set_visible(False)
        self.ax.yaxis.set_visible(False)
        
        # canvas
        self.canvas = FigureCanvasTkAgg(self.fig, self.canvas_frame)
        self.canvas.get_tk_widget().grid(column=0, row=0, sticky=tk.NSEW)
    
    def post_setup(self):
        if self.player.state is AudioState.RELEASE:
            return
        
        self.play_button.configure(state=tk.NORMAL)
        self.stop_button.configure(state=tk.NORMAL)
        self.wav_button.configure(state=tk.NORMAL)
        self.play_speed_button.configure(state=tk.NORMAL)
    
    def load(self, path:Union[str,Path]):
        # 再生途中にロードするとスレッド解放されないので適当に対処
        if self.player.state is not AudioState.NONE:
            if self.player.stop():
                while self.player.state is not AudioState.NONE:
                    time.sleep(0.01)
            else:
                return
        
        # clear canvas
        self.ax.clear()
        
        # load
        sr, x = wavfile.read(str(path))
        
        # wave plot
        librosa_fast.display.waveshow(x, sr=sr, ax=self.ax, x_axis="s", lw=0.5, zorder=1)
        
        # set xlim
        self.ax.set_xlim(0, len(x)/sr)
        
        # save current canvas
        self.canvas.draw()
        self.bg = self.canvas.copy_from_bbox(self.ax.bbox)

        # set play data
        self.play_button.set_path(path)
    
    def set_seekbar(self, sec:float):
        # clear canvas
        self.canvas.restore_region(self.bg)
        
        # seek bar
        seekbar_wav = self.ax.axvline(sec, lw=0.5, zorder=2)
        self.ax.draw_artist(seekbar_wav)
        
        # blit canvas
        self.canvas.blit(self.ax.bbox)
    
    def clear_seekbar(self):
        # clear canvas
        self.canvas.restore_region(self.bg)
        
        # blit canvas
        self.canvas.blit(self.ax.bbox)

    def destroy(self):
        self.player.release()
        self.play_button.release()
        self.stop_button.release()
        self.play_speed_button.release()
        return super().destroy()

GUIについては特段触れたい内容が無いです。

動かしてみた

from audio.editor import SampleWaveform


if __name__ == "__main__":
    app = SampleWaveform()
    app.mainloop()

gifだとヌルヌル感が伝わりません。
動画にしようと思ったんですが、貼り付け方法が地味に面倒だったので諦めました。

おわり

お疲れさまでした!!!

pyaudioで整備している時も思ったのですが、再生するだけのサンプルはたくさんあるのになぜかそれをGUIと組み込んだ場合の情報が極端に減るんですよね。
サウンド系扱っていればランタイム動作なんて付いてくるものだと思うので謎です。

そして私はこれのせいで地味に面倒だった。
といってもサウンド制御はフローチャート通りに組んでいけば破綻することも無かったので比較的楽しく組めましたが。
でもやっぱりサンプルコードがほしかった。

参考

qiita.com