ARI: Add the ability to intercept hold and raise an event
[asterisk/asterisk.git] / main / stasis_channels.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Matt Jordan <mjordan@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Messages and Data Types for Channel Objects
22  *
23  * \author \verbatim Matt Jordan <mjordan@digium.com> \endverbatim
24  *
25  */
26
27 /*** MODULEINFO
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34
35 #include "asterisk/astobj2.h"
36 #include "asterisk/json.h"
37 #include "asterisk/pbx.h"
38 #include "asterisk/bridge.h"
39 #include "asterisk/translate.h"
40 #include "asterisk/stasis.h"
41 #include "asterisk/stasis_cache_pattern.h"
42 #include "asterisk/stasis_channels.h"
43 #include "asterisk/dial.h"
44 #include "asterisk/linkedlists.h"
45
46 /*** DOCUMENTATION
47         <managerEvent language="en_US" name="VarSet">
48                 <managerEventInstance class="EVENT_FLAG_DIALPLAN">
49                         <synopsis>Raised when a variable is set to a particular value.</synopsis>
50                         <syntax>
51                                 <channel_snapshot/>
52                                 <parameter name="Variable">
53                                         <para>The variable being set.</para>
54                                 </parameter>
55                                 <parameter name="Value">
56                                         <para>The new value of the variable.</para>
57                                 </parameter>
58                         </syntax>
59                 </managerEventInstance>
60         </managerEvent>
61         <managerEvent language="en_US" name="AgentLogin">
62                 <managerEventInstance class="EVENT_FLAG_AGENT">
63                         <synopsis>Raised when an Agent has logged in.</synopsis>
64                         <syntax>
65                                 <channel_snapshot/>
66                                 <parameter name="Agent">
67                                         <para>Agent ID of the agent.</para>
68                                 </parameter>
69                         </syntax>
70                         <see-also>
71                                 <ref type="application">AgentLogin</ref>
72                                 <ref type="managerEvent">AgentLogoff</ref>
73                         </see-also>
74                 </managerEventInstance>
75         </managerEvent>
76         <managerEvent language="en_US" name="AgentLogoff">
77                 <managerEventInstance class="EVENT_FLAG_AGENT">
78                         <synopsis>Raised when an Agent has logged off.</synopsis>
79                         <syntax>
80                                 <xi:include xpointer="xpointer(/docs/managerEvent[@name='AgentLogin']/managerEventInstance/syntax/parameter)" />
81                                 <parameter name="Logintime">
82                                         <para>The number of seconds the agent was logged in.</para>
83                                 </parameter>
84                         </syntax>
85                         <see-also>
86                                 <ref type="managerEvent">AgentLogin</ref>
87                         </see-also>
88                 </managerEventInstance>
89         </managerEvent>
90         <managerEvent language="en_US" name="ChannelTalkingStart">
91                 <managerEventInstance class="EVENT_FLAG_CLASS">
92                         <synopsis>Raised when talking is detected on a channel.</synopsis>
93                         <syntax>
94                                 <channel_snapshot/>
95                         </syntax>
96                         <see-also>
97                                 <ref type="function">TALK_DETECT</ref>
98                                 <ref type="managerEvent">ChannelTalkingStop</ref>
99                         </see-also>
100                 </managerEventInstance>
101         </managerEvent>
102         <managerEvent language="en_US" name="ChannelTalkingStop">
103                 <managerEventInstance class="EVENT_FLAG_CLASS">
104                         <synopsis>Raised when talking is no longer detected on a channel.</synopsis>
105                         <syntax>
106                                 <channel_snapshot/>
107                                 <parameter name="Duration">
108                                         <para>The length in time, in milliseconds, that talking was
109                                         detected on the channel.</para>
110                                 </parameter>
111                         </syntax>
112                         <see-also>
113                                 <ref type="function">TALK_DETECT</ref>
114                                 <ref type="managerEvent">ChannelTalkingStart</ref>
115                         </see-also>
116                 </managerEventInstance>
117         </managerEvent>
118 ***/
119
/*! \brief Bucket count passed to ao2_container_alloc() for the containers created in this file */
#define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7

