CLI: Create ast_cli_completion_add function.
[asterisk/asterisk.git] / main / stasis_channels.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Matt Jordan <mjordan@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Messages and Data Types for Channel Objects
22  *
23  * \author \verbatim Matt Jordan <mjordan@digium.com> \endverbatim
24  *
25  */
26
27 /*** MODULEINFO
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 #include "asterisk/astobj2.h"
34 #include "asterisk/json.h"
35 #include "asterisk/pbx.h"
36 #include "asterisk/bridge.h"
37 #include "asterisk/translate.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_cache_pattern.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/dial.h"
42 #include "asterisk/linkedlists.h"
43
44 /*** DOCUMENTATION
45         <managerEvent language="en_US" name="VarSet">
46                 <managerEventInstance class="EVENT_FLAG_DIALPLAN">
47                         <synopsis>Raised when a variable is set to a particular value.</synopsis>
48                         <syntax>
49                                 <channel_snapshot/>
50                                 <parameter name="Variable">
51                                         <para>The variable being set.</para>
52                                 </parameter>
53                                 <parameter name="Value">
54                                         <para>The new value of the variable.</para>
55                                 </parameter>
56                         </syntax>
57                 </managerEventInstance>
58         </managerEvent>
59         <managerEvent language="en_US" name="AgentLogin">
60                 <managerEventInstance class="EVENT_FLAG_AGENT">
61                         <synopsis>Raised when an Agent has logged in.</synopsis>
62                         <syntax>
63                                 <channel_snapshot/>
64                                 <parameter name="Agent">
65                                         <para>Agent ID of the agent.</para>
66                                 </parameter>
67                         </syntax>
68                         <see-also>
69                                 <ref type="application">AgentLogin</ref>
70                                 <ref type="managerEvent">AgentLogoff</ref>
71                         </see-also>
72                 </managerEventInstance>
73         </managerEvent>
74         <managerEvent language="en_US" name="AgentLogoff">
75                 <managerEventInstance class="EVENT_FLAG_AGENT">
76                         <synopsis>Raised when an Agent has logged off.</synopsis>
77                         <syntax>
78                                 <xi:include xpointer="xpointer(/docs/managerEvent[@name='AgentLogin']/managerEventInstance/syntax/parameter)" />
79                                 <parameter name="Logintime">
80                                         <para>The number of seconds the agent was logged in.</para>
81                                 </parameter>
82                         </syntax>
83                         <see-also>
84                                 <ref type="managerEvent">AgentLogin</ref>
85                         </see-also>
86                 </managerEventInstance>
87         </managerEvent>
88         <managerEvent language="en_US" name="ChannelTalkingStart">
89                 <managerEventInstance class="EVENT_FLAG_CLASS">
90                         <synopsis>Raised when talking is detected on a channel.</synopsis>
91                         <syntax>
92                                 <channel_snapshot/>
93                         </syntax>
94                         <see-also>
95                                 <ref type="function">TALK_DETECT</ref>
96                                 <ref type="managerEvent">ChannelTalkingStop</ref>
97                         </see-also>
98                 </managerEventInstance>
99         </managerEvent>
100         <managerEvent language="en_US" name="ChannelTalkingStop">
101                 <managerEventInstance class="EVENT_FLAG_CLASS">
102                         <synopsis>Raised when talking is no longer detected on a channel.</synopsis>
103                         <syntax>
104                                 <channel_snapshot/>
105                                 <parameter name="Duration">
106                                         <para>The length in time, in milliseconds, that talking was
107                                         detected on the channel.</para>
108                                 </parameter>
109                         </syntax>
110                         <see-also>
111                                 <ref type="function">TALK_DETECT</ref>
112                                 <ref type="managerEvent">ChannelTalkingStart</ref>
113                         </see-also>
114                 </managerEventInstance>
115         </managerEvent>
116 ***/
117
/*! \brief Bucket count passed to every ao2 container allocated for multi channel blobs */
#define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7

/*! \brief Caching pattern object behind ast_channel_cache_all() and the "all channels" accessors */
static struct stasis_cp_all *channel_cache_all;

/*! \brief Cache returned by ast_channel_cache_by_name() */
static struct stasis_cache *channel_cache_by_name;

/*!
 * \brief Caching topic for the by-name cache.
 * \note Presumably the topic that feeds \c channel_cache_by_name; it is set up
 *       outside the part of the file visible here -- confirm there.
 */
