File Coverage

blib/lib/Mojo/MySQL5/Connection.pm
Criterion Covered Total %
statement 172 316 54.4
branch 60 152 39.4
condition 6 46 13.0
subroutine 28 51 54.9
pod 4 4 100.0
total 270 569 47.4


line stmt bran cond sub pod time code
1             package Mojo::MySQL5::Connection;
2 8     8   26177 use Mojo::Base 'Mojo::EventEmitter';
  8         10  
  8         32  
3              
4 8     8   2094 use utf8;
  8         12  
  8         21  
5 8     8   2622 use Encode qw(_utf8_off _utf8_on);
  8         39869  
  8         535  
6 8     8   2285 use Digest::SHA qw(sha1);
  8         14169  
  8         532  
7 8     8   54 use Scalar::Util 'weaken';
  8         13  
  8         324  
8 8     8   2704 use Mojo::IOLoop;
  8         520530  
  8         39  
9 8     8   2940 use Mojo::MySQL5::URL;
  8         26  
  8         64  
10              
11             has state => 'disconnected';
12              
13             has url => sub { Mojo::MySQL5::URL->new('mysql:///') };
14              
15 8   50 8   585 use constant DEBUG => $ENV{MOJO_MYSQL_DEBUG} // 0;
  8         10  
  8         991  
16              
17             use constant {
18 8         1220 CLIENT_CAPABILITY => [ qw(
19             LONG_PASSWORD FOUND_ROWS LONG_FLAG CONNECT_WITH_DB
20             NO_SCHEMA COMPRESS ODBC LOCAL_FILES
21             IGNORE_SPACE PROTOCOL_41 INTERACTIVE SSL
22             IGNORE_SIGPIPE TRANSACTIONS RESERVED SECURE_CONNECTION
23             MULTI_STATEMENTS MULTI_RESULTS PS_MULTI_RESULTS PLUGIN_AUTH
24             CONNECT_ATTRS PLUGIN_AUTH_LENENC_CLIENT_DATA CAN_HANDLE_EXPIRED_PASSWORDS SESSION_TRACK
25             DEPRECATE_EOF) ],
26              
27             SERVER_STATUS => [ qw(
28             STATUS_IN_TRANS STATUS_AUTOCOMMIT RESERVED MORE_RESULTS_EXISTS
29             STATUS_NO_GOOD_INDEX_USED STATUS_NO_INDEX_USED STATUS_CURSOR_EXISTS STATUS_LAST_ROW_SENT
30             STATUS_DB_DROPPED STATUS_NO_BACKSLASH_ESCAPES STATUS_METADATA_CHANGED QUERY_WAS_SLOW
31             PS_OUT_PARAMS STATUS_IN_TRANS_READONLY SESSION_STATE_CHANGED) ],
32              
33             FIELD_FLAG => [ qw(
34             NOT_NULL PRI_KEY UNIQUE_KEY MULTIPLE_KEY
35             BLOB UNSIGNED ZEROFILL BINARY
36             ENUM AUTO_INCREMENT TIMESTAMP SET) ],
37              
38             CHARSET => {
39             UTF8 => 33, BINARY => 63, ASCII => 65 },
40              
41             DATATYPE => {
42             DECIMAL => 0x00, TINY => 0x01, SHORT => 0x02, LONG => 0x03,
43             FLOAT => 0x04, DOUBLE => 0x05,
44             NULL => 0x06, TIMESTAMP => 0x07,
45             LONGLONG => 0x08, INT24 => 0x09,
46             DATE => 0x0a, TIME => 0x0b, DATETIME => 0x0c, YEAR => 0x0d, NEWDATE => 0x0e,
47             VARCHAR => 0x0f, BIT => 0x10,
48             NEWDECIMAL => 0xf6, ENUM => 0xf7, SET => 0xf8,
49             TINY_BLOB => 0xf9, MEDIUM_BLOB => 0xfa, LONG_BLOB => 0xfb, BLOB => 0xfc,
50             VAR_STRING => 0xfd, STRING => 0xfe, GEOMETRY => 0xff },
51 8     8   41 };
  8         7  
52              
53             use constant {
54 8         40 REV_CHARSET => { reverse %{CHARSET()} },
  216         977  
55 8         10 REV_DATATYPE => { map { chr(DATATYPE->{$_}) => $_ } keys %{DATATYPE()} },
  8         53  
56 8     8   35 };
  8         9  
