File Coverage

blib/lib/Mojo/MySQL5/Database.pm
Criterion Covered Total %
statement 21 117 17.9
branch 0 42 0.0
condition 0 23 0.0
subroutine 7 26 26.9
pod 9 9 100.0
total 37 217 17.0


line stmt bran cond sub pod time code
1             package Mojo::MySQL5::Database;
2 6     6   23 use Mojo::Base 'Mojo::EventEmitter';
  6         15  
  6         23  
3              
4 6     6   2604 use Mojo::MySQL5::Connection;
  6         11  
  6         54  
5 6     6   2403 use Mojo::MySQL5::Results;
  6         11  
  6         39  
6 6     6   2090 use Mojo::MySQL5::Transaction;
  6         11  
  6         35  
7 6     6   132 use Encode '_utf8_off';
  6         8  
  6         210  
8 6     6   20 use Scalar::Util 'weaken';
  6         5  
  6         171  
9 6     6   19 use Carp 'croak';
  6         6  
  6         7604  
10              
11             has ['mysql', 'connection'];
12              
13             sub DESTROY {
14 0     0     my $self = shift;
15 0 0         return unless my $c = $self->connection;
16 0 0         return unless my $mysql = $self->mysql;
17 0           $mysql->_enqueue($c);
18             }
19              
20 0 0   0 1   sub backlog { scalar @{shift->{waiting} || []} }
  0            
