stasis_channels: Update the stasis cache if manager variables are needed
[asterisk/asterisk.git] / main / stasis_channels.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Matt Jordan <mjordan@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Messages and Data Types for Channel Objects
22  *
23  * \author \verbatim Matt Jordan <mjordan@digium.com> \endverbatim
24  *
25  */
26
27 /*** MODULEINFO
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34
35 #include "asterisk/astobj2.h"
36 #include "asterisk/json.h"
37 #include "asterisk/pbx.h"
38 #include "asterisk/bridge.h"
39 #include "asterisk/translate.h"
40 #include "asterisk/stasis.h"
41 #include "asterisk/stasis_cache_pattern.h"
42 #include "asterisk/stasis_channels.h"
43
44 /*** DOCUMENTATION
45         <managerEvent language="en_US" name="VarSet">
46                 <managerEventInstance class="EVENT_FLAG_DIALPLAN">
47                         <synopsis>Raised when a variable is set to a particular value.</synopsis>
48                         <syntax>
49                                 <channel_snapshot/>
50                                 <parameter name="Variable">
51                                         <para>The variable being set.</para>
52                                 </parameter>
53                                 <parameter name="Value">
54                                         <para>The new value of the variable.</para>
55                                 </parameter>
56                         </syntax>
57                 </managerEventInstance>
58         </managerEvent>
59         <managerEvent language="en_US" name="AgentLogin">
60                 <managerEventInstance class="EVENT_FLAG_AGENT">
61                         <synopsis>Raised when an Agent has logged in.</synopsis>
62                         <syntax>
63                                 <channel_snapshot/>
64                                 <parameter name="Agent">
65                                         <para>Agent ID of the agent.</para>
66                                 </parameter>
67                         </syntax>
68                         <see-also>
69                                 <ref type="application">AgentLogin</ref>
70                                 <ref type="managerEvent">AgentLogoff</ref>
71                         </see-also>
72                 </managerEventInstance>
73         </managerEvent>
74         <managerEvent language="en_US" name="AgentLogoff">
75                 <managerEventInstance class="EVENT_FLAG_AGENT">
76                         <synopsis>Raised when an Agent has logged off.</synopsis>
77                         <syntax>
78                                 <xi:include xpointer="xpointer(/docs/managerEvent[@name='AgentLogin']/managerEventInstance/syntax/parameter)" />
79                                 <parameter name="Logintime">
80                                         <para>The number of seconds the agent was logged in.</para>
81                                 </parameter>
82                         </syntax>
83                         <see-also>
84                                 <ref type="managerEvent">AgentLogin</ref>
85                         </see-also>
86                 </managerEventInstance>
87         </managerEvent>
88         <managerEvent language="en_US" name="ChannelTalkingStart">
89                 <managerEventInstance class="EVENT_FLAG_CLASS">
90                         <synopsis>Raised when talking is detected on a channel.</synopsis>
91                         <syntax>
92                                 <channel_snapshot/>
93                         </syntax>
94                         <see-also>
95                                 <ref type="function">TALK_DETECT</ref>
96                                 <ref type="managerEvent">ChannelTalkingStop</ref>
97                         </see-also>
98                 </managerEventInstance>
99         </managerEvent>
100         <managerEvent language="en_US" name="ChannelTalkingStop">
101                 <managerEventInstance class="EVENT_FLAG_CLASS">
102                         <synopsis>Raised when talking is no longer detected on a channel.</synopsis>
103                         <syntax>
104                                 <channel_snapshot/>
105                                 <parameter name="Duration">
106                                         <para>The length in time, in milliseconds, that talking was
107                                         detected on the channel.</para>
108                                 </parameter>
109                         </syntax>
110                         <see-also>
111                                 <ref type="function">TALK_DETECT</ref>
112                                 <ref type="managerEvent">ChannelTalkingStart</ref>
113                         </see-also>
114                 </managerEventInstance>
115         </managerEvent>
116 ***/
117
/*! \brief Bucket count passed to ao2_container_alloc() for the containers built in this file */
118 #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7
119
/*! \brief Caching-pattern object handed out by ast_channel_cache_all() and used to derive the channel cache and topics */
120 static struct stasis_cp_all *channel_cache_all;
/*! \brief Cache handed out by ast_channel_cache_by_name() */
121 static struct stasis_cache *channel_cache_by_name;
/*! \brief NOTE(review): presumably the caching topic that feeds channel_cache_by_name - not referenced in this part of the file, confirm */
122 static struct stasis_caching_topic *channel_by_name_topic;
123
124 struct stasis_cp_all *ast_channel_cache_all(void)
125 {
126         return channel_cache_all;
127 }
128
129 struct stasis_cache *ast_channel_cache(void)
130 {
131         return stasis_cp_all_cache(channel_cache_all);
132 }
133
134 struct stasis_topic *ast_channel_topic_all(void)
135 {
136         return stasis_cp_all_topic(channel_cache_all);
137 }
138
139 struct stasis_topic *ast_channel_topic_all_cached(void)
140 {
141         return stasis_cp_all_topic_cached(channel_cache_all);
142 }
143
144 struct stasis_cache *ast_channel_cache_by_name(void)
145 {
146         return channel_cache_by_name;
147 }
148
149 static const char *channel_snapshot_get_id(struct stasis_message *message)
150 {
151         struct ast_channel_snapshot *snapshot;
152         if (ast_channel_snapshot_type() != stasis_message_type(message)) {
153                 return NULL;
154         }
155         snapshot = stasis_message_data(message);
156         return snapshot->uniqueid;
157 }
158
159 static const char *channel_snapshot_get_name(struct stasis_message *message)
160 {
161         struct ast_channel_snapshot *snapshot;
162         if (ast_channel_snapshot_type() != stasis_message_type(message)) {
163                 return NULL;
164         }
165         snapshot = stasis_message_data(message);
166         return snapshot->name;
167 }
168
169 /*!
170  * \internal
171  * \brief Hash function for \ref ast_channel_snapshot objects
172  */
173 static int channel_snapshot_hash_cb(const void *obj, const int flags)
174 {
175         const struct ast_channel_snapshot *snapshot = obj;
176         const char *name = (flags & OBJ_KEY) ? obj : snapshot->name;
177         return ast_str_case_hash(name);
178 }
179
180 /*!
181  * \internal
182  * \brief Comparison function for \ref ast_channel_snapshot objects
183  */
184 static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags)
185 {
186         struct ast_channel_snapshot *left = obj;
187         struct ast_channel_snapshot *right = arg;
188         const char *match = (flags & OBJ_KEY) ? arg : right->name;
189         return strcasecmp(left->name, match) ? 0 : (CMP_MATCH | CMP_STOP);
190 }
191
192 static void channel_snapshot_dtor(void *obj)
193 {
194         struct ast_channel_snapshot *snapshot = obj;
195
196         ast_string_field_free_memory(snapshot);
197         ao2_cleanup(snapshot->manager_vars);
198 }
199
200 struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
201 {
202         struct ast_channel_snapshot *snapshot;
203         struct ast_bridge *bridge;
204
205         /* no snapshots for dummy channels */
206         if (!ast_channel_tech(chan)) {
207                 return NULL;
208         }
209
210         snapshot = ao2_alloc(sizeof(*snapshot), channel_snapshot_dtor);
211         if (!snapshot || ast_string_field_init(snapshot, 1024)) {
212                 ao2_cleanup(snapshot);
213                 return NULL;
214         }
215
216         ast_string_field_set(snapshot, name, ast_channel_name(chan));
217         ast_string_field_set(snapshot, type, ast_channel_tech(chan)->type);
218         ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
219         ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
220         ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
221         ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
222         ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
223         ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
224         if (ast_channel_appl(chan)) {
225                 ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
226         }
227         if (ast_channel_data(chan)) {
228                 ast_string_field_set(snapshot, data, ast_channel_data(chan));
229         }
230         ast_string_field_set(snapshot, context, ast_channel_context(chan));
231         ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
232
233         ast_string_field_set(snapshot, caller_name,
234                 S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
235         ast_string_field_set(snapshot, caller_number,
236                 S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
237         ast_string_field_set(snapshot, caller_dnid, S_OR(ast_channel_dialed(chan)->number.str, ""));
238         ast_string_field_set(snapshot, caller_subaddr,
239                 S_COR(ast_channel_caller(chan)->id.subaddress.valid, ast_channel_caller(chan)->id.subaddress.str, ""));
240         ast_string_field_set(snapshot, dialed_subaddr,
241                 S_COR(ast_channel_dialed(chan)->subaddress.valid, ast_channel_dialed(chan)->subaddress.str, ""));
242         ast_string_field_set(snapshot, caller_ani,
243                 S_COR(ast_channel_caller(chan)->ani.number.valid, ast_channel_caller(chan)->ani.number.str, ""));
244         ast_string_field_set(snapshot, caller_rdnis,
245                 S_COR(ast_channel_redirecting(chan)->from.number.valid, ast_channel_redirecting(chan)->from.number.str, ""));
246         ast_string_field_set(snapshot, caller_dnid,
247                 S_OR(ast_channel_dialed(chan)->number.str, ""));
248
249         ast_string_field_set(snapshot, connected_name,
250                 S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
251         ast_string_field_set(snapshot, connected_number,
252                 S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
253         ast_string_field_set(snapshot, language, ast_channel_language(chan));
254
255         if ((bridge = ast_channel_get_bridge(chan))) {
256                 ast_string_field_set(snapshot, bridgeid, bridge->uniqueid);
257                 ao2_cleanup(bridge);
258         }
259
260         snapshot->creationtime = ast_channel_creationtime(chan);
261         snapshot->state = ast_channel_state(chan);
262         snapshot->priority = ast_channel_priority(chan);
263         snapshot->amaflags = ast_channel_amaflags(chan);
264         snapshot->hangupcause = ast_channel_hangupcause(chan);
265         ast_copy_flags(&snapshot->flags, ast_channel_flags(chan), 0xFFFFFFFF);
266         snapshot->caller_pres = ast_party_id_presentation(&ast_channel_caller(chan)->id);
267         ast_set_flag(&snapshot->softhangup_flags, ast_channel_softhangup_internal_flag(chan));
268
269         snapshot->manager_vars = ast_channel_get_manager_vars(chan);
270         snapshot->tech_properties = ast_channel_tech(chan)->properties;
271
272         return snapshot;
273 }
274
/*!
 * \internal
 * \brief Publish a message on a channel's topic, or on the all-channels topic
 *
 * \param message The message to publish
 * \param chan The channel whose topic is used; when NULL the message goes to
 *        ast_channel_topic_all() instead
 */
static void publish_message_for_channel_topics(struct stasis_message *message, struct ast_channel *chan)
{
	struct stasis_topic *target;

	target = chan ? ast_channel_topic(chan) : ast_channel_topic_all();
	stasis_publish(target, message);
}
283
284 static void channel_blob_dtor(void *obj)
285 {
286         struct ast_channel_blob *event = obj;
287         ao2_cleanup(event->snapshot);
288         ast_json_unref(event->blob);
289 }
290
/*!
 * \internal
 * \brief Dummy callback for receiving events
 *
 * Used as the handler of a throw-away subscription; every message is ignored.
 */
static void dummy_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
	/* Intentionally empty */
}
295
296 void ast_channel_publish_dial_forward(struct ast_channel *caller, struct ast_channel *peer,
297         struct ast_channel *forwarded, const char *dialstring, const char *dialstatus,
298         const char *forward)
299 {
300         RAII_VAR(struct ast_multi_channel_blob *, payload, NULL, ao2_cleanup);
301         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
302         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
303         RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
304         RAII_VAR(struct ast_channel_snapshot *, peer_snapshot, NULL, ao2_cleanup);
305         RAII_VAR(struct ast_channel_snapshot *, forwarded_snapshot, NULL, ao2_cleanup);
306
307         ast_assert(peer != NULL);
308         blob = ast_json_pack("{s: s, s: s, s: s}",
309                              "dialstatus", S_OR(dialstatus, ""),
310                              "forward", S_OR(forward, ""),
311                              "dialstring", S_OR(dialstring, ""));
312         if (!blob) {
313                 return;
314         }
315         payload = ast_multi_channel_blob_create(blob);
316         if (!payload) {
317                 return;
318         }
319
320         if (caller) {
321                 ast_channel_lock(caller);
322                 if (ast_strlen_zero(dialstatus)) {
323                         caller_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(caller));
324                 } else {
325                         caller_snapshot = ast_channel_snapshot_create(caller);
326                 }
327                 ast_channel_unlock(caller);
328                 if (!caller_snapshot) {
329                         return;
330                 }
331                 ast_multi_channel_blob_add_channel(payload, "caller", caller_snapshot);
332         }
333
334         ast_channel_lock(peer);
335         if (ast_strlen_zero(dialstatus)) {
336                 peer_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(peer));
337         } else {
338                 peer_snapshot = ast_channel_snapshot_create(peer);
339         }
340         ast_channel_unlock(peer);
341         if (!peer_snapshot) {
342                 return;
343         }
344         ast_multi_channel_blob_add_channel(payload, "peer", peer_snapshot);
345
346         if (forwarded) {
347                 ast_channel_lock(forwarded);
348                 forwarded_snapshot = ast_channel_snapshot_create(forwarded);
349                 ast_channel_unlock(forwarded);
350                 if (!forwarded_snapshot) {
351                         return;
352                 }
353                 ast_multi_channel_blob_add_channel(payload, "forwarded", forwarded_snapshot);
354         }
355
356         msg = stasis_message_create(ast_channel_dial_type(), payload);
357         if (!msg) {
358                 return;
359         }
360
361         if (forwarded) {
362                 struct stasis_subscription *subscription = stasis_subscribe(ast_channel_topic(peer), dummy_event_cb, NULL);
363
364                 stasis_publish(ast_channel_topic(peer), msg);
365                 stasis_unsubscribe_and_join(subscription);
366         } else {
367                 publish_message_for_channel_topics(msg, caller);
368         }
369 }
370
371 void ast_channel_publish_dial(struct ast_channel *caller, struct ast_channel *peer,
372         const char *dialstring, const char *dialstatus)
373 {
374         ast_channel_publish_dial_forward(caller, peer, NULL, dialstring, dialstatus, NULL);
375 }
376
377 static struct stasis_message *create_channel_blob_message(struct ast_channel_snapshot *snapshot,
378                 struct stasis_message_type *type,
379                 struct ast_json *blob)
380 {
381         struct stasis_message *msg;
382         struct ast_channel_blob *obj;
383
384         obj = ao2_alloc(sizeof(*obj), channel_blob_dtor);
385         if (!obj) {
386                 return NULL;
387         }
388
389         if (snapshot) {
390                 obj->snapshot = snapshot;
391                 ao2_ref(obj->snapshot, +1);
392         }
393         if (!blob) {
394                 blob = ast_json_null();
395         }
396         obj->blob = ast_json_ref(blob);
397
398         msg = stasis_message_create(type, obj);
399         ao2_cleanup(obj);
400         return msg;
401 }
402
/*!
 * \brief Create a channel blob message using the latest cached snapshot
 *
 * \param channel_id Unique id used to look up the cached snapshot
 * \param type Message type for the new message
 * \param blob JSON payload for the message
 *
 * \return The result of create_channel_blob_message(); the snapshot passed
 *         to it is NULL when no cached snapshot is found
 */
struct stasis_message *ast_channel_blob_create_from_cache(const char *channel_id,
	struct stasis_message_type *type,
	struct ast_json *blob)
{
	struct ast_channel_snapshot *cached;
	struct stasis_message *result;

	cached = ast_channel_snapshot_get_latest(channel_id);
	result = create_channel_blob_message(cached, type, blob);
	ao2_cleanup(cached);

	return result;
}
413
414 struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
415         struct stasis_message_type *type, struct ast_json *blob)
416 {
417         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
418
419         if (chan) {
420                 snapshot = ast_channel_snapshot_create(chan);
421         }
422
423         return create_channel_blob_message(snapshot, type, blob);
424 }
425
/*!
 * \brief A channel snapshot wrapper object used in \ref ast_multi_channel_blob objects
 *
 * The role string is stored inline after the snapshot pointer, so objects of
 * this type are allocated with room for the role plus its terminating NUL.
 */