/*! \brief Object handed to the stasis_cp_all_*() accessors used by the getters below */
static struct stasis_cp_all *channel_cache_all;
/*! \brief Cache returned by ast_channel_cache_by_name() */
static struct stasis_cache *channel_cache_by_name;
/*! \brief NOTE(review): presumably the caching topic feeding channel_cache_by_name; not referenced in the visible portion of the file - confirm */
static struct stasis_caching_topic *channel_by_name_topic;
125
126 struct stasis_cp_all *ast_channel_cache_all(void)
127 {
128         return channel_cache_all;
129 }
130
131 struct stasis_cache *ast_channel_cache(void)
132 {
133         return stasis_cp_all_cache(channel_cache_all);
134 }
135
136 struct stasis_topic *ast_channel_topic_all(void)
137 {
138         return stasis_cp_all_topic(channel_cache_all);
139 }
140
141 struct stasis_topic *ast_channel_topic_all_cached(void)
142 {
143         return stasis_cp_all_topic_cached(channel_cache_all);
144 }
145
146 struct stasis_cache *ast_channel_cache_by_name(void)
147 {
148         return channel_cache_by_name;
149 }
150
151 static const char *channel_snapshot_get_id(struct stasis_message *message)
152 {
153         struct ast_channel_snapshot *snapshot;
154         if (ast_channel_snapshot_type() != stasis_message_type(message)) {
155                 return NULL;
156         }
157         snapshot = stasis_message_data(message);
158         return snapshot->uniqueid;
159 }
160
161 static const char *channel_snapshot_get_name(struct stasis_message *message)
162 {
163         struct ast_channel_snapshot *snapshot;
164         if (ast_channel_snapshot_type() != stasis_message_type(message)) {
165                 return NULL;
166         }
167         snapshot = stasis_message_data(message);
168         return snapshot->name;
169 }
170
171 /*!
172  * \internal
173  * \brief Hash function for \ref ast_channel_snapshot objects
174  */
175 static int channel_snapshot_hash_cb(const void *obj, const int flags)
176 {
177         const struct ast_channel_snapshot *snapshot = obj;
178         const char *name = (flags & OBJ_KEY) ? obj : snapshot->name;
179         return ast_str_case_hash(name);
180 }
181
182 /*!
183  * \internal
184  * \brief Comparison function for \ref ast_channel_snapshot objects
185  */
186 static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags)
187 {
188         struct ast_channel_snapshot *left = obj;
189         struct ast_channel_snapshot *right = arg;
190         const char *match = (flags & OBJ_KEY) ? arg : right->name;
191         return strcasecmp(left->name, match) ? 0 : (CMP_MATCH | CMP_STOP);
192 }
193
194 static void channel_snapshot_dtor(void *obj)
195 {
196         struct ast_channel_snapshot *snapshot = obj;
197
198         ast_string_field_free_memory(snapshot);
199         ao2_cleanup(snapshot->manager_vars);
200 }
201
202 struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
203 {
204         struct ast_channel_snapshot *snapshot;
205         struct ast_bridge *bridge;
206
207         /* no snapshots for dummy channels */
208         if (!ast_channel_tech(chan)) {
209                 return NULL;
210         }
211
212         snapshot = ao2_alloc_options(sizeof(*snapshot), channel_snapshot_dtor,
213                 AO2_ALLOC_OPT_LOCK_NOLOCK);
214         if (!snapshot || ast_string_field_init(snapshot, 1024)) {
215                 ao2_cleanup(snapshot);
216                 return NULL;
217         }
218
219         ast_string_field_set(snapshot, name, ast_channel_name(chan));
220         ast_string_field_set(snapshot, type, ast_channel_tech(chan)->type);
221         ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
222         ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
223         ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
224         ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
225         ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
226         ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
227         if (ast_channel_appl(chan)) {
228                 ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
229         }
230         if (ast_channel_data(chan)) {
231                 ast_string_field_set(snapshot, data, ast_channel_data(chan));
232         }
233         ast_string_field_set(snapshot, context, ast_channel_context(chan));
234         ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
235
236         ast_string_field_set(snapshot, caller_name,
237                 S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
238         ast_string_field_set(snapshot, caller_number,
239                 S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
240         ast_string_field_set(snapshot, caller_dnid, S_OR(ast_channel_dialed(chan)->number.str, ""));
241         ast_string_field_set(snapshot, caller_subaddr,
242                 S_COR(ast_channel_caller(chan)->id.subaddress.valid, ast_channel_caller(chan)->id.subaddress.str, ""));
243         ast_string_field_set(snapshot, dialed_subaddr,
244                 S_COR(ast_channel_dialed(chan)->subaddress.valid, ast_channel_dialed(chan)->subaddress.str, ""));
245         ast_string_field_set(snapshot, caller_ani,
246                 S_COR(ast_channel_caller(chan)->ani.number.valid, ast_channel_caller(chan)->ani.number.str, ""));
247         ast_string_field_set(snapshot, caller_rdnis,
248                 S_COR(ast_channel_redirecting(chan)->from.number.valid, ast_channel_redirecting(chan)->from.number.str, ""));
249         ast_string_field_set(snapshot, caller_dnid,
250                 S_OR(ast_channel_dialed(chan)->number.str, ""));
251
252         ast_string_field_set(snapshot, connected_name,
253                 S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
254         ast_string_field_set(snapshot, connected_number,
255                 S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
256         ast_string_field_set(snapshot, language, ast_channel_language(chan));
257
258         if ((bridge = ast_channel_get_bridge(chan))) {
259                 ast_string_field_set(snapshot, bridgeid, bridge->uniqueid);
260                 ao2_cleanup(bridge);
261         }
262
263         snapshot->creationtime = ast_channel_creationtime(chan);
264         snapshot->state = ast_channel_state(chan);
265         snapshot->priority = ast_channel_priority(chan);
266         snapshot->amaflags = ast_channel_amaflags(chan);
267         snapshot->hangupcause = ast_channel_hangupcause(chan);
268         ast_copy_flags(&snapshot->flags, ast_channel_flags(chan), 0xFFFFFFFF);
269         snapshot->caller_pres = ast_party_id_presentation(&ast_channel_caller(chan)->id);
270         ast_set_flag(&snapshot->softhangup_flags, ast_channel_softhangup_internal_flag(chan));
271
272         snapshot->manager_vars = ast_channel_get_manager_vars(chan);
273         snapshot->tech_properties = ast_channel_tech(chan)->properties;
274
275         return snapshot;
276 }
277
/*!
 * \internal
 * \brief Publish a message to a channel's topic, or to the all-channels
 *        topic when no channel is given.
 *
 * \param message Message to publish.
 * \param chan    Channel whose topic receives the message; may be NULL.
 */
static void publish_message_for_channel_topics(struct stasis_message *message, struct ast_channel *chan)
{
        struct stasis_topic *target;

        target = chan ? ast_channel_topic(chan) : ast_channel_topic_all();
        stasis_publish(target, message);
}
286
287 static void channel_blob_dtor(void *obj)
288 {
289         struct ast_channel_blob *event = obj;
290         ao2_cleanup(event->snapshot);
291         ast_json_unref(event->blob);
292 }
293
294 static void ast_channel_publish_dial_internal(struct ast_channel *caller,
295         struct ast_channel *peer, struct ast_channel *forwarded, const char *dialstring,
296         const char *dialstatus, const char *forward)
297 {
298         RAII_VAR(struct ast_multi_channel_blob *, payload, NULL, ao2_cleanup);
299         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
300         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
301         RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
302         RAII_VAR(struct ast_channel_snapshot *, peer_snapshot, NULL, ao2_cleanup);
303         RAII_VAR(struct ast_channel_snapshot *, forwarded_snapshot, NULL, ao2_cleanup);
304
305         if (!ast_channel_dial_type()) {
306                 return;
307         }
308
309         ast_assert(peer != NULL);
310         blob = ast_json_pack("{s: s, s: s, s: s}",
311                              "dialstatus", S_OR(dialstatus, ""),
312                              "forward", S_OR(forward, ""),
313                              "dialstring", S_OR(dialstring, ""));
314         if (!blob) {
315                 return;
316         }
317         payload = ast_multi_channel_blob_create(blob);
318         if (!payload) {
319                 return;
320         }
321
322         if (caller) {
323                 ast_channel_lock(caller);
324                 if (ast_strlen_zero(dialstatus)) {
325                         caller_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(caller));
326                 } else {
327                         caller_snapshot = ast_channel_snapshot_create(caller);
328                 }
329                 ast_channel_unlock(caller);
330                 if (!caller_snapshot) {
331                         return;
332                 }
333                 ast_multi_channel_blob_add_channel(payload, "caller", caller_snapshot);
334         }
335
336         ast_channel_lock(peer);
337         if (ast_strlen_zero(dialstatus)) {
338                 peer_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(peer));
339         } else {
340                 peer_snapshot = ast_channel_snapshot_create(peer);
341         }
342         ast_channel_unlock(peer);
343         if (!peer_snapshot) {
344                 return;
345         }
346         ast_multi_channel_blob_add_channel(payload, "peer", peer_snapshot);
347
348         if (forwarded) {
349                 ast_channel_lock(forwarded);
350                 forwarded_snapshot = ast_channel_snapshot_create(forwarded);
351                 ast_channel_unlock(forwarded);
352                 if (!forwarded_snapshot) {
353                         return;
354                 }
355                 ast_multi_channel_blob_add_channel(payload, "forwarded", forwarded_snapshot);
356         }
357
358         msg = stasis_message_create(ast_channel_dial_type(), payload);
359         if (!msg) {
360                 return;
361         }
362
363         publish_message_for_channel_topics(msg, caller);
364 }
365
/* Forward declarations: both helpers are called by ast_channel_publish_dial_forward() before their definitions appear. */
static void remove_dial_masquerade(struct ast_channel *peer);
static int set_dial_masquerade(struct ast_channel *caller,
        struct ast_channel *peer, const char *dialstring);
369
/*!
 * \brief Publish a dial event, optionally involving a forwarded channel.
 *
 * When \a caller is given, caller, peer and (if present) forwarded are all
 * locked for the duration of the publish, and the dial masquerade helper
 * is set (empty \a dialstatus) or removed (non-empty \a dialstatus) on the
 * peer. When \a caller is NULL no locks are taken here and the masquerade
 * helpers are not touched.
 *
 * \param caller     Calling channel; may be NULL.
 * \param peer       Dialed channel; must not be NULL.
 * \param forwarded  Channel the call was forwarded to; may be NULL.
 * \param dialstring Dial string.
 * \param dialstatus Dial status; empty/NULL marks the start of a dial.
 * \param forward    Forward target.
 */
void ast_channel_publish_dial_forward(struct ast_channel *caller, struct ast_channel *peer,
        struct ast_channel *forwarded, const char *dialstring, const char *dialstatus,
        const char *forward)
{
        ast_assert(peer != NULL);

        if (caller) {
                /*
                 * Lock two or three channels.
                 *
                 * We need to hold the locks to hold off a potential masquerade
                 * messing up the stasis dial event ordering.
                 */
                /*
                 * Back-off loop: caller is locked unconditionally, the others
                 * are only try-locked. On any failure, "continue" runs the
                 * loop's increment expression, which releases caller and
                 * yields before the next attempt.
                 */
                for (;; ast_channel_unlock(caller), sched_yield()) {
                        ast_channel_lock(caller);
                        if (ast_channel_trylock(peer)) {
                                continue;
                        }
                        if (forwarded && ast_channel_trylock(forwarded)) {
                                /* Release peer too so every retry starts with nothing held */
                                ast_channel_unlock(peer);
                                continue;
                        }
                        break;
                }

                /* The return value of set_dial_masquerade() is ignored here */
                if (ast_strlen_zero(dialstatus)) {
                        set_dial_masquerade(caller, peer, dialstring);
                } else {
                        remove_dial_masquerade(peer);
                }
        }

        ast_channel_publish_dial_internal(caller, peer, forwarded, dialstring, dialstatus,
                forward);

        /* Release in the reverse order of acquisition */
        if (caller) {
                if (forwarded) {
                        ast_channel_unlock(forwarded);
                }
                ast_channel_unlock(peer);
                ast_channel_unlock(caller);
        }
}
413
414 void ast_channel_publish_dial(struct ast_channel *caller, struct ast_channel *peer,
415         const char *dialstring, const char *dialstatus)
416 {
417         ast_channel_publish_dial_forward(caller, peer, NULL, dialstring, dialstatus, NULL);
418 }
419
420 static struct stasis_message *create_channel_blob_message(struct ast_channel_snapshot *snapshot,
421                 struct stasis_message_type *type,
422                 struct ast_json *blob)
423 {
424         struct stasis_message *msg;
425         struct ast_channel_blob *obj;
426
427         obj = ao2_alloc(sizeof(*obj), channel_blob_dtor);
428         if (!obj) {
429                 return NULL;
430         }
431
432         if (snapshot) {
433                 obj->snapshot = snapshot;
434                 ao2_ref(obj->snapshot, +1);
435         }
436         if (!blob) {
437                 blob = ast_json_null();
438         }
439         obj->blob = ast_json_ref(blob);
440
441         msg = stasis_message_create(type, obj);
442         ao2_cleanup(obj);
443         return msg;
444 }
445
446 struct stasis_message *ast_channel_blob_create_from_cache(const char *channel_id,
447                                                struct stasis_message_type *type,
448                                                struct ast_json *blob)
449 {
450         RAII_VAR(struct ast_channel_snapshot *, snapshot,
451                         NULL,
452                         ao2_cleanup);
453
454         if (!type) {
455                 return NULL;
456         }
457
458         snapshot = ast_channel_snapshot_get_latest(channel_id);
459
460         return create_channel_blob_message(snapshot, type, blob);
461 }
462
463 struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
464         struct stasis_message_type *type, struct ast_json *blob)
465 {
466         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
467
468         if (!type) {
469                 return NULL;
470         }
471
472         if (chan) {
473                 snapshot = ast_channel_snapshot_create(chan);
474         }
475
476         return create_channel_blob_message(snapshot, type, blob);
477 }
478
/*!
 * \brief A channel snapshot wrapper object used in \ref ast_multi_channel_blob objects
 *
 * The role string is stored inline after the structure, so objects must be
 * allocated with room for the role plus its terminating NUL.
 */
struct channel_role_snapshot {
        struct ast_channel_snapshot *snapshot;  /*!< A channel snapshot */
        char role[];                            /*!< The role assigned to the channel (flexible array member) */
};
484
/*! \brief A multi channel blob data structure for multi_channel_blob stasis messages */
struct ast_multi_channel_blob {
        struct ao2_container *channel_snapshots;        /*!< A container holding the snapshots */
        struct ast_json *blob;                                          /*!< A blob of JSON data */
};
490
491 /*!
492  * \internal
493  * \brief Standard comparison function for \ref channel_role_snapshot objects
494  */
495 static int channel_role_single_cmp_cb(void *obj, void *arg, int flags)
496 {
497         struct channel_role_snapshot *left = obj;
498         struct channel_role_snapshot *right = arg;
499         const char *match = (flags & OBJ_KEY) ? arg : right->role;
500         return strcasecmp(left->role, match) ? 0 : (CMP_MATCH | CMP_STOP);
501 }
502
503 /*!
504  * \internal
505  * \brief Multi comparison function for \ref channel_role_snapshot objects
506  */
507 static int channel_role_multi_cmp_cb(void *obj, void *arg, int flags)
508 {
509         struct channel_role_snapshot *left = obj;
510         struct channel_role_snapshot *right = arg;
511         const char *match = (flags & OBJ_KEY) ? arg : right->role;
512         return strcasecmp(left->role, match) ? 0 : (CMP_MATCH);
513 }
514
515 /*!
516  * \internal
517  * \brief Hash function for \ref channel_role_snapshot objects
518  */
519 static int channel_role_hash_cb(const void *obj, const int flags)
520 {
521         const struct channel_role_snapshot *snapshot = obj;
522         const char *name = (flags & OBJ_KEY) ? obj : snapshot->role;
523         return ast_str_case_hash(name);
524 }
525
526 /*!
527  * \internal
528  * \brief Destructor for \ref ast_multi_channel_blob objects
529  */
530 static void multi_channel_blob_dtor(void *obj)
531 {
532         struct ast_multi_channel_blob *multi_blob = obj;
533
534         ao2_cleanup(multi_blob->channel_snapshots);
535         ast_json_unref(multi_blob->blob);
536 }
537
538 struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *blob)
539 {
540         RAII_VAR(struct ast_multi_channel_blob *, obj,
541                         ao2_alloc(sizeof(*obj), multi_channel_blob_dtor),
542                         ao2_cleanup);
543
544         ast_assert(blob != NULL);
545
546         if (!obj) {
547                 return NULL;
548         }
549
550         obj->channel_snapshots = ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS,
551                         channel_role_hash_cb, channel_role_single_cmp_cb);
552         if (!obj->channel_snapshots) {
553                 return NULL;
554         }
555
556         obj->blob = ast_json_ref(blob);
557
558         ao2_ref(obj, +1);
559         return obj;
560 }
561
562 struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid)
563 {
564         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
565         struct ast_channel_snapshot *snapshot;
566
567         ast_assert(!ast_strlen_zero(uniqueid));
568
569         message = stasis_cache_get(ast_channel_cache(),
570                         ast_channel_snapshot_type(),
571                         uniqueid);
572         if (!message) {
573                 return NULL;
574         }
575
576         snapshot = stasis_message_data(message);
577         if (!snapshot) {
578                 return NULL;
579         }
580         ao2_ref(snapshot, +1);
581         return snapshot;
582 }
583
584 struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name)
585 {
586         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
587         struct ast_channel_snapshot *snapshot;
588
589         ast_assert(!ast_strlen_zero(name));
590
591         message = stasis_cache_get(ast_channel_cache_by_name(),
592                         ast_channel_snapshot_type(),
593                         name);
594         if (!message) {
595                 return NULL;
596         }
597
598         snapshot = stasis_message_data(message);
599         if (!snapshot) {
600                 return NULL;
601         }
602         ao2_ref(snapshot, +1);
603         return snapshot;
604 }
605
606 static void channel_role_snapshot_dtor(void *obj)
607 {
608         struct channel_role_snapshot *role_snapshot = obj;
609         ao2_cleanup(role_snapshot->snapshot);
610 }
611
612 void ast_multi_channel_blob_add_channel(struct ast_multi_channel_blob *obj, const char *role, struct ast_channel_snapshot *snapshot)
613 {
614         RAII_VAR(struct channel_role_snapshot *, role_snapshot, NULL, ao2_cleanup);
615         int role_len = strlen(role) + 1;
616
617         if (!obj || ast_strlen_zero(role) || !snapshot) {
618                 return;
619         }
620
621         role_snapshot = ao2_alloc_options(sizeof(*role_snapshot) + role_len, channel_role_snapshot_dtor,
622                 AO2_ALLOC_OPT_LOCK_NOLOCK);
623         if (!role_snapshot) {
624                 return;
625         }
626         ast_copy_string(role_snapshot->role, role, role_len);
627         role_snapshot->snapshot = snapshot;
628         ao2_ref(role_snapshot->snapshot, +1);
629         ao2_link(obj->channel_snapshots, role_snapshot);
630 }
631
632 struct ast_channel_snapshot *ast_multi_channel_blob_get_channel(struct ast_multi_channel_blob *obj, const char *role)
633 {
634         struct channel_role_snapshot *role_snapshot;
635
636         if (!obj || ast_strlen_zero(role)) {
637                 return NULL;
638         }
639         role_snapshot = ao2_find(obj->channel_snapshots, role, OBJ_KEY);
640         /* Note that this function does not increase the ref count on snapshot */
641         if (!role_snapshot) {
642                 return NULL;
643         }
644         ao2_ref(role_snapshot, -1);
645         return role_snapshot->snapshot;
646 }
647
648 struct ao2_container *ast_multi_channel_blob_get_channels(struct ast_multi_channel_blob *obj, const char *role)
649 {
650         RAII_VAR(struct ao2_container *, ret_container,
651                 ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS, channel_snapshot_hash_cb, channel_snapshot_cmp_cb),
652                 ao2_cleanup);
653         struct ao2_iterator *it_role_snapshots;
654         struct channel_role_snapshot *role_snapshot;
655         char *arg;
656
657         if (!obj || ast_strlen_zero(role) || !ret_container) {
658                 return NULL;
659         }
660         arg = ast_strdupa(role);
661
662         it_role_snapshots = ao2_callback(obj->channel_snapshots, OBJ_MULTIPLE | OBJ_KEY, channel_role_multi_cmp_cb, arg);
663         if (!it_role_snapshots) {
664                 return NULL;
665         }
666
667         while ((role_snapshot = ao2_iterator_next(it_role_snapshots))) {
668                 ao2_link(ret_container, role_snapshot->snapshot);
669                 ao2_ref(role_snapshot, -1);
670         }
671         ao2_iterator_destroy(it_role_snapshots);
672
673         ao2_ref(ret_container, +1);
674         return ret_container;
675 }
676
677 struct ast_json *ast_multi_channel_blob_get_json(struct ast_multi_channel_blob *obj)
678 {
679         if (!obj) {
680                 return NULL;
681         }
682         return obj->blob;
683 }
684
685 void ast_channel_stage_snapshot(struct ast_channel *chan)
686 {
687         ast_set_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE);
688 }
689
690 void ast_channel_stage_snapshot_done(struct ast_channel *chan)
691 {
692         ast_clear_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE);
693         ast_channel_publish_snapshot(chan);
694 }
695
696 void ast_channel_publish_snapshot(struct ast_channel *chan)
697 {
698         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
699         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
700
701         if (!ast_channel_snapshot_type()) {
702                 return;
703         }
704
705         if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE)) {
706                 return;
707         }
708
709         snapshot = ast_channel_snapshot_create(chan);
710         if (!snapshot) {
711                 return;
712         }
713
714         message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
715         if (!message) {
716                 return;
717         }
718
719         ast_assert(ast_channel_topic(chan) != NULL);
720         stasis_publish(ast_channel_topic(chan), message);
721 }
722
723 void ast_channel_publish_cached_blob(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
724 {
725         struct stasis_message *message;
726
727         if (!blob) {
728                 blob = ast_json_null();
729         }
730
731         message = ast_channel_blob_create_from_cache(ast_channel_uniqueid(chan), type, blob);
732         if (message) {
733                 stasis_publish(ast_channel_topic(chan), message);
734         }
735         ao2_cleanup(message);
736 }
737
738 void ast_channel_publish_blob(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
739 {
740         struct stasis_message *message;
741
742         if (!blob) {
743                 blob = ast_json_null();
744         }
745
746         message = ast_channel_blob_create(chan, type, blob);
747         if (message) {
748                 stasis_publish(ast_channel_topic(chan), message);
749         }
750         ao2_cleanup(message);
751 }
752
753 void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
754 {
755         struct ast_json *blob;
756
757         ast_assert(name != NULL);
758         ast_assert(value != NULL);
759
760         blob = ast_json_pack("{s: s, s: s}",
761                              "variable", name,
762                              "value", value);
763         if (!blob) {
764                 ast_log(LOG_ERROR, "Error creating message\n");
765                 return;
766         }
767
768         /*! If there are manager variables, force a cache update */
769         if (chan && ast_channel_has_manager_vars()) {
770                 ast_channel_publish_snapshot(chan);
771         }
772
773         if (chan) {
774                 ast_channel_publish_cached_blob(chan, ast_channel_varset_type(), blob);
775         } else {
776                 /* This function is NULL safe for global variables */
777                 ast_channel_publish_blob(NULL, ast_channel_varset_type(), blob);
778         }
779
780         ast_json_unref(blob);
781 }
782
783 static struct ast_manager_event_blob *varset_to_ami(struct stasis_message *msg)
784 {
785         RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
786         struct ast_channel_blob *obj = stasis_message_data(msg);
787         const char *variable =
788                 ast_json_string_get(ast_json_object_get(obj->blob, "variable"));
789         const char *value =
790                 ast_json_string_get(ast_json_object_get(obj->blob, "value"));
791
792         if (obj->snapshot) {
793                 channel_event_string =
794                         ast_manager_build_channel_state_string(obj->snapshot);
795         } else {
796                 channel_event_string = ast_str_create(35);
797                 ast_str_set(&channel_event_string, 0,
798                             "Channel: none\r\n"
799                             "Uniqueid: none\r\n");
800         }
801
802         if (!channel_event_string) {
803                 return NULL;
804         }
805
806         return ast_manager_event_blob_create(EVENT_FLAG_DIALPLAN, "VarSet",
807                 "%s"
808                 "Variable: %s\r\n"
809                 "Value: %s\r\n",
810                 ast_str_buffer(channel_event_string), variable, value);
811 }
812
813 static struct ast_manager_event_blob *agent_login_to_ami(struct stasis_message *msg)
814 {
815         RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
816         struct ast_channel_blob *obj = stasis_message_data(msg);
817         const char *agent = ast_json_string_get(ast_json_object_get(obj->blob, "agent"));
818
819         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
820         if (!channel_string) {
821                 return NULL;
822         }
823
824         return ast_manager_event_blob_create(EVENT_FLAG_AGENT, "AgentLogin",
825                 "%s"
826                 "Agent: %s\r\n",
827                 ast_str_buffer(channel_string), agent);
828 }
829
830 static struct ast_manager_event_blob *agent_logoff_to_ami(struct stasis_message *msg)
831 {
832         RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
833         struct ast_channel_blob *obj = stasis_message_data(msg);
834         const char *agent = ast_json_string_get(ast_json_object_get(obj->blob, "agent"));
835         long logintime = ast_json_integer_get(ast_json_object_get(obj->blob, "logintime"));
836
837         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
838         if (!channel_string) {
839                 return NULL;
840         }
841
842         return ast_manager_event_blob_create(EVENT_FLAG_AGENT, "AgentLogoff",
843                 "%s"
844                 "Agent: %s\r\n"
845                 "Logintime: %ld\r\n",
846                 ast_str_buffer(channel_string), agent, logintime);
847 }
848
849 void ast_publish_channel_state(struct ast_channel *chan)
850 {
851         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
852         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
853
854         if (!ast_channel_snapshot_type()) {
855                 return;
856         }
857
858         ast_assert(chan != NULL);
859         if (!chan) {
860                 return;
861         }
862
863         snapshot = ast_channel_snapshot_create(chan);
864         if (!snapshot) {
865                 return;
866         }
867
868         message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
869         if (!message) {
870                 return;
871         }
872
873         ast_assert(ast_channel_topic(chan) != NULL);
874         stasis_publish(ast_channel_topic(chan), message);
875 }
876
877 struct ast_json *ast_channel_snapshot_to_json(
878         const struct ast_channel_snapshot *snapshot,
879         const struct stasis_message_sanitizer *sanitize)
880 {
881         RAII_VAR(struct ast_json *, json_chan, NULL, ast_json_unref);
882
883         if (snapshot == NULL
884                 || (sanitize && sanitize->channel_snapshot
885                 && sanitize->channel_snapshot(snapshot))) {
886                 return NULL;
887         }
888
889         json_chan = ast_json_pack(
890                 /* Broken up into groups of three for readability */
891                 "{ s: s, s: s, s: s,"
892                 "  s: o, s: o, s: s,"
893                 "  s: o, s: o, s: s }",
894                 /* First line */
895                 "id", snapshot->uniqueid,
896                 "name", snapshot->name,
897                 "state", ast_state2str(snapshot->state),
898                 /* Second line */
899                 "caller", ast_json_name_number(
900                         snapshot->caller_name, snapshot->caller_number),
901                 "connected", ast_json_name_number(
902                         snapshot->connected_name, snapshot->connected_number),
903                 "accountcode", snapshot->accountcode,
904                 /* Third line */
905                 "dialplan", ast_json_dialplan_cep(
906                         snapshot->context, snapshot->exten, snapshot->priority),
907                 "creationtime", ast_json_timeval(snapshot->creationtime, NULL),
908                 "language", snapshot->language);
909
910         return ast_json_ref(json_chan);
911 }
912
913 int ast_channel_snapshot_cep_equal(
914         const struct ast_channel_snapshot *old_snapshot,
915         const struct ast_channel_snapshot *new_snapshot)
916 {
917         ast_assert(old_snapshot != NULL);
918         ast_assert(new_snapshot != NULL);
919
920         /* We actually get some snapshots with CEP set, but before the
921          * application is set. Since empty application is invalid, we treat
922          * setting the application from nothing as a CEP change.
923          */
924         if (ast_strlen_zero(old_snapshot->appl) &&
925             !ast_strlen_zero(new_snapshot->appl)) {
926                 return 0;
927         }
928
929         return old_snapshot->priority == new_snapshot->priority &&
930                 strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
931                 strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
932 }
933
934 int ast_channel_snapshot_caller_id_equal(
935         const struct ast_channel_snapshot *old_snapshot,
936         const struct ast_channel_snapshot *new_snapshot)
937 {
938         ast_assert(old_snapshot != NULL);
939         ast_assert(new_snapshot != NULL);
940         return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
941                 strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
942 }
943
944 int ast_channel_snapshot_connected_line_equal(
945         const struct ast_channel_snapshot *old_snapshot,
946         const struct ast_channel_snapshot *new_snapshot)
947 {
948         ast_assert(old_snapshot != NULL);
949         ast_assert(new_snapshot != NULL);
950         return strcmp(old_snapshot->connected_number, new_snapshot->connected_number) == 0 &&
951                 strcmp(old_snapshot->connected_name, new_snapshot->connected_name) == 0;
952 }
953
954 static struct ast_json *channel_blob_to_json(
955         struct stasis_message *message,
956         const char *type,
957         const struct stasis_message_sanitizer *sanitize)
958 {
959         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
960         struct ast_channel_blob *channel_blob = stasis_message_data(message);
961         struct ast_json *blob = channel_blob->blob;
962         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
963         const struct timeval *tv = stasis_message_timestamp(message);
964         int res = 0;
965
966         if (blob == NULL || ast_json_is_null(blob)) {
967                 out = ast_json_object_create();
968         } else {
969                 /* blobs are immutable, so shallow copies are fine */
970                 out = ast_json_copy(blob);
971         }
972
973         if (!out) {
974                 return NULL;
975         }
976
977         res |= ast_json_object_set(out, "type", ast_json_string_create(type));
978         res |= ast_json_object_set(out, "timestamp",
979                 ast_json_timeval(*tv, NULL));
980
981         /* For global channel messages, the snapshot is optional */
982         if (snapshot) {
983                 struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
984
985                 if (!json_channel) {
986                         return NULL;
987                 }
988
989                 res |= ast_json_object_set(out, "channel", json_channel);
990         }
991
992         if (res != 0) {
993                 return NULL;
994         }
995
996         return ast_json_ref(out);
997 }
998
999 static struct ast_json *dtmf_end_to_json(
1000         struct stasis_message *message,
1001         const struct stasis_message_sanitizer *sanitize)
1002 {
1003         struct ast_channel_blob *channel_blob = stasis_message_data(message);
1004         struct ast_json *blob = channel_blob->blob;
1005         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
1006         const char *direction =
1007                 ast_json_string_get(ast_json_object_get(blob, "direction"));
1008         const struct timeval *tv = stasis_message_timestamp(message);
1009         struct ast_json *json_channel;
1010
1011         /* Only present received DTMF end events as JSON */
1012         if (strcasecmp("Received", direction) != 0) {
1013                 return NULL;
1014         }
1015
1016         json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
1017         if (!json_channel) {
1018                 return NULL;
1019         }
1020
1021         return ast_json_pack("{s: s, s: o, s: O, s: O, s: o}",
1022                 "type", "ChannelDtmfReceived",
1023                 "timestamp", ast_json_timeval(*tv, NULL),
1024                 "digit", ast_json_object_get(blob, "digit"),
1025                 "duration_ms", ast_json_object_get(blob, "duration_ms"),
1026                 "channel", json_channel);
1027 }
1028
/*! \internal \brief Convert a variable-set message to a "ChannelVarset" JSON event. */
static struct ast_json *varset_to_json(
        struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
{
        struct ast_json *event;

        event = channel_blob_to_json(message, "ChannelVarset", sanitize);

        return event;
}
1035
/*! \internal \brief Convert a hangup request message to a "ChannelHangupRequest" JSON event. */
static struct ast_json *hangup_request_to_json(
        struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
{
        struct ast_json *event;

        event = channel_blob_to_json(message, "ChannelHangupRequest", sanitize);

        return event;
}
1042
1043 static struct ast_json *dial_to_json(
1044         struct stasis_message *message,
1045         const struct stasis_message_sanitizer *sanitize)
1046 {
1047         struct ast_multi_channel_blob *payload = stasis_message_data(message);
1048         struct ast_json *blob = ast_multi_channel_blob_get_json(payload);
1049         struct ast_json *caller_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "caller"), sanitize);
1050         struct ast_json *peer_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "peer"), sanitize);
1051         struct ast_json *forwarded_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "forwarded"), sanitize);
1052         struct ast_json *json;
1053         const struct timeval *tv = stasis_message_timestamp(message);
1054         int res = 0;
1055
1056         json = ast_json_pack("{s: s, s: o, s: O, s: O, s: O}",
1057                 "type", "Dial",
1058                 "timestamp", ast_json_timeval(*tv, NULL),
1059                 "dialstatus", ast_json_object_get(blob, "dialstatus"),
1060                 "forward", ast_json_object_get(blob, "forward"),
1061                 "dialstring", ast_json_object_get(blob, "dialstring"));
1062         if (!json) {
1063                 ast_json_unref(caller_json);
1064                 ast_json_unref(peer_json);
1065                 ast_json_unref(forwarded_json);
1066                 return NULL;
1067         }
1068
1069         if (caller_json) {
1070                 res |= ast_json_object_set(json, "caller", caller_json);
1071         }
1072         if (peer_json) {
1073                 res |= ast_json_object_set(json, "peer", peer_json);
1074         }
1075         if (forwarded_json) {
1076                 res |= ast_json_object_set(json, "forwarded", forwarded_json);
1077         }
1078
1079         if (res) {
1080                 ast_json_unref(json);
1081                 return NULL;
1082         }
1083
1084         return json;
1085 }
1086
1087 static struct ast_manager_event_blob *talking_start_to_ami(struct stasis_message *msg)
1088 {
1089         struct ast_str *channel_string;
1090         struct ast_channel_blob *obj = stasis_message_data(msg);
1091         struct ast_manager_event_blob *blob;
1092
1093         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
1094         if (!channel_string) {
1095                 return NULL;
1096         }
1097
1098         blob = ast_manager_event_blob_create(EVENT_FLAG_CALL, "ChannelTalkingStart",
1099                                              "%s", ast_str_buffer(channel_string));
1100         ast_free(channel_string);
1101
1102         return blob;
1103 }
1104
/*! \internal \brief Convert a talking start message to a "ChannelTalkingStarted" JSON event. */
static struct ast_json *talking_start_to_json(struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
{
        struct ast_json *event;

        event = channel_blob_to_json(message, "ChannelTalkingStarted", sanitize);

        return event;
}
1110
1111 static struct ast_manager_event_blob *talking_stop_to_ami(struct stasis_message *msg)
1112 {
1113         struct ast_str *channel_string;
1114         struct ast_channel_blob *obj = stasis_message_data(msg);
1115         int duration = ast_json_integer_get(ast_json_object_get(obj->blob, "duration"));
1116         struct ast_manager_event_blob *blob;
1117
1118         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
1119         if (!channel_string) {
1120                 return NULL;
1121         }
1122
1123         blob = ast_manager_event_blob_create(EVENT_FLAG_CALL, "ChannelTalkingStop",
1124                                              "%s"
1125                                              "Duration: %d\r\n",
1126                                              ast_str_buffer(channel_string),
1127                                              duration);
1128         ast_free(channel_string);
1129
1130         return blob;
1131 }
1132
/*! \internal \brief Convert a talking stop message to a "ChannelTalkingFinished" JSON event. */
static struct ast_json *talking_stop_to_json(struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
{
        struct ast_json *event;

        event = channel_blob_to_json(message, "ChannelTalkingFinished", sanitize);

        return event;
}
1138
1139 static struct ast_json *hold_to_json(struct stasis_message *message,
1140         const struct stasis_message_sanitizer *sanitize)
1141 {
1142         struct ast_channel_blob *channel_blob = stasis_message_data(message);
1143         struct ast_json *blob = channel_blob->blob;
1144         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
1145         const char *musicclass = ast_json_string_get(ast_json_object_get(blob, "musicclass"));
1146         const struct timeval *tv = stasis_message_timestamp(message);
1147         struct ast_json *json_channel;
1148
1149         json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
1150         if (!json_channel) {
1151                 return NULL;
1152         }
1153
1154         return ast_json_pack("{s: s, s: o, s: s, s: o}",
1155                 "type", "ChannelHold",
1156                 "timestamp", ast_json_timeval(*tv, NULL),
1157                 "musicclass", S_OR(musicclass, "N/A"),
1158                 "channel", json_channel);
1159 }
1160
1161 static struct ast_json *unhold_to_json(struct stasis_message *message,
1162         const struct stasis_message_sanitizer *sanitize)
1163 {
1164         struct ast_channel_blob *channel_blob = stasis_message_data(message);
1165         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
1166         const struct timeval *tv = stasis_message_timestamp(message);
1167         struct ast_json *json_channel;
1168
1169         json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
1170         if (!json_channel) {
1171                 return NULL;
1172         }
1173
1174         return ast_json_pack("{s: s, s: o, s: o}",
1175                 "type", "ChannelUnhold",
1176                 "timestamp", ast_json_timeval(*tv, NULL),
1177                 "channel", json_channel);
1178 }
1179
/*!
 * @{ \brief Define channel message types.
 *
 * Each definition names a message type and, where given, the .to_ami
 * and/or .to_json converter attached to it. Types listed with no
 * initializers have no converter attached here.
 *
 * Every type defined here is initialized in ast_stasis_channels_init()
 * and released in stasis_channels_cleanup().
 */
STASIS_MESSAGE_TYPE_DEFN(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dial_type,
        .to_json = dial_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_varset_type,
        .to_ami = varset_to_ami,
        .to_json = varset_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_request_type,
        .to_json = hangup_request_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_begin_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_end_type,
        .to_json = dtmf_end_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hold_type,
        .to_json = hold_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_unhold_type,
        .to_json = unhold_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_fax_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_handler_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_login_type,
        .to_ami = agent_login_to_ami,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_logoff_type,
        .to_ami = agent_logoff_to_ami,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_talking_start,
        .to_ami = talking_start_to_ami,
        .to_json = talking_start_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_talking_stop,
        .to_ami = talking_stop_to_ami,
        .to_json = talking_stop_to_json,
        );

/*! @} */
1228
/*!
 * \internal
 * \brief Tear down everything set up by ast_stasis_channels_init().
 *
 * Registered with ast_register_cleanup() by ast_stasis_channels_init().
 * Unsubscribes the by-name caching topic, releases both channel caches,
 * then cleans up every channel message type.
 */
static void stasis_channels_cleanup(void)
{
        /* Each global is reset to NULL once it has been released */
        stasis_caching_unsubscribe_and_join(channel_by_name_topic);
        channel_by_name_topic = NULL;
        ao2_cleanup(channel_cache_by_name);
        channel_cache_by_name = NULL;
        ao2_cleanup(channel_cache_all);
        channel_cache_all = NULL;

        /* One cleanup per message type defined in this file */
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hold_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_unhold_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_start_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_stop_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_fax_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_handler_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_start_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_stop_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_start_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_login_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_logoff_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_talking_start);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_talking_stop);
}
1259
/*!
 * \brief Initialize the channel topics, caches and message types.
 *
 * \retval -1 if the "all channels" cache pattern, the by-name cache or the
 *            by-name caching topic could not be created.
 * \return Otherwise the bitwise OR of every STASIS_MESSAGE_TYPE_INIT()
 *         result (zero when all of them returned zero).
 */
