#!/usr/bin/perl

# Keeps the MPD playlist topped up to a target length with songs picked from
# a PostgreSQL database that mirrors MPD's song list.
#
# Commands (first non-option argument; see the dispatch at the end of file):
#   dump-config, one-shot, single,
#   add-unwanted-artist ARTIST..., del-unwanted-artist ARTIST...
# Without a command the script queues songs and then waits for MPD 'idle'
# events in an endless loop.
#
# Signals (installed in Feeder::connect_mpd):
#   HUP  - stop the event loop with result 'reload'; the main loop re-execs
#   USR1 - dump the effective configuration to STDERR

use v5.32;
use warnings;

# Options::handle_config_option and Options::parse_integer are plain subs
# declared with signatures; the v5.32 feature bundle does not enable them.
use experimental 'signatures';

use Getopt::Long ();
use Log::Any qw($log);
use Log::Any::Adapter Stderr => log_level => 'error';
use Object::Pad;
use Syntax::Keyword::Try;

# Holds every tunable of the program. Values come from, in order of
# application by Feeder::configure: built-in defaults, the INI configuration
# file, the command line.
class Options {
    use Config::INI::Reader;
    use Log::Any qw($log);
    use Time::Duration qw(duration_exact);
    use Time::Duration::Parse qw(parse_duration);

    has $log_level           :reader = 'warn';
    has $target_queue_length :reader = 10;
    has $mpd_host            :reader = undef;
    has $mpd_port            :reader = undef;
    has $db_path             :reader = 'mpd-feeder';
    has $db_user             :reader = undef;
    has $db_password         :reader = undef;

    # Intervals are stored as whatever parse_duration() returns and printed
    # back with duration_exact() in dump().
    has $min_album_interval  :reader = parse_duration('5h');
    has $min_song_interval   :reader = parse_duration('13d');
    has $min_artist_interval :reader = parse_duration('1h 15m');
    has $skip_db_update      :reader = 0;

    # Validates $input as a non-negative decimal integer of at most 18 digits
    # (an optional leading '+' is accepted) and returns it as a number.
    # Dies on anything else.
    sub parse_integer($input) {
        die "Invalid integer value '$input'"
            unless $input =~ /\A\+?\d{1,18}\z/a;

        return $input + 0;
    }

    # Copies $ini->{$section}{$option} into $$target_ref when that key
    # exists, passing it through $converter first if one is given.
    # Does nothing when the key is absent. Return value is not meaningful.
    sub handle_config_option( $ini, $section, $option, $target_ref,
        $converter = undef )
    {
        return unless exists $ini->{$section}{$option};

        my $value = $ini->{$section}{$option};
        $value = $converter->($value) if $converter;

        $$target_ref = $value;
        $log->trace("Option $section.$option = $value");

        return;
    }

    # Applies command-line options on top of the current values.
    # Exits with status 1 when Getopt::Long reports a parsing failure.
    method parse_command_line {
        Getopt::Long::GetOptions(
            'log-level=s'               => \$log_level,
            'skip-db-update!'           => \$skip_db_update,
            'tql|target-queue-length=n' => sub {
                $target_queue_length = parse_integer(pop);
            },
            'mpd-host=s'           => \$mpd_host,
            'mpd-port=s'           => \$mpd_port,
            'db-path=s'            => \$db_path,
            'db-user=s'            => \$db_user,
            'min-album-interval=s' => sub {
                $min_album_interval = parse_duration(pop);
            },

            # The option used to be spelled 'min-sing-interval' by mistake;
            # the misspelling stays as an alias so existing invocations
            # keep working.
            'min-song-interval|min-sing-interval=s' => sub {
                $min_song_interval = parse_duration(pop);
            },
            'min-artist-interval=s' => sub {
                $min_artist_interval = parse_duration(pop);
            },
        ) or exit 1;
    }

