File Coverage

lib/Finance/Alpaca/DataStream.pm
Criterion Covered Total %
statement 30 72 41.6
branch 0 18 0.0
condition n/a
subroutine 10 15 66.6
pod 2 3 66.6
total 42 108 38.8


line stmt bran cond sub pod time code
1             package Finance::Alpaca::DataStream 0.9902 {
2 1     1   8 use strictures 2;
  1         24  
  1         52  
3 1     1   229 use Moo;
  1         2  
  1         8  
4 1     1   367 use feature 'signatures';
  1         2  
  1         113  
5 1     1   7 no warnings 'experimental::signatures';
  1         2  
  1         34  
6 1     1   6 use Types::Standard qw[ArrayRef CodeRef Dict Enum InstanceOf Str];
  1         2  
  1         6  
7 1     1   1431 use Mojo::Promise;
  1         2  
  1         19  
8             #
9 1     1   33 use lib './lib/';
  1         2  
  1         5  
10 1     1   509 use Finance::Alpaca::Struct::Bar qw[to_Bar Bar];
  1         8  
  1         9  
11 1     1   949 use Finance::Alpaca::Struct::Trade qw[to_Trade Trade];
  1         2  
  1         13  
12 1     1   949 use Finance::Alpaca::Struct::Quote qw[to_Quote Quote];
  1         3  
  1         7  
13             #
14             has source => ( is => 'ro', isa => Enum [ 'iex', 'sip' ], required => 1, default => 'iex' );
15             has tx => ( is => 'rwp', isa => InstanceOf ['Mojo::Transaction::WebSocket'], predicate => 1 );
16             has cb => ( is => 'ro', isa => CodeRef, required => 1 );
17             has subscriptions => (
18             is => 'rwp',
19             isa => Dict [ bars => ArrayRef [Str], quotes => ArrayRef [Str], trades => ArrayRef [Str] ],
20             default => sub { { bars => [], quotes => [], trades => [] } },
21             lazy => 1
22             );
23              
24 0     0 0   sub authorize ( $s, $ua, $keys ) {
  0            
  0            
  0            
  0            
25 0           $ua->websocket_p( 'wss://stream.data.alpaca.markets/v2/'
26             . $s->source => { 'Sec-WebSocket-Extensions' => 'permessage-deflate' } )->then(
27 0     0     sub ($tx) {
  0            
28 0           my $promise = Mojo::Promise->new;
29 0           $s->_set_tx($tx);
30              
31             #$tx->on( finish => sub { $promise->resolve } );
32             # my $promise = Mojo::Promise->new;
33             #$tx->on( finish => sub { $promise->resolve } );
34             $tx->on(
35 0           json => sub ( $tx, $msgs ) {
36 0           for my $msg (@$msgs) {
37              
38 0 0         if ( $msg->{T} eq 'success' ) {
    0          
    0          
    0          
    0          
    0          
39 0 0         if ( $msg->{msg} eq 'connected' ) { # Send auth
    0          
40 0           $tx->send(
41             {
42             json => {
43             action => 'auth',
44             key => $keys->[0],
45             secret => $keys->[1]
46             }
47             }
48             );
49             }
50             elsif ( $msg->{msg} eq 'authenticated' ) {
51 0           $promise->resolve;
52             }
53             }
54             elsif ( $msg->{T} eq 'error' ) {
55 0           $s->cb->($msg);
56 0 0         if ( $msg->{code} eq 406 ) { # Already connected; ignore
57              
58             # Send auth
59 0           $tx->send(
60             {
61             json => {
62             action => 'auth',
63             key => $s->keys->[0],
64             secret => $s->keys->[1]
65             }
66             }
67             );
68             }
69             }
70             elsif ( $msg->{T} eq 't' ) {
71 0           $s->cb->( to_Trade($msg) );
72             }
73             elsif ( $msg->{T} eq 'q' ) {
74 0           $s->cb->( to_Quote($msg) );
75             }
76             elsif ( $msg->{T} eq 'b' ) {
77 0           $s->cb->( to_Bar($msg) );
78             }
79             elsif ( $msg->{T} eq 'subscription' ) {
80 0           delete $msg->{T};
81 0           $s->_set_subscriptions($msg);
82             }
83             else {
84             #warn 'unknown data';
85             #...;
86 0           $s->cb->($msg);
87             }
88             }
89              
90             #$tx->finish;
91             }
92 0           );
93 0           return $promise;
94             }
95 0           )->catch(
96 0     0     sub ($err) {
  0            
97 0           warn "WebSocket error: $err";
98             }
99 0           );
100             }
101              
102 0     0 1   sub subscribe ( $s, %params ) {
  0            
  0            
  0            
103 0           $s->tx->send( { json => { action => 'subscribe', %params } } );
104             }
105              
106 0     0 1   sub unsubscribe ( $s, %params ) {
  0            
  0            
  0            
107 0           $s->tx->send( { json => { action => 'unsubscribe', %params } } );
108             }
109             }
110             1;
111             __END__