File Coverage

blib/lib/Mojo/MySQL5/Connection.pm
Criterion Covered Total %
statement 172 305 56.3
branch 60 144 41.6
condition 6 39 15.3
subroutine 28 51 54.9
pod 4 4 100.0
total 270 543 49.7


line stmt bran cond sub pod time code
1             package Mojo::MySQL5::Connection;
2 7     7   20201 use Mojo::Base 'Mojo::EventEmitter';
  7         8  
  7         35  
3              
4 7     7   2082 use utf8;
  7         10  
  7         29  
5 7     7   658 use Encode qw(_utf8_off _utf8_on);
  7         7111  
  7         382  
6 7     7   508 use Digest::SHA qw(sha1);
  7         2566  
  7         267  
7 7     7   25 use Scalar::Util 'weaken';
  7         8  
  7         224  
8 7     7   1972 use Mojo::IOLoop;
  7         351298  
  7         30  
9 7     7   436 use Mojo::MySQL5::URL;
  7         9  
  7         35  
10              
11             has state => 'disconnected';
12              
13             has url => sub { Mojo::MySQL5::URL->new('mysql:///') };
14              
15 7   50 7   439 use constant DEBUG => $ENV{MOJO_MYSQL_DEBUG} // 0;
  7         7  
  7         773  
16              
17             use constant {
18 7         1065 CLIENT_CAPABILITY => [ qw(
19             LONG_PASSWORD FOUND_ROWS LONG_FLAG CONNECT_WITH_DB
20             NO_SCHEMA COMPRESS ODBC LOCAL_FILES
21             IGNORE_SPACE PROTOCOL_41 INTERACTIVE SSL
22             IGNORE_SIGPIPE TRANSACTIONS RESERVED SECURE_CONNECTION
23             MULTI_STATEMENTS MULTI_RESULTS PS_MULTI_RESULTS PLUGIN_AUTH
24             CONNECT_ATTRS PLUGIN_AUTH_LENENC_CLIENT_DATA CAN_HANDLE_EXPIRED_PASSWORDS SESSION_TRACK
25             DEPRECATE_EOF) ],
26              
27             SERVER_STATUS => [ qw(
28             STATUS_IN_TRANS STATUS_AUTOCOMMIT RESERVED MORE_RESULTS_EXISTS
29             STATUS_NO_GOOD_INDEX_USED STATUS_NO_INDEX_USED STATUS_CURSOR_EXISTS STATUS_LAST_ROW_SENT
30             STATUS_DB_DROPPED STATUS_NO_BACKSLASH_ESCAPES STATUS_METADATA_CHANGED QUERY_WAS_SLOW
31             PS_OUT_PARAMS STATUS_IN_TRANS_READONLY SESSION_STATE_CHANGED) ],
32              
33             FIELD_FLAG => [ qw(
34             NOT_NULL PRI_KEY UNIQUE_KEY MULTIPLE_KEY
35             BLOB UNSIGNED ZEROFILL BINARY
36             ENUM AUTO_INCREMENT TIMESTAMP SET) ],
37              
38             CHARSET => {
39             UTF8 => 33, BINARY => 63, ASCII => 65 },
40              
41             DATATYPE => {
42             DECIMAL => 0x00, TINY => 0x01, SHORT => 0x02, LONG => 0x03,
43             FLOAT => 0x04, DOUBLE => 0x05,
44             NULL => 0x06, TIMESTAMP => 0x07,
45             LONGLONG => 0x08, INT24 => 0x09,
46             DATE => 0x0a, TIME => 0x0b, DATETIME => 0x0c, YEAR => 0x0d, NEWDATE => 0x0e,
47             VARCHAR => 0x0f, BIT => 0x10,
48             NEWDECIMAL => 0xf6, ENUM => 0xf7, SET => 0xf8,
49             TINY_BLOB => 0xf9, MEDIUM_BLOB => 0xfa, LONG_BLOB => 0xfb, BLOB => 0xfc,
50             VAR_STRING => 0xfd, STRING => 0xfe, GEOMETRY => 0xff },
51 7     7   25 };
  7         8  
