From: Damyan Ivanov Date: Fri, 12 Nov 2021 06:22:54 +0000 (+0000) Subject: split-out App::MPD::Feeder -- the main application module X-Git-Url: https://git.ktnx.net/?p=mpd-feeder.git;a=commitdiff_plain;h=2ca3bcbd785af91fe805730ac48b1ce17e44ad38 split-out App::MPD::Feeder -- the main application module --- diff --git a/bin/mpd-feeder b/bin/mpd-feeder index bbddefd..53c8265 100755 --- a/bin/mpd-feeder +++ b/bin/mpd-feeder @@ -1,488 +1,13 @@ #!/usr/bin/perl -use v5.32; +use strict; +use warnings; +use utf8; +use App::MPD::Feeder; use App::MPD::Feeder::Options; -use Getopt::Long (); use Log::Any qw($log); use Log::Any::Adapter Stderr => log_level => 'error'; -use Object::Pad; -use Syntax::Keyword::Try; - -class Feeder { - has $cfg_file :reader; - has $opt :reader; - has $db; - has $db_generation; - has $db_needs_update :writer = 1; - has $mpd :reader; - -use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf'; - -use DBD::Pg; -use DBI; -use Log::Any qw($log); -use IO::Async::Signal; -use Net::Async::MPD; - - ADJUST { - Getopt::Long::Configure('pass_through'); - Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file); - Getopt::Long::Configure('no_pass_through'); - - $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE; - - $self->configure; - - $db_needs_update = 0 if $opt->skip_db_update; - } - - method configure { - my $new_opt = App::MPD::Feeder::Options->new; - - $new_opt->parse_config_file($cfg_file) if $cfg_file; - - $new_opt->parse_command_line; - - Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level ); - - $opt = $new_opt; - } - - method connect_mpd { - return if $mpd; - - my %conn = ( auto_connect => 1 ); - $conn{host} = $opt->mpd_host if $opt->mpd_host; - $conn{port} = $opt->mpd_port if $opt->mpd_port; - - $mpd = Net::Async::MPD->new(%conn); - - $mpd->loop->add( - IO::Async::Signal->new( - name => 'HUP', - on_receipt => sub { - $log->debug("SIGHUP received. Stopping loop"); - $mpd->loop->stop('reload'); - }, - ) - ); - - $mpd->loop->add( - IO::Async::Signal->new( - name => 'USR1', - on_receipt => sub { - $log->debug("SIGUSR1 received. Dumping configuration to STDERR"); - my $old = select \*STDERR; - try { - $opt->dump; - } - finally { - select $old; - } - }, - ) - ); - } - - method connect_db { - return if $db; - - $db = DBI->connect( "dbi:Pg:dbname=" . $opt->db_path, - $opt->db_user, $opt->db_password, - { RaiseError => 1, PrintError => 0, AutoCommit => 1 } ); - - $log->info( "Connected to database " . $opt->db_path ); - $db_generation = $self->db_get_option('generation'); - $log->debug("DB generation is $db_generation"); - $self->update_db; - } - - method db_get_option($name) { - my $sth = $db->prepare_cached("select $name from options"); - $sth->execute; - my @result = $sth->fetchrow_array; - $sth->finish; - undef $sth; - - return $result[0]; - } - - method db_set_option( $name, $value ) { - my $sth = $db->prepare_cached("update options set $name = ?"); - $sth->execute($value); - } - - method db_store_song($song, $artist, $album) { - return unless length($song) and length($artist) and length($album); - - $db->prepare_cached( - <<'SQL')->execute( $song, $artist, $album, $db_generation ); -INSERT INTO songs(path, artist, album, generation) -VALUES($1, $2, $3, $4) -ON CONFLICT ON CONSTRAINT songs_pkey DO -UPDATE SET artist = $2 - , album = $3 - , generation = $4 -SQL - $db->prepare_cached(<<'SQL')->execute( $artist, $album, $db_generation ); -INSERT INTO albums(artist, album, generation) -VALUES($1, $2, $3) -ON CONFLICT ON CONSTRAINT albums_pkey DO -UPDATE SET generation = $3 -SQL - $db->prepare_cached(<<'SQL')->execute( $artist, $db_generation ); -INSERT INTO artists(artist, generation) -VALUES($1, $2) -ON CONFLICT ON CONSTRAINT artists_pkey DO -UPDATE SET generation = $2 -SQL - } - - method db_remove_stale_entries { - my $sth = - $db->prepare_cached('DELETE FROM songs WHERE generation <> ?'); - $sth->execute($db_generation); - $log->debug( sprintf( "Deleted %d stale songs", $sth->rows ) ); - - $sth = $db->prepare_cached('DELETE FROM albums WHERE generation <> ?'); - $sth->execute($db_generation); - $log->debug( sprintf( "Deleted %d stale albums", $sth->rows ) ); - - $sth = - $db->prepare_cached('DELETE FROM artists WHERE generation <> ?'); - $sth->execute($db_generation); - $log->debug( sprintf( "Deleted %d stale artists", $sth->rows ) ); - } - - method db_note_song_qeued($item) { - $db->prepare_cached( - 'UPDATE songs SET last_queued=current_timestamp WHERE path=?') - ->execute( $item->{song} ); - $db->prepare_cached( - 'UPDATE artists SET last_queued=CURRENT_TIMESTAMP WHERE artist=?') - ->execute( $item->{artist} ); - $db->prepare_cached( - 'UPDATE albums SET last_queued=CURRENT_TIMESTAMP WHERE artist=? AND album=?' - )->execute( $item->{artist}, $item->{album} ); - } - - method update_db($force = undef) { - if (!$db_needs_update and !$force) { - $log->debug("Skipping DB update"); - return; - } - - $log->info('Updating song database'); - $self->connect_mpd; - $self->connect_db; - - my $rows = $mpd->send('listallinfo')->get; - try { - $db->begin_work; - - $db_generation++; - - my $song_count; - - foreach my $entry (@$rows) { - next unless exists $entry->{file}; - $self->db_store_song( $entry->{file}, - $entry->{AlbumArtist} // $entry->{Artist}, - $entry->{Album} ); - $song_count++; - } - - $log->info("Updated data about $song_count songs"); - - $self->db_remove_stale_entries; - - $self->db_set_option( generation => $db_generation ); - - $db->commit; - - $db_needs_update = 0; - } - catch { - my $err = $@; - - $db_generation--; - - $db->rollback; - - die $err; - } - } - - method db_find_suitable_songs($num) { - $self->connect_db; - $self->update_db; - - my @result; - my $sql = <min_song_interval, $opt->min_artist_interval, - $opt->min_album_interval, $num, - ); - my $sth = $db->prepare_cached($sql); - $sth->execute(@params); - while ( my @row = $sth->fetchrow_array ) { - push @result, - { song => $row[0], artist => $row[1], album => $row[2] }; - } - undef $sth; - - if (scalar(@result) == $num and $log->is_debug) { - $sql =~ s/^SELECT .+$/SELECT COUNT(DISTINCT s.path)/m; - $sql =~ s/^ORDER BY .+$//m; - $sql =~ s/^LIMIT .+$//m; - $log->debug($sql); - my $sth = $db->prepare_cached($sql); - pop @params; - $sth->execute(@params); - my $count = ($sth->fetchrow_array)[0]; - $sth->finish; - - $sth = $db->prepare_cached('SELECT COUNT(*) FROM songs'); - $sth->execute; - my $total = ($sth->fetchrow_array)[0]; - $log->debug( - sprintf( - "Number of songs meeting the criteria: %d out of total %d (%5.2f%%)", - $count, $total, 100.0 * $count / $total - ) - ); - $sth->finish; - - $sql = <prepare_cached($sql); - $sth->execute($opt->min_song_interval); - $count = ($sth->fetchrow_array)[0]; - $total = ($sth->fetchrow_array)[0]; - $sth->finish; - - $log->debug( - sprintf( - "Number of songs not queued soon: %d out of total %d (%5.2f%%)", - $count, $total, 100.0 * $count / $total - ) - ); - $sth->finish; - - $sql = <prepare_cached($sql); - $sth->execute($opt->min_artist_interval); - $count = ($sth->fetchrow_array)[0]; - $total = ($sth->fetchrow_array)[0]; - $log->debug( - sprintf( - "Number of artists not queued soon: %d out of total %d (%5.2f%%)", - $count, $total, 100.0 * $count / $total - ) - ); - $sth->finish; - - $sql = <prepare_cached($sql); - $sth->execute($opt->min_album_interval); - $count = ($sth->fetchrow_array)[0]; - $total = ($sth->fetchrow_array)[0]; - $log->debug( - sprintf( - "Number of albums not queued soon: %d out of total %d (%5.2f%%)", - $count, $total, 100.0 * $count / $total - ) - ); - $sth->finish; - - undef $sth; - } - - return @result; - } - - method db_add_unwanted_artist($artist) { - $self->connect_db; - - try { - $db->do( - <<'SQL', -INSERT INTO unwanted_artists(artist, generation) -VALUES($1, $2) -SQL - undef, $artist, $db_generation - ); - return 1; - } - catch { - my $err = $@; - - $log->debug("PostgreSQL error: $err"); - $log->debug( "SQLSTATE = " . $db->state ); - return 0 if $db->state eq '23505'; - - die $err; - } - } - - method db_del_unwanted_artist($artist) { - $self->connect_db; - - return 1 == $db->do( - <<'SQL', -DELETE FROM unwanted_artists -WHERE artist = $1 -SQL - undef, $artist - ); - } - - method queue_songs($num = undef, $callback = undef) { - if (!defined $num) { - $self->connect_mpd; - $mpd->send('playlist')->on_done( - sub { - my $present = scalar @{ $_[0] }; - - $log->notice( "Playlist contains $present songs. Wanted: " - . $opt->target_queue_length ); - if ( $present < $opt->target_queue_length ) { - $self->queue_songs( - $opt->target_queue_length - $present, $callback ); - } - else { - $callback->() if $callback; - } - } - ); - - return; - } - - my @list = $self->db_find_suitable_songs($num); - - die "Found no suitable songs" unless @list; - - if ( @list < $num ) { - $log->warn( - sprintf( - 'Found only %d suitable songs instead of %d', - scalar(@list), $num - ) - ); - } - - $log->info("About to add $num songs to the playlist"); - - my @paths; - for my $song (@list) { - my $path = $song->{song}; - $path =~ s/"/\\"/g; - push @paths, $path; - } - - $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) ); - my @commands; - for (@paths) { - push @commands, [ add => "\"$_\"" ]; - } - $self->connect_mpd; - my $f = $mpd->send( \@commands ); - $f->on_fail( sub { die @_ } ); - $f->on_done( - sub { - $self->db_note_song_qeued($_) for @list; - $callback->(@_) if $callback; - } - ); - } - - method prepare_to_wait_idle { - $log->trace('declaring idle mode'); - $mpd->send('idle database playlist')->on_done( - sub { - my $result = shift; - - if ( $result->{changed} eq 'database' ) { - $db_needs_update = 1; - $self->prepare_to_wait_idle; - } - elsif ( $result->{changed} eq 'playlist' ) { - $self->queue_songs( undef, - sub { $self->prepare_to_wait_idle } ); - } - else { - use JSON; - $log->warn( - "Unknown result from idle: " . to_json($result) ); - $self->prepare_to_wait_idle; - } - } - ); - } - - method run { - $mpd->on( - close => sub { - die "Connection to MPD lost"; - } - ); - - $self->prepare_to_wait_idle; - } - - method stop { - undef $mpd; - - if ($db) { - if ($db->{ActiveKids}) { - $log->warn("$db->{ActiveKids} active DB statements"); - for my $st ( @{ $db->{ChildHandles} } ) { - next unless $st->{Active}; - while(my($k,$v) = each %$st) { - $log->debug("$k = ".($v//'')); - } - } - } - - $db->disconnect; - undef $db; - } - } -} my $feeder = Feeder->new(); diff --git a/lib/App/MPD/Feeder.pm b/lib/App/MPD/Feeder.pm new file mode 100644 index 0000000..dbdcafb --- /dev/null +++ b/lib/App/MPD/Feeder.pm @@ -0,0 +1,486 @@ +package App::MPD::Feeder; + +use strict; +use warnings; +use utf8; + +use App::MPD::Feeder::Options; +use DBD::Pg; +use DBI; +use Getopt::Long; +use IO::Async::Signal; +use Log::Any qw($log); +use Net::Async::MPD; +use Object::Pad; +use Syntax::Keyword::Try; + + +class App::MPD::Feeder { + has $cfg_file :reader; + has $opt :reader; + has $db; + has $db_generation; + has $db_needs_update :writer = 1; + has $mpd :reader; + +use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf'; + + ADJUST { + Getopt::Long::Configure('pass_through'); + Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file); + Getopt::Long::Configure('no_pass_through'); + + $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE; + + $self->configure; + + $db_needs_update = 0 if $opt->skip_db_update; + } + + method configure { + my $new_opt = App::MPD::Feeder::Options->new; + + $new_opt->parse_config_file($cfg_file) if $cfg_file; + + $new_opt->parse_command_line; + + Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level ); + + $opt = $new_opt; + } + + method connect_mpd { + return if $mpd; + + my %conn = ( auto_connect => 1 ); + $conn{host} = $opt->mpd_host if $opt->mpd_host; + $conn{port} = $opt->mpd_port if $opt->mpd_port; + + $mpd = Net::Async::MPD->new(%conn); + + $mpd->loop->add( + IO::Async::Signal->new( + name => 'HUP', + on_receipt => sub { + $log->debug("SIGHUP received. Stopping loop"); + $mpd->loop->stop('reload'); + }, + ) + ); + + $mpd->loop->add( + IO::Async::Signal->new( + name => 'USR1', + on_receipt => sub { + $log->debug("SIGUSR1 received. Dumping configuration to STDERR"); + my $old = select \*STDERR; + try { + $opt->dump; + } + finally { + select $old; + } + }, + ) + ); + } + + method connect_db { + return if $db; + + $db = DBI->connect( "dbi:Pg:dbname=" . $opt->db_path, + $opt->db_user, $opt->db_password, + { RaiseError => 1, PrintError => 0, AutoCommit => 1 } ); + + $log->info( "Connected to database " . $opt->db_path ); + $db_generation = $self->db_get_option('generation'); + $log->debug("DB generation is $db_generation"); + $self->update_db; + } + + method db_get_option($name) { + my $sth = $db->prepare_cached("select $name from options"); + $sth->execute; + my @result = $sth->fetchrow_array; + $sth->finish; + undef $sth; + + return $result[0]; + } + + method db_set_option( $name, $value ) { + my $sth = $db->prepare_cached("update options set $name = ?"); + $sth->execute($value); + } + + method db_store_song($song, $artist, $album) { + return unless length($song) and length($artist) and length($album); + + $db->prepare_cached( + <<'SQL')->execute( $song, $artist, $album, $db_generation ); +INSERT INTO songs(path, artist, album, generation) +VALUES($1, $2, $3, $4) +ON CONFLICT ON CONSTRAINT songs_pkey DO +UPDATE SET artist = $2 + , album = $3 + , generation = $4 +SQL + $db->prepare_cached(<<'SQL')->execute( $artist, $album, $db_generation ); +INSERT INTO albums(artist, album, generation) +VALUES($1, $2, $3) +ON CONFLICT ON CONSTRAINT albums_pkey DO +UPDATE SET generation = $3 +SQL + $db->prepare_cached(<<'SQL')->execute( $artist, $db_generation ); +INSERT INTO artists(artist, generation) +VALUES($1, $2) +ON CONFLICT ON CONSTRAINT artists_pkey DO +UPDATE SET generation = $2 +SQL + } + + method db_remove_stale_entries { + my $sth = + $db->prepare_cached('DELETE FROM songs WHERE generation <> ?'); + $sth->execute($db_generation); + $log->debug( sprintf( "Deleted %d stale songs", $sth->rows ) ); + + $sth = $db->prepare_cached('DELETE FROM albums WHERE generation <> ?'); + $sth->execute($db_generation); + $log->debug( sprintf( "Deleted %d stale albums", $sth->rows ) ); + + $sth = + $db->prepare_cached('DELETE FROM artists WHERE generation <> ?'); + $sth->execute($db_generation); + $log->debug( sprintf( "Deleted %d stale artists", $sth->rows ) ); + } + + method db_note_song_qeued($item) { + $db->prepare_cached( + 'UPDATE songs SET last_queued=current_timestamp WHERE path=?') + ->execute( $item->{song} ); + $db->prepare_cached( + 'UPDATE artists SET last_queued=CURRENT_TIMESTAMP WHERE artist=?') + ->execute( $item->{artist} ); + $db->prepare_cached( + 'UPDATE albums SET last_queued=CURRENT_TIMESTAMP WHERE artist=? AND album=?' + )->execute( $item->{artist}, $item->{album} ); + } + + method update_db($force = undef) { + if (!$db_needs_update and !$force) { + $log->debug("Skipping DB update"); + return; + } + + $log->info('Updating song database'); + $self->connect_mpd; + $self->connect_db; + + my $rows = $mpd->send('listallinfo')->get; + try { + $db->begin_work; + + $db_generation++; + + my $song_count; + + foreach my $entry (@$rows) { + next unless exists $entry->{file}; + $self->db_store_song( $entry->{file}, + $entry->{AlbumArtist} // $entry->{Artist}, + $entry->{Album} ); + $song_count++; + } + + $log->info("Updated data about $song_count songs"); + + $self->db_remove_stale_entries; + + $self->db_set_option( generation => $db_generation ); + + $db->commit; + + $db_needs_update = 0; + } + catch { + my $err = $@; + + $db_generation--; + + $db->rollback; + + die $err; + } + } + + method db_find_suitable_songs($num) { + $self->connect_db; + $self->update_db; + + my @result; + my $sql = <min_song_interval, $opt->min_artist_interval, + $opt->min_album_interval, $num, + ); + my $sth = $db->prepare_cached($sql); + $sth->execute(@params); + while ( my @row = $sth->fetchrow_array ) { + push @result, + { song => $row[0], artist => $row[1], album => $row[2] }; + } + undef $sth; + + if (scalar(@result) == $num and $log->is_debug) { + $sql =~ s/^SELECT .+$/SELECT COUNT(DISTINCT s.path)/m; + $sql =~ s/^ORDER BY .+$//m; + $sql =~ s/^LIMIT .+$//m; + $log->debug($sql); + my $sth = $db->prepare_cached($sql); + pop @params; + $sth->execute(@params); + my $count = ($sth->fetchrow_array)[0]; + $sth->finish; + + $sth = $db->prepare_cached('SELECT COUNT(*) FROM songs'); + $sth->execute; + my $total = ($sth->fetchrow_array)[0]; + $log->debug( + sprintf( + "Number of songs meeting the criteria: %d out of total %d (%5.2f%%)", + $count, $total, 100.0 * $count / $total + ) + ); + $sth->finish; + + $sql = <prepare_cached($sql); + $sth->execute($opt->min_song_interval); + $count = ($sth->fetchrow_array)[0]; + $total = ($sth->fetchrow_array)[0]; + $sth->finish; + + $log->debug( + sprintf( + "Number of songs not queued soon: %d out of total %d (%5.2f%%)", + $count, $total, 100.0 * $count / $total + ) + ); + $sth->finish; + + $sql = <prepare_cached($sql); + $sth->execute($opt->min_artist_interval); + $count = ($sth->fetchrow_array)[0]; + $total = ($sth->fetchrow_array)[0]; + $log->debug( + sprintf( + "Number of artists not queued soon: %d out of total %d (%5.2f%%)", + $count, $total, 100.0 * $count / $total + ) + ); + $sth->finish; + + $sql = <prepare_cached($sql); + $sth->execute($opt->min_album_interval); + $count = ($sth->fetchrow_array)[0]; + $total = ($sth->fetchrow_array)[0]; + $log->debug( + sprintf( + "Number of albums not queued soon: %d out of total %d (%5.2f%%)", + $count, $total, 100.0 * $count / $total + ) + ); + $sth->finish; + + undef $sth; + } + + return @result; + } + + method db_add_unwanted_artist($artist) { + $self->connect_db; + + try { + $db->do( + <<'SQL', +INSERT INTO unwanted_artists(artist, generation) +VALUES($1, $2) +SQL + undef, $artist, $db_generation + ); + return 1; + } + catch { + my $err = $@; + + $log->debug("PostgreSQL error: $err"); + $log->debug( "SQLSTATE = " . $db->state ); + return 0 if $db->state eq '23505'; + + die $err; + } + } + + method db_del_unwanted_artist($artist) { + $self->connect_db; + + return 1 == $db->do( + <<'SQL', +DELETE FROM unwanted_artists +WHERE artist = $1 +SQL + undef, $artist + ); + } + + method queue_songs($num = undef, $callback = undef) { + if (!defined $num) { + $self->connect_mpd; + $mpd->send('playlist')->on_done( + sub { + my $present = scalar @{ $_[0] }; + + $log->notice( "Playlist contains $present songs. Wanted: " + . $opt->target_queue_length ); + if ( $present < $opt->target_queue_length ) { + $self->queue_songs( + $opt->target_queue_length - $present, $callback ); + } + else { + $callback->() if $callback; + } + } + ); + + return; + } + + my @list = $self->db_find_suitable_songs($num); + + die "Found no suitable songs" unless @list; + + if ( @list < $num ) { + $log->warn( + sprintf( + 'Found only %d suitable songs instead of %d', + scalar(@list), $num + ) + ); + } + + $log->info("About to add $num songs to the playlist"); + + my @paths; + for my $song (@list) { + my $path = $song->{song}; + $path =~ s/"/\\"/g; + push @paths, $path; + } + + $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) ); + my @commands; + for (@paths) { + push @commands, [ add => "\"$_\"" ]; + } + $self->connect_mpd; + my $f = $mpd->send( \@commands ); + $f->on_fail( sub { die @_ } ); + $f->on_done( + sub { + $self->db_note_song_qeued($_) for @list; + $callback->(@_) if $callback; + } + ); + } + + method prepare_to_wait_idle { + $log->trace('declaring idle mode'); + $mpd->send('idle database playlist')->on_done( + sub { + my $result = shift; + + if ( $result->{changed} eq 'database' ) { + $db_needs_update = 1; + $self->prepare_to_wait_idle; + } + elsif ( $result->{changed} eq 'playlist' ) { + $self->queue_songs( undef, + sub { $self->prepare_to_wait_idle } ); + } + else { + use JSON; + $log->warn( + "Unknown result from idle: " . to_json($result) ); + $self->prepare_to_wait_idle; + } + } + ); + } + + method run { + $mpd->on( + close => sub { + die "Connection to MPD lost"; + } + ); + + $self->prepare_to_wait_idle; + } + + method stop { + undef $mpd; + + if ($db) { + if ($db->{ActiveKids}) { + $log->warn("$db->{ActiveKids} active DB statements"); + for my $st ( @{ $db->{ChildHandles} } ) { + next unless $st->{Active}; + while(my($k,$v) = each %$st) { + $log->debug("$k = ".($v//'')); + } + } + } + + $db->disconnect; + undef $db; + } + } +} +