int ast_stasis_channels_init(void)
{
        int res = 0;

        /*
         * Registered before anything is created, so the early "return -1"
         * paths below do not release what was already set up themselves.
         * NOTE(review): presumably stasis_channels_cleanup() releases it at
         * shutdown -- confirm against ast_register_cleanup().
         */
        ast_register_cleanup(stasis_channels_cleanup);

        channel_cache_all = stasis_cp_all_create("ast_channel_topic_all",
                channel_snapshot_get_id);
        if (!channel_cache_all) {
                return -1;
        }
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);

        channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name);
        if (!channel_cache_by_name) {
                return -1;
        }

        /* This should be initialized before the caching topic */
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);

        /* The by-name caching topic is fed from the "all channels" topic */
        channel_by_name_topic = stasis_caching_topic_create(
                stasis_cp_all_topic(channel_cache_all),
                channel_cache_by_name);
        if (!channel_by_name_topic) {
                return -1;
        }

        /* Remaining message types; failures are accumulated in res */
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_talking_start);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_talking_stop);

        return res;
}
1309
/*!
 * \internal
 * \brief A list element for the dial_masquerade_datastore -- stores data about a dialed peer
 *
 * Entries are linked into dial_masquerade_datastore.dialed_peers and
 * released with dial_target_free().
 */