52              
53             use constant {
54 7         34 REV_CHARSET => { reverse %{CHARSET()} },
  189         786  
55 7         8 REV_DATATYPE => { map { chr(DATATYPE->{$_}) => $_ } keys %{DATATYPE()} },
  7         25  
56 7     7   28 };
  7         8  
57              
58             # state machine
59             # doing => { state => '_op', state => '_op' }
60 7         24169 use constant SEQ => {
61             connect => {
62             connected => '_recv_handshake',
63             handshake => '_send_auth',
64             auth => '_recv_ok',
65             },
66             query => {
67             idle => '_send_query',
68             query => '_recv_query_responce',
69             field => '_recv_field',
70             result => '_recv_row',
71             },
72             ping => {
73             idle => '_send_ping',
74             ping => '_recv_ok',
75             },
76             disconnect => {
77             idle => '_send_quit',
78             quit => '_recv_ok'
79             }
80 7     7   24 };
  7         7  
81              
82              
83             # encode fixed length integer
84             sub _encode_int($$) {
85 0     0   0 my ($int, $len) = @_;
86 0 0 0     0 return substr pack('V', $int), 0, $len if $len >= 1 and $len <= 4;
87 0 0 0     0 return substr pack('VV', int $int % 2 ** 32, int $int / 2 ** 32), 0, $len if $len == 6 or $len = 8;
88 0         0 return undef;
89             }
90              
91             # encode length coded integer
92             sub _encode_lcint($) {
93 29     29   3466 my $int = shift;
94             return
95 29 100       21204 !defined $int ? pack 'C', 251 :
    100          
    100          
    100          
96             $int <= 250 ? pack 'C', $int :
97             $int <= 0xffff ? pack 'Cv', 252, $int :
98             $int <= 0xffffff ? substr pack('CV', 253, $int), 0, 4 :
99             pack 'CVV', 254, int $int % 2 ** 32, int $int / 2 ** 32;
100             }
101              
102             # encode length coded string
103             sub _encode_lcstr($) {
104 12     12   11990 my $str = shift;
105 12 100       37 return defined $str ? _encode_lcint(length $str) . $str : _encode_lcint($str);
106             }
107              
108             # get fixed length integer
109             sub _get_int {
110 109     109   7883 my ($self, $len, $chew) = @_;
111 109 100       199 my $data = $chew ? substr $self->{incoming}, 0, $len, '' : substr $self->{incoming}, 0, $len;
112 109 100       274 return unpack 'C', $data if $len == 1;
113 31 100 66     165 return unpack 'V', $data . "\0\0" if $len >= 2 and $len <= 4;
114 6 50       14 return undef unless $len == 8;
115 6         12 my $lo = unpack ('V', substr $data, 0, 4);
116 6         7 my $hi = unpack('V', substr $data, 4, 4);
117 6 100       18 return $hi ?
118             int $lo + int $hi * 2 ** 32 : $lo;
119             }
120              
121 81     81   91 sub _chew_int { shift->_get_int(shift, 1) }
122              
123             # get length coded integer
124             sub _chew_lcint {
125 41     41   55 my $self = shift;
126 41         53 my $first = $self->_chew_int(1);
127             return
128 41 50       114 $first < 251 ? $first :
    100          
    100          
    100          
    100          
129             $first == 251 ? undef :
130             $first == 252 ? $self->_chew_int(2) :
131             $first == 253 ? $self->_chew_int(3) :
132             $first == 254 ? $self->_chew_int(8) : undef;
133             }
134              
135             # get length coded string
136             sub _chew_lcstr {
137 14     14   32 my $self = shift;
138 14         17 my $len = $self->_chew_lcint;
139 14 100       9802 return defined $len ? substr $self->{incoming}, 0, $len, '' : undef;
140             }
141              
142             # get zero ending string
143             sub _chew_zstr {
144 3     3   6 my $self = shift;
145 3         9 my $str = unpack 'Z*', $self->{incoming};
146 3 50       7 return undef unless defined $str;
147 3         7 substr $self->{incoming}, 0, length($str) + 1, '';
148 3         7 return $str;
149             }
150              
151             # get fixed length string
152             sub _chew_str {
153 8     8   7 my ($self, $len) = @_;
154 8 50       16 die "_chew_str($len) error" if $len > length $self->{incoming};
155 8         16 return substr $self->{incoming}, 0, $len, '';
156             }
157              
158              
159             sub _send_auth {
160 0     0   0 my $self = shift;
161              
162 0         0 my ($username, $password, $database, $crypt) =
163             ($self->url->username, $self->url->password, $self->url->database, '');
164              
165 0         0 my @flags = qw(LONG_PASSWORD LONG_FLAG PROTOCOL_41 TRANSACTIONS SECURE_CONNECTION MULTI_RESULTS);
166 0 0       0 push @flags, 'CONNECT_WITH_DB' if $database;
167 0 0       0 push @flags, 'MULTI_STATEMENTS' if $self->url->options->{multi_statements};
168 0 0       0 push @flags, 'FOUND_ROWS' if $self->url->options->{found_rows};
169 0         0 my $flags = _flag_set(CLIENT_CAPABILITY, @flags);
170              
171 0         0 warn '>>> AUTH ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
172             ' user:', $username, ' database:', $database,
173             ' flags:', _flag_list(CLIENT_CAPABILITY, $flags),
174             '(', sprintf('%08X', $flags), ')', "\n" if DEBUG > 1;
175              
176 0         0 _utf8_off $username; _utf8_off $password; _utf8_off $database;
  0         0  
  0         0  
177              
178 0 0       0 if ($password) {
179 0         0 my $crypt1 = sha1($password);
180 0         0 my $crypt2 = sha1($self->{auth_plugin_data} . sha1 $crypt1);
181 0         0 $crypt = $crypt1 ^ $crypt2;
182             }
183              
184 0         0 $self->state('auth');
185 0         0 delete $self->{auth_plugin_data};
186 0 0 0     0 return pack 'VVCx23Z*a*Z*',
187             $flags, 131072, ($self->url->options->{utf8} // 1) ? CHARSET->{UTF8} : CHARSET->{BINARY},
188             $username, _encode_lcstr($crypt), $database, 'mysql_native_password';
189             }
190              
191             sub _send_quit {
192 0     0   0 my $self = shift;
193 0         0 warn '>>> QUIT ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n" if DEBUG > 1;
194 0         0 $self->state('quit');
195 0         0 return pack 'C', 1;
196             }
197              
198             sub _send_query {
199 0     0   0 my $self = shift;
200 0         0 my $sql = $self->{sql};
201 0         0 warn '>>> QUERY ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
202             " sql:$sql\n" if DEBUG > 1;
203 0         0 _utf8_off $sql;
204 0         0 $self->state('query');
205 0         0 return pack('C', 3) . $sql;
206             }
207              
208             sub _send_ping {
209 0     0   0 my $self = shift;
210 0         0 warn '>>> PING ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n" if DEBUG > 1;
211 0         0 $self->state('ping');
212 0         0 return pack 'C', 14;
213             }
214              
215             sub _recv_error {
216 1     1   2 my $self = shift;
217 1         2 my $first = $self->_chew_int(1);
218 1 50       3 die "_recv_error() wrong packet $first" unless $first == 255;
219              
220 1         2 $self->{error_code} = $self->_chew_int(2);
221 1         3 $self->_chew_str(1);
222 1         2 $self->{sql_state} = $self->_chew_str(5);
223 1         3 $self->{error_message} = $self->_chew_zstr;
224              
225 1         1 warn '<<< ERROR ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
226             ' error:', $self->{error_code},
227             ' state:', $self->{sql_state},
228             ' message:', $self->{error_message}, "\n" if DEBUG > 1;
229              
230 1         3 $self->state('idle');
231 1         7 $self->emit(error => $self->{error_message});
232             }
233              
234             sub _recv_ok {
235 2     2   3656 my $self = shift;
236 2         4 my $first = $self->_get_int(1);
237 2 100       5 return $self->_recv_error if $first == 255;
238 1 50       3 die "_recv_ok() wrong packet $first" unless $first == 0;
239              
240 1         2 $self->_chew_int(1);
241 1         3 $self->{affected_rows} = $self->_chew_lcint;
242 1         2 $self->{last_insert_id} = $self->_chew_lcint;
243 1         2 $self->{status_flags} = $self->_chew_int(2);
244 1         2 $self->{warnings_count} = $self->_chew_int(2);
245 1         1 $self->{field_count} = 0;
246              
247 1         1 warn '<<< OK ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
248             ' affected:', $self->{affected_rows},
249             ' last_insert_id:', $self->{last_insert_id},
250             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
251             '(', sprintf('%04X', $self->{status_flags}), ')',
252             ' warnings:', $self->{warnings_count}, "\n" if DEBUG > 1;
253              
254 1 50       3 $self->emit('connect') if $self->state eq 'auth';
255 1 50       6 $self->emit('end') if $self->state eq 'query';
256 1         5 $self->state('idle');
257             }
258              
259             sub _recv_query_responce {
260 1     1   4 my $self = shift;
261 1         1 my $first = $self->_get_int(1);
262 1 50       3 return $self->_recv_error if $first == 255;
263 1 50       5 return $self->_recv_ok if $first == 0;
264              
265 1         2 $self->{field_count} = $self->_chew_lcint;
266              
267 1         1 warn '<<< QUERY_RESPONSE ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
268             ' fields:', $self->{field_count}, "\n" if DEBUG > 1;
269              
270 1         3 $self->state('field');
271             }
272              
273             sub _recv_eof {
274 2     2   2 my $self = shift;
275 2         3 my $first = $self->_get_int(1);
276 2 50       24 return $self->_recv_error if $first == 255;
277 2 50       4 die "_recv_eof() wrong packet $first" unless $first == 254;
278              
279 2         4 $self->_chew_int(1);
280 2         3 $self->{warnings_count} = $self->_chew_int(2);
281 2         3 $self->{status_flags} = $self->_chew_int(2);
282              
283 2         1 warn '<<< EOF ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
284             ' warnings:', $self->{warnings_count},
285             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
286             '(', sprintf('%04X', $self->{status_flags}), ')', "\n" if DEBUG > 1;
287              
288 2 100       5 if ($self->state eq 'field') {
    50          
289 1         9 $self->emit(fields => $self->{column_info});
290 1         16 $self->state('result');
291             }
292             elsif ($self->state eq 'result') {
293 1         10 $self->{column_info} = [];
294 1 50       4 if ($self->{status_flags} & 0x0008) {
295             # MORE_RESULTS
296 0         0 $self->state('query');
297             }
298             else {
299 1         2 $self->emit(end => undef);
300 1         6 $self->state('idle');
301             }
302             }
303             }
304              
305             sub _recv_field {
306 2     2   1396 my $self = shift;
307 2         3 my $first = $self->_get_int(1);
308 2 50       5 return $self->_recv_error if $first == 255;
309 2 100       6 return $self->_recv_eof if $first == 254;
310 1 50       2 die "_recv_field() wrong packet $first" if $first > 250;
311              
312 1         1 my $field = {};
313 1         3 $field->{catalog} = $self->_chew_lcstr;
314 1         3 $field->{schema} = $self->_chew_lcstr;
315 1         2 $field->{table} = $self->_chew_lcstr;
316 1         2 $field->{org_table} = $self->_chew_lcstr;
317 1         1 $field->{name} = $self->_chew_lcstr;
318 1         3 $field->{org_name} = $self->_chew_lcstr;
319 1         3 $self->_chew_lcint;
320 1         2 $field->{character_set} = $self->_chew_int(2);
321 1         3 $field->{column_length} = $self->_chew_int(4);
322 1         2 $field->{column_type} = $self->_chew_int(1);
323 1         2 $field->{flags} = $self->_chew_int(2);
324 1         2 $field->{decimals} = $self->_chew_int(1);
325 1         3 $self->_chew_str(2);
326              
327 1 50 50     3 do { _utf8_on $field->{$_} for qw(catalog schema table org_table name org_name) }
  1         8  
328             if ($self->url->options->{utf8} // 1);
329              
330 1         2 push @{$self->{column_info}}, $field;
  1         1  
331              
332 1         2 warn '<<< FIELD ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
333             ' name:', $field->{name},
334             ' type:', REV_DATATYPE->{chr $field->{column_type}}, '(', $field->{column_type}, ')',
335             ' length:', $field->{column_length},
336             ' charset:', REV_CHARSET->{$field->{character_set}} // 'UNKNOWN', '(', $field->{character_set}, ')',
337             ' flags:', _flag_list(FIELD_FLAG, $field->{flags}), '(', $field->{flags}, ')', , "\n" if DEBUG > 1;
338             }
339              
340             sub _recv_row {
341 3     3   2170 my $self = shift;
342 3         6 my $first = $self->_get_int(1);
343 3 50       6 return $self->_recv_error if $first == 255;
344 3 100       6 return $self->_recv_eof if $first == 254;
345              
346 2         2 my @row;
347 2         4 for (0 .. $self->{field_count} - 1) {
348 2         3 $row[$_] = $self->_chew_lcstr;
349 2 50       10 _utf8_on $row[$_]
350             if $self->{column_info}->[$_]->{character_set} == CHARSET->{UTF8};
351             }
352              
353             warn '<<< ROW ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
354 2         1 join(', ', map { defined $_ ? "'" . $_ . "'" : 'null' } @row), "\n" if DEBUG > 1;
355              
356 2         5 $self->emit(result => \@row);
357             }
358              
359             sub _recv_handshake {
360 1     1   26856 my $self = shift;
361 1         5 my $first = $self->_get_int(1);
362 1 50       4 return $self->_recv_error if $first == 255;
363              
364 1         4 $self->{protocol_version} = $self->_chew_int(1);
365 1         5 $self->{server_version} = $self->_chew_zstr;
366 1         4 $self->{connection_id} = $self->_chew_int(4);
367 1         5 $self->{auth_plugin_data} = $self->_chew_str(8);
368 1         3 $self->_chew_str(1);
369 1         8 $self->{capability_flags} = $self->_chew_int(2);
370 1         3 $self->{character_set} = $self->_chew_int(1);
371 1         3 $self->{status_flags} = $self->_chew_int(2);
372 1         4 $self->{capability_flags} |= $self->_chew_int(2) << 16;
373 1         4 my $auth_len = $self->_chew_int(1);
374 1         3 $self->_chew_str(10);
375 1         3 $self->{auth_plugin_data} .= $self->_chew_str(12);
376 1         3 $self->_chew_str(1);
377 1         3 my $auth_plugin_name = $self->_chew_zstr;
378              
379 1         1 warn '<<< HANDSHAKE ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
380             ' protocol:', $self->{protocol_version},
381             ' version:', $self->{server_version},
382             ' connection:', $self->{connection_id},
383             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
384             '(', sprintf('%04X', $self->{status_flags}), ')',
385             ' capabilities:', _flag_list(CLIENT_CAPABILITY, $self->{capability_flags}),
386             '(', sprintf('%08X', $self->{capability_flags}), ')',
387             ' auth:', $auth_plugin_name, "\n" if DEBUG > 1;
388              
389 1 50       5 die '_recv_handshake() invalid protocol version ' . $self->{protocol_version}
390             unless $self->{protocol_version} == 10;
391 1 50       4 die '_recv_handshake() unsupported auth method ' . $auth_plugin_name
392             unless $auth_plugin_name eq 'mysql_native_password';
393 1 50 33     7 die '_recv_handshake() invalid auth data '
394             unless $auth_len == 21 and length($self->{auth_plugin_data}) == 20;
395              
396 1         5 $self->state('handshake');
397             }
398              
399             sub _reset {
400 1     1   1140 my $self = shift;
401              
402 1         7 undef $self->{$_} for qw(error_code sql_state error_message
403             affected_rows last_insert_id status_flags warnings_count field_count);
404              
405 1         2 $self->{column_info} = [];
406 1         2 $self->{seq} = 0;
407 1         3 $self->{incoming} = '';
408             }
409              
410             sub _ioloop {
411 0 0 0 0   0 $_[1] ? Mojo::IOLoop->singleton : ($_[0]->{ioloop} ||= Mojo::IOLoop->new);
412             }
413              
414             sub _seq_next_ready {
415 0     0   0 my $self = shift;
416 0 0       0 return 0 if length $self->{incoming} < 4;
417 0         0 return length($self->{incoming}) - 4 >= $self->_get_int(3);
418             }
419              
420             sub _seq_next {
421 0     0   0 my ($self, $cmd, $writeonly) = @_;
422 0         0 my $next = SEQ->{$cmd}{$self->state};
423 0         0 warn 'stream state:', $self->state, ' doing:', $cmd, ' next:', ($next // ''), "\n" if DEBUG > 2;
424 0 0       0 return unless $next;
425 0 0       0 if (substr($next, 0, 6) eq '_send_') {
    0          
426 0         0 my $packet = $self->$next();
427 0         0 $self->{stream}->write(_encode_int(length $packet, 3) . _encode_int($self->{seq}, 1) . $packet);
428             }
429             elsif (substr($next, 0, 6) eq '_recv_') {
430 0 0       0 return if $writeonly;
431 0         0 my ($len, $seq) = ($self->_chew_int(3), $self->_chew_int(1));
432 0 0 0     0 die "_next_packet() packet out of order $seq " . $self->{seq}
433             if $self->{seq} and $seq != (($self->{seq} + 1) & 0xff);
434 0 0       0 die "_next_packet() not ready" if $len > length($self->{incoming});
435 0         0 $self->{seq}++;
436 0         0 $self->$next();
437             }
438             else {
439 0         0 $self->$next();
440             }
441             }
442              
443             sub _seq {
444 0     0   0 my ($self, $cmd, $cb) = @_;
445              
446 0         0 $self->{stream} = Mojo::IOLoop::Stream->new($self->{socket});
447 0 0       0 $self->{stream}->reactor($self->_ioloop(0)->reactor) unless $cb;
448 0 0       0 $self->{stream}->timeout($self->url->options->{query_timeout})
449             if $self->url->options->{query_timeout};
450 0         0 weaken $self;
451              
452             $self->{stream}->on(read => sub {
453 0     0   0 my ($stream, $bytes) = @_;
454 0         0 $self->{incoming} .= $bytes;
455              
456 0         0 $self->_seq_next($cmd, 0) while $self->_seq_next_ready;
457              
458 0 0       0 if ($self->state eq 'idle') {
459 0         0 $stream->steal_handle;
460 0         0 delete $self->{stream};
461 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
462             }
463             else {
464 0         0 $self->_seq_next($cmd, 1);
465             }
466 0         0 });
467             $self->{stream}->on(error => sub {
468 0     0   0 my ($stream, $err) = @_;
469 0         0 warn "stream error: $err state:", $self->state, "\n" if DEBUG;
470 0   0     0 $self->{error_message} //= $err;
471 0         0 $self->emit(error => $err);
472 0         0 });
473             $self->{stream}->on(timeout => sub {
474 0     0   0 warn "stream timeout state:", $self->state, "\n" if DEBUG;
475 0   0     0 $self->{error_message} //= 'timeout';
476 0         0 });
477             $self->{stream}->on(close => sub {
478 0     0   0 $self->{socket} = undef;
479 0         0 $self->state('disconnected');
480 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
481 0         0 });
482              
483 0         0 $self->{stream}->start;
484 0         0 $self->_seq_next($cmd, 1);
485             }
486              
487             sub _cmd {
488 0     0   0 my ($self, $cmd, $cb) = @_;
489 0 0       0 die 'invalid cmd:' . $cmd unless exists SEQ->{$cmd};
490 0 0       0 die 'invalid state:' . $self->state . ' doing:'. $cmd unless exists SEQ->{$cmd}{$self->state};
491              
492 0         0 $self->_reset;
493 0         0 $self->_seq($cmd, $cb);
494 0 0       0 $self->_ioloop(0)->start unless $cb;
495 0 0       0 return $self->state eq 'idle' ? 1 : 0;
496             }
497              
498             sub connect {
499 0     0 1 0 my ($self, $cb) = @_;
500              
501 0         0 $self->state('connecting');
502 0         0 $self->_reset;
503              
504 0         0 $self->{client} = Mojo::IOLoop::Client->new;
505 0 0       0 $self->{client}->reactor($self->_ioloop(0)->reactor) unless $cb;
506 0         0 weaken $self;
507              
508             $self->{client}->on(connect => sub {
509 0     0   0 my ($client, $handle) = @_;
510 0         0 delete $self->{client};
511 0         0 $self->{socket} = $handle;
512 0         0 $self->state('connected');
513 0         0 $self->_seq('connect', $cb);
514 0         0 });
515             $self->{client}->on(error => sub {
516 0     0   0 my ($client, $err) = @_;
517 0         0 delete $self->{client};
518 0         0 $self->state('disconnected');
519 0         0 $self->emit(error => $err);
520 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
521 0         0 });
522              
523 0   0     0 $self->{client}->connect(
      0        
      0        
524             address => $self->url->host || 'localhost',
525             port => $self->url->port || 3306,
526             timeout => $self->url->options->{connect_timeout} // 10
527             );
528              
529 0 0       0 $self->_ioloop(0)->start unless $cb;
530             }
531              
532 0     0 1 0 sub disconnect { shift->_cmd('disconnect') }
533              
534             sub ping {
535 0     0 1 0 my ($self, $cb) = @_;
536 0 0       0 return $self->state eq 'disconnected' ? 0 : $self->_cmd('ping', $cb);
537             }
538              
539             sub query {
540 0     0 1 0 my ($self, $sql, $cb) = @_;
541 0         0 $self->{sql} = $sql;
542 0         0 $self->_cmd('query', $cb);
543             }
544              
545             sub DESTROY {
546 1     1   996 my $self = shift;
547 1 50 33     3 $self->disconnect if $self->state eq 'idle' and $self->{socket};
548             }
549              
550             # Private util functions
551             sub _flag_list($$;$) {
552 0     0     my ($list, $data, $sep) = @_;
553 0           my $i = 0;
554 0   0       return join $sep || '|', grep { $data & 1 << $i++ } @$list;
  0            
555             }
556              
557             sub _flag_set($;@) {
558 0     0     my ($list, @ops) = @_;
559 0           my ($i, $flags) = (0, 0);
560 0           foreach my $flag (@$list) {
561 0 0         do { $flags |= 1 << $i if $_ eq $flag } for @ops;
  0            
562 0           $i++;
563             }
564 0           return $flags;
565             }
566              
567             sub _flag_is($$$) {
568 0     0     my ($list, $data, $flag) = @_;
569 0           my $i = 0;
570 0           foreach (@$list) {
571 0 0         return $data & 1 << $i if $flag eq $_;
572 0           $i++;
573             }
574 0           return undef;
575             }
576              
577             1;
578              
579             =encoding utf8
580              
581             =head1 NAME
582              
583             Mojo::MySQL5::Connection - TCP connection to MySQL Server
584              
585             =head1 SYNOPSIS
586              
587             use Mojo::MySQL5::Connection;
588              
589             my $c = Mojo::MySQL5::Connection->new(
590             url => Mojo::MySQL5::URL->new(
591             'mysql://test:password@127.0.0.1:3306/test?found_rows=1&connect_timeout=2')
592             );
593              
594             Mojo::IOLoop->delay(
595             sub {
596             my $delay = shift;
597             $c->connect($delay->begin);
598             },
599             sub {
600             my ($delay, $c) = @_;
601             $c->query('select * from test_data', $delay->begin);
602             },
603             sub {
604             my ($delay, $c) = @_;
605             }
606             )->wait;
607              
608              
609             =head1 DESCRIPTION
610              
611             L<Mojo::MySQL5::Connection> is an asynchronous protocol implementation for connections to a MySQL server,
612             managed by L<Mojo::IOLoop>.
613              
614             =head1 EVENTS
615              
616             L<Mojo::MySQL5::Connection> inherits all events from L<Mojo::EventEmitter> and can emit the
617             following new ones.
618              
619             =head2 fields
620              
621             $c->on(fields => sub {
622             my ($c, $fields) = @_;
623             ...
624             });
625              
626             Emitted after posting query and fields definition is received.
627              
628             =head2 result
629              
630             $c->on(result => sub {
631             my ($c, $result) = @_;
632             ...
633             });
634              
635             Emitted when a result row is received.
636              
637             =head2 end
638              
639             $c->on(end => sub {
640             my $c = shift;
641             ...
642             });
643              
644             Emitted when the query has ended successfully.
645              
646             =head2 error
647              
648             $c->on(error => sub {
649             my ($c, $error) = @_;
650             ...
651             });
652              
653             Emitted when an error is received.
654              
655              
656             =head1 ATTRIBUTES
657              
658             L<Mojo::MySQL5::Connection> implements the following attributes.
659              
660             =head2 state
661              
662             my $state = $c->state;
663             $c->state('disconnected');
664              
665             Connection State.
666              
667             Possible States are:
668              
669             =over 2
670              
671             =item disconnected
672              
673             Initial state before connecting to server.
674              
675             Same state after fatal error.
676              
677             =item connected
678              
679             Connection to server is established.
680              
681             Next wait for C<HANDSHAKE> packet.
682              
683             =item handshake
684              
685             Server responded with C<HANDSHAKE>.
686              
687             Next send C<AUTH> (authentication) packet.
688              
689             =item auth
690              
691             C<AUTH> (authentication) packet sent to server.
692              
693             Next wait for C<OK> or C<ERROR> packet.
694              
695             =item idle
696              
697             Connection is idle and ready for sending commands.
698              
699             =item query
700              
701             C<QUERY> packet sent to server.
702              
703             Waiting for C<QUERY_RESPONSE> packet. C<OK> is expected for non-SELECT queries.
704              
705             =item field
706              
707             Waiting for C<FIELD> packets. C<EOF> is expected for end of column definition.
708              
709             =item result
710              
711             Waiting for C<ROW> packets. C<EOF> is expected for end of result rows.
712              
713             =item ping
714              
715             C<PING> packet is sent to server.
716              
717             Waiting for C<OK> packet.
718              
719             =back
720              
721             =head2 url
722              
723             my $url = $c->url;
724             $c->url(Mojo::MySQL5::URL->new('mysql://localhost/test'));
725              
726             MySQL Connection URL.
727              
728             Supported Options are:
729              
730             =over 2
731              
732             =item found_rows
733              
734             Enables or disables the flag C<CLIENT_FOUND_ROWS> while connecting to the MySQL server.
735             Without C<found_rows>, if you perform a query like
736            
737             UPDATE $table SET id = 1 WHERE id = 1;
738            
739             then the MySQL engine will return 0, because no rows have changed.
740             With C<found_rows>, it will return the number of rows that have an id 1.
741              
742             =item multi_statements
743              
744             Enables or disables the flag C<CLIENT_MULTI_STATEMENTS> while connecting to the server.
745             If enabled, multiple statements separated by semicolon (;) can be sent with a single
746             call to L</query>.
747              
748             =item utf8
749              
750             If enabled, the default character set is set to C<utf8> while connecting to the server.
751             If disabled, C<binary> is the default character set.
752              
753             =item connect_timeout
754              
755             The connect request to the server will timeout if it has not been successful
756             after the given number of seconds.
757              
758             =item query_timeout
759              
760             If enabled, the read or write operation to the server will timeout
761             if it has not been successful after the given number of seconds.
762              
763             =back
764              
765             =head1 METHODS
766              
767             L<Mojo::MySQL5::Connection> inherits all methods from L<Mojo::EventEmitter> and
768             implements the following new ones.
769              
770             =head2 connect
771              
772             # Blocking
773             $c->connect;
774             # Non-Blocking
775             $c->connect(sub { ... });
776              
777             Connect and authenticate to MySQL Server.
778              
779             =head2 disconnect
780              
781             $c->disconnect;
782              
783             Disconnect gracefully from server.
784              
785             =head2 ping
786            
787             say "ok" if $c->ping;
788              
789             Check if connection is alive.
790              
791             =head2 query
792              
793             # Blocking
794             $c->query('select 1 as `one`');
795             # Non-Blocking
796             $c->query('select 1 as `one`', sub { ... });
797              
798             Send SQL query to server.
799             Results are handled by events.
800              
801             =head1 DEBUGGING
802              
803             Debugging is enabled if environment variable MOJO_MYSQL_DEBUG is set.
804              
805             Packet tracing is enabled if MOJO_MYSQL_DEBUG is 2 or greater.
806              
807             =head1 AUTHOR
808              
809             Svetoslav Naydenov, C.
810              
811             =head1 COPYRIGHT AND LICENSE
812              
813             Copyright (C) 2015, Svetoslav Naydenov.
814              
815             This program is free software, you can redistribute it and/or modify it under
816             the terms of the Artistic License version 2.0.
817              
818             =head1 SEE ALSO
819              
820             L,
821              
822             L.
823              
824             =cut