21              
22             sub begin {
23 0     0 1   my $self = shift;
24 0 0         croak 'Already in a transaction' if ($self->connection->{status_flags} & 0x0001);
25 0           $self->query('START TRANSACTION');
26 0           $self->query('SET autocommit=0');
27 0           my $tx = Mojo::MySQL5::Transaction->new(db => $self);
28 0           weaken $tx->{db};
29 0           return $tx;
30             }
31              
32             sub connect {
33 0     0 1   my $self = shift;
34 0           my $c = Mojo::MySQL5::Connection->new(url => $self->mysql->url);
35             $c->on(error => sub {
36 0     0     my ($c, $err) = @_;
37 0           warn 'Unable to connect to "', $self->mysql->url, '" ', $err, "\n";
38 0           });
39 0           $c->connect();
40 0           $c->unsubscribe('error');
41 0           return $self->connection($c);
42             }
43              
44 0     0 1   sub disconnect { shift->connection->disconnect }
45              
46 0     0 1   sub pid { shift->connection->{connection_id} }
47              
48 0     0 1   sub ping { shift->connection->ping }
49              
50             sub query {
51 0     0 1   my $self = shift;
52 0 0         my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
53 0           my $sql = shift;
54 0           my $expand_sql = '';
55              
56 0           _utf8_off $sql;
57              
58 0           while (length($sql) > 0) {
59 0           my $token;
60 0 0 0       if ($sql =~ /^(\s+)/s # whitespace
    0 0        
      0        
      0        
      0        
      0        
61             or $sql =~ /^(\w+)/) { # general name
62 0           $token = $1;
63             }
64             elsif ($sql =~ /^--.*(?:\n|\z)/p # double-dash comment
65             or $sql =~ /^\#.*(?:\n|\z)/p # hash comment
66             or $sql =~ /^\/\*(?:[^\*]|\*[^\/])*(?:\*\/|\*\z|\z)/p # C-style comment
67             or $sql =~ /^'(?:[^'\\]*|\\(?:.|\n)|'')*(?:'|\z)/p # single-quoted literal text
68             or $sql =~ /^"(?:[^"\\]*|\\(?:.|\n)|"")*(?:"|\z)/p # double-quoted literal text
69             or $sql =~ /^`(?:[^`]*|``)*(?:`|\z)/p) { # schema-quoted literal text
70 0           $token = ${^MATCH};
71             }
72             else {
73 0           $token = substr($sql, 0, 1);
74             }
75 0 0         $expand_sql .= $token eq '?' ? $self->quote(shift) : $token;
76 0           substr($sql, 0, length($token), '');
77             }
78              
79 0 0 0       croak 'async query in flight' if $self->backlog and !$cb;
80 0 0         $self->_subscribe unless $self->backlog;
81              
82 0           push @{$self->{waiting}}, { cb => $cb, sql => $expand_sql, count => 0, started => 0,
  0            
83             results => Mojo::MySQL5::Results->new };
84              
85             # Blocking
86 0 0         unless ($cb) {
87 0           $self->connection->query($expand_sql);
88 0           $self->_unsubscribe;
89 0           my $current = shift @{$self->{waiting}};
  0            
90 0 0         croak $self->connection->{error_message} if $self->connection->{error_code};
91 0           return $current->{results};
92             }
93              
94             # Non-blocking
95 0           $self->_next;
96             }
97              
98             sub quote {
99 0     0 1   my ($self, $string) = @_;
100 0 0         return 'NULL' unless defined $string;
101              
102 0           for ($string) {
103 0           s/\\/\\\\/g;
104 0           s/\0/\\0/g;
105 0           s/\n/\\n/g;
106 0           s/\r/\\r/g;
107 0           s/'/\\'/g;
108             # s/"/\\"/g;
109 0           s/\x1a/\\Z/g;
110             }
111              
112 0           return "'$string'";
113             }
114              
115             sub quote_id {
116 0     0 1   my ($self, $id) = @_;
117 0 0         return 'NULL' unless defined $id;
118 0           $id =~ s/`/``/g;
119 0           return "`$id`";
120             }
121              
122              
123             sub _next {
124 0     0     my $self = shift;
125              
126 0 0         return unless my $next = $self->{waiting}[0];
127 0 0         return if $next->{started}++;
128              
129             $self->connection->query($next->{sql}, sub {
130 0     0     my $c = shift;
131 0           my $current = shift @{$self->{waiting}};
  0            
132 0           my $error = $c->{error_message};
133              
134 0 0         $self->backlog ? $self->_next : $self->_unsubscribe;
135              
136 0           my $cb = $current->{cb};
137 0           $self->$cb($error, $current->{results});
138 0           });
139              
140             }
141              
142             sub _subscribe {
143 0     0     my $self = shift;
144              
145             $self->connection->on(fields => sub {
146 0     0     my ($c, $fields) = @_;
147 0 0         return unless my $res = $self->{waiting}->[0]->{results};
148 0           push @{ $res->{_columns} }, $fields;
  0            
149 0           $self->{waiting}->[0]->{count}++;
150 0           });
151              
152             $self->connection->on(result => sub {
153 0     0     my ($c, $row) = @_;
154 0 0         return unless my $res = $self->{waiting}->[0]->{results};
155 0   0       push @{ $res->{_results}->[$self->{waiting}->[0]->{count} - 1] //= [] }, $row;
  0            
156 0           });
157              
158             $self->connection->on(end => sub {
159 0     0     my $c = shift;
160 0 0         return unless my $res = $self->{waiting}->[0]->{results};
161 0           $res->{$_} = $c->{$_} for qw(affected_rows last_insert_id warnings_count);
162 0           });
163              
164             $self->connection->on(error => sub {
165 0     0     my $c = shift;
166 0 0         return unless my $res = $self->{waiting}->[0]->{results};
167 0           $res->{$_} = $c->{$_} for qw(error_code sql_state error_message);
168 0           });
169             }
170              
171             sub _unsubscribe {
172 0     0     my $self = shift;
173 0           $self->connection->unsubscribe($_) for qw(fields result end error);
174             }
175              
176             1;
177              
178             =encoding utf8
179              
180             =head1 NAME
181              
182             Mojo::MySQL5::Database - Database
183              
184             =head1 SYNOPSIS
185              
186             use Mojo::MySQL5::Database;
187              
188             my $db = Mojo::MySQL5::Database->new(
189             mysql => $mysql,
190             connection => Mojo::MySQL5::Connection->new);
191              
192             =head1 DESCRIPTION
193              
194             L is a container for L handles used by L.
195              
196             =head1 ATTRIBUTES
197              
198             L implements the following attributes.
199              
200             =head2 connection
201              
202             my $c = $db->connection;
203             $db = $db->connection(Mojo::MySQL5::Connection->new);
204              
205             Database connection used for all queries.
206              
207             =head2 mysql
208              
209             L object this database belongs to.
210              
211             =head1 METHODS
212              
213             L inherits all methods from L and
214             implements the following ones.
215              
216             =head2 backlog
217              
218             my $num = $db->backlog;
219              
220             Number of waiting non-blocking queries.
221              
222             =head2 begin
223              
224             my $tx = $db->begin;
225              
226             Begin transaction and return L object, which will
227             automatically roll back the transaction unless
228             L bas been called before it is destroyed.
229              
230             # Insert rows in a transaction
231             eval {
232             my $tx = $db->begin;
233             $db->query('insert into frameworks values (?)', 'Catalyst');
234             $db->query('insert into frameworks values (?)', 'Mojolicious');
235             $tx->commit;
236             };
237             say $@ if $@;
238              
239             =head2 connect
240              
241             $db->connect;
242              
243             Connect to MySQL server.
244              
245             =head2 disconnect
246              
247             $db->disconnect;
248              
249             Disconnect database connection and prevent it from getting cached again.
250              
251             =head2 pid
252              
253             my $pid = $db->pid;
254              
255             Return the connection id of the backend server process.
256              
257             =head2 ping
258              
259             my $bool = $db->ping;
260              
261             Check database connection.
262              
263             =head2 query
264              
265             my $results = $db->query('select * from foo');
266             my $results = $db->query('insert into foo values (?, ?, ?)', @values);
267              
268             Execute a blocking statement and return a L object with the
269             results. You can also append a callback to perform operation non-blocking.
270              
271             $db->query('select * from foo' => sub {
272             my ($db, $err, $results) = @_;
273             ...
274             });
275             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
276              
277             =head2 quote
278            
279             my $escaped = $db->quote($str);
280            
281             Quote string value for passing to SQL query.
282            
283             =head2 quote_id
284            
285             my $escaped = $db->quote_id($id);
286            
287             Quote identifier for passing to SQL query.
288              
289             =head1 SEE ALSO
290              
291             L.
292              
293             =cut