14d3d0ca1e55ba7f1451ce8b2f4a8207c9a71762
[asterisk/asterisk.git] / main / stasis_endpoints.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis endpoint API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_REGISTER_FILE()
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/stasis_endpoints.h"
37
38 /*** DOCUMENTATION
39         <managerEvent language="en_US" name="PeerStatus">
40                 <managerEventInstance class="EVENT_FLAG_SYSTEM">
41                         <synopsis>Raised when the state of a peer changes.</synopsis>
42                         <syntax>
43                                 <parameter name="ChannelType">
44                                         <para>The channel technology of the peer.</para>
45                                 </parameter>
46                                 <parameter name="Peer">
47                                         <para>The name of the peer (including channel technology).</para>
48                                 </parameter>
49                                 <parameter name="PeerStatus">
50                                         <para>New status of the peer.</para>
51                                         <enumlist>
52                                                 <enum name="Unknown"/>
53                                                 <enum name="Registered"/>
54                                                 <enum name="Unregistered"/>
55                                                 <enum name="Rejected"/>
56                                                 <enum name="Lagged"/>
57                                         </enumlist>
58                                 </parameter>
59                                 <parameter name="Cause">
60                                         <para>The reason the status has changed.</para>
61                                 </parameter>
62                                 <parameter name="Address">
63                                         <para>New address of the peer.</para>
64                                 </parameter>
65                                 <parameter name="Port">
66                                         <para>New port for the peer.</para>
67                                 </parameter>
68                                 <parameter name="Time">
69                                         <para>Time it takes to reach the peer and receive a response.</para>
70                                 </parameter>
71                         </syntax>
72                 </managerEventInstance>
73         </managerEvent>
74         <managerEvent language="en_US" name="ContactStatus">
75                 <managerEventInstance class="EVENT_FLAG_SYSTEM">
76                         <synopsis>Raised when the state of a contact changes.</synopsis>
77                         <syntax>
78                                 <parameter name="URI">
79                                         <para>This contact's URI.</para>
80                                 </parameter>
81                                 <parameter name="ContactStatus">
82                                         <para>New status of the contact.</para>
83                                         <enumlist>
84                                                 <enum name="Unknown"/>
85                                                 <enum name="Unreachable"/>
86                                                 <enum name="Reachable"/>
87                                                 <enum name="Created"/>
88                                                 <enum name="Removed"/>
89                                                 <enum name="Updated"/>
90                                         </enumlist>
91                                 </parameter>
92                                 <parameter name="AOR">
93                                         <para>The name of the associated aor.</para>
94                                 </parameter>
95                                 <parameter name="EndpointName">
96                                         <para>The name of the associated endpoint.</para>
97                                 </parameter>
98                                 <parameter name="RoundtripUsec">
99                                         <para>The RTT measured during the last qualify.</para>
100                                 </parameter>
101                                 <parameter name="UserAgent">
102                                         <para>Content of the User-Agent header in the REGISTER request.</para>
103                                 </parameter>
104                                 <parameter name="RegExpire">
105                                         <para>Absolute time after which this contact is no longer valid.</para>
106                                 </parameter>
107                         </syntax>
108                 </managerEventInstance>
109         </managerEvent>
110 ***/
111
/*!
 * \brief Caching-pattern aggregate for all endpoints.
 *
 * Assigned in ast_endpoint_stasis_init() and reset to \c NULL by
 * endpoints_stasis_cleanup().
 */
static struct stasis_cp_all *endpoint_cache_all;

/*!
 * \brief Accessor for the endpoint caching-pattern aggregate.
 *
 * \return The current value of the file-scope \c endpoint_cache_all pointer
 *         (\c NULL while it has not been created, or after cleanup).
 */
