File Coverage

blib/lib/Net/Async/MPD.pm
Criterion Covered Total %
statement 43 169 25.4
branch 1 62 1.6
condition 0 12 0.0
subroutine 14 36 38.8
pod 7 8 87.5
total 65 287 22.6


line stmt bran cond sub pod time code
1             package Net::Async::MPD;
2              
3 2     2   333603 use strict;
  2         17  
  2         44  
4 2     2   8 use warnings;
  2         3  
  2         61  
5              
6             our $VERSION = '0.004';
7              
8 2     2   576 use IO::Async::Loop;
  2         35914  
  2         55  
9 2     2   923 use IO::Async::Stream;
  2         60840  
  2         72  
10 2     2   14 use IO::Socket::IP;
  2         4  
  2         14  
11 2     2   1058 use Future::Utils qw( repeat );
  2         2  
  2         109  
12 2     2   11 use Scalar::Util qw( weaken );
  2         4  
  2         66  
13 2     2   9 use Carp;
  2         3  
  2         85  
14              
15 2     2   831 use namespace::clean;
  2         23319  
  2         10  
16              
17 2     2   4083 use Moo;
  2         5621  
  2         18  
18 2     2   2801 use MooX::HandlesVia;
  2         15606  
  2         10  
19             with 'Role::EventEmitter';
20              
21 2         18 use Types::Standard qw(
22             InstanceOf Int ArrayRef HashRef Str Maybe Bool CodeRef
23 2     2   701 );
  2         59559  
24              
25 2     2   3372 use Log::Any;
  2         13101  
  2         8  