struct dial_target {
        /*! Called party channel. Not released by dial_target_free(). */
        struct ast_channel *peer;
        /*! Dialstring used to call the peer. Freed by dial_target_free(). */
        char *dialstring;
        /*! Next entry in the list. */
        AST_LIST_ENTRY(dial_target) list;
};
1322
1323 static void dial_target_free(struct dial_target *doomed)
1324 {
1325         if (!doomed) {
1326                 return;
1327         }
1328         ast_free(doomed->dialstring);
1329         ast_free(doomed);
1330 }
1331
/*!
 * \internal
 * \brief Datastore used for advancing dial state in the case of a masquerade
 *        against a channel in the process of dialing.
 *
 * Allocated as an ao2 object by dial_masquerade_datastore_alloc(); the
 * functions that read or modify it take the object's ao2 lock first.
 */
struct dial_masquerade_datastore {
        /*! Calling party channel. Set to NULL once the datastore has been cleaned up. */
        struct ast_channel *caller;
        /*! List of called peers. */
        AST_LIST_HEAD_NOLOCK(, dial_target) dialed_peers;
};
1343
1344 static void dial_masquerade_datastore_cleanup(struct dial_masquerade_datastore *masq_data)
1345 {
1346         struct dial_target *cur;
1347
1348         while ((cur = AST_LIST_REMOVE_HEAD(&masq_data->dialed_peers, list))) {
1349                 dial_target_free(cur);
1350         }
1351         masq_data->caller = NULL;
1352 }
1353
1354 static void dial_masquerade_datastore_remove_chan(struct dial_masquerade_datastore *masq_data, struct ast_channel *chan)
1355 {
1356         struct dial_target *cur;
1357
1358         ao2_lock(masq_data);
1359         if (masq_data->caller == chan) {
1360                 dial_masquerade_datastore_cleanup(masq_data);
1361         } else {
1362                 AST_LIST_TRAVERSE_SAFE_BEGIN(&masq_data->dialed_peers, cur, list) {
1363                         if (cur->peer == chan) {
1364                                 AST_LIST_REMOVE_CURRENT(list);
1365                                 dial_target_free(cur);
1366                                 break;
1367                         }
1368                 }
1369                 AST_LIST_TRAVERSE_SAFE_END;
1370         }
1371         ao2_unlock(masq_data);
1372 }
1373
/*!
 * \internal
 * \brief ao2 destructor for dial_masquerade_datastore objects.
 *
 * \param vdoomed The dial_masquerade_datastore being destroyed.
 */
