File Coverage

lib/Finance/Alpaca/DataStream.pm
Criterion Covered Total %
statement 30 72 41.6
branch 0 18 0.0
condition n/a
subroutine 10 15 66.6
pod 2 3 66.6
total 42 108 38.8


line stmt bran cond sub pod time code
1             package Finance::Alpaca::DataStream 0.9904 {
2 17     17   262 use strictures 2;
  17         161  
  17         789  
3 17     17   4101 use Moo;
  17         34  
  17         150  
4 17     17   6490 use feature 'signatures';
  17         39  
  17         1857  
5 17     17   118 no warnings 'experimental::signatures';
  17         48  
  17         983  
6 17     17   118 use Types::Standard qw[ArrayRef CodeRef Dict Enum InstanceOf Str];
  17         40  
  17         185  
7 17     17   24682 use Mojo::Promise;
  17         34  
  17         347  
8             #
9 17     17   944 use lib './lib/';
  17         33  
  17         95  
10 17     17   9968 use Finance::Alpaca::Struct::Bar qw[to_Bar Bar];
  17         60  
  17         204  
11 17     17   19699 use Finance::Alpaca::Struct::Trade qw[to_Trade Trade];
  17         53  
  17         214  
12 17     17   17584 use Finance::Alpaca::Struct::Quote qw[to_Quote Quote];
  17         60  
  17         172  
13             #
14             has source => ( is => 'ro', isa => Enum [ 'iex', 'sip' ], required => 1, default => 'iex' );
15             has tx => ( is => 'rwp', isa => InstanceOf ['Mojo::Transaction::WebSocket'], predicate => 1 );
16             has cb => ( is => 'ro', isa => CodeRef, required => 1 );
17             has subscriptions => (
18             is => 'rwp',
19             isa => Dict [
20             bars => ArrayRef [Str], dailyBars => ArrayRef [Str], quotes => ArrayRef [Str],
21             trades => ArrayRef [Str]
22             ],
23             default => sub { { bars => [], quotes => [], trades => [] } },
24             lazy => 1
25             );
26              
27 0     0 0   sub authorize ( $s, $ua, $keys ) {
  0            
  0            
  0            
  0            
28 0           $ua->websocket_p( 'wss://stream.data.alpaca.markets/v2/'
29             . $s->source => { 'Sec-WebSocket-Extensions' => 'permessage-deflate' } )->then(
30 0     0     sub ($tx) {
  0            
31 0           my $promise = Mojo::Promise->new;
32 0           $s->_set_tx($tx);
33              
34             #$tx->on( finish => sub { $promise->resolve } );
35             # my $promise = Mojo::Promise->new;
36             #$tx->on( finish => sub { $promise->resolve } );
37             $tx->on(
38 0           json => sub ( $tx, $msgs ) {
39 0           for my $msg (@$msgs) {
40              
41 0 0         if ( $msg->{T} eq 'success' ) {
    0          
    0          
    0          
    0          
    0          
42 0 0         if ( $msg->{msg} eq 'connected' ) { # Send auth
    0          
43 0           $tx->send(
44             {
45             json => {
46             action => 'auth',
47             key => $keys->[0],
48             secret => $keys->[1]
49             }
50             }
51             );
52             }
53             elsif ( $msg->{msg} eq 'authenticated' ) {
54 0           $promise->resolve;
55             }
56             }
57             elsif ( $msg->{T} eq 'error' ) {
58 0           $s->cb->($msg);
59 0 0         if ( $msg->{code} eq 406 ) { # Already connected; ignore
60              
61             # Send auth
62 0           $tx->send(
63             {
64             json => {
65             action => 'auth',
66             key => $s->keys->[0],
67             secret => $s->keys->[1]
68             }
69             }
70             );
71             }
72             }
73             elsif ( $msg->{T} eq 't' ) {
74 0           $s->cb->( to_Trade($msg) );
75             }
76             elsif ( $msg->{T} eq 'q' ) {
77 0           $s->cb->( to_Quote($msg) );
78             }
79             elsif ( $msg->{T} eq 'b' ) {
80 0           $s->cb->( to_Bar($msg) );
81             }
82             elsif ( $msg->{T} eq 'subscription' ) {
83 0           delete $msg->{T};
84 0           $s->_set_subscriptions($msg);
85             }
86             else {
87             #warn 'unknown data';
88             #...;
89 0           $s->cb->($msg);
90             }
91             }
92              
93             #$tx->finish;
94             }
95 0           );
96 0           return $promise;
97             }
98 0           )->catch(
99 0     0     sub ($err) {
  0            
100 0           warn "WebSocket error: $err";
101             }
102 0           );
103             }
104              
105 0     0 1   sub subscribe ( $s, %params ) {
  0            
  0            
  0            
106 0           $s->tx->send( { json => { action => 'subscribe', %params } } );
107             }
108              
109 0     0 1   sub unsubscribe ( $s, %params ) {
  0            
  0            
  0            
110 0           $s->tx->send( { json => { action => 'unsubscribe', %params } } );
111             }
112             }
113             1;
114             __END__