static struct stasis_caching_topic *channel_by_name_topic;
123
124 struct stasis_cp_all *ast_channel_cache_all(void)
125 {
126         return channel_cache_all;
127 }
128
129 struct stasis_cache *ast_channel_cache(void)
130 {
131         return stasis_cp_all_cache(channel_cache_all);
132 }
133
134 struct stasis_topic *ast_channel_topic_all(void)
135 {
136         return stasis_cp_all_topic(channel_cache_all);
137 }
138
139 struct stasis_topic *ast_channel_topic_all_cached(void)
140 {
141         return stasis_cp_all_topic_cached(channel_cache_all);
142 }
143
144 struct stasis_cache *ast_channel_cache_by_name(void)
145 {
146         return channel_cache_by_name;
147 }
148
149 static const char *channel_snapshot_get_id(struct stasis_message *message)
150 {
151         struct ast_channel_snapshot *snapshot;
152         if (ast_channel_snapshot_type() != stasis_message_type(message)) {
153                 return NULL;
154         }
155         snapshot = stasis_message_data(message);
156         return snapshot->uniqueid;
157 }
158
159 static const char *channel_snapshot_get_name(struct stasis_message *message)
160 {
161         struct ast_channel_snapshot *snapshot;
162         if (ast_channel_snapshot_type() != stasis_message_type(message)) {
163                 return NULL;
164         }
165         snapshot = stasis_message_data(message);
166         return snapshot->name;
167 }
168
169 /*!
170  * \internal
171  * \brief Hash function for \ref ast_channel_snapshot objects
172  */
173 static int channel_snapshot_hash_cb(const void *obj, const int flags)
174 {
175         const struct ast_channel_snapshot *snapshot = obj;
176         const char *name = (flags & OBJ_KEY) ? obj : snapshot->name;
177         return ast_str_case_hash(name);
178 }
179
180 /*!
181  * \internal
182  * \brief Comparison function for \ref ast_channel_snapshot objects
183  */
184 static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags)
185 {
186         struct ast_channel_snapshot *left = obj;
187         struct ast_channel_snapshot *right = arg;
188         const char *match = (flags & OBJ_KEY) ? arg : right->name;
189         return strcasecmp(left->name, match) ? 0 : (CMP_MATCH | CMP_STOP);
190 }
191
192 static void channel_snapshot_dtor(void *obj)
193 {
194         struct ast_channel_snapshot *snapshot = obj;
195
196         ast_string_field_free_memory(snapshot);
197         ao2_cleanup(snapshot->manager_vars);
198         ao2_cleanup(snapshot->ari_vars);
199 }
200
201 struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
202 {
203         struct ast_channel_snapshot *snapshot;
204         struct ast_bridge *bridge;
205
206         /* no snapshots for dummy channels */
207         if (!ast_channel_tech(chan)) {
208                 return NULL;
209         }
210
211         snapshot = ao2_alloc_options(sizeof(*snapshot), channel_snapshot_dtor,
212                 AO2_ALLOC_OPT_LOCK_NOLOCK);
213         if (!snapshot || ast_string_field_init(snapshot, 1024)) {
214                 ao2_cleanup(snapshot);
215                 return NULL;
216         }
217
218         ast_string_field_set(snapshot, name, ast_channel_name(chan));
219         ast_string_field_set(snapshot, type, ast_channel_tech(chan)->type);
220         ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
221         ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
222         ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
223         ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
224         ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
225         ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
226         if (ast_channel_appl(chan)) {
227                 ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
228         }
229         if (ast_channel_data(chan)) {
230                 ast_string_field_set(snapshot, data, ast_channel_data(chan));
231         }
232         ast_string_field_set(snapshot, context, ast_channel_context(chan));
233         ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
234
235         ast_string_field_set(snapshot, caller_name,
236                 S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
237         ast_string_field_set(snapshot, caller_number,
238                 S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
239         ast_string_field_set(snapshot, caller_dnid, S_OR(ast_channel_dialed(chan)->number.str, ""));
240         ast_string_field_set(snapshot, caller_subaddr,
241                 S_COR(ast_channel_caller(chan)->id.subaddress.valid, ast_channel_caller(chan)->id.subaddress.str, ""));
242         ast_string_field_set(snapshot, dialed_subaddr,
243                 S_COR(ast_channel_dialed(chan)->subaddress.valid, ast_channel_dialed(chan)->subaddress.str, ""));
244         ast_string_field_set(snapshot, caller_ani,
245                 S_COR(ast_channel_caller(chan)->ani.number.valid, ast_channel_caller(chan)->ani.number.str, ""));
246         ast_string_field_set(snapshot, caller_rdnis,
247                 S_COR(ast_channel_redirecting(chan)->from.number.valid, ast_channel_redirecting(chan)->from.number.str, ""));
248         ast_string_field_set(snapshot, caller_dnid,
249                 S_OR(ast_channel_dialed(chan)->number.str, ""));
250
251         ast_string_field_set(snapshot, connected_name,
252                 S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
253         ast_string_field_set(snapshot, connected_number,
254                 S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
255         ast_string_field_set(snapshot, language, ast_channel_language(chan));
256
257         if ((bridge = ast_channel_get_bridge(chan))) {
258                 if (!ast_test_flag(&bridge->feature_flags, AST_BRIDGE_FLAG_INVISIBLE)) {
259                         ast_string_field_set(snapshot, bridgeid, bridge->uniqueid);
260                 }
261                 ao2_cleanup(bridge);
262         }
263
264         snapshot->creationtime = ast_channel_creationtime(chan);
265         snapshot->state = ast_channel_state(chan);
266         snapshot->priority = ast_channel_priority(chan);
267         snapshot->amaflags = ast_channel_amaflags(chan);
268         snapshot->hangupcause = ast_channel_hangupcause(chan);
269         ast_copy_flags(&snapshot->flags, ast_channel_flags(chan), 0xFFFFFFFF);
270         snapshot->caller_pres = ast_party_id_presentation(&ast_channel_caller(chan)->id);
271         ast_set_flag(&snapshot->softhangup_flags, ast_channel_softhangup_internal_flag(chan));
272
273         snapshot->manager_vars = ast_channel_get_manager_vars(chan);
274         snapshot->ari_vars = ast_channel_get_ari_vars(chan);
275         snapshot->tech_properties = ast_channel_tech(chan)->properties;
276
277         return snapshot;
278 }
279
/*!
 * \internal
 * \brief Publish a message to a channel's topic, or to the "all channels" topic.
 *
 * \param message Message to publish
 * \param chan Channel whose topic receives the message; when NULL the message
 *        goes to ast_channel_topic_all() instead
 */
static void publish_message_for_channel_topics(struct stasis_message *message, struct ast_channel *chan)
{
	struct stasis_topic *destination;

	destination = chan ? ast_channel_topic(chan) : ast_channel_topic_all();
	stasis_publish(destination, message);
}
288
289 static void channel_blob_dtor(void *obj)
290 {
291         struct ast_channel_blob *event = obj;
292         ao2_cleanup(event->snapshot);
293         ast_json_unref(event->blob);
294 }
295
296 static void ast_channel_publish_dial_internal(struct ast_channel *caller,
297         struct ast_channel *peer, struct ast_channel *forwarded, const char *dialstring,
298         const char *dialstatus, const char *forward)
299 {
300         RAII_VAR(struct ast_multi_channel_blob *, payload, NULL, ao2_cleanup);
301         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
302         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
303         RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
304         RAII_VAR(struct ast_channel_snapshot *, peer_snapshot, NULL, ao2_cleanup);
305         RAII_VAR(struct ast_channel_snapshot *, forwarded_snapshot, NULL, ao2_cleanup);
306
307         if (!ast_channel_dial_type()) {
308                 return;
309         }
310
311         ast_assert(peer != NULL);
312         blob = ast_json_pack("{s: s, s: s, s: s}",
313                              "dialstatus", S_OR(dialstatus, ""),
314                              "forward", S_OR(forward, ""),
315                              "dialstring", S_OR(dialstring, ""));
316         if (!blob) {
317                 return;
318         }
319         payload = ast_multi_channel_blob_create(blob);
320         if (!payload) {
321                 return;
322         }
323
324         if (caller) {
325                 ast_channel_lock(caller);
326                 if (ast_strlen_zero(dialstatus)) {
327                         caller_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(caller));
328                 } else {
329                         caller_snapshot = ast_channel_snapshot_create(caller);
330                 }
331                 ast_channel_unlock(caller);
332                 if (!caller_snapshot) {
333                         return;
334                 }
335                 ast_multi_channel_blob_add_channel(payload, "caller", caller_snapshot);
336         }
337
338         ast_channel_lock(peer);
339         if (ast_strlen_zero(dialstatus)) {
340                 peer_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(peer));
341         } else {
342                 peer_snapshot = ast_channel_snapshot_create(peer);
343         }
344         ast_channel_unlock(peer);
345         if (!peer_snapshot) {
346                 return;
347         }
348         ast_multi_channel_blob_add_channel(payload, "peer", peer_snapshot);
349
350         if (forwarded) {
351                 ast_channel_lock(forwarded);
352                 forwarded_snapshot = ast_channel_snapshot_create(forwarded);
353                 ast_channel_unlock(forwarded);
354                 if (!forwarded_snapshot) {
355                         return;
356                 }
357                 ast_multi_channel_blob_add_channel(payload, "forwarded", forwarded_snapshot);
358         }
359
360         msg = stasis_message_create(ast_channel_dial_type(), payload);
361         if (!msg) {
362                 return;
363         }
364
365         publish_message_for_channel_topics(msg, caller ?: peer);
366 }
367
/* Dial masquerade helpers; their definitions are not in this part of the file */
static void remove_dial_masquerade(struct ast_channel *peer);
static void remove_dial_masquerade_caller(struct ast_channel *caller);
static int set_dial_masquerade(struct ast_channel *caller,
	struct ast_channel *peer, const char *dialstring);

/*!
 * \brief Publish a dial message, optionally describing a forwarded call.
 *
 * With a caller present, the caller, peer and (if given) forwarded channels
 * are all locked around the publication, and the dial masquerade state is
 * updated: set while \a dialstatus is empty, removed from the peer otherwise.
 * Without a caller the message is published with no locking or masquerade
 * handling in this function.
 *
 * \param caller Calling channel, may be NULL
 * \param peer Called channel, must not be NULL
 * \param forwarded Channel the call was forwarded to, may be NULL
 * \param dialstring Dial string
 * \param dialstatus Dial status, NULL or empty while dialing is in progress
 * \param forward Forward destination
 */
void ast_channel_publish_dial_forward(struct ast_channel *caller, struct ast_channel *peer,
	struct ast_channel *forwarded, const char *dialstring, const char *dialstatus,
	const char *forward)
{
	ast_assert(peer != NULL);

	/* XXX With an early bridge the below dial masquerade datastore code could, theoretically,
	 * go away as the act of changing the channel during dialing would be done using the bridge
	 * API itself and not a masquerade.
	 */

	if (!caller) {
		/* No caller: nothing to lock and no masquerade bookkeeping here */
		ast_channel_publish_dial_internal(caller, peer, forwarded, dialstring,
			dialstatus, forward);
		return;
	}

	/*
	 * Lock two or three channels.
	 *
	 * We need to hold the locks to hold off a potential masquerade
	 * messing up the stasis dial event ordering.
	 *
	 * Each failed trylock releases everything taken so far, yields, and
	 * starts again from the caller lock.
	 */
	for (;; ast_channel_unlock(caller), sched_yield()) {
		ast_channel_lock(caller);
		if (ast_channel_trylock(peer)) {
			continue;
		}
		if (forwarded && ast_channel_trylock(forwarded)) {
			ast_channel_unlock(peer);
			continue;
		}
		break;
	}

	if (!ast_strlen_zero(dialstatus)) {
		remove_dial_masquerade(peer);
	} else {
		set_dial_masquerade(caller, peer, dialstring);
	}

	ast_channel_publish_dial_internal(caller, peer, forwarded, dialstring, dialstatus,
		forward);

	/* Release in the reverse order of acquisition */
	if (forwarded) {
		ast_channel_unlock(forwarded);
	}
	ast_channel_unlock(peer);
	remove_dial_masquerade_caller(caller);
	ast_channel_unlock(caller);
}
422
/*!
 * \brief Publish a dial message that involves no forwarding.
 *
 * Thin wrapper around ast_channel_publish_dial_forward() that passes NULL for
 * both the forwarded channel and the forward destination.
 *
 * \param caller Calling channel
 * \param peer Called channel
 * \param dialstring Dial string
 * \param dialstatus Dial status
 */
void ast_channel_publish_dial(struct ast_channel *caller, struct ast_channel *peer,
	const char *dialstring, const char *dialstatus)
{
	struct ast_channel *no_forwarded = NULL;
	const char *no_forward = NULL;

	ast_channel_publish_dial_forward(caller, peer, no_forwarded, dialstring,
		dialstatus, no_forward);
}
428
429 static struct stasis_message *create_channel_blob_message(struct ast_channel_snapshot *snapshot,
430                 struct stasis_message_type *type,
431                 struct ast_json *blob)
432 {
433         struct stasis_message *msg;
434         struct ast_channel_blob *obj;
435
436         obj = ao2_alloc(sizeof(*obj), channel_blob_dtor);
437         if (!obj) {
438                 return NULL;
439         }
440
441         if (snapshot) {
442                 obj->snapshot = snapshot;
443                 ao2_ref(obj->snapshot, +1);
444         }
445         if (!blob) {
446                 blob = ast_json_null();
447         }
448         obj->blob = ast_json_ref(blob);
449
450         msg = stasis_message_create(type, obj);
451         ao2_cleanup(obj);
452         return msg;
453 }
454
/*!
 * \brief Create a channel blob message using the cached snapshot of a channel.
 *
 * \param channel_id Unique ID used to look up the latest cached snapshot
 * \param type Message type for the new message
 * \param blob JSON payload
 *
 * \return The message built by create_channel_blob_message(); the snapshot
 *         passed to it is NULL when no cached snapshot was found
 * \retval NULL if \a type is NULL
 */
struct stasis_message *ast_channel_blob_create_from_cache(const char *channel_id,
		struct stasis_message_type *type,
		struct ast_json *blob)
{
	struct ast_channel_snapshot *cached;
	struct stasis_message *msg;

	if (!type) {
		return NULL;
	}

	cached = ast_channel_snapshot_get_latest(channel_id);
	msg = create_channel_blob_message(cached, type, blob);
	ao2_cleanup(cached);

	return msg;
}
471
/*!
 * \brief Create a channel blob message using a fresh snapshot of a channel.
 *
 * \param chan Channel to snapshot; when NULL the message carries no snapshot
 * \param type Message type for the new message
 * \param blob JSON payload
 *
 * \return The message built by create_channel_blob_message()
 * \retval NULL if \a type is NULL
 */
struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
	struct stasis_message_type *type, struct ast_json *blob)
{
	struct ast_channel_snapshot *fresh = NULL;
	struct stasis_message *msg;

	if (!type) {
		return NULL;
	}

	if (chan) {
		fresh = ast_channel_snapshot_create(chan);
	}

	msg = create_channel_blob_message(fresh, type, blob);
	ao2_cleanup(fresh);

	return msg;
}
487
/*!
 * \brief A channel snapshot wrapper object used in \ref ast_multi_channel_blob objects
 *
 * The role string is stored inline after the structure, so allocations must
 * reserve sizeof(struct channel_role_snapshot) plus the role length including
 * its terminating NUL.
 */
struct channel_role_snapshot {
	struct ast_channel_snapshot *snapshot;	/*!< A channel snapshot */
	char role[];				/*!< The role assigned to the channel */
};
493
/*! \brief A multi channel blob data structure for multi_channel_blob stasis messages */
struct ast_multi_channel_blob {
	struct ao2_container *channel_snapshots;	/*!< A container holding the snapshots */
	struct ast_json *blob;				/*!< A blob of JSON data */
};
499
500 /*!
501  * \internal
502  * \brief Standard comparison function for \ref channel_role_snapshot objects
503  */
504 static int channel_role_single_cmp_cb(void *obj, void *arg, int flags)
505 {
506         struct channel_role_snapshot *left = obj;
507         struct channel_role_snapshot *right = arg;
508         const char *match = (flags & OBJ_KEY) ? arg : right->role;
509         return strcasecmp(left->role, match) ? 0 : (CMP_MATCH | CMP_STOP);
510 }
511
512 /*!
513  * \internal
514  * \brief Multi comparison function for \ref channel_role_snapshot objects
515  */
516 static int channel_role_multi_cmp_cb(void *obj, void *arg, int flags)
517 {
518         struct channel_role_snapshot *left = obj;
519         struct channel_role_snapshot *right = arg;
520         const char *match = (flags & OBJ_KEY) ? arg : right->role;
521         return strcasecmp(left->role, match) ? 0 : (CMP_MATCH);
522 }
523
524 /*!
525  * \internal
526  * \brief Hash function for \ref channel_role_snapshot objects
527  */
528 static int channel_role_hash_cb(const void *obj, const int flags)
529 {
530         const struct channel_role_snapshot *snapshot = obj;
531         const char *name = (flags & OBJ_KEY) ? obj : snapshot->role;
532         return ast_str_case_hash(name);
533 }
534
535 /*!
536  * \internal
537  * \brief Destructor for \ref ast_multi_channel_blob objects
538  */
539 static void multi_channel_blob_dtor(void *obj)
540 {
541         struct ast_multi_channel_blob *multi_blob = obj;
542
543         ao2_cleanup(multi_blob->channel_snapshots);
544         ast_json_unref(multi_blob->blob);
545 }
546
547 struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *blob)
548 {
549         RAII_VAR(struct ast_multi_channel_blob *, obj,
550                         ao2_alloc(sizeof(*obj), multi_channel_blob_dtor),
551                         ao2_cleanup);
552
553         ast_assert(blob != NULL);
554
555         if (!obj) {
556                 return NULL;
557         }
558
559         obj->channel_snapshots = ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS,
560                         channel_role_hash_cb, channel_role_single_cmp_cb);
561         if (!obj->channel_snapshots) {
562                 return NULL;
563         }
564
565         obj->blob = ast_json_ref(blob);
566
567         ao2_ref(obj, +1);
568         return obj;
569 }
570
/*!
 * \brief Fetch the most recent cached snapshot for a channel, by unique ID.
 *
 * \param uniqueid Unique ID of the channel, must not be NULL or empty
 *
 * \return The cached snapshot, with a reference added for the caller
 * \retval NULL if the cache holds no snapshot message for \a uniqueid, or the
 *         cached message carries no data
 */
struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid)
{
	struct stasis_message *cached_msg;
	struct ast_channel_snapshot *found;

	ast_assert(!ast_strlen_zero(uniqueid));

	cached_msg = stasis_cache_get(ast_channel_cache(),
		ast_channel_snapshot_type(),
		uniqueid);
	if (!cached_msg) {
		return NULL;
	}

	found = stasis_message_data(cached_msg);
	if (found) {
		/* Take the caller's reference before letting go of the message */
		ao2_ref(found, +1);
	}
	ao2_cleanup(cached_msg);

	return found;
}
592
/*!
 * \brief Fetch the most recent cached snapshot for a channel, by name.
 *
 * \param name Name of the channel, must not be NULL or empty
 *
 * \return The cached snapshot, with a reference added for the caller
 * \retval NULL if the by-name cache holds no snapshot message for \a name, or
 *         the cached message carries no data
 */
struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name)
{
	struct stasis_message *cached_msg;
	struct ast_channel_snapshot *found;

	ast_assert(!ast_strlen_zero(name));

	cached_msg = stasis_cache_get(ast_channel_cache_by_name(),
		ast_channel_snapshot_type(),
		name);
	if (!cached_msg) {
		return NULL;
	}

	found = stasis_message_data(cached_msg);
	if (found) {
		/* Take the caller's reference before letting go of the message */
		ao2_ref(found, +1);
	}
	ao2_cleanup(cached_msg);

	return found;
}
614
615 static void channel_role_snapshot_dtor(void *obj)
616 {
617         struct channel_role_snapshot *role_snapshot = obj;
618         ao2_cleanup(role_snapshot->snapshot);
619 }
620
621 void ast_multi_channel_blob_add_channel(struct ast_multi_channel_blob *obj, const char *role, struct ast_channel_snapshot *snapshot)
622 {
623         RAII_VAR(struct channel_role_snapshot *, role_snapshot, NULL, ao2_cleanup);
624         int role_len = strlen(role) + 1;
625
626         if (!obj || ast_strlen_zero(role) || !snapshot) {
627                 return;
628         }
629
630         role_snapshot = ao2_alloc_options(sizeof(*role_snapshot) + role_len, channel_role_snapshot_dtor,
631                 AO2_ALLOC_OPT_LOCK_NOLOCK);
632         if (!role_snapshot) {
633                 return;
634         }
635         ast_copy_string(role_snapshot->role, role, role_len);
636         role_snapshot->snapshot = snapshot;
637         ao2_ref(role_snapshot->snapshot, +1);
638         ao2_link(obj->channel_snapshots, role_snapshot);
639 }
640
641 struct ast_channel_snapshot *ast_multi_channel_blob_get_channel(struct ast_multi_channel_blob *obj, const char *role)
642 {
643         struct channel_role_snapshot *role_snapshot;
644
645         if (!obj || ast_strlen_zero(role)) {
646                 return NULL;
647         }
648         role_snapshot = ao2_find(obj->channel_snapshots, role, OBJ_KEY);
649         /* Note that this function does not increase the ref count on snapshot */
650         if (!role_snapshot) {
651                 return NULL;
652         }
653         ao2_ref(role_snapshot, -1);
654         return role_snapshot->snapshot;
655 }
656
657 struct ao2_container *ast_multi_channel_blob_get_channels(struct ast_multi_channel_blob *obj, const char *role)
658 {
659         RAII_VAR(struct ao2_container *, ret_container,
660                 ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS, channel_snapshot_hash_cb, channel_snapshot_cmp_cb),
661                 ao2_cleanup);
662         struct ao2_iterator *it_role_snapshots;
663         struct channel_role_snapshot *role_snapshot;
664         char *arg;
665
666         if (!obj || ast_strlen_zero(role) || !ret_container) {
667                 return NULL;
668         }
669         arg = ast_strdupa(role);
670
671         it_role_snapshots = ao2_callback(obj->channel_snapshots, OBJ_MULTIPLE | OBJ_KEY, channel_role_multi_cmp_cb, arg);
672         if (!it_role_snapshots) {
673                 return NULL;
674         }
675
676         while ((role_snapshot = ao2_iterator_next(it_role_snapshots))) {
677                 ao2_link(ret_container, role_snapshot->snapshot);
678                 ao2_ref(role_snapshot, -1);
679         }
680         ao2_iterator_destroy(it_role_snapshots);
681
682         ao2_ref(ret_container, +1);
683         return ret_container;
684 }
685
686 struct ast_json *ast_multi_channel_blob_get_json(struct ast_multi_channel_blob *obj)
687 {
688         if (!obj) {
689                 return NULL;
690         }
691         return obj->blob;
692 }
693
694 void ast_channel_stage_snapshot(struct ast_channel *chan)
695 {
696         ast_set_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE);
697 }
698
699 void ast_channel_stage_snapshot_done(struct ast_channel *chan)
700 {
701         ast_clear_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE);
702         ast_channel_publish_snapshot(chan);
703 }
704
705 void ast_channel_publish_snapshot(struct ast_channel *chan)
706 {
707         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
708         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
709
710         if (!ast_channel_snapshot_type()) {
711                 return;
712         }
713
714         if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE)) {
715                 return;
716         }
717
718         snapshot = ast_channel_snapshot_create(chan);
719         if (!snapshot) {
720                 return;
721         }
722
723         message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
724         if (!message) {
725                 return;
726         }
727
728         ast_assert(ast_channel_topic(chan) != NULL);
729         stasis_publish(ast_channel_topic(chan), message);
730 }
731
/*!
 * \brief Publish a channel blob message built from the channel's cached snapshot.
 *
 * \param chan Channel whose unique ID selects the cached snapshot and whose
 *        topic receives the message
 * \param type Message type to publish
 * \param blob JSON payload; NULL is replaced with the JSON null value
 */