static void dial_masquerade_datastore_dtor(void *vdoomed)
{
        struct dial_masquerade_datastore *doomed = vdoomed;

        dial_masquerade_datastore_cleanup(doomed);
}
1378
1379 static struct dial_masquerade_datastore *dial_masquerade_datastore_alloc(void)
1380 {
1381         struct dial_masquerade_datastore *masq_data;
1382
1383         masq_data = ao2_alloc(sizeof(struct dial_masquerade_datastore),
1384                 dial_masquerade_datastore_dtor);
1385         if (!masq_data) {
1386                 return NULL;
1387         }
1388         AST_LIST_HEAD_INIT_NOLOCK(&masq_data->dialed_peers);
1389         return masq_data;
1390 }
1391
/*!
 * \internal
 * \brief Datastore destroy callback for dial_masquerade_info
 *
 * \param data The datastore's data pointer; one reference to it is released.
 */
static void dial_masquerade_datastore_destroy(void *data)
{
        void *masq_obj = data;

        ao2_ref(masq_obj, -1);
}
1400
/*
 * Forward declaration: dial_masquerade_fixup() calls this, but the definition
 * needs dial_masquerade_info, which in turn refers to dial_masquerade_fixup().
 */
static struct ast_datastore *dial_masquerade_datastore_find(struct ast_channel *chan);
1402
/*!
 * \internal
 * \brief Datastore chan_fixup callback for dial_masquerade_info.
 *
 * Publishes dial events for the channel being masqueraded out, removes the
 * affected entries from the shared data, then removes and frees the
 * datastore found on old_chan.
 *
 * \param data pointer to the dial_masquerade_datastore
 * \param old_chan Channel the datastore is removed from
 * \param new_chan The other channel in the masquerade
 */
