| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
/* |
|
2
|
|
|
|
|
|
|
* ***** BEGIN LICENSE BLOCK ***** |
|
3
|
|
|
|
|
|
|
* Version: MIT |
|
4
|
|
|
|
|
|
|
* |
|
5
|
|
|
|
|
|
|
* Portions created by Alan Antonuk are Copyright (c) 2012-2014 |
|
6
|
|
|
|
|
|
|
* Alan Antonuk. All Rights Reserved. |
|
7
|
|
|
|
|
|
|
* |
|
8
|
|
|
|
|
|
|
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. |
|
9
|
|
|
|
|
|
|
* All Rights Reserved. |
|
10
|
|
|
|
|
|
|
* |
|
11
|
|
|
|
|
|
|
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 |
|
12
|
|
|
|
|
|
|
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. |
|
13
|
|
|
|
|
|
|
* |
|
14
|
|
|
|
|
|
|
* Permission is hereby granted, free of charge, to any person |
|
15
|
|
|
|
|
|
|
* obtaining a copy of this software and associated documentation |
|
16
|
|
|
|
|
|
|
* files (the "Software"), to deal in the Software without |
|
17
|
|
|
|
|
|
|
* restriction, including without limitation the rights to use, copy, |
|
18
|
|
|
|
|
|
|
* modify, merge, publish, distribute, sublicense, and/or sell copies |
|
19
|
|
|
|
|
|
|
* of the Software, and to permit persons to whom the Software is |
|
20
|
|
|
|
|
|
|
* furnished to do so, subject to the following conditions: |
|
21
|
|
|
|
|
|
|
* |
|
22
|
|
|
|
|
|
|
* The above copyright notice and this permission notice shall be |
|
23
|
|
|
|
|
|
|
* included in all copies or substantial portions of the Software. |
|
24
|
|
|
|
|
|
|
* |
|
25
|
|
|
|
|
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
|
26
|
|
|
|
|
|
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
|
27
|
|
|
|
|
|
|
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
|
28
|
|
|
|
|
|
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS |
|
29
|
|
|
|
|
|
|
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN |
|
30
|
|
|
|
|
|
|
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
|
31
|
|
|
|
|
|
|
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
|
32
|
|
|
|
|
|
|
* SOFTWARE. |
|
33
|
|
|
|
|
|
|
* ***** END LICENSE BLOCK ***** |
|
34
|
|
|
|
|
|
|
*/ |
|
35
|
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
#ifdef HAVE_CONFIG_H |
|
37
|
|
|
|
|
|
|
#include "config.h" |
|
38
|
|
|
|
|
|
|
#endif |
|
39
|
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
#ifdef _MSC_VER |
|
41
|
|
|
|
|
|
|
#define _CRT_SECURE_NO_WARNINGS |
|
42
|
|
|
|
|
|
|
#endif |
|
43
|
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
#include "amqp_private.h" |
|
45
|
|
|
|
|
|
|
#include "amqp_socket.h" |
|
46
|
|
|
|
|
|
|
#include "amqp_table.h" |
|
47
|
|
|
|
|
|
|
#include "amqp_time.h" |
|
48
|
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
#include |
|
50
|
|
|
|
|
|
|
#include |
|
51
|
|
|
|
|
|
|
#include |
|
52
|
|
|
|
|
|
|
#include |
|
53
|
|
|
|
|
|
|
#include |
|
54
|
|
|
|
|
|
|
#include |
|
55
|
|
|
|
|
|
|
#include |
|
56
|
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
#include |
|
58
|
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__))) |
|
60
|
|
|
|
|
|
|
#ifndef WIN32_LEAN_AND_MEAN |
|
61
|
|
|
|
|
|
|
#define WIN32_LEAN_AND_MEAN |
|
62
|
|
|
|
|
|
|
#endif |
|
63
|
|
|
|
|
|
|
#include |
|
64
|
|
|
|
|
|
|
#include |
|
65
|
|
|
|
|
|
|
#else |
|
66
|
|
|
|
|
|
|
#include |
|
67
|
|
|
|
|
|
|
/* On older BSD types.h must come before net includes */ |
|
68
|
|
|
|
|
|
|
#include |
|
69
|
|
|
|
|
|
|
#include |
|
70
|
|
|
|
|
|
|
#ifdef HAVE_SELECT |
|
71
|
|
|
|
|
|
|
#include |
|
72
|
|
|
|
|
|
|
#endif |
|
73
|
|
|
|
|
|
|
#include |
|
74
|
|
|
|
|
|
|
#include |
|
75
|
|
|
|
|
|
|
#include |
|
76
|
|
|
|
|
|
|
#include |
|
77
|
|
|
|
|
|
|
#ifdef HAVE_POLL |
|
78
|
|
|
|
|
|
|
#include |
|
79
|
|
|
|
|
|
|
#endif |
|
80
|
|
|
|
|
|
|
#include |
|
81
|
|
|
|
|
|
|
#endif |
|
82
|
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
static int amqp_id_in_reply_list(amqp_method_number_t expected, |
|
84
|
|
|
|
|
|
|
amqp_method_number_t *list); |
|
85
|
|
|
|
|
|
|
|
|
86
|
38
|
|
|
|
|
|
static int amqp_os_socket_init(void) { |
|
87
|
|
|
|
|
|
|
#ifdef _WIN32 |
|
88
|
|
|
|
|
|
|
static int called_wsastartup = 0; |
|
89
|
|
|
|
|
|
|
if (!called_wsastartup) { |
|
90
|
|
|
|
|
|
|
WSADATA data; |
|
91
|
|
|
|
|
|
|
int res = WSAStartup(0x0202, &data); |
|
92
|
|
|
|
|
|
|
if (res) { |
|
93
|
|
|
|
|
|
|
return AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR; |
|
94
|
|
|
|
|
|
|
} |
|
95
|
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
called_wsastartup = 1; |
|
97
|
|
|
|
|
|
|
} |
|
98
|
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
99
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
#else |
|
101
|
38
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
102
|
|
|
|
|
|
|
#endif |
|
103
|
|
|
|
|
|
|
} |
|
104
|
|
|
|
|
|
|
|
|
105
|
401
|
|
|
|
|
|
/*
 * Returns the most recent socket error code: WSAGetLastError() on Windows,
 * the current value of errno everywhere else.
 */
int amqp_os_socket_error(void) {
#ifndef _WIN32
  return errno;
#else
  return WSAGetLastError();
#endif
}
|
112
|
|
|
|
|
|
|
|
|
113
|
37
|
|
|
|
|
|
/*
 * Closes a socket descriptor with the platform's native call
 * (closesocket() on Windows, close() elsewhere) and returns that call's
 * result unchanged.
 */
int amqp_os_socket_close(int sockfd) {
#ifndef _WIN32
  return close(sockfd);
#else
  return closesocket(sockfd);
#endif
}
|
120
|
|
|
|
|
|
|
|
|
121
|
533
|
|
|
|
|
|
/*
 * Forwards a send request to the socket implementation behind `self`.
 *
 * `self` and the implementation's `send` entry must be non-NULL; this is only
 * checked with assert(). The implementation's return value is passed back
 * unchanged.
 */
ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len,
                         int flags) {
  ssize_t sent;

  assert(self);
  assert(self->klass->send);

  sent = self->klass->send(self, buf, len, flags);
  return sent;
}
|
127
|
|
|
|
|
|
|
|
|
128
|
787
|
|
|
|
|
|
/*
 * Forwards a receive request to the socket implementation behind `self`.
 *
 * `self` and the implementation's `recv` entry must be non-NULL; this is only
 * checked with assert(). The implementation's return value is passed back
 * unchanged.
 */
ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len,
                         int flags) {
  ssize_t received;

  assert(self);
  assert(self->klass->recv);

  received = self->klass->recv(self, buf, len, flags);
  return received;
}
|
134
|
|
|
|
|
|
|
|
|
135
|
0
|
|
|
|
|
|
/*
 * Opens `self` towards host:port through the socket implementation's `open`
 * entry, passing NULL as the timeout argument.
 *
 * `self` and the `open` entry must be non-NULL (assert-checked only). The
 * implementation's return value is passed back unchanged.
 */
int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
  int status;

  assert(self);
  assert(self->klass->open);

  status = self->klass->open(self, host, port, NULL);
  return status;
}
|
140
|
|
|
|
|
|
|
|
|
141
|
38
|
|
|
|
|
|
/*
 * Same as amqp_socket_open(), except that the caller-supplied `timeout` is
 * handed to the socket implementation's `open` entry instead of NULL.
 *
 * `self` and the `open` entry must be non-NULL (assert-checked only). The
 * implementation's return value is passed back unchanged.
 */
int amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port,
                             const struct timeval *timeout) {
  int status;

  assert(self);
  assert(self->klass->open);

  status = self->klass->open(self, host, port, timeout);
  return status;
}
|
147
|
|
|
|
|
|
|
|
|
148
|
5
|
|
|
|
|
|
/*
 * Closes `self` through the socket implementation's `close` entry, forwarding
 * the `force` selector untouched.
 *
 * `self` and the `close` entry must be non-NULL (assert-checked only). The
 * implementation's return value is passed back unchanged.
 */
int amqp_socket_close(amqp_socket_t *self, amqp_socket_close_enum force) {
  int status;

  assert(self);
  assert(self->klass->close);

  status = self->klass->close(self, force);
  return status;
}
|
153
|
|
|
|
|
|
|
|
|
154
|
74
|
|
|
|
|
|
/*
 * Destroys `self` through the socket implementation's `delete` entry.
 * A NULL `self` is accepted and ignored; for a non-NULL socket the `delete`
 * entry must be present (assert-checked only).
 */
void amqp_socket_delete(amqp_socket_t *self) {
  if (!self) {
    return;
  }

  assert(self->klass->delete);
  self->klass->delete(self);
}
|
160
|
|
|
|
|
|
|
|
|
161
|
780
|
|
|
|
|
|
/*
 * Asks the socket implementation behind `self` for its descriptor via the
 * `get_sockfd` entry and returns whatever it reports.
 *
 * `self` and the `get_sockfd` entry must be non-NULL (assert-checked only).
 */