void ast_channel_publish_cached_blob(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
{
	struct stasis_message *blob_msg;
	struct ast_json *payload = blob ? blob : ast_json_null();

	blob_msg = ast_channel_blob_create_from_cache(ast_channel_uniqueid(chan), type, payload);
	if (!blob_msg) {
		return;
	}

	stasis_publish(ast_channel_topic(chan), blob_msg);
	ao2_cleanup(blob_msg);
}
746
/*!
 * \brief Publish a channel blob message built from a fresh channel snapshot.
 *
 * \param chan Channel to snapshot; its topic, as returned by
 *        ast_channel_topic(), receives the message
 * \param type Message type to publish
 * \param blob JSON payload; NULL is replaced with the JSON null value
 */
void ast_channel_publish_blob(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
{
	struct stasis_message *blob_msg;
	struct ast_json *payload = blob ? blob : ast_json_null();

	blob_msg = ast_channel_blob_create(chan, type, payload);
	if (!blob_msg) {
		return;
	}

	stasis_publish(ast_channel_topic(chan), blob_msg);
	ao2_cleanup(blob_msg);
}
761
762 void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
763 {
764         struct ast_json *blob;
765
766         ast_assert(name != NULL);
767         ast_assert(value != NULL);
768
769         blob = ast_json_pack("{s: s, s: s}",
770                              "variable", name,
771                              "value", value);
772         if (!blob) {
773                 ast_log(LOG_ERROR, "Error creating message\n");
774                 return;
775         }
776
777         /*! If there are manager variables, force a cache update */
778         if (chan && ast_channel_has_manager_vars()) {
779                 ast_channel_publish_snapshot(chan);
780         }
781
782         if (chan) {
783                 ast_channel_publish_cached_blob(chan, ast_channel_varset_type(), blob);
784         } else {
785                 /* This function is NULL safe for global variables */
786                 ast_channel_publish_blob(NULL, ast_channel_varset_type(), blob);
787         }
788
789         ast_json_unref(blob);
790 }
791
792 static struct ast_manager_event_blob *varset_to_ami(struct stasis_message *msg)
793 {
794         RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
795         struct ast_channel_blob *obj = stasis_message_data(msg);
796         const char *variable =
797                 ast_json_string_get(ast_json_object_get(obj->blob, "variable"));
798         RAII_VAR(char *, value, ast_escape_c_alloc(
799                          ast_json_string_get(ast_json_object_get(obj->blob, "value"))), ast_free);
800
801         if (!value) {
802                 return NULL;
803         }
804
805         if (obj->snapshot) {
806                 channel_event_string =
807                         ast_manager_build_channel_state_string(obj->snapshot);
808         } else {
809                 channel_event_string = ast_str_create(35);
810                 ast_str_set(&channel_event_string, 0,
811                             "Channel: none\r\n"
812                             "Uniqueid: none\r\n");
813         }
814
815         if (!channel_event_string) {
816                 return NULL;
817         }
818
819         return ast_manager_event_blob_create(EVENT_FLAG_DIALPLAN, "VarSet",
820                 "%s"
821                 "Variable: %s\r\n"
822                 "Value: %s\r\n",
823                 ast_str_buffer(channel_event_string), variable, value);
824 }
825
826 static struct ast_manager_event_blob *agent_login_to_ami(struct stasis_message *msg)
827 {
828         RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
829         struct ast_channel_blob *obj = stasis_message_data(msg);
830         const char *agent = ast_json_string_get(ast_json_object_get(obj->blob, "agent"));
831
832         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
833         if (!channel_string) {
834                 return NULL;
835         }
836
837         return ast_manager_event_blob_create(EVENT_FLAG_AGENT, "AgentLogin",
838                 "%s"
839                 "Agent: %s\r\n",
840                 ast_str_buffer(channel_string), agent);
841 }
842
843 static struct ast_manager_event_blob *agent_logoff_to_ami(struct stasis_message *msg)
844 {
845         RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
846         struct ast_channel_blob *obj = stasis_message_data(msg);
847         const char *agent = ast_json_string_get(ast_json_object_get(obj->blob, "agent"));
848         long logintime = ast_json_integer_get(ast_json_object_get(obj->blob, "logintime"));
849
850         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
851         if (!channel_string) {
852                 return NULL;
853         }
854
855         return ast_manager_event_blob_create(EVENT_FLAG_AGENT, "AgentLogoff",
856                 "%s"
857                 "Agent: %s\r\n"
858                 "Logintime: %ld\r\n",
859                 ast_str_buffer(channel_string), agent, logintime);
860 }
861
862 void ast_publish_channel_state(struct ast_channel *chan)
863 {
864         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
865         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
866
867         if (!ast_channel_snapshot_type()) {
868                 return;
869         }
870
871         ast_assert(chan != NULL);
872         if (!chan) {
873                 return;
874         }
875
876         snapshot = ast_channel_snapshot_create(chan);
877         if (!snapshot) {
878                 return;
879         }
880
881         message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
882         if (!message) {
883                 return;
884         }
885
886         ast_assert(ast_channel_topic(chan) != NULL);
887         stasis_publish(ast_channel_topic(chan), message);
888 }
889
890 struct ast_json *ast_channel_snapshot_to_json(
891         const struct ast_channel_snapshot *snapshot,
892         const struct stasis_message_sanitizer *sanitize)
893 {
894         struct ast_json *json_chan;
895
896         if (snapshot == NULL
897                 || (sanitize && sanitize->channel_snapshot
898                 && sanitize->channel_snapshot(snapshot))) {
899                 return NULL;
900         }
901
902         json_chan = ast_json_pack(
903                 /* Broken up into groups of three for readability */
904                 "{ s: s, s: s, s: s,"
905                 "  s: o, s: o, s: s,"
906                 "  s: o, s: o, s: s }",
907                 /* First line */
908                 "id", snapshot->uniqueid,
909                 "name", snapshot->name,
910                 "state", ast_state2str(snapshot->state),
911                 /* Second line */
912                 "caller", ast_json_name_number(
913                         snapshot->caller_name, snapshot->caller_number),
914                 "connected", ast_json_name_number(
915                         snapshot->connected_name, snapshot->connected_number),
916                 "accountcode", snapshot->accountcode,
917                 /* Third line */
918                 "dialplan", ast_json_dialplan_cep(
919                         snapshot->context, snapshot->exten, snapshot->priority),
920                 "creationtime", ast_json_timeval(snapshot->creationtime, NULL),
921                 "language", snapshot->language);
922
923         if (snapshot->ari_vars && !AST_LIST_EMPTY(snapshot->ari_vars)) {
924                 ast_json_object_set(json_chan, "channelvars", ast_json_channel_vars(snapshot->ari_vars));
925         }
926
927         return json_chan;
928 }
929
930 int ast_channel_snapshot_cep_equal(
931         const struct ast_channel_snapshot *old_snapshot,
932         const struct ast_channel_snapshot *new_snapshot)
933 {
934         ast_assert(old_snapshot != NULL);
935         ast_assert(new_snapshot != NULL);
936
937         /* We actually get some snapshots with CEP set, but before the
938          * application is set. Since empty application is invalid, we treat
939          * setting the application from nothing as a CEP change.
940          */
941         if (ast_strlen_zero(old_snapshot->appl) &&
942             !ast_strlen_zero(new_snapshot->appl)) {
943                 return 0;
944         }
945
946         return old_snapshot->priority == new_snapshot->priority &&
947                 strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
948                 strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
949 }
950
951 int ast_channel_snapshot_caller_id_equal(
952         const struct ast_channel_snapshot *old_snapshot,
953         const struct ast_channel_snapshot *new_snapshot)
954 {
955         ast_assert(old_snapshot != NULL);
956         ast_assert(new_snapshot != NULL);
957         return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
958                 strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
959 }
960
961 int ast_channel_snapshot_connected_line_equal(
962         const struct ast_channel_snapshot *old_snapshot,
963         const struct ast_channel_snapshot *new_snapshot)
964 {
965         ast_assert(old_snapshot != NULL);
966         ast_assert(new_snapshot != NULL);
967         return strcmp(old_snapshot->connected_number, new_snapshot->connected_number) == 0 &&
968                 strcmp(old_snapshot->connected_name, new_snapshot->connected_name) == 0;
969 }
970
971 static struct ast_json *channel_blob_to_json(
972         struct stasis_message *message,
973         const char *type,
974         const struct stasis_message_sanitizer *sanitize)
975 {
976         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
977         struct ast_channel_blob *channel_blob = stasis_message_data(message);
978         struct ast_json *blob = channel_blob->blob;
979         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
980         const struct timeval *tv = stasis_message_timestamp(message);
981         int res = 0;
982
983         if (blob == NULL || ast_json_is_null(blob)) {
984                 out = ast_json_object_create();
985         } else {
986                 /* blobs are immutable, so shallow copies are fine */
987                 out = ast_json_copy(blob);
988         }
989
990         if (!out) {
991                 return NULL;
992         }
993
994         res |= ast_json_object_set(out, "type", ast_json_string_create(type));
995         res |= ast_json_object_set(out, "timestamp",
996                 ast_json_timeval(*tv, NULL));
997
998         /* For global channel messages, the snapshot is optional */
999         if (snapshot) {
1000                 struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
1001
1002                 if (!json_channel) {
1003                         return NULL;
1004                 }
1005
1006                 res |= ast_json_object_set(out, "channel", json_channel);
1007         }
1008
1009         if (res != 0) {
1010                 return NULL;
1011         }
1012
1013         return ast_json_ref(out);
1014 }
1015
1016 static struct ast_json *dtmf_end_to_json(
1017         struct stasis_message *message,
1018         const struct stasis_message_sanitizer *sanitize)
1019 {
1020         struct ast_channel_blob *channel_blob = stasis_message_data(message);
1021         struct ast_json *blob = channel_blob->blob;
1022         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
1023         const char *direction =
1024                 ast_json_string_get(ast_json_object_get(blob, "direction"));
1025         const char *digit =
1026                 ast_json_string_get(ast_json_object_get(blob, "digit"));
1027         long duration_ms =
1028                 ast_json_integer_get(ast_json_object_get(blob, "duration_ms"));
1029         const struct timeval *tv = stasis_message_timestamp(message);
1030         struct ast_json *json_channel;
1031
1032         /* Only present received DTMF end events as JSON */
1033         if (strcasecmp("Received", direction) != 0) {
1034                 return NULL;
1035         }
1036
1037         json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
1038         if (!json_channel) {
1039                 return NULL;
1040         }
1041
1042         return ast_json_pack("{s: s, s: o, s: s, s: i, s: o}",
1043                 "type", "ChannelDtmfReceived",
1044                 "timestamp", ast_json_timeval(*tv, NULL),
1045                 "digit", digit,
1046                 "duration_ms", duration_ms,
1047                 "channel", json_channel);
1048 }
1049
/*!
 * \internal
 * \brief JSON converter for varset messages; emits a "ChannelVarset" event
 *        via channel_blob_to_json().
 */
static struct ast_json *varset_to_json(
	struct stasis_message *message,
	const struct stasis_message_sanitizer *sanitize)
{
	struct ast_json *event;

	event = channel_blob_to_json(message, "ChannelVarset", sanitize);

	return event;
}
1056
/*!
 * \internal
 * \brief JSON converter for hangup request messages; emits a
 *        "ChannelHangupRequest" event via channel_blob_to_json().
 */
static struct ast_json *hangup_request_to_json(
	struct stasis_message *message,
	const struct stasis_message_sanitizer *sanitize)
{
	struct ast_json *event;

	event = channel_blob_to_json(message, "ChannelHangupRequest", sanitize);

	return event;
}
1063
1064 static struct ast_json *dial_to_json(
1065         struct stasis_message *message,
1066         const struct stasis_message_sanitizer *sanitize)
1067 {
1068         struct ast_multi_channel_blob *payload = stasis_message_data(message);
1069         struct ast_json *blob = ast_multi_channel_blob_get_json(payload);
1070         const char *dialstatus =
1071                 ast_json_string_get(ast_json_object_get(blob, "dialstatus"));
1072         const char *forward =
1073                 ast_json_string_get(ast_json_object_get(blob, "forward"));
1074         const char *dialstring =
1075                 ast_json_string_get(ast_json_object_get(blob, "dialstring"));
1076         struct ast_json *caller_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "caller"), sanitize);
1077         struct ast_json *peer_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "peer"), sanitize);
1078         struct ast_json *forwarded_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "forwarded"), sanitize);
1079         struct ast_json *json;
1080         const struct timeval *tv = stasis_message_timestamp(message);
1081         int res = 0;
1082
1083         json = ast_json_pack("{s: s, s: o, s: s, s: s, s: s}",
1084                 "type", "Dial",
1085                 "timestamp", ast_json_timeval(*tv, NULL),
1086                 "dialstatus", dialstatus,
1087                 "forward", forward,
1088                 "dialstring", dialstring);
1089         if (!json) {
1090                 ast_json_unref(caller_json);
1091                 ast_json_unref(peer_json);
1092                 ast_json_unref(forwarded_json);
1093                 return NULL;
1094         }
1095
1096         if (caller_json) {
1097                 res |= ast_json_object_set(json, "caller", caller_json);
1098         }
1099         if (peer_json) {
1100                 res |= ast_json_object_set(json, "peer", peer_json);
1101         }
1102         if (forwarded_json) {
1103                 res |= ast_json_object_set(json, "forwarded", forwarded_json);
1104         }
1105
1106         if (res) {
1107                 ast_json_unref(json);
1108                 return NULL;
1109         }
1110
1111         return json;
1112 }
1113
1114 static struct ast_manager_event_blob *talking_start_to_ami(struct stasis_message *msg)
1115 {
1116         struct ast_str *channel_string;
1117         struct ast_channel_blob *obj = stasis_message_data(msg);
1118         struct ast_manager_event_blob *blob;
1119
1120         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
1121         if (!channel_string) {
1122                 return NULL;
1123         }
1124
1125         blob = ast_manager_event_blob_create(EVENT_FLAG_CALL, "ChannelTalkingStart",
1126                                              "%s", ast_str_buffer(channel_string));
1127         ast_free(channel_string);
1128
1129         return blob;
1130 }
1131
/*!
 * \internal
 * \brief JSON converter for talking start messages; emits a
 *        "ChannelTalkingStarted" event via channel_blob_to_json().
 */