static void dial_masquerade_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
{
        struct dial_masquerade_datastore *masq_data = data;
        struct dial_target *cur;
        struct ast_datastore *datastore;

        ao2_lock(masq_data);
        if (!masq_data->caller) {
                /* Nothing to do but remove the datastore */
        } else if (masq_data->caller == old_chan) {
                /* The caller channel is being masqueraded out. */
                /*
                 * NOTE(review): the names are logged new_chan first, old_chan
                 * second -- presumably the channel names have already been
                 * exchanged when fixup runs; confirm.
                 */
                ast_debug(1, "Caller channel %s being masqueraded out to %s (is_empty:%d)\n",
                        ast_channel_name(new_chan), ast_channel_name(old_chan),
                        AST_LIST_EMPTY(&masq_data->dialed_peers));
                /*
                 * For every dialed peer: publish "NOANSWER" for new_chan, then a
                 * dial event with no status for old_chan.
                 */
                AST_LIST_TRAVERSE(&masq_data->dialed_peers, cur, list) {
                        ast_channel_publish_dial_internal(new_chan, cur->peer, NULL,
                                cur->dialstring, "NOANSWER", NULL);
                        ast_channel_publish_dial_internal(old_chan, cur->peer, NULL,
                                cur->dialstring, NULL, NULL);
                }
                /* Frees all peer entries and clears masq_data->caller */
                dial_masquerade_datastore_cleanup(masq_data);
        } else {
                /* One of the peer channels is being masqueraded out. */
                AST_LIST_TRAVERSE_SAFE_BEGIN(&masq_data->dialed_peers, cur, list) {
                        if (cur->peer == old_chan) {
                                ast_debug(1, "Peer channel %s being masqueraded out to %s\n",
                                        ast_channel_name(new_chan), ast_channel_name(old_chan));
                                /* "CANCEL" for new_chan, then a dial event with no status for old_chan */
                                ast_channel_publish_dial_internal(masq_data->caller, new_chan, NULL,
                                        cur->dialstring, "CANCEL", NULL);
                                ast_channel_publish_dial_internal(masq_data->caller, old_chan, NULL,
                                        cur->dialstring, NULL, NULL);

                                /* Only the first matching peer entry is dropped */
                                AST_LIST_REMOVE_CURRENT(list);
                                dial_target_free(cur);
                                break;
                        }
                }
                AST_LIST_TRAVERSE_SAFE_END;
        }
        ao2_unlock(masq_data);

        /* Remove the datastore from the channel. */
        datastore = dial_masquerade_datastore_find(old_chan);
        if (!datastore) {
                return;
        }
        ast_channel_datastore_remove(old_chan, datastore);
        ast_datastore_free(datastore);
}
1452
/*!
 * \internal
 * \brief Primary purpose for dial_masquerade_datastore, publishes
 *        the channel dial event needed to set the incoming channel into the
 *        dial state during a masquerade.
 *
 * Unlike dial_masquerade_fixup(), this callback only publishes events; it
 * does not modify the peer list or remove the datastore.
 *
 * \param data pointer to the dial_masquerade_datastore
 * \param old_chan Channel being replaced
 * \param new_chan Channel being pushed to dial mode
 */