struct channel_role_snapshot {
	struct ast_channel_snapshot *snapshot;	/*!< A channel snapshot */
	char role[];				/*!< The role assigned to the channel (C99 flexible array member) */
};
431
432 /*! \brief A multi channel blob data structure for multi_channel_blob stasis messages */
433 struct ast_multi_channel_blob {
434         struct ao2_container *channel_snapshots;        /*!< A container holding the snapshots */
435         struct ast_json *blob;                                          /*!< A blob of JSON data */
436 };
437
438 /*!
439  * \internal
440  * \brief Standard comparison function for \ref channel_role_snapshot objects
441  */
442 static int channel_role_single_cmp_cb(void *obj, void *arg, int flags)
443 {
444         struct channel_role_snapshot *left = obj;
445         struct channel_role_snapshot *right = arg;
446         const char *match = (flags & OBJ_KEY) ? arg : right->role;
447         return strcasecmp(left->role, match) ? 0 : (CMP_MATCH | CMP_STOP);
448 }
449
450 /*!
451  * \internal
452  * \brief Multi comparison function for \ref channel_role_snapshot objects
453  */
454 static int channel_role_multi_cmp_cb(void *obj, void *arg, int flags)
455 {
456         struct channel_role_snapshot *left = obj;
457         struct channel_role_snapshot *right = arg;
458         const char *match = (flags & OBJ_KEY) ? arg : right->role;
459         return strcasecmp(left->role, match) ? 0 : (CMP_MATCH);
460 }
461
462 /*!
463  * \internal
464  * \brief Hash function for \ref channel_role_snapshot objects
465  */
466 static int channel_role_hash_cb(const void *obj, const int flags)
467 {
468         const struct channel_role_snapshot *snapshot = obj;
469         const char *name = (flags & OBJ_KEY) ? obj : snapshot->role;
470         return ast_str_case_hash(name);
471 }
472
473 /*!
474  * \internal
475  * \brief Destructor for \ref ast_multi_channel_blob objects
476  */
477 static void multi_channel_blob_dtor(void *obj)
478 {
479         struct ast_multi_channel_blob *multi_blob = obj;
480
481         ao2_cleanup(multi_blob->channel_snapshots);
482         ast_json_unref(multi_blob->blob);
483 }
484
485 struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *blob)
486 {
487         RAII_VAR(struct ast_multi_channel_blob *, obj,
488                         ao2_alloc(sizeof(*obj), multi_channel_blob_dtor),
489                         ao2_cleanup);
490
491         ast_assert(blob != NULL);
492
493         if (!obj) {
494                 return NULL;
495         }
496
497         obj->channel_snapshots = ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS,
498                         channel_role_hash_cb, channel_role_single_cmp_cb);
499         if (!obj->channel_snapshots) {
500                 return NULL;
501         }
502
503         obj->blob = ast_json_ref(blob);
504
505         ao2_ref(obj, +1);
506         return obj;
507 }
508
509 struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid)
510 {
511         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
512         struct ast_channel_snapshot *snapshot;
513
514         ast_assert(!ast_strlen_zero(uniqueid));
515
516         message = stasis_cache_get(ast_channel_cache(),
517                         ast_channel_snapshot_type(),
518                         uniqueid);
519         if (!message) {
520                 return NULL;
521         }
522
523         snapshot = stasis_message_data(message);
524         if (!snapshot) {
525                 return NULL;
526         }
527         ao2_ref(snapshot, +1);
528         return snapshot;
529 }
530
531 struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name)
532 {
533         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
534         struct ast_channel_snapshot *snapshot;
535
536         ast_assert(!ast_strlen_zero(name));
537
538         message = stasis_cache_get(ast_channel_cache_by_name(),
539                         ast_channel_snapshot_type(),
540                         name);
541         if (!message) {
542                 return NULL;
543         }
544
545         snapshot = stasis_message_data(message);
546         if (!snapshot) {
547                 return NULL;
548         }
549         ao2_ref(snapshot, +1);
550         return snapshot;
551 }
552
553 static void channel_role_snapshot_dtor(void *obj)
554 {
555         struct channel_role_snapshot *role_snapshot = obj;
556         ao2_cleanup(role_snapshot->snapshot);
557 }
558
559 void ast_multi_channel_blob_add_channel(struct ast_multi_channel_blob *obj, const char *role, struct ast_channel_snapshot *snapshot)
560 {
561         RAII_VAR(struct channel_role_snapshot *, role_snapshot, NULL, ao2_cleanup);
562         int role_len = strlen(role) + 1;
563
564         if (!obj || ast_strlen_zero(role) || !snapshot) {
565                 return;
566         }
567
568         role_snapshot = ao2_alloc(sizeof(*role_snapshot) + role_len, channel_role_snapshot_dtor);
569         if (!role_snapshot) {
570                 return;
571         }
572         ast_copy_string(role_snapshot->role, role, role_len);
573         role_snapshot->snapshot = snapshot;
574         ao2_ref(role_snapshot->snapshot, +1);
575         ao2_link(obj->channel_snapshots, role_snapshot);
576 }
577
578 struct ast_channel_snapshot *ast_multi_channel_blob_get_channel(struct ast_multi_channel_blob *obj, const char *role)
579 {
580         struct channel_role_snapshot *role_snapshot;
581
582         if (!obj || ast_strlen_zero(role)) {
583                 return NULL;
584         }
585         role_snapshot = ao2_find(obj->channel_snapshots, role, OBJ_KEY);
586         /* Note that this function does not increase the ref count on snapshot */
587         if (!role_snapshot) {
588                 return NULL;
589         }
590         ao2_ref(role_snapshot, -1);
591         return role_snapshot->snapshot;
592 }
593
594 struct ao2_container *ast_multi_channel_blob_get_channels(struct ast_multi_channel_blob *obj, const char *role)
595 {
596         RAII_VAR(struct ao2_container *, ret_container,
597                 ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS, channel_snapshot_hash_cb, channel_snapshot_cmp_cb),
598                 ao2_cleanup);
599         struct ao2_iterator *it_role_snapshots;
600         struct channel_role_snapshot *role_snapshot;
601         char *arg;
602
603         if (!obj || ast_strlen_zero(role) || !ret_container) {
604                 return NULL;
605         }
606         arg = ast_strdupa(role);
607
608         it_role_snapshots = ao2_callback(obj->channel_snapshots, OBJ_MULTIPLE | OBJ_KEY, channel_role_multi_cmp_cb, arg);
609         if (!it_role_snapshots) {
610                 return NULL;
611         }
612
613         while ((role_snapshot = ao2_iterator_next(it_role_snapshots))) {
614                 ao2_link(ret_container, role_snapshot->snapshot);
615                 ao2_ref(role_snapshot, -1);
616         }
617         ao2_iterator_destroy(it_role_snapshots);
618
619         ao2_ref(ret_container, +1);
620         return ret_container;
621 }
622
623 struct ast_json *ast_multi_channel_blob_get_json(struct ast_multi_channel_blob *obj)
624 {
625         if (!obj) {
626                 return NULL;
627         }
628         return obj->blob;
629 }
630
631 void ast_channel_stage_snapshot(struct ast_channel *chan)
632 {
633         ast_set_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE);
634 }
635
636 void ast_channel_stage_snapshot_done(struct ast_channel *chan)
637 {
638         ast_clear_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE);
639         ast_channel_publish_snapshot(chan);
640 }
641
642 void ast_channel_publish_snapshot(struct ast_channel *chan)
643 {
644         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
645         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
646
647         if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE)) {
648                 return;
649         }
650
651         snapshot = ast_channel_snapshot_create(chan);
652         if (!snapshot) {
653                 return;
654         }
655
656         message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
657         if (!message) {
658                 return;
659         }
660
661         ast_assert(ast_channel_topic(chan) != NULL);
662         stasis_publish(ast_channel_topic(chan), message);
663 }
664
/*!
 * \brief Publish a channel blob message built from the cached snapshot
 *
 * \param chan The channel whose unique id and topic are used
 * \param type Message type to publish
 * \param blob JSON payload; when NULL, ast_json_null() is used instead
 */