static struct ast_json *talking_start_to_json(struct stasis_message *message,
	const struct stasis_message_sanitizer *sanitize)
{
	struct ast_json *event;

	event = channel_blob_to_json(message, "ChannelTalkingStarted", sanitize);

	return event;
}
1137
1138 static struct ast_manager_event_blob *talking_stop_to_ami(struct stasis_message *msg)
1139 {
1140         struct ast_str *channel_string;
1141         struct ast_channel_blob *obj = stasis_message_data(msg);
1142         int duration = ast_json_integer_get(ast_json_object_get(obj->blob, "duration"));
1143         struct ast_manager_event_blob *blob;
1144
1145         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
1146         if (!channel_string) {
1147                 return NULL;
1148         }
1149
1150         blob = ast_manager_event_blob_create(EVENT_FLAG_CALL, "ChannelTalkingStop",
1151                                              "%s"
1152                                              "Duration: %d\r\n",
1153                                              ast_str_buffer(channel_string),
1154                                              duration);
1155         ast_free(channel_string);
1156
1157         return blob;
1158 }
1159
/*!
 * \internal
 * \brief JSON converter for talking stop messages; emits a
 *        "ChannelTalkingFinished" event via channel_blob_to_json().
 */
static struct ast_json *talking_stop_to_json(struct stasis_message *message,
	const struct stasis_message_sanitizer *sanitize)
{
	struct ast_json *event;

	event = channel_blob_to_json(message, "ChannelTalkingFinished", sanitize);

	return event;
}
1165
1166 static struct ast_json *hold_to_json(struct stasis_message *message,
1167         const struct stasis_message_sanitizer *sanitize)
1168 {
1169         struct ast_channel_blob *channel_blob = stasis_message_data(message);
1170         struct ast_json *blob = channel_blob->blob;
1171         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
1172         const char *musicclass = ast_json_string_get(ast_json_object_get(blob, "musicclass"));
1173         const struct timeval *tv = stasis_message_timestamp(message);
1174         struct ast_json *json_channel;
1175
1176         json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
1177         if (!json_channel) {
1178                 return NULL;
1179         }
1180
1181         return ast_json_pack("{s: s, s: o, s: s, s: o}",
1182                 "type", "ChannelHold",
1183                 "timestamp", ast_json_timeval(*tv, NULL),
1184                 "musicclass", S_OR(musicclass, "N/A"),
1185                 "channel", json_channel);
1186 }
1187
1188 static struct ast_json *unhold_to_json(struct stasis_message *message,
1189         const struct stasis_message_sanitizer *sanitize)
1190 {
1191         struct ast_channel_blob *channel_blob = stasis_message_data(message);
1192         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
1193         const struct timeval *tv = stasis_message_timestamp(message);
1194         struct ast_json *json_channel;
1195
1196         json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
1197         if (!json_channel) {
1198                 return NULL;
1199         }
1200
1201         return ast_json_pack("{s: s, s: o, s: o}",
1202                 "type", "ChannelUnhold",
1203                 "timestamp", ast_json_timeval(*tv, NULL),
1204                 "channel", json_channel);
1205 }
1206
/*!
 * @{ \brief Define channel message types.
 *
 * Types listed with extra arguments install the named to_ami and/or
 * to_json converter functions; the remaining types are defined with no
 * extra arguments.  Every type defined here is initialized in
 * ast_stasis_channels_init() and released in stasis_channels_cleanup().
 */