57              
58             # state machine
59             # doing => { state => '_op', state => '_op' }
60 8         31240 use constant SEQ => {
61             connect => {
62             connected => '_recv_handshake',
63             handshake => '_send_auth',
64             auth => '_recv_ok',
65             },
66             query => {
67             idle => '_send_query',
68             query => '_recv_query_responce',
69             field => '_recv_field',
70             result => '_recv_row',
71             },
72             ping => {
73             idle => '_send_ping',
74             ping => '_recv_ok',
75             },
76             disconnect => {
77             idle => '_send_quit',
78             quit => '_recv_ok'
79             }
80 8     8   31 };
  8         7  
81              
82              
83             # encode fixed length integer
84             sub _encode_int($$) {
85 0     0   0 my ($int, $len) = @_;
86 0 0 0     0 return substr pack('V', $int), 0, $len if $len >= 1 and $len <= 4;
87 0 0 0     0 return substr pack('VV', int $int % 2 ** 32, int $int / 2 ** 32), 0, $len if $len == 6 or $len = 8;
88 0         0 return undef;
89             }
90              
91             # encode length coded integer
92             sub _encode_lcint($) {
93 29     29   4078 my $int = shift;
94             return
95 29 100       22616 !defined $int ? pack 'C', 251 :
    100          
    100          
    100          
96             $int <= 250 ? pack 'C', $int :
97             $int <= 0xffff ? pack 'Cv', 252, $int :
98             $int <= 0xffffff ? substr pack('CV', 253, $int), 0, 4 :
99             pack 'CVV', 254, int $int % 2 ** 32, int $int / 2 ** 32;
100             }
101              
102             # encode length coded string
103             sub _encode_lcstr($) {
104 12     12   14975 my $str = shift;
105 12 100       44 return defined $str ? _encode_lcint(length $str) . $str : _encode_lcint($str);
106             }
107              
108             # get fixed length integer
109             sub _get_int {
110 109     109   9146 my ($self, $len, $chew) = @_;
111 109 100       234 my $data = $chew ? substr $self->{incoming}, 0, $len, '' : substr $self->{incoming}, 0, $len;
112 109 100       308 return unpack 'C', $data if $len == 1;
113 31 100 66     171 return unpack 'V', $data . "\0\0" if $len >= 2 and $len <= 4;
114 6 50       14 return undef unless $len == 8;
115 6         17 my $lo = unpack ('V', substr $data, 0, 4);
116 6         10 my $hi = unpack('V', substr $data, 4, 4);
117 6 100       27 return $hi ?
118             int $lo + int $hi * 2 ** 32 : $lo;
119             }
120              
121 81     81   105 sub _chew_int { shift->_get_int(shift, 1) }
122              
123             # get length coded integer
124             sub _chew_lcint {
125 41     41   71 my $self = shift;
126 41         61 my $first = $self->_chew_int(1);
127             return
128 41 50       132 $first < 251 ? $first :
    100          
    100          
    100          
    100          
129             $first == 251 ? undef :
130             $first == 252 ? $self->_chew_int(2) :
131             $first == 253 ? $self->_chew_int(3) :
132             $first == 254 ? $self->_chew_int(8) : undef;
133             }
134              
135             # get length coded string
136             sub _chew_lcstr {
137 14     14   38 my $self = shift;
138 14         22 my $len = $self->_chew_lcint;
139 14 100       10335 return defined $len ? substr $self->{incoming}, 0, $len, '' : undef;
140             }
141              
142             # get zero ending string
143             sub _chew_zstr {
144 3     3   4 my $self = shift;
145 3         7 my $str = unpack 'Z*', $self->{incoming};
146 3 50       6 return undef unless defined $str;
147 3         4 substr $self->{incoming}, 0, length($str) + 1, '';
148 3         6 return $str;
149             }
150              
151             # get fixed length string
152             sub _chew_str {
153 8     8   6 my ($self, $len) = @_;
154 8 50       12 die "_chew_str($len) error" if $len > length $self->{incoming};
155 8         10 return substr $self->{incoming}, 0, $len, '';
156             }
157              
158              
159             sub _send_auth {
160 0     0   0 my $self = shift;
161              
162 0         0 my ($username, $password, $database, $crypt) =
163             ($self->url->username, $self->url->password, $self->url->database, '');
164              
165 0         0 my @flags = qw(LONG_PASSWORD LONG_FLAG PROTOCOL_41 TRANSACTIONS SECURE_CONNECTION MULTI_RESULTS);
166 0 0       0 push @flags, 'CONNECT_WITH_DB' if $database;
167 0 0       0 push @flags, 'MULTI_STATEMENTS' if $self->url->options->{multi_statements};
168 0 0       0 push @flags, 'FOUND_ROWS' if $self->url->options->{found_rows};
169 0         0 my $flags = _flag_set(CLIENT_CAPABILITY, @flags);
170              
171 0         0 warn '>>> AUTH ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
172             ' user:', $username, ' database:', $database,
173             ' flags:', _flag_list(CLIENT_CAPABILITY, $flags),
174             '(', sprintf('%08X', $flags), ')', "\n" if DEBUG > 1;
175              
176 0         0 _utf8_off $username; _utf8_off $password; _utf8_off $database;
  0         0  
  0         0  
177              
178 0 0       0 if ($password) {
179 0         0 my $crypt1 = sha1($password);
180 0         0 my $crypt2 = sha1($self->{auth_plugin_data} . sha1 $crypt1);
181 0         0 $crypt = $crypt1 ^ $crypt2;
182             }
183              
184 0         0 $self->state('auth');
185 0         0 delete $self->{auth_plugin_data};
186 0 0 0     0 return pack 'VVCx23Z*a*Z*',
187             $flags, 131072, ($self->url->options->{utf8} // 1) ? CHARSET->{UTF8} : CHARSET->{BINARY},
188             $username, _encode_lcstr($crypt), $database, 'mysql_native_password';
189             }
190              
191             sub _send_quit {
192 0     0   0 my $self = shift;
193 0         0 warn '>>> QUIT ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n" if DEBUG > 1;
194 0         0 $self->state('quit');
195 0         0 return pack 'C', 1;
196             }
197              
198             sub _send_query {
199 0     0   0 my $self = shift;
200 0         0 my $sql = $self->{sql};
201 0         0 warn '>>> QUERY ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
202             " sql:$sql\n" if DEBUG > 1;
203 0         0 _utf8_off $sql;
204 0         0 $self->state('query');
205 0         0 return pack('C', 3) . $sql;
206             }
207              
208             sub _send_ping {
209 0     0   0 my $self = shift;
210 0         0 warn '>>> PING ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n" if DEBUG > 1;
211 0         0 $self->state('ping');
212 0         0 return pack 'C', 14;
213             }
214              
215             sub _recv_error {
216 1     1   1 my $self = shift;
217 1         2 my $first = $self->_chew_int(1);
218 1 50       5 die "_recv_error() wrong packet $first" unless $first == 255;
219              
220 1         2 $self->{error_code} = $self->_chew_int(2);
221 1         2 $self->_chew_str(1);
222 1         2 $self->{sql_state} = $self->_chew_str(5);
223 1         2 $self->{error_message} = $self->_chew_zstr;
224              
225 1         1 warn '<<< ERROR ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
226             ' error:', $self->{error_code},
227             ' state:', $self->{sql_state},
228             ' message:', $self->{error_message}, "\n" if DEBUG > 1;
229              
230 1         3 $self->state('idle');
231 1         9 $self->emit(error => $self->{error_message});
232             }
233              
234             sub _recv_ok {
235 2     2   3606 my $self = shift;
236 2         4 my $first = $self->_get_int(1);
237 2 100       7 return $self->_recv_error if $first == 255;
238 1 50       3 die "_recv_ok() wrong packet $first" unless $first == 0;
239              
240 1         2 $self->_chew_int(1);
241 1         3 $self->{affected_rows} = $self->_chew_lcint;
242 1         2 $self->{last_insert_id} = $self->_chew_lcint;
243 1         2 $self->{status_flags} = $self->_chew_int(2);
244 1         2 $self->{warnings_count} = $self->_chew_int(2);
245 1         2 $self->{field_count} = 0;
246              
247 1         1 warn '<<< OK ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
248             ' affected:', $self->{affected_rows},
249             ' last_insert_id:', $self->{last_insert_id},
250             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
251             '(', sprintf('%04X', $self->{status_flags}), ')',
252             ' warnings:', $self->{warnings_count}, "\n" if DEBUG > 1;
253              
254 1 50       3 $self->emit('connect') if $self->state eq 'auth';
255 1 50       7 $self->emit('end') if $self->state eq 'query';
256 1         5 $self->state('idle');
257             }
258              
259             sub _recv_query_responce {
260 1     1   3 my $self = shift;
261 1         3 my $first = $self->_get_int(1);
262 1 50       3 return $self->_recv_error if $first == 255;
263 1 50       4 return $self->_recv_ok if $first == 0;
264              
265 1         2 $self->{field_count} = $self->_chew_lcint;
266              
267 1         1 warn '<<< QUERY_RESPONSE ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
268             ' fields:', $self->{field_count}, "\n" if DEBUG > 1;
269              
270 1         3 $self->state('field');
271             }
272              
273             sub _recv_eof {
274 2     2   2 my $self = shift;
275 2         3 my $first = $self->_get_int(1);
276 2 50       28 return $self->_recv_error if $first == 255;
277 2 50       5 die "_recv_eof() wrong packet $first" unless $first == 254;
278              
279 2         3 $self->_chew_int(1);
280 2         3 $self->{warnings_count} = $self->_chew_int(2);
281 2         3 $self->{status_flags} = $self->_chew_int(2);
282              
283 2         2 warn '<<< EOF ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
284             ' warnings:', $self->{warnings_count},
285             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
286             '(', sprintf('%04X', $self->{status_flags}), ')', "\n" if DEBUG > 1;
287              
288 2 100       6 if ($self->state eq 'field') {
    50          
289 1         8 $self->emit(fields => $self->{column_info});
290 1         16 $self->state('result');
291             }
292             elsif ($self->state eq 'result') {
293 1         10 $self->{column_info} = [];
294 1 50       3 if ($self->{status_flags} & 0x0008) {
295             # MORE_RESULTS
296 0         0 $self->state('query');
297             }
298             else {
299 1         2 $self->emit(end => undef);
300 1         6 $self->state('idle');
301             }
302             }
303             }
304              
305             sub _recv_field {
306 2     2   1516 my $self = shift;
307 2         5 my $first = $self->_get_int(1);
308 2 50       4 return $self->_recv_error if $first == 255;
309 2 100       6 return $self->_recv_eof if $first == 254;
310 1 50       2 die "_recv_field() wrong packet $first" if $first > 250;
311              
312 1         2 my $field = {};
313 1         3 $field->{catalog} = $self->_chew_lcstr;
314 1         2 $field->{schema} = $self->_chew_lcstr;
315 1         2 $field->{table} = $self->_chew_lcstr;
316 1         2 $field->{org_table} = $self->_chew_lcstr;
317 1         2 $field->{name} = $self->_chew_lcstr;
318 1         2 $field->{org_name} = $self->_chew_lcstr;
319 1         2 $self->_chew_lcint;
320 1         2 $field->{character_set} = $self->_chew_int(2);
321 1         2 $field->{column_length} = $self->_chew_int(4);
322 1         2 $field->{column_type} = $self->_chew_int(1);
323 1         2 $field->{flags} = $self->_chew_int(2);
324 1         2 $field->{decimals} = $self->_chew_int(1);
325 1         2 $self->_chew_str(2);
326              
327 1 50 50     4 do { _utf8_on $field->{$_} for qw(catalog schema table org_table name org_name) }
  1         9  
328             if ($self->url->options->{utf8} // 1);
329              
330 1         2 push @{$self->{column_info}}, $field;
  1         2  
331              
332 1         2 warn '<<< FIELD ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
333             ' name:', $field->{name},
334             ' type:', REV_DATATYPE->{chr $field->{column_type}}, '(', $field->{column_type}, ')',
335             ' length:', $field->{column_length},
336             ' charset:', REV_CHARSET->{$field->{character_set}} // 'UNKNOWN', '(', $field->{character_set}, ')',
337             ' flags:', _flag_list(FIELD_FLAG, $field->{flags}), '(', $field->{flags}, ')', , "\n" if DEBUG > 1;
338             }
339              
340             sub _recv_row {
341 3     3   2257 my $self = shift;
342 3         4 my $first = $self->_get_int(1);
343 3 50       5 return $self->_recv_error if $first == 255;
344 3 100       6 return $self->_recv_eof if $first == 254;
345              
346 2         2 my @row;
347 2         5 for (0 .. $self->{field_count} - 1) {
348 2         2 $row[$_] = $self->_chew_lcstr;
349 2 50       9 _utf8_on $row[$_]
350             if $self->{column_info}->[$_]->{character_set} == CHARSET->{UTF8};
351             }
352              
353             warn '<<< ROW ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
354 2         2 join(', ', map { defined $_ ? "'" . $_ . "'" : 'null' } @row), "\n" if DEBUG > 1;
355              
356 2         6 $self->emit(result => \@row);
357             }
358              
359             sub _recv_handshake {
360 1     1   27518 my $self = shift;
361 1         3 my $first = $self->_get_int(1);
362 1 50       3 return $self->_recv_error if $first == 255;
363              
364 1         4 $self->{protocol_version} = $self->_chew_int(1);
365 1         9 $self->{server_version} = $self->_chew_zstr;
366 1         3 $self->{connection_id} = $self->_chew_int(4);
367 1         4 $self->{auth_plugin_data} = $self->_chew_str(8);
368 1         3 $self->_chew_str(1);
369 1         7 $self->{capability_flags} = $self->_chew_int(2);
370 1         2 $self->{character_set} = $self->_chew_int(1);
371 1         3 $self->{status_flags} = $self->_chew_int(2);
372 1         2 $self->{capability_flags} |= $self->_chew_int(2) << 16;
373 1         2 my $auth_len = $self->_chew_int(1);
374 1         2 $self->_chew_str(10);
375 1         2 $self->{auth_plugin_data} .= $self->_chew_str(12);
376 1         2 $self->_chew_str(1);
377 1         2 my $auth_plugin_name = $self->_chew_zstr;
378              
379 1         1 warn '<<< HANDSHAKE ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
380             ' protocol:', $self->{protocol_version},
381             ' version:', $self->{server_version},
382             ' connection:', $self->{connection_id},
383             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
384             '(', sprintf('%04X', $self->{status_flags}), ')',
385             ' capabilities:', _flag_list(CLIENT_CAPABILITY, $self->{capability_flags}),
386             '(', sprintf('%08X', $self->{capability_flags}), ')',
387             ' auth:', $auth_plugin_name, "\n" if DEBUG > 1;
388              
389 1 50       3 die '_recv_handshake() invalid protocol version ' . $self->{protocol_version}
390             unless $self->{protocol_version} == 10;
391 1 50       3 die '_recv_handshake() unsupported auth method ' . $auth_plugin_name
392             unless $auth_plugin_name eq 'mysql_native_password';
393 1 50 33     5 die '_recv_handshake() invalid auth data '
394             unless $auth_len == 21 and length($self->{auth_plugin_data}) == 20;
395              
396 1         3 $self->state('handshake');
397             }
398              
399             sub _reset {
400 1     1   1166 my $self = shift;
401              
402 1         7 undef $self->{$_} for qw(error_code sql_state error_message
403             affected_rows last_insert_id status_flags warnings_count field_count);
404              
405 1         3 $self->{column_info} = [];
406 1         2 $self->{seq} = 0;
407 1         2 $self->{incoming} = '';
408             }
409              
410             sub _ioloop {
411 0 0 0 0   0 $_[1] ? Mojo::IOLoop->singleton : ($_[0]->{ioloop} ||= Mojo::IOLoop->new);
412             }
413              
414             sub _seq_next_ready {
415 0     0   0 my $self = shift;
416 0 0       0 return 0 if length $self->{incoming} < 4;
417 0         0 return length($self->{incoming}) - 4 >= $self->_get_int(3);
418             }
419              
420             sub _seq_next {
421 0     0   0 my ($self, $cmd, $writeonly) = @_;
422 0         0 my $next = SEQ->{$cmd}{$self->state};
423 0         0 warn 'stream state:', $self->state, ' doing:', $cmd, ' next:', ($next // ''), "\n" if DEBUG > 2;
424 0 0       0 return unless $next;
425 0 0       0 if (substr($next, 0, 6) eq '_send_') {
    0          
426 0         0 my $packet = $self->$next();
427 0         0 $self->{stream}->write(_encode_int(length $packet, 3) . _encode_int($self->{seq}, 1) . $packet);
428             }
429             elsif (substr($next, 0, 6) eq '_recv_') {
430 0 0       0 return if $writeonly;
431 0         0 my ($len, $seq) = ($self->_chew_int(3), $self->_chew_int(1));
432 0 0 0     0 die "_next_packet() packet out of order $seq " . $self->{seq}
433             if $self->{seq} and $seq != (($self->{seq} + 1) & 0xff);
434 0 0       0 die "_next_packet() not ready" if $len > length($self->{incoming});
435 0         0 $self->{seq}++;
436 0         0 $self->$next();
437             }
438             else {
439 0         0 $self->$next();
440             }
441             }
442              
443             sub _seq {
444 0     0   0 my ($self, $cmd, $cb) = @_;
445              
446 0         0 $self->{stream} = Mojo::IOLoop::Stream->new($self->{socket});
447 0 0       0 $self->{stream}->reactor($self->_ioloop(0)->reactor) unless $cb;
448 0 0       0 $self->{stream}->timeout($self->url->options->{query_timeout})
449             if $self->url->options->{query_timeout};
450 0         0 weaken $self;
451              
452             $self->{stream}->on(read => sub {
453 0     0   0 my ($stream, $bytes) = @_;
454 0         0 $self->{incoming} .= $bytes;
455              
456 0         0 $self->_seq_next($cmd, 0) while $self->_seq_next_ready;
457              
458 0 0       0 if ($self->state eq 'idle') {
459 0         0 $stream->steal_handle;
460 0         0 delete $self->{stream};
461 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
462             }
463             else {
464 0         0 $self->_seq_next($cmd, 1);
465             }
466 0         0 });
467             $self->{stream}->on(error => sub {
468 0     0   0 my ($stream, $err) = @_;
469 0         0 warn "stream error: $err state:", $self->state, "\n" if DEBUG;
470 0   0     0 $self->{error_message} //= $err;
471 0         0 $self->emit(error => $err);
472 0         0 });
473             $self->{stream}->on(timeout => sub {
474 0     0   0 warn "stream timeout state:", $self->state, "\n" if DEBUG;
475 0   0     0 $self->{error_message} //= 'timeout';
476 0         0 });
477             $self->{stream}->on(close => sub {
478 0     0   0 warn 'stream closed state:', $self->state, "\n" if DEBUG;
479 0         0 $self->{socket} = undef;
480 0         0 $self->state('disconnected');
481 0 0 0     0 $self->{error_message} //= 'disconnected' unless $cmd eq 'disconnect';
482 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
483 0         0 });
484              
485 0         0 $self->{stream}->start;
486 0         0 $self->_seq_next($cmd, 1);
487             }
488              
489             sub _cmd {
490 0     0   0 my ($self, $cmd, $cb) = @_;
491 0 0       0 die 'invalid cmd:' . $cmd unless exists SEQ->{$cmd};
492 0 0       0 die 'invalid state:' . $self->state . ' doing:'. $cmd unless exists SEQ->{$cmd}{$self->state};
493              
494 0         0 $self->_reset;
495 0         0 $self->_seq($cmd, $cb);
496 0 0       0 $self->_ioloop(0)->start unless $cb;
497 0 0       0 return $self->state eq 'idle' ? 1 : 0;
498             }
499              
500             sub connect {
501 0     0 1 0 my ($self, $cb) = @_;
502              
503 0         0 $self->state('connecting');
504 0         0 $self->_reset;
505              
506 0 0 0     0 if ($self->url->host eq '' or $self->url->host eq 'localhost') {
507 0 0       0 if (!$self->url->options->{socket}) {
508 0         0 $self->url->options->{socket} = `mysql_config --socket`;
509 0         0 chomp $self->url->options->{socket};
510             }
511 0         0 warn "Connecting to UNIX socket '", $self->url->options->{socket}, "'\n" if DEBUG;
512 0   0     0 $self->{socket} = IO::Socket::UNIX->new(
513             Peer => $self->url->options->{socket},
514             Timeout => $self->url->options->{connect_timeout} // 10,
515             Blocking => 0
516             );
517 0 0       0 return unless $self->{socket};
518 0         0 $self->state('connected');
519 0         0 $self->_seq('connect', $cb);
520             }
521             else {
522 0         0 $self->{client} = Mojo::IOLoop::Client->new;
523 0 0       0 $self->{client}->reactor($self->_ioloop(0)->reactor) unless $cb;
524 0         0 weaken $self;
525              
526             $self->{client}->on(connect => sub {
527 0     0   0 my ($client, $handle) = @_;
528 0         0 delete $self->{client};
529 0         0 $self->{socket} = $handle;
530 0         0 $self->state('connected');
531 0         0 $self->_seq('connect', $cb);
532 0         0 });
533             $self->{client}->on(error => sub {
534 0     0   0 my ($client, $err) = @_;
535 0         0 delete $self->{client};
536 0         0 $self->state('disconnected');
537 0         0 $self->emit(error => $err);
538 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
539 0         0 });
540              
541 0   0     0 $self->{client}->connect(
      0        
      0        
542             address => $self->url->host || '127.0.0.1',
543             port => $self->url->port || 3306,
544             timeout => $self->url->options->{connect_timeout} // 10
545             );
546             }
547              
548 0 0       0 $self->_ioloop(0)->start unless $cb;
549             }
550              
551 0     0 1 0 sub disconnect { shift->_cmd('disconnect') }
552              
553             sub ping {
554 0     0 1 0 my ($self, $cb) = @_;
555 0 0       0 return $self->state eq 'disconnected' ? 0 : $self->_cmd('ping', $cb);
556             }
557              
558             sub query {
559 0     0 1 0 my ($self, $sql, $cb) = @_;
560 0         0 $self->{sql} = $sql;
561 0         0 $self->_cmd('query', $cb);
562             }
563              
564             sub DESTROY {
565 1     1   1016 my $self = shift;
566 1 50 33     3 $self->disconnect if $self->state eq 'idle' and $self->{socket};
567             }
568              
569             # Private util functions
570             sub _flag_list($$;$) {
571 0     0     my ($list, $data, $sep) = @_;
572 0           my $i = 0;
573 0   0       return join $sep || '|', grep { $data & 1 << $i++ } @$list;
  0            
574             }
575              
576             sub _flag_set($;@) {
577 0     0     my ($list, @ops) = @_;
578 0           my ($i, $flags) = (0, 0);
579 0           foreach my $flag (@$list) {
580 0 0         do { $flags |= 1 << $i if $_ eq $flag } for @ops;
  0            
581 0           $i++;
582             }
583 0           return $flags;
584             }
585              
586             sub _flag_is($$$) {
587 0     0     my ($list, $data, $flag) = @_;
588 0           my $i = 0;
589 0           foreach (@$list) {
590 0 0         return $data & 1 << $i if $flag eq $_;
591 0           $i++;
592             }
593 0           return undef;
594             }
595              
596             1;
597              
598             =encoding utf8
599              
600             =head1 NAME
601              
602             Mojo::MySQL5::Connection - TCP connection to MySQL Server
603              
604             =head1 SYNOPSIS
605              
606             use Mojo::MySQL5::Connection;
607              
608             my $c = Mojo::MySQL5::Connection->new(
609             url => Mojo::MySQL5::URL->new(
610             'mysql://test:password@127.0.0.1:3306/test?found_rows=1&connect_timeout=2')
611             );
612              
613             Mojo::IOLoop->delay(
614             sub {
615             my $delay = shift;
616             $c->connect($delay->begin);
617             },
618             sub {
619             my ($delay, $c) = @_;
620             $c->query('select * from test_data', $delay->begin);
621             },
622             sub {
623             my ($delay, $c) = @_;
624             }
625             )->wait;
626              
627              
628             =head1 DESCRIPTION
629              
630             L<Mojo::MySQL5::Connection> is an asynchronous protocol implementation for
631             connecting to a MySQL server, managed by L<Mojo::IOLoop>.
632              
633             =head1 EVENTS
634              
635             L<Mojo::MySQL5::Connection> inherits all events from L<Mojo::EventEmitter> and can emit the
636             following new ones.
637              
638             =head2 fields
639              
640             $c->on(fields => sub {
641             my ($c, $fields) = @_;
642             ...
643             });
644              
645             Emitted after a query has been sent and the field definitions have been received.
646              
647             =head2 result
648              
649             $c->on(result => sub {
650             my ($c, $result) = @_;
651             ...
652             });
653              
654             Emitted when a result row is received.
655              
656             =head2 end
657              
658             $c->on(end => sub {
659             my $c = shift;
660             ...
661             });
662              
663             Emitted when the query has ended successfully.
664              
665             =head2 error
666              
667             $c->on(error => sub {
668             my ($c, $error) = @_;
669             ...
670             });
671              
672             Emitted when an error is received.
673              
674              
675             =head1 ATTRIBUTES
676              
677             L<Mojo::MySQL5::Connection> implements the following attributes.
678              
679             =head2 state
680              
681             my $state = $c->state;
682             $c->state('disconnected');
683              
684             Connection State.
685              
686             Possible States are:
687              
688             =over 2
689              
690             =item disconnected
691              
692             Initial state before connecting to server.
693              
694             Same state after a fatal error.
695              
696             =item connected
697              
698             Connection to server is established.
699              
700             Waiting for the C<Initial Handshake> packet.
701              
702             =item handshake
703              
704             Server responded with an C<Initial Handshake> packet.
705              
706             Next, send the C<Handshake Response> (authentication) packet.
707              
708             =item auth
709              
710             C<Handshake Response> (authentication) packet sent to server.
711              
712             Next, wait for an C<OK> or C<Error> packet.
713              
714             =item idle
715              
716             Connection is idle and ready for sending commands.
717              
718             =item query
719              
720             C<COM_QUERY> packet sent to server.
721              
722             Waiting for a C<COM_QUERY Response> packet. An C<OK> packet is expected for non-SELECT queries.
723              
724             =item field
725              
726             Waiting for C<Column Definition> packets. An C<EOF> packet is expected at the end of the column definitions.
727              
728             =item result
729              
730             Waiting for C<Resultset Row> packets. An C<EOF> packet is expected at the end of the result rows.
731              
732             =item ping
733              
734             C<COM_PING> packet is sent to server.
735              
736             Waiting for an C<OK> packet.
737              
738             =back
739              
740             =head2 url
741              
742             my $url = $c->url;
743             $c->url(Mojo::MySQL5::URL->new('mysql://localhost/test'));
744              
745             MySQL Connection URL.
746              
747             Supported Options are:
748              
749             =over 2
750              
751             =item found_rows
752              
753             Enables or disables the flag C<CLIENT_FOUND_ROWS> while connecting to the MySQL server.
754             Without C<found_rows>, if you perform a query like
755            
756             UPDATE $table SET id = 1 WHERE id = 1;
757            
758             then the MySQL engine will return 0, because no rows have changed.
759             With C<found_rows>, it will return the number of rows that have an id 1.
760              
761             =item multi_statements
762              
763             Enables or disables the flag C<CLIENT_MULTI_STATEMENTS> while connecting to the server.
764             If enabled, multiple statements separated by semicolon (;) can be sent with a single
765             call to L</query>.
766              
767             =item utf8
768              
769             If enabled default character set is to C while connecting to the server.
770             If disabled C is the default character set.
771              
772             =item connect_timeout
773              
774             The connect request to the server will timeout if it has not been successful
775             after the given number of seconds.
776              
777             =item query_timeout
778              
779             If enabled, the read or write operation to the server will timeout
780             if it has not been successful after the given number of seconds.
781              
782             =item socket
783              
784             Unix socket that is used for connecting to the server.
785              
786             Determined by calling C unless specified.
787              
788             Unix socket is used if host part of L</url> is C<''> or C<'localhost'>.
789             Use C<'127.0.0.1'> to connect to local machine via TCP.
790              
791             =back
792              
793             =head1 METHODS
794              
795             L<Mojo::MySQL5::Connection> inherits all methods from L<Mojo::EventEmitter> and
796             implements the following new ones.
797              
798             =head2 connect
799              
800             # Blocking
801             $c->connect;
802             # Non-Blocking
803             $c->connect(sub { ... });
804              
805             Connect and authenticate to MySQL Server.
806              
807             =head2 disconnect
808              
809             $c->disconnect;
810              
811             Disconnect gracefully from server.
812              
813             =head2 ping
814            
815             say "ok" if $c->ping;
816              
817             Check if connection is alive.
818              
819             =head2 query
820              
821             # Blocking
822             $c->query('select 1 as `one`');
823             # Non-Blocking
824             $c->query('select 1 as `one`', sub { ... });
825              
826             Send SQL query to server.
827             Results are handled by events.
828              
829             =head1 DEBUGGING
830              
831             Debugging is enabled if environment variable MOJO_MYSQL_DEBUG is set.
832              
833             Packet tracing is enabled if MOJO_MYSQL_DEBUG is 2 or greater.
834              
835             =head1 AUTHOR
836              
837             Svetoslav Naydenov, C.
838              
839             =head1 COPYRIGHT AND LICENSE
840              
841             Copyright (C) 2015, Svetoslav Naydenov.
842              
843             This program is free software, you can redistribute it and/or modify it under
844             the terms of the Artistic License version 2.0.
845              
846             =head1 SEE ALSO
847              
848             L,
849              
850             L.
851              
852             =cut