Migrate PeerStatus events to stasis, add stasis endpoints, and add chan_pjsip device...
[asterisk/asterisk.git] / main / stasis_endpoints.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis endpoint API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/stasis_endpoints.h"
37
38 /*** DOCUMENTATION
39         <managerEvent language="en_US" name="PeerStatus">
40                 <managerEventInstance class="EVENT_FLAG_SYSTEM">
41                         <synopsis>Raised when the state of a peer changes.</synopsis>
42                         <syntax>
43                                 <parameter name="ChannelType">
44                                         <para>The channel technology of the peer.</para>
45                                 </parameter>
46                                 <parameter name="Peer">
47                                         <para>The name of the peer (including channel technology).</para>
48                                 </parameter>
49                                 <parameter name="PeerStatus">
50                                         <para>New status of the peer.</para>
51                                         <enumlist>
52                                                 <enum name="Unknown"/>
53                                                 <enum name="Registered"/>
54                                                 <enum name="Unregistered"/>
55                                                 <enum name="Rejected"/>
56                                                 <enum name="Lagged"/>
57                                         </enumlist>
58                                 </parameter>
59                                 <parameter name="Cause">
60                                         <para>The reason the status has changed.</para>
61                                 </parameter>
62                                 <parameter name="Address">
63                                         <para>New address of the peer.</para>
64                                 </parameter>
65                                 <parameter name="Port">
66                                         <para>New port for the peer.</para>
67                                 </parameter>
68                                 <parameter name="Time">
69                                         <para>Time it takes to reach the peer and receive a response.</para>
70                                 </parameter>
71                         </syntax>
72                 </managerEventInstance>
73         </managerEvent>
74 ***/
75
76 static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg);
77
78 STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type);
79 STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type,
80         .to_ami = peerstatus_to_ami,
81 );
82
83 static struct stasis_topic *endpoint_topic_all;
84
85 static struct stasis_caching_topic *endpoint_topic_all_cached;
86
87 static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg)
88 {
89         struct ast_endpoint_blob *obj = stasis_message_data(msg);
90         RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free);
91         const char *value;
92
93         /* peer_status is the only *required* thing */
94         if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) {
95                 return NULL;
96         }
97         ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value);
98
99         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) {
100                 ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value);
101         }
102         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) {
103                 ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value);
104         }
105         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) {
106                 ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value);
107         }
108         if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) {
109                 ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value);
110         }
111
112         return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "PeerStatus",
113                 "ChannelType: %s\r\n"
114                 "Peer: %s/%s\r\n"
115                 "%s",
116                 obj->snapshot->tech,
117                 obj->snapshot->tech,
118                 obj->snapshot->resource,
119                 ast_str_buffer(peerstatus_event_string));
120 }
121
122 static void endpoint_blob_dtor(void *obj)
123 {
124         struct ast_endpoint_blob *event = obj;
125         ao2_cleanup(event->snapshot);
126         ast_json_unref(event->blob);
127 }
128
129 struct stasis_message *ast_endpoint_blob_create(struct ast_endpoint *endpoint,
130         struct stasis_message_type *type, struct ast_json *blob)
131 {
132         RAII_VAR(struct ast_endpoint_blob *, obj, NULL, ao2_cleanup);
133         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
134
135         if (!blob) {
136                 blob = ast_json_null();
137         }
138
139         if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
140                 return NULL;
141         }
142
143         if (endpoint) {
144                 if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
145                         return NULL;
146                 }
147         }
148
149         obj->blob = ast_json_ref(blob);
150
151         if (!(msg = stasis_message_create(type, obj))) {
152                 return NULL;
153         }
154
155         ao2_ref(msg, +1);
156         return msg;
157 }
158
159 void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type,
160         struct ast_json *blob)
161 {
162         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
163         if (blob) {
164                 message = ast_endpoint_blob_create(endpoint, type, blob);
165         }
166         if (message) {
167                 stasis_publish(ast_endpoint_topic(endpoint), message);
168         }
169 }
170
171 struct stasis_topic *ast_endpoint_topic_all(void)
172 {
173         return endpoint_topic_all;
174 }
175
176 struct stasis_caching_topic *ast_endpoint_topic_all_cached(void)
177 {
178         return endpoint_topic_all_cached;
179 }
180
181 struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
182         const char *name, unsigned int guaranteed)
183 {
184         RAII_VAR(char *, id, NULL, ast_free);
185         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
186         struct ast_endpoint_snapshot *snapshot;
187
188         ast_asprintf(&id, "%s/%s", tech, name);
189         if (!id) {
190                 return NULL;
191         }
192
193         msg = stasis_cache_get_extended(ast_endpoint_topic_all_cached(),
194                 ast_endpoint_snapshot_type(), id, guaranteed);
195         if (!msg) {
196                 return NULL;
197         }
198
199         snapshot = stasis_message_data(msg);
200         ast_assert(snapshot != NULL);
201
202         ao2_ref(snapshot, +1);
203         return snapshot;
204 }
205
206 /*!
207  * \brief Callback extract a unique identity from a snapshot message.
208  *
209  * This identity is unique to the underlying object of the snapshot, such as the
210  * UniqueId field of a channel.
211  *
212  * \param message Message to extract id from.
213  * \return String representing the snapshot's id.
214  * \return \c NULL if the message_type of the message isn't a handled snapshot.
215  * \since 12
216  */
217 static const char *endpoint_snapshot_get_id(struct stasis_message *message)
218 {
219         struct ast_endpoint_snapshot *snapshot;
220
221         if (ast_endpoint_snapshot_type() != stasis_message_type(message)) {
222                 return NULL;
223         }
224
225         snapshot = stasis_message_data(message);
226
227         return snapshot->id;
228 }
229
230
231 struct ast_json *ast_endpoint_snapshot_to_json(
232         const struct ast_endpoint_snapshot *snapshot)
233 {
234         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
235         struct ast_json *channel_array;
236         int i;
237
238         json = ast_json_pack("{s: s, s: s, s: s, s: []}",
239                 "technology", snapshot->tech,
240                 "resource", snapshot->resource,
241                 "state", ast_endpoint_state_to_string(snapshot->state),
242                 "channels");
243
244         if (json == NULL) {
245                 return NULL;
246         }
247
248         if (snapshot->max_channels != -1) {
249                 int res = ast_json_object_set(json, "max_channels",
250                         ast_json_integer_create(snapshot->max_channels));
251                 if (res != 0) {
252                         return NULL;
253                 }
254         }
255
256         channel_array = ast_json_object_get(json, "channels");
257         ast_assert(channel_array != NULL);
258         for (i = 0; i < snapshot->num_channels; ++i) {
259                 int res = ast_json_array_append(channel_array,
260                         ast_json_stringf("channel:%s",
261                                 snapshot->channel_ids[i]));
262                 if (res != 0) {
263                         return NULL;
264                 }
265         }
266
267         return ast_json_ref(json);
268 }
269
270 static void endpoints_stasis_shutdown(void)
271 {
272         stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached);
273         endpoint_topic_all_cached = NULL;
274
275         ao2_cleanup(endpoint_topic_all);
276         endpoint_topic_all = NULL;
277 }
278
279 int ast_endpoint_stasis_init(void)
280 {
281         ast_register_atexit(endpoints_stasis_shutdown);
282
283         if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type) != 0) {
284                 return -1;
285         }
286
287         if (!endpoint_topic_all) {
288                 endpoint_topic_all = stasis_topic_create("endpoint_topic_all");
289         }
290
291         if (!endpoint_topic_all) {
292                 return -1;
293         }
294
295         if (!endpoint_topic_all_cached) {
296                 endpoint_topic_all_cached =
297                         stasis_caching_topic_create(
298                                 endpoint_topic_all, endpoint_snapshot_get_id);
299         }
300
301         if (!endpoint_topic_all_cached) {
302                 return -1;
303         }
304
305         if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type) != 0) {
306                 return -1;
307         }
308
309         return 0;
310 }