]> git.ktnx.net Git - mpd-feeder.git/blob - lib/App/MPD/Feeder.pm
split command execution into App::MPD::Feeder::Command
[mpd-feeder.git] / lib / App / MPD / Feeder.pm
1 package App::MPD::Feeder;
2
3 use strict;
4 use warnings;
5 use utf8;
6
7 use App::MPD::Feeder::Options;
8 use DBD::Pg;
9 use DBI;
10 use Getopt::Long;
11 use IO::Async::Signal;
12 use Log::Any qw($log);
13 use Net::Async::MPD;
14 use Object::Pad;
15 use Syntax::Keyword::Try;
16
17
18 class App::MPD::Feeder {
19     has $cfg_file :reader;
20     has $opt :reader;
21     has $db;
22     has $db_generation;
23     has $db_needs_update :writer = 1;
24     has $mpd :reader;
25
26 use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf';
27
28     ADJUST {
29         Getopt::Long::Configure('pass_through');
30         Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file);
31         Getopt::Long::Configure('no_pass_through');
32
33         $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE;
34
35         $self->configure;
36
37         $db_needs_update = 0 if $opt->skip_db_update;
38     }
39
40     method configure {
41         my $new_opt = App::MPD::Feeder::Options->new;
42
43         $new_opt->parse_config_file($cfg_file) if $cfg_file;
44
45         $new_opt->parse_command_line;
46
47         Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level );
48
49         $opt = $new_opt;
50     }
51
52     method connect_mpd {
53         return if $mpd;
54
55         my %conn = ( auto_connect => 1 );
56         $conn{host} = $opt->mpd_host if $opt->mpd_host;
57         $conn{port} = $opt->mpd_port if $opt->mpd_port;
58
59         $mpd = Net::Async::MPD->new(%conn);
60
61         $mpd->loop->add(
62             IO::Async::Signal->new(
63                 name       => 'HUP',
64                 on_receipt => sub {
65                     $log->debug("SIGHUP received. Stopping loop");
66                     $mpd->loop->stop('reload');
67                 },
68             )
69         );
70
71         $mpd->loop->add(
72             IO::Async::Signal->new(
73                 name       => 'USR1',
74                 on_receipt => sub {
75                     $log->debug("SIGUSR1 received. Dumping configuration to STDERR");
76                     my $old = select \*STDERR;
77                     try {
78                         $opt->dump;
79                     }
80                     finally {
81                         select $old;
82                     }
83                 },
84             )
85         );
86     }
87
88     method connect_db {
89         return if $db;
90
91         $db = DBI->connect( "dbi:Pg:dbname=" . $opt->db_path,
92             $opt->db_user, $opt->db_password,
93             { RaiseError => 1, PrintError => 0, AutoCommit => 1 } );
94
95         $log->info( "Connected to database " . $opt->db_path );
96         $db_generation = $self->db_get_option('generation');
97         $log->debug("DB generation is $db_generation");
98         $self->update_db;
99     }
100
101     method db_get_option($name) {
102         my $sth = $db->prepare_cached("select $name from options");
103         $sth->execute;
104         my @result = $sth->fetchrow_array;
105         $sth->finish;
106         undef $sth;
107
108         return $result[0];
109     }
110
111     method db_set_option( $name, $value ) {
112         my $sth = $db->prepare_cached("update options set $name = ?");
113         $sth->execute($value);
114     }
115
116     method db_store_song($song, $artist, $album) {
117         return unless length($song) and length($artist) and length($album);
118
119         $db->prepare_cached(
120             <<'SQL')->execute( $song, $artist, $album, $db_generation );
121 INSERT INTO songs(path, artist, album, generation)
122 VALUES($1, $2, $3, $4)
123 ON CONFLICT ON CONSTRAINT songs_pkey DO
124 UPDATE SET artist = $2
125          , album = $3
126          , generation = $4
127 SQL
128         $db->prepare_cached(<<'SQL')->execute( $artist, $album, $db_generation );
129 INSERT INTO albums(artist, album, generation)
130 VALUES($1, $2, $3)
131 ON CONFLICT ON CONSTRAINT albums_pkey DO
132 UPDATE SET generation = $3
133 SQL
134         $db->prepare_cached(<<'SQL')->execute( $artist, $db_generation );
135 INSERT INTO artists(artist, generation)
136 VALUES($1, $2)
137 ON CONFLICT ON CONSTRAINT artists_pkey DO
138 UPDATE SET generation = $2
139 SQL
140     }
141
142     method db_remove_stale_entries {
143         my $sth =
144             $db->prepare_cached('DELETE FROM songs WHERE generation <> ?');
145         $sth->execute($db_generation);
146         $log->debug( sprintf( "Deleted %d stale songs", $sth->rows ) );
147
148         $sth = $db->prepare_cached('DELETE FROM albums WHERE generation <> ?');
149         $sth->execute($db_generation);
150         $log->debug( sprintf( "Deleted %d stale albums", $sth->rows ) );
151
152         $sth =
153             $db->prepare_cached('DELETE FROM artists WHERE generation <> ?');
154         $sth->execute($db_generation);
155         $log->debug( sprintf( "Deleted %d stale artists", $sth->rows ) );
156     }
157
158     method db_note_song_qeued($item) {
159         $db->prepare_cached(
160             'UPDATE songs SET last_queued=current_timestamp WHERE path=?')
161             ->execute( $item->{song} );
162         $db->prepare_cached(
163             'UPDATE artists SET last_queued=CURRENT_TIMESTAMP WHERE artist=?')
164             ->execute( $item->{artist} );
165         $db->prepare_cached(
166             'UPDATE albums SET last_queued=CURRENT_TIMESTAMP WHERE artist=? AND album=?'
167         )->execute( $item->{artist}, $item->{album} );
168     }
169
170     method update_db($force = undef) {
171         if (!$db_needs_update and !$force) {
172             $log->debug("Skipping DB update");
173             return;
174         }
175
176         $log->info('Updating song database');
177         $self->connect_mpd;
178         $self->connect_db;
179
180         my $rows = $mpd->send('listallinfo')->get;
181         try {
182             $db->begin_work;
183
184             $db_generation++;
185
186             my $song_count;
187
188             foreach my $entry (@$rows) {
189                 next unless exists $entry->{file};
190                 $self->db_store_song( $entry->{file},
191                     $entry->{AlbumArtist} // $entry->{Artist},
192                     $entry->{Album} );
193                 $song_count++;
194             }
195
196             $log->info("Updated data about $song_count songs");
197
198             $self->db_remove_stale_entries;
199
200             $self->db_set_option( generation => $db_generation );
201
202             $db->commit;
203
204             $db_needs_update = 0;
205         }
206         catch {
207             my $err = $@;
208
209             $db_generation--;
210
211             $db->rollback;
212
213             die $err;
214         }
215     }
216
217     method db_find_suitable_songs($num) {
218         $self->connect_db;
219         $self->update_db;
220
221         my @result;
222         my $sql = <<SQL;
223 SELECT s.path, s.artist, s.album
224 FROM songs s
225 JOIN artists ar ON ar.artist=s.artist
226 JOIN albums al ON al.album=s.album AND al.artist=s.artist
227 WHERE (s.last_queued IS NULL OR s.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
228   AND (ar.last_queued IS NULL OR ar.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
229   AND (al.last_queued IS NULL OR al.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
230   AND NOT EXISTS (SELECT 1 FROM unwanted_artists uar WHERE uar.artist = s.artist)
231   AND NOT EXISTS (SELECT 1 FROM unwanted_albums  ual WHERE ual.album  = s.album)
232 ORDER BY random()
233 LIMIT ?
234 SQL
235         my @params = (
236             $opt->min_song_interval,  $opt->min_artist_interval,
237             $opt->min_album_interval, $num,
238         );
239         my $sth = $db->prepare_cached($sql);
240         $sth->execute(@params);
241         while ( my @row = $sth->fetchrow_array ) {
242             push @result,
243                 { song => $row[0], artist => $row[1], album => $row[2] };
244         }
245         undef $sth;
246
247         if (scalar(@result) == $num and  $log->is_debug) {
248             $sql =~ s/^SELECT .+$/SELECT COUNT(DISTINCT s.path)/m;
249             $sql =~ s/^ORDER BY .+$//m;
250             $sql =~ s/^LIMIT .+$//m;
251             my $sth = $db->prepare_cached($sql);
252             pop @params;
253             $sth->execute(@params);
254             my $count = ($sth->fetchrow_array)[0];
255             $sth->finish;
256
257             $sth = $db->prepare_cached('SELECT COUNT(*) FROM songs');
258             $sth->execute;
259             my $total = ($sth->fetchrow_array)[0];
260             $sth->finish;
261             $log->debug(
262                 sprintf(
263                     "Number of songs meeting the criteria: %d out of total %d (%5.2f%%)",
264                     $count, $total, 100.0 * $count / $total
265                 )
266             );
267
268             $sql = <<SQL;
269 SELECT COUNT(*)
270 FROM songs s
271 WHERE (s.last_queued IS NULL OR s.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
272 SQL
273             $sth = $db->prepare_cached($sql);
274             $sth->execute($opt->min_song_interval);
275             $count = ($sth->fetchrow_array)[0];
276             $sth->finish;
277
278             $log->debug(
279                 sprintf(
280                     "Number of songs not queued soon: %d out of total %d (%5.2f%%)",
281                     $count, $total, 100.0 * $count / $total
282                 )
283             );
284             $sth->finish;
285
286             $sql = <<SQL;
287 SELECT COUNT(*)
288 FROM artists ar
289 WHERE (ar.last_queued IS NULL OR ar.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
290 SQL
291             $sth = $db->prepare_cached($sql);
292             $sth->execute($opt->min_artist_interval);
293             $count = ($sth->fetchrow_array)[0];
294             $sth->finish;
295
296             $sth = $db->prepare_cached('SELECT COUNT(*) FROM artists');
297             $sth->execute;
298             $total = ($sth->fetchrow_array)[0];
299             $sth->finish;
300             $log->debug(
301                 sprintf(
302                     "Number of artists not queued soon: %d out of total %d (%5.2f%%)",
303                     $count, $total, 100.0 * $count / $total
304                 )
305             );
306
307             $sql = <<SQL;
308 SELECT COUNT(*)
309 FROM albums al
310 WHERE (al.last_queued IS NULL OR al.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
311 SQL
312             $sth = $db->prepare_cached($sql);
313             $sth->execute($opt->min_album_interval);
314             $count = ($sth->fetchrow_array)[0];
315             $sth->finish;
316
317             $sth = $db->prepare_cached('SELECT COUNT(*) FROM albums');
318             $sth->execute;
319             $total = ($sth->fetchrow_array)[0];
320             $sth->finish;
321             $log->debug(
322                 sprintf(
323                     "Number of albums not queued soon: %d out of total %d (%5.2f%%)",
324                     $count, $total, 100.0 * $count / $total
325                 )
326             );
327
328             undef $sth;
329         }
330
331         return @result;
332     }
333
334     method db_add_unwanted_artist($artist) {
335         $self->connect_db;
336
337         try {
338             $db->do(
339                 <<'SQL',
340 INSERT INTO unwanted_artists(artist, generation)
341 VALUES($1, $2)
342 SQL
343                 undef, $artist, $db_generation
344             );
345             return 1;
346         }
347         catch {
348             my $err = $@;
349
350             $log->debug("PostgreSQL error: $err");
351             $log->debug( "SQLSTATE = " . $db->state );
352             return 0 if $db->state eq '23505';
353
354             die $err;
355         }
356     }
357
358     method db_del_unwanted_artist($artist) {
359         $self->connect_db;
360
361         return 1 == $db->do(
362             <<'SQL',
363 DELETE FROM unwanted_artists
364 WHERE artist = $1
365 SQL
366             undef, $artist
367         );
368     }
369
370     method queue_songs($num = undef, $callback = undef) {
371         if (!defined $num) {
372             $self->connect_mpd;
373             $mpd->send('playlist')->on_done(
374                 sub {
375                     my $present = scalar @{ $_[0] };
376
377                     $log->notice( "Playlist contains $present songs. Wanted: "
378                             . $opt->target_queue_length );
379                     if ( $present < $opt->target_queue_length ) {
380                         $self->queue_songs(
381                             $opt->target_queue_length - $present, $callback );
382                     }
383                     else {
384                         $callback->() if $callback;
385                     }
386                 }
387             );
388
389             return;
390         }
391
392         my @list = $self->db_find_suitable_songs($num);
393
394         die "Found no suitable songs" unless @list;
395
396         if ( @list < $num ) {
397             $log->warn(
398                 sprintf(
399                     'Found only %d suitable songs instead of %d',
400                     scalar(@list), $num
401                 )
402             );
403         }
404
405         $log->info("About to add $num songs to the playlist");
406
407         my @paths;
408         for my $song (@list) {
409             my $path = $song->{song};
410             $path =~ s/"/\\"/g;
411             push @paths, $path;
412         }
413
414         $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) );
415         my @commands;
416         for (@paths) {
417             push @commands, [ add => "\"$_\"" ];
418         }
419         $self->connect_mpd;
420         my $f = $mpd->send( \@commands );
421         $f->on_fail( sub { die @_ } );
422         $f->on_done(
423             sub {
424                 $self->db_note_song_qeued($_) for @list;
425                 $callback->(@_) if $callback;
426             }
427         );
428     }
429
430     method prepare_to_wait_idle {
431         $log->trace('declaring idle mode');
432         $mpd->send('idle database playlist')->on_done(
433             sub {
434                 my $result = shift;
435
436                 if ( $result->{changed} eq 'database' ) {
437                     $db_needs_update = 1;
438                     $self->prepare_to_wait_idle;
439                 }
440                 elsif ( $result->{changed} eq 'playlist' ) {
441                     $self->queue_songs( undef,
442                         sub { $self->prepare_to_wait_idle } );
443                 }
444                 else {
445                     use JSON;
446                     $log->warn(
447                         "Unknown result from idle: " . to_json($result) );
448                     $self->prepare_to_wait_idle;
449                 }
450             }
451         );
452     }
453
454     method run {
455         $mpd->on(
456             close => sub {
457                 die "Connection to MPD lost";
458             }
459         );
460
461         $self->prepare_to_wait_idle;
462     }
463
464     method stop {
465         undef $mpd;
466
467         if ($db) {
468             if ($db->{ActiveKids}) {
469                 $log->warn("$db->{ActiveKids} active DB statements");
470                 for my $st ( @{ $db->{ChildHandles} } ) {
471                     next unless $st->{Active};
472                     while(my($k,$v) = each %$st) {
473                         $log->debug("$k = ".($v//'<NULL>'));
474                     }
475                 }
476             }
477
478             $db->disconnect;
479             undef $db;
480         }
481     }
482
483     method run_loop {
484         $self->connect_db;
485
486         for ( ;; ) {
487             $self->queue_songs( undef, sub { $self->run } );
488
489             $log->debug("Entering event loop. PID=$$");
490
491             my $result = $mpd->loop->run;
492             $log->trace( "Got loop result of " . ( $result // 'undef' ) );
493
494             if ( 'reload' eq $result ) {
495                 $log->notice("disconnecting");
496                 $self->stop;
497
498                 exec( "$0", '--config', $self->cfg_file, '--skip-db-update' );
499             }
500         }
501     }
502 }
503