int amqp_socket_get_sockfd(amqp_socket_t *self) {
  int sockfd;

  assert(self);
  assert(self->klass->get_sockfd);

  sockfd = self->klass->get_sockfd(self);
  return sockfd;
}
|
166
|
|
|
|
|
|
|
|
|
167
|
438
|
|
|
|
|
|
int amqp_poll(int fd, int event, amqp_time_t deadline) { |
|
168
|
|
|
|
|
|
|
#ifdef HAVE_POLL |
|
169
|
|
|
|
|
|
|
struct pollfd pfd; |
|
170
|
|
|
|
|
|
|
int res; |
|
171
|
|
|
|
|
|
|
int timeout_ms; |
|
172
|
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
/* Function should only ever be called with one of these two */ |
|
174
|
438
|
100
|
|
|
|
|
assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT); |
|
|
|
50
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
start_poll: |
|
177
|
438
|
|
|
|
|
|
pfd.fd = fd; |
|
178
|
438
|
|
|
|
|
|
switch (event) { |
|
179
|
|
|
|
|
|
|
case AMQP_SF_POLLIN: |
|
180
|
395
|
|
|
|
|
|
pfd.events = POLLIN; |
|
181
|
395
|
|
|
|
|
|
break; |
|
182
|
|
|
|
|
|
|
case AMQP_SF_POLLOUT: |
|
183
|
43
|
|
|
|
|
|
pfd.events = POLLOUT; |
|
184
|
43
|
|
|
|
|
|
break; |
|
185
|
|
|
|
|
|
|
} |
|
186
|
|
|
|
|
|
|
|
|
187
|
438
|
|
|
|
|
|
timeout_ms = amqp_time_ms_until(deadline); |
|
188
|
438
|
50
|
|
|
|
|
if (-1 > timeout_ms) { |
|
189
|
0
|
|
|
|
|
|
return timeout_ms; |
|
190
|
|
|
|
|
|
|
} |
|
191
|
|
|
|
|
|
|
|
|
192
|
438
|
|
|
|
|
|
res = poll(&pfd, 1, timeout_ms); |
|
193
|
|
|
|
|
|
|
|
|
194
|
438
|
100
|
|
|
|
|
if (0 < res) { |
|
195
|
|
|
|
|
|
|
/* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or |
|
196
|
|
|
|
|
|
|
* equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall |
|
197
|
|
|
|
|
|
|
* doesn't need to be made. */ |
|
198
|
433
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
199
|
5
|
50
|
|
|
|
|
} else if (0 == res) { |
|
200
|
5
|
|
|
|
|
|
return AMQP_STATUS_TIMEOUT; |
|
201
|
|
|
|
|
|
|
} else { |
|
202
|
0
|
0
|
|
|
|
|
switch (amqp_os_socket_error()) { |
|
203
|
|
|
|
|
|
|
case EINTR: |
|
204
|
0
|
|
|
|
|
|
goto start_poll; |
|
205
|
|
|
|
|
|
|
default: |
|
206
|
438
|
|
|
|
|
|
return AMQP_STATUS_SOCKET_ERROR; |
|
207
|
|
|
|
|
|
|
} |
|
208
|
|
|
|
|
|
|
} |
|
209
|
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
210
|
|
|
|
|
|
|
#elif defined(HAVE_SELECT) |
|
211
|
|
|
|
|
|
|
fd_set fds; |
|
212
|
|
|
|
|
|
|
fd_set exceptfds; |
|
213
|
|
|
|
|
|
|
fd_set *exceptfdsp; |
|
214
|
|
|
|
|
|
|
int res; |
|
215
|
|
|
|
|
|
|
struct timeval tv; |
|
216
|
|
|
|
|
|
|
struct timeval *tvp; |
|
217
|
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
assert((0 != (event & AMQP_SF_POLLIN)) || (0 != (event & AMQP_SF_POLLOUT))); |
|
219
|
|
|
|
|
|
|
#ifndef _WIN32 |
|
220
|
|
|
|
|
|
|
/* On Win32 connect() failure is indicated through the exceptfds, it does not |
|
221
|
|
|
|
|
|
|
* make any sense to allow POLLERR on any other platform or condition */ |
|
222
|
|
|
|
|
|
|
assert(0 == (event & AMQP_SF_POLLERR)); |
|
223
|
|
|
|
|
|
|
#endif |
|
224
|
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
start_select: |
|
226
|
|
|
|
|
|
|
FD_ZERO(&fds); |
|
227
|
|
|
|
|
|
|
FD_SET(fd, &fds); |
|
228
|
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
if (event & AMQP_SF_POLLERR) { |
|
230
|
|
|
|
|
|
|
FD_ZERO(&exceptfds); |
|
231
|
|
|
|
|
|
|
FD_SET(fd, &exceptfds); |
|
232
|
|
|
|
|
|
|
exceptfdsp = &exceptfds; |
|
233
|
|
|
|
|
|
|
} else { |
|
234
|
|
|
|
|
|
|
exceptfdsp = NULL; |
|
235
|
|
|
|
|
|
|
} |
|
236
|
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
res = amqp_time_tv_until(deadline, &tv, &tvp); |
|
238
|
|
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
|
239
|
|
|
|
|
|
|
return res; |
|
240
|
|
|
|
|
|
|
} |
|
241
|
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
if (event & AMQP_SF_POLLIN) { |
|
243
|
|
|
|
|
|
|
res = select(fd + 1, &fds, NULL, exceptfdsp, tvp); |
|
244
|
|
|
|
|
|
|
} else if (event & AMQP_SF_POLLOUT) { |
|
245
|
|
|
|
|
|
|
res = select(fd + 1, NULL, &fds, exceptfdsp, tvp); |
|
246
|
|
|
|
|
|
|
} |
|
247
|
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
if (0 < res) { |
|
249
|
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
250
|
|
|
|
|
|
|
} else if (0 == res) { |
|
251
|
|
|
|
|
|
|
return AMQP_STATUS_TIMEOUT; |
|
252
|
|
|
|
|
|
|
} else { |
|
253
|
|
|
|
|
|
|
switch (amqp_os_socket_error()) { |
|
254
|
|
|
|
|
|
|
case EINTR: |
|
255
|
|
|
|
|
|
|
goto start_select; |
|
256
|
|
|
|
|
|
|
default: |
|
257
|
|
|
|
|
|
|
return AMQP_STATUS_SOCKET_ERROR; |
|
258
|
|
|
|
|
|
|
} |
|
259
|
|
|
|
|
|
|
} |
|
260
|
|
|
|
|
|
|
#else |
|
261
|
|
|
|
|
|
|
#error "poll() or select() is needed to compile rabbitmq-c" |
|
262
|
|
|
|
|
|
|
#endif |
|
263
|
|
|
|
|
|
|
} |
|
264
|
|
|
|
|
|
|
|
|
265
|
9
|
|
|
|
|
|
static ssize_t do_poll(amqp_connection_state_t state, ssize_t res, |
|
266
|
|
|
|
|
|
|
amqp_time_t deadline) { |
|
267
|
9
|
|
|
|
|
|
int fd = amqp_get_sockfd(state); |
|
268
|
9
|
100
|
|
|
|
|
if (-1 == fd) { |
|
269
|
3
|
|
|
|
|
|
return AMQP_STATUS_SOCKET_CLOSED; |
|
270
|
|
|
|
|
|
|
} |
|
271
|
6
|
|
|
|
|
|
switch (res) { |
|
272
|
|
|
|
|
|
|
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: |
|
273
|
0
|
|
|
|
|
|
res = amqp_poll(fd, AMQP_SF_POLLIN, deadline); |
|
274
|
0
|
|
|
|
|
|
break; |
|
275
|
|
|
|
|
|
|
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: |
|
276
|
5
|
|
|
|
|
|
res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline); |
|
277
|
5
|
|
|
|
|
|
break; |
|
278
|
|
|
|
|
|
|
} |
|
279
|
6
|
|
|
|
|
|
return res; |
|
280
|
|
|
|
|
|
|
} |
|
281
|
|
|
|
|
|
|
|
|
282
|
523
|
|
|
|
|
|
/*
 * Sends `len` bytes from `buf` over the connection's socket, retrying partial
 * writes and waiting (via do_poll) whenever the socket layer asks for it,
 * until everything is written or `deadline` passes.
 *
 * Returns
 *   len               when every byte was sent (0 when len is 0),
 *   a count below len when the deadline expired first (bytes sent so far),
 *   otherwise the status produced by do_poll() for the failed wait.
 */
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
                      size_t len, amqp_time_t deadline, int flags) {
  const char *cursor = buf;
  /* Assume that len is not going to be larger than ssize_t can hold. */
  ssize_t remaining = (ssize_t)len;
  ssize_t res;

  /* Nothing to send. Without this guard the loop below has no successful
   * exit for an empty request: the send can never report progress
   * (res > 0), so it would only ever end through a failed or timed-out
   * wait. */
  if (0 == len) {
    return 0;
  }

  for (;;) {
    res = amqp_socket_send(state->socket, cursor, (size_t)remaining, flags);

    if (res > 0) {
      remaining -= res;
      cursor += res;
      if (0 == remaining) {
        return (ssize_t)len;
      }
      continue;
    }

    res = do_poll(state, res, deadline);
    if (AMQP_STATUS_OK == res) {
      continue;
    }
    if (AMQP_STATUS_TIMEOUT == res) {
      /* Report how much made it out before the deadline. */
      return (ssize_t)len - remaining;
    }
    return res;
  }
}
|
309
|
|
|
|
|
|
|
|
|
310
|
0
|
|
|
|
|
|
int amqp_open_socket(char const *hostname, int portnumber) { |
|
311
|
0
|
|
|
|
|
|
return amqp_open_socket_inner(hostname, portnumber, amqp_time_infinite()); |
|
312
|
|
|
|
|
|
|
} |
|
313
|
|
|
|
|
|
|
|
|
314
|
37
|
|
|
|
|
|
int amqp_open_socket_noblock(char const *hostname, int portnumber, |
|
315
|
|
|
|
|
|
|
const struct timeval *timeout) { |
|
316
|
|
|
|
|
|
|
amqp_time_t deadline; |
|
317
|
37
|
|
|
|
|
|
int res = amqp_time_from_now(&deadline, timeout); |
|
318
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
319
|
0
|
|
|
|
|
|
return res; |
|
320
|
|
|
|
|
|
|
} |
|
321
|
37
|
|
|
|
|
|
return amqp_open_socket_inner(hostname, portnumber, deadline); |
|
322
|
|
|
|
|
|
|
} |
|
323
|
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
#ifdef _WIN32 |
|
325
|
|
|
|
|
|
|
static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) { |
|
326
|
|
|
|
|
|
|
int one = 1; |
|
327
|
|
|
|
|
|
|
SOCKET sockfd; |
|
328
|
|
|
|
|
|
|
int last_error; |
|
329
|
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
/* |
|
331
|
|
|
|
|
|
|
* This cast is to squash warnings on Win64, see: |
|
332
|
|
|
|
|
|
|
* http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64 |
|
333
|
|
|
|
|
|
|
*/ |
|
334
|
|
|
|
|
|
|
|
|
335
|
|
|
|
|
|
|
sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); |
|
336
|
|
|
|
|
|
|
if (INVALID_SOCKET == sockfd) { |
|
337
|
|
|
|
|
|
|
return AMQP_STATUS_SOCKET_ERROR; |
|
338
|
|
|
|
|
|
|
} |
|
339
|
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
/* Set the socket to be non-blocking */ |
|
341
|
|
|
|
|
|
|
if (SOCKET_ERROR == ioctlsocket(sockfd, FIONBIO, &one)) { |
|
342
|
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
343
|
|
|
|
|
|
|
goto err; |
|
344
|
|
|
|
|
|
|
} |
|
345
|
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
/* Disable nagle */ |
|
347
|
|
|
|
|
|
|
if (SOCKET_ERROR == setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, |
|
348
|
|
|
|
|
|
|
(const char *)&one, sizeof(one))) { |
|
349
|
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
350
|
|
|
|
|
|
|
goto err; |
|
351
|
|
|
|
|
|
|
} |
|
352
|
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
/* Enable TCP keepalives */ |
|
354
|
|
|
|
|
|
|
if (SOCKET_ERROR == setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, |
|
355
|
|
|
|
|
|
|
(const char *)&one, sizeof(one))) { |
|
356
|
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
357
|
|
|
|
|
|
|
goto err; |
|
358
|
|
|
|
|
|
|
} |
|
359
|
|
|
|
|
|
|
|
|
360
|
|
|
|
|
|
|
if (SOCKET_ERROR != connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen)) { |
|
361
|
|
|
|
|
|
|
return (int)sockfd; |
|
362
|
|
|
|
|
|
|
} |
|
363
|
|
|
|
|
|
|
|
|
364
|
|
|
|
|
|
|
if (WSAEWOULDBLOCK != WSAGetLastError()) { |
|
365
|
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
366
|
|
|
|
|
|
|
goto err; |
|
367
|
|
|
|
|
|
|
} |
|
368
|
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
last_error = |
|
370
|
|
|
|
|
|
|
amqp_poll((int)sockfd, AMQP_SF_POLLOUT | AMQP_SF_POLLERR, deadline); |
|
371
|
|
|
|
|
|
|
if (AMQP_STATUS_OK != last_error) { |
|
372
|
|
|
|
|
|
|
goto err; |
|
373
|
|
|
|
|
|
|
} |
|
374
|
|
|
|
|
|
|
|
|
375
|
|
|
|
|
|
|
{ |
|
376
|
|
|
|
|
|
|
int result; |
|
377
|
|
|
|
|
|
|
int result_len = sizeof(result); |
|
378
|
|
|
|
|
|
|
|
|
379
|
|
|
|
|
|
|
if (SOCKET_ERROR == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, |
|
380
|
|
|
|
|
|
|
(char *)&result, &result_len) || |
|
381
|
|
|
|
|
|
|
result != 0) { |
|
382
|
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
383
|
|
|
|
|
|
|
goto err; |
|
384
|
|
|
|
|
|
|
} |
|
385
|
|
|
|
|
|
|
} |
|
386
|
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
return (int)sockfd; |
|
388
|
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
err: |
|
390
|
|
|
|
|
|
|
closesocket(sockfd); |
|
391
|
|
|
|
|
|
|
return last_error; |
|
392
|
|
|
|
|
|
|
} |
|
393
|
|
|
|
|
|
|
#else |
|
394
|
38
|
|
|
|
|
|
static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) { |
|
395
|
38
|
|
|
|
|
|
int one = 1; |
|
396
|
|
|
|
|
|
|
int sockfd; |
|
397
|
|
|
|
|
|
|
int flags; |
|
398
|
|
|
|
|
|
|
int last_error; |
|
399
|
|
|
|
|
|
|
|
|
400
|
38
|
|
|
|
|
|
sockfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); |
|
401
|
38
|
50
|
|
|
|
|
if (-1 == sockfd) { |
|
402
|
0
|
|
|
|
|
|
return AMQP_STATUS_SOCKET_ERROR; |
|
403
|
|
|
|
|
|
|
} |
|
404
|
|
|
|
|
|
|
|
|
405
|
|
|
|
|
|
|
/* Enable CLOEXEC on socket */ |
|
406
|
38
|
|
|
|
|
|
flags = fcntl(sockfd, F_GETFD); |
|
407
|
38
|
50
|
|
|
|
|
if (flags == -1 || fcntl(sockfd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { |
|
|
|
50
|
|
|
|
|
|
|
408
|
0
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
409
|
0
|
|
|
|
|
|
goto err; |
|
410
|
|
|
|
|
|
|
} |
|
411
|
|
|
|
|
|
|
|
|
412
|
|
|
|
|
|
|
/* Set the socket as non-blocking */ |
|
413
|
38
|
|
|
|
|
|
flags = fcntl(sockfd, F_GETFL); |
|
414
|
38
|
50
|
|
|
|
|
if (flags == -1 || fcntl(sockfd, F_SETFL, (long)(flags | O_NONBLOCK)) == -1) { |
|
|
|
50
|
|
|
|
|
|
|
415
|
0
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
416
|
0
|
|
|
|
|
|
goto err; |
|
417
|
|
|
|
|
|
|
} |
|
418
|
|
|
|
|
|
|
|
|
419
|
|
|
|
|
|
|
#ifdef SO_NOSIGPIPE |
|
420
|
|
|
|
|
|
|
/* Turn off SIGPIPE on platforms that support it, BSD, MacOSX */ |
|
421
|
|
|
|
|
|
|
if (0 != setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { |
|
422
|
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
423
|
|
|
|
|
|
|
goto err; |
|
424
|
|
|
|
|
|
|
} |
|
425
|
|
|
|
|
|
|
#endif /* SO_NOSIGPIPE */ |
|
426
|
|
|
|
|
|
|
|
|
427
|
|
|
|
|
|
|
/* Disable nagle */ |
|
428
|
38
|
50
|
|
|
|
|
if (0 != setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { |
|
429
|
0
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
430
|
0
|
|
|
|
|
|
goto err; |
|
431
|
|
|
|
|
|
|
} |
|
432
|
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
/* Enable TCP keepalives */ |
|
434
|
38
|
50
|
|
|
|
|
if (0 != setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one))) { |
|
435
|
0
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
436
|
0
|
|
|
|
|
|
goto err; |
|
437
|
|
|
|
|
|
|
} |
|
438
|
|
|
|
|
|
|
|
|
439
|
38
|
50
|
|
|
|
|
if (0 == connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { |
|
440
|
0
|
|
|
|
|
|
return sockfd; |
|
441
|
|
|
|
|
|
|
} |
|
442
|
|
|
|
|
|
|
|
|
443
|
38
|
50
|
|
|
|
|
if (EINPROGRESS != errno) { |
|
444
|
0
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
445
|
0
|
|
|
|
|
|
goto err; |
|
446
|
|
|
|
|
|
|
} |
|
447
|
|
|
|
|
|
|
|
|
448
|
38
|
|
|
|
|
|
last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline); |
|
449
|
38
|
100
|
|
|
|
|
if (AMQP_STATUS_OK != last_error) { |
|
450
|
1
|
|
|
|
|
|
goto err; |
|
451
|
|
|
|
|
|
|
} |
|
452
|
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
{ |
|
454
|
|
|
|
|
|
|
int result; |
|
455
|
37
|
|
|
|
|
|
socklen_t result_len = sizeof(result); |
|
456
|
|
|
|
|
|
|
|
|
457
|
37
|
50
|
|
|
|
|
if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) || |
|
|
|
50
|
|
|
|
|
|
|
458
|
37
|
|
|
|
|
|
result != 0) { |
|
459
|
0
|
|
|
|
|
|
last_error = AMQP_STATUS_SOCKET_ERROR; |
|
460
|
0
|
|
|
|
|
|
goto err; |
|
461
|
|
|
|
|
|
|
} |
|
462
|
|
|
|
|
|
|
} |
|
463
|
|
|
|
|
|
|
|
|
464
|
37
|
|
|
|
|
|
return sockfd; |
|
465
|
|
|
|
|
|
|
|
|
466
|
|
|
|
|
|
|
err: |
|
467
|
1
|
|
|
|
|
|
close(sockfd); |
|
468
|
38
|
|
|
|
|
|
return last_error; |
|
469
|
|
|
|
|
|
|
} |
|
470
|
|
|
|
|
|
|
#endif |
|
471
|
|
|
|
|
|
|
|
|
472
|
38
|
|
|
|
|
|
int amqp_open_socket_inner(char const *hostname, int portnumber, |
|
473
|
|
|
|
|
|
|
amqp_time_t deadline) { |
|
474
|
|
|
|
|
|
|
struct addrinfo hint; |
|
475
|
|
|
|
|
|
|
struct addrinfo *address_list; |
|
476
|
|
|
|
|
|
|
struct addrinfo *addr; |
|
477
|
|
|
|
|
|
|
char portnumber_string[33]; |
|
478
|
38
|
|
|
|
|
|
int sockfd = -1; |
|
479
|
|
|
|
|
|
|
int last_error; |
|
480
|
|
|
|
|
|
|
|
|
481
|
38
|
|
|
|
|
|
last_error = amqp_os_socket_init(); |
|
482
|
38
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != last_error) { |
|
483
|
0
|
|
|
|
|
|
return last_error; |
|
484
|
|
|
|
|
|
|
} |
|
485
|
|
|
|
|
|
|
|
|
486
|
38
|
|
|
|
|
|
memset(&hint, 0, sizeof(hint)); |
|
487
|
38
|
|
|
|
|
|
hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */ |
|
488
|
38
|
|
|
|
|
|
hint.ai_socktype = SOCK_STREAM; |
|
489
|
38
|
|
|
|
|
|
hint.ai_protocol = IPPROTO_TCP; |
|
490
|
|
|
|
|
|
|
|
|
491
|
38
|
|
|
|
|
|
(void)sprintf(portnumber_string, "%d", portnumber); |
|
492
|
|
|
|
|
|
|
|
|
493
|
38
|
|
|
|
|
|
last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list); |
|
494
|
38
|
50
|
|
|
|
|
if (0 != last_error) { |
|
495
|
0
|
|
|
|
|
|
return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED; |
|
496
|
|
|
|
|
|
|
} |
|
497
|
|
|
|
|
|
|
|
|
498
|
38
|
50
|
|
|
|
|
for (addr = address_list; addr; addr = addr->ai_next) { |
|
499
|
38
|
|
|
|
|
|
sockfd = connect_socket(addr, deadline); |
|
500
|
|
|
|
|
|
|
|
|
501
|
38
|
100
|
|
|
|
|
if (sockfd >= 0) { |
|
502
|
37
|
|
|
|
|
|
last_error = AMQP_STATUS_OK; |
|
503
|
37
|
|
|
|
|
|
break; |
|
504
|
1
|
50
|
|
|
|
|
} else if (sockfd == AMQP_STATUS_TIMEOUT) { |
|
505
|
1
|
|
|
|
|
|
last_error = sockfd; |
|
506
|
1
|
|
|
|
|
|
break; |
|
507
|
|
|
|
|
|
|
} |
|
508
|
|
|
|
|
|
|
} |
|
509
|
|
|
|
|
|
|
|
|
510
|
38
|
|
|
|
|
|
freeaddrinfo(address_list); |
|
511
|
38
|
100
|
|
|
|
|
if (last_error != AMQP_STATUS_OK || sockfd == -1) { |
|
|
|
50
|
|
|
|
|
|
|
512
|
1
|
|
|
|
|
|
return last_error; |
|
513
|
|
|
|
|
|
|
} |
|
514
|
38
|
|
|
|
|
|
return sockfd; |
|
515
|
|
|
|
|
|
|
} |
|
516
|
|
|
|
|
|
|
|
|
517
|
37
|
|
|
|
|
|
static int send_header_inner(amqp_connection_state_t state, |
|
518
|
|
|
|
|
|
|
amqp_time_t deadline) { |
|
519
|
|
|
|
|
|
|
ssize_t res; |
|
520
|
|
|
|
|
|
|
static const uint8_t header[8] = {'A', |
|
521
|
|
|
|
|
|
|
'M', |
|
522
|
|
|
|
|
|
|
'Q', |
|
523
|
|
|
|
|
|
|
'P', |
|
524
|
|
|
|
|
|
|
0, |
|
525
|
|
|
|
|
|
|
AMQP_PROTOCOL_VERSION_MAJOR, |
|
526
|
|
|
|
|
|
|
AMQP_PROTOCOL_VERSION_MINOR, |
|
527
|
|
|
|
|
|
|
AMQP_PROTOCOL_VERSION_REVISION}; |
|
528
|
37
|
|
|
|
|
|
res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE); |
|
529
|
37
|
50
|
|
|
|
|
if (sizeof(header) == res) { |
|
530
|
37
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
531
|
|
|
|
|
|
|
} |
|
532
|
0
|
|
|
|
|
|
return (int)res; |
|
533
|
|
|
|
|
|
|
} |
|
534
|
|
|
|
|
|
|
|
|
535
|
0
|
|
|
|
|
|
/*
 * Sends the protocol header on `state` using the deadline produced by
 * amqp_time_infinite(). Returns whatever send_header_inner() returns.
 */