STASIS_MESSAGE_TYPE_DEFN(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dial_type,
	.to_json = dial_to_json,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_varset_type,
	.to_ami = varset_to_ami,
	.to_json = varset_to_json,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_request_type,
	.to_json = hangup_request_to_json,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_begin_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_end_type,
	.to_json = dtmf_end_to_json,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hold_type,
	.to_json = hold_to_json,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_unhold_type,
	.to_json = unhold_to_json,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_fax_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_handler_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_login_type,
	.to_ami = agent_login_to_ami,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_logoff_type,
	.to_ami = agent_logoff_to_ami,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_talking_start,
	.to_ami = talking_start_to_ami,
	.to_json = talking_start_to_json,
	);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_talking_stop,
	.to_ami = talking_stop_to_ami,
	.to_json = talking_stop_to_json,
	);

/*! @} */
1255
/*!
 * \internal
 * \brief Release the channel caches, the by-name caching topic and every
 *        channel message type defined in this file.
 *
 * Registered with ast_register_cleanup() by ast_stasis_channels_init().
 * Each file-level pointer is reset to NULL after it is released.
 */
static void stasis_channels_cleanup(void)
{
	/* Stop the by-name caching topic before dropping the caches. */
	stasis_caching_unsubscribe_and_join(channel_by_name_topic);
	channel_by_name_topic = NULL;
	ao2_cleanup(channel_cache_by_name);
	channel_cache_by_name = NULL;
	ao2_cleanup(channel_cache_all);
	channel_cache_all = NULL;

	/* One cleanup per type initialized in ast_stasis_channels_init(). */
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hold_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_unhold_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_start_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_stop_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_fax_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_handler_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_start_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_stop_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_start_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_login_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_logoff_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_talking_start);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_talking_stop);
}
1286
/*
 * Initialize the channel caches, the by-name caching topic and all channel
 * message types defined in this file.
 *
 * Returns -1 as soon as a cache or the caching topic cannot be created;
 * otherwise returns the OR of all STASIS_MESSAGE_TYPE_INIT() results
 * (0 when every type initialized successfully).
 */
int ast_stasis_channels_init(void)
{
	int res = 0;

	/* Register teardown first so a partial initialization is still released. */
	ast_register_cleanup(stasis_channels_cleanup);

	channel_cache_all = stasis_cp_all_create("ast_channel_topic_all",
		channel_snapshot_get_id);
	if (!channel_cache_all) {
		return -1;
	}
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);

	channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name);
	if (!channel_cache_by_name) {
		return -1;
	}

	/* This should be initialized before the caching topic */
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);

	/* Cache snapshots by name, fed from the "all channels" topic. */
	channel_by_name_topic = stasis_caching_topic_create(
		stasis_cp_all_topic(channel_cache_all),
		channel_cache_by_name);
	if (!channel_by_name_topic) {
		return -1;
	}

	/* Remaining message types; failures are accumulated in res. */
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_talking_start);
	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_talking_stop);

	return res;
}
1336
/*!
 * \internal
 * \brief A list element for the dial_masquerade_datastore -- stores data about a dialed peer
 *
 * Entries are released with dial_target_free(), which frees the
 * dialstring and passes the peer to ast_channel_cleanup().
 */