void ast_channel_publish_cached_blob(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
{
	struct stasis_message *outgoing;

	outgoing = ast_channel_blob_create_from_cache(ast_channel_uniqueid(chan), type,
		blob ? blob : ast_json_null());
	if (!outgoing) {
		return;
	}

	stasis_publish(ast_channel_topic(chan), outgoing);
	ao2_cleanup(outgoing);
}
679
/*!
 * \brief Publish a channel blob message built from a new snapshot
 *
 * \param chan The channel to snapshot and whose topic is used
 * \param type Message type to publish
 * \param blob JSON payload; when NULL, ast_json_null() is used instead
 */
void ast_channel_publish_blob(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
{
	struct stasis_message *outgoing;

	outgoing = ast_channel_blob_create(chan, type, blob ? blob : ast_json_null());
	if (!outgoing) {
		return;
	}

	stasis_publish(ast_channel_topic(chan), outgoing);
	ao2_cleanup(outgoing);
}
694
695 void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
696 {
697         struct ast_json *blob;
698
699         ast_assert(name != NULL);
700         ast_assert(value != NULL);
701
702         blob = ast_json_pack("{s: s, s: s}",
703                              "variable", name,
704                              "value", value);
705         if (!blob) {
706                 ast_log(LOG_ERROR, "Error creating message\n");
707                 return;
708         }
709
710         /*! If there are manager variables, force a cache update */
711         if (chan && ast_channel_has_manager_vars()) {
712                 ast_channel_publish_snapshot(chan);
713         }
714
715         if (chan) {
716                 ast_channel_publish_cached_blob(chan, ast_channel_varset_type(), blob);
717         } else {
718                 /* This function is NULL safe for global variables */
719                 ast_channel_publish_blob(NULL, ast_channel_varset_type(), blob);
720         }
721
722         ast_json_unref(blob);
723 }
724
725 static struct ast_manager_event_blob *varset_to_ami(struct stasis_message *msg)
726 {
727         RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
728         struct ast_channel_blob *obj = stasis_message_data(msg);
729         const char *variable =
730                 ast_json_string_get(ast_json_object_get(obj->blob, "variable"));
731         const char *value =
732                 ast_json_string_get(ast_json_object_get(obj->blob, "value"));
733
734         if (obj->snapshot) {
735                 channel_event_string =
736                         ast_manager_build_channel_state_string(obj->snapshot);
737         } else {
738                 channel_event_string = ast_str_create(35);
739                 ast_str_set(&channel_event_string, 0,
740                             "Channel: none\r\n"
741                             "Uniqueid: none\r\n");
742         }
743
744         if (!channel_event_string) {
745                 return NULL;
746         }
747
748         return ast_manager_event_blob_create(EVENT_FLAG_DIALPLAN, "VarSet",
749                 "%s"
750                 "Variable: %s\r\n"
751                 "Value: %s\r\n",
752                 ast_str_buffer(channel_event_string), variable, value);
753 }
754
755 static struct ast_manager_event_blob *agent_login_to_ami(struct stasis_message *msg)
756 {
757         RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
758         struct ast_channel_blob *obj = stasis_message_data(msg);
759         const char *agent = ast_json_string_get(ast_json_object_get(obj->blob, "agent"));
760
761         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
762         if (!channel_string) {
763                 return NULL;
764         }
765
766         return ast_manager_event_blob_create(EVENT_FLAG_AGENT, "AgentLogin",
767                 "%s"
768                 "Agent: %s\r\n",
769                 ast_str_buffer(channel_string), agent);
770 }
771
772 static struct ast_manager_event_blob *agent_logoff_to_ami(struct stasis_message *msg)
773 {
774         RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
775         struct ast_channel_blob *obj = stasis_message_data(msg);
776         const char *agent = ast_json_string_get(ast_json_object_get(obj->blob, "agent"));
777         long logintime = ast_json_integer_get(ast_json_object_get(obj->blob, "logintime"));
778
779         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
780         if (!channel_string) {
781                 return NULL;
782         }
783
784         return ast_manager_event_blob_create(EVENT_FLAG_AGENT, "AgentLogoff",
785                 "%s"
786                 "Agent: %s\r\n"
787                 "Logintime: %ld\r\n",
788                 ast_str_buffer(channel_string), agent, logintime);
789 }
790
791 void ast_publish_channel_state(struct ast_channel *chan)
792 {
793         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
794         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
795
796         ast_assert(chan != NULL);
797         if (!chan) {
798                 return;
799         }
800
801         snapshot = ast_channel_snapshot_create(chan);
802         if (!snapshot) {
803                 return;
804         }
805
806         message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
807         if (!message) {
808                 return;
809         }
810
811         ast_assert(ast_channel_topic(chan) != NULL);
812         stasis_publish(ast_channel_topic(chan), message);
813 }
814
815 struct ast_json *ast_channel_snapshot_to_json(
816         const struct ast_channel_snapshot *snapshot,
817         const struct stasis_message_sanitizer *sanitize)
818 {
819         RAII_VAR(struct ast_json *, json_chan, NULL, ast_json_unref);
820
821         if (snapshot == NULL
822                 || (sanitize && sanitize->channel_snapshot
823                 && sanitize->channel_snapshot(snapshot))) {
824                 return NULL;
825         }
826
827         json_chan = ast_json_pack(
828                 /* Broken up into groups of three for readability */
829                 "{ s: s, s: s, s: s,"
830                 "  s: o, s: o, s: s,"
831                 "  s: o, s: o }",
832                 /* First line */
833                 "id", snapshot->uniqueid,
834                 "name", snapshot->name,
835                 "state", ast_state2str(snapshot->state),
836                 /* Second line */
837                 "caller", ast_json_name_number(
838                         snapshot->caller_name, snapshot->caller_number),
839                 "connected", ast_json_name_number(
840                         snapshot->connected_name, snapshot->connected_number),
841                 "accountcode", snapshot->accountcode,
842                 /* Third line */
843                 "dialplan", ast_json_dialplan_cep(
844                         snapshot->context, snapshot->exten, snapshot->priority),
845                 "creationtime", ast_json_timeval(snapshot->creationtime, NULL));
846
847         return ast_json_ref(json_chan);
848 }
849
850 int ast_channel_snapshot_cep_equal(
851         const struct ast_channel_snapshot *old_snapshot,
852         const struct ast_channel_snapshot *new_snapshot)
853 {
854         ast_assert(old_snapshot != NULL);
855         ast_assert(new_snapshot != NULL);
856
857         /* We actually get some snapshots with CEP set, but before the
858          * application is set. Since empty application is invalid, we treat
859          * setting the application from nothing as a CEP change.
860          */
861         if (ast_strlen_zero(old_snapshot->appl) &&
862             !ast_strlen_zero(new_snapshot->appl)) {
863                 return 0;
864         }
865
866         return old_snapshot->priority == new_snapshot->priority &&
867                 strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
868                 strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
869 }
870
871 int ast_channel_snapshot_caller_id_equal(
872         const struct ast_channel_snapshot *old_snapshot,
873         const struct ast_channel_snapshot *new_snapshot)
874 {
875         ast_assert(old_snapshot != NULL);
876         ast_assert(new_snapshot != NULL);
877         return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
878                 strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
879 }
880
881 static struct ast_json *channel_blob_to_json(
882         struct stasis_message *message,
883         const char *type,
884         const struct stasis_message_sanitizer *sanitize)
885 {
886         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
887         struct ast_channel_blob *channel_blob = stasis_message_data(message);
888         struct ast_json *blob = channel_blob->blob;
889         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
890         const struct timeval *tv = stasis_message_timestamp(message);
891         int res = 0;
892
893         if (blob == NULL || ast_json_is_null(blob)) {
894                 out = ast_json_object_create();
895         } else {
896                 /* blobs are immutable, so shallow copies are fine */
897                 out = ast_json_copy(blob);
898         }
899
900         if (!out) {
901                 return NULL;
902         }
903
904         res |= ast_json_object_set(out, "type", ast_json_string_create(type));
905         res |= ast_json_object_set(out, "timestamp",
906                 ast_json_timeval(*tv, NULL));
907
908         /* For global channel messages, the snapshot is optional */
909         if (snapshot) {
910                 struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
911
912                 if (!json_channel) {
913                         return NULL;
914                 }
915
916                 res |= ast_json_object_set(out, "channel", json_channel);
917         }
918
919         if (res != 0) {
920                 return NULL;
921         }
922
923         return ast_json_ref(out);
924 }
925
926 static struct ast_json *dtmf_end_to_json(
927         struct stasis_message *message,
928         const struct stasis_message_sanitizer *sanitize)
929 {
930         struct ast_channel_blob *channel_blob = stasis_message_data(message);
931         struct ast_json *blob = channel_blob->blob;
932         struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
933         const char *direction =
934                 ast_json_string_get(ast_json_object_get(blob, "direction"));
935         const struct timeval *tv = stasis_message_timestamp(message);
936         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
937
938         /* Only present received DTMF end events as JSON */
939         if (strcasecmp("Received", direction) != 0) {
940                 return NULL;
941         }
942
943         if (!json_channel) {
944                 return NULL;
945         }
946
947         return ast_json_pack("{s: s, s: o, s: O, s: O, s: o}",
948                 "type", "ChannelDtmfReceived",
949                 "timestamp", ast_json_timeval(*tv, NULL),
950                 "digit", ast_json_object_get(blob, "digit"),
951                 "duration_ms", ast_json_object_get(blob, "duration_ms"),
952                 "channel", json_channel);
953 }
954
/*!
 * \brief JSON converter for variable-set messages.
 *
 * Delegates to channel_blob_to_json() with the event type "ChannelVarset".
 */
static struct ast_json *varset_to_json(
        struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
{
        struct ast_json *event;

        event = channel_blob_to_json(message, "ChannelVarset", sanitize);

        return event;
}
961
/*!
 * \brief JSON converter for hangup request messages.
 *
 * Delegates to channel_blob_to_json() with the event type
 * "ChannelHangupRequest".
 */
static struct ast_json *hangup_request_to_json(
        struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
{
        struct ast_json *event;

        event = channel_blob_to_json(message, "ChannelHangupRequest", sanitize);

        return event;
}
968
969 static struct ast_json *dial_to_json(
970         struct stasis_message *message,
971         const struct stasis_message_sanitizer *sanitize)
972 {
973         struct ast_multi_channel_blob *payload = stasis_message_data(message);
974         struct ast_json *blob = ast_multi_channel_blob_get_json(payload);
975         struct ast_json *caller_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "caller"), sanitize);
976         struct ast_json *peer_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "peer"), sanitize);
977         struct ast_json *forwarded_json = ast_channel_snapshot_to_json(ast_multi_channel_blob_get_channel(payload, "forwarded"), sanitize);
978         struct ast_json *json;
979         const struct timeval *tv = stasis_message_timestamp(message);
980         int res = 0;
981
982         json = ast_json_pack("{s: s, s: o, s: O, s: O, s: O}",
983                 "type", "Dial",
984                 "timestamp", ast_json_timeval(*tv, NULL),
985                 "dialstatus", ast_json_object_get(blob, "dialstatus"),
986                 "forward", ast_json_object_get(blob, "forward"),
987                 "dialstring", ast_json_object_get(blob, "dialstring"));
988         if (!json) {
989                 ast_json_unref(caller_json);
990                 ast_json_unref(peer_json);
991                 ast_json_unref(forwarded_json);
992                 return NULL;
993         }
994
995         if (caller_json) {
996                 res |= ast_json_object_set(json, "caller", caller_json);
997         }
998         if (peer_json) {
999                 res |= ast_json_object_set(json, "peer", peer_json);
1000         }
1001         if (forwarded_json) {
1002                 res |= ast_json_object_set(json, "forwarded", forwarded_json);
1003         }
1004
1005         if (res) {
1006                 ast_json_unref(json);
1007                 return NULL;
1008         }
1009
1010         return json;
1011 }
1012
1013 static struct ast_manager_event_blob *talking_start_to_ami(struct stasis_message *msg)
1014 {
1015         struct ast_str *channel_string;
1016         struct ast_channel_blob *obj = stasis_message_data(msg);
1017         struct ast_manager_event_blob *blob;
1018
1019         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
1020         if (!channel_string) {
1021                 return NULL;
1022         }
1023
1024         blob = ast_manager_event_blob_create(EVENT_FLAG_CALL, "ChannelTalkingStart",
1025                                              "%s", ast_str_buffer(channel_string));
1026         ast_free(channel_string);
1027
1028         return blob;
1029 }
1030
/*!
 * \brief JSON converter for talking-start messages.
 *
 * Delegates to channel_blob_to_json() with the event type
 * "ChannelTalkingStarted".
 */