struct stasis_cp_all *ast_endpoint_cache_all(void)
{
	return endpoint_cache_all;
}
118
119 struct stasis_cache *ast_endpoint_cache(void)
120 {
121         return stasis_cp_all_cache(endpoint_cache_all);
122 }
123
124 struct stasis_topic *ast_endpoint_topic_all(void)
125 {
126         return stasis_cp_all_topic(endpoint_cache_all);
127 }
128
129 struct stasis_topic *ast_endpoint_topic_all_cached(void)
130 {
131         return stasis_cp_all_topic_cached(endpoint_cache_all);
132 }
133
134 STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type);
135
136 static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg)
137 {
138         struct ast_endpoint_blob *obj = stasis_message_data(msg);
139         RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free);
140         const char *value;
141
142         /* peer_status is the only *required* thing */
143         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) {
144                 return NULL;
145         }
146         ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value);
147
148         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) {
149                 ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value);
150         }
151         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) {
152                 ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value);
153         }
154         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) {
155                 ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value);
156         }
157         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) {
158                 ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value);
159         }
160
161         return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "PeerStatus",
162                 "ChannelType: %s\r\n"
163                 "Peer: %s/%s\r\n"
164                 "%s",
165                 obj->snapshot->tech,
166                 obj->snapshot->tech,
167                 obj->snapshot->resource,
168                 ast_str_buffer(peerstatus_event_string));
169 }
170
171 static struct ast_json *peerstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
172 {
173         struct ast_endpoint_blob *obj = stasis_message_data(msg);
174         struct ast_json *json_endpoint;
175         struct ast_json *json_peer;
176         struct ast_json *json_final;
177         const struct timeval *tv = stasis_message_timestamp(msg);
178
179         json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
180         if (!json_endpoint) {
181                 return NULL;
182         }
183
184         json_peer = ast_json_object_create();
185         if (!json_peer) {
186                 ast_json_unref(json_endpoint);
187                 return NULL;
188         }
189
190         /* Copy all fields from the blob */
191         ast_json_object_update(json_peer, obj->blob);
192
193         json_final = ast_json_pack("{s: s, s: o, s: o, s: o }",
194                 "type", "PeerStatusChange",
195                 "timestamp", ast_json_timeval(*tv, NULL),
196                 "endpoint", json_endpoint,
197                 "peer", json_peer);
198         if (!json_final) {
199                 ast_json_unref(json_endpoint);
200                 ast_json_unref(json_peer);
201         }
202
203         return json_final;
204 }
205
206 STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type,
207         .to_ami = peerstatus_to_ami,
208         .to_json = peerstatus_to_json,
209 );
210
211 static struct ast_manager_event_blob *contactstatus_to_ami(struct stasis_message *msg)
212 {
213         struct ast_endpoint_blob *obj = stasis_message_data(msg);
214         RAII_VAR(struct ast_str *, contactstatus_event_string, ast_str_create(64), ast_free);
215         const char *value;
216
217         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "uri")))) {
218                 return NULL;
219         }
220         ast_str_append(&contactstatus_event_string, 0, "URI: %s\r\n", value);
221
222         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "contact_status")))) {
223                 return NULL;
224         }
225         ast_str_append(&contactstatus_event_string, 0, "ContactStatus: %s\r\n", value);
226
227         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "aor")))) {
228                 return NULL;
229         }
230         ast_str_append(&contactstatus_event_string, 0, "AOR: %s\r\n", value);
231
232         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "endpoint_name")))) {
233                 return NULL;
234         }
235         ast_str_append(&contactstatus_event_string, 0, "EndpointName: %s\r\n", value);
236
237         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec")))) {
238                 ast_str_append(&contactstatus_event_string, 0, "RoundtripUsec: %s\r\n", value);
239         }
240
241         return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "ContactStatus",
242                 "%s", ast_str_buffer(contactstatus_event_string));
243 }
244
245 static struct ast_json *contactstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
246 {
247         struct ast_endpoint_blob *obj = stasis_message_data(msg);
248         struct ast_json *json_endpoint;
249         struct ast_json *json_final;
250         const struct timeval *tv = stasis_message_timestamp(msg);
251
252         json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
253         if (!json_endpoint) {
254                 return NULL;
255         }
256
257         json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s, s: s } } ",
258                 "type", "ContactStatusChange",
259                 "timestamp", ast_json_timeval(*tv, NULL),
260                 "endpoint", json_endpoint,
261                 "contact_info",
262                 "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
263                 "contact_status", ast_json_string_get(ast_json_object_get(obj->blob, "contact_status")),
264                 "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")),
265                 "roundtrip_usec", ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec")));
266         if (!json_final) {
267                 ast_json_unref(json_endpoint);
268         }
269
270         return json_final;
271 }
272
273 STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_contact_state_type,
274         .to_ami = contactstatus_to_ami,
275         .to_json = contactstatus_to_json
276 );
277
278 static void endpoint_blob_dtor(void *obj)
279 {
280         struct ast_endpoint_blob *event = obj;
281         ao2_cleanup(event->snapshot);
282         ast_json_unref(event->blob);
283 }
284
285 struct stasis_message *ast_endpoint_blob_create(struct ast_endpoint *endpoint,
286         struct stasis_message_type *type, struct ast_json *blob)
287 {
288         RAII_VAR(struct ast_endpoint_blob *, obj, NULL, ao2_cleanup);
289         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
290
291         if (!type) {
292                 return NULL;
293         }
294         if (!blob) {
295                 blob = ast_json_null();
296         }
297
298         if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
299                 return NULL;
300         }
301
302         if (endpoint) {
303                 if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
304                         return NULL;
305                 }
306         }
307
308         obj->blob = ast_json_ref(blob);
309
310         if (!(msg = stasis_message_create(type, obj))) {
311                 return NULL;
312         }
313
314         ao2_ref(msg, +1);
315         return msg;
316 }
317
/*!
 * \brief Create an endpoint blob message and publish it on the endpoint's topic.
 *
 * Nothing is published when \a blob is \c NULL or the message cannot be
 * created.
 *
 * \param endpoint Endpoint the message is about; also selects the topic via
 *        ast_endpoint_topic().
 * \param type Message type for the new message.
 * \param blob JSON payload for the message.
 */
