Multiple revisions 374570,374581
[asterisk/asterisk.git] / main / message.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2010, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Out-of-call text message support
22  *
23  * \author Russell Bryant <russell@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/_private.h"
35
36 #include "asterisk/module.h"
37 #include "asterisk/datastore.h"
38 #include "asterisk/pbx.h"
39 #include "asterisk/manager.h"
40 #include "asterisk/strings.h"
41 #include "asterisk/astobj2.h"
42 #include "asterisk/app.h"
43 #include "asterisk/taskprocessor.h"
44 #include "asterisk/message.h"
45
46 /*** DOCUMENTATION
47         <function name="MESSAGE" language="en_US">
48                 <synopsis>
49                         Create a message or read fields from a message.
50                 </synopsis>
51                 <syntax argsep="/">
52                         <parameter name="argument" required="true">
53                         <para>Field of the message to get or set.</para>
54                         <enumlist>
55                                 <enum name="to">
56                                         <para>Read-only.  The destination of the message.  When processing an
57                                         incoming message, this will be set to the destination listed as
58                                         the recipient of the message that was received by Asterisk.</para>
59                                 </enum>
60                                 <enum name="from">
61                                         <para>Read-only.  The source of the message.  When processing an
62                                         incoming message, this will be set to the source of the message.</para>
63                                 </enum>
64                                 <enum name="custom_data">
65                                         <para>Write-only.  Mark or unmark all message headers for an outgoing
66                                         message.  The following values can be set:</para>
67                                         <enumlist>
68                                                 <enum name="mark_all_outbound">
69                                                         <para>Mark all headers for an outgoing message.</para>
70                                                 </enum>
71                                                 <enum name="clear_all_outbound">
72                                                         <para>Unmark all headers for an outgoing message.</para>
73                                                 </enum>
74                                         </enumlist>
75                                 </enum>
76                                 <enum name="body">
77                                         <para>Read/Write.  The message body.  When processing an incoming
78                                         message, this includes the body of the message that Asterisk
79                                         received.  When MessageSend() is executed, the contents of this
80                                         field are used as the body of the outgoing message.  The body
81                                         will always be UTF-8.</para>
82                                 </enum>
83                         </enumlist>
84                         </parameter>
85                 </syntax>
86                 <description>
87                         <para>This function will read from or write a value to a text message.
88                         It is used both to read the data out of an incoming message, as well as
89                         modify or create a message that will be sent outbound.</para>
90                 </description>
91                 <see-also>
92                         <ref type="application">MessageSend</ref>
93                 </see-also>
94         </function>
95         <function name="MESSAGE_DATA" language="en_US">
96                 <synopsis>
97                         Read or write custom data attached to a message.
98                 </synopsis>
99                 <syntax argsep="/">
100                         <parameter name="argument" required="true">
101                         <para>Field of the message to get or set.</para>
102                         </parameter>
103                 </syntax>
104                 <description>
105                         <para>This function will read from or write a value to a text message.
106                         It is used both to read the data out of an incoming message, as well as
107                         modify a message that will be sent outbound.</para>
108                         <note>
109                                 <para>If you want to set an outbound message to carry data in the
110                                 current message, do
111                                 Set(MESSAGE_DATA(<replaceable>key</replaceable>)=${MESSAGE_DATA(<replaceable>key</replaceable>)}).</para>
112                         </note>
113                 </description>
114                 <see-also>
115                         <ref type="application">MessageSend</ref>
116                 </see-also>
117         </function>
118         <application name="MessageSend" language="en_US">
119                 <synopsis>
120                         Send a text message.
121                 </synopsis>
122                 <syntax>
123                         <parameter name="to" required="true">
124                                 <para>A To URI for the message.</para>
125                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageToInfo'])" />
126                                 <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageToInfo'])" />
127                         </parameter>
128                         <parameter name="from" required="false">
129                                 <para>A From URI for the message if needed for the
130                                 message technology being used to send this message.</para>
131                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageFromInfo'])" />
132                         </parameter>
133                 </syntax>
134                 <description>
135                         <para>Send a text message.  The body of the message that will be
136                         sent is what is currently set to <literal>MESSAGE(body)</literal>.
137                           The technology chosen for sending the message is determined
138                         based on a prefix to the <literal>to</literal> parameter.</para>
139                         <para>This application sets the following channel variables:</para>
140                         <variablelist>
141                                 <variable name="MESSAGE_SEND_STATUS">
142                                         <para>This is the message delivery status returned by this application.</para>
143                                         <value name="INVALID_PROTOCOL">
144                                                 No handler for the technology part of the URI was found.
145                                         </value>
146                                         <value name="INVALID_URI">
147                                                 The protocol handler reported that the URI was not valid.
148                                         </value>
149                                         <value name="SUCCESS">
150                                                 Successfully passed on to the protocol handler, but delivery has not necessarily been guaranteed.
151                                         </value>
152                                         <value name="FAILURE">
153                                                 The protocol handler reported that it was unabled to deliver the message for some reason.
154                                         </value>
155                                 </variable>
156                         </variablelist>
157                 </description>
158         </application>
159         <manager name="MessageSend" language="en_US">
160                 <synopsis>
161                         Send an out of call message to an endpoint.
162                 </synopsis>
163                 <syntax>
164                         <xi:include xpointer="xpointer(/docs/manager[@name='Login']/syntax/parameter[@name='ActionID'])" />
165                         <parameter name="To" required="true">
166                                 <para>The URI the message is to be sent to.</para>
167                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageToInfo'])" />
168                                 <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageToInfo'])" />
169                         </parameter>
170                         <parameter name="From">
171                                 <para>A From URI for the message if needed for the
172                                 message technology being used to send this message.</para>
173                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageFromInfo'])" />
174                         </parameter>
175                         <parameter name="Body">
176                                 <para>The message body text.  This must not contain any newlines as that
177                                 conflicts with the AMI protocol.</para>
178                         </parameter>
179                         <parameter name="Base64Body">
180                                 <para>Text bodies requiring the use of newlines have to be base64 encoded
181                                 in this field.  Base64Body will be decoded before being sent out.
182                                 Base64Body takes precedence over Body.</para>
183                         </parameter>
184                         <parameter name="Variable">
185                                 <para>Message variable to set, multiple Variable: headers are
186                                 allowed.  The header value is a comma separated list of
187                                 name=value pairs.</para>
188                         </parameter>
189                 </syntax>
190         </manager>
191  ***/
192
193 struct msg_data {
194         AST_DECLARE_STRING_FIELDS(
195                 AST_STRING_FIELD(name);
196                 AST_STRING_FIELD(value);
197         );
198         unsigned int send:1; /* Whether to send out on outbound messages */
199 };
200
201 AST_LIST_HEAD_NOLOCK(outhead, msg_data);
202
203 /*!
204  * \brief A message.
205  *
206  * \todo Consider whether stringfields would be an appropriate optimization here.
207  */
208 struct ast_msg {
209         struct ast_str *to;
210         struct ast_str *from;
211         struct ast_str *body;
212         struct ast_str *context;
213         struct ast_str *exten;
214         struct ao2_container *vars;
215 };
216
217 struct ast_msg_tech_holder {
218         const struct ast_msg_tech *tech;
219         /*!
220          * \brief A rwlock for this object
221          *
222          * a read/write lock must be used to protect the wrapper instead
223          * of the ao2 lock. A rdlock must be held to read tech_holder->tech.
224          */
225         ast_rwlock_t tech_lock;
226 };
227
228 static struct ao2_container *msg_techs;
229
230 static struct ast_taskprocessor *msg_q_tp;
231
232 static const char app_msg_send[] = "MessageSend";
233
234 static void msg_ds_destroy(void *data);
235
236 static const struct ast_datastore_info msg_datastore = {
237         .type = "message",
238         .destroy = msg_ds_destroy,
239 };
240
241 static int msg_func_read(struct ast_channel *chan, const char *function,
242                 char *data, char *buf, size_t len);
243 static int msg_func_write(struct ast_channel *chan, const char *function,
244                 char *data, const char *value);
245
246 static struct ast_custom_function msg_function = {
247         .name = "MESSAGE",
248         .read = msg_func_read,
249         .write = msg_func_write,
250 };
251
252 static int msg_data_func_read(struct ast_channel *chan, const char *function,
253                 char *data, char *buf, size_t len);
254 static int msg_data_func_write(struct ast_channel *chan, const char *function,
255                 char *data, const char *value);
256
257 static struct ast_custom_function msg_data_function = {
258         .name = "MESSAGE_DATA",
259         .read = msg_data_func_read,
260         .write = msg_data_func_write,
261 };
262
263 static struct ast_frame *chan_msg_read(struct ast_channel *chan);
264 static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr);
265 static int chan_msg_indicate(struct ast_channel *chan, int condition,
266                 const void *data, size_t datalen);
267 static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit);
268 static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
269                 unsigned int duration);
270
271 /*!
272  * \internal
273  * \brief A bare minimum channel technology
274  *
275  * This will not be registered as we never want anything to try
276  * to create Message channels other than internally in this file.
277  */
278 static const struct ast_channel_tech msg_chan_tech_hack = {
279         .type             = "Message",
280         .description      = "Internal Text Message Processing",
281         .read             = chan_msg_read,
282         .write            = chan_msg_write,
283         .indicate         = chan_msg_indicate,
284         .send_digit_begin = chan_msg_send_digit_begin,
285         .send_digit_end   = chan_msg_send_digit_end,
286 };
287
288 /*!
289  * \internal
290  * \brief ast_channel_tech read callback
291  *
292  * This should never be called.  However, we say that about chan_iax2's
293  * read callback, too, and it seems to randomly get called for some
294  * reason.  If it does, a simple NULL frame will suffice.
295  */
296 static struct ast_frame *chan_msg_read(struct ast_channel *chan)
297 {
298         return &ast_null_frame;
299 }
300
301 /*!
302  * \internal
303  * \brief ast_channel_tech write callback
304  *
305  * Throw all frames away.  We don't care about any of them.
306  */
307 static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr)
308 {
309         return 0;
310 }
311
312 /*!
313  * \internal
314  * \brief ast_channel_tech indicate callback
315  *
316  * The indicate callback is here just so it can return success.
317  * We don't want any callers of ast_indicate() to think something
318  * has failed.  We also don't want ast_indicate() itself to try
319  * to generate inband tones since we didn't tell it that we took
320  * care of it ourselves.
321  */
322 static int chan_msg_indicate(struct ast_channel *chan, int condition,
323                 const void *data, size_t datalen)
324 {
325         return 0;
326 }
327
328 /*!
329  * \internal
330  * \brief ast_channel_tech send_digit_begin callback
331  *
332  * This is here so that just in case a digit comes at a message channel
333  * that the Asterisk core doesn't waste any time trying to generate
334  * inband DTMF in audio.  It's a waste of resources.
335  */
336 static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit)
337 {
338         return 0;
339 }
340
341 /*!
342  * \internal
343  * \brief ast_channel_tech send_digit_end callback
344  *
345  * This is here so that just in case a digit comes at a message channel
346  * that the Asterisk core doesn't waste any time trying to generate
347  * inband DTMF in audio.  It's a waste of resources.
348  */
349 static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
350                 unsigned int duration)
351 {
352         return 0;
353 }
354
355 static void msg_ds_destroy(void *data)
356 {
357         struct ast_msg *msg = data;
358
359         ao2_ref(msg, -1);
360 }
361
362 static int msg_data_hash_fn(const void *obj, const int flags)
363 {
364         const struct msg_data *data = obj;
365         return ast_str_case_hash(data->name);
366 }
367
368 static int msg_data_cmp_fn(void *obj, void *arg, int flags)
369 {
370         const struct msg_data *one = obj, *two = arg;
371         return !strcasecmp(one->name, two->name) ? CMP_MATCH | CMP_STOP : 0;
372 }
373
374 static void msg_data_destructor(void *obj)
375 {
376         struct msg_data *data = obj;
377         ast_string_field_free_memory(data);
378 }
379
380 static void msg_destructor(void *obj)
381 {
382         struct ast_msg *msg = obj;
383
384         ast_free(msg->to);
385         msg->to = NULL;
386
387         ast_free(msg->from);
388         msg->from = NULL;
389
390         ast_free(msg->body);
391         msg->body = NULL;
392
393         ast_free(msg->context);
394         msg->context = NULL;
395
396         ast_free(msg->exten);
397         msg->exten = NULL;
398
399         ao2_ref(msg->vars, -1);
400 }
401
402 struct ast_msg *ast_msg_alloc(void)
403 {
404         struct ast_msg *msg;
405
406         if (!(msg = ao2_alloc(sizeof(*msg), msg_destructor))) {
407                 return NULL;
408         }
409
410         if (!(msg->to = ast_str_create(32))) {
411                 ao2_ref(msg, -1);
412                 return NULL;
413         }
414
415         if (!(msg->from = ast_str_create(32))) {
416                 ao2_ref(msg, -1);
417                 return NULL;
418         }
419
420         if (!(msg->body = ast_str_create(128))) {
421                 ao2_ref(msg, -1);
422                 return NULL;
423         }
424
425         if (!(msg->context = ast_str_create(16))) {
426                 ao2_ref(msg, -1);
427                 return NULL;
428         }
429
430         if (!(msg->exten = ast_str_create(16))) {
431                 ao2_ref(msg, -1);
432                 return NULL;
433         }
434
435         if (!(msg->vars = ao2_container_alloc(1, msg_data_hash_fn, msg_data_cmp_fn))) {
436                 ao2_ref(msg, -1);
437                 return NULL;
438         }
439
440         ast_str_set(&msg->context, 0, "default");
441
442         return msg;
443 }
444
445 struct ast_msg *ast_msg_ref(struct ast_msg *msg)
446 {
447         ao2_ref(msg, 1);
448         return msg;
449 }
450
451 struct ast_msg *ast_msg_destroy(struct ast_msg *msg)
452 {
453         ao2_ref(msg, -1);
454
455         return NULL;
456 }
457
458 int ast_msg_set_to(struct ast_msg *msg, const char *fmt, ...)
459 {
460         va_list ap;
461         int res;
462
463         va_start(ap, fmt);
464         res = ast_str_set_va(&msg->to, 0, fmt, ap);
465         va_end(ap);
466
467         return res < 0 ? -1 : 0;
468 }
469
470 int ast_msg_set_from(struct ast_msg *msg, const char *fmt, ...)
471 {
472         va_list ap;
473         int res;
474
475         va_start(ap, fmt);
476         res = ast_str_set_va(&msg->from, 0, fmt, ap);
477         va_end(ap);
478
479         return res < 0 ? -1 : 0;
480 }
481
482 int ast_msg_set_body(struct ast_msg *msg, const char *fmt, ...)
483 {
484         va_list ap;
485         int res;
486
487         va_start(ap, fmt);
488         res = ast_str_set_va(&msg->body, 0, fmt, ap);
489         va_end(ap);
490
491         return res < 0 ? -1 : 0;
492 }
493
494 int ast_msg_set_context(struct ast_msg *msg, const char *fmt, ...)
495 {
496         va_list ap;
497         int res;
498
499         va_start(ap, fmt);
500         res = ast_str_set_va(&msg->context, 0, fmt, ap);
501         va_end(ap);
502
503         return res < 0 ? -1 : 0;
504 }
505
506 int ast_msg_set_exten(struct ast_msg *msg, const char *fmt, ...)
507 {
508         va_list ap;
509         int res;
510
511         va_start(ap, fmt);
512         res = ast_str_set_va(&msg->exten, 0, fmt, ap);
513         va_end(ap);
514
515         return res < 0 ? -1 : 0;
516 }
517
518 const char *ast_msg_get_body(const struct ast_msg *msg)
519 {
520         return ast_str_buffer(msg->body);
521 }
522
523 static struct msg_data *msg_data_alloc(void)
524 {
525         struct msg_data *data;
526
527         if (!(data = ao2_alloc(sizeof(*data), msg_data_destructor))) {
528                 return NULL;
529         }
530
531         if (ast_string_field_init(data, 32)) {
532                 ao2_ref(data, -1);
533                 return NULL;
534         }
535
536         return data;
537 }
538
539 static struct msg_data *msg_data_find(struct ao2_container *vars, const char *name)
540 {
541         struct msg_data tmp = {
542                 .name = name,
543         };
544         return ao2_find(vars, &tmp, OBJ_POINTER);
545 }
546
547 static int msg_set_var_full(struct ast_msg *msg, const char *name, const char *value, unsigned int outbound)
548 {
549         struct msg_data *data;
550
551         if (!(data = msg_data_find(msg->vars, name))) {
552                 if (ast_strlen_zero(value)) {
553                         return 0;
554                 }
555                 if (!(data = msg_data_alloc())) {
556                         return -1;
557                 };
558
559                 ast_string_field_set(data, name, name);
560                 ast_string_field_set(data, value, value);
561                 data->send = outbound;
562                 ao2_link(msg->vars, data);
563         } else {
564                 if (ast_strlen_zero(value)) {
565                         ao2_unlink(msg->vars, data);
566                 } else {
567                         ast_string_field_set(data, value, value);
568                         data->send = outbound;
569                 }
570         }
571
572         ao2_ref(data, -1);
573
574         return 0;
575 }
576
577 int ast_msg_set_var_outbound(struct ast_msg *msg, const char *name, const char *value)
578 {
579         return msg_set_var_full(msg, name, value, 1);
580 }
581
582 int ast_msg_set_var(struct ast_msg *msg, const char *name, const char *value)
583 {
584         return msg_set_var_full(msg, name, value, 0);
585 }
586
587 const char *ast_msg_get_var(struct ast_msg *msg, const char *name)
588 {
589         struct msg_data *data;
590         const char *val = NULL;
591
592         if (!(data = msg_data_find(msg->vars, name))) {
593                 return NULL;
594         }
595
596         /* Yep, this definitely looks like val would be a dangling pointer
597          * after the ref count is decremented.  As long as the message structure
598          * is used in a thread safe manner, this will not be the case though.
599          * The ast_msg holds a reference to this object in the msg->vars container. */
600         val = data->value;
601         ao2_ref(data, -1);
602
603         return val;
604 }
605
606 struct ast_msg_var_iterator {
607         struct ao2_iterator i;
608         struct msg_data *current_used;
609 };
610
611 struct ast_msg_var_iterator *ast_msg_var_iterator_init(const struct ast_msg *msg)
612 {
613         struct ast_msg_var_iterator *i;
614         if (!(i = ast_calloc(1, sizeof(*i)))) {
615                 return NULL;
616         }
617
618         i->i = ao2_iterator_init(msg->vars, 0);
619
620         return i;
621 }
622
623 int ast_msg_var_iterator_next(const struct ast_msg *msg, struct ast_msg_var_iterator *i, const char **name, const char **value)
624 {
625         struct msg_data *data;
626
627         /* Skip any that aren't marked for sending out */
628         while ((data = ao2_iterator_next(&i->i)) && !data->send) {
629                 ao2_ref(data, -1);
630         }
631
632         if (!data) {
633                 return 0;
634         }
635
636         if (data->send) {
637                 *name = data->name;
638                 *value = data->value;
639         }
640
641         /* Leave the refcount to be cleaned up by the caller with
642          * ast_msg_var_unref_current after they finish with the pointers to the data */
643         i->current_used = data;
644
645         return 1;
646 }
647
648 void ast_msg_var_unref_current(struct ast_msg_var_iterator *i) {
649         if (i->current_used) {
650                 ao2_ref(i->current_used, -1);
651         }
652         i->current_used = NULL;
653 }
654
655 void ast_msg_var_iterator_destroy(struct ast_msg_var_iterator *i)
656 {
657         ao2_iterator_destroy(&i->i);
658         ast_free(i);
659 }
660
661 static struct ast_channel *create_msg_q_chan(void)
662 {
663         struct ast_channel *chan;
664         struct ast_datastore *ds;
665
666         chan = ast_channel_alloc(1, AST_STATE_UP,
667                         NULL, NULL, NULL,
668                         NULL, NULL, NULL, 0,
669                         "%s", "Message/ast_msg_queue");
670
671         if (!chan) {
672                 return NULL;
673         }
674
675         ast_channel_unlink(chan);
676
677         ast_channel_tech_set(chan, &msg_chan_tech_hack);
678
679         if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
680                 ast_hangup(chan);
681                 return NULL;
682         }
683
684         ast_channel_lock(chan);
685         ast_channel_datastore_add(chan, ds);
686         ast_channel_unlock(chan);
687
688         return chan;
689 }
690
691 /*!
692  * \internal
693  * \brief Run the dialplan for message processing
694  *
695  * \pre The message has already been set up on the msg datastore
696  *      on this channel.
697  */
698 static void msg_route(struct ast_channel *chan, struct ast_msg *msg)
699 {
700         struct ast_pbx_args pbx_args;
701
702         ast_explicit_goto(chan, ast_str_buffer(msg->context), AS_OR(msg->exten, "s"), 1);
703
704         memset(&pbx_args, 0, sizeof(pbx_args));
705         pbx_args.no_hangup_chan = 1,
706         ast_pbx_run_args(chan, &pbx_args);
707 }
708
709 /*!
710  * \internal
711  * \brief Clean up ast_channel after each message
712  *
713  * Reset various bits of state after routing each message so the same ast_channel
714  * can just be reused.
715  */
716 static void chan_cleanup(struct ast_channel *chan)
717 {
718         struct ast_datastore *msg_ds, *ds;
719         struct varshead *headp;
720         struct ast_var_t *vardata;
721
722         ast_channel_lock(chan);
723
724         /*
725          * Remove the msg datastore.  Free its data but keep around the datastore
726          * object and just reuse it.
727          */
728         if ((msg_ds = ast_channel_datastore_find(chan, &msg_datastore, NULL)) && msg_ds->data) {
729                 ast_channel_datastore_remove(chan, msg_ds);
730                 ao2_ref(msg_ds->data, -1);
731                 msg_ds->data = NULL;
732         }
733
734         /*
735          * Destroy all other datastores.
736          */
737         while ((ds = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry))) {
738                 ast_datastore_free(ds);
739         }
740
741         /*
742          * Destroy all channel variables.
743          */
744         headp = ast_channel_varshead(chan);
745         while ((vardata = AST_LIST_REMOVE_HEAD(headp, entries))) {
746                 ast_var_delete(vardata);
747         }
748
749         /*
750          * Restore msg datastore.
751          */
752         if (msg_ds) {
753                 ast_channel_datastore_add(chan, msg_ds);
754         }
755         /*
756          * Clear softhangup flags.
757          */
758         ast_channel_clear_softhangup(chan, AST_SOFTHANGUP_ALL);
759
760         ast_channel_unlock(chan);
761 }
762
763 static void destroy_msg_q_chan(void *data)
764 {
765         struct ast_channel **chan = data;
766
767         if (!*chan) {
768                 return;
769         }
770
771         ast_channel_release(*chan);
772 }
773
774 AST_THREADSTORAGE_CUSTOM(msg_q_chan, NULL, destroy_msg_q_chan);
775
776 /*!
777  * \internal
778  * \brief Message queue task processor callback
779  *
780  * \retval 0 success
781  * \retval -1 failure
782  *
783  * \note Even though this returns a value, the taskprocessor code ignores the value.
784  */
785 static int msg_q_cb(void *data)
786 {
787         struct ast_msg *msg = data;
788         struct ast_channel **chan_p, *chan;
789         struct ast_datastore *ds;
790
791         if (!(chan_p = ast_threadstorage_get(&msg_q_chan, sizeof(struct ast_channel *)))) {
792                 return -1;
793         }
794         if (!*chan_p) {
795                 if (!(*chan_p = create_msg_q_chan())) {
796                         return -1;
797                 }
798         }
799         chan = *chan_p;
800
801         ast_channel_lock(chan);
802         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
803                 ast_channel_unlock(chan);
804                 return -1;
805         }
806         ao2_ref(msg, +1);
807         ds->data = msg;
808         ast_channel_unlock(chan);
809
810         msg_route(chan, msg);
811         chan_cleanup(chan);
812
813         ao2_ref(msg, -1);
814
815         return 0;
816 }
817
818 int ast_msg_queue(struct ast_msg *msg)
819 {
820         int res;
821
822         res = ast_taskprocessor_push(msg_q_tp, msg_q_cb, msg);
823         if (res == -1) {
824                 ao2_ref(msg, -1);
825         }
826
827         return res;
828 }
829
830 /*!
831  * \internal
832  * \brief Find or create a message datastore on a channel
833  *
834  * \pre chan is locked
835  *
836  * \param chan the relevant channel
837  *
838  * \return the channel's message datastore, or NULL on error
839  */
840 static struct ast_datastore *msg_datastore_find_or_create(struct ast_channel *chan)
841 {
842         struct ast_datastore *ds;
843
844         if ((ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
845                 return ds;
846         }
847
848         if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
849                 return NULL;
850         }
851
852         if (!(ds->data = ast_msg_alloc())) {
853                 ast_datastore_free(ds);
854                 return NULL;
855         }
856
857         ast_channel_datastore_add(chan, ds);
858
859         return ds;
860 }
861
862 static int msg_func_read(struct ast_channel *chan, const char *function,
863                 char *data, char *buf, size_t len)
864 {
865         struct ast_datastore *ds;
866         struct ast_msg *msg;
867
868         ast_channel_lock(chan);
869
870         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
871                 ast_channel_unlock(chan);
872                 ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
873                 return -1;
874         }
875
876         msg = ds->data;
877         ao2_ref(msg, +1);
878         ast_channel_unlock(chan);
879
880         ao2_lock(msg);
881
882         if (!strcasecmp(data, "to")) {
883                 ast_copy_string(buf, ast_str_buffer(msg->to), len);
884         } else if (!strcasecmp(data, "from")) {
885                 ast_copy_string(buf, ast_str_buffer(msg->from), len);
886         } else if (!strcasecmp(data, "body")) {
887                 ast_copy_string(buf, ast_msg_get_body(msg), len);
888         } else {
889                 ast_log(LOG_WARNING, "Invalid argument to MESSAGE(): '%s'\n", data);
890         }
891
892         ao2_unlock(msg);
893         ao2_ref(msg, -1);
894
895         return 0;
896 }
897
898 static int msg_func_write(struct ast_channel *chan, const char *function,
899                 char *data, const char *value)
900 {
901         struct ast_datastore *ds;
902         struct ast_msg *msg;
903
904         ast_channel_lock(chan);
905
906         if (!(ds = msg_datastore_find_or_create(chan))) {
907                 ast_channel_unlock(chan);
908                 return -1;
909         }
910
911         msg = ds->data;
912         ao2_ref(msg, +1);
913         ast_channel_unlock(chan);
914
915         ao2_lock(msg);
916
917         if (!strcasecmp(data, "to")) {
918                 ast_msg_set_to(msg, "%s", value);
919         } else if (!strcasecmp(data, "from")) {
920                 ast_msg_set_from(msg, "%s", value);
921         } else if (!strcasecmp(data, "body")) {
922                 ast_msg_set_body(msg, "%s", value);
923         } else if (!strcasecmp(data, "custom_data")) {
924                 int outbound = -1;
925                 if (!strcasecmp(value, "mark_all_outbound")) {
926                         outbound = 1;
927                 } else if (!strcasecmp(value, "clear_all_outbound")) {
928                         outbound = 0;
929                 } else {
930                         ast_log(LOG_WARNING, "'%s' is not a valid value for custom_data\n", value);
931                 }
932
933                 if (outbound != -1) {
934                         struct msg_data *hdr_data;
935                         struct ao2_iterator iter = ao2_iterator_init(msg->vars, 0);
936
937                         while ((hdr_data = ao2_iterator_next(&iter))) {
938                                 hdr_data->send = outbound;
939                                 ao2_ref(hdr_data, -1);
940                         }
941                         ao2_iterator_destroy(&iter);
942                 }
943         } else {
944                 ast_log(LOG_WARNING, "'%s' is not a valid write argument.\n", data);
945         }
946
947         ao2_unlock(msg);
948         ao2_ref(msg, -1);
949
950         return 0;
951 }
952
953 static int msg_data_func_read(struct ast_channel *chan, const char *function,
954                 char *data, char *buf, size_t len)
955 {
956         struct ast_datastore *ds;
957         struct ast_msg *msg;
958         const char *val;
959
960         ast_channel_lock(chan);
961
962         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
963                 ast_channel_unlock(chan);
964                 ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
965                 return -1;
966         }
967
968         msg = ds->data;
969         ao2_ref(msg, +1);
970         ast_channel_unlock(chan);
971
972         ao2_lock(msg);
973
974         if ((val = ast_msg_get_var(msg, data))) {
975                 ast_copy_string(buf, val, len);
976         }
977
978         ao2_unlock(msg);
979         ao2_ref(msg, -1);
980
981         return 0;
982 }
983
984 static int msg_data_func_write(struct ast_channel *chan, const char *function,
985                 char *data, const char *value)
986 {
987         struct ast_datastore *ds;
988         struct ast_msg *msg;
989
990         ast_channel_lock(chan);
991
992         if (!(ds = msg_datastore_find_or_create(chan))) {
993                 ast_channel_unlock(chan);
994                 return -1;
995         }
996
997         msg = ds->data;
998         ao2_ref(msg, +1);
999         ast_channel_unlock(chan);
1000
1001         ao2_lock(msg);
1002
1003         ast_msg_set_var_outbound(msg, data, value);
1004
1005         ao2_unlock(msg);
1006         ao2_ref(msg, -1);
1007
1008         return 0;
1009 }
1010 static int msg_tech_hash(const void *obj, const int flags)
1011 {
1012         struct ast_msg_tech_holder *tech_holder = (struct ast_msg_tech_holder *) obj;
1013         int res = 0;
1014
1015         ast_rwlock_rdlock(&tech_holder->tech_lock);
1016         if (tech_holder->tech) {
1017                 res = ast_str_case_hash(tech_holder->tech->name);
1018         }
1019         ast_rwlock_unlock(&tech_holder->tech_lock);
1020
1021         return res;
1022 }
1023
1024 static int msg_tech_cmp(void *obj, void *arg, int flags)
1025 {
1026         struct ast_msg_tech_holder *tech_holder = obj;
1027         const struct ast_msg_tech_holder *tech_holder2 = arg;
1028         int res = 1;
1029
1030         ast_rwlock_rdlock(&tech_holder->tech_lock);
1031         /*
1032          * tech_holder2 is a temporary fake tech_holder.
1033          */
1034         if (tech_holder->tech) {
1035                 res = strcasecmp(tech_holder->tech->name, tech_holder2->tech->name) ? 0 : CMP_MATCH | CMP_STOP;
1036         }
1037         ast_rwlock_unlock(&tech_holder->tech_lock);
1038
1039         return res;
1040 }
1041
1042 /*!
1043  * \internal
1044  * \brief MessageSend() application
1045  */
1046 static int msg_send_exec(struct ast_channel *chan, const char *data)
1047 {
1048         struct ast_datastore *ds;
1049         struct ast_msg *msg;
1050         char *tech_name;
1051         struct ast_msg_tech_holder *tech_holder = NULL;
1052         char *parse;
1053         int res = -1;
1054         AST_DECLARE_APP_ARGS(args,
1055                 AST_APP_ARG(to);
1056                 AST_APP_ARG(from);
1057         );
1058
1059         if (ast_strlen_zero(data)) {
1060                 ast_log(LOG_WARNING, "An argument is required to MessageSend()\n");
1061                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
1062                 return 0;
1063         }
1064
1065         parse = ast_strdupa(data);
1066         AST_STANDARD_APP_ARGS(args, parse);
1067
1068         if (ast_strlen_zero(args.to)) {
1069                 ast_log(LOG_WARNING, "A 'to' URI is required for MessageSend()\n");
1070                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
1071                 return 0;
1072         }
1073
1074         ast_channel_lock(chan);
1075
1076         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
1077                 ast_channel_unlock(chan);
1078                 ast_log(LOG_WARNING, "No message data found on channel to send.\n");
1079                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "FAILURE");
1080                 return 0;
1081         }
1082
1083         msg = ds->data;
1084         ao2_ref(msg, +1);
1085         ast_channel_unlock(chan);
1086
1087         tech_name = ast_strdupa(args.to);
1088         tech_name = strsep(&tech_name, ":");
1089
1090         {
1091                 struct ast_msg_tech tmp_msg_tech = {
1092                         .name = tech_name,
1093                 };
1094                 struct ast_msg_tech_holder tmp_tech_holder = {
1095                         .tech = &tmp_msg_tech,
1096                 };
1097
1098                 tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER);
1099         }
1100
1101         if (!tech_holder) {
1102                 ast_log(LOG_WARNING, "No message technology '%s' found.\n", tech_name);
1103                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_PROTOCOL");
1104                 goto exit_cleanup;
1105         }
1106
1107         /*
1108          * The message lock is held here to safely allow the technology
1109          * implementation to access the message fields without worrying
1110          * that they could change.
1111          */
1112         ao2_lock(msg);
1113         ast_rwlock_rdlock(&tech_holder->tech_lock);
1114         if (tech_holder->tech) {
1115                 res = tech_holder->tech->msg_send(msg, S_OR(args.to, ""),
1116                                                         S_OR(args.from, ""));
1117         }
1118         ast_rwlock_unlock(&tech_holder->tech_lock);
1119         ao2_unlock(msg);
1120
1121         pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", res ? "FAILURE" : "SUCCESS");
1122
1123 exit_cleanup:
1124         if (tech_holder) {
1125                 ao2_ref(tech_holder, -1);
1126                 tech_holder = NULL;
1127         }
1128
1129         ao2_ref(msg, -1);
1130
1131         return 0;
1132 }
1133
1134 static int action_messagesend(struct mansession *s, const struct message *m)
1135 {
1136         const char *to = ast_strdupa(astman_get_header(m, "To"));
1137         const char *from = astman_get_header(m, "From");
1138         const char *body = astman_get_header(m, "Body");
1139         const char *base64body = astman_get_header(m, "Base64Body");
1140         char base64decoded[1301] = { 0, };
1141         char *tech_name = NULL;
1142         struct ast_variable *vars = NULL;
1143         struct ast_variable *data = NULL;
1144         struct ast_msg_tech_holder *tech_holder = NULL;
1145         struct ast_msg *msg;
1146         int res = -1;
1147
1148         if (ast_strlen_zero(to)) {
1149                 astman_send_error(s, m, "No 'To' address specified.");
1150                 return -1;
1151         }
1152
1153         if (!ast_strlen_zero(base64body)) {
1154                 ast_base64decode((unsigned char *) base64decoded, base64body, sizeof(base64decoded) - 1);
1155                 body = base64decoded;
1156         }
1157
1158         tech_name = ast_strdupa(to);
1159         tech_name = strsep(&tech_name, ":");
1160         {
1161                 struct ast_msg_tech tmp_msg_tech = {
1162                         .name = tech_name,
1163                 };
1164                 struct ast_msg_tech_holder tmp_tech_holder = {
1165                         .tech = &tmp_msg_tech,
1166                 };
1167
1168                 tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER);
1169         }
1170
1171         if (!tech_holder) {
1172                 astman_send_error(s, m, "Message technology not found.");
1173                 return -1;
1174         }
1175
1176         if (!(msg = ast_msg_alloc())) {
1177                 ao2_ref(tech_holder, -1);
1178                 astman_send_error(s, m, "Internal failure\n");
1179                 return -1;
1180         }
1181
1182         data = astman_get_variables(m);
1183         for (vars = data; vars; vars = vars->next) {
1184                 ast_msg_set_var_outbound(msg, vars->name, vars->value);
1185         }
1186
1187         ast_msg_set_body(msg, "%s", body);
1188
1189         ast_rwlock_rdlock(&tech_holder->tech_lock);
1190         if (tech_holder->tech) {
1191                 res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
1192         }
1193         ast_rwlock_unlock(&tech_holder->tech_lock);
1194
1195         ast_variables_destroy(vars);
1196         ao2_ref(tech_holder, -1);
1197         ao2_ref(msg, -1);
1198
1199         if (res) {
1200                 astman_send_error(s, m, "Message failed to send.");
1201         } else {
1202                 astman_send_ack(s, m, "Message successfully sent");
1203         }
1204         return res;
1205 }
1206
1207 int ast_msg_send(struct ast_msg *msg, const char *to, const char *from)
1208 {
1209         char *tech_name = NULL;
1210         struct ast_msg_tech_holder *tech_holder = NULL;
1211         int res = -1;
1212
1213         if (ast_strlen_zero(to)) {
1214                 ao2_ref(msg, -1);
1215                 return -1;
1216         }
1217
1218         tech_name = ast_strdupa(to);
1219         tech_name = strsep(&tech_name, ":");
1220         {
1221                 struct ast_msg_tech tmp_msg_tech = {
1222                         .name = tech_name,
1223                 };
1224                 struct ast_msg_tech_holder tmp_tech_holder = {
1225                         .tech = &tmp_msg_tech,
1226                 };
1227
1228                 tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER);
1229         }
1230
1231         if (!tech_holder) {
1232                 ao2_ref(msg, -1);
1233                 return -1;
1234         }
1235
1236         ast_rwlock_rdlock(&tech_holder->tech_lock);
1237         if (tech_holder->tech) {
1238                 res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
1239         }
1240         ast_rwlock_unlock(&tech_holder->tech_lock);
1241
1242         ao2_ref(tech_holder, -1);
1243         ao2_ref(msg, -1);
1244
1245         return res;
1246 }
1247
1248 int ast_msg_tech_register(const struct ast_msg_tech *tech)
1249 {
1250         struct ast_msg_tech_holder tmp_tech_holder = {
1251                 .tech = tech,
1252         };
1253         struct ast_msg_tech_holder *tech_holder;
1254
1255         if ((tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER))) {
1256                 ao2_ref(tech_holder, -1);
1257                 ast_log(LOG_ERROR, "Message technology already registered for '%s'\n",
1258                                 tech->name);
1259                 return -1;
1260         }
1261
1262         if (!(tech_holder = ao2_alloc(sizeof(*tech_holder), NULL))) {
1263                 return -1;
1264         }
1265
1266         ast_rwlock_init(&tech_holder->tech_lock);
1267         tech_holder->tech = tech;
1268
1269         ao2_link(msg_techs, tech_holder);
1270
1271         ao2_ref(tech_holder, -1);
1272         tech_holder = NULL;
1273
1274         ast_verb(3, "Message technology handler '%s' registered.\n", tech->name);
1275
1276         return 0;
1277 }
1278
1279 int ast_msg_tech_unregister(const struct ast_msg_tech *tech)
1280 {
1281         struct ast_msg_tech_holder tmp_tech_holder = {
1282                 .tech = tech,
1283         };
1284         struct ast_msg_tech_holder *tech_holder;
1285
1286         tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER | OBJ_UNLINK);
1287
1288         if (!tech_holder) {
1289                 ast_log(LOG_ERROR, "No '%s' message technology found.\n", tech->name);
1290                 return -1;
1291         }
1292
1293         ast_rwlock_wrlock(&tech_holder->tech_lock);
1294         tech_holder->tech = NULL;
1295         ast_rwlock_unlock(&tech_holder->tech_lock);
1296
1297         ao2_ref(tech_holder, -1);
1298         tech_holder = NULL;
1299
1300         ast_verb(3, "Message technology handler '%s' unregistered.\n", tech->name);
1301
1302         return 0;
1303 }
1304
1305 void ast_msg_shutdown(void)
1306 {
1307         if (msg_q_tp) {
1308                 msg_q_tp = ast_taskprocessor_unreference(msg_q_tp);
1309         }
1310 }
1311
1312 /*! \internal \brief Clean up other resources on Asterisk shutdown
1313  * \note This does not include the msg_q_tp object, which must be disposed
1314  * of prior to Asterisk checking for channel destruction in its shutdown
1315  * sequence.  The atexit handlers are executed after this occurs. */
1316 static void message_shutdown(void)
1317 {
1318         ast_custom_function_unregister(&msg_function);
1319         ast_custom_function_unregister(&msg_data_function);
1320         ast_unregister_application(app_msg_send);
1321         ast_manager_unregister("MessageSend");
1322
1323         if (msg_techs) {
1324                 ao2_ref(msg_techs, -1);
1325                 msg_techs = NULL;
1326         }
1327 }
1328
1329 /*
1330  * \internal
1331  * \brief Initialize stuff during Asterisk startup.
1332  *
1333  * Cleanup isn't a big deal in this function.  If we return non-zero,
1334  * Asterisk is going to exit.
1335  *
1336  * \retval 0 success
1337  * \retval non-zero failure
1338  */
1339 int ast_msg_init(void)
1340 {
1341         int res;
1342
1343         msg_q_tp = ast_taskprocessor_get("ast_msg_queue", TPS_REF_DEFAULT);
1344         if (!msg_q_tp) {
1345                 return -1;
1346         }
1347
1348         msg_techs = ao2_container_alloc(17, msg_tech_hash, msg_tech_cmp);
1349         if (!msg_techs) {
1350                 return -1;
1351         }
1352
1353         res = __ast_custom_function_register(&msg_function, NULL);
1354         res |= __ast_custom_function_register(&msg_data_function, NULL);
1355         res |= ast_register_application2(app_msg_send, msg_send_exec, NULL, NULL, NULL);
1356         res |= ast_manager_register_xml_core("MessageSend", EVENT_FLAG_MESSAGE, action_messagesend);
1357
1358         ast_register_atexit(message_shutdown);
1359
1360         return res;
1361 }