From 263e439a6df4b49e5ac3ad8f3dcfeab34f70e74f Mon Sep 17 00:00:00 2001 From: evilchili Date: Thu, 28 Aug 2025 23:07:04 -0700 Subject: [PATCH] Rebuild audio streamer event handlers --- README.md | 6 +- src/croaker/cli.py | 3 +- src/croaker/server.py | 26 ++--- src/croaker/streamer.py | 142 ++++++++++++++++++--------- src/croaker/transcoder.py | 97 ++++++++++-------- test/conftest.py | 5 + test/fixtures/transcoded_silence.mp3 | Bin 0 -> 72724 bytes test/test_streamer.py | 128 +++++++++--------------- 8 files changed, 222 insertions(+), 185 deletions(-) create mode 100644 test/fixtures/transcoded_silence.mp3 diff --git a/README.md b/README.md index 08e71d1..510193e 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ # Croaker -A shoutcast audio player designed for serving D&D session music. +A shoutcast server designed primarily for streaming D&D session music. ### Features -* Native streaming of MP3 sources direct to your shoutcast / icecast server +* Native streaming of MP3 sources direct to your clients * Transcoding of anything your local `ffmpeg` installation can convert to mp3 * Playlists are built using symlinks * Randomizes playlist order the first time it is cached @@ -21,7 +21,7 @@ A shoutcast audio player designed for serving D&D session music. ## What? Why? -Because I run an online D&D game, which includes a background music stream for my players. The stream used to be served by liquidsoap and controlled by a bunch of bash scripts I cobbled together which are functional but brittle, and liquidsoap is a nightmare for the small use case. Also, this required me to have a terminal window open to my media server to control liquidsoap directly, and I'd rather integrate the music controls directly with the rest of my DM tools, all of which run on my laptop. +Because I run an online D&D game, which includes a background music stream for my players. The stream used to be served by liquidsoap and icecast and controlled by a bunch of bash scripts I cobbled together which are functional but brittle, and liquidsoap is a nightmare for the small use case. Also, this required me to have a terminal window open to my media server to control liquidsoap directly, and I'd rather integrate the music controls directly with the rest of my DM tools, all of which run on my laptop. *Now that is a powerful yak! -- Aesop Rock (misquoted)* diff --git a/src/croaker/cli.py b/src/croaker/cli.py index 0b78bdf..820deed 100644 --- a/src/croaker/cli.py +++ b/src/croaker/cli.py @@ -85,11 +85,12 @@ def setup(context: typer.Context): def start( context: typer.Context, daemonize: bool = typer.Option(True, help="Daemonize the server."), + shoutcast: bool = typer.Option(True, help="Stream to shoutcast."), ): """ Start the Croaker command and control server. """ - server.start(daemonize=daemonize) + server.start(daemonize=daemonize, shoutcast_enabled=shoutcast) @app.command() diff --git a/src/croaker/server.py b/src/croaker/server.py index a12ed97..021b87c 100644 --- a/src/croaker/server.py +++ b/src/croaker/server.py @@ -2,7 +2,6 @@ import logging import os import queue import socketserver -import threading from pathlib import Path from time import sleep @@ -93,7 +92,7 @@ class RequestHandler(socketserver.StreamRequestHandler): return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items())) def handle_STOP(self, args): - return self.server.stop_event.set() + return self.streamer.stop_requested.set() def handle_STFU(self, args): self.send("Shutting down.") @@ -110,9 +109,6 @@ class CroakerServer(socketserver.TCPServer): def __init__(self): self._context = daemon.DaemonContext() self._queue = queue.Queue() - self.skip_event = threading.Event() - self.stop_event = threading.Event() - self.load_event = threading.Event() self._streamer = None self.playlist = None @@ -121,8 +117,6 @@ class CroakerServer(socketserver.TCPServer): @property def streamer(self): - if not self._streamer: - self._streamer = AudioStreamer(self._queue, self.skip_event, self.stop_event, self.load_event) return self._streamer def bind_address(self): @@ -143,7 +137,7 @@ class CroakerServer(socketserver.TCPServer): self._context.files_preserve = [self.fileno()] self._context.open() - def start(self, daemonize: bool = True) -> None: + def start(self, daemonize: bool = True, shoutcast_enabled: bool = True) -> None: """ Start the shoutcast controller background thread, then begin listening for connections. """ @@ -153,11 +147,13 @@ class CroakerServer(socketserver.TCPServer): self._daemonize() try: logger.debug("Starting AudioStreamer...") + self._streamer = AudioStreamer(self._queue, shoutcast_enabled=shoutcast_enabled) self.streamer.start() self.load("session_start") self.serve_forever() except KeyboardInterrupt: - logger.info("Shutting down.") + logger.info("Keyboard interrupt detected.") + self.streamer.shutdown_requested.set() self.stop() def stop(self): @@ -165,12 +161,14 @@ class CroakerServer(socketserver.TCPServer): def ffwd(self): logger.debug("Sending SKIP signal to streamer...") - self.skip_event.set() + self.streamer.skip_requested.set() def clear_queue(self): - logger.debug("Requesting a reload...") - self.streamer.load_requested.set() - sleep(0.5) + logger.debug("Requesting a clear...") + self.streamer.clear_requested.set() + while self.streamer.clear_requested.is_set(): + sleep(0.001) + logger.debug("Cleared") def list(self, playlist_name: str = None): if playlist_name: @@ -179,12 +177,14 @@ class CroakerServer(socketserver.TCPServer): def load(self, playlist_name: str): logger.debug(f"Switching to {playlist_name = }") + self.streamer.stop_requested.set() if self.playlist: self.clear_queue() self.playlist = load_playlist(playlist_name) logger.debug(f"Loaded new playlist {self.playlist = }") for track in self.playlist.tracks: self._queue.put(str(track).encode()) + self.streamer.start_requested.set() server = CroakerServer() diff --git a/src/croaker/streamer.py b/src/croaker/streamer.py index 9467730..18f6489 100644 --- a/src/croaker/streamer.py +++ b/src/croaker/streamer.py @@ -2,6 +2,7 @@ import logging import os import queue import threading +from dataclasses import dataclass from functools import cached_property from pathlib import Path from time import sleep @@ -19,21 +20,27 @@ class AudioStreamer(threading.Thread): those files to the icecast server. """ - def __init__(self, queue, skip_event, stop_event, load_event, chunk_size=4096): + def __init__(self, queue: queue.Queue = queue.Queue(), chunk_size: int = 8092, shoutcast_enabled: bool = True): super().__init__() self.queue = queue - self.skip_requested = skip_event - self.stop_requested = stop_event - self.load_requested = load_event self.chunk_size = chunk_size - - @property - def silence(self): - return FrameAlignedStream.from_source(Path(__file__).parent / "silence.mp3", chunk_size=self.chunk_size) + self._shoutcast_enabled = shoutcast_enabled + self.skip_requested = threading.Event() + self.stop_requested = threading.Event() + self.start_requested = threading.Event() + self.clear_requested = threading.Event() + self.shutdown_requested = threading.Event() @cached_property - def _shout(self): - s = shout.Shout() + def silence(self): + return FrameAlignedStream(Path(__file__).parent / "silence.mp3", chunk_size=self.chunk_size) + + @cached_property + def _out(self): + if self._shoutcast_enabled: + s = shout.Shout() + else: + s = debugServer() s.name = "Croaker Radio" s.url = os.environ["ICECAST_URL"] s.mount = os.environ["ICECAST_MOUNT"] @@ -45,23 +52,31 @@ class AudioStreamer(threading.Thread): return s def run(self): # pragma: no cover - while True: + while not self.shutdown_requested.is_set(): try: - logger.debug(f"Connecting to shoutcast server at {self._shout.host}:{self._shout.port}") - self._shout.open() + self.connect() + self.stream_forever() + break except shout.ShoutException as e: logger.error("Error connecting to shoutcast server. Will sleep and try again.", exc_info=e) sleep(3) - continue + self.shutdown() + self.shutdown_requested.clear() - try: - self.stream_queued_audio() - except Exception as exc: - logger.error("Caught exception.", exc_info=exc) - self._shout.close() + def connect(self): + logger.info(f"Connecting to downstream server at {self._out}") + self._out.close() + self._out.open() + + def shutdown(self): + if hasattr(self, "_out"): + self._out.close() + del self._out + self.clear_queue() + logger.info("Shutting down.") def clear_queue(self): - logger.debug("Clearing queue...") + logger.info("Clearing queue...") while not self.queue.empty(): self.queue.get() @@ -72,45 +87,82 @@ class AudioStreamer(threading.Thread): try: track = Path(self.queue.get(block=False).decode()) logger.debug(f"Streaming {track.stem = }") - return FrameAlignedStream.from_source(track, chunk_size=self.chunk_size), track.stem + return FrameAlignedStream(track, chunk_size=self.chunk_size), track.stem except queue.Empty: logger.debug("Nothing queued; enqueing silence.") except Exception as exc: logger.error("Caught exception; falling back to silence.", exc_info=exc) return self.silence, "[NOTHING PLAYING]" - def stream_queued_audio(self): - stream = None - title = None - next_stream = None - next_title = None - - while True: - stream, title = (next_stream, next_title) if next_stream else self.queued_audio_source() - logging.debug(f"Starting stream of {title = }, {stream = }") - self._shout.set_metadata({"song": title}) - next_stream, next_title = self.queued_audio_source() + def pause_if_necessary(self): + while self.stop_requested.is_set(): + if self.start_requested.is_set(): + self.stop_requested.clear() + self.start_requested.clear() + return + sleep(0.001) + def stream_forever(self): + while not self.shutdown_requested.is_set(): + self.pause_if_necessary() + stream, title = self.queued_audio_source() + logging.debug(f"Starting stream of {title = }") + self._out.set_metadata({"song": title}) for chunk in stream: - self._shout.send(chunk) - self._shout.sync() - - # play the next source immediately if self.skip_requested.is_set(): - logger.debug("Skip was requested.") + logger.info("EVENT: Skip") self.skip_requested.clear() break - # clear the queue - if self.load_requested.is_set(): - logger.debug("Load was requested.") + if self.clear_requested.is_set(): + logger.info("EVENT: Clear") self.clear_queue() - self.load_requested.clear() + self.clear_requested.clear() break - # Stop streaming and clear the queue if self.stop_requested.is_set(): - logger.debug("Stop was requested.") - self.clear_queue() - self.stop_requested.clear() + logger.info("EVENT: Stop") break + + if self.start_requested.is_set(): + self.start_requested.clear() + break + + if self.shutdown_requested.is_set(): + logger.info("EVENT: Shutdown") + break + + logger.debug(f"{title}: {len(chunk)} bytes") + self._out.send(chunk) + self._out.sync() + + +@dataclass +class debugServer: + name: str = "Croaker Debugger" + url: str = None + mount: str = None + host: str = None + port: str = None + password: str = None + format: str = None + + _output_file: Path = Path("/dev/null") # Path("./croaker.stream.output.mp3") + _filehandle = None + + def open(self): + self._filehandle = self._output_file.open("wb") + + def close(self): + if self._filehandle: + self._filehandle.close() + self._filehandle = None + + def set_metadata(self, metadata: dict): + logger.info(f"debugServer: {metadata = }") + + def send(self, chunk: bytes): + self._filehandle.write(chunk) + + def sync(self): + self._filehandle.flush() diff --git a/src/croaker/transcoder.py b/src/croaker/transcoder.py index e3a3902..2742f96 100644 --- a/src/croaker/transcoder.py +++ b/src/croaker/transcoder.py @@ -1,8 +1,8 @@ +import io import logging import os import subprocess from dataclasses import dataclass -from io import BufferedReader from pathlib import Path import ffmpeg @@ -26,11 +26,23 @@ class FrameAlignedStream: ... """ - source: BufferedReader + source_file: Path chunk_size: int = 1024 bit_rate: int = 192000 sample_rate: int = 44100 + _transcoder: subprocess.Popen = None + _buffer: io.BufferedReader = None + + @property + def source(self): + if self._buffer: + return self._buffer + if self._transcoder: + return self._transcoder.stdout + logger.info("Source is empty") + return None + @property def frames(self): while True: @@ -86,47 +98,54 @@ class FrameAlignedStream: Generate approximately chunk_size segments of audio data by iterating over the frames, buffering them, and then yielding several as a single bytes object. """ - buf = b"" - for frame in self.frames: - if len(buf) >= self.chunk_size: + try: + self._start_transcoder() + buf = b"" + for frame in self.frames: + if len(buf) >= self.chunk_size: + yield buf + buf = b"" + if not frame: + break + buf += frame + if buf: yield buf - buf = b"" - if not frame: - break - buf += frame - if buf: - yield buf + finally: + self._stop_transcoder() - @classmethod - def from_source(cls, infile: Path, **kwargs): - """ - Create a FrameAlignedStream instance by transcoding an audio source on disk. - """ + def _stop_transcoder(self): + if self._transcoder: + logger.debug(f"Killing {self._transcoder = }") + self._transcoder.kill() + self._transcoder = None + self._buffer = None + def _start_transcoder(self): args = [] if os.environ.get("DEBUG") else ["-hide_banner", "-loglevel", "quiet"] - ffmpeg_args = ( - ffmpeg.input(str(infile)) - .output( - "pipe:", - map="a", - format="mp3", - # no ID3 headers -- saves having to decode them later - write_xing=0, - id3v2_version=0, - # force sample and bit rates - **{ - "b:a": kwargs.get("bit_rate", cls.bit_rate), - "ar": kwargs.get("sample_rate", cls.sample_rate), - }, - ) - .global_args("-vn", *args) - .compile() + self._transcoder = subprocess.Popen( + ( + ffmpeg.input(str(self.source_file)) + .output( + "pipe:", + map="a", + format="mp3", + # no ID3 headers -- saves having to decode them later + write_xing=0, + id3v2_version=0, + # force sample and bit rates + **{ + "b:a": self.bit_rate, + "ar": self.sample_rate, + }, + ) + .global_args("-vn", *args) + .compile() + ), + bufsize=self.chunk_size, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, ) # Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg. - proc = subprocess.Popen( - ffmpeg_args, bufsize=kwargs.get("chunk_size", cls.chunk_size), stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - proc.stdin.close() - logger.debug(f"Spawned ffmpeg (PID {proc.pid}) with args {ffmpeg_args = }") - return cls(proc.stdout, **kwargs) + self._transcoder.stdin.close() + logger.debug(f"Spawned ffmpeg (PID {self._transcoder.pid}): {' '.join(self._transcoder.args)}") diff --git a/test/conftest.py b/test/conftest.py index 78470aa..2b8ec76 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,3 +1,4 @@ +import logging from pathlib import Path import pytest @@ -14,3 +15,7 @@ def mock_env(monkeypatch): monkeypatch.setenv("ICECAST_PORT", "6523") monkeypatch.setenv("ICECAST_PASSWORD", "password") monkeypatch.setenv("DEBUG", "1") + + logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.DEBUG) + # logging.getLogger('transcoder').setLevel(logging.INFO) + # logging.getLogger('root').setLevel(logging.INFO) diff --git a/test/fixtures/transcoded_silence.mp3 b/test/fixtures/transcoded_silence.mp3 new file mode 100644 index 0000000000000000000000000000000000000000..fbaf9756c9fd5bf1879a831b03c7e8fdb44d37af GIT binary patch literal 72724 zcmeI2u}uR}7{>lc6f}qxm;eRA0u*#mW&;|wU>F97GFkc*#|?2hS(c*|NptZRTd#V{ z?fgBD*O{N0xu1D>nmMfRzxC_-@$Gl!?(_ZY_(LT}w4!DzIkjHmxRN7UQ8SgCS}$>2$q}unnMzKrmpHEEh*s20C8yR)99MEg zD{7{aQ|l#;D>W9M6fIsOxfa!MJ%HayTV+ppvs2#_cOPoDw@w$=MC#_LUq? zi5;lq?1pjsN)D&Q4pee>!?=AVhf`t)DmlAh+`f{-DX{~UoZT>PU&-N=*nvvUZWy<( za2+kfRyV^OT%kF>PSU(TwPMN=~nsHn8MqM)W)-r&mlHSaLKYdY+Qg zE2a%BIhqkYPs!;O(*~9t&4`|- zc{=3mhH?8!4yVKpRC0F1xP2vuQ(^}yIlE!pzLLW!u>+Ny-7s!n$>EgPflAJ97`LzF za7yezC1*E`+gEZpC3c{avm3_kD>Mo!Mu4T$8#blDml4e-n^3IIgt~U zoLn$(Udi#C$cai$E|@p3>vB9NZlVr3w_o78k^@R{BbA)nFK}JS0j0Q+O3v*UxUS@Y zQrt)-=k^O+S8_lpZlscP`vtBmIiM6bQpvgf0@sxsP>LI=q-tN#f?;QZoj~F QB?pw^M(T#9+b?h3KQkcW=Kufz literal 0 HcmV?d00001 diff --git a/test/test_streamer.py b/test/test_streamer.py index e7bbada..dbdbf38 100644 --- a/test/test_streamer.py +++ b/test/test_streamer.py @@ -1,7 +1,7 @@ import io -import queue import threading from pathlib import Path +from time import sleep from unittest.mock import MagicMock import pytest @@ -10,13 +10,10 @@ import shout from croaker import playlist, streamer -def get_stream_output(stream): - return stream.read() - - @pytest.fixture(scope="session") def silence_bytes(): - return (Path(streamer.__file__).parent / "silence.mp3").read_bytes() + # return (Path(streamer.__file__).parent / "silence.mp3").read_bytes() + return (Path(__file__).parent / "fixtures" / "transcoded_silence.mp3").read_bytes() @pytest.fixture @@ -27,6 +24,7 @@ def output_stream(): @pytest.fixture def mock_shout(output_stream, monkeypatch): def handle_send(buf): + print(f"buffering {len(buf)} bytes to output_stream.") output_stream.write(buf) mm = MagicMock(spec=shout.Shout, **{"return_value.send.side_effect": handle_send}) @@ -35,98 +33,60 @@ def mock_shout(output_stream, monkeypatch): @pytest.fixture -def input_queue(): - return queue.Queue() +def audio_streamer(monkeypatch, mock_shout): + return streamer.AudioStreamer() @pytest.fixture -def skip_event(): - return threading.Event() +def thread(audio_streamer): + thread = threading.Thread(target=audio_streamer.run) + thread.daemon = True + yield thread + audio_streamer.shutdown_requested.set() + thread.join() -@pytest.fixture -def stop_event(): - return threading.Event() +def wait_for(condition, timeout=2.0): + elapsed = 0.0 + while not condition() and elapsed < 2.0: + elapsed += 0.01 + sleep(0.01) + return elapsed <= timeout -@pytest.fixture -def load_event(): - return threading.Event() +def wait_for_not(condition, timeout=2.0): + return wait_for(lambda: not condition(), timeout=timeout) -@pytest.fixture -def audio_streamer(mock_shout, input_queue, skip_event, stop_event, load_event): - return streamer.AudioStreamer(input_queue, skip_event, stop_event, load_event) - - -def test_streamer_stop(audio_streamer, stop_event, output_stream): - stop_event.set() - audio_streamer.stream_queued_audio() - assert not stop_event.is_set() - - -def test_streamer_skip(audio_streamer, skip_event, output_stream): - skip_event.set() - audio_streamer.stream_queued_audio() - assert not skip_event.is_set() - - -def test_streamer_load(audio_streamer, load_event, output_stream): - load_event.set() - audio_streamer.stream_queued_audio() - assert not load_event.is_set() - - -def test_clear_queue(audio_streamer, input_queue): +def test_streamer_clear(audio_streamer, thread): + # enqueue some tracks pl = playlist.Playlist(name="test_playlist") for track in pl.tracks: - input_queue.put(bytes(track)) - assert input_queue.not_empty - audio_streamer.clear_queue() - assert input_queue.empty + audio_streamer.queue.put(bytes(track)) + assert not audio_streamer.queue.empty() + + # start the server and send it a clear request + thread.start() + audio_streamer.clear_requested.set() + assert wait_for(audio_streamer.queue.empty) + assert wait_for_not(audio_streamer.clear_requested.is_set) -@pytest.mark.skip -def test_streamer_defaults_to_silence(audio_streamer, input_queue, output_stream, silence_bytes): - audio_streamer.stream_queued_audio() - track = playlist.Playlist(name="test_playlist").tracks[0] - input_queue.put(bytes(track)) - audio_streamer.stream_queued_audio() - audio_streamer.stream_queued_audio() - assert get_stream_output(output_stream) == silence_bytes + track.read_bytes() + silence_bytes +def test_streamer_shutdown(audio_streamer, thread): + thread.start() + audio_streamer.shutdown_requested.set() + assert wait_for_not(audio_streamer.shutdown_requested.is_set) -@pytest.mark.skip -def test_streamer_plays_silence_on_error(monkeypatch, audio_streamer, input_queue, output_stream, silence_bytes): - monkeypatch.setattr(audio_streamer.queue, "get", MagicMock(side_effect=Exception)) - track = playlist.Playlist(name="test_playlist").tracks[0] - input_queue.put(bytes(track)) - audio_streamer.stream_queued_audio() - assert get_stream_output(output_stream) == silence_bytes +def test_streamer_skip(audio_streamer, thread): + thread.start() + audio_streamer.skip_requested.set() + assert wait_for_not(audio_streamer.skip_requested.is_set) -@pytest.mark.skip -def test_streamer_plays_from_queue(audio_streamer, input_queue, output_stream): - pl = playlist.Playlist(name="test_playlist") - expected = b"" - for track in pl.tracks: - input_queue.put(bytes(track)) - expected += track.read_bytes() - while not input_queue.empty(): - audio_streamer.stream_queued_audio() - assert get_stream_output(output_stream) == expected - - -def test_streamer_handles_stop_interrupt(audio_streamer, output_stream, stop_event): - stop_event.set() - audio_streamer.stream_queued_audio() - assert get_stream_output(output_stream) == b"" - - -def test_streamer_handles_load_interrupt(audio_streamer, input_queue, output_stream, load_event): - pl = playlist.Playlist(name="test_playlist") - input_queue.put(bytes(pl.tracks[0])) - load_event.set() - audio_streamer.stream_queued_audio() - assert get_stream_output(output_stream) == b"" - assert input_queue.empty +def test_streamer_defaults_to_silence(audio_streamer, thread, output_stream, silence_bytes): + thread.start() + thread.join(timeout=1) + output_stream.seek(0, 0) + out = output_stream.read() + assert silence_bytes in out