stasis_channels.c: Eliminate another overuse of RAII_VAR().
[asterisk/asterisk.git] / main / stasis_channels.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Matt Jordan <mjordan@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Messages and Data Types for Channel Objects
22  *
23  * \author \verbatim Matt Jordan <mjordan@digium.com> \endverbatim
24  *
25  */
26
27 /*** MODULEINFO
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34
35 #include "asterisk/astobj2.h"
36 #include "asterisk/json.h"
37 #include "asterisk/pbx.h"
38 #include "asterisk/bridge.h"
39 #include "asterisk/translate.h"
40 #include "asterisk/stasis.h"
41 #include "asterisk/stasis_cache_pattern.h"
42 #include "asterisk/stasis_channels.h"
43
44 /*** DOCUMENTATION
45         <managerEvent language="en_US" name="VarSet">
46                 <managerEventInstance class="EVENT_FLAG_DIALPLAN">
47                         <synopsis>Raised when a variable is set to a particular value.</synopsis>
48                         <syntax>
49                                 <channel_snapshot/>
50                                 <parameter name="Variable">
51                                         <para>The variable being set.</para>
52                                 </parameter>
53                                 <parameter name="Value">
54                                         <para>The new value of the variable.</para>
55                                 </parameter>
56                         </syntax>
57                 </managerEventInstance>
58         </managerEvent>
59         <managerEvent language="en_US" name="AgentLogin">
60                 <managerEventInstance class="EVENT_FLAG_AGENT">
61                         <synopsis>Raised when an Agent has logged in.</synopsis>
62                         <syntax>
63                                 <channel_snapshot/>
64                                 <parameter name="Agent">
65                                         <para>Agent ID of the agent.</para>
66                                 </parameter>
67                         </syntax>
68                         <see-also>
69                                 <ref type="application">AgentLogin</ref>
70                                 <ref type="managerEvent">AgentLogoff</ref>
71                         </see-also>
72                 </managerEventInstance>
73         </managerEvent>
74         <managerEvent language="en_US" name="AgentLogoff">
75                 <managerEventInstance class="EVENT_FLAG_AGENT">
76                         <synopsis>Raised when an Agent has logged off.</synopsis>
77                         <syntax>
78                                 <xi:include xpointer="xpointer(/docs/managerEvent[@name='AgentLogin']/managerEventInstance/syntax/parameter)" />
79                                 <parameter name="Logintime">
80                                         <para>The number of seconds the agent was logged in.</para>
81                                 </parameter>
82                         </syntax>
83                         <see-also>
84                                 <ref type="managerEvent">AgentLogin</ref>
85                         </see-also>
86                 </managerEventInstance>
87         </managerEvent>
88 ***/
89
90 #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7
91
92 static struct stasis_cp_all *channel_cache_all;
93 static struct stasis_cache *channel_cache_by_name;
94 static struct stasis_caching_topic *channel_by_name_topic;
95
96 struct stasis_cp_all *ast_channel_cache_all(void)
97 {
98         return channel_cache_all;
99 }
100
101 struct stasis_cache *ast_channel_cache(void)
102 {
103         return stasis_cp_all_cache(channel_cache_all);
104 }
105
106 struct stasis_topic *ast_channel_topic_all(void)
107 {
108         return stasis_cp_all_topic(channel_cache_all);
109 }
110
111 struct stasis_topic *ast_channel_topic_all_cached(void)
112 {
113         return stasis_cp_all_topic_cached(channel_cache_all);
114 }
115
116 struct stasis_cache *ast_channel_cache_by_name(void)
117 {
118         return channel_cache_by_name;
119 }
120
121 static const char *channel_snapshot_get_id(struct stasis_message *message)
122 {
123         struct ast_channel_snapshot *snapshot;
124         if (ast_channel_snapshot_type() != stasis_message_type(message)) {
125                 return NULL;
126         }
127         snapshot = stasis_message_data(message);
128         return snapshot->uniqueid;
129 }
130
131 static const char *channel_snapshot_get_name(struct stasis_message *message)
132 {
133         struct ast_channel_snapshot *snapshot;
134         if (ast_channel_snapshot_type() != stasis_message_type(message)) {
135                 return NULL;
136         }
137         snapshot = stasis_message_data(message);
138         return snapshot->name;
139 }
140
141 /*!
142  * \internal
143  * \brief Hash function for \ref ast_channel_snapshot objects
144  */
145 static int channel_snapshot_hash_cb(const void *obj, const int flags)
146 {
147         const struct ast_channel_snapshot *snapshot = obj;
148         const char *name = (flags & OBJ_KEY) ? obj : snapshot->name;
149         return ast_str_case_hash(name);
150 }
151
152 /*!
153  * \internal
154  * \brief Comparison function for \ref ast_channel_snapshot objects
155  */
156 static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags)
157 {
158         struct ast_channel_snapshot *left = obj;
159         struct ast_channel_snapshot *right = arg;
160         const char *match = (flags & OBJ_KEY) ? arg : right->name;
161         return strcasecmp(left->name, match) ? 0 : (CMP_MATCH | CMP_STOP);
162 }
163
164 static void channel_snapshot_dtor(void *obj)
165 {
166         struct ast_channel_snapshot *snapshot = obj;
167
168         ast_string_field_free_memory(snapshot);
169         ao2_cleanup(snapshot->manager_vars);
170         ao2_cleanup(snapshot->channel_vars);
171 }
172
173 struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
174 {
175         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
176         RAII_VAR(struct ast_bridge *, bridge, NULL, ao2_cleanup);
177         char nativeformats[256];
178         struct ast_str *write_transpath = ast_str_alloca(256);
179         struct ast_str *read_transpath = ast_str_alloca(256);
180         struct ast_party_id effective_connected_id;
181         struct ast_callid *callid;
182
183         /* no snapshots for dummy channels */
184         if (!ast_channel_tech(chan)) {
185                 return NULL;
186         }
187
188         snapshot = ao2_alloc(sizeof(*snapshot), channel_snapshot_dtor);
189         if (!snapshot || ast_string_field_init(snapshot, 1024)) {
190                 return NULL;
191         }
192
193         ast_string_field_set(snapshot, name, ast_channel_name(chan));
194         ast_string_field_set(snapshot, type, ast_channel_tech(chan)->type);
195         ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
196         ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
197         ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
198         ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
199         ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
200         ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
201         if (ast_channel_appl(chan)) {
202                 ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
203         }
204         if (ast_channel_data(chan)) {
205                 ast_string_field_set(snapshot, data, ast_channel_data(chan));
206         }
207         ast_string_field_set(snapshot, context, ast_channel_context(chan));
208         ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
209
210         ast_string_field_set(snapshot, caller_name,
211                 S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
212         ast_string_field_set(snapshot, caller_number,
213                 S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
214         ast_string_field_set(snapshot, caller_dnid, S_OR(ast_channel_dialed(chan)->number.str, ""));
215         ast_string_field_set(snapshot, caller_subaddr,
216                 S_COR(ast_channel_caller(chan)->id.subaddress.valid, ast_channel_caller(chan)->id.subaddress.str, ""));
217         ast_string_field_set(snapshot, dialed_subaddr,
218                 S_COR(ast_channel_dialed(chan)->subaddress.valid, ast_channel_dialed(chan)->subaddress.str, ""));
219         ast_string_field_set(snapshot, caller_ani,
220                 S_COR(ast_channel_caller(chan)->ani.number.valid, ast_channel_caller(chan)->ani.number.str, ""));
221         ast_string_field_set(snapshot, caller_rdnis,
222                 S_COR(ast_channel_redirecting(chan)->from.number.valid, ast_channel_redirecting(chan)->from.number.str, ""));
223         ast_string_field_set(snapshot, caller_dnid,
224                 S_OR(ast_channel_dialed(chan)->number.str, ""));
225
226         ast_string_field_set(snapshot, connected_name,
227                 S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
228         ast_string_field_set(snapshot, connected_number,
229                 S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
230         ast_string_field_set(snapshot, language, ast_channel_language(chan));
231
232         if ((bridge = ast_channel_get_bridge(chan))) {
233                 ast_string_field_set(snapshot, bridgeid, bridge->uniqueid);
234         }
235
236         ast_string_field_set(snapshot, nativeformats, ast_getformatname_multiple(nativeformats, sizeof(nativeformats),
237                 ast_channel_nativeformats(chan)));
238         ast_string_field_set(snapshot, readformat, ast_getformatname(ast_channel_readformat(chan)));
239         ast_string_field_set(snapshot, writeformat, ast_getformatname(ast_channel_writeformat(chan)));
240         ast_string_field_set(snapshot, writetrans, ast_translate_path_to_str(ast_channel_writetrans(chan), &write_transpath));
241         ast_string_field_set(snapshot, readtrans, ast_translate_path_to_str(ast_channel_readtrans(chan), &read_transpath));
242
243         effective_connected_id = ast_channel_connected_effective_id(chan);
244         ast_string_field_set(snapshot, effective_name,
245                 S_COR(effective_connected_id.name.valid, effective_connected_id.name.str, ""));
246         ast_string_field_set(snapshot, effective_number,
247                 S_COR(effective_connected_id.number.valid, effective_connected_id.number.str, ""));
248
249         if ((callid = ast_channel_callid(chan))) {
250                 ast_callid_strnprint(snapshot->callid, sizeof(snapshot->callid), callid);
251                 ast_callid_unref(callid);
252         }
253
254         snapshot->creationtime = ast_channel_creationtime(chan);
255         snapshot->hanguptime = *(ast_channel_whentohangup(chan));
256         snapshot->state = ast_channel_state(chan);
257         snapshot->priority = ast_channel_priority(chan);
258         snapshot->amaflags = ast_channel_amaflags(chan);
259         snapshot->hangupcause = ast_channel_hangupcause(chan);
260         ast_copy_flags(&snapshot->flags, ast_channel_flags(chan), 0xFFFFFFFF);
261         snapshot->caller_pres = ast_party_id_presentation(&ast_channel_caller(chan)->id);
262         snapshot->callgroup = ast_channel_callgroup(chan);
263         snapshot->pickupgroup = ast_channel_pickupgroup(chan);
264         ast_set_flag(&snapshot->softhangup_flags, ast_channel_softhangup_internal_flag(chan));
265
266         snapshot->manager_vars = ast_channel_get_manager_vars(chan);
267         snapshot->channel_vars = ast_channel_get_vars(chan);
268         snapshot->tech_properties = ast_channel_tech(chan)->properties;
269
270         ao2_ref(snapshot, +1);
271         return snapshot;
272 }
273
274 static void publish_message_for_channel_topics(struct stasis_message *message, struct ast_channel *chan)
275 {
276         if (chan) {
277                 stasis_publish(ast_channel_topic(chan), message);
278         } else {
279                 stasis_publish(ast_channel_topic_all(), message);
280         }
281 }
282
283 static void channel_blob_dtor(void *obj)
284 {
285         struct ast_channel_blob *event = obj;
286         ao2_cleanup(event->snapshot);
287         ast_json_unref(event->blob);
288 }
289
290 /*! \brief Dummy callback for receiving events */
291 static void dummy_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
292 {
293 }
294
295 void ast_channel_publish_dial_forward(struct ast_channel *caller, struct ast_channel *peer,
296         struct ast_channel *forwarded, const char *dialstring, const char *dialstatus,
297         const char *forward)
298 {
299         RAII_VAR(struct ast_multi_channel_blob *, payload, NULL, ao2_cleanup);
300         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
301         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
302         RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
303         RAII_VAR(struct ast_channel_snapshot *, peer_snapshot, NULL, ao2_cleanup);
304         RAII_VAR(struct ast_channel_snapshot *, forwarded_snapshot, NULL, ao2_cleanup);
305
306         ast_assert(peer != NULL);
307         blob = ast_json_pack("{s: s, s: s, s: s}",
308                              "dialstatus", S_OR(dialstatus, ""),
309                              "forward", S_OR(forward, ""),
310                              "dialstring", S_OR(dialstring, ""));
311         if (!blob) {
312                 return;
313         }
314         payload = ast_multi_channel_blob_create(blob);
315         if (!payload) {
316                 return;
317         }
318
319         if (caller) {
320                 ast_channel_lock(caller);
321                 caller_snapshot = ast_channel_snapshot_create(caller);
322                 ast_channel_unlock(caller);
323                 if (!caller_snapshot) {
324                         return;
325                 }
326                 ast_multi_channel_blob_add_channel(payload, "caller", caller_snapshot);
327         }
328
329         ast_channel_lock(peer);
330         peer_snapshot = ast_channel_snapshot_create(peer);
331         ast_channel_unlock(peer);
332         if (!peer_snapshot) {
333                 return;
334         }
335         ast_multi_channel_blob_add_channel(payload, "peer", peer_snapshot);
336
337         if (forwarded) {
338                 forwarded_snapshot = ast_channel_snapshot_create(forwarded);
339                 if (!forwarded_snapshot) {
340                         return;
341                 }
342                 ast_multi_channel_blob_add_channel(payload, "forwarded", forwarded_snapshot);
343         }
344
345         msg = stasis_message_create(ast_channel_dial_type(), payload);
346         if (!msg) {
347                 return;
348         }
349
350         if (forwarded) {
351                 struct stasis_subscription *subscription = stasis_subscribe(ast_channel_topic(peer), dummy_event_cb, NULL);
352
353                 stasis_publish(ast_channel_topic(peer), msg);
354                 stasis_unsubscribe_and_join(subscription);
355         } else {
356                 publish_message_for_channel_topics(msg, caller);
357         }
358 }
359
360 void ast_channel_publish_dial(struct ast_channel *caller, struct ast_channel *peer,
361         const char *dialstring, const char *dialstatus)
362 {
363         ast_channel_publish_dial_forward(caller, peer, NULL, dialstring, dialstatus, NULL);
364 }
365
366 static struct stasis_message *create_channel_blob_message(struct ast_channel_snapshot *snapshot,
367                 struct stasis_message_type *type,
368                 struct ast_json *blob)
369 {
370         struct stasis_message *msg;
371         struct ast_channel_blob *obj;
372
373         obj = ao2_alloc(sizeof(*obj), channel_blob_dtor);
374         if (!obj) {
375                 return NULL;
376         }
377
378         if (snapshot) {
379                 obj->snapshot = snapshot;
380                 ao2_ref(obj->snapshot, +1);
381         }
382         if (!blob) {
383                 blob = ast_json_null();
384         }
385         obj->blob = ast_json_ref(blob);
386
387         msg = stasis_message_create(type, obj);
388         ao2_cleanup(obj);
389         return msg;
390 }
391
392 struct stasis_message *ast_channel_blob_create_from_cache(const char *channel_id,
393                                                struct stasis_message_type *type,
394                                                struct ast_json *blob)
395 {
396         RAII_VAR(struct ast_channel_snapshot *, snapshot,
397                         ast_channel_snapshot_get_latest(channel_id),
398                         ao2_cleanup);
399
400         return create_channel_blob_message(snapshot, type, blob);
401 }
402
403 struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
404         struct stasis_message_type *type, struct ast_json *blob)
405 {
406         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
407
408         if (chan) {
409                 snapshot = ast_channel_snapshot_create(chan);
410         }
411
412         return create_channel_blob_message(snapshot, type, blob);
413 }
414
415 /*! \brief A channel snapshot wrapper object used in \ref ast_multi_channel_blob objects */
416 struct channel_role_snapshot {
417         struct ast_channel_snapshot *snapshot;  /*!< A channel snapshot */
418         char role[0];                                                   /*!< The role assigned to the channel */
419 };
420
421 /*! \brief A multi channel blob data structure for multi_channel_blob stasis messages */
422 struct ast_multi_channel_blob {
423         struct ao2_container *channel_snapshots;        /*!< A container holding the snapshots */
424         struct ast_json *blob;                                          /*< A blob of JSON data */
425 };
426
427 /*!
428  * \internal
429  * \brief Standard comparison function for \ref channel_role_snapshot objects
430  */
431 static int channel_role_single_cmp_cb(void *obj, void *arg, int flags)
432 {
433         struct channel_role_snapshot *left = obj;
434         struct channel_role_snapshot *right = arg;
435         const char *match = (flags & OBJ_KEY) ? arg : right->role;
436         return strcasecmp(left->role, match) ? 0 : (CMP_MATCH | CMP_STOP);
437 }
438
439 /*!
440  * \internal
441  * \brief Multi comparison function for \ref channel_role_snapshot objects
442  */
443 static int channel_role_multi_cmp_cb(void *obj, void *arg, int flags)
444 {
445         struct channel_role_snapshot *left = obj;
446         struct channel_role_snapshot *right = arg;
447         const char *match = (flags & OBJ_KEY) ? arg : right->role;
448         return strcasecmp(left->role, match) ? 0 : (CMP_MATCH);
449 }
450
451 /*!
452  * \internal
453  * \brief Hash function for \ref channel_role_snapshot objects
454  */
455 static int channel_role_hash_cb(const void *obj, const int flags)
456 {
457         const struct channel_role_snapshot *snapshot = obj;
458         const char *name = (flags & OBJ_KEY) ? obj : snapshot->role;
459         return ast_str_case_hash(name);
460 }
461
462 /*!
463  * \internal
464  * \brief Destructor for \ref ast_multi_channel_blob objects
465  */
466 static void multi_channel_blob_dtor(void *obj)
467 {
468         struct ast_multi_channel_blob *multi_blob = obj;
469
470         ao2_cleanup(multi_blob->channel_snapshots);
471         ast_json_unref(multi_blob->blob);
472 }
473
474 struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *blob)
475 {
476         RAII_VAR(struct ast_multi_channel_blob *, obj,
477                         ao2_alloc(sizeof(*obj), multi_channel_blob_dtor),
478                         ao2_cleanup);
479
480         ast_assert(blob != NULL);
481
482         if (!obj) {
483                 return NULL;
484         }
485
486         obj->channel_snapshots = ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS,
487                         channel_role_hash_cb, channel_role_single_cmp_cb);
488         if (!obj->channel_snapshots) {
489                 return NULL;
490         }
491
492         obj->blob = ast_json_ref(blob);
493
494         ao2_ref(obj, +1);
495         return obj;
496 }
497
498 struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid)
499 {
500         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
501         struct ast_channel_snapshot *snapshot;
502
503         ast_assert(!ast_strlen_zero(uniqueid));
504
505         message = stasis_cache_get(ast_channel_cache(),
506                         ast_channel_snapshot_type(),
507                         uniqueid);
508         if (!message) {
509                 return NULL;
510         }
511
512         snapshot = stasis_message_data(message);
513         if (!snapshot) {
514                 return NULL;
515         }
516         ao2_ref(snapshot, +1);
517         return snapshot;
518 }
519
520 struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name)
521 {
522         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
523         struct ast_channel_snapshot *snapshot;
524
525         ast_assert(!ast_strlen_zero(name));
526
527         message = stasis_cache_get(ast_channel_cache_by_name(),
528                         ast_channel_snapshot_type(),
529                         name);
530         if (!message) {
531                 return NULL;
532         }
533
534         snapshot = stasis_message_data(message);
535         if (!snapshot) {
536                 return NULL;
537         }
538         ao2_ref(snapshot, +1);
539         return snapshot;
540 }
541
542 static void channel_role_snapshot_dtor(void *obj)
543 {
544         struct channel_role_snapshot *role_snapshot = obj;
545         ao2_cleanup(role_snapshot->snapshot);
546 }
547
548 void ast_multi_channel_blob_add_channel(struct ast_multi_channel_blob *obj, const char *role, struct ast_channel_snapshot *snapshot)
549 {
550         RAII_VAR(struct channel_role_snapshot *, role_snapshot, NULL, ao2_cleanup);
551         int role_len = strlen(role) + 1;
552
553         if (!obj || ast_strlen_zero(role) || !snapshot) {
554                 return;
555         }
556
557         role_snapshot = ao2_alloc(sizeof(*role_snapshot) + role_len, channel_role_snapshot_dtor);
558         if (!role_snapshot) {
559                 return;
560         }
561         ast_copy_string(role_snapshot->role, role, role_len);
562         role_snapshot->snapshot = snapshot;
563         ao2_ref(role_snapshot->snapshot, +1);
564         ao2_link(obj->channel_snapshots, role_snapshot);
565 }
566
567 struct ast_channel_snapshot *ast_multi_channel_blob_get_channel(struct ast_multi_channel_blob *obj, const char *role)
568 {
569         struct channel_role_snapshot *role_snapshot;
570
571         if (!obj || ast_strlen_zero(role)) {
572                 return NULL;
573         }
574         role_snapshot = ao2_find(obj->channel_snapshots, role, OBJ_KEY);
575         /* Note that this function does not increase the ref count on snapshot */
576         if (!role_snapshot) {
577                 return NULL;
578         }
579         ao2_ref(role_snapshot, -1);
580         return role_snapshot->snapshot;
581 }
582
583 struct ao2_container *ast_multi_channel_blob_get_channels(struct ast_multi_channel_blob *obj, const char *role)
584 {
585         RAII_VAR(struct ao2_container *, ret_container,
586                 ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS, channel_snapshot_hash_cb, channel_snapshot_cmp_cb),
587                 ao2_cleanup);
588         struct ao2_iterator *it_role_snapshots;
589         struct channel_role_snapshot *role_snapshot;
590         char *arg;
591
592         if (!obj || ast_strlen_zero(role) || !ret_container) {
593                 return NULL;
594         }
595         arg = ast_strdupa(role);
596
597         it_role_snapshots = ao2_callback(obj->channel_snapshots, OBJ_MULTIPLE | OBJ_KEY, channel_role_multi_cmp_cb, arg);
598         if (!it_role_snapshots) {
599                 return NULL;
600         }
601
602         while ((role_snapshot = ao2_iterator_next(it_role_snapshots))) {
603                 ao2_link(ret_container, role_snapshot->snapshot);
604                 ao2_ref(role_snapshot, -1);
605         }
606         ao2_iterator_destroy(it_role_snapshots);
607
608         ao2_ref(ret_container, +1);
609         return ret_container;
610 }
611
612 struct ast_json *ast_multi_channel_blob_get_json(struct ast_multi_channel_blob *obj)
613 {
614         if (!obj) {
615                 return NULL;
616         }
617         return obj->blob;
618 }
619
620 void ast_channel_stage_snapshot(struct ast_channel *chan)
621 {
622         ast_set_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE);
623 }
624
625 void ast_channel_stage_snapshot_done(struct ast_channel *chan)
626 {
627         ast_clear_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE);
628         ast_channel_publish_snapshot(chan);
629 }
630
631 void ast_channel_publish_snapshot(struct ast_channel *chan)
632 {
633         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
634         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
635
636         if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE)) {
637                 return;
638         }
639
640         snapshot = ast_channel_snapshot_create(chan);
641         if (!snapshot) {
642                 return;
643         }
644
645         message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
646         if (!message) {
647                 return;
648         }
649
650         ast_assert(ast_channel_topic(chan) != NULL);
651         stasis_publish(ast_channel_topic(chan), message);
652 }
653
654 void ast_channel_publish_blob(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
655 {
656         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
657
658         if (!blob) {
659                 blob = ast_json_null();
660         }
661
662         message = ast_channel_blob_create(chan, type, blob);
663         if (message) {
664                 stasis_publish(ast_channel_topic(chan), message);
665         }
666 }
667
668 void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
669 {
670         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
671
672         ast_assert(name != NULL);
673         ast_assert(value != NULL);
674
675         blob = ast_json_pack("{s: s, s: s}",
676                              "variable", name,
677                              "value", value);
678         if (!blob) {
679                 ast_log(LOG_ERROR, "Error creating message\n");
680                 return;
681         }
682
683         ast_channel_publish_blob(chan, ast_channel_varset_type(), blob);
684 }
685
686 static struct ast_manager_event_blob *varset_to_ami(struct stasis_message *msg)
687 {
688         RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
689         struct ast_channel_blob *obj = stasis_message_data(msg);
690         const char *variable =
691                 ast_json_string_get(ast_json_object_get(obj->blob, "variable"));
692         const char *value =
693                 ast_json_string_get(ast_json_object_get(obj->blob, "value"));
694
695         if (obj->snapshot) {
696                 channel_event_string =
697                         ast_manager_build_channel_state_string(obj->snapshot);
698         } else {
699                 channel_event_string = ast_str_create(35);
700                 ast_str_set(&channel_event_string, 0,
701                             "Channel: none\r\n"
702                             "Uniqueid: none\r\n");
703         }
704
705         if (!channel_event_string) {
706                 return NULL;
707         }
708
709         return ast_manager_event_blob_create(EVENT_FLAG_DIALPLAN, "VarSet",
710                 "%s"
711                 "Variable: %s\r\n"
712                 "Value: %s\r\n",
713                 ast_str_buffer(channel_event_string), variable, value);
714 }
715
716 static struct ast_manager_event_blob *agent_login_to_ami(struct stasis_message *msg)
717 {
718         RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
719         RAII_VAR(struct ast_str *, party_string, ast_str_create(256), ast_free);
720         struct ast_channel_blob *obj = stasis_message_data(msg);
721         const char *agent = ast_json_string_get(ast_json_object_get(obj->blob, "agent"));
722
723         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
724         if (!channel_string) {
725                 return NULL;
726         }
727
728         return ast_manager_event_blob_create(EVENT_FLAG_AGENT, "AgentLogin",
729                 "%s"
730                 "Agent: %s\r\n",
731                 ast_str_buffer(channel_string), agent);
732 }
733
734 static struct ast_manager_event_blob *agent_logoff_to_ami(struct stasis_message *msg)
735 {
736         RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
737         RAII_VAR(struct ast_str *, party_string, ast_str_create(256), ast_free);
738         struct ast_channel_blob *obj = stasis_message_data(msg);
739         const char *agent = ast_json_string_get(ast_json_object_get(obj->blob, "agent"));
740         long logintime = ast_json_integer_get(ast_json_object_get(obj->blob, "logintime"));
741
742         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
743         if (!channel_string) {
744                 return NULL;
745         }
746
747         return ast_manager_event_blob_create(EVENT_FLAG_AGENT, "AgentLogoff",
748                 "%s"
749                 "Agent: %s\r\n"
750                 "Logintime: %ld\r\n",
751                 ast_str_buffer(channel_string), agent, logintime);
752 }
753
754 void ast_publish_channel_state(struct ast_channel *chan)
755 {
756         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
757         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
758
759         ast_assert(chan != NULL);
760         if (!chan) {
761                 return;
762         }
763
764         snapshot = ast_channel_snapshot_create(chan);
765         if (!snapshot) {
766                 return;
767         }
768
769         message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
770         if (!message) {
771                 return;
772         }
773
774         ast_assert(ast_channel_topic(chan) != NULL);
775         stasis_publish(ast_channel_topic(chan), message);
776 }
777
778 struct ast_json *ast_channel_snapshot_to_json(
779         const struct ast_channel_snapshot *snapshot,
780         const struct stasis_message_sanitizer *sanitize)
781 {
782         RAII_VAR(struct ast_json *, json_chan, NULL, ast_json_unref);
783
784         if (snapshot == NULL
785                 || (sanitize && sanitize->channel_snapshot
786                 && sanitize->channel_snapshot(snapshot))) {
787                 return NULL;
788         }
789
790         json_chan = ast_json_pack(
791                 /* Broken up into groups of three for readability */
792                 "{ s: s, s: s, s: s,"
793                 "  s: o, s: o, s: s,"
794                 "  s: o, s: o }",
795                 /* First line */
796                 "id", snapshot->uniqueid,
797                 "name", snapshot->name,
798                 "state", ast_state2str(snapshot->state),
799                 /* Second line */
800                 "caller", ast_json_name_number(
801                         snapshot->caller_name, snapshot->caller_number),
802                 "connected", ast_json_name_number(
803                         snapshot->connected_name, snapshot->connected_number),
804                 "accountcode", snapshot->accountcode,
805                 /* Third line */
806                 "dialplan", ast_json_dialplan_cep(
807                         snapshot->context, snapshot->exten, snapshot->priority),
808                 "creationtime", ast_json_timeval(snapshot->creationtime, NULL));
809
810         return ast_json_ref(json_chan);
811 }
812
813 int ast_channel_snapshot_cep_equal(
814         const struct ast_channel_snapshot *old_snapshot,
815         const struct ast_channel_snapshot *new_snapshot)
816 {
817         ast_assert(old_snapshot != NULL);
818         ast_assert(new_snapshot != NULL);
819
820         /* We actually get some snapshots with CEP set, but before the
821          * application is set. Since empty application is invalid, we treat
822          * setting the application from nothing as a CEP change.
823          */
824         if (ast_strlen_zero(old_snapshot->appl) &&
825             !ast_strlen_zero(new_snapshot->appl)) {
826                 return 0;
827         }
828
829         return old_snapshot->priority == new_snapshot->priority &&
830                 strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
831                 strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
832 }
833
834 int ast_channel_snapshot_caller_id_equal(
835         const struct ast_channel_snapshot *old_snapshot,
836         const struct ast_channel_snapshot *new_snapshot)
837 {
838         ast_assert(old_snapshot != NULL);
839         ast_assert(new_snapshot != NULL);
840         return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
841                 strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
842 }
843
844 static struct ast_json *channel_blob_to_json(
845         struct stasis_message *message,
846         const char *type,
847         const struct stasis_message_sanitizer *sanitize)
848 {
849         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
850         struct ast_channel_blob *channel_blob = stasis_message_data(message);
851         struct ast_json *blob = channel_blob->blob;
852         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
853         const struct timeval *tv = stasis_message_timestamp(message);
854         int res = 0;
855
856         if (blob == NULL || ast_json_is_null(blob)) {
857                 out = ast_json_object_create();
858         } else {
859                 /* blobs are immutable, so shallow copies are fine */
860                 out = ast_json_copy(blob);
861         }
862
863         if (!out) {
864                 return NULL;
865         }
866
867         res |= ast_json_object_set(out, "type", ast_json_string_create(type));
868         res |= ast_json_object_set(out, "timestamp",
869                 ast_json_timeval(*tv, NULL));
870
871         /* For global channel messages, the snapshot is optional */
872         if (snapshot) {
873                 struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
874
875                 if (!json_channel) {
876                         return NULL;
877                 }
878
879                 res |= ast_json_object_set(out, "channel", json_channel);
880         }
881
882         if (res != 0) {
883                 return NULL;
884         }
885
886         return ast_json_ref(out);
887 }
888
889 static struct ast_json *dtmf_end_to_json(
890         struct stasis_message *message,
891         const struct stasis_message_sanitizer *sanitize)
892 {
893         struct ast_channel_blob *channel_blob = stasis_message_data(message);
894         struct ast_json *blob = channel_blob->blob;
895         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
896         const char *direction =
897                 ast_json_string_get(ast_json_object_get(blob, "direction"));
898         const struct timeval *tv = stasis_message_timestamp(message);
899         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
900
901         /* Only present received DTMF end events as JSON */
902         if (strcasecmp("Received", direction) != 0) {
903                 return NULL;
904         }
905
906         if (!json_channel) {
907                 return NULL;
908         }
909
910         return ast_json_pack("{s: s, s: o, s: O, s: O, s: o}",
911                 "type", "ChannelDtmfReceived",
912                 "timestamp", ast_json_timeval(*tv, NULL),
913                 "digit", ast_json_object_get(blob, "digit"),
914                 "duration_ms", ast_json_object_get(blob, "duration_ms"),
915                 "channel", json_channel);
916 }
917
918 static struct ast_json *user_event_to_json(
919         struct stasis_message *message,
920         const struct stasis_message_sanitizer *sanitize)
921 {
922         struct ast_channel_blob *channel_blob = stasis_message_data(message);
923         struct ast_json *blob = channel_blob->blob;
924         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
925         const struct timeval *tv = stasis_message_timestamp(message);
926         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
927
928         if (!json_channel) {
929                 return NULL;
930         }
931
932         return ast_json_pack("{s: s, s: o, s: O, s: O, s: o}",
933                 "type", "ChannelUserevent",
934                 "timestamp", ast_json_timeval(*tv, NULL),
935                 "eventname", ast_json_object_get(blob, "eventname"),
936                 "userevent", blob,
937                 "channel", json_channel);
938 }
939
940 static struct ast_json *varset_to_json(
941         struct stasis_message *message,
942         const struct stasis_message_sanitizer *sanitize)
943 {
944         return channel_blob_to_json(message, "ChannelVarset", sanitize);
945 }
946
947 static struct ast_json *hangup_request_to_json(
948         struct stasis_message *message,
949         const struct stasis_message_sanitizer *sanitize)
950 {
951         return channel_blob_to_json(message, "ChannelHangupRequest", sanitize);
952 }
953
954 static struct ast_json *dial_to_json(
955         struct stasis_message *message,
956         const struct stasis_message_sanitizer *sanitize)
957 {
958         struct ast_multi_channel_blob *payload = stasis_message_data(message);
959         struct ast_json *blob = ast_multi_channel_blob_get_json(payload);
960         struct ast_json *caller_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "caller"), sanitize);
961         struct ast_json *peer_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "peer"), sanitize);
962         struct ast_json *forwarded_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "forwarded"), sanitize);
963         struct ast_json *json;
964         const struct timeval *tv = stasis_message_timestamp(message);
965         int res = 0;
966
967         json = ast_json_pack("{s: s, s: o, s: O, s: O, s: O}",
968                 "type", "Dial",
969                 "timestamp", ast_json_timeval(*tv, NULL),
970                 "dialstatus", ast_json_object_get(blob, "dialstatus"),
971                 "forward", ast_json_object_get(blob, "forward"),
972                 "dialstring", ast_json_object_get(blob, "dialstring"));
973         if (!json) {
974                 ast_json_unref(caller_json);
975                 ast_json_unref(peer_json);
976                 ast_json_unref(forwarded_json);
977                 return NULL;
978         }
979
980         if (caller_json) {
981                 res |= ast_json_object_set(json, "caller", caller_json);
982         }
983         if (peer_json) {
984                 res |= ast_json_object_set(json, "peer", peer_json);
985         }
986         if (forwarded_json) {
987                 res |= ast_json_object_set(json, "forwarded", forwarded_json);
988         }
989
990         if (res) {
991                 ast_json_unref(json);
992                 return NULL;
993         }
994
995         return json;
996 }
997
998 /*!
999  * @{ \brief Define channel message types.
1000  */
1001 STASIS_MESSAGE_TYPE_DEFN(ast_channel_snapshot_type);
1002 STASIS_MESSAGE_TYPE_DEFN(ast_channel_dial_type,
1003         .to_json = dial_to_json,
1004         );
1005 STASIS_MESSAGE_TYPE_DEFN(ast_channel_varset_type,
1006         .to_ami = varset_to_ami,
1007         .to_json = varset_to_json,
1008         );
1009 STASIS_MESSAGE_TYPE_DEFN(ast_channel_user_event_type,
1010         .to_json = user_event_to_json,
1011         );
1012 STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_request_type,
1013         .to_json = hangup_request_to_json,
1014         );
1015 STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_begin_type);
1016 STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_end_type,
1017         .to_json = dtmf_end_to_json,
1018         );
1019 STASIS_MESSAGE_TYPE_DEFN(ast_channel_hold_type);
1020 STASIS_MESSAGE_TYPE_DEFN(ast_channel_unhold_type);
1021 STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_start_type);
1022 STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_stop_type);
1023 STASIS_MESSAGE_TYPE_DEFN(ast_channel_fax_type);
1024 STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_handler_type);
1025 STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_start_type);
1026 STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_stop_type);
1027 STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_start_type);
1028 STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_stop_type);
1029 STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_login_type,
1030         .to_ami = agent_login_to_ami,
1031         );
1032 STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_logoff_type,
1033         .to_ami = agent_logoff_to_ami,
1034         );
1035
1036 /*! @} */
1037
1038 static void stasis_channels_cleanup(void)
1039 {
1040         stasis_caching_unsubscribe_and_join(channel_by_name_topic);
1041         channel_by_name_topic = NULL;
1042         ao2_cleanup(channel_cache_by_name);
1043         channel_cache_by_name = NULL;
1044         ao2_cleanup(channel_cache_all);
1045         channel_cache_all = NULL;
1046
1047         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
1048         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
1049         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
1050         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_user_event_type);
1051         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
1052         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
1053         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
1054         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hold_type);
1055         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_unhold_type);
1056         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_start_type);
1057         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_stop_type);
1058         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_fax_type);
1059         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_handler_type);
1060         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_start_type);
1061         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_stop_type);
1062         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_start_type);
1063         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type);
1064         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_login_type);
1065         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_logoff_type);
1066 }
1067
1068 int ast_stasis_channels_init(void)
1069 {
1070         int res = 0;
1071
1072         ast_register_cleanup(stasis_channels_cleanup);
1073
1074         channel_cache_all = stasis_cp_all_create("ast_channel_topic_all",
1075                 channel_snapshot_get_id);
1076         if (!channel_cache_all) {
1077                 return -1;
1078         }
1079         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
1080         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);
1081
1082         channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name);
1083         if (!channel_cache_by_name) {
1084                 return -1;
1085         }
1086
1087         /* This should be initialized before the caching topic */
1088         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);
1089
1090         channel_by_name_topic = stasis_caching_topic_create(
1091                 stasis_cp_all_topic(channel_cache_all),
1092                 channel_cache_by_name);
1093         if (!channel_by_name_topic) {
1094                 return -1;
1095         }
1096
1097         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
1098         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
1099         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type);
1100         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
1101         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
1102         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
1103         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type);
1104         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type);
1105         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type);
1106         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type);
1107         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type);
1108         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type);
1109         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type);
1110         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type);
1111         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type);
1112         res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type);
1113
1114         return res;
1115 }