Merge "lock: Improve performance of DEBUG_THREADS."
[asterisk/asterisk.git] / main / stasis_endpoints.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis endpoint API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis.h"
34 #include "asterisk/stasis_endpoints.h"
35
36 /*** DOCUMENTATION
37         <managerEvent language="en_US" name="PeerStatus">
38                 <managerEventInstance class="EVENT_FLAG_SYSTEM">
39                         <synopsis>Raised when the state of a peer changes.</synopsis>
40                         <syntax>
41                                 <parameter name="ChannelType">
42                                         <para>The channel technology of the peer.</para>
43                                 </parameter>
44                                 <parameter name="Peer">
45                                         <para>The name of the peer (including channel technology).</para>
46                                 </parameter>
47                                 <parameter name="PeerStatus">
48                                         <para>New status of the peer.</para>
49                                         <enumlist>
50                                                 <enum name="Unknown"/>
51                                                 <enum name="Registered"/>
52                                                 <enum name="Unregistered"/>
53                                                 <enum name="Rejected"/>
54                                                 <enum name="Lagged"/>
55                                         </enumlist>
56                                 </parameter>
57                                 <parameter name="Cause">
58                                         <para>The reason the status has changed.</para>
59                                 </parameter>
60                                 <parameter name="Address">
61                                         <para>New address of the peer.</para>
62                                 </parameter>
63                                 <parameter name="Port">
64                                         <para>New port for the peer.</para>
65                                 </parameter>
66                                 <parameter name="Time">
67                                         <para>Time it takes to reach the peer and receive a response.</para>
68                                 </parameter>
69                         </syntax>
70                 </managerEventInstance>
71         </managerEvent>
72         <managerEvent language="en_US" name="ContactStatus">
73                 <managerEventInstance class="EVENT_FLAG_SYSTEM">
74                         <synopsis>Raised when the state of a contact changes.</synopsis>
75                         <syntax>
76                                 <parameter name="URI">
77                                         <para>This contact's URI.</para>
78                                 </parameter>
79                                 <parameter name="ContactStatus">
80                                         <para>New status of the contact.</para>
81                                         <enumlist>
82                                                 <enum name="Unknown"/>
83                                                 <enum name="Unreachable"/>
84                                                 <enum name="Reachable"/>
85                                                 <enum name="Unqualified"/>
86                                                 <enum name="Removed"/>
87                                                 <enum name="Updated"/>
88                                         </enumlist>
89                                 </parameter>
90                                 <parameter name="AOR">
91                                         <para>The name of the associated aor.</para>
92                                 </parameter>
93                                 <parameter name="EndpointName">
94                                         <para>The name of the associated endpoint.</para>
95                                 </parameter>
96                                 <parameter name="RoundtripUsec">
97                                         <para>The RTT measured during the last qualify.</para>
98                                 </parameter>
99                         </syntax>
100                 </managerEventInstance>
101         </managerEvent>
102 ***/
103
104 static struct stasis_cp_all *endpoint_cache_all;
105
106 struct stasis_cp_all *ast_endpoint_cache_all(void)
107 {
108         return endpoint_cache_all;
109 }
110
111 struct stasis_cache *ast_endpoint_cache(void)
112 {
113         return stasis_cp_all_cache(endpoint_cache_all);
114 }
115
116 struct stasis_topic *ast_endpoint_topic_all(void)
117 {
118         return stasis_cp_all_topic(endpoint_cache_all);
119 }
120
121 struct stasis_topic *ast_endpoint_topic_all_cached(void)
122 {
123         return stasis_cp_all_topic_cached(endpoint_cache_all);
124 }
125
126 STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type);
127
128 static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg)
129 {
130         struct ast_endpoint_blob *obj = stasis_message_data(msg);
131         RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free);
132         const char *value;
133
134         /* peer_status is the only *required* thing */
135         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) {
136                 return NULL;
137         }
138         ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value);
139
140         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) {
141                 ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value);
142         }
143         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) {
144                 ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value);
145         }
146         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) {
147                 ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value);
148         }
149         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) {
150                 ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value);
151         }
152
153         return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "PeerStatus",
154                 "ChannelType: %s\r\n"
155                 "Peer: %s/%s\r\n"
156                 "%s",
157                 obj->snapshot->tech,
158                 obj->snapshot->tech,
159                 obj->snapshot->resource,
160                 ast_str_buffer(peerstatus_event_string));
161 }
162
163 static struct ast_json *peerstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
164 {
165         struct ast_endpoint_blob *obj = stasis_message_data(msg);
166         struct ast_json *json_endpoint;
167         struct ast_json *json_peer;
168         struct ast_json *json_final;
169         const struct timeval *tv = stasis_message_timestamp(msg);
170
171         json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
172         if (!json_endpoint) {
173                 return NULL;
174         }
175
176         json_peer = ast_json_object_create();
177         if (!json_peer) {
178                 ast_json_unref(json_endpoint);
179                 return NULL;
180         }
181
182         /* Copy all fields from the blob */
183         ast_json_object_update(json_peer, obj->blob);
184
185         json_final = ast_json_pack("{s: s, s: o, s: o, s: o }",
186                 "type", "PeerStatusChange",
187                 "timestamp", ast_json_timeval(*tv, NULL),
188                 "endpoint", json_endpoint,
189                 "peer", json_peer);
190         if (!json_final) {
191                 ast_json_unref(json_endpoint);
192                 ast_json_unref(json_peer);
193         }
194
195         return json_final;
196 }
197
198 STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type,
199         .to_ami = peerstatus_to_ami,
200         .to_json = peerstatus_to_json,
201 );
202
203 static struct ast_manager_event_blob *contactstatus_to_ami(struct stasis_message *msg)
204 {
205         struct ast_endpoint_blob *obj = stasis_message_data(msg);
206         RAII_VAR(struct ast_str *, contactstatus_event_string, ast_str_create(64), ast_free);
207         const char *value;
208
209         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "uri")))) {
210                 return NULL;
211         }
212         ast_str_append(&contactstatus_event_string, 0, "URI: %s\r\n", value);
213
214         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "contact_status")))) {
215                 return NULL;
216         }
217         ast_str_append(&contactstatus_event_string, 0, "ContactStatus: %s\r\n", value);
218
219         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "aor")))) {
220                 return NULL;
221         }
222         ast_str_append(&contactstatus_event_string, 0, "AOR: %s\r\n", value);
223
224         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "endpoint_name")))) {
225                 return NULL;
226         }
227         ast_str_append(&contactstatus_event_string, 0, "EndpointName: %s\r\n", value);
228
229         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec")))) {
230                 ast_str_append(&contactstatus_event_string, 0, "RoundtripUsec: %s\r\n", value);
231         }
232
233         return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "ContactStatus",
234                 "%s", ast_str_buffer(contactstatus_event_string));
235 }
236
237 static struct ast_json *contactstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
238 {
239         struct ast_endpoint_blob *obj = stasis_message_data(msg);
240         struct ast_json *json_endpoint;
241         struct ast_json *json_final;
242         const char *rtt;
243         const struct timeval *tv = stasis_message_timestamp(msg);
244
245         json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
246         if (!json_endpoint) {
247                 return NULL;
248         }
249
250         /* The roundtrip time is optional. */
251         rtt = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec"));
252         if (!ast_strlen_zero(rtt)) {
253                 json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s, s: s } } ",
254                         "type", "ContactStatusChange",
255                         "timestamp", ast_json_timeval(*tv, NULL),
256                         "endpoint", json_endpoint,
257                         "contact_info",
258                         "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
259                         "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
260                                 "contact_status")),
261                         "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")),
262                         "roundtrip_usec", rtt);
263         } else {
264                 json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s } } ",
265                         "type", "ContactStatusChange",
266                         "timestamp", ast_json_timeval(*tv, NULL),
267                         "endpoint", json_endpoint,
268                         "contact_info",
269                         "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
270                         "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
271                                 "contact_status")),
272                         "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")));
273         }
274         if (!json_final) {
275                 ast_json_unref(json_endpoint);
276         }
277
278         return json_final;
279 }
280
281 STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_contact_state_type,
282         .to_ami = contactstatus_to_ami,
283         .to_json = contactstatus_to_json
284 );
285
286 static void endpoint_blob_dtor(void *obj)
287 {
288         struct ast_endpoint_blob *event = obj;
289         ao2_cleanup(event->snapshot);
290         ast_json_unref(event->blob);
291 }
292
293 struct stasis_message *ast_endpoint_blob_create(struct ast_endpoint *endpoint,
294         struct stasis_message_type *type, struct ast_json *blob)
295 {
296         struct ast_endpoint_blob *obj;
297         struct stasis_message *msg;
298
299         if (!type) {
300                 return NULL;
301         }
302         if (!blob) {
303                 blob = ast_json_null();
304         }
305
306         if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
307                 return NULL;
308         }
309
310         if (endpoint) {
311                 if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
312                         ao2_ref(obj, -1);
313
314                         return NULL;
315                 }
316         }
317
318         obj->blob = ast_json_ref(blob);
319         msg = stasis_message_create(type, obj);
320         ao2_ref(obj, -1);
321
322         return msg;
323 }
324
325 void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type,
326         struct ast_json *blob)
327 {
328         struct stasis_message *message;
329
330         if (!blob) {
331                 return;
332         }
333
334         message = ast_endpoint_blob_create(endpoint, type, blob);
335         if (message) {
336                 stasis_publish(ast_endpoint_topic(endpoint), message);
337                 ao2_ref(message, -1);
338         }
339 }
340
341 struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
342         const char *name)
343 {
344         char *id = NULL;
345         struct stasis_message *msg;
346         struct ast_endpoint_snapshot *snapshot;
347
348         if (ast_strlen_zero(name)) {
349                 ast_asprintf(&id, "%s", tech);
350         } else {
351                 ast_asprintf(&id, "%s/%s", tech, name);
352         }
353         if (!id) {
354                 return NULL;
355         }
356         ast_tech_to_upper(id);
357
358         msg = stasis_cache_get(ast_endpoint_cache(), ast_endpoint_snapshot_type(), id);
359         ast_free(id);
360         if (!msg) {
361                 return NULL;
362         }
363
364         snapshot = stasis_message_data(msg);
365         ast_assert(snapshot != NULL);
366
367         ao2_ref(snapshot, +1);
368         ao2_ref(msg, -1);
369
370         return snapshot;
371 }
372
373 /*!
374  * \brief Callback extract a unique identity from a snapshot message.
375  *
376  * This identity is unique to the underlying object of the snapshot, such as the
377  * UniqueId field of a channel.
378  *
379  * \param message Message to extract id from.
380  * \return String representing the snapshot's id.
381  * \return \c NULL if the message_type of the message isn't a handled snapshot.
382  * \since 12
383  */
384 static const char *endpoint_snapshot_get_id(struct stasis_message *message)
385 {
386         struct ast_endpoint_snapshot *snapshot;
387
388         if (ast_endpoint_snapshot_type() != stasis_message_type(message)) {
389                 return NULL;
390         }
391
392         snapshot = stasis_message_data(message);
393
394         return snapshot->id;
395 }
396
397
398 struct ast_json *ast_endpoint_snapshot_to_json(
399         const struct ast_endpoint_snapshot *snapshot,
400         const struct stasis_message_sanitizer *sanitize)
401 {
402         struct ast_json *json;
403         struct ast_json *channel_array;
404         int i;
405
406         json = ast_json_pack("{s: s, s: s, s: s, s: []}",
407                 "technology", snapshot->tech,
408                 "resource", snapshot->resource,
409                 "state", ast_endpoint_state_to_string(snapshot->state),
410                 "channel_ids");
411
412         if (json == NULL) {
413                 return NULL;
414         }
415
416         if (snapshot->max_channels != -1) {
417                 int res = ast_json_object_set(json, "max_channels",
418                         ast_json_integer_create(snapshot->max_channels));
419                 if (res != 0) {
420                         ast_json_unref(json);
421
422                         return NULL;
423                 }
424         }
425
426         channel_array = ast_json_object_get(json, "channel_ids");
427         ast_assert(channel_array != NULL);
428         for (i = 0; i < snapshot->num_channels; ++i) {
429                 int res;
430
431                 if (sanitize && sanitize->channel_id
432                         && sanitize->channel_id(snapshot->channel_ids[i])) {
433                         continue;
434                 }
435
436                 res = ast_json_array_append(channel_array,
437                         ast_json_string_create(snapshot->channel_ids[i]));
438                 if (res != 0) {
439                         ast_json_unref(json);
440
441                         return NULL;
442                 }
443         }
444
445         return json;
446 }
447
448 static void endpoints_stasis_cleanup(void)
449 {
450         STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_snapshot_type);
451         STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_state_type);
452         STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_contact_state_type);
453
454         ao2_cleanup(endpoint_cache_all);
455         endpoint_cache_all = NULL;
456 }
457
458 int ast_endpoint_stasis_init(void)
459 {
460         int res = 0;
461         ast_register_cleanup(endpoints_stasis_cleanup);
462
463         endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
464                 endpoint_snapshot_get_id);
465         if (!endpoint_cache_all) {
466                 return -1;
467         }
468
469         res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type);
470         res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type);
471         res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_contact_state_type);
472
473         return res;
474 }