static struct ast_json *talking_start_to_json(struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
{
        struct ast_json *event;

        event = channel_blob_to_json(message, "ChannelTalkingStarted", sanitize);

        return event;
}
1036
1037 static struct ast_manager_event_blob *talking_stop_to_ami(struct stasis_message *msg)
1038 {
1039         struct ast_str *channel_string;
1040         struct ast_channel_blob *obj = stasis_message_data(msg);
1041         int duration = ast_json_integer_get(ast_json_object_get(obj->blob, "duration"));
1042         struct ast_manager_event_blob *blob;
1043
1044         channel_string = ast_manager_build_channel_state_string(obj->snapshot);
1045         if (!channel_string) {
1046                 return NULL;
1047         }
1048
1049         blob = ast_manager_event_blob_create(EVENT_FLAG_CALL, "ChannelTalkingStop",
1050                                              "%s"
1051                                              "Duration: %d\r\n",
1052                                              ast_str_buffer(channel_string),
1053                                              duration);
1054         ast_free(channel_string);
1055
1056         return blob;
1057 }
1058
/*!
 * \brief JSON converter for talking-stop messages.
 *
 * Delegates to channel_blob_to_json() with the event type
 * "ChannelTalkingFinished".
 */
static struct ast_json *talking_stop_to_json(struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
{
        struct ast_json *event;

        event = channel_blob_to_json(message, "ChannelTalkingFinished", sanitize);

        return event;
}
1064
/*!
 * @{ \brief Define channel message types.
 *
 * Each type names the converters it supplies through designated
 * initializers (\c .to_ami and/or \c .to_json). Types defined with no
 * initializers supply no converter here.
 *
 * Every type defined in this group is initialized in
 * ast_stasis_channels_init() and released in stasis_channels_cleanup();
 * keep all three lists in step when adding a type.
 */
STASIS_MESSAGE_TYPE_DEFN(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dial_type,
        .to_json = dial_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_varset_type,
        .to_ami = varset_to_ami,
        .to_json = varset_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_request_type,
        .to_json = hangup_request_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_begin_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_end_type,
        .to_json = dtmf_end_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hold_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_unhold_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_fax_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_handler_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_start_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_stop_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_login_type,
        .to_ami = agent_login_to_ami,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_logoff_type,
        .to_ami = agent_logoff_to_ami,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_talking_start,
        .to_ami = talking_start_to_ami,
        .to_json = talking_start_to_json,
        );
STASIS_MESSAGE_TYPE_DEFN(ast_channel_talking_stop,
        .to_ami = talking_stop_to_ami,
        .to_json = talking_stop_to_json,
        );

/*! @} */
1109
/*!
 * \brief Release everything set up by ast_stasis_channels_init().
 *
 * Registered through ast_register_cleanup() in ast_stasis_channels_init().
 * Each global is reset to NULL after it has been released.
 */
static void stasis_channels_cleanup(void)
{
        /* Unsubscribe the by-name caching topic first, then release the caches. */
        stasis_caching_unsubscribe_and_join(channel_by_name_topic);
        channel_by_name_topic = NULL;
        ao2_cleanup(channel_cache_by_name);
        channel_cache_by_name = NULL;
        ao2_cleanup(channel_cache_all);
        channel_cache_all = NULL;

        /* One cleanup per message type initialized in ast_stasis_channels_init(). */
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hold_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_unhold_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_start_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_stop_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_fax_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_handler_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_start_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_stop_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_start_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_login_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_logoff_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_talking_start);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_talking_stop);
}
1140
/*!
 * \brief Create the channel caches and topics and initialize every channel
 *        message type defined in this file.
 *
 * \retval -1 if a cache or the by-name caching topic could not be created.
 * \return Otherwise the bitwise OR of every STASIS_MESSAGE_TYPE_INIT()
 *         result, which is 0 only when all of them returned 0.
 */
int ast_stasis_channels_init(void)
{
        int res = 0;

        /*
         * Registered before anything is allocated, so the early returns below
         * leave their partial state to stasis_channels_cleanup().
         */
        ast_register_cleanup(stasis_channels_cleanup);

        channel_cache_all = stasis_cp_all_create("ast_channel_topic_all",
                channel_snapshot_get_id);
        if (!channel_cache_all) {
                return -1;
        }
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);

        channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name);
        if (!channel_cache_by_name) {
                return -1;
        }

        /* This should be initialized before the caching topic */
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);

        /* The by-name caching topic is created from the topic of the "all" cache. */
        channel_by_name_topic = stasis_caching_topic_create(
                stasis_cp_all_topic(channel_cache_all),
                channel_cache_by_name);
        if (!channel_by_name_topic) {
                return -1;
        }

        /* Remaining message types; failures accumulate in res and do not stop initialization. */
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_talking_start);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_talking_stop);

        return res;
}
1190