struct dial_target {
	/*! Called party channel. */
	struct ast_channel *peer;
	/*! Dialstring used to call the peer (freed with ast_free()). */
	char *dialstring;
	/*! Next entry in the list. */
	AST_LIST_ENTRY(dial_target) list;
};
1349
1350 static void dial_target_free(struct dial_target *doomed)
1351 {
1352         if (!doomed) {
1353                 return;
1354         }
1355         ast_free(doomed->dialstring);
1356         ast_channel_cleanup(doomed->peer);
1357         ast_free(doomed);
1358 }
1359
/*!
 * \internal
 * \brief Datastore used for advancing dial state in the case of a masquerade
 *        against a channel in the process of dialing.
 *
 * Allocated as an ao2 object by dial_masquerade_datastore_alloc(); the
 * functions in this file take the object's ao2 lock while reading or
 * modifying the caller and the peer list.
 */
struct dial_masquerade_datastore {
	/*! Calling party channel. */
	struct ast_channel *caller;
	/*! List of called peers. */
	AST_LIST_HEAD_NOLOCK(, dial_target) dialed_peers;
};
1371
1372 static void dial_masquerade_datastore_cleanup(struct dial_masquerade_datastore *masq_data)
1373 {
1374         struct dial_target *cur;
1375
1376         while ((cur = AST_LIST_REMOVE_HEAD(&masq_data->dialed_peers, list))) {
1377                 dial_target_free(cur);
1378         }
1379 }
1380
1381 static void dial_masquerade_datastore_remove_chan(struct dial_masquerade_datastore *masq_data, struct ast_channel *chan)
1382 {
1383         struct dial_target *cur;
1384
1385         ao2_lock(masq_data);
1386         if (masq_data->caller == chan) {
1387                 dial_masquerade_datastore_cleanup(masq_data);
1388         } else {
1389                 AST_LIST_TRAVERSE_SAFE_BEGIN(&masq_data->dialed_peers, cur, list) {
1390                         if (cur->peer == chan) {
1391                                 AST_LIST_REMOVE_CURRENT(list);
1392                                 dial_target_free(cur);
1393                                 break;
1394                         }
1395                 }
1396                 AST_LIST_TRAVERSE_SAFE_END;
1397         }
1398         ao2_unlock(masq_data);
1399 }
1400
/*!
 * \internal
 * \brief ao2 destructor for dial_masquerade_datastore: frees the remaining
 *        dialed peer entries.
 */
static void dial_masquerade_datastore_dtor(void *vdoomed)
{
	struct dial_masquerade_datastore *masq_data = vdoomed;

	dial_masquerade_datastore_cleanup(masq_data);
}
1405
1406 static struct dial_masquerade_datastore *dial_masquerade_datastore_alloc(void)
1407 {
1408         struct dial_masquerade_datastore *masq_data;
1409
1410         masq_data = ao2_alloc(sizeof(struct dial_masquerade_datastore),
1411                 dial_masquerade_datastore_dtor);
1412         if (!masq_data) {
1413                 return NULL;
1414         }
1415         AST_LIST_HEAD_INIT_NOLOCK(&masq_data->dialed_peers);
1416         return masq_data;
1417 }
1418
/*!
 * \internal
 * \brief Datastore destroy callback used by dial_masquerade_info.
 *
 * Only drops the datastore's reference on the shared
 * dial_masquerade_datastore object.
 */
static void dial_masquerade_datastore_destroy(void *data)
{
	struct dial_masquerade_datastore *masq_data = data;

	ao2_ref(masq_data, -1);
}
1427
/*!
 * \internal
 * \brief Datastore destroy callback used by dial_masquerade_caller_info.
 *
 * Empties the dialed peer list first and then drops the datastore's
 * reference on the shared dial_masquerade_datastore object.
 */
