]> git.ktnx.net Git - mpd-feeder.git/blob - lib/App/MPD/Feeder.pm
exit cleanly on SIGTERM and SIGINT
[mpd-feeder.git] / lib / App / MPD / Feeder.pm
1 package App::MPD::Feeder;
2
3 use strict;
4 use warnings;
5 use utf8;
6
7 use App::MPD::Feeder::Options;
8 use DBD::Pg;
9 use DBI;
10 use Getopt::Long;
11 use IO::Async::Signal;
12 use Log::Any qw($log);
13 use Net::Async::MPD;
14 use Object::Pad;
15 use Syntax::Keyword::Try;
16
17
18 class App::MPD::Feeder {
19     has $cfg_file :reader;
20     has $opt :reader;
21     has $db;
22     has $db_generation;
23     has $db_needs_update :writer = 1;
24     has $mpd :reader;
25
26 use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf';
27
28     ADJUST {
29         Getopt::Long::Configure('pass_through');
30         Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file);
31         Getopt::Long::Configure('no_pass_through');
32
33         $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE;
34
35         $self->configure;
36
37         $db_needs_update = 0 if $opt->skip_db_update;
38     }
39
40     method configure {
41         my $new_opt = App::MPD::Feeder::Options->new;
42
43         $new_opt->parse_config_file($cfg_file) if $cfg_file;
44
45         $new_opt->parse_command_line;
46
47         Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level );
48
49         $opt = $new_opt;
50     }
51
52     method connect_mpd {
53         return if $mpd;
54
55         my %conn = ( auto_connect => 1 );
56         $conn{host} = $opt->mpd_host if $opt->mpd_host;
57         $conn{port} = $opt->mpd_port if $opt->mpd_port;
58
59         $mpd = Net::Async::MPD->new(%conn);
60
61         $mpd->loop->add(
62             IO::Async::Signal->new(
63                 name       => 'TERM',
64                 on_receipt => sub {
65                     $log->debug("SIGTERM received. Stopping loop");
66                     $mpd->loop->stop('quit');
67                 },
68             )
69         );
70
71         $mpd->loop->add(
72             IO::Async::Signal->new(
73                 name       => 'INT',
74                 on_receipt => sub {
75                     $log->debug("SIGINT received. Stopping loop");
76                     $mpd->loop->stop('quit');
77                 },
78             )
79         );
80
81         $mpd->loop->add(
82             IO::Async::Signal->new(
83                 name       => 'HUP',
84                 on_receipt => sub {
85                     $log->debug("SIGHUP received. Stopping loop");
86                     $mpd->loop->stop('reload');
87                 },
88             )
89         );
90
91         $mpd->loop->add(
92             IO::Async::Signal->new(
93                 name       => 'USR1',
94                 on_receipt => sub {
95                     $log->debug("SIGUSR1 received. Dumping configuration to STDERR");
96                     my $old = select \*STDERR;
97                     try {
98                         $opt->dump;
99                     }
100                     finally {
101                         select $old;
102                     }
103                 },
104             )
105         );
106     }
107
108     method connect_db {
109         return if $db;
110
111         $db = DBI->connect( "dbi:Pg:dbname=" . $opt->db_path,
112             $opt->db_user, $opt->db_password,
113             { RaiseError => 1, PrintError => 0, AutoCommit => 1 } );
114
115         $log->info( "Connected to database " . $opt->db_path );
116         $db_generation = $self->db_get_option('generation');
117         $log->debug("DB generation is $db_generation");
118         $self->update_db;
119     }
120
121     method db_get_option($name) {
122         my $sth = $db->prepare_cached("select $name from options");
123         $sth->execute;
124         my @result = $sth->fetchrow_array;
125         $sth->finish;
126         undef $sth;
127
128         return $result[0];
129     }
130
131     method db_set_option( $name, $value ) {
132         my $sth = $db->prepare_cached("update options set $name = ?");
133         $sth->execute($value);
134     }
135
136     method db_store_song($song, $artist, $album) {
137         return unless length($song) and length($artist) and length($album);
138
139         $db->prepare_cached(
140             <<'SQL')->execute( $song, $artist, $album, $db_generation );
141 INSERT INTO songs(path, artist, album, generation)
142 VALUES($1, $2, $3, $4)
143 ON CONFLICT ON CONSTRAINT songs_pkey DO
144 UPDATE SET artist = $2
145          , album = $3
146          , generation = $4
147 SQL
148         $db->prepare_cached(<<'SQL')->execute( $artist, $album, $db_generation );
149 INSERT INTO albums(artist, album, generation)
150 VALUES($1, $2, $3)
151 ON CONFLICT ON CONSTRAINT albums_pkey DO
152 UPDATE SET generation = $3
153 SQL
154         $db->prepare_cached(<<'SQL')->execute( $artist, $db_generation );
155 INSERT INTO artists(artist, generation)
156 VALUES($1, $2)
157 ON CONFLICT ON CONSTRAINT artists_pkey DO
158 UPDATE SET generation = $2
159 SQL
160     }
161
162     method db_remove_stale_entries {
163         my $sth =
164             $db->prepare_cached('DELETE FROM songs WHERE generation <> ?');
165         $sth->execute($db_generation);
166         $log->debug( sprintf( "Deleted %d stale songs", $sth->rows ) );
167
168         $sth = $db->prepare_cached('DELETE FROM albums WHERE generation <> ?');
169         $sth->execute($db_generation);
170         $log->debug( sprintf( "Deleted %d stale albums", $sth->rows ) );
171
172         $sth =
173             $db->prepare_cached('DELETE FROM artists WHERE generation <> ?');
174         $sth->execute($db_generation);
175         $log->debug( sprintf( "Deleted %d stale artists", $sth->rows ) );
176     }
177
178     method db_note_song_qeued($item) {
179         $db->prepare_cached(
180             'UPDATE songs SET last_queued=current_timestamp WHERE path=?')
181             ->execute( $item->{song} );
182         $db->prepare_cached(
183             'UPDATE artists SET last_queued=CURRENT_TIMESTAMP WHERE artist=?')
184             ->execute( $item->{artist} );
185         $db->prepare_cached(
186             'UPDATE albums SET last_queued=CURRENT_TIMESTAMP WHERE artist=? AND album=?'
187         )->execute( $item->{artist}, $item->{album} );
188     }
189
190     method update_db($force = undef) {
191         if (!$db_needs_update and !$force) {
192             $log->debug("Skipping DB update");
193             return;
194         }
195
196         $log->info('Updating song database');
197         $self->connect_mpd;
198         $self->connect_db;
199
200         my $rows = $mpd->send('listallinfo')->get;
201         try {
202             $db->begin_work;
203
204             $db_generation++;
205
206             my $song_count;
207
208             foreach my $entry (@$rows) {
209                 next unless exists $entry->{file};
210                 $self->db_store_song( $entry->{file},
211                     $entry->{AlbumArtist} // $entry->{Artist},
212                     $entry->{Album} );
213                 $song_count++;
214             }
215
216             $log->info("Updated data about $song_count songs");
217
218             $self->db_remove_stale_entries;
219
220             $self->db_set_option( generation => $db_generation );
221
222             $db->commit;
223
224             $db_needs_update = 0;
225         }
226         catch {
227             my $err = $@;
228
229             $db_generation--;
230
231             $db->rollback;
232
233             die $err;
234         }
235     }
236
237     method db_find_suitable_songs($num) {
238         $self->connect_db;
239         $self->update_db;
240
241         my @result;
242         my $sql = <<SQL;
243 SELECT s.path, s.artist, s.album
244 FROM songs s
245 JOIN artists ar ON ar.artist=s.artist
246 JOIN albums al ON al.album=s.album AND al.artist=s.artist
247 WHERE (s.last_queued IS NULL OR s.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
248   AND (ar.last_queued IS NULL OR ar.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
249   AND (al.last_queued IS NULL OR al.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
250   AND NOT EXISTS (SELECT 1 FROM unwanted_artists uar WHERE uar.artist = s.artist)
251   AND NOT EXISTS (SELECT 1 FROM unwanted_albums  ual WHERE ual.album  = s.album)
252 ORDER BY random()
253 LIMIT ?
254 SQL
255         my @params = (
256             $opt->min_song_interval,  $opt->min_artist_interval,
257             $opt->min_album_interval, $num,
258         );
259         my $sth = $db->prepare_cached($sql);
260         $sth->execute(@params);
261         while ( my @row = $sth->fetchrow_array ) {
262             push @result,
263                 { song => $row[0], artist => $row[1], album => $row[2] };
264         }
265         undef $sth;
266
267         if (scalar(@result) == $num and  $log->is_debug) {
268             $sql =~ s/^SELECT .+$/SELECT COUNT(DISTINCT s.path)/m;
269             $sql =~ s/^ORDER BY .+$//m;
270             $sql =~ s/^LIMIT .+$//m;
271             my $sth = $db->prepare_cached($sql);
272             pop @params;
273             $sth->execute(@params);
274             my $count = ($sth->fetchrow_array)[0];
275             $sth->finish;
276
277             $sth = $db->prepare_cached('SELECT COUNT(*) FROM songs');
278             $sth->execute;
279             my $total = ($sth->fetchrow_array)[0];
280             $sth->finish;
281             $log->debug(
282                 sprintf(
283                     "Number of songs meeting the criteria: %d out of total %d (%5.2f%%)",
284                     $count, $total, 100.0 * $count / $total
285                 )
286             );
287
288             $sql = <<SQL;
289 SELECT COUNT(*)
290 FROM songs s
291 WHERE (s.last_queued IS NULL OR s.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
292 SQL
293             $sth = $db->prepare_cached($sql);
294             $sth->execute($opt->min_song_interval);
295             $count = ($sth->fetchrow_array)[0];
296             $sth->finish;
297
298             $log->debug(
299                 sprintf(
300                     "Number of songs not queued soon: %d out of total %d (%5.2f%%)",
301                     $count, $total, 100.0 * $count / $total
302                 )
303             );
304             $sth->finish;
305
306             $sql = <<SQL;
307 SELECT COUNT(*)
308 FROM artists ar
309 WHERE (ar.last_queued IS NULL OR ar.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
310 SQL
311             $sth = $db->prepare_cached($sql);
312             $sth->execute($opt->min_artist_interval);
313             $count = ($sth->fetchrow_array)[0];
314             $sth->finish;
315
316             $sth = $db->prepare_cached('SELECT COUNT(*) FROM artists');
317             $sth->execute;
318             $total = ($sth->fetchrow_array)[0];
319             $sth->finish;
320             $log->debug(
321                 sprintf(
322                     "Number of artists not queued soon: %d out of total %d (%5.2f%%)",
323                     $count, $total, 100.0 * $count / $total
324                 )
325             );
326
327             $sql = <<SQL;
328 SELECT COUNT(*)
329 FROM albums al
330 WHERE (al.last_queued IS NULL OR al.last_queued < CURRENT_TIMESTAMP - (? || ' seconds')::interval)
331 SQL
332             $sth = $db->prepare_cached($sql);
333             $sth->execute($opt->min_album_interval);
334             $count = ($sth->fetchrow_array)[0];
335             $sth->finish;
336
337             $sth = $db->prepare_cached('SELECT COUNT(*) FROM albums');
338             $sth->execute;
339             $total = ($sth->fetchrow_array)[0];
340             $sth->finish;
341             $log->debug(
342                 sprintf(
343                     "Number of albums not queued soon: %d out of total %d (%5.2f%%)",
344                     $count, $total, 100.0 * $count / $total
345                 )
346             );
347
348             undef $sth;
349         }
350
351         return @result;
352     }
353
354     method db_add_unwanted_artist($artist) {
355         $self->connect_db;
356
357         try {
358             $db->do(
359                 <<'SQL',
360 INSERT INTO unwanted_artists(artist, generation)
361 VALUES($1, $2)
362 SQL
363                 undef, $artist, $db_generation
364             );
365             return 1;
366         }
367         catch {
368             my $err = $@;
369
370             $log->debug("PostgreSQL error: $err");
371             $log->debug( "SQLSTATE = " . $db->state );
372             return 0 if $db->state eq '23505';
373
374             die $err;
375         }
376     }
377
378     method db_del_unwanted_artist($artist) {
379         $self->connect_db;
380
381         return 1 == $db->do(
382             <<'SQL',
383 DELETE FROM unwanted_artists
384 WHERE artist = $1
385 SQL
386             undef, $artist
387         );
388     }
389
390     method queue_songs($num = undef, $callback = undef) {
391         if (!defined $num) {
392             $self->connect_mpd;
393             $mpd->send('playlist')->on_done(
394                 sub {
395                     my $present = scalar @{ $_[0] };
396
397                     $log->notice( "Playlist contains $present songs. Wanted: "
398                             . $opt->target_queue_length );
399                     if ( $present < $opt->target_queue_length ) {
400                         $self->queue_songs(
401                             $opt->target_queue_length - $present, $callback );
402                     }
403                     else {
404                         $callback->() if $callback;
405                     }
406                 }
407             );
408
409             return;
410         }
411
412         my @list = $self->db_find_suitable_songs($num);
413
414         die "Found no suitable songs" unless @list;
415
416         if ( @list < $num ) {
417             $log->warn(
418                 sprintf(
419                     'Found only %d suitable songs instead of %d',
420                     scalar(@list), $num
421                 )
422             );
423         }
424
425         $log->info("About to add $num songs to the playlist");
426
427         my @paths;
428         for my $song (@list) {
429             my $path = $song->{song};
430             $path =~ s/"/\\"/g;
431             push @paths, $path;
432         }
433
434         $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) );
435         my @commands;
436         for (@paths) {
437             push @commands, [ add => "\"$_\"" ];
438         }
439         $self->connect_mpd;
440         my $f = $mpd->send( \@commands );
441         $f->on_fail( sub { die @_ } );
442         $f->on_done(
443             sub {
444                 $self->db_note_song_qeued($_) for @list;
445                 $callback->(@_) if $callback;
446             }
447         );
448     }
449
450     method prepare_to_wait_idle {
451         $log->trace('declaring idle mode');
452         $mpd->send('idle database playlist')->on_done(
453             sub {
454                 my $result = shift;
455
456                 if ( $result->{changed} eq 'database' ) {
457                     $db_needs_update = 1;
458                     $self->prepare_to_wait_idle;
459                 }
460                 elsif ( $result->{changed} eq 'playlist' ) {
461                     $self->queue_songs( undef,
462                         sub { $self->prepare_to_wait_idle } );
463                 }
464                 else {
465                     use JSON;
466                     $log->warn(
467                         "Unknown result from idle: " . to_json($result) );
468                     $self->prepare_to_wait_idle;
469                 }
470             }
471         );
472     }
473
474     method run {
475         $mpd->on(
476             close => sub {
477                 die "Connection to MPD lost";
478             }
479         );
480
481         $self->prepare_to_wait_idle;
482     }
483
484     method stop {
485         undef $mpd;
486
487         if ($db) {
488             if ($db->{ActiveKids}) {
489                 $log->warn("$db->{ActiveKids} active DB statements");
490                 for my $st ( @{ $db->{ChildHandles} } ) {
491                     next unless $st->{Active};
492                     while(my($k,$v) = each %$st) {
493                         $log->debug("$k = ".($v//'<NULL>'));
494                     }
495                 }
496             }
497
498             $db->disconnect;
499             undef $db;
500         }
501     }
502
503     method run_loop {
504         $self->connect_db;
505
506         for ( ;; ) {
507             $self->queue_songs( undef, sub { $self->run } );
508
509             $log->debug("Entering event loop. PID=$$");
510
511             my $result = $mpd->loop->run;
512             $log->trace( "Got loop result of " . ( $result // 'undef' ) );
513
514             if ( 'reload' eq $result ) {
515                 $log->notice("disconnecting");
516                 $self->stop;
517
518                 exec( "$0", '--config', $self->cfg_file, '--skip-db-update' );
519             }
520
521             if ( 'quit' eq $result ) {
522                 $log->trace("quitting because of 'quit' loop result");
523                 $self->stop;
524                 exit 0;
525             }
526         }
527     }
528 }
529