void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type,
	struct ast_json *blob)
{
	struct stasis_message *outgoing;

	if (!blob) {
		return;
	}

	outgoing = ast_endpoint_blob_create(endpoint, type, blob);
	if (!outgoing) {
		return;
	}

	stasis_publish(ast_endpoint_topic(endpoint), outgoing);
	ao2_ref(outgoing, -1);
}
329
330 struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
331         const char *name)
332 {
333         RAII_VAR(char *, id, NULL, ast_free);
334         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
335         struct ast_endpoint_snapshot *snapshot;
336
337         if (ast_strlen_zero(name)) {
338                 ast_asprintf(&id, "%s", tech);
339         } else {
340                 ast_asprintf(&id, "%s/%s", tech, name);
341         }
342         if (!id) {
343                 return NULL;
344         }
345         ast_tech_to_upper(id);
346
347         msg = stasis_cache_get(ast_endpoint_cache(),
348                 ast_endpoint_snapshot_type(), id);
349         if (!msg) {
350                 return NULL;
351         }
352
353         snapshot = stasis_message_data(msg);
354         ast_assert(snapshot != NULL);
355
356         ao2_ref(snapshot, +1);
357         return snapshot;
358 }
359
360 /*!
361  * \brief Callback extract a unique identity from a snapshot message.
362  *
363  * This identity is unique to the underlying object of the snapshot, such as the
364  * UniqueId field of a channel.
365  *
366  * \param message Message to extract id from.
367  * \return String representing the snapshot's id.
368  * \return \c NULL if the message_type of the message isn't a handled snapshot.
369  * \since 12
370  */
371 static const char *endpoint_snapshot_get_id(struct stasis_message *message)
372 {
373         struct ast_endpoint_snapshot *snapshot;
374
375         if (ast_endpoint_snapshot_type() != stasis_message_type(message)) {
376                 return NULL;
377         }
378
379         snapshot = stasis_message_data(message);
380
381         return snapshot->id;
382 }
383
384
385 struct ast_json *ast_endpoint_snapshot_to_json(
386         const struct ast_endpoint_snapshot *snapshot,
387         const struct stasis_message_sanitizer *sanitize)
388 {
389         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
390         struct ast_json *channel_array;
391         int i;
392
393         json = ast_json_pack("{s: s, s: s, s: s, s: []}",
394                 "technology", snapshot->tech,
395                 "resource", snapshot->resource,
396                 "state", ast_endpoint_state_to_string(snapshot->state),
397                 "channel_ids");
398
399         if (json == NULL) {
400                 return NULL;
401         }
402
403         if (snapshot->max_channels != -1) {
404                 int res = ast_json_object_set(json, "max_channels",
405                         ast_json_integer_create(snapshot->max_channels));
406                 if (res != 0) {
407                         return NULL;
408                 }
409         }
410
411         channel_array = ast_json_object_get(json, "channel_ids");
412         ast_assert(channel_array != NULL);
413         for (i = 0; i < snapshot->num_channels; ++i) {
414                 int res;
415
416                 if (sanitize && sanitize->channel_id
417                         && sanitize->channel_id(snapshot->channel_ids[i])) {
418                         continue;
419                 }
420
421                 res = ast_json_array_append(channel_array,
422                         ast_json_string_create(snapshot->channel_ids[i]));
423                 if (res != 0) {
424                         return NULL;
425                 }
426         }
427
428         return ast_json_ref(json);
429 }
430
431 static void endpoints_stasis_cleanup(void)
432 {
433         STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_snapshot_type);
434         STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_state_type);
435         STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_contact_state_type);
436
437         ao2_cleanup(endpoint_cache_all);
438         endpoint_cache_all = NULL;
439 }
440
441 int ast_endpoint_stasis_init(void)
442 {
443         int res = 0;
444         ast_register_cleanup(endpoints_stasis_cleanup);
445
446         endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
447                 endpoint_snapshot_get_id);
448         if (!endpoint_cache_all) {
449                 return -1;
450         }
451
452         res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type);
453         res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type);
454         res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_contact_state_type);
455
456         return res;
457 }