static void dial_masquerade_breakdown(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
{
        struct dial_masquerade_datastore *masq_data = data;
        struct dial_target *cur;

        ao2_lock(masq_data);

        /* A NULL caller means the data was already cleaned up: nothing to publish */
        if (!masq_data->caller) {
                ao2_unlock(masq_data);
                return;
        }

        if (masq_data->caller == new_chan) {
                /*
                 * The caller channel is being masqueraded into.
                 * The masquerade is likely because of a blonde transfer.
                 */
                ast_debug(1, "Caller channel %s being masqueraded into by %s (is_empty:%d)\n",
                        ast_channel_name(old_chan), ast_channel_name(new_chan),
                        AST_LIST_EMPTY(&masq_data->dialed_peers));
                /*
                 * For every dialed peer: publish "NOANSWER" for old_chan, then a
                 * dial event with no status for new_chan.
                 */
                AST_LIST_TRAVERSE(&masq_data->dialed_peers, cur, list) {
                        ast_channel_publish_dial_internal(old_chan, cur->peer, NULL,
                                cur->dialstring, "NOANSWER", NULL);
                        ast_channel_publish_dial_internal(new_chan, cur->peer, NULL,
                                cur->dialstring, NULL, NULL);
                }

                ao2_unlock(masq_data);
                return;
        }

        /*
         * One of the peer channels is being masqueraded into.
         * The masquerade is likely because of a call pickup.
         */
        AST_LIST_TRAVERSE(&masq_data->dialed_peers, cur, list) {
                if (cur->peer == new_chan) {
                        ast_debug(1, "Peer channel %s being masqueraded into by %s\n",
                                ast_channel_name(old_chan), ast_channel_name(new_chan));
                        /* "CANCEL" for old_chan, then a dial event with no status for new_chan */
                        ast_channel_publish_dial_internal(masq_data->caller, old_chan, NULL,
                                cur->dialstring, "CANCEL", NULL);
                        ast_channel_publish_dial_internal(masq_data->caller, new_chan, NULL,
                                cur->dialstring, NULL, NULL);
                        /* Only the first matching peer is handled */
                        break;
                }
        }

        ao2_unlock(masq_data);
}
1511
/*!
 * \internal
 * \brief Datastore type descriptor for the dial masquerade datastore.
 *
 * Ties together the destroy, fixup and breakdown callbacks defined above;
 * also used as the lookup key by dial_masquerade_datastore_find().
 */
static const struct ast_datastore_info dial_masquerade_info = {
        .type = "stasis-chan-dial-masq",
        .destroy = dial_masquerade_datastore_destroy,
        .chan_fixup = dial_masquerade_fixup,
        .chan_breakdown = dial_masquerade_breakdown,
};
1518
1519 /*!
1520  * \internal
1521  * \brief Find the dial masquerade datastore on the given channel.
1522  *
1523  * \param chan Channel a datastore data is wanted from
1524  *
1525  * \return A pointer to the datastore if it exists.
1526  */
1527 static struct ast_datastore *dial_masquerade_datastore_find(struct ast_channel *chan)
1528 {
1529         return ast_channel_datastore_find(chan, &dial_masquerade_info, NULL);
1530 }
1531
1532 /*!
1533  * \internal
1534  * \brief Add the dial masquerade datastore to a channel.
1535  *
1536  * \param chan Channel to setup dial masquerade datastore on.
1537  * \param masq_data NULL to setup caller datastore otherwise steals the ref on success.
1538  *
1539  * \retval masq_data given or created on success.
1540  *         (A ref is not returned but can be obtained before chan is unlocked.)
1541  * \retval NULL on error.  masq_data ref is not stolen.
1542  */
1543 static struct dial_masquerade_datastore *dial_masquerade_datastore_add(
1544         struct ast_channel *chan, struct dial_masquerade_datastore *masq_data)
1545 {
1546         struct ast_datastore *datastore;
1547
1548         datastore = ast_datastore_alloc(&dial_masquerade_info, NULL);
1549         if (!datastore) {
1550                 return NULL;
1551         }
1552
1553         if (!masq_data) {
1554                 masq_data = dial_masquerade_datastore_alloc();
1555                 if (!masq_data) {
1556                         ast_datastore_free(datastore);
1557                         return NULL;
1558                 }
1559                 masq_data->caller = chan;
1560         }
1561
1562         datastore->data = masq_data;
1563         ast_channel_datastore_add(chan, datastore);
1564
1565         return masq_data;
1566 }
1567
1568 static int set_dial_masquerade(struct ast_channel *caller, struct ast_channel *peer, const char *dialstring)
1569 {
1570         struct ast_datastore *datastore;
1571         struct dial_masquerade_datastore *masq_data;
1572         struct dial_target *target;
1573
1574         /* Find or create caller datastore */
1575         datastore = dial_masquerade_datastore_find(caller);
1576         if (!datastore) {
1577                 masq_data = dial_masquerade_datastore_add(caller, NULL);
1578         } else {
1579                 masq_data = datastore->data;
1580         }
1581         if (!masq_data) {
1582                 return -1;
1583         }
1584         ao2_ref(masq_data, +1);
1585
1586         /*
1587          * Someone likely forgot to do an ast_channel_publish_dial()
1588          * or ast_channel_publish_dial_forward() with a final dial
1589          * status on the channel.
1590          */
1591         ast_assert(masq_data->caller == caller);
1592
1593         /* Create peer target to put into datastore */
1594         target = ast_calloc(1, sizeof(*target));
1595         if (!target) {
1596                 ao2_ref(masq_data, -1);
1597                 return -1;
1598         }
1599         if (dialstring) {
1600                 target->dialstring = ast_strdup(dialstring);
1601                 if (!target->dialstring) {
1602                         ast_free(target);
1603                         ao2_ref(masq_data, -1);
1604                         return -1;
1605                 }
1606         }
1607         target->peer = peer;
1608
1609         /* Put peer target into datastore */
1610         ao2_lock(masq_data);
1611         dial_masquerade_datastore_remove_chan(masq_data, peer);
1612         AST_LIST_INSERT_HEAD(&masq_data->dialed_peers, target, list);
1613         ao2_unlock(masq_data);
1614
1615         datastore = dial_masquerade_datastore_find(peer);
1616         if (datastore) {
1617                 if (datastore->data == masq_data) {
1618                         /*
1619                          * Peer already had the datastore for this dial masquerade.
1620                          * This was a redundant peer dial masquerade setup.
1621                          */
1622                         ao2_ref(masq_data, -1);
1623                         return 0;
1624                 }
1625
1626                 /* Something is wrong.  Try to fix if the assert doesn't abort. */
1627                 ast_assert(0);
1628
1629                 /* Remove the stale dial masquerade datastore */
1630                 dial_masquerade_datastore_remove_chan(datastore->data, peer);
1631                 ast_channel_datastore_remove(peer, datastore);
1632                 ast_datastore_free(datastore);
1633         }
1634
1635         /* Create the peer dial masquerade datastore */
1636         if (dial_masquerade_datastore_add(peer, masq_data)) {
1637                 /* Success */
1638                 return 0;
1639         }
1640
1641         /* Failed to create the peer datastore */
1642         dial_masquerade_datastore_remove_chan(masq_data, peer);
1643         ao2_ref(masq_data, -1);
1644         return -1;
1645 }
1646
1647 static void remove_dial_masquerade(struct ast_channel *peer)
1648 {
1649         struct ast_datastore *datastore;
1650         struct dial_masquerade_datastore *masq_data;
1651
1652         datastore = dial_masquerade_datastore_find(peer);
1653         if (!datastore) {
1654                 return;
1655         }
1656
1657         masq_data = datastore->data;
1658         if (masq_data) {
1659                 dial_masquerade_datastore_remove_chan(masq_data, peer);
1660         }
1661
1662         ast_channel_datastore_remove(peer, datastore);
1663         ast_datastore_free(datastore);
1664 }