26             my $log = Log::Any->get_logger( category => __PACKAGE__ );
27              
28             has auto_connect => (
29             is => 'ro',
30             isa => Bool,
31             default => 0,
32             );
33              
34             has state => (
35             is => 'rw',
36             isa => Str,
37             init_arg => undef,
38             default => 'created',
39             trigger => sub {
40             $_[0]->emit( state => $_[0]->{state} );
41             },
42             );
43              
44             has loop => (
45             is => 'ro',
46             lazy => 1,
47             default => sub { IO::Async::Loop->new },
48             );
49              
50             has read_queue => (
51             is => 'ro',
52             isa => ArrayRef [CodeRef],
53             lazy => 1,
54             init_arg => undef,
55             default => sub { [] },
56             handles_via => 'Array',
57             handles => {
58             push_read => 'push',
59             pop_read => 'pop',
60             shift_read => 'shift',
61             unshift_read => 'unshift',
62             },
63             );
64              
65             has password => (
66             is => 'ro',
67             isa => Maybe[Str],
68             lazy => 1,
69             );
70              
71             has port => (
72             is => 'ro',
73             isa => Int,
74             lazy => 1,
75             default => sub { $ENV{MPD_PORT} // 6600 },
76             );
77              
78             has host => (
79             is => 'ro',
80             isa => Str,
81             lazy => 1,
82             default => sub { $ENV{MPD_HOST} // 'localhost' },
83             );
84              
85 0     0 1 0 sub version { $_[0]->{version} };
86              
87             sub _parse_block {
88 0     0   0 my $self = shift;
89              
90             return sub {
91 0     0   0 my ( $handle, $buffref, $eof ) = @_;
92              
93 0         0 while ( $$buffref =~ s/^(.*)\n// ) {
94 0         0 my $line = $1;
95              
96 0 0       0 if ($line =~ /\w/) {
97 0         0 $log->tracef('< %s', $line);
98 0 0       0 if ($line =~ /^OK/) {
    0          
    0          
99 0 0       0 if ($line =~ /OK MPD (.*)/) {
100 0         0 $log->trace('Connection established');
101 0         0 $self->{version} = $1;
102              
103 0 0       0 $self->send( password => $self->password )
104             if $self->password;
105              
106 0         0 $self->state( 'ready' );
107             }
108             else {
109 0 0       0 pop @{$self->{mpd_buffer}} unless @{$self->{mpd_buffer}[-1]};
  0         0  
  0         0  
110 0         0 $self->shift_read->( 1, $self->{mpd_buffer} );
111 0         0 $self->{mpd_buffer} = [[]];
112             }
113             }
114             elsif ($line =~ /^list_OK/) {
115 0         0 push @{$self->{mpd_buffer}}, [];
  0         0  
116             }
117             elsif ($line =~ /^ACK/) {
118 0         0 $self->shift_read->( 0, $line );
119 0         0 $self->{mpd_buffer} = [[]];
120 0         0 last;
121             }
122             else {
123 0         0 push @{$self->{mpd_buffer}[-1]}, $line;
  0         0  
124             }
125             }
126             }
127              
128 0         0 return 0;
129 0         0 };
130             }
131              
132             # Set up response parsers for each command
133             my $parsers = { none => sub { @_ } };
134             {
135             my $item = sub {
136             return {
137             map {
138             my ($key, $value) = split /: /, $_, 2;
139             $key => $value;
140             } @{$_[0]}
141             };
142             };
143              
144             my $flat_list = sub { [ map { (split /: /, $_, 2)[1] } @{$_[0]} ] };
145              
146             my $base_list = sub {
147             my @main_keys = @{shift()};
148             my @list_keys = @{shift()};
149             my @lines = @{shift()};
150              
151             my @return;
152             my $item = {};
153              
154             foreach my $line (@lines) {
155             my ($key, $value) = split /: /, $line, 2;
156              
157             if ( grep { /$key/ } @main_keys ) {
158             push @return, $item if defined $item->{$key};
159             $item = { $key => $value };
160             }
161             elsif ( grep { /$key/ } @list_keys ) {
162             unless (defined $item->{$key}) {
163             $item->{$key} = []
164             }
165             push @{$item->{$key}}, $value;
166             }
167             else {
168             $item->{$key} = $value;
169             }
170             }
171             push @return, $item if keys %{$item};
172              
173             return \@return;
174             };
175              
176             my $grouped_list = sub {
177             my @lines = @{shift()};
178              
179             # What we are grouping
180             my ($main) = split /:\s+/, $lines[0], 2;
181              
182             # How we are grouping, from top to bottom
183             my (@categories, %categories);
184             foreach (@lines) {
185             my ($key) = split /:\s+/, $_, 2;
186              
187             if ($key ne $main) {
188             push @categories, $key unless defined $categories{$key};
189             $categories{$key} = 1;
190             }
191             }
192              
193             my $return = {};
194             my $item;
195             foreach my $line (@lines) {
196             my ($key, $value) = split /:\s+/, $line, 2;
197              
198             if (defined $item->{$key}) {
199             # Find the appropriate list of items or create a new one
200             # and populate it
201             my $pointer = $return;
202             foreach my $key (@categories) {
203             my $val = $item->{$key} // q{};
204             $pointer->{$key}{$val} = {} unless defined $pointer->{$key}{$val};
205             $pointer = $pointer->{$key}{$val};
206             }
207             $pointer->{$main} = [] unless defined $pointer->{$main};
208             my $list = $pointer->{$main};
209              
210             push @{$list}, delete $item->{$main};
211              
212             # Start a new item
213             $item = { $key => $value };
214             next;
215             }
216              
217             $item->{$key} = $value;
218             }
219             return $return;
220             };
221              
222             # Untested commands: what do they return?
223             # consume
224             # crossfade
225              
226             my $file_list = sub { $base_list->( [qw( directory file )], [], @_ ) };
227              
228             $parsers->{$_} = $flat_list foreach qw(
229             commands notcommands channels tagtypes urlhandlers listplaylist
230             );
231              
232             $parsers->{$_} = $item foreach qw(
233             currentsong stats idle status addid update
234             readcomments replay_gain_status rescan
235             );
236              
237             $parsers->{$_} = $file_list foreach qw(
238             find playlistinfo listallinfo search find playlistid playlistfind
239             listfiles plchanges listplaylistinfo playlistsearch listfind
240             );
241              
242             $parsers->{list} = $grouped_list;
243              
244             foreach (
245             [ outputs => [qw( outputid )], [] ],
246             [ plchangesposid => [qw( cpos )], [] ],
247             [ listplaylists => [qw( playlist )], [] ],
248             [ listmounts => [qw( mount )], [] ],
249             [ listneighbors => [qw( neighbor )], [] ],
250             [ listall => [qw( directory )], [qw( file )] ],
251             [ readmessages => [qw( channel )], [qw( message )] ],
252             [ lsinfo => [qw( directory file playlist )], [] ],
253             [ decoders => [qw( plugin )], [qw( suffix mime_type )] ],
254             ) {
255              
256             my ($cmd, $header, $list) = @{$_};
257             $parsers->{$cmd} = sub { $base_list->( $header, $list, @_ ) };
258             }
259              
260             $parsers->{playlist} = sub {
261             my $lines = [ map { s/^\w*?://; $_ } @{shift()} ];
262             $flat_list->( $lines, @_ )
263             };
264              
265             $parsers->{count} = sub {
266             my $lines = shift;
267             my ($main) = split /:\s+/, $lines->[0], 2;
268             $base_list->( [ $main ], [qw( )], $lines, @_ )
269             };
270              
271             $parsers->{sticker} = sub {
272             my $lines = shift;
273             return {} unless scalar @{$lines};
274              
275             my $single = ($lines->[0] !~ /^file/);
276              
277             my $base = $base_list->( [qw( file )], [qw( sticker )], $lines, @_ );
278             my $return = [ map {
279             $_->{sticker} = { map { split(/=/, $_, 2) } @{$_->{sticker}} }; $_;
280             } @{$base} ];
281              
282             return $single ? $return->[0] : $return;
283             };
284             }
285              
286             sub idle {
287 0     0 1 0 my ($self, @subsystems) = @_;
288              
289 0         0 $self->{idle_future} = $self->loop->new_future;
290              
291 0         0 weaken $self;
292             repeat {
293             $self->send( idle => @subsystems, sub {
294 0         0 $self->emit( shift->{changed} );
295 0     0   0 });
296 0     0   0 } until => sub { $self->{idle_future}->is_ready };
  0         0  
297              
298 0         0 return $self->{idle_future};
299             }
300              
301             sub noidle {
302 0     0 1 0 my ($self) = @_;
303              
304 0         0 my $idle = $self->{idle_future};
305 0 0       0 return unless defined $idle;
306 0 0       0 return if $idle->is_ready;
307              
308 0         0 $self->send( 'noidle' );
309 0         0 $idle->done;
310              
311 0         0 return;
312             }
313              
314             sub send {
315 0     0 1 0 my $self = shift;
316              
317 0 0       0 my $opt = ( ref $_[0] eq 'HASH' ) ? shift : {};
318 0 0       0 my $cb = pop if ref $_[-1] eq 'CODE';
319 0         0 my (@commands) = @_;
320              
321 0 0       0 croak 'Need commands to send'
322             unless @commands;
323              
324             # Normalise input
325 0 0       0 if (ref $commands[0] eq 'ARRAY') {
326             @commands = map {
327 0 0       0 ( ref $_ eq 'ARRAY' ) ? join( q{ }, @{$_} ) : $_;
  0         0  
328 0         0 } @{$commands[0]};
  0         0  
329             }
330             else {
331 0         0 @commands = join q{ }, @commands;
332             }
333              
334 0         0 my $command = '';
335             # Remove underscores from (most) command names
336             @commands = map {
337 0         0 my $args;
  0         0  
338 0         0 ($command, $args) = split /\s/, $_, 2;
339 0 0       0 $command =~ s/_//g unless $command =~ /^(replay_gain_|command_list)/;
340 0   0     0 $args //= q{};
341 0 0       0 $command . ($args ne q{} ? " $args" : q{});
342             } @commands;
343              
344             # Ensure a command list if sending multiple commands
345 0 0       0 if (scalar @commands > 1) {
346 0   0     0 my $list = $opt->{list} // 1;
347 0 0       0 my $list_start =
348             'command_list' . ( $list ? '_ok' : q{} ) . '_begin';
349              
350 0 0       0 unshift @commands, $list_start
351             unless $commands[0] =~ /^command_list/;
352 0 0       0 push @commands, 'command_list_end'
353             unless $commands[-1] =~ /^command_list/;
354             }
355              
356 0         0 my $parser;
357              
358 0 0       0 if (defined $opt->{parser}) {
359 0         0 my $input = delete $opt->{parser};
360 0 0       0 $parser = (ref $input eq 'CODE') ? $input : $parsers->{$input};
361 0 0       0 croak 'Not a code reference or recognised parser name'
362             unless defined $parser;
363             }
364             else {
365             $parser = sub {
366 0     0   0 my ($input, $commands) = @_;
367              
368             my @result = map {
369 0         0 my $command;
370 0   0     0 do { $command = shift @{$commands} }
  0         0  
  0         0  
371             until !defined $command or $command !~ /^command_list/;
372              
373 0   0     0 my $sub = $parsers->{$command // ''} // $parsers->{none};
      0        
374              
375 0         0 $sub->( $input->[$_] );
376 0         0 } 0 .. $#{$input};
  0         0  
377              
378             return @result
379 0         0 };
  0         0  
380             }
381              
382 0         0 my $future = $self->loop->new_future;
383 0 0       0 $future->on_done( $cb ) if $cb;
384              
385             return $future->fail('No connection to MPD server' )
386 0 0       0 unless $self->{mpd_handle};
387              
388             $self->push_read( sub {
389 0     0   0 my ($success, $result) = @_;
390              
391 0 0       0 if ($success) {
392             $future->done( $parser->(
393 0         0 $result, [ map { my ($name) = split /\s/, $_, 2 } @commands ]
  0         0  
394             ));
395             }
396             else {
397 0         0 $self->emit( error => $result );
398 0         0 $future->fail( $result );
399             }
400 0         0 });
401              
402 0         0 $log->tracef( '> %s', $_ ) foreach @commands;
403 0         0 $self->{mpd_handle}->write( join("\n", @commands) . "\n" );
404              
405 0         0 return $future;
406             }
407              
408 0     0 1 0 sub get { shift->send( @_ )->get }
409              
410             sub until {
411 0     0 1 0 my ($self, $name, $check, $cb) = @_;
412              
413 0         0 weaken $self;
414 0         0 my $wrapper;
415             $wrapper = sub {
416 0 0   0   0 if ($check->(@_)) {
417 0         0 $self->unsubscribe($name => $wrapper);
418 0         0 $cb->(@_);
419             }
420 0         0 };
421 0         0 $self->on($name => $wrapper);
422 0         0 weaken $wrapper;
423              
424 0         0 return $wrapper;
425             }
426              
427             sub BUILD {
428 1     1 0 4433 my ($self, $args) = @_;
429 1 50       6 $self->connect->get if $self->auto_connect;
430 1     0   8 $self->catch( sub {} );
431 1         19 $self->{mpd_buffer} = [[]];
432             }
433              
434             sub connect {
435 0     0 1   my ($self) = @_;
436 0           my $loop = $self->loop;
437 0           my $connected = $loop->new_future;
438              
439 0 0         return $connected->done if $self->state eq 'ready';
440              
441 0 0         my $socket = IO::Socket::IP->new($self->host . ':' . $self->port)
442             or return $connected->fail("MPD connect failed: $!");
443              
444 0           $log->debugf('Connecting to %s:%s', $self->host, $self->port);
445              
446 0     0     my $on_error = sub { $self->emit( error => shift ) };
  0            
447              
448             my $handle = IO::Async::Stream->new(
449             handle => $socket,
450 0     0     on_read_error => sub { $on_error->('Read error: ' . shift) },
451 0     0     on_write_error => sub { $on_error->('Write error: ' . shift) },
452 0     0     on_read_eof => sub { shift->close },
453             on_closed => sub {
454 0     0     $self->{mpd_handle} = undef;
455 0           $self->emit( 'close' );
456             },
457 0           on_read => $self->_parse_block,
458             );
459              
460 0 0         unless ($self->{mpd_handle}) {
461 0           $self->{mpd_handle} = $handle;
462 0           $loop->add( $handle );
463             }
464              
465             $self->until( state =>
466 0     0     sub { $_[1] eq 'ready' },
467 0     0     sub { $connected->done; }
468 0           );
469              
470 0           return $connected;
471             }
472              
473             1;
474              
475             __END__