static void dial_masquerade_caller_datastore_destroy(void *data)
{
	struct dial_masquerade_datastore *masq_data = data;

	dial_masquerade_datastore_cleanup(masq_data);
	ao2_ref(masq_data, -1);
}
1437
1438 static struct ast_datastore *dial_masquerade_datastore_find(struct ast_channel *chan);
1439
1440 static void dial_masquerade_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
1441 {
1442         struct dial_masquerade_datastore *masq_data = data;
1443         struct dial_target *cur;
1444         struct ast_datastore *datastore;
1445
1446         ao2_lock(masq_data);
1447         if (!masq_data->caller) {
1448                 /* Nothing to do but remove the datastore */
1449         } else if (masq_data->caller == old_chan) {
1450                 /* The caller channel is being masqueraded out. */
1451                 ast_debug(1, "Caller channel %s being masqueraded out to %s (is_empty:%d)\n",
1452                         ast_channel_name(new_chan), ast_channel_name(old_chan),
1453                         AST_LIST_EMPTY(&masq_data->dialed_peers));
1454                 AST_LIST_TRAVERSE(&masq_data->dialed_peers, cur, list) {
1455                         ast_channel_publish_dial_internal(new_chan, cur->peer, NULL,
1456                                 cur->dialstring, "NOANSWER", NULL);
1457                         ast_channel_publish_dial_internal(old_chan, cur->peer, NULL,
1458                                 cur->dialstring, NULL, NULL);
1459                 }
1460                 dial_masquerade_datastore_cleanup(masq_data);
1461         } else {
1462                 /* One of the peer channels is being masqueraded out. */
1463                 AST_LIST_TRAVERSE_SAFE_BEGIN(&masq_data->dialed_peers, cur, list) {
1464                         if (cur->peer == old_chan) {
1465                                 ast_debug(1, "Peer channel %s being masqueraded out to %s\n",
1466                                         ast_channel_name(new_chan), ast_channel_name(old_chan));
1467                                 ast_channel_publish_dial_internal(masq_data->caller, new_chan, NULL,
1468                                         cur->dialstring, "CANCEL", NULL);
1469                                 ast_channel_publish_dial_internal(masq_data->caller, old_chan, NULL,
1470                                         cur->dialstring, NULL, NULL);
1471
1472                                 AST_LIST_REMOVE_CURRENT(list);
1473                                 dial_target_free(cur);
1474                                 break;
1475                         }
1476                 }
1477                 AST_LIST_TRAVERSE_SAFE_END;
1478         }
1479         ao2_unlock(masq_data);
1480
1481         /* Remove the datastore from the channel. */
1482         datastore = dial_masquerade_datastore_find(old_chan);
1483         if (!datastore) {
1484                 return;
1485         }
1486         ast_channel_datastore_remove(old_chan, datastore);
1487         ast_datastore_free(datastore);
1488 }
1489
1490 /*!
1491  * \internal
1492  * \brief Primary purpose for dial_masquerade_datastore, publishes
1493  *        the channel dial event needed to set the incoming channel into the
1494  *        dial state during a masquerade.
1495  * \param data pointer to the dial_masquerade_datastore
1496  * \param old_chan Channel being replaced
1497  * \param new_chan Channel being pushed to dial mode
1498  */
1499 static void dial_masquerade_breakdown(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
1500 {
1501         struct dial_masquerade_datastore *masq_data = data;
1502         struct dial_target *cur;
1503
1504         ao2_lock(masq_data);
1505
1506         if (!masq_data->caller) {
1507                 ao2_unlock(masq_data);
1508                 return;
1509         }
1510
1511         if (masq_data->caller == new_chan) {
1512                 /*
1513                  * The caller channel is being masqueraded into.
1514                  * The masquerade is likely because of a blonde transfer.
1515                  */
1516                 ast_debug(1, "Caller channel %s being masqueraded into by %s (is_empty:%d)\n",
1517                         ast_channel_name(old_chan), ast_channel_name(new_chan),
1518                         AST_LIST_EMPTY(&masq_data->dialed_peers));
1519                 AST_LIST_TRAVERSE(&masq_data->dialed_peers, cur, list) {
1520                         ast_channel_publish_dial_internal(old_chan, cur->peer, NULL,
1521                                 cur->dialstring, "NOANSWER", NULL);
1522                         ast_channel_publish_dial_internal(new_chan, cur->peer, NULL,
1523                                 cur->dialstring, NULL, NULL);
1524                 }
1525
1526                 ao2_unlock(masq_data);
1527                 return;
1528         }
1529
1530         /*
1531          * One of the peer channels is being masqueraded into.
1532          * The masquerade is likely because of a call pickup.
1533          */
1534         AST_LIST_TRAVERSE(&masq_data->dialed_peers, cur, list) {
1535                 if (cur->peer == new_chan) {
1536                         ast_debug(1, "Peer channel %s being masqueraded into by %s\n",
1537                                 ast_channel_name(old_chan), ast_channel_name(new_chan));
1538                         ast_channel_publish_dial_internal(masq_data->caller, old_chan, NULL,
1539                                 cur->dialstring, "CANCEL", NULL);
1540                         ast_channel_publish_dial_internal(masq_data->caller, new_chan, NULL,
1541                                 cur->dialstring, NULL, NULL);
1542                         break;
1543                 }
1544         }
1545
1546         ao2_unlock(masq_data);
1547 }
1548
1549 static const struct ast_datastore_info dial_masquerade_info = {
1550         .type = "stasis-chan-dial-masq",
1551         .destroy = dial_masquerade_datastore_destroy,
1552         .chan_fixup = dial_masquerade_fixup,
1553         .chan_breakdown = dial_masquerade_breakdown,
1554 };
1555
1556 static const struct ast_datastore_info dial_masquerade_caller_info = {
1557         .type = "stasis-chan-dial-masq",
1558         .destroy = dial_masquerade_caller_datastore_destroy,
1559         .chan_fixup = dial_masquerade_fixup,
1560         .chan_breakdown = dial_masquerade_breakdown,
1561 };
1562
1563 /*!
1564  * \internal
1565  * \brief Find the dial masquerade datastore on the given channel.
1566  *
1567  * \param chan Channel a datastore data is wanted from
1568  *
1569  * \return A pointer to the datastore if it exists.
1570  */
1571 static struct ast_datastore *dial_masquerade_datastore_find(struct ast_channel *chan)
1572 {
1573         struct ast_datastore *datastore;
1574
1575         datastore = ast_channel_datastore_find(chan, &dial_masquerade_info, NULL);
1576         if (!datastore) {
1577                 datastore = ast_channel_datastore_find(chan, &dial_masquerade_caller_info, NULL);
1578         }
1579
1580         return datastore;
1581 }
1582
1583 /*!
1584  * \internal
1585  * \brief Add the dial masquerade datastore to a channel.
1586  *
1587  * \param chan Channel to setup dial masquerade datastore on.
1588  * \param masq_data NULL to setup caller datastore otherwise steals the ref on success.
1589  *
1590  * \retval masq_data given or created on success.
1591  *         (A ref is not returned but can be obtained before chan is unlocked.)
1592  * \retval NULL on error.  masq_data ref is not stolen.
1593  */
1594 static struct dial_masquerade_datastore *dial_masquerade_datastore_add(
1595         struct ast_channel *chan, struct dial_masquerade_datastore *masq_data)
1596 {
1597         struct ast_datastore *datastore;
1598
1599         datastore = ast_datastore_alloc(!masq_data ? &dial_masquerade_caller_info : &dial_masquerade_info, NULL);
1600         if (!datastore) {
1601                 return NULL;
1602         }
1603
1604         if (!masq_data) {
1605                 masq_data = dial_masquerade_datastore_alloc();
1606                 if (!masq_data) {
1607                         ast_datastore_free(datastore);
1608                         return NULL;
1609                 }
1610                 masq_data->caller = chan;
1611         }
1612
1613         datastore->data = masq_data;
1614         ast_channel_datastore_add(chan, datastore);
1615
1616         return masq_data;
1617 }
1618
1619 static int set_dial_masquerade(struct ast_channel *caller, struct ast_channel *peer, const char *dialstring)
1620 {
1621         struct ast_datastore *datastore;
1622         struct dial_masquerade_datastore *masq_data;
1623         struct dial_target *target;
1624
1625         /* Find or create caller datastore */
1626         datastore = dial_masquerade_datastore_find(caller);
1627         if (!datastore) {
1628                 masq_data = dial_masquerade_datastore_add(caller, NULL);
1629         } else {
1630                 masq_data = datastore->data;
1631         }
1632         if (!masq_data) {
1633                 return -1;
1634         }
1635         ao2_ref(masq_data, +1);
1636
1637         /*
1638          * Someone likely forgot to do an ast_channel_publish_dial()
1639          * or ast_channel_publish_dial_forward() with a final dial
1640          * status on the channel.
1641          */
1642         ast_assert(masq_data->caller == caller);
1643
1644         /* Create peer target to put into datastore */
1645         target = ast_calloc(1, sizeof(*target));
1646         if (!target) {
1647                 ao2_ref(masq_data, -1);
1648                 return -1;
1649         }
1650         if (dialstring) {
1651                 target->dialstring = ast_strdup(dialstring);
1652                 if (!target->dialstring) {
1653                         ast_free(target);
1654                         ao2_ref(masq_data, -1);
1655                         return -1;
1656                 }
1657         }
1658         target->peer = ast_channel_ref(peer);
1659
1660         /* Put peer target into datastore */
1661         ao2_lock(masq_data);
1662         dial_masquerade_datastore_remove_chan(masq_data, peer);
1663         AST_LIST_INSERT_HEAD(&masq_data->dialed_peers, target, list);
1664         ao2_unlock(masq_data);
1665
1666         datastore = dial_masquerade_datastore_find(peer);
1667         if (datastore) {
1668                 if (datastore->data == masq_data) {
1669                         /*
1670                          * Peer already had the datastore for this dial masquerade.
1671                          * This was a redundant peer dial masquerade setup.
1672                          */
1673                         ao2_ref(masq_data, -1);
1674                         return 0;
1675                 }
1676
1677                 /* Something is wrong.  Try to fix if the assert doesn't abort. */
1678                 ast_assert(0);
1679
1680                 /* Remove the stale dial masquerade datastore */
1681                 dial_masquerade_datastore_remove_chan(datastore->data, peer);
1682                 ast_channel_datastore_remove(peer, datastore);
1683                 ast_datastore_free(datastore);
1684         }
1685
1686         /* Create the peer dial masquerade datastore */
1687         if (dial_masquerade_datastore_add(peer, masq_data)) {
1688                 /* Success */
1689                 return 0;
1690         }
1691
1692         /* Failed to create the peer datastore */
1693         dial_masquerade_datastore_remove_chan(masq_data, peer);
1694         ao2_ref(masq_data, -1);
1695         return -1;
1696 }
1697
1698 static void remove_dial_masquerade(struct ast_channel *peer)
1699 {
1700         struct ast_datastore *datastore;
1701         struct dial_masquerade_datastore *masq_data;
1702
1703         datastore = dial_masquerade_datastore_find(peer);
1704         if (!datastore) {
1705                 return;
1706         }
1707
1708         masq_data = datastore->data;
1709         if (masq_data) {
1710                 dial_masquerade_datastore_remove_chan(masq_data, peer);
1711         }
1712
1713         ast_channel_datastore_remove(peer, datastore);
1714         ast_datastore_free(datastore);
1715 }
1716
1717 static void remove_dial_masquerade_caller(struct ast_channel *caller)
1718 {
1719         struct ast_datastore *datastore;
1720         struct dial_masquerade_datastore *masq_data;
1721
1722         datastore = dial_masquerade_datastore_find(caller);
1723         if (!datastore) {
1724                 return;
1725         }
1726
1727         masq_data = datastore->data;
1728         if (!masq_data || !AST_LIST_EMPTY(&masq_data->dialed_peers)) {
1729                 return;
1730         }
1731
1732         dial_masquerade_datastore_remove_chan(masq_data, caller);
1733
1734         ast_channel_datastore_remove(caller, datastore);
1735         ast_datastore_free(datastore);
1736 }