Replace shoutcast implementation with vlc+gtk app

This commit is contained in:
evilchili 2025-09-13 17:13:37 -07:00
parent 263e439a6d
commit 7a4cd1ba50
12 changed files with 282 additions and 579 deletions

View File

@ -22,6 +22,8 @@ psutil = "^5.9.8"
exscript = "^2.6.28" exscript = "^2.6.28"
python-shout = "^0.2.8" python-shout = "^0.2.8"
ffmpeg-python = "^0.2.0" ffmpeg-python = "^0.2.0"
python-vlc = "^3.0.21203"
pygobject = "3.50.0"
[tool.poetry.scripts] [tool.poetry.scripts]
croaker = "croaker.cli:app" croaker = "croaker.cli:app"

Binary file not shown.

After

Width:  |  Height:  |  Size: 52 KiB

View File

@ -0,0 +1,16 @@
window {
background: #000;
}
.artwork {
background: #FFF;
}
.label {
color: #888;
}
.now_playing {
color: #FFF;
font-weight: bold;
}

View File

@ -11,8 +11,8 @@ from dotenv import load_dotenv
from typing_extensions import Annotated from typing_extensions import Annotated
from croaker import path from croaker import path
from croaker.player import Player
from croaker.playlist import Playlist from croaker.playlist import Playlist
from croaker.server import server
SETUP_HELP = f""" SETUP_HELP = f"""
# Root directory for croaker configuration and logs. See also croaker --root. # Root directory for croaker configuration and logs. See also croaker --root.
@ -25,18 +25,12 @@ CROAKER_ROOT={path.root()}
#PIDFILE={path.root()}/croaker.pid #PIDFILE={path.root()}/croaker.pid
# Command and Control TCP Server bind address # Command and Control TCP Server bind address
HOST=0.0.0.0 HOST=127.0.0.1
PORT=8003 PORT=8003
# the kinds of files to add to playlists # the kinds of files to add to playlists
MEDIA_GLOB=*.mp3,*.flac,*.m4a MEDIA_GLOB=*.mp3,*.flac,*.m4a
# Icecast2 configuration for Liquidsoap
ICECAST_PASSWORD=
ICECAST_MOUNT=
ICECAST_HOST=
ICECAST_PORT=
ICECAST_URL=
""" """
app = typer.Typer() app = typer.Typer()
@ -90,15 +84,8 @@ def start(
""" """
Start the Croaker command and control server. Start the Croaker command and control server.
""" """
server.start(daemonize=daemonize, shoutcast_enabled=shoutcast) player = Player()
player.run()
@app.command()
def stop():
"""
Terminate the server.
"""
server.stop()
@app.command() @app.command()

189
src/croaker/gui.py Normal file
View File

@ -0,0 +1,189 @@
import threading
import time
import gi
import vlc
from croaker import path
from croaker.playlist import Playlist, load_playlist
gi.require_version("Gtk", "4.0")
gi.require_version("Gdk", "4.0")
from gi.repository import GLib, GObject, Gdk, Gtk, Pango # noqa E402
class PlayerWindow(Gtk.ApplicationWindow):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._max_width = 300
self._max_height = 330
self._artwork_width = self._max_width
self._artwork_height = 248
css_provider = Gtk.CssProvider()
css_provider.load_from_path(str(path.assets() / 'style.css'))
Gtk.StyleContext.add_provider_for_display(
Gdk.Display.get_default(),
css_provider,
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION
)
self.set_title("Croaker Radio")
self._root = Gtk.Fixed()
self._root.set_size_request(self._max_width, self._max_height)
self.set_child(self._root)
self._artwork = Gtk.Fixed()
self._track = None
self._artist = None
self._album = None
self._draw_window()
def _draw_window(self):
margin_size = 8
label_width = self._max_width - (2 * margin_size)
label_height = 16
label_spacing = 8
self._artwork.set_size_request(self._artwork_width, self._artwork_height)
self._root.put(self._artwork, 0, 0)
self.draw_artwork()
def label(text: str):
l = Gtk.Label()
l.set_ellipsize(Pango.EllipsizeMode.END)
l.add_css_class("label")
l.set_text(text)
l.set_size_request(label_width, label_height)
l.set_justify(Gtk.Justification.LEFT)
l.set_hexpand(True)
l.set_xalign(0)
return l
self._track = label("CROAKER RADIO")
self._track.add_css_class("now_playing")
self._root.put(self._track, margin_size, self._artwork_height + label_spacing)
self._artist = label("Artist")
self._root.put(self._artist, margin_size, self._artwork_height + (2 * label_spacing) + label_height)
self._album = label("Album")
self._root.put(self._album, margin_size, self._artwork_height + (3 * label_spacing) + (2 * label_height))
def now_playing(self, track: str, artist: str, album: str):
self._track.set_text(f"🎵 {track}")
self._artist.set_text(f"🐸 {artist}")
self._album.set_text(f"💿 {album}")
def draw_artwork(self):
image1 = Gtk.Image()
image1.set_from_file(str(path.assets() / 'froghat.png'))
image1.set_size_request(self._artwork_width, self._artwork_height)
image1.add_css_class("artwork")
self._artwork.put(image1, 0, 0)
class GUI(Gtk.Application):
"""
A simple GTK application that instaniates a VLC player and listens for commands.
"""
def __init__(self):
super().__init__()
self._playlist: Playlist | None = None
self._vlc_instance = vlc.Instance("--loop")
self._media_list_player = vlc.MediaListPlayer()
self._player.audio_set_volume(30)
self._signal_handler = threading.Thread(target=self._wait_for_signals)
self._signal_handler.daemon = True
self.play_requested = threading.Event()
self.back_requested = threading.Event()
self.ffwd_requested = threading.Event()
self.stop_requested = threading.Event()
self.load_requested = threading.Event()
self.clear_requested = threading.Event()
self.shutdown_requested = threading.Event()
GLib.set_application_name("Croaker Radio")
@property
def _player(self):
return self._media_list_player.get_media_player()
def do_activate(self):
self._signal_handler.start()
self._window = PlayerWindow(application=self)
self._window.present()
def load(self, playlist_name: str):
self.clear()
self._playlist = load_playlist(playlist_name)
media = self._vlc_instance.media_list_new()
for track in self._playlist.tracks:
media.add_media(self._vlc_instance.media_new(track))
self._media_list_player.set_media_list(media)
self._media_list_player.play()
self._update_now_playing()
events = self._player.event_manager()
events.event_attach(vlc.EventType.MediaPlayerMediaChanged, self._update_now_playing)
def _update_now_playing(self, event=None):
track = "[NOTHING PLAYING]"
artist = "artist"
album = "album"
media = self._player.get_media()
if media:
media.parse()
track = media.get_meta(vlc.Meta.Title)
artist = media.get_meta(vlc.Meta.Artist)
album = media.get_meta(vlc.Meta.Album)
self._window.now_playing(track, artist, album)
def _wait_for_signals(self):
while not self.shutdown_requested.is_set():
if self.play_requested.is_set():
self.play_requested.clear()
GLib.idle_add(self._media_list_player.play)
if self.back_requested.is_set():
self.back_requested.clear()
GLib.idle_add(self._media_list_player.previous)
if self.ffwd_requested.is_set():
self.ffwd_requested.clear()
GLib.idle_add(self._media_list_player.next)
if self.stop_requested.is_set():
self.stop_requested.clear()
GLib.idle_add(self._media_list_player.stop)
if self.load_requested.is_set():
self.load_requested.clear()
GLib.idle_add(self._media_list_player.load)
if self.clear_requested.is_set():
self.clear_requested.clear()
GLib.idle_add(self.clear)
time.sleep(0.25)
GLib.idle_add(self.quit)
exit()
def clear(self):
if self._media_list_player:
self._media_list_player.stop()
self._playlist = None
def quit(self):
self.clear()
self._vlc_instance.release()
exit()

View File

@ -9,6 +9,10 @@ def root():
return Path(os.environ.get("CROAKER_ROOT", "~/.dnd/croaker")).expanduser() return Path(os.environ.get("CROAKER_ROOT", "~/.dnd/croaker")).expanduser()
def assets():
return Path(__file__).parent / 'assets'
def playlist_root(): def playlist_root():
path = Path(os.environ.get("PLAYLIST_ROOT", root() / "playlists")).expanduser() path = Path(os.environ.get("PLAYLIST_ROOT", root() / "playlists")).expanduser()
return path return path

31
src/croaker/player.py Normal file
View File

@ -0,0 +1,31 @@
import logging
import threading
import gi
from croaker.gui import GUI
from croaker.server import Controller
gi.require_version("Gtk", "4.0")
from gi.repository import GLib, GObject, Gtk # noqa E402
logger = logging.getLogger("player")
class Player(GUI):
"""
A GTK GUI application with a TCP command and control server.
"""
def __init__(self):
super().__init__()
self._controller = threading.Thread(target=self._start_controller)
self._controller.daemon = True
def do_activate(self):
self._controller.start()
super().do_activate()
self.load("session_start")
def _start_controller(self):
Controller(self).serve_forever(poll_interval=0.25)

View File

@ -1,25 +1,20 @@
import logging import logging
import os import os
import queue import socket
import socketserver import socketserver
from pathlib import Path import time
from time import sleep
import daemon from croaker.gui import GUI
from croaker.path import playlist_root
from croaker import path
from croaker.pidfile import pidfile
from croaker.playlist import load_playlist from croaker.playlist import load_playlist
from croaker.streamer import AudioStreamer
logger = logging.getLogger("server") logger = logging.getLogger(__name__)
class RequestHandler(socketserver.StreamRequestHandler): class RequestHandler(socketserver.StreamRequestHandler):
""" """
Instantiated by the TCPServer when a request is received. Implements the Instantiated by the TCPServer when a request is received. Implements the
command and control protocol and sends commands to the shoutcast source command and control protocol and issues commands to the GUI application.
client on behalf of the user.
""" """
supported_commands = { supported_commands = {
@ -46,43 +41,44 @@ class RequestHandler(socketserver.StreamRequestHandler):
4 Ignored 4 Ignored
5+ Arguments 5+ Arguments
""" """
while True: while self.should_listen:
time.sleep(0.01)
self.data = self.rfile.readline().strip().decode() self.data = self.rfile.readline().strip().decode()
logger.debug(f"Received: {self.data}") logger.debug(f"Received: {self.data}")
try: try:
cmd = self.data[0:4].strip().upper() cmd = self.data[0:4].strip().upper()
args = self.data[5:]
except IndexError:
self.send(f"ERR Command not understood '{cmd}'")
sleep(0.001)
continue
if not cmd: if not cmd:
sleep(0.001)
continue continue
elif cmd not in self.supported_commands: elif cmd not in self.supported_commands:
self.send(f"ERR Unknown Command '{cmd}'") self.send(f"ERR Unknown Command '{cmd}'")
sleep(0.001) except IndexError:
self.send(f"ERR Command not understood '{cmd}'")
continue continue
elif cmd == "KTHX":
args = self.data[5:]
if cmd == "KTHX":
return self.send("KBAI") return self.send("KBAI")
handler = getattr(self, f"handle_{cmd}", None) handler = getattr(self, f"handle_{cmd}", None)
if not handler: if not handler:
self.send(f"ERR No handler for {cmd}.") self.send(f"ERR No handler for {cmd}.")
continue
handler(args) handler(args)
if not self.should_listen:
break
def send(self, msg): def send(self, msg):
return self.wfile.write(msg.encode() + b"\n") return self.wfile.write(msg.encode() + b"\n")
def handle_PLAY(self, args): def handle_PLAY(self, args):
self.server.load(args) self.server.player.load(args)
return self.send("OK")
def handle_BACK(self, args):
self.server.player.back_requested.set()
return self.send("OK") return self.send("OK")
def handle_FFWD(self, args): def handle_FFWD(self, args):
self.server.ffwd() self.server.player.ffwd_requested.set()
return self.send("OK") return self.send("OK")
def handle_LIST(self, args): def handle_LIST(self, args):
@ -92,99 +88,31 @@ class RequestHandler(socketserver.StreamRequestHandler):
return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items())) return self.send("\n".join(f"{cmd} {txt}" for cmd, txt in self.supported_commands.items()))
def handle_STOP(self, args): def handle_STOP(self, args):
return self.streamer.stop_requested.set() return self.server.player.stop_requested.set()
def handle_STFU(self, args): def handle_STFU(self, args):
self.send("Shutting down.") self.send("Shutting down.")
self.server.stop() self.server.shutdown()
class CroakerServer(socketserver.TCPServer): class Controller(socketserver.TCPServer):
""" """
A Daemonized TCP Server that also starts a Shoutcast source client. A TCP Server that listens for commands and proxies the GUI audio player.
""" """
allow_reuse_address = True def __init__(self, player: GUI):
self.player = player
super().__init__((os.environ["HOST"], int(os.environ["PORT"])), RequestHandler)
def __init__(self): def server_bind(self):
self._context = daemon.DaemonContext() self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._queue = queue.Queue() self.socket.bind(self.server_address)
self._streamer = None
self.playlist = None
def _pidfile(self): def shutdown(self):
return pidfile(path.root() / "croaker.pid") self.player.shutdown_requested.set()
exit()
@property
def streamer(self):
return self._streamer
def bind_address(self):
return (os.environ["HOST"], int(os.environ["PORT"]))
def _daemonize(self) -> None:
"""
Daemonize the current process.
"""
logger.info(f"Daemonizing controller; pidfile and output in {path.root()}")
self._context.pidfile = self._pidfile()
self._context.stdout = open(path.root() / Path("croaker.out"), "wb", buffering=0)
self._context.stderr = open(path.root() / Path("croaker.err"), "wb", buffering=0)
# when open() is called, all open file descriptors will be closed, as
# befits a good daemon. However this will also close the socket on
# which the TCPServer is listening! So let's keep that one open.
self._context.files_preserve = [self.fileno()]
self._context.open()
def start(self, daemonize: bool = True, shoutcast_enabled: bool = True) -> None:
"""
Start the shoutcast controller background thread, then begin listening for connections.
"""
logger.info(f"Starting controller on {self.bind_address()}.")
super().__init__(self.bind_address(), RequestHandler)
if daemonize:
self._daemonize()
try:
logger.debug("Starting AudioStreamer...")
self._streamer = AudioStreamer(self._queue, shoutcast_enabled=shoutcast_enabled)
self.streamer.start()
self.load("session_start")
self.serve_forever()
except KeyboardInterrupt:
logger.info("Keyboard interrupt detected.")
self.streamer.shutdown_requested.set()
self.stop()
def stop(self):
self._pidfile()
def ffwd(self):
logger.debug("Sending SKIP signal to streamer...")
self.streamer.skip_requested.set()
def clear_queue(self):
logger.debug("Requesting a clear...")
self.streamer.clear_requested.set()
while self.streamer.clear_requested.is_set():
sleep(0.001)
logger.debug("Cleared")
def list(self, playlist_name: str = None): def list(self, playlist_name: str = None):
if playlist_name: if playlist_name:
return str(load_playlist(playlist_name)) return str(load_playlist(playlist_name))
return "\n".join([str(p.name) for p in path.playlist_root().iterdir()]) return "\n".join([str(p.name) for p in playlist_root().iterdir()])
def load(self, playlist_name: str):
logger.debug(f"Switching to {playlist_name = }")
self.streamer.stop_requested.set()
if self.playlist:
self.clear_queue()
self.playlist = load_playlist(playlist_name)
logger.debug(f"Loaded new playlist {self.playlist = }")
for track in self.playlist.tracks:
self._queue.put(str(track).encode())
self.streamer.start_requested.set()
server = CroakerServer()

View File

@ -1,168 +0,0 @@
import logging
import os
import queue
import threading
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from time import sleep
import shout
from croaker.transcoder import FrameAlignedStream
logger = logging.getLogger("streamer")
class AudioStreamer(threading.Thread):
"""
Receive filenames from the controller thread and stream the contents of
those files to the icecast server.
"""
def __init__(self, queue: queue.Queue = queue.Queue(), chunk_size: int = 8092, shoutcast_enabled: bool = True):
super().__init__()
self.queue = queue
self.chunk_size = chunk_size
self._shoutcast_enabled = shoutcast_enabled
self.skip_requested = threading.Event()
self.stop_requested = threading.Event()
self.start_requested = threading.Event()
self.clear_requested = threading.Event()
self.shutdown_requested = threading.Event()
@cached_property
def silence(self):
return FrameAlignedStream(Path(__file__).parent / "silence.mp3", chunk_size=self.chunk_size)
@cached_property
def _out(self):
if self._shoutcast_enabled:
s = shout.Shout()
else:
s = debugServer()
s.name = "Croaker Radio"
s.url = os.environ["ICECAST_URL"]
s.mount = os.environ["ICECAST_MOUNT"]
s.host = os.environ["ICECAST_HOST"]
s.port = int(os.environ["ICECAST_PORT"])
s.password = os.environ["ICECAST_PASSWORD"]
s.protocol = os.environ.get("ICECAST_PROTOCOL", "http")
s.format = os.environ.get("ICECAST_FORMAT", "mp3")
return s
def run(self): # pragma: no cover
while not self.shutdown_requested.is_set():
try:
self.connect()
self.stream_forever()
break
except shout.ShoutException as e:
logger.error("Error connecting to shoutcast server. Will sleep and try again.", exc_info=e)
sleep(3)
self.shutdown()
self.shutdown_requested.clear()
def connect(self):
logger.info(f"Connecting to downstream server at {self._out}")
self._out.close()
self._out.open()
def shutdown(self):
if hasattr(self, "_out"):
self._out.close()
del self._out
self.clear_queue()
logger.info("Shutting down.")
def clear_queue(self):
logger.info("Clearing queue...")
while not self.queue.empty():
self.queue.get()
def queued_audio_source(self):
"""
Return a filehandle to the next queued audio source, or silence if the queue is empty.
"""
try:
track = Path(self.queue.get(block=False).decode())
logger.debug(f"Streaming {track.stem = }")
return FrameAlignedStream(track, chunk_size=self.chunk_size), track.stem
except queue.Empty:
logger.debug("Nothing queued; enqueing silence.")
except Exception as exc:
logger.error("Caught exception; falling back to silence.", exc_info=exc)
return self.silence, "[NOTHING PLAYING]"
def pause_if_necessary(self):
while self.stop_requested.is_set():
if self.start_requested.is_set():
self.stop_requested.clear()
self.start_requested.clear()
return
sleep(0.001)
def stream_forever(self):
while not self.shutdown_requested.is_set():
self.pause_if_necessary()
stream, title = self.queued_audio_source()
logging.debug(f"Starting stream of {title = }")
self._out.set_metadata({"song": title})
for chunk in stream:
if self.skip_requested.is_set():
logger.info("EVENT: Skip")
self.skip_requested.clear()
break
if self.clear_requested.is_set():
logger.info("EVENT: Clear")
self.clear_queue()
self.clear_requested.clear()
break
if self.stop_requested.is_set():
logger.info("EVENT: Stop")
break
if self.start_requested.is_set():
self.start_requested.clear()
break
if self.shutdown_requested.is_set():
logger.info("EVENT: Shutdown")
break
logger.debug(f"{title}: {len(chunk)} bytes")
self._out.send(chunk)
self._out.sync()
@dataclass
class debugServer:
name: str = "Croaker Debugger"
url: str = None
mount: str = None
host: str = None
port: str = None
password: str = None
format: str = None
_output_file: Path = Path("/dev/null") # Path("./croaker.stream.output.mp3")
_filehandle = None
def open(self):
self._filehandle = self._output_file.open("wb")
def close(self):
if self._filehandle:
self._filehandle.close()
self._filehandle = None
def set_metadata(self, metadata: dict):
logger.info(f"debugServer: {metadata = }")
def send(self, chunk: bytes):
self._filehandle.write(chunk)
def sync(self):
self._filehandle.flush()

View File

@ -1,151 +0,0 @@
import io
import logging
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
import ffmpeg
logger = logging.getLogger("transcoder")
@dataclass
class FrameAlignedStream:
"""
Use ffmpeg to transcode a source audio file to mp3 and iterate over the result
in frame-aligned chunks. This will ensure that readers will always have a full
frame of audio data to parse or emit.
I learned a lot from https://github.com/pylon/streamp3 figuring this stuff out!
Usage:
>>> stream = FrameAlignedStream.from_source(Path('test.flac').open('rb'))
>>> for segment in stream:
...
"""
source_file: Path
chunk_size: int = 1024
bit_rate: int = 192000
sample_rate: int = 44100
_transcoder: subprocess.Popen = None
_buffer: io.BufferedReader = None
@property
def source(self):
if self._buffer:
return self._buffer
if self._transcoder:
return self._transcoder.stdout
logger.info("Source is empty")
return None
@property
def frames(self):
while True:
frame = self._read_one_frame()
if not frame:
return
yield frame
def _read_one_frame(self):
"""
Read the next full audio frame from the input source and return it
"""
# step through the source a byte at a time and look for the frame sync.
header = None
buffer = b""
while not header:
buffer += self.source.read(4 - len(buffer))
if len(buffer) != 4:
logging.debug("Reached the end of the source stream without finding another framesync.")
return False
header = buffer[:4]
if header[0] != 0b11111111 or header[1] >> 5 != 0b111:
logging.debug(f"Expected a framesync but got {buffer} instead; moving fwd 1 byte.")
header = None
buffer = buffer[1:]
# Decode the mp3 header. We could derive the bit_rate and sample_rate
# here if we had the lookup tables etc. from the MPEG spec, but since
# we control the input, we can rely on them being predefined.
version_code = (header[1] & 0b00011000) >> 3
padding_code = (header[2] & 0b00000010) >> 1
version = version_code & 1 if version_code >> 1 else 2
is_padded = bool(padding_code)
# calculate the size of the whole frame
frame_size = 1152 if version == 1 else 576
frame_size = self.bit_rate // 8 * frame_size // self.sample_rate
if is_padded:
frame_size += 1
# read the rest of the frame from the source
frame_data = self.source.read(frame_size - len(header))
if len(frame_data) != frame_size - len(header):
logging.debug("Reached the end of the source stream without finding a full frame.")
return None
# return the entire frame
return header + frame_data
def __iter__(self):
"""
Generate approximately chunk_size segments of audio data by iterating over the
frames, buffering them, and then yielding several as a single bytes object.
"""
try:
self._start_transcoder()
buf = b""
for frame in self.frames:
if len(buf) >= self.chunk_size:
yield buf
buf = b""
if not frame:
break
buf += frame
if buf:
yield buf
finally:
self._stop_transcoder()
def _stop_transcoder(self):
if self._transcoder:
logger.debug(f"Killing {self._transcoder = }")
self._transcoder.kill()
self._transcoder = None
self._buffer = None
def _start_transcoder(self):
args = [] if os.environ.get("DEBUG") else ["-hide_banner", "-loglevel", "quiet"]
self._transcoder = subprocess.Popen(
(
ffmpeg.input(str(self.source_file))
.output(
"pipe:",
map="a",
format="mp3",
# no ID3 headers -- saves having to decode them later
write_xing=0,
id3v2_version=0,
# force sample and bit rates
**{
"b:a": self.bit_rate,
"ar": self.sample_rate,
},
)
.global_args("-vn", *args)
.compile()
),
bufsize=self.chunk_size,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
)
# Force close STDIN to prevent ffmpeg from trying to read from it. silly ffmpeg.
self._transcoder.stdin.close()
logger.debug(f"Spawned ffmpeg (PID {self._transcoder.pid}): {' '.join(self._transcoder.args)}")

View File

@ -1,92 +0,0 @@
import io
import threading
from pathlib import Path
from time import sleep
from unittest.mock import MagicMock
import pytest
import shout
from croaker import playlist, streamer
@pytest.fixture(scope="session")
def silence_bytes():
# return (Path(streamer.__file__).parent / "silence.mp3").read_bytes()
return (Path(__file__).parent / "fixtures" / "transcoded_silence.mp3").read_bytes()
@pytest.fixture
def output_stream():
return io.BytesIO()
@pytest.fixture
def mock_shout(output_stream, monkeypatch):
def handle_send(buf):
print(f"buffering {len(buf)} bytes to output_stream.")
output_stream.write(buf)
mm = MagicMock(spec=shout.Shout, **{"return_value.send.side_effect": handle_send})
monkeypatch.setattr("shout.Shout", mm)
return mm
@pytest.fixture
def audio_streamer(monkeypatch, mock_shout):
return streamer.AudioStreamer()
@pytest.fixture
def thread(audio_streamer):
thread = threading.Thread(target=audio_streamer.run)
thread.daemon = True
yield thread
audio_streamer.shutdown_requested.set()
thread.join()
def wait_for(condition, timeout=2.0):
elapsed = 0.0
while not condition() and elapsed < 2.0:
elapsed += 0.01
sleep(0.01)
return elapsed <= timeout
def wait_for_not(condition, timeout=2.0):
return wait_for(lambda: not condition(), timeout=timeout)
def test_streamer_clear(audio_streamer, thread):
# enqueue some tracks
pl = playlist.Playlist(name="test_playlist")
for track in pl.tracks:
audio_streamer.queue.put(bytes(track))
assert not audio_streamer.queue.empty()
# start the server and send it a clear request
thread.start()
audio_streamer.clear_requested.set()
assert wait_for(audio_streamer.queue.empty)
assert wait_for_not(audio_streamer.clear_requested.is_set)
def test_streamer_shutdown(audio_streamer, thread):
thread.start()
audio_streamer.shutdown_requested.set()
assert wait_for_not(audio_streamer.shutdown_requested.is_set)
def test_streamer_skip(audio_streamer, thread):
thread.start()
audio_streamer.skip_requested.set()
assert wait_for_not(audio_streamer.skip_requested.is_set)
def test_streamer_defaults_to_silence(audio_streamer, thread, output_stream, silence_bytes):
thread.start()
thread.join(timeout=1)
output_stream.seek(0, 0)
out = output_stream.read()
assert silence_bytes in out

View File

@ -1,43 +0,0 @@
from unittest.mock import MagicMock
import ffmpeg
import pytest
from croaker import playlist, transcoder
@pytest.fixture
def mock_mp3decoder(monkeypatch):
def read(stream):
return stream.read()
monkeypatch.setattr(transcoder, "MP3Decoder", MagicMock(**{"__enter__.return_value.read": read}))
@pytest.mark.xfail
@pytest.mark.parametrize(
"suffix, expected",
[
(".mp3", b"_theme.mp3\n"),
(".foo", b"transcoding!\n"),
],
)
def test_transcoder_open(monkeypatch, mock_mp3decoder, suffix, expected):
monkeypatch.setattr(
transcoder,
"ffmpeg",
MagicMock(
spec=ffmpeg,
**{
"input.return_value."
"output.return_value."
"global_args.return_value."
"compile.return_value": ["echo", "transcoding!"],
},
),
)
pl = playlist.Playlist(name="test_playlist")
track = [t for t in pl.tracks if t.suffix == suffix][0]
with transcoder.open(track) as handle:
assert handle.read() == expected