    # Prints the effective configuration to the currently selected output
    # handle, laid out in the sections/keys parse_config_file() reads.
    method dump {
        say "[mpd-feeder]";
        say "log_level = $log_level";
        say "";
        say "[mpd]";
        say "host = " . ( $mpd_host // '' );
        say "port = " . ( $mpd_port // '' );
        say "";
        say "[queue]";
        say "target-length = $target_queue_length";
        say "min-song-interval = " . duration_exact($min_song_interval);
        say "min-album-interval = " . duration_exact($min_album_interval);
        say "min-artist-interval = " . duration_exact($min_artist_interval);
        say "";
        say "[db]";
        say "path = " . ( $db_path // '' );
        say "user = " . ( $db_user // '' );
        say "password = " . ( $db_password // '' );
    }

    # Reads the INI file at $path and applies every recognised option.
    # Keys missing from the file leave the current value untouched.
    method parse_config_file($path) {
        $log->trace("Parsing configuration file $path");

        my $ini = Config::INI::Reader->read_file($path);

        handle_config_option( $ini => mpd => host => \$mpd_host );
        handle_config_option( $ini => mpd => port => \$mpd_port );

        handle_config_option(
            $ini => 'mpd-feeder' => log_level => \$log_level );

        handle_config_option( $ini => queue => 'target-length' =>
                \$target_queue_length, \&parse_integer );
        handle_config_option( $ini => queue => 'min-song-interval' =>
                \$min_song_interval, \&parse_duration );
        handle_config_option( $ini => queue => 'min-album-interval' =>
                \$min_album_interval, \&parse_duration );
        handle_config_option( $ini => queue => 'min-artist-interval' =>
                \$min_artist_interval, \&parse_duration );

        handle_config_option( $ini => db => path     => \$db_path );
        handle_config_option( $ini => db => user     => \$db_user );
        handle_config_option( $ini => db => password => \$db_password );

        # FIXME: complain about unknown sections/parameters
    }
}

# The worker: owns the MPD connection, the database handle and the logic
# that selects and queues songs.
class Feeder {
    has $cfg_file        :reader;
    has $opt             :reader;
    has $db;
    has $db_generation;
    has $db_needs_update :writer = 1;
    has $mpd             :reader;

    use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf';

    use DBD::Pg;
    use DBI;
    use JSON;
    use Log::Any qw($log);
    use IO::Async::Signal;
    use Net::Async::MPD;

    # Construction: pick the configuration file (--cfg/--config, else the
    # default path if it exists), then load the full configuration.
    ADJUST {
        # pass_through leaves every option other than --cfg/--config in
        # @ARGV for Options::parse_command_line to handle later.
        Getopt::Long::Configure('pass_through');
        Getopt::Long::GetOptions( 'cfg|config=s' => \$cfg_file );
        Getopt::Long::Configure('no_pass_through');

        $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE;

        $self->configure;

        $db_needs_update = 0 if $opt->skip_db_update;
    }

    # Builds a fresh Options object (config file first, command line on
    # top), applies its log level and installs it as the active options.
    method configure {
        my $new_opt = Options->new;

        $new_opt->parse_config_file($cfg_file) if $cfg_file;
        $new_opt->parse_command_line;

        Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level );

        $opt = $new_opt;
    }

    # Creates the MPD client (once) and registers the HUP and USR1 signal
    # handlers on its event loop.
    method connect_mpd {
        return if $mpd;

        my %conn = ( auto_connect => 1 );
        $conn{host} = $opt->mpd_host if $opt->mpd_host;
        $conn{port} = $opt->mpd_port if $opt->mpd_port;

        $mpd = Net::Async::MPD->new(%conn);

        $mpd->loop->add(
            IO::Async::Signal->new(
                name       => 'HUP',
                on_receipt => sub {
                    $log->debug("SIGHUP received. Stopping loop");
                    $mpd->loop->stop('reload');
                },
            )
        );

        $mpd->loop->add(
            IO::Async::Signal->new(
                name       => 'USR1',
                on_receipt => sub {
                    $log->debug(
                        "SIGUSR1 received. Dumping configuration to STDERR");

                    # Options::dump prints to the selected handle, so
                    # select STDERR for its duration and always restore.
                    my $old = select \*STDERR;
                    try {
                        $opt->dump;
                    }
                    finally {
                        select $old;
                    }
                },
            )
        );
    }

    # Connects to PostgreSQL (once), loads the stored generation counter
    # and triggers a song database update if one is pending.
    method connect_db {
        return if $db;

        $db = DBI->connect(
            "dbi:Pg:dbname=" . $opt->db_path,
            $opt->db_user,
            $opt->db_password,
            { RaiseError => 1, PrintError => 0, AutoCommit => 1 }
        );

        $log->info( "Connected to database " . $opt->db_path );
        $db_generation = $self->db_get_option('generation');
        $log->debug("DB generation is $db_generation");
        $self->update_db;
    }

    # Returns the value of column $name from the first row of table
    # 'options'. $name is interpolated into the SQL as-is, so it must be a
    # trusted column name.
    method db_get_option($name) {
        my $sth = $db->prepare_cached("select $name from options");
        $sth->execute;
        my @result = $sth->fetchrow_array;

        # Only one row is fetched; finish() marks the cached statement as
        # no longer active.
        $sth->finish;
        undef $sth;

        return $result[0];
    }

    # Sets column $name of table 'options' to $value. $name is interpolated
    # into the SQL as-is, so it must be a trusted column name.
    method db_set_option( $name, $value ) {
        my $sth = $db->prepare_cached("update options set $name = ?");
        $sth->execute($value);
    }

    # Inserts or refreshes the song, its album and its artist, stamping all
    # three rows with the current generation. Entries with an empty or
    # undefined path, artist or album are silently skipped.
    method db_store_song( $song, $artist, $album ) {
        return unless length($song) and length($artist) and length($album);

        $db->prepare_cached(<<~'SQL')->execute( $song, $artist, $album, $db_generation );
            INSERT INTO songs(path, artist, album, generation)
            VALUES($1, $2, $3, $4)
            ON CONFLICT ON CONSTRAINT songs_pkey DO
            UPDATE SET artist = $2
                     , album = $3
                     , generation = $4
            SQL

        $db->prepare_cached(<<~'SQL')->execute( $artist, $album, $db_generation );
            INSERT INTO albums(artist, album, generation)
            VALUES($1, $2, $3)
            ON CONFLICT ON CONSTRAINT albums_pkey DO
            UPDATE SET generation = $3
            SQL

        $db->prepare_cached(<<~'SQL')->execute( $artist, $db_generation );
            INSERT INTO artists(artist, generation)
            VALUES($1, $2)
            ON CONFLICT ON CONSTRAINT artists_pkey DO
            UPDATE SET generation = $2
            SQL
    }

    # Deletes songs, albums and artists whose generation differs from the
    # current one, i.e. rows not touched by the latest update_db() pass.
    method db_remove_stale_entries {
        for my $table (qw(songs albums artists)) {
            my $sth = $db->prepare_cached(
                "DELETE FROM $table WHERE generation <> ?");
            $sth->execute($db_generation);
            $log->debug(
                sprintf( "Deleted %d stale %s", $sth->rows, $table ) );
        }
    }

    # Records the current time as 'last_queued' for the song, artist and
    # album described by $item (hash ref with keys song, artist, album).
    method db_note_song_qeued($item) {
        $db->prepare_cached(
            'UPDATE songs SET last_queued=current_timestamp WHERE path=?')
            ->execute( $item->{song} );
        $db->prepare_cached(
            'UPDATE artists SET last_queued=CURRENT_TIMESTAMP WHERE artist=?')
            ->execute( $item->{artist} );
        $db->prepare_cached(
            'UPDATE albums SET last_queued=CURRENT_TIMESTAMP WHERE artist=? AND album=?'
        )->execute( $item->{artist}, $item->{album} );
    }

    # Re-reads the full song list from MPD ('listallinfo') and mirrors it
    # into the database inside one transaction. Skipped unless an update is
    # pending or $force is true. On failure the transaction is rolled back,
    # the generation counter is restored and the error is re-thrown.
    method update_db( $force = undef ) {
        if ( !$db_needs_update and !$force ) {
            $log->debug("Skipping DB update");
            return;
        }

        $log->info('Updating song database');
        $self->connect_mpd;
        $self->connect_db;

        my $rows = $mpd->send('listallinfo')->get;

        try {
            $db->begin_work;

            # Rows stored during this pass carry the new generation; the
            # ones left behind are stale and get removed below.
            $db_generation++;

            # Start at 0 so an empty listing is logged as "0 songs".
            my $song_count = 0;

            foreach my $entry (@$rows) {
                next unless exists $entry->{file};
                $self->db_store_song(
                    $entry->{file},
                    $entry->{AlbumArtist} // $entry->{Artist},
                    $entry->{Album}
                );
                $song_count++;
            }

            $log->info("Updated data about $song_count songs");

            $self->db_remove_stale_entries;

            $self->db_set_option( generation => $db_generation );

            $db->commit;

            $db_needs_update = 0;
        }
        catch {
            my $err = $@;

            $db_generation--;

            $db->rollback;

            die $err;
        }
    }

    # Returns up to $num hash refs { song, artist, album } describing songs
    # that may be queued now.
    method db_find_suitable_songs($num) {
        $self->connect_db;
        $self->update_db;

        # NOTE(review): the text of this query was lost from the copy of the
        # file under review (everything between the heredoc opener and the
        # first bind parameter had been stripped, leaving code that does not
        # compile). What follows is a RECONSTRUCTION using only what the
        # rest of this file establishes:
        #   - tables/columns: songs(path, artist, album, last_queued),
        #     artists(artist, last_queued), albums(artist, album,
        #     last_queued), unwanted_artists(artist);
        #   - bind parameters, in order: min song / artist / album interval
        #     and the row limit;
        #   - result columns, in order: path, artist, album.
        # The interval unit (seconds), the random ordering and the
        # unwanted-artist exclusion are assumptions. Confirm against the
        # canonical query before deploying.
        my $sql = <<~'SQL';
            SELECT s.path, s.artist, s.album
            FROM songs s
            JOIN artists ar ON ar.artist = s.artist
            JOIN albums al ON al.artist = s.artist AND al.album = s.album
            WHERE (s.last_queued IS NULL
                   OR s.last_queued < CURRENT_TIMESTAMP - CAST(? || ' seconds' AS interval))
              AND (ar.last_queued IS NULL
                   OR ar.last_queued < CURRENT_TIMESTAMP - CAST(? || ' seconds' AS interval))
              AND (al.last_queued IS NULL
                   OR al.last_queued < CURRENT_TIMESTAMP - CAST(? || ' seconds' AS interval))
              AND NOT EXISTS (SELECT 1 FROM unwanted_artists ua
                              WHERE ua.artist = s.artist)
            ORDER BY random()
            LIMIT ?
            SQL

        my @params = (
            $opt->min_song_interval,
            $opt->min_artist_interval,
            $opt->min_album_interval,
            $num,
        );

        my $sth = $db->prepare_cached($sql);
        $sth->execute(@params);

        my @result;
        while ( my @row = $sth->fetchrow_array ) {
            push @result,
                { song => $row[0], artist => $row[1], album => $row[2] };
        }
        undef $sth;

        return @result;
    }

    # Adds $artist to the unwanted list. Returns 1 when the row was
    # inserted, 0 when the insert failed with SQLSTATE 23505 (PostgreSQL's
    # unique_violation, i.e. already listed); any other error is re-thrown.
    method db_add_unwanted_artist($artist) {
        $self->connect_db;

        try {
            $db->do( <<~'SQL', undef, $artist, $db_generation );
                INSERT INTO unwanted_artists(artist, generation)
                VALUES($1, $2)
                SQL
            return 1;
        }
        catch {
            my $err = $@;

            $log->debug("PostgreSQL error: $err");
            $log->debug( "SQLSTATE = " . $db->state );
            return 0 if $db->state eq '23505';

            die $err;
        }
    }

    # Removes $artist from the unwanted list. Returns true only when
    # exactly one row was deleted.
    method db_del_unwanted_artist($artist) {
        $self->connect_db;

        return 1 == $db->do( <<~'SQL', undef, $artist );
            DELETE FROM unwanted_artists
            WHERE artist = $1
            SQL
    }

    # Adds songs to the MPD playlist.
    #   $num      - how many songs to add; when undefined, the current
    #               playlist is fetched first and only the shortfall against
    #               target_queue_length (if any) is added
    #   $callback - optional code ref invoked when done (also when nothing
    #               had to be added)
    # Dies when the database offers no suitable song at all.
    method queue_songs( $num = undef, $callback = undef ) {
        if ( !defined $num ) {
            $self->connect_mpd;
            $mpd->send('playlist')->on_done(
                sub {
                    my $present = scalar @{ $_[0] };

                    $log->notice( "Playlist contains $present songs. Wanted: "
                            . $opt->target_queue_length );
                    if ( $present < $opt->target_queue_length ) {
                        $self->queue_songs(
                            $opt->target_queue_length - $present, $callback );
                    }
                    else {
                        $callback->() if $callback;
                    }
                }
            );

            return;
        }

        my @list = $self->db_find_suitable_songs($num);

        die "Found no suitable songs" unless @list;

        my $found = scalar @list;
        if ( $found < $num ) {
            $log->warn(
                sprintf(
                    'Found only %d suitable songs instead of %d',
                    $found, $num
                )
            );
        }

        # Report what will really be added, which may be fewer than $num.
        $log->info("About to add $found songs to the playlist");

        # Paths are sent as double-quoted arguments, inside which both the
        # quote and the backslash need a backslash escape.
        my @paths = map { $_->{song} =~ s/(["\\])/\\$1/gr } @list;

        $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) );

        my @commands = map { [ add => "\"$_\"" ] } @paths;

        $self->connect_mpd;
        my $f = $mpd->send( \@commands );
        $f->on_fail( sub { die @_ } );
        $f->on_done(
            sub {
                $self->db_note_song_qeued($_) for @list;
                $callback->(@_) if $callback;
            }
        );
    }

    # Issues an MPD 'idle' for the database and playlist subsystems and
    # re-arms itself after each event:
    #   database - flag that the song database needs refreshing
    #   playlist - top the playlist up, then wait again
    method prepare_to_wait_idle {
        $log->trace('declaring idle mode');
        $mpd->send('idle database playlist')->on_done(
            sub {
                my $result  = shift;
                my $changed = $result->{changed} // '';

                if ( $changed eq 'database' ) {
                    $db_needs_update = 1;
                    $self->prepare_to_wait_idle;
                }
                elsif ( $changed eq 'playlist' ) {
                    $self->queue_songs( undef,
                        sub { $self->prepare_to_wait_idle } );
                }
                else {
                    $log->warn(
                        "Unknown result from idle: " . to_json($result) );
                    $self->prepare_to_wait_idle;
                }
            }
        );
    }

    # Starts event-driven operation: die if MPD closes the connection and
    # begin waiting for idle events. Expects connect_mpd() to have run.
    method run {
        $mpd->on(
            close => sub {
                die "Connection to MPD lost";
            }
        );

        $self->prepare_to_wait_idle;
    }

    # Drops the MPD client and disconnects from the database, logging
    # details of any statement handle that is still active.
    method stop {
        undef $mpd;

        if ($db) {
            if ( $db->{ActiveKids} ) {
                $log->warn("$db->{ActiveKids} active DB statements");
                for my $st ( @{ $db->{ChildHandles} } ) {
                    next unless $st->{Active};
                    while ( my ( $k, $v ) = each %$st ) {
                        $log->debug( "$k = " . ( $v // '' ) );
                    }
                }
            }

            $db->disconnect;
            undef $db;
        }
    }
}

my $feeder = Feeder->new();

# Command dispatch. Commands that finish synchronously exit here; 'one-shot'
# and 'single' exit from their completion callback.
if (@ARGV) {
    my $cmd = shift @ARGV;

    if ( $cmd eq 'dump-config' ) {
        die "dump-config command accepts no arguments\n" if @ARGV;

        $feeder->opt->dump;
        exit;
    }

    if ( $cmd eq 'add-unwanted-artist' ) {
        die "Missing command arguments\n" unless @ARGV;

        # Editing the unwanted list must not trigger a full song rescan.
        $feeder->set_db_needs_update(0);
        for my $artist (@ARGV) {
            if ( $feeder->db_add_unwanted_artist($artist) ) {
                $log->info("Artist '$artist' added to the unwanted list\n");
            }
            else {
                $log->warn("Artist '$artist' already in the unwanted list\n");
            }
        }
        exit;
    }

    if ( $cmd eq 'del-unwanted-artist' ) {
        die "Missing command arguments\n" unless @ARGV;

        $feeder->set_db_needs_update(0);
        for my $artist (@ARGV) {
            if ( $feeder->db_del_unwanted_artist($artist) ) {
                $log->info("Artist '$artist' deleted from the unwanted list\n");
            }
            else {
                $log->warn("Artist '$artist' is not in the unwanted list\n");
            }
        }
        exit;
    }

    if ( $cmd eq 'add-unwanted-album' ) {
        die "NOT IMPLEMENTED\n";
    }

    if ( $cmd eq 'one-shot' ) {
        die "one-shot command accepts no arguments\n" if @ARGV;

        $feeder->queue_songs( undef, sub { exit } );
        $feeder->mpd->loop->run;
    }
    elsif ( $cmd eq 'single' ) {
        die "single command accepts no arguments\n" if @ARGV;

        $feeder->queue_songs( 1, sub { exit } );
        $feeder->mpd->loop->run;
    }
    else {
        die "Unknown command '$cmd'";
    }
}

# Daemon mode: top the playlist up, then serve idle events until the loop
# is stopped. A 'reload' result (set by the SIGHUP handler) re-execs the
# script so that the configuration is read afresh.
while (1) {
    $feeder->queue_songs( undef, sub { $feeder->run } );

    $log->debug("Entering event loop. PID=$$");
    my $result = $feeder->mpd->loop->run;
    $log->trace( "Got loop result of " . ( $result // 'undef' ) );

    if ( ( $result // '' ) eq 'reload' ) {
        $log->notice("disconnecting");
        $feeder->stop;

        # Only pass --config when a configuration file is actually in use;
        # --skip-db-update avoids a full rescan right after the reload.
        my @args = ('--skip-db-update');
        unshift @args, '--config', $feeder->cfg_file
            if defined $feeder->cfg_file;

        # exec() returns only on failure.
        exec( $0, @args ) or die "Unable to re-exec $0: $!\n";
    }
}