X-Git-Url: https://git.ktnx.net/?a=blobdiff_plain;ds=sidebyside;f=bin%2Fmpd-feeder;h=8644dde79e4b6addcbceaf71695e20f204c38f02;hb=4d938ab86005be9673a6d5ee10da5923cbae4ba1;hp=d2768c68fa11e3833083c1f81df7e9768192dcd2;hpb=f55502115ee1db8fbc9de05bfa6bb2cbfcf82fb3;p=mpd-feeder.git diff --git a/bin/mpd-feeder b/bin/mpd-feeder index d2768c6..8644dde 100755 --- a/bin/mpd-feeder +++ b/bin/mpd-feeder @@ -3,12 +3,16 @@ use v5.32; use Getopt::Long (); +use Log::Any qw($log); +use Log::Any::Adapter Stderr => log_level => 'error'; use Object::Pad; use Syntax::Keyword::Try; class Options { + use Log::Any qw($log); use Time::Duration qw(duration_exact); use Time::Duration::Parse qw(parse_duration); + has $log_level :reader = 'warn'; has $target_queue_length :reader = 10; has $mpd_host :reader = undef; has $mpd_port :reader = undef; @@ -18,34 +22,20 @@ class Options { has $min_album_interval :reader = parse_duration('5h'); has $min_song_interval :reader = parse_duration('13d'); has $min_artist_interval :reader = parse_duration('1h 15m'); - has $verbose :reader = 0; - has $single :reader = 0; - has $one_shot :reader = 0; has $skip_db_update :reader = 0; - has $dump_config :reader = 0; - - method verb($message) { - return unless $self->opt->verbose; - warn "$message\n"; - } - method dbg($message) { - return unless $self->opt->verbose > 1; - warn "$message\n"; - } method parse_command_line { Getopt::Long::GetOptions( - 'v|verbose+' => \$verbose, - 'dump-config!' => \$dump_config, - 's|single!' => \$single, - 'one-shot!' => \$one_shot, + 'log-level=s' => \$log_level, 'skip-db-update!' => \$skip_db_update, - 'tql|target-queue-length=n' => \$target_queue_length, - 'mpd-host=s' => \$mpd_host, - 'mpd-port=s' => \$mpd_port, - 'db-path=s' => \$db_path, - 'db-user=s' => \$db_user, - 'min-album-interval=s' => sub { + 'tql|target-queue-length=n' => sub { + $target_queue_length = parse_integer(pop); + }, + 'mpd-host=s' => \$mpd_host, + 'mpd-port=s' => \$mpd_port, + 'db-path=s' => \$db_path, + 'db-user=s' => \$db_user, + 'min-album-interval=s' => sub { $min_album_interval = parse_duration(pop); }, 'min-sing-interval=s' => sub { @@ -67,16 +57,17 @@ class Options { $value = $converter->($value) if $converter; $$target_ref = $value; + + $log->trace("Option $section.$option = $value"); } method dump { say "[mpd-feeder]"; - say "verbose = $verbose"; + say "log_level = $log_level"; say ""; say "[mpd]"; say "host = " . ( $mpd_host // '' ); say "port = " . ( $mpd_port // '' ); - say "target-queue-length = $target_queue_length"; say ""; say "[queue]"; say "target-length = $target_queue_length"; @@ -90,17 +81,26 @@ class Options { say "password = " . ( $db_password // '' ); } + sub parse_integer($input) { + die "Invalid integer value '$input'" unless $input =~ /^\+?\d{1,18}$/; + return $input + 0; + } + method parse_config_file($path) { + $log->trace("Parsing configuration file $path"); + use Config::INI::Reader; my $ini = Config::INI::Reader->read_file($path); handle_config_option( $ini => mpd => host => \$mpd_host ); handle_config_option( $ini => mpd => port => \$mpd_port ); - handle_config_option( $ini => 'mpd-feeder' => verbose => \$verbose ); + handle_config_option( $ini => 'mpd-feeder' => log_level => \$log_level ); handle_config_option( - $ini => queue => 'target-length' => \$target_queue_length ); + $ini => queue => 'target-length' => \$target_queue_length, + \&parse_integer + ); handle_config_option( $ini => queue => 'min-song-interval' => \$min_song_interval, \&parse_duration @@ -123,56 +123,100 @@ class Options { } class Feeder { + has $cfg_file :reader; has $opt :reader; has $db; has $db_generation; - has $mpd; + has $db_needs_update :writer = 1; + has $mpd :reader; use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf'; +use DBD::Pg; +use DBI; +use Log::Any qw($log); +use IO::Async::Signal; +use Net::Async::MPD; + ADJUST { - $opt = Options->new; + Getopt::Long::Configure('pass_through'); + Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file); + Getopt::Long::Configure('no_pass_through'); - { - my $cfg_file; - Getopt::Long::Configure('pass_through'); - Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file); - Getopt::Long::Configure('no_pass_through'); + $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE; - $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE; + $self->configure; - $opt->parse_config_file($cfg_file) if $cfg_file; - } + $db_needs_update = 0 if $opt->skip_db_update; + } - $opt->parse_command_line; + method configure { + my $new_opt = Options->new; - unless ($opt->dump_config) { - $mpd = Net::Async::MPD->new( - host => $opt->mpd_host, - port => $opt->mpd_port, - auto_connect => 1, - ); + $new_opt->parse_config_file($cfg_file) if $cfg_file; - $self->connect_db; - $self->update_db unless $self->opt->skip_db_update; - } + $new_opt->parse_command_line; + + Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level ); + + $opt = $new_opt; + } + + method connect_mpd { + return if $mpd; + + my %conn = ( auto_connect => 1 ); + $conn{host} = $opt->mpd_host if $opt->mpd_host; + $conn{port} = $opt->mpd_port if $opt->mpd_port; + + $mpd = Net::Async::MPD->new(%conn); + + $mpd->loop->add( + IO::Async::Signal->new( + name => 'HUP', + on_receipt => sub { + $log->debug("SIGHUP received. Stopping loop"); + $mpd->loop->stop('reload'); + }, + ) + ); + + $mpd->loop->add( + IO::Async::Signal->new( + name => 'USR1', + on_receipt => sub { + $log->debug("SIGUSR1 received. Dumping configuration to STDERR"); + my $old = select \*STDERR; + try { + $opt->dump; + } + finally { + select $old; + } + }, + ) + ); } method connect_db { return if $db; - $db = - DBI->connect( "dbi:Pg:dbname=" . $opt->db_path, + $db = DBI->connect( "dbi:Pg:dbname=" . $opt->db_path, $opt->db_user, $opt->db_password, - { RaiseError => 1, AutoCommit => 1 } ); + { RaiseError => 1, PrintError => 0, AutoCommit => 1 } ); + $log->info( "Connected to database " . $opt->db_path ); $db_generation = $self->db_get_option('generation'); + $log->debug("DB generation is $db_generation"); + $self->update_db; } method db_get_option($name) { my $sth = $db->prepare_cached("select $name from options"); $sth->execute; my @result = $sth->fetchrow_array; + $sth->finish; + undef $sth; return $result[0]; } @@ -209,12 +253,19 @@ SQL } method db_remove_stale_entries { - $db->prepare_cached('DELETE FROM songs WHERE generation <> ?') - ->execute($db_generation); - $db->prepare_cached('DELETE FROM albums WHERE generation <> ?') - ->execute($db_generation); - $db->prepare_cached('DELETE FROM artists WHERE generation <> ?') - ->execute($db_generation); + my $sth = + $db->prepare_cached('DELETE FROM songs WHERE generation <> ?'); + $sth->execute($db_generation); + $log->debug( sprintf( "Deleted %d stale songs", $sth->rows ) ); + + $sth = $db->prepare_cached('DELETE FROM albums WHERE generation <> ?'); + $sth->execute($db_generation); + $log->debug( sprintf( "Deleted %d stale albums", $sth->rows ) ); + + $sth = + $db->prepare_cached('DELETE FROM artists WHERE generation <> ?'); + $sth->execute($db_generation); + $log->debug( sprintf( "Deleted %d stale artists", $sth->rows ) ); } method db_note_song_qeued($item) { @@ -229,133 +280,311 @@ SQL )->execute( $item->{artist}, $item->{album} ); } - method update_db() { - $mpd->send('listallinfo')->on_done( - sub { - try { - $db->begin; - - $db_generation++; - - my ($song, $artist, $album); - - foreach my $row (@_) { - chomp($row); - - if ($row =~ s/^file:\s*//) { - $self->db_store_song( $song, $artist, $album ); - $song = $row; - $artist = $album = undef; - } - elsif ( $row =~ s/^Artist:\s*// ) { - $artist = $row; - } - elsif ( $row =~ s/^Album:\s*// ) { - $album = $row; - } - } + method update_db($force = undef) { + if (!$db_needs_update and !$force) { + $log->debug("Skipping DB update"); + return; + } - $self->db_store_song($song, $artist, $album); + $log->info('Updating song database'); + $self->connect_mpd; + $self->connect_db; - $self->db_remove_stale_entries; + my $rows = $mpd->send('listallinfo')->get; + try { + $db->begin_work; - $self->db_set_option( generation => $db_generation ); + $db_generation++; - $db->commit; - } - catch { - my $err = $@; + my $song_count; - $db_generation--; + foreach my $entry (@$rows) { + next unless exists $entry->{file}; + $self->db_store_song( $entry->{file}, + $entry->{AlbumArtist} // $entry->{Artist}, + $entry->{Album} ); + $song_count++; + } - $db->rollback; + $log->info("Updated data about $song_count songs"); - die $err; - } - } - ); + $self->db_remove_stale_entries; + + $self->db_set_option( generation => $db_generation ); + + $db->commit; + + $db_needs_update = 0; + } + catch { + my $err = $@; + + $db_generation--; + + $db->rollback; + + die $err; + } } method db_find_suitable_songs($num) { + $self->connect_db; + $self->update_db; + my @result; - my $sth = $db->prepare_cached(<execute( - $self->opt->min_song_interval, - $self->opt->min_artist_interval, - $self->opt->min_album_interval, - $num, + my @params = ( + $opt->min_song_interval, $opt->min_artist_interval, + $opt->min_album_interval, $num, ); + my $sth = $db->prepare_cached($sql); + $sth->execute(@params); while ( my @row = $sth->fetchrow_array ) { push @result, { song => $row[0], artist => $row[1], album => $row[2] }; } + undef $sth; return @result; } - method queue_songs($num = undef) { + method db_add_unwanted_artist($artist) { + $self->connect_db; + + try { + $db->do( + <<'SQL', +INSERT INTO unwanted_artists(artist, generation) +VALUES($1, $2) +SQL + undef, $artist, $db_generation + ); + return 1; + } + catch { + my $err = $@; + + $log->debug("PostgreSQL error: $err"); + $log->debug( "SQLSTATE = " . $db->state ); + return 0 if $db->state eq '23505'; + + die $err; + } + } + + method db_del_unwanted_artist($artist) { + $self->connect_db; + + return 1 == $db->do( + <<'SQL', +DELETE FROM unwanted_artists +WHERE artist = $1 +SQL + undef, $artist + ); + } + + method queue_songs($num = undef, $callback = undef) { if (!defined $num) { - $mpd->send('playlist')->on_done( sub { - my $present = scalar @_; + $self->connect_mpd; + $mpd->send('playlist')->on_done( + sub { + my $present = scalar @{ $_[0] }; + + $log->notice( "Playlist contains $present songs. Wanted: " + . $opt->target_queue_length ); + if ( $present < $opt->target_queue_length ) { + $self->queue_songs( + $opt->target_queue_length - $present, $callback ); + } + else { + $callback->() if $callback; + } + } + ); - $self->queue_songs( $opt->target_queue_length - $present ) - if $present < $opt->target_queue_length; - } ); + return; } - else { - my @list = $self->db_find_suitable_songs($num); - - if (@list < $num) { - $mpd->loop->add( - IO::Async::Timer::Countdown->new( - delay => 15, - on_expire => sub { $self->queue_songs($num) }, - ) - ); - } - else { - $mpd->send( [ map {"add $_->{song}"} @list ] ); + + my @list = $self->db_find_suitable_songs($num); + + die "Found no suitable songs" unless @list; + + if ( @list < $num ) { + $log->warn( + sprintf( + 'Found only %d suitable songs instead of %d', + scalar(@list), $num + ) + ); + } + + $log->info("About to add $num songs to the playlist"); + + my @paths; + for my $song (@list) { + my $path = $song->{song}; + $path =~ s/"/\\"/g; + push @paths, $path; + } + + $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) ); + my @commands; + for (@paths) { + push @commands, [ add => "\"$_\"" ]; + } + $self->connect_mpd; + my $f = $mpd->send( \@commands ); + $f->on_fail( sub { die @_ } ); + $f->on_done( + sub { $self->db_note_song_qeued($_) for @list; + $callback->(@_) if $callback; + } + ); + } + + method prepare_to_wait_idle { + $log->trace('declaring idle mode'); + $mpd->send('idle database playlist')->on_done( + sub { + my $result = shift; + + if ( $result->{changed} eq 'database' ) { + $db_needs_update = 1; + $self->prepare_to_wait_idle; + } + elsif ( $result->{changed} eq 'playlist' ) { + $self->queue_songs( undef, + sub { $self->prepare_to_wait_idle } ); + } + else { + use JSON; + $log->warn( + "Unknown result from idle: " . to_json($result) ); + $self->prepare_to_wait_idle; + } + } + ); + } + + method run { + $mpd->on( + close => sub { + die "Connection to MPD lost"; + } + ); + + $self->prepare_to_wait_idle; + } + + method stop { + undef $mpd; + + if ($db) { + if ($db->{ActiveKids}) { + $log->warn("$db->{ActiveKids} active DB statements"); + for my $st ( @{ $db->{ChildHandles} } ) { + next unless $st->{Active}; + while(my($k,$v) = each %$st) { + $log->debug("$k = ".($v//'')); + } + } } + + $db->disconnect; + undef $db; } } } my $feeder = Feeder->new(); -$feeder->opt->dump, exit if $feeder->opt->dump_config; +if (@ARGV) { + my $cmd = shift @ARGV; + + if ($cmd eq 'dump-config') { + die "dump-config command accepts no arguments\n" if @ARGV; -$feeder->queue_songs(1), exit if $feeder->opt->single; + $feeder->opt->dump; + exit; + } + + if ( $cmd eq 'add-unwanted-artist' ) { + die "Missing command arguments\n" unless @ARGV; + $feeder->set_db_needs_update(0); + for my $artist (@ARGV) { + if ( $feeder->db_add_unwanted_artist($artist) ) { + $log->info("Artist '$artist' added to the unwanted list\n"); + } + else { + $log->warn("Artist '$artist' already in the unwanted list\n"); + } + } + exit; + } -# FIXME: handle blacklist manipulation + if ( $cmd eq 'del-unwanted-artist' ) { + die "Missing command arguments\n" unless @ARGV; + $feeder->set_db_needs_update(0); + for my $artist (@ARGV) { + if ( $feeder->db_del_unwanted_artist($artist) ) { + $log->info("Artist '$artist' deleted from the unwanted list\n"); + } + else { + $log->warn("Artist '$artist' is not in the unwanted list\n"); + } + } + exit; + } -$feeder->queue_songs; + if ( $cmd eq 'add-unwanted-album' ) { + die "NOT IMPLEMENTED\n"; + } -exit if $feeder->opt->one_shot; + if ( $cmd eq 'one-shot' ) { + die "one-shot command accepts no arguments\n" if @ARGV; -$feeder->mpd->on( - database => sub { - $feeder->update_db; + $feeder->queue_songs(undef, sub { exit }); + $feeder->mpd->loop->run; } -); + elsif ( $cmd eq 'single' ) { + die "single command accepts no arguments\n" if @ARGV; -$feeder->mpd->on( - playlist => sub { - $feeder->queue_songs; + $feeder->queue_songs(1, sub { exit }); + $feeder->mpd->loop->run; } -); + else { + die "Unknown command '$cmd'"; + } +} -$feeder->mpd->idle(qw(database playlist)); -$feeder->mpd->get; +$feeder->connect_db; + +for ( ;; ) { + $feeder->queue_songs( undef, sub { $feeder->run } ); + + $log->debug("Entering event loop. PID=$$"); + + my $result = $feeder->mpd->loop->run; + $log->trace( "Got loop result of " . ( $result // 'undef' ) ); + + if ('reload' eq $result) { + $log->notice("disconnecting"); + $feeder->stop; + + exec( "$0", '--config', $feeder->cfg_file, '--skip-db-update' ); + } +}