int amqp_send_header(amqp_connection_state_t state) {
  amqp_time_t deadline = amqp_time_infinite();

  return send_header_inner(state, deadline);
}
|
538
|
|
|
|
|
|
|
|
|
539
|
74
|
|
|
|
|
|
/*
 * Maps a SASL method selector to its mechanism name ("PLAIN" or
 * "EXTERNAL"), wrapped with amqp_cstring_bytes(). Any other selector is
 * reported through amqp_abort().
 */
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
  amqp_bytes_t name;

  if (AMQP_SASL_METHOD_PLAIN == method) {
    name = amqp_cstring_bytes("PLAIN");
  } else if (AMQP_SASL_METHOD_EXTERNAL == method) {
    name = amqp_cstring_bytes("EXTERNAL");
  } else {
    amqp_abort("Invalid SASL method: %d", (int)method);
  }

  return name;
}
|
556
|
|
|
|
|
|
|
|
|
557
|
74
|
|
|
|
|
|
/*
 * Returns 1 when `l` and `r` have the same length, both carry a non-NULL
 * data pointer, and their contents compare equal; 0 otherwise. Two buffers
 * with a NULL data pointer therefore never compare equal, even if both are
 * empty.
 */
static int bytes_equal(amqp_bytes_t l, amqp_bytes_t r) {
  if (l.len != r.len) {
    return 0;
  }
  if (!l.bytes || !r.bytes) {
    return 0;
  }
  return (0 == memcmp(l.bytes, r.bytes, l.len)) ? 1 : 0;
}
|
567
|
|
|
|
|
|
|
|
|
568
|
37
|
|
|
|
|
|
int sasl_mechanism_in_list(amqp_bytes_t mechanisms, |
|
569
|
|
|
|
|
|
|
amqp_sasl_method_enum method) { |
|
570
|
|
|
|
|
|
|
amqp_bytes_t mechanism; |
|
571
|
|
|
|
|
|
|
amqp_bytes_t supported_mechanism; |
|
572
|
|
|
|
|
|
|
uint8_t *start; |
|
573
|
|
|
|
|
|
|
uint8_t *end; |
|
574
|
|
|
|
|
|
|
uint8_t *current; |
|
575
|
|
|
|
|
|
|
|
|
576
|
37
|
50
|
|
|
|
|
assert(NULL != mechanisms.bytes); |
|
577
|
|
|
|
|
|
|
|
|
578
|
37
|
|
|
|
|
|
mechanism = sasl_method_name(method); |
|
579
|
|
|
|
|
|
|
|
|
580
|
37
|
|
|
|
|
|
start = (uint8_t *)mechanisms.bytes; |
|
581
|
37
|
|
|
|
|
|
current = start; |
|
582
|
37
|
|
|
|
|
|
end = start + mechanisms.len; |
|
583
|
|
|
|
|
|
|
|
|
584
|
74
|
50
|
|
|
|
|
for (; current != end; start = current + 1) { |
|
585
|
|
|
|
|
|
|
/* HACK: SASL states that we should be parsing this string as a UTF-8 |
|
586
|
|
|
|
|
|
|
* string, which we're plainly not doing here. At this point its not worth |
|
587
|
|
|
|
|
|
|
* dragging an entire UTF-8 parser for this one case, and this should work |
|
588
|
|
|
|
|
|
|
* most of the time */ |
|
589
|
74
|
|
|
|
|
|
current = memchr(start, ' ', end - start); |
|
590
|
74
|
100
|
|
|
|
|
if (NULL == current) { |
|
591
|
37
|
|
|
|
|
|
current = end; |
|
592
|
|
|
|
|
|
|
} |
|
593
|
74
|
|
|
|
|
|
supported_mechanism.bytes = start; |
|
594
|
74
|
|
|
|
|
|
supported_mechanism.len = current - start; |
|
595
|
74
|
100
|
|
|
|
|
if (bytes_equal(mechanism, supported_mechanism)) { |
|
596
|
37
|
|
|
|
|
|
return 1; |
|
597
|
|
|
|
|
|
|
} |
|
598
|
|
|
|
|
|
|
} |
|
599
|
|
|
|
|
|
|
|
|
600
|
37
|
|
|
|
|
|
return 0; |
|
601
|
|
|
|
|
|
|
} |
|
602
|
|
|
|
|
|
|
|
|
603
|
37
|
|
|
|
|
|
static amqp_bytes_t sasl_response(amqp_pool_t *pool, |
|
604
|
|
|
|
|
|
|
amqp_sasl_method_enum method, va_list args) { |
|
605
|
|
|
|
|
|
|
amqp_bytes_t response; |
|
606
|
|
|
|
|
|
|
|
|
607
|
37
|
|
|
|
|
|
switch (method) { |
|
608
|
|
|
|
|
|
|
case AMQP_SASL_METHOD_PLAIN: { |
|
609
|
37
|
50
|
|
|
|
|
char *username = va_arg(args, char *); |
|
610
|
37
|
|
|
|
|
|
size_t username_len = strlen(username); |
|
611
|
37
|
50
|
|
|
|
|
char *password = va_arg(args, char *); |
|
612
|
37
|
|
|
|
|
|
size_t password_len = strlen(password); |
|
613
|
|
|
|
|
|
|
char *response_buf; |
|
614
|
|
|
|
|
|
|
|
|
615
|
37
|
|
|
|
|
|
amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, |
|
616
|
|
|
|
|
|
|
&response); |
|
617
|
37
|
50
|
|
|
|
|
if (response.bytes == NULL) |
|
618
|
|
|
|
|
|
|
/* We never request a zero-length block, because of the +2 |
|
619
|
|
|
|
|
|
|
above, so a NULL here really is ENOMEM. */ |
|
620
|
|
|
|
|
|
|
{ |
|
621
|
0
|
|
|
|
|
|
return response; |
|
622
|
|
|
|
|
|
|
} |
|
623
|
|
|
|
|
|
|
|
|
624
|
37
|
|
|
|
|
|
response_buf = response.bytes; |
|
625
|
37
|
|
|
|
|
|
response_buf[0] = 0; |
|
626
|
37
|
|
|
|
|
|
memcpy(response_buf + 1, username, username_len); |
|
627
|
37
|
|
|
|
|
|
response_buf[username_len + 1] = 0; |
|
628
|
37
|
|
|
|
|
|
memcpy(response_buf + username_len + 2, password, password_len); |
|
629
|
37
|
|
|
|
|
|
break; |
|
630
|
|
|
|
|
|
|
} |
|
631
|
|
|
|
|
|
|
case AMQP_SASL_METHOD_EXTERNAL: { |
|
632
|
0
|
0
|
|
|
|
|
char *identity = va_arg(args, char *); |
|
633
|
0
|
|
|
|
|
|
size_t identity_len = strlen(identity); |
|
634
|
|
|
|
|
|
|
|
|
635
|
0
|
|
|
|
|
|
amqp_pool_alloc_bytes(pool, identity_len, &response); |
|
636
|
0
|
0
|
|
|
|
|
if (response.bytes == NULL) { |
|
637
|
0
|
|
|
|
|
|
return response; |
|
638
|
|
|
|
|
|
|
} |
|
639
|
|
|
|
|
|
|
|
|
640
|
0
|
|
|
|
|
|
memcpy(response.bytes, identity, identity_len); |
|
641
|
0
|
|
|
|
|
|
break; |
|
642
|
|
|
|
|
|
|
} |
|
643
|
|
|
|
|
|
|
default: |
|
644
|
0
|
|
|
|
|
|
amqp_abort("Invalid SASL method: %d", (int)method); |
|
645
|
|
|
|
|
|
|
} |
|
646
|
|
|
|
|
|
|
|
|
647
|
37
|
|
|
|
|
|
return response; |
|
648
|
|
|
|
|
|
|
} |
|
649
|
|
|
|
|
|
|
|
|
650
|
0
|
|
|
|
|
|
/*
 * Returns 1 when the connection's queue of frames has at least one entry
 * (first_queued_frame is non-NULL), 0 otherwise.
 */
amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
  if (NULL == state->first_queued_frame) {
    return 0;
  }
  return 1;
}
|
653
|
|
|
|
|
|
|
|
|
654
|
|
|
|
|
|
|
/* |
|
655
|
|
|
|
|
|
|
* Check to see if we have data in our buffer. If this returns 1, we |
|
656
|
|
|
|
|
|
|
* will avoid an immediate blocking read in amqp_simple_wait_frame. |
|
657
|
|
|
|
|
|
|
*/ |
|
658
|
946
|
|
|
|
|
|
amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) { |
|
659
|
946
|
|
|
|
|
|
return (state->sock_inbound_offset < state->sock_inbound_limit); |
|
660
|
|
|
|
|
|
|
} |
|
661
|
|
|
|
|
|
|
|
|
662
|
488
|
|
|
|
|
|
static int consume_one_frame(amqp_connection_state_t state, |
|
663
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame) { |
|
664
|
|
|
|
|
|
|
int res; |
|
665
|
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
amqp_bytes_t buffer; |
|
667
|
488
|
|
|
|
|
|
buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; |
|
668
|
488
|
|
|
|
|
|
buffer.bytes = |
|
669
|
488
|
|
|
|
|
|
((char *)state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; |
|
670
|
|
|
|
|
|
|
|
|
671
|
488
|
|
|
|
|
|
res = amqp_handle_input(state, buffer, decoded_frame); |
|
672
|
488
|
50
|
|
|
|
|
if (res < 0) { |
|
673
|
0
|
|
|
|
|
|
return res; |
|
674
|
|
|
|
|
|
|
} |
|
675
|
|
|
|
|
|
|
|
|
676
|
488
|
|
|
|
|
|
state->sock_inbound_offset += res; |
|
677
|
|
|
|
|
|
|
|
|
678
|
488
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
679
|
|
|
|
|
|
|
} |
|
680
|
|
|
|
|
|
|
|
|
681
|
398
|
|
|
|
|
|
static int recv_with_timeout(amqp_connection_state_t state, |
|
682
|
|
|
|
|
|
|
amqp_time_t timeout) { |
|
683
|
|
|
|
|
|
|
ssize_t res; |
|
684
|
|
|
|
|
|
|
int fd; |
|
685
|
|
|
|
|
|
|
|
|
686
|
|
|
|
|
|
|
start_recv: |
|
687
|
787
|
|
|
|
|
|
res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, |
|
688
|
|
|
|
|
|
|
state->sock_inbound_buffer.len, 0); |
|
689
|
|
|
|
|
|
|
|
|
690
|
787
|
100
|
|
|
|
|
if (res < 0) { |
|
691
|
393
|
|
|
|
|
|
fd = amqp_get_sockfd(state); |
|
692
|
393
|
50
|
|
|
|
|
if (-1 == fd) { |
|
693
|
0
|
|
|
|
|
|
return AMQP_STATUS_CONNECTION_CLOSED; |
|
694
|
|
|
|
|
|
|
} |
|
695
|
393
|
|
|
|
|
|
switch (res) { |
|
696
|
|
|
|
|
|
|
default: |
|
697
|
0
|
|
|
|
|
|
return (int)res; |
|
698
|
|
|
|
|
|
|
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: |
|
699
|
393
|
|
|
|
|
|
res = amqp_poll(fd, AMQP_SF_POLLIN, timeout); |
|
700
|
393
|
|
|
|
|
|
break; |
|
701
|
|
|
|
|
|
|
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: |
|
702
|
0
|
|
|
|
|
|
res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout); |
|
703
|
0
|
|
|
|
|
|
break; |
|
704
|
|
|
|
|
|
|
} |
|
705
|
393
|
100
|
|
|
|
|
if (AMQP_STATUS_OK == res) { |
|
706
|
389
|
|
|
|
|
|
goto start_recv; |
|
707
|
|
|
|
|
|
|
} |
|
708
|
4
|
|
|
|
|
|
return (int)res; |
|
709
|
|
|
|
|
|
|
} |
|
710
|
|
|
|
|
|
|
|
|
711
|
394
|
|
|
|
|
|
state->sock_inbound_limit = res; |
|
712
|
394
|
|
|
|
|
|
state->sock_inbound_offset = 0; |
|
713
|
|
|
|
|
|
|
|
|
714
|
394
|
|
|
|
|
|
res = amqp_time_s_from_now(&state->next_recv_heartbeat, |
|
715
|
|
|
|
|
|
|
amqp_heartbeat_recv(state)); |
|
716
|
394
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
717
|
0
|
|
|
|
|
|
return (int)res; |
|
718
|
|
|
|
|
|
|
} |
|
719
|
394
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
720
|
|
|
|
|
|
|
} |
|
721
|
|
|
|
|
|
|
|
|
722
|
2
|
|
|
|
|
|
int amqp_try_recv(amqp_connection_state_t state) { |
|
723
|
|
|
|
|
|
|
amqp_time_t timeout; |
|
724
|
|
|
|
|
|
|
|
|
725
|
22
|
100
|
|
|
|
|
while (amqp_data_in_buffer(state)) { |
|
726
|
|
|
|
|
|
|
amqp_frame_t frame; |
|
727
|
20
|
|
|
|
|
|
int res = consume_one_frame(state, &frame); |
|
728
|
|
|
|
|
|
|
|
|
729
|
20
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
730
|
0
|
|
|
|
|
|
return res; |
|
731
|
|
|
|
|
|
|
} |
|
732
|
|
|
|
|
|
|
|
|
733
|
20
|
50
|
|
|
|
|
if (frame.frame_type != 0) { |
|
734
|
|
|
|
|
|
|
amqp_pool_t *channel_pool; |
|
735
|
|
|
|
|
|
|
amqp_frame_t *frame_copy; |
|
736
|
|
|
|
|
|
|
amqp_link_t *link; |
|
737
|
|
|
|
|
|
|
|
|
738
|
20
|
|
|
|
|
|
channel_pool = amqp_get_or_create_channel_pool(state, frame.channel); |
|
739
|
20
|
50
|
|
|
|
|
if (NULL == channel_pool) { |
|
740
|
0
|
|
|
|
|
|
return AMQP_STATUS_NO_MEMORY; |
|
741
|
|
|
|
|
|
|
} |
|
742
|
|
|
|
|
|
|
|
|
743
|
20
|
|
|
|
|
|
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); |
|
744
|
20
|
|
|
|
|
|
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); |
|
745
|
|
|
|
|
|
|
|
|
746
|
20
|
50
|
|
|
|
|
if (frame_copy == NULL || link == NULL) { |
|
|
|
50
|
|
|
|
|
|
|
747
|
0
|
|
|
|
|
|
return AMQP_STATUS_NO_MEMORY; |
|
748
|
|
|
|
|
|
|
} |
|
749
|
|
|
|
|
|
|
|
|
750
|
20
|
|
|
|
|
|
*frame_copy = frame; |
|
751
|
|
|
|
|
|
|
|
|
752
|
20
|
|
|
|
|
|
link->next = NULL; |
|
753
|
20
|
|
|
|
|
|
link->data = frame_copy; |
|
754
|
|
|
|
|
|
|
|
|
755
|
20
|
100
|
|
|
|
|
if (state->last_queued_frame == NULL) { |
|
756
|
1
|
|
|
|
|
|
state->first_queued_frame = link; |
|
757
|
|
|
|
|
|
|
} else { |
|
758
|
19
|
|
|
|
|
|
state->last_queued_frame->next = link; |
|
759
|
|
|
|
|
|
|
} |
|
760
|
20
|
|
|
|
|
|
state->last_queued_frame = link; |
|
761
|
|
|
|
|
|
|
} |
|
762
|
|
|
|
|
|
|
} |
|
763
|
2
|
|
|
|
|
|
timeout = amqp_time_immediate(); |
|
764
|
|
|
|
|
|
|
|
|
765
|
2
|
|
|
|
|
|
return recv_with_timeout(state, timeout); |
|
766
|
|
|
|
|
|
|
} |
|
767
|
|
|
|
|
|
|
|
|
768
|
460
|
|
|
|
|
|
static int wait_frame_inner(amqp_connection_state_t state, |
|
769
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame, |
|
770
|
|
|
|
|
|
|
amqp_time_t timeout_deadline) { |
|
771
|
|
|
|
|
|
|
amqp_time_t deadline; |
|
772
|
|
|
|
|
|
|
int res; |
|
773
|
|
|
|
|
|
|
|
|
774
|
|
|
|
|
|
|
for (;;) { |
|
775
|
865
|
100
|
|
|
|
|
while (amqp_data_in_buffer(state)) { |
|
776
|
468
|
|
|
|
|
|
res = consume_one_frame(state, decoded_frame); |
|
777
|
|
|
|
|
|
|
|
|
778
|
468
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
779
|
0
|
|
|
|
|
|
return res; |
|
780
|
|
|
|
|
|
|
} |
|
781
|
|
|
|
|
|
|
|
|
782
|
468
|
100
|
|
|
|
|
if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) { |
|
783
|
5
|
|
|
|
|
|
amqp_maybe_release_buffers_on_channel(state, 0); |
|
784
|
5
|
|
|
|
|
|
continue; |
|
785
|
|
|
|
|
|
|
} |
|
786
|
|
|
|
|
|
|
|
|
787
|
463
|
100
|
|
|
|
|
if (decoded_frame->frame_type != 0) { |
|
788
|
|
|
|
|
|
|
/* Complete frame was read. Return it. */ |
|
789
|
455
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
790
|
|
|
|
|
|
|
} |
|
791
|
|
|
|
|
|
|
} |
|
792
|
|
|
|
|
|
|
|
|
793
|
|
|
|
|
|
|
beginrecv: |
|
794
|
397
|
|
|
|
|
|
res = amqp_time_has_past(state->next_send_heartbeat); |
|
795
|
397
|
50
|
|
|
|
|
if (AMQP_STATUS_TIMER_FAILURE == res) { |
|
796
|
0
|
|
|
|
|
|
return res; |
|
797
|
397
|
100
|
|
|
|
|
} else if (AMQP_STATUS_TIMEOUT == res) { |
|
798
|
|
|
|
|
|
|
amqp_frame_t heartbeat; |
|
799
|
2
|
|
|
|
|
|
heartbeat.channel = 0; |
|
800
|
2
|
|
|
|
|
|
heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; |
|
801
|
|
|
|
|
|
|
|
|
802
|
2
|
|
|
|
|
|
res = amqp_send_frame(state, &heartbeat); |
|
803
|
2
|
100
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
804
|
2
|
|
|
|
|
|
return res; |
|
805
|
|
|
|
|
|
|
} |
|
806
|
|
|
|
|
|
|
} |
|
807
|
396
|
|
|
|
|
|
deadline = amqp_time_first(timeout_deadline, |
|
808
|
|
|
|
|
|
|
amqp_time_first(state->next_recv_heartbeat, |
|
809
|
|
|
|
|
|
|
state->next_send_heartbeat)); |
|
810
|
|
|
|
|
|
|
|
|
811
|
|
|
|
|
|
|
/* TODO this needs to wait for a _frame_ and not anything written from the |
|
812
|
|
|
|
|
|
|
* socket */ |
|
813
|
396
|
|
|
|
|
|
res = recv_with_timeout(state, deadline); |
|
814
|
|
|
|
|
|
|
|
|
815
|
396
|
100
|
|
|
|
|
if (AMQP_STATUS_TIMEOUT == res) { |
|
816
|
4
|
50
|
|
|
|
|
if (amqp_time_equal(deadline, state->next_recv_heartbeat)) { |
|
817
|
0
|
|
|
|
|
|
amqp_socket_close(state->socket, AMQP_SC_FORCE); |
|
818
|
0
|
|
|
|
|
|
return AMQP_STATUS_HEARTBEAT_TIMEOUT; |
|
819
|
4
|
50
|
|
|
|
|
} else if (amqp_time_equal(deadline, timeout_deadline)) { |
|
820
|
4
|
|
|
|
|
|
return AMQP_STATUS_TIMEOUT; |
|
821
|
0
|
0
|
|
|
|
|
} else if (amqp_time_equal(deadline, state->next_send_heartbeat)) { |
|
822
|
|
|
|
|
|
|
/* send heartbeat happens before we do recv_with_timeout */ |
|
823
|
0
|
|
|
|
|
|
goto beginrecv; |
|
824
|
|
|
|
|
|
|
} else { |
|
825
|
0
|
|
|
|
|
|
amqp_abort("Internal error: unable to determine timeout reason"); |
|
826
|
|
|
|
|
|
|
} |
|
827
|
392
|
50
|
|
|
|
|
} else if (AMQP_STATUS_OK != res) { |
|
828
|
0
|
|
|
|
|
|
return res; |
|
829
|
|
|
|
|
|
|
} |
|
830
|
852
|
|
|
|
|
|
} |
|
831
|
|
|
|
|
|
|
} |
|
832
|
|
|
|
|
|
|
|
|
833
|
0
|
|
|
|
|
|
static amqp_link_t *amqp_create_link_for_frame(amqp_connection_state_t state, |
|
834
|
|
|
|
|
|
|
amqp_frame_t *frame) { |
|
835
|
|
|
|
|
|
|
amqp_link_t *link; |
|
836
|
|
|
|
|
|
|
amqp_frame_t *frame_copy; |
|
837
|
|
|
|
|
|
|
|
|
838
|
0
|
|
|
|
|
|
amqp_pool_t *channel_pool = |
|
839
|
0
|
|
|
|
|
|
amqp_get_or_create_channel_pool(state, frame->channel); |
|
840
|
|
|
|
|
|
|
|
|
841
|
0
|
0
|
|
|
|
|
if (NULL == channel_pool) { |
|
842
|
0
|
|
|
|
|
|
return NULL; |
|
843
|
|
|
|
|
|
|
} |
|
844
|
|
|
|
|
|
|
|
|
845
|
0
|
|
|
|
|
|
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); |
|
846
|
0
|
|
|
|
|
|
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); |
|
847
|
|
|
|
|
|
|
|
|
848
|
0
|
0
|
|
|
|
|
if (NULL == link || NULL == frame_copy) { |
|
|
|
0
|
|
|
|
|
|
|
849
|
0
|
|
|
|
|
|
return NULL; |
|
850
|
|
|
|
|
|
|
} |
|
851
|
|
|
|
|
|
|
|
|
852
|
0
|
|
|
|
|
|
*frame_copy = *frame; |
|
853
|
0
|
|
|
|
|
|
link->data = frame_copy; |
|
854
|
|
|
|
|
|
|
|
|
855
|
0
|
|
|
|
|
|
return link; |
|
856
|
|
|
|
|
|
|
} |
|
857
|
|
|
|
|
|
|
|
|
858
|
0
|
|
|
|
|
|
int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) { |
|
859
|
0
|
|
|
|
|
|
amqp_link_t *link = amqp_create_link_for_frame(state, frame); |
|
860
|
0
|
0
|
|
|
|
|
if (NULL == link) { |
|
861
|
0
|
|
|
|
|
|
return AMQP_STATUS_NO_MEMORY; |
|
862
|
|
|
|
|
|
|
} |
|
863
|
|
|
|
|
|
|
|
|
864
|
0
|
0
|
|
|
|
|
if (NULL == state->first_queued_frame) { |
|
865
|
0
|
|
|
|
|
|
state->first_queued_frame = link; |
|
866
|
|
|
|
|
|
|
} else { |
|
867
|
0
|
|
|
|
|
|
state->last_queued_frame->next = link; |
|
868
|
|
|
|
|
|
|
} |
|
869
|
|
|
|
|
|
|
|
|
870
|
0
|
|
|
|
|
|
link->next = NULL; |
|
871
|
0
|
|
|
|
|
|
state->last_queued_frame = link; |
|
872
|
|
|
|
|
|
|
|
|
873
|
0
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
874
|
|
|
|
|
|
|
} |
|
875
|
|
|
|
|
|
|
|
|
876
|
0
|
|
|
|
|
|
int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) { |
|
877
|
0
|
|
|
|
|
|
amqp_link_t *link = amqp_create_link_for_frame(state, frame); |
|
878
|
0
|
0
|
|
|
|
|
if (NULL == link) { |
|
879
|
0
|
|
|
|
|
|
return AMQP_STATUS_NO_MEMORY; |
|
880
|
|
|
|
|
|
|
} |
|
881
|
|
|
|
|
|
|
|
|
882
|
0
|
0
|
|
|
|
|
if (NULL == state->first_queued_frame) { |
|
883
|
0
|
|
|
|
|
|
state->first_queued_frame = link; |
|
884
|
0
|
|
|
|
|
|
state->last_queued_frame = link; |
|
885
|
0
|
|
|
|
|
|
link->next = NULL; |
|
886
|
|
|
|
|
|
|
} else { |
|
887
|
0
|
|
|
|
|
|
link->next = state->first_queued_frame; |
|
888
|
0
|
|
|
|
|
|
state->first_queued_frame = link; |
|
889
|
|
|
|
|
|
|
} |
|
890
|
|
|
|
|
|
|
|
|
891
|
0
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
892
|
|
|
|
|
|
|
} |
|
893
|
|
|
|
|
|
|
|
|
894
|
59
|
|
|
|
|
|
int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, |
|
895
|
|
|
|
|
|
|
amqp_channel_t channel, |
|
896
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame) { |
|
897
|
|
|
|
|
|
|
amqp_frame_t *frame_ptr; |
|
898
|
|
|
|
|
|
|
amqp_link_t *cur; |
|
899
|
|
|
|
|
|
|
int res; |
|
900
|
|
|
|
|
|
|
|
|
901
|
59
|
50
|
|
|
|
|
for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) { |
|
902
|
0
|
|
|
|
|
|
frame_ptr = cur->data; |
|
903
|
|
|
|
|
|
|
|
|
904
|
0
|
0
|
|
|
|
|
if (channel == frame_ptr->channel) { |
|
905
|
0
|
|
|
|
|
|
state->first_queued_frame = cur->next; |
|
906
|
0
|
0
|
|
|
|
|
if (NULL == state->first_queued_frame) { |
|
907
|
0
|
|
|
|
|
|
state->last_queued_frame = NULL; |
|
908
|
|
|
|
|
|
|
} |
|
909
|
|
|
|
|
|
|
|
|
910
|
0
|
|
|
|
|
|
*decoded_frame = *frame_ptr; |
|
911
|
|
|
|
|
|
|
|
|
912
|
0
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
913
|
|
|
|
|
|
|
} |
|
914
|
|
|
|
|
|
|
} |
|
915
|
|
|
|
|
|
|
|
|
916
|
|
|
|
|
|
|
for (;;) { |
|
917
|
59
|
|
|
|
|
|
res = wait_frame_inner(state, decoded_frame, amqp_time_infinite()); |
|
918
|
|
|
|
|
|
|
|
|
919
|
59
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
920
|
0
|
|
|
|
|
|
return res; |
|
921
|
|
|
|
|
|
|
} |
|
922
|
|
|
|
|
|
|
|
|
923
|
59
|
50
|
|
|
|
|
if (channel == decoded_frame->channel) { |
|
924
|
59
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
925
|
|
|
|
|
|
|
} else { |
|
926
|
0
|
|
|
|
|
|
res = amqp_queue_frame(state, decoded_frame); |
|
927
|
0
|
0
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
|
928
|
0
|
|
|
|
|
|
return res; |
|
929
|
|
|
|
|
|
|
} |
|
930
|
|
|
|
|
|
|
} |
|
931
|
0
|
|
|
|
|
|
} |
|
932
|
|
|
|
|
|
|
} |
|
933
|
|
|
|
|
|
|
|
|
934
|
21
|
|
|
|
|
|
int amqp_simple_wait_frame(amqp_connection_state_t state, |
|
935
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame) { |
|
936
|
21
|
|
|
|
|
|
return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL); |
|
937
|
|
|
|
|
|
|
} |
|
938
|
|
|
|
|
|
|
|
|
939
|
121
|
|
|
|
|
|
int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, |
|
940
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame, |
|
941
|
|
|
|
|
|
|
const struct timeval *timeout) { |
|
942
|
|
|
|
|
|
|
amqp_time_t deadline; |
|
943
|
|
|
|
|
|
|
|
|
944
|
121
|
|
|
|
|
|
int res = amqp_time_from_now(&deadline, timeout); |
|
945
|
121
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
946
|
0
|
|
|
|
|
|
return res; |
|
947
|
|
|
|
|
|
|
} |
|
948
|
|
|
|
|
|
|
|
|
949
|
121
|
100
|
|
|
|
|
if (state->first_queued_frame != NULL) { |
|
950
|
20
|
|
|
|
|
|
amqp_frame_t *f = (amqp_frame_t *)state->first_queued_frame->data; |
|
951
|
20
|
|
|
|
|
|
state->first_queued_frame = state->first_queued_frame->next; |
|
952
|
20
|
100
|
|
|
|
|
if (state->first_queued_frame == NULL) { |
|
953
|
1
|
|
|
|
|
|
state->last_queued_frame = NULL; |
|
954
|
|
|
|
|
|
|
} |
|
955
|
20
|
|
|
|
|
|
*decoded_frame = *f; |
|
956
|
20
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
957
|
|
|
|
|
|
|
} else { |
|
958
|
121
|
|
|
|
|
|
return wait_frame_inner(state, decoded_frame, deadline); |
|
959
|
|
|
|
|
|
|
} |
|
960
|
|
|
|
|
|
|
} |
|
961
|
|
|
|
|
|
|
|
|
962
|
74
|
|
|
|
|
|
static int amqp_simple_wait_method_list(amqp_connection_state_t state, |
|
963
|
|
|
|
|
|
|
amqp_channel_t expected_channel, |
|
964
|
|
|
|
|
|
|
amqp_method_number_t *expected_methods, |
|
965
|
|
|
|
|
|
|
amqp_time_t deadline, |
|
966
|
|
|
|
|
|
|
amqp_method_t *output) { |
|
967
|
|
|
|
|
|
|
amqp_frame_t frame; |
|
968
|
|
|
|
|
|
|
struct timeval tv; |
|
969
|
|
|
|
|
|
|
struct timeval *tvp; |
|
970
|
|
|
|
|
|
|
|
|
971
|
74
|
|
|
|
|
|
int res = amqp_time_tv_until(deadline, &tv, &tvp); |
|
972
|
74
|
50
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
|
973
|
0
|
|
|
|
|
|
return res; |
|
974
|
|
|
|
|
|
|
} |
|
975
|
|
|
|
|
|
|
|
|
976
|
74
|
|
|
|
|
|
res = amqp_simple_wait_frame_noblock(state, &frame, tvp); |
|
977
|
74
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
978
|
0
|
|
|
|
|
|
return res; |
|
979
|
|
|
|
|
|
|
} |
|
980
|
|
|
|
|
|
|
|
|
981
|
74
|
50
|
|
|
|
|
if (AMQP_FRAME_METHOD != frame.frame_type || |
|
|
|
50
|
|
|
|
|
|
|
982
|
74
|
50
|
|
|
|
|
expected_channel != frame.channel || |
|
983
|
74
|
|
|
|
|
|
!amqp_id_in_reply_list(frame.payload.method.id, expected_methods)) { |
|
984
|
0
|
|
|
|
|
|
return AMQP_STATUS_WRONG_METHOD; |
|
985
|
|
|
|
|
|
|
} |
|
986
|
74
|
|
|
|
|
|
*output = frame.payload.method; |
|
987
|
74
|
|
|
|
|
|
return AMQP_STATUS_OK; |
|
988
|
|
|
|
|
|
|
} |
|
989
|
|
|
|
|
|
|
|
|
990
|
37
|
|
|
|
|
|
static int simple_wait_method_inner(amqp_connection_state_t state, |
|
991
|
|
|
|
|
|
|
amqp_channel_t expected_channel, |
|
992
|
|
|
|
|
|
|
amqp_method_number_t expected_method, |
|
993
|
|
|
|
|
|
|
amqp_time_t deadline, |
|
994
|
|
|
|
|
|
|
amqp_method_t *output) { |
|
995
|
37
|
|
|
|
|
|
amqp_method_number_t expected_methods[] = {expected_method, 0}; |
|
996
|
37
|
|
|
|
|
|
return amqp_simple_wait_method_list(state, expected_channel, expected_methods, |
|
997
|
|
|
|
|
|
|
deadline, output); |
|
998
|
|
|
|
|
|
|
} |
|
999
|
|
|
|
|
|
|
|
|
1000
|
0
|
|
|
|
|
|
int amqp_simple_wait_method(amqp_connection_state_t state, |
|
1001
|
|
|
|
|
|
|
amqp_channel_t expected_channel, |
|
1002
|
|
|
|
|
|
|
amqp_method_number_t expected_method, |
|
1003
|
|
|
|
|
|
|
amqp_method_t *output) { |
|
1004
|
0
|
|
|
|
|
|
return simple_wait_method_inner(state, expected_channel, expected_method, |
|
1005
|
|
|
|
|
|
|
amqp_time_infinite(), output); |
|
1006
|
|
|
|
|
|
|
} |
|
1007
|
|
|
|
|
|
|
|
|
1008
|
306
|
|
|
|
|
|
int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel, |
|
1009
|
|
|
|
|
|
|
amqp_method_number_t id, void *decoded) { |
|
1010
|
306
|
|
|
|
|
|
return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE, |
|
1011
|
|
|
|
|
|
|
amqp_time_infinite()); |
|
1012
|
|
|
|
|
|
|
} |
|
1013
|
|
|
|
|
|
|
|
|
1014
|
409
|
|
|
|
|
|
int amqp_send_method_inner(amqp_connection_state_t state, |
|
1015
|
|
|
|
|
|
|
amqp_channel_t channel, amqp_method_number_t id, |
|
1016
|
|
|
|
|
|
|
void *decoded, int flags, amqp_time_t deadline) { |
|
1017
|
|
|
|
|
|
|
amqp_frame_t frame; |
|
1018
|
|
|
|
|
|
|
|
|
1019
|
409
|
|
|
|
|
|
frame.frame_type = AMQP_FRAME_METHOD; |
|
1020
|
409
|
|
|
|
|
|
frame.channel = channel; |
|
1021
|
409
|
|
|
|
|
|
frame.payload.method.id = id; |
|
1022
|
409
|
|
|
|
|
|
frame.payload.method.decoded = decoded; |
|
1023
|
409
|
|
|
|
|
|
return amqp_send_frame_inner(state, &frame, flags, deadline); |
|
1024
|
|
|
|
|
|
|
} |
|
1025
|
|
|
|
|
|
|
|
|
1026
|
674
|
|
|
|
|
|
static int amqp_id_in_reply_list(amqp_method_number_t expected, |
|
1027
|
|
|
|
|
|
|
amqp_method_number_t *list) { |
|
1028
|
720
|
100
|
|
|
|
|
while (*list != 0) { |
|
1029
|
710
|
100
|
|
|
|
|
if (*list == expected) { |
|
1030
|
664
|
|
|
|
|
|
return 1; |
|
1031
|
|
|
|
|
|
|
} |
|
1032
|
46
|
|
|
|
|
|
list++; |
|
1033
|
|
|
|
|
|
|
} |
|
1034
|
10
|
|
|
|
|
|
return 0; |
|
1035
|
|
|
|
|
|
|
} |
|
1036
|
|
|
|
|
|
|
|
|
1037
|
302
|
|
|
|
|
|
static amqp_rpc_reply_t simple_rpc_inner( |
|
1038
|
|
|
|
|
|
|
amqp_connection_state_t state, amqp_channel_t channel, |
|
1039
|
|
|
|
|
|
|
amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, |
|
1040
|
|
|
|
|
|
|
void *decoded_request_method, amqp_time_t deadline) { |
|
1041
|
|
|
|
|
|
|
int status; |
|
1042
|
|
|
|
|
|
|
amqp_rpc_reply_t result; |
|
1043
|
|
|
|
|
|
|
|
|
1044
|
302
|
|
|
|
|
|
memset(&result, 0, sizeof(result)); |
|
1045
|
|
|
|
|
|
|
|
|
1046
|
302
|
|
|
|
|
|
status = amqp_send_method(state, channel, request_id, decoded_request_method); |
|
1047
|
302
|
100
|
|
|
|
|
if (status < 0) { |
|
1048
|
2
|
|
|
|
|
|
return amqp_rpc_reply_error(status); |
|
1049
|
|
|
|
|
|
|
} |
|
1050
|
|
|
|
|
|
|
|
|
1051
|
|
|
|
|
|
|
{ |
|
1052
|
|
|
|
|
|
|
amqp_frame_t frame; |
|
1053
|
|
|
|
|
|
|
|
|
1054
|
|
|
|
|
|
|
retry: |
|
1055
|
300
|
|
|
|
|
|
status = wait_frame_inner(state, &frame, deadline); |
|
1056
|
300
|
50
|
|
|
|
|
if (status < 0) { |
|
1057
|
0
|
0
|
|
|
|
|
if (status == AMQP_STATUS_TIMEOUT) { |
|
1058
|
0
|
|
|
|
|
|
amqp_socket_close(state->socket, AMQP_SC_FORCE); |
|
1059
|
|
|
|
|
|
|
} |
|
1060
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(status); |
|
1061
|
|
|
|
|
|
|
} |
|
1062
|
|
|
|
|
|
|
|
|
1063
|
|
|
|
|
|
|
/* |
|
1064
|
|
|
|
|
|
|
* We store the frame for later processing unless it's something |
|
1065
|
|
|
|
|
|
|
* that directly affects us here, namely a method frame that is |
|
1066
|
|
|
|
|
|
|
* either |
|
1067
|
|
|
|
|
|
|
* - on the channel we want, and of the expected type, or |
|
1068
|
|
|
|
|
|
|
* - on the channel we want, and a channel.close frame, or |
|
1069
|
|
|
|
|
|
|
* - on channel zero, and a connection.close frame. |
|
1070
|
|
|
|
|
|
|
*/ |
|
1071
|
300
|
50
|
|
|
|
|
if (!((frame.frame_type == AMQP_FRAME_METHOD) && |
|
|
|
50
|
|
|
|
|
|
|
1072
|
300
|
100
|
|
|
|
|
(((frame.channel == channel) && |
|
1073
|
300
|
|
|
|
|
|
(amqp_id_in_reply_list(frame.payload.method.id, |
|
1074
|
5
|
50
|
|
|
|
|
expected_reply_ids) || |
|
1075
|
0
|
0
|
|
|
|
|
(frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) || |
|
1076
|
0
|
0
|
|
|
|
|
((frame.channel == 0) && |
|
1077
|
0
|
|
|
|
|
|
(frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))))) { |
|
1078
|
|
|
|
|
|
|
amqp_pool_t *channel_pool; |
|
1079
|
|
|
|
|
|
|
amqp_frame_t *frame_copy; |
|
1080
|
|
|
|
|
|
|
amqp_link_t *link; |
|
1081
|
|
|
|
|
|
|
|
|
1082
|
0
|
|
|
|
|
|
channel_pool = amqp_get_or_create_channel_pool(state, frame.channel); |
|
1083
|
0
|
0
|
|
|
|
|
if (NULL == channel_pool) { |
|
1084
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY); |
|
1085
|
|
|
|
|
|
|
} |
|
1086
|
|
|
|
|
|
|
|
|
1087
|
0
|
|
|
|
|
|
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); |
|
1088
|
0
|
|
|
|
|
|
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); |
|
1089
|
|
|
|
|
|
|
|
|
1090
|
0
|
0
|
|
|
|
|
if (frame_copy == NULL || link == NULL) { |
|
|
|
0
|
|
|
|
|
|
|
1091
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY); |
|
1092
|
|
|
|
|
|
|
} |
|
1093
|
|
|
|
|
|
|
|
|
1094
|
0
|
|
|
|
|
|
*frame_copy = frame; |
|
1095
|
|
|
|
|
|
|
|
|
1096
|
0
|
|
|
|
|
|
link->next = NULL; |
|
1097
|
0
|
|
|
|
|
|
link->data = frame_copy; |
|
1098
|
|
|
|
|
|
|
|
|
1099
|
0
|
0
|
|
|
|
|
if (state->last_queued_frame == NULL) { |
|
1100
|
0
|
|
|
|
|
|
state->first_queued_frame = link; |
|
1101
|
|
|
|
|
|
|
} else { |
|
1102
|
0
|
|
|
|
|
|
state->last_queued_frame->next = link; |
|
1103
|
|
|
|
|
|
|
} |
|
1104
|
0
|
|
|
|
|
|
state->last_queued_frame = link; |
|
1105
|
|
|
|
|
|
|
|
|
1106
|
0
|
|
|
|
|
|
goto retry; |
|
1107
|
|
|
|
|
|
|
} |
|
1108
|
|
|
|
|
|
|
|
|
1109
|
300
|
100
|
|
|
|
|
result.reply_type = |
|
1110
|
300
|
|
|
|
|
|
(amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) |
|
1111
|
|
|
|
|
|
|
? AMQP_RESPONSE_NORMAL |
|
1112
|
|
|
|
|
|
|
: AMQP_RESPONSE_SERVER_EXCEPTION; |
|
1113
|
|
|
|
|
|
|
|
|
1114
|
300
|
|
|
|
|
|
result.reply = frame.payload.method; |
|
1115
|
302
|
|
|
|
|
|
return result; |
|
1116
|
|
|
|
|
|
|
} |
|
1117
|
|
|
|
|
|
|
} |
|
1118
|
|
|
|
|
|
|
|
|
1119
|
54
|
|
|
|
|
|
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, |
|
1120
|
|
|
|
|
|
|
amqp_channel_t channel, |
|
1121
|
|
|
|
|
|
|
amqp_method_number_t request_id, |
|
1122
|
|
|
|
|
|
|
amqp_method_number_t *expected_reply_ids, |
|
1123
|
|
|
|
|
|
|
void *decoded_request_method) { |
|
1124
|
|
|
|
|
|
|
amqp_time_t deadline; |
|
1125
|
|
|
|
|
|
|
int res; |
|
1126
|
|
|
|
|
|
|
|
|
1127
|
54
|
|
|
|
|
|
res = amqp_time_from_now(&deadline, state->rpc_timeout); |
|
1128
|
54
|
50
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
|
1129
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(res); |
|
1130
|
|
|
|
|
|
|
} |
|
1131
|
|
|
|
|
|
|
|
|
1132
|
54
|
|
|
|
|
|
return simple_rpc_inner(state, channel, request_id, expected_reply_ids, |
|
1133
|
|
|
|
|
|
|
decoded_request_method, deadline); |
|
1134
|
|
|
|
|
|
|
} |
|
1135
|
|
|
|
|
|
|
|
|
1136
|
211
|
|
|
|
|
|
void *amqp_simple_rpc_decoded(amqp_connection_state_t state, |
|
1137
|
|
|
|
|
|
|
amqp_channel_t channel, |
|
1138
|
|
|
|
|
|
|
amqp_method_number_t request_id, |
|
1139
|
|
|
|
|
|
|
amqp_method_number_t reply_id, |
|
1140
|
|
|
|
|
|
|
void *decoded_request_method) { |
|
1141
|
|
|
|
|
|
|
amqp_time_t deadline; |
|
1142
|
|
|
|
|
|
|
int res; |
|
1143
|
|
|
|
|
|
|
amqp_method_number_t replies[2]; |
|
1144
|
|
|
|
|
|
|
|
|
1145
|
211
|
|
|
|
|
|
res = amqp_time_from_now(&deadline, state->rpc_timeout); |
|
1146
|
211
|
50
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
|
1147
|
0
|
|
|
|
|
|
state->most_recent_api_result = amqp_rpc_reply_error(res); |
|
1148
|
0
|
|
|
|
|
|
return NULL; |
|
1149
|
|
|
|
|
|
|
} |
|
1150
|
|
|
|
|
|
|
|
|
1151
|
211
|
|
|
|
|
|
replies[0] = reply_id; |
|
1152
|
211
|
|
|
|
|
|
replies[1] = 0; |
|
1153
|
|
|
|
|
|
|
|
|
1154
|
211
|
|
|
|
|
|
state->most_recent_api_result = simple_rpc_inner( |
|
1155
|
|
|
|
|
|
|
state, channel, request_id, replies, decoded_request_method, deadline); |
|
1156
|
|
|
|
|
|
|
|
|
1157
|
211
|
100
|
|
|
|
|
if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) { |
|
1158
|
206
|
|
|
|
|
|
return state->most_recent_api_result.reply.decoded; |
|
1159
|
|
|
|
|
|
|
} else { |
|
1160
|
211
|
|
|
|
|
|
return NULL; |
|
1161
|
|
|
|
|
|
|
} |
|
1162
|
|
|
|
|
|
|
} |
|
1163
|
|
|
|
|
|
|
|
|
1164
|
191
|
|
|
|
|
|
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { |
|
1165
|
191
|
|
|
|
|
|
return state->most_recent_api_result; |
|
1166
|
|
|
|
|
|
|
} |
|
1167
|
|
|
|
|
|
|
|
|
1168
|
|
|
|
|
|
|
/* |
|
1169
|
|
|
|
|
|
|
* Merge base and add tables. If the two tables contain an entry with the same |
|
1170
|
|
|
|
|
|
|
* key, the entry from the add table takes precedence. For entries that are both |
|
1171
|
|
|
|
|
|
|
* tables with the same key, the table is recursively merged. |
|
1172
|
|
|
|
|
|
|
*/ |
|
1173
|
37
|
|
|
|
|
|
int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add, |
|
1174
|
|
|
|
|
|
|
amqp_table_t *result, amqp_pool_t *pool) { |
|
1175
|
|
|
|
|
|
|
int i; |
|
1176
|
|
|
|
|
|
|
int res; |
|
1177
|
|
|
|
|
|
|
amqp_pool_t temp_pool; |
|
1178
|
|
|
|
|
|
|
amqp_table_t temp_result; |
|
1179
|
37
|
50
|
|
|
|
|
assert(base != NULL); |
|
1180
|
37
|
50
|
|
|
|
|
assert(result != NULL); |
|
1181
|
37
|
50
|
|
|
|
|
assert(pool != NULL); |
|
1182
|
|
|
|
|
|
|
|
|
1183
|
37
|
50
|
|
|
|
|
if (NULL == add) { |
|
1184
|
0
|
|
|
|
|
|
return amqp_table_clone(base, result, pool); |
|
1185
|
|
|
|
|
|
|
} |
|
1186
|
|
|
|
|
|
|
|
|
1187
|
37
|
|
|
|
|
|
init_amqp_pool(&temp_pool, 4096); |
|
1188
|
37
|
|
|
|
|
|
temp_result.num_entries = 0; |
|
1189
|
37
|
|
|
|
|
|
temp_result.entries = |
|
1190
|
37
|
|
|
|
|
|
amqp_pool_alloc(&temp_pool, sizeof(amqp_table_entry_t) * |
|
1191
|
37
|
|
|
|
|
|
(base->num_entries + add->num_entries)); |
|
1192
|
37
|
50
|
|
|
|
|
if (NULL == temp_result.entries) { |
|
1193
|
0
|
|
|
|
|
|
res = AMQP_STATUS_NO_MEMORY; |
|
1194
|
0
|
|
|
|
|
|
goto error_out; |
|
1195
|
|
|
|
|
|
|
} |
|
1196
|
259
|
100
|
|
|
|
|
for (i = 0; i < base->num_entries; ++i) { |
|
1197
|
222
|
|
|
|
|
|
temp_result.entries[temp_result.num_entries] = base->entries[i]; |
|
1198
|
222
|
|
|
|
|
|
temp_result.num_entries++; |
|
1199
|
|
|
|
|
|
|
} |
|
1200
|
37
|
50
|
|
|
|
|
for (i = 0; i < add->num_entries; ++i) { |
|
1201
|
0
|
|
|
|
|
|
amqp_table_entry_t *e = |
|
1202
|
0
|
|
|
|
|
|
amqp_table_get_entry_by_key(&temp_result, add->entries[i].key); |
|
1203
|
0
|
0
|
|
|
|
|
if (NULL != e) { |
|
1204
|
0
|
0
|
|
|
|
|
if (AMQP_FIELD_KIND_TABLE == add->entries[i].value.kind && |
|
|
|
0
|
|
|
|
|
|
|
1205
|
0
|
|
|
|
|
|
AMQP_FIELD_KIND_TABLE == e->value.kind) { |
|
1206
|
0
|
|
|
|
|
|
amqp_table_entry_t *be = |
|
1207
|
0
|
|
|
|
|
|
amqp_table_get_entry_by_key(base, add->entries[i].key); |
|
1208
|
|
|
|
|
|
|
|
|
1209
|
0
|
|
|
|
|
|
res = amqp_merge_capabilities(&be->value.value.table, |
|
1210
|
0
|
|
|
|
|
|
&add->entries[i].value.value.table, |
|
1211
|
|
|
|
|
|
|
&e->value.value.table, &temp_pool); |
|
1212
|
0
|
0
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
1213
|
0
|
|
|
|
|
|
goto error_out; |
|
1214
|
|
|
|
|
|
|
} |
|
1215
|
|
|
|
|
|
|
} else { |
|
1216
|
0
|
|
|
|
|
|
e->value = add->entries[i].value; |
|
1217
|
|
|
|
|
|
|
} |
|
1218
|
|
|
|
|
|
|
} else { |
|
1219
|
0
|
|
|
|
|
|
temp_result.entries[temp_result.num_entries] = add->entries[i]; |
|
1220
|
0
|
|
|
|
|
|
temp_result.num_entries++; |
|
1221
|
|
|
|
|
|
|
} |
|
1222
|
|
|
|
|
|
|
} |
|
1223
|
37
|
|
|
|
|
|
res = amqp_table_clone(&temp_result, result, pool); |
|
1224
|
|
|
|
|
|
|
error_out: |
|
1225
|
37
|
|
|
|
|
|
empty_amqp_pool(&temp_pool); |
|
1226
|
37
|
|
|
|
|
|
return res; |
|
1227
|
|
|
|
|
|
|
} |
|
1228
|
|
|
|
|
|
|
|
|
1229
|
37
|
|
|
|
|
|
static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, |
|
1230
|
|
|
|
|
|
|
char const *vhost, int channel_max, |
|
1231
|
|
|
|
|
|
|
int frame_max, int heartbeat, |
|
1232
|
|
|
|
|
|
|
const amqp_table_t *client_properties, |
|
1233
|
|
|
|
|
|
|
const struct timeval *timeout, |
|
1234
|
|
|
|
|
|
|
amqp_sasl_method_enum sasl_method, |
|
1235
|
|
|
|
|
|
|
va_list vl) { |
|
1236
|
|
|
|
|
|
|
int res; |
|
1237
|
|
|
|
|
|
|
amqp_method_t method; |
|
1238
|
|
|
|
|
|
|
|
|
1239
|
|
|
|
|
|
|
uint16_t client_channel_max; |
|
1240
|
|
|
|
|
|
|
uint32_t client_frame_max; |
|
1241
|
|
|
|
|
|
|
uint16_t client_heartbeat; |
|
1242
|
|
|
|
|
|
|
|
|
1243
|
|
|
|
|
|
|
uint16_t server_channel_max; |
|
1244
|
|
|
|
|
|
|
uint32_t server_frame_max; |
|
1245
|
|
|
|
|
|
|
uint16_t server_heartbeat; |
|
1246
|
|
|
|
|
|
|
|
|
1247
|
|
|
|
|
|
|
amqp_rpc_reply_t result; |
|
1248
|
|
|
|
|
|
|
amqp_time_t deadline; |
|
1249
|
|
|
|
|
|
|
|
|
1250
|
37
|
50
|
|
|
|
|
if (channel_max < 0 || channel_max > UINT16_MAX) { |
|
|
|
50
|
|
|
|
|
|
|
1251
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); |
|
1252
|
|
|
|
|
|
|
} |
|
1253
|
37
|
|
|
|
|
|
client_channel_max = (uint16_t)channel_max; |
|
1254
|
|
|
|
|
|
|
|
|
1255
|
37
|
50
|
|
|
|
|
if (frame_max < 0) { |
|
1256
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); |
|
1257
|
|
|
|
|
|
|
} |
|
1258
|
37
|
|
|
|
|
|
client_frame_max = (uint32_t)frame_max; |
|
1259
|
|
|
|
|
|
|
|
|
1260
|
37
|
50
|
|
|
|
|
if (heartbeat < 0 || heartbeat > UINT16_MAX) { |
|
|
|
50
|
|
|
|
|
|
|
1261
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); |
|
1262
|
|
|
|
|
|
|
} |
|
1263
|
37
|
|
|
|
|
|
client_heartbeat = (uint16_t)heartbeat; |
|
1264
|
|
|
|
|
|
|
|
|
1265
|
37
|
|
|
|
|
|
res = amqp_time_from_now(&deadline, timeout); |
|
1266
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
1267
|
0
|
|
|
|
|
|
goto error_res; |
|
1268
|
|
|
|
|
|
|
} |
|
1269
|
|
|
|
|
|
|
|
|
1270
|
37
|
|
|
|
|
|
res = send_header_inner(state, deadline); |
|
1271
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
1272
|
0
|
|
|
|
|
|
goto error_res; |
|
1273
|
|
|
|
|
|
|
} |
|
1274
|
|
|
|
|
|
|
|
|
1275
|
37
|
|
|
|
|
|
res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD, |
|
1276
|
|
|
|
|
|
|
deadline, &method); |
|
1277
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
1278
|
0
|
|
|
|
|
|
goto error_res; |
|
1279
|
|
|
|
|
|
|
} |
|
1280
|
|
|
|
|
|
|
|
|
1281
|
|
|
|
|
|
|
{ |
|
1282
|
37
|
|
|
|
|
|
amqp_connection_start_t *s = (amqp_connection_start_t *)method.decoded; |
|
1283
|
37
|
50
|
|
|
|
|
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || |
|
|
|
50
|
|
|
|
|
|
|
1284
|
37
|
|
|
|
|
|
(s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { |
|
1285
|
0
|
|
|
|
|
|
res = AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION; |
|
1286
|
0
|
|
|
|
|
|
goto error_res; |
|
1287
|
|
|
|
|
|
|
} |
|
1288
|
|
|
|
|
|
|
|
|
1289
|
37
|
|
|
|
|
|
res = amqp_table_clone(&s->server_properties, &state->server_properties, |
|
1290
|
|
|
|
|
|
|
&state->properties_pool); |
|
1291
|
|
|
|
|
|
|
|
|
1292
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
1293
|
0
|
|
|
|
|
|
goto error_res; |
|
1294
|
|
|
|
|
|
|
} |
|
1295
|
|
|
|
|
|
|
|
|
1296
|
|
|
|
|
|
|
/* TODO: check that our chosen SASL mechanism is in the list of |
|
1297
|
|
|
|
|
|
|
acceptable mechanisms. Or even let the application choose from |
|
1298
|
|
|
|
|
|
|
the list! */ |
|
1299
|
37
|
50
|
|
|
|
|
if (!sasl_mechanism_in_list(s->mechanisms, sasl_method)) { |
|
1300
|
0
|
|
|
|
|
|
res = AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD; |
|
1301
|
0
|
|
|
|
|
|
goto error_res; |
|
1302
|
|
|
|
|
|
|
} |
|
1303
|
|
|
|
|
|
|
} |
|
1304
|
|
|
|
|
|
|
|
|
1305
|
|
|
|
|
|
|
{ |
|
1306
|
|
|
|
|
|
|
amqp_table_entry_t default_properties[6]; |
|
1307
|
|
|
|
|
|
|
amqp_table_t default_table; |
|
1308
|
|
|
|
|
|
|
amqp_table_entry_t client_capabilities[2]; |
|
1309
|
|
|
|
|
|
|
amqp_table_t client_capabilities_table; |
|
1310
|
|
|
|
|
|
|
amqp_connection_start_ok_t s; |
|
1311
|
|
|
|
|
|
|
amqp_pool_t *channel_pool; |
|
1312
|
|
|
|
|
|
|
amqp_bytes_t response_bytes; |
|
1313
|
|
|
|
|
|
|
|
|
1314
|
37
|
|
|
|
|
|
channel_pool = amqp_get_or_create_channel_pool(state, 0); |
|
1315
|
37
|
50
|
|
|
|
|
if (NULL == channel_pool) { |
|
1316
|
0
|
|
|
|
|
|
res = AMQP_STATUS_NO_MEMORY; |
|
1317
|
0
|
|
|
|
|
|
goto error_res; |
|
1318
|
|
|
|
|
|
|
} |
|
1319
|
|
|
|
|
|
|
|
|
1320
|
37
|
|
|
|
|
|
response_bytes = sasl_response(channel_pool, sasl_method, vl); |
|
1321
|
37
|
50
|
|
|
|
|
if (response_bytes.bytes == NULL) { |
|
1322
|
0
|
|
|
|
|
|
res = AMQP_STATUS_NO_MEMORY; |
|
1323
|
0
|
|
|
|
|
|
goto error_res; |
|
1324
|
|
|
|
|
|
|
} |
|
1325
|
|
|
|
|
|
|
|
|
1326
|
37
|
|
|
|
|
|
client_capabilities[0] = |
|
1327
|
|
|
|
|
|
|
amqp_table_construct_bool_entry("authentication_failure_close", 1); |
|
1328
|
37
|
|
|
|
|
|
client_capabilities[1] = |
|
1329
|
|
|
|
|
|
|
amqp_table_construct_bool_entry("exchange_exchange_bindings", 1); |
|
1330
|
|
|
|
|
|
|
|
|
1331
|
37
|
|
|
|
|
|
client_capabilities_table.entries = client_capabilities; |
|
1332
|
37
|
|
|
|
|
|
client_capabilities_table.num_entries = |
|
1333
|
|
|
|
|
|
|
sizeof(client_capabilities) / sizeof(amqp_table_entry_t); |
|
1334
|
|
|
|
|
|
|
|
|
1335
|
37
|
|
|
|
|
|
default_properties[0] = |
|
1336
|
|
|
|
|
|
|
amqp_table_construct_utf8_entry("product", "rabbitmq-c"); |
|
1337
|
37
|
|
|
|
|
|
default_properties[1] = |
|
1338
|
|
|
|
|
|
|
amqp_table_construct_utf8_entry("version", AMQP_VERSION_STRING); |
|
1339
|
37
|
|
|
|
|
|
default_properties[2] = |
|
1340
|
|
|
|
|
|
|
amqp_table_construct_utf8_entry("platform", AMQ_PLATFORM); |
|
1341
|
37
|
|
|
|
|
|
default_properties[3] = |
|
1342
|
|
|
|
|
|
|
amqp_table_construct_utf8_entry("copyright", AMQ_COPYRIGHT); |
|
1343
|
37
|
|
|
|
|
|
default_properties[4] = amqp_table_construct_utf8_entry( |
|
1344
|
|
|
|
|
|
|
"information", "See https://github.com/alanxz/rabbitmq-c"); |
|
1345
|
37
|
|
|
|
|
|
default_properties[5] = amqp_table_construct_table_entry( |
|
1346
|
|
|
|
|
|
|
"capabilities", &client_capabilities_table); |
|
1347
|
|
|
|
|
|
|
|
|
1348
|
37
|
|
|
|
|
|
default_table.entries = default_properties; |
|
1349
|
37
|
|
|
|
|
|
default_table.num_entries = |
|
1350
|
|
|
|
|
|
|
sizeof(default_properties) / sizeof(amqp_table_entry_t); |
|
1351
|
|
|
|
|
|
|
|
|
1352
|
37
|
|
|
|
|
|
res = amqp_merge_capabilities(&default_table, client_properties, |
|
1353
|
|
|
|
|
|
|
&state->client_properties, channel_pool); |
|
1354
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
1355
|
0
|
|
|
|
|
|
goto error_res; |
|
1356
|
|
|
|
|
|
|
} |
|
1357
|
|
|
|
|
|
|
|
|
1358
|
37
|
|
|
|
|
|
s.client_properties = state->client_properties; |
|
1359
|
37
|
|
|
|
|
|
s.mechanism = sasl_method_name(sasl_method); |
|
1360
|
37
|
|
|
|
|
|
s.response = response_bytes; |
|
1361
|
37
|
|
|
|
|
|
s.locale = amqp_cstring_bytes("en_US"); |
|
1362
|
|
|
|
|
|
|
|
|
1363
|
37
|
|
|
|
|
|
res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s, |
|
1364
|
|
|
|
|
|
|
AMQP_SF_NONE, deadline); |
|
1365
|
37
|
50
|
|
|
|
|
if (res < 0) { |
|
1366
|
0
|
|
|
|
|
|
goto error_res; |
|
1367
|
|
|
|
|
|
|
} |
|
1368
|
|
|
|
|
|
|
} |
|
1369
|
|
|
|
|
|
|
|
|
1370
|
37
|
|
|
|
|
|
amqp_release_buffers(state); |
|
1371
|
|
|
|
|
|
|
|
|
1372
|
|
|
|
|
|
|
{ |
|
1373
|
37
|
|
|
|
|
|
amqp_method_number_t expected[] = {AMQP_CONNECTION_TUNE_METHOD, |
|
1374
|
|
|
|
|
|
|
AMQP_CONNECTION_CLOSE_METHOD, 0}; |
|
1375
|
|
|
|
|
|
|
|
|
1376
|
37
|
|
|
|
|
|
res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method); |
|
1377
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
|
1378
|
0
|
|
|
|
|
|
goto error_res; |
|
1379
|
|
|
|
|
|
|
} |
|
1380
|
|
|
|
|
|
|
} |
|
1381
|
|
|
|
|
|
|
|
|
1382
|
37
|
50
|
|
|
|
|
if (AMQP_CONNECTION_CLOSE_METHOD == method.id) { |
|
1383
|
0
|
|
|
|
|
|
result.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; |
|
1384
|
0
|
|
|
|
|
|
result.reply = method; |
|
1385
|
0
|
|
|
|
|
|
result.library_error = 0; |
|
1386
|
0
|
|
|
|
|
|
goto out; |
|
1387
|
|
|
|
|
|
|
} |
|
1388
|
|
|
|
|
|
|
|
|
1389
|
|
|
|
|
|
|
{ |
|
1390
|
37
|
|
|
|
|
|
amqp_connection_tune_t *s = (amqp_connection_tune_t *)method.decoded; |
|
1391
|
37
|
|
|
|
|
|
server_channel_max = s->channel_max; |
|
1392
|
37
|
|
|
|
|
|
server_frame_max = s->frame_max; |
|
1393
|
37
|
|
|
|
|
|
server_heartbeat = s->heartbeat; |
|
1394
|
|
|
|
|
|
|
} |
|
1395
|
|
|
|
|
|
|
|
|
1396
|
37
|
50
|
|
|
|
|
if (server_channel_max != 0 && |
|
|
|
50
|
|
|
|
|
|
|
1397
|
37
|
50
|
|
|
|
|
(server_channel_max < client_channel_max || client_channel_max == 0)) { |
|
1398
|
37
|
|
|
|
|
|
client_channel_max = server_channel_max; |
|
1399
|
0
|
0
|
|
|
|
|
} else if (server_channel_max == 0 && client_channel_max == 0) { |
|
|
|
0
|
|
|
|
|
|
|
1400
|
0
|
|
|
|
|
|
client_channel_max = UINT16_MAX; |
|
1401
|
|
|
|
|
|
|
} |
|
1402
|
|
|
|
|
|
|
|
|
1403
|
37
|
50
|
|
|
|
|
if (server_frame_max != 0 && server_frame_max < client_frame_max) { |
|
|
|
50
|
|
|
|
|
|
|
1404
|
37
|
|
|
|
|
|
client_frame_max = server_frame_max; |
|
1405
|
|
|
|
|
|
|
} |
|
1406
|
|
|
|
|
|
|
|
|
1407
|
37
|
50
|
|
|
|
|
if (server_heartbeat != 0 && server_heartbeat < client_heartbeat) { |
|
|
|
50
|
|
|
|
|
|
|
1408
|
0
|
|
|
|
|
|
client_heartbeat = server_heartbeat; |
|
1409
|
|
|
|
|
|
|
} |
|
1410
|
|
|
|
|
|
|
|
|
1411
|
37
|
|
|
|
|
|
res = amqp_tune_connection(state, client_channel_max, client_frame_max, |
|
1412
|
|
|
|
|
|
|
client_heartbeat); |
|
1413
|
37
|
50
|
|
|
|
|
if (res < 0) { |
|
1414
|
0
|
|
|
|
|
|
goto error_res; |
|
1415
|
|
|
|
|
|
|
} |
|
1416
|
|
|
|
|
|
|
|
|
1417
|
|
|
|
|
|
|
{ |
|
1418
|
|
|
|
|
|
|
amqp_connection_tune_ok_t s; |
|
1419
|
37
|
|
|
|
|
|
s.frame_max = client_frame_max; |
|
1420
|
37
|
|
|
|
|
|
s.channel_max = client_channel_max; |
|
1421
|
37
|
|
|
|
|
|
s.heartbeat = client_heartbeat; |
|
1422
|
|
|
|
|
|
|
|
|
1423
|
37
|
|
|
|
|
|
res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s, |
|
1424
|
|
|
|
|
|
|
AMQP_SF_NONE, deadline); |
|
1425
|
37
|
50
|
|
|
|
|
if (res < 0) { |
|
1426
|
0
|
|
|
|
|
|
goto error_res; |
|
1427
|
|
|
|
|
|
|
} |
|
1428
|
|
|
|
|
|
|
} |
|
1429
|
|
|
|
|
|
|
|
|
1430
|
37
|
|
|
|
|
|
amqp_release_buffers(state); |
|
1431
|
|
|
|
|
|
|
|
|
1432
|
|
|
|
|
|
|
{ |
|
1433
|
37
|
|
|
|
|
|
amqp_method_number_t replies[] = {AMQP_CONNECTION_OPEN_OK_METHOD, 0}; |
|
1434
|
|
|
|
|
|
|
amqp_connection_open_t s; |
|
1435
|
37
|
|
|
|
|
|
s.virtual_host = amqp_cstring_bytes(vhost); |
|
1436
|
37
|
|
|
|
|
|
s.capabilities = amqp_empty_bytes; |
|
1437
|
37
|
|
|
|
|
|
s.insist = 1; |
|
1438
|
|
|
|
|
|
|
|
|
1439
|
37
|
|
|
|
|
|
result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies, |
|
1440
|
|
|
|
|
|
|
&s, deadline); |
|
1441
|
37
|
50
|
|
|
|
|
if (result.reply_type != AMQP_RESPONSE_NORMAL) { |
|
1442
|
0
|
|
|
|
|
|
goto out; |
|
1443
|
|
|
|
|
|
|
} |
|
1444
|
|
|
|
|
|
|
} |
|
1445
|
|
|
|
|
|
|
|
|
1446
|
37
|
|
|
|
|
|
result.reply_type = AMQP_RESPONSE_NORMAL; |
|
1447
|
37
|
|
|
|
|
|
result.reply.id = 0; |
|
1448
|
37
|
|
|
|
|
|
result.reply.decoded = NULL; |
|
1449
|
37
|
|
|
|
|
|
result.library_error = 0; |
|
1450
|
37
|
|
|
|
|
|
amqp_maybe_release_buffers(state); |
|
1451
|
|
|
|
|
|
|
|
|
1452
|
|
|
|
|
|
|
out: |
|
1453
|
37
|
|
|
|
|
|
return result; |
|
1454
|
|
|
|
|
|
|
|
|
1455
|
|
|
|
|
|
|
error_res: |
|
1456
|
0
|
|
|
|
|
|
amqp_socket_close(state->socket, AMQP_SC_FORCE); |
|
1457
|
0
|
|
|
|
|
|
result = amqp_rpc_reply_error(res); |
|
1458
|
|
|
|
|
|
|
|
|
1459
|
37
|
|
|
|
|
|
goto out; |
|
1460
|
|
|
|
|
|
|
} |
|
1461
|
|
|
|
|
|
|
|
|
1462
|
37
|
|
|
|
|
|
/*
 * Log in on `state` without any caller-supplied client properties.
 *
 * Forwards to amqp_login_inner() with &amqp_empty_table as the extra client
 * properties and state->handshake_timeout as the timeout. The variadic
 * arguments following sasl_method are passed through as a va_list.
 *
 * Returns whatever amqp_login_inner() returns.
 */
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,
                            int channel_max, int frame_max, int heartbeat,
                            int sasl_method, ...) {
  amqp_rpc_reply_t reply;
  va_list sasl_args;

  va_start(sasl_args, sasl_method);
  reply = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
                           &amqp_empty_table, state->handshake_timeout,
                           sasl_method, sasl_args);
  va_end(sasl_args);

  return reply;
}
|
1478
|
|
|
|
|
|
|
|
|
1479
|
0
|
|
|
|
|
|
amqp_rpc_reply_t amqp_login_with_properties( |
|
1480
|
|
|
|
|
|
|
amqp_connection_state_t state, char const *vhost, int channel_max, |
|
1481
|
|
|
|
|
|
|
int frame_max, int heartbeat, const amqp_table_t *client_properties, |
|
1482
|
|
|
|
|
|
|
int sasl_method, ...) { |
|
1483
|
|
|
|
|
|
|
va_list vl; |
|
1484
|
|
|
|
|
|
|
amqp_rpc_reply_t ret; |
|
1485
|
|
|
|
|
|
|
|
|
1486
|
0
|
|
|
|
|
|
va_start(vl, sasl_method); |
|
1487
|
|
|
|
|
|
|
|
|
1488
|
0
|
|
|
|
|
|
ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, |
|
1489
|
0
|
|
|
|
|
|
client_properties, state->handshake_timeout, |
|
1490
|
|
|
|
|
|
|
sasl_method, vl); |
|
1491
|
|
|
|
|
|
|
|
|
1492
|
0
|
|
|
|
|
|
va_end(vl); |
|
1493
|
|
|
|
|
|
|
|
|
1494
|
0
|
|
|
|
|
|
return ret; |
|
1495
|
|
|
|
|
|
|
} |