26fea1aca9273af691fc4c8548d009559e3b0ba3
[asterisk/asterisk.git] / main / message.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2010, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Out-of-call text message support
22  *
23  * \author Russell Bryant <russell@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/_private.h"
35
36 #include "asterisk/module.h"
37 #include "asterisk/datastore.h"
38 #include "asterisk/pbx.h"
39 #include "asterisk/manager.h"
40 #include "asterisk/strings.h"
41 #include "asterisk/astobj2.h"
42 #include "asterisk/app.h"
43 #include "asterisk/taskprocessor.h"
44 #include "asterisk/message.h"
45
46 /*** DOCUMENTATION
47         <function name="MESSAGE" language="en_US">
48                 <synopsis>
49                         Create a message or read fields from a message.
50                 </synopsis>
51                 <syntax argsep="/">
52                         <parameter name="argument" required="true">
53                         <para>Field of the message to get or set.</para>
54                         <enumlist>
55                                 <enum name="to">
56                                         <para>Read-only.  The destination of the message.  When processing an
57                                         incoming message, this will be set to the destination listed as
58                                         the recipient of the message that was received by Asterisk.</para>
59                                 </enum>
60                                 <enum name="from">
61                                         <para>Read-only.  The source of the message.  When processing an
62                                         incoming message, this will be set to the source of the message.</para>
63                                 </enum>
64                                 <enum name="custom_data">
65                                         <para>Write-only.  Mark or unmark all message headers for an outgoing
66                                         message.  The following values can be set:</para>
67                                         <enumlist>
68                                                 <enum name="mark_all_outbound">
69                                                         <para>Mark all headers for an outgoing message.</para>
70                                                 </enum>
71                                                 <enum name="clear_all_outbound">
72                                                         <para>Unmark all headers for an outgoing message.</para>
73                                                 </enum>
74                                         </enumlist>
75                                 </enum>
76                                 <enum name="body">
77                                         <para>Read/Write.  The message body.  When processing an incoming
78                                         message, this includes the body of the message that Asterisk
79                                         received.  When MessageSend() is executed, the contents of this
80                                         field are used as the body of the outgoing message.  The body
81                                         will always be UTF-8.</para>
82                                 </enum>
83                         </enumlist>
84                         </parameter>
85                 </syntax>
86                 <description>
87                         <para>This function will read from or write a value to a text message.
88                         It is used both to read the data out of an incoming message, as well as
89                         modify or create a message that will be sent outbound.</para>
90                 </description>
91                 <see-also>
92                         <ref type="application">MessageSend</ref>
93                 </see-also>
94         </function>
95         <function name="MESSAGE_DATA" language="en_US">
96                 <synopsis>
97                         Read or write custom data attached to a message.
98                 </synopsis>
99                 <syntax argsep="/">
100                         <parameter name="argument" required="true">
101                         <para>Field of the message to get or set.</para>
102                         </parameter>
103                 </syntax>
104                 <description>
105                         <para>This function will read from or write a value to a text message.
106                         It is used both to read the data out of an incoming message, as well as
107                         modify a message that will be sent outbound.</para>
108                         <note>
109                                 <para>If you want to set an outbound message to carry data in the
110                                 current message, do
111                                 Set(MESSAGE_DATA(<replaceable>key</replaceable>)=${MESSAGE_DATA(<replaceable>key</replaceable>)}).</para>
112                         </note>
113                 </description>
114                 <see-also>
115                         <ref type="application">MessageSend</ref>
116                 </see-also>
117         </function>
118         <application name="MessageSend" language="en_US">
119                 <synopsis>
120                         Send a text message.
121                 </synopsis>
122                 <syntax>
123                         <parameter name="to" required="true">
124                                 <para>A To URI for the message.</para>
125                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageToInfo'])" />
126                                 <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageToInfo'])" />
127                         </parameter>
128                         <parameter name="from" required="false">
129                                 <para>A From URI for the message if needed for the
130                                 message technology being used to send this message.</para>
131                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageFromInfo'])" />
132                         </parameter>
133                 </syntax>
134                 <description>
135                         <para>Send a text message.  The body of the message that will be
136                         sent is what is currently set to <literal>MESSAGE(body)</literal>.
137                           The technology chosen for sending the message is determined
138                         based on a prefix to the <literal>to</literal> parameter.</para>
139                         <para>This application sets the following channel variables:</para>
140                         <variablelist>
141                                 <variable name="MESSAGE_SEND_STATUS">
142                                         <para>This is the message delivery status returned by this application.</para>
143                                         <value name="INVALID_PROTOCOL">
144                                                 No handler for the technology part of the URI was found.
145                                         </value>
146                                         <value name="INVALID_URI">
147                                                 The protocol handler reported that the URI was not valid.
148                                         </value>
149                                         <value name="SUCCESS">
150                                                 Successfully passed on to the protocol handler, but delivery has not necessarily been guaranteed.
151                                         </value>
152                                         <value name="FAILURE">
153                                                 The protocol handler reported that it was unabled to deliver the message for some reason.
154                                         </value>
155                                 </variable>
156                         </variablelist>
157                 </description>
158         </application>
159         <manager name="MessageSend" language="en_US">
160                 <synopsis>
161                         Send an out of call message to an endpoint.
162                 </synopsis>
163                 <syntax>
164                         <xi:include xpointer="xpointer(/docs/manager[@name='Login']/syntax/parameter[@name='ActionID'])" />
165                         <parameter name="To" required="true">
166                                 <para>The URI the message is to be sent to.</para>
167                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageToInfo'])" />
168                                 <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageToInfo'])" />
169                         </parameter>
170                         <parameter name="From">
171                                 <para>A From URI for the message if needed for the
172                                 message technology being used to send this message.</para>
173                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageFromInfo'])" />
174                         </parameter>
175                         <parameter name="Body">
176                                 <para>The message body text.  This must not contain any newlines as that
177                                 conflicts with the AMI protocol.</para>
178                         </parameter>
179                         <parameter name="Base64Body">
180                                 <para>Text bodies requiring the use of newlines have to be base64 encoded
181                                 in this field.  Base64Body will be decoded before being sent out.
182                                 Base64Body takes precedence over Body.</para>
183                         </parameter>
184                         <parameter name="Variable">
185                                 <para>Message variable to set, multiple Variable: headers are
186                                 allowed.  The header value is a comma separated list of
187                                 name=value pairs.</para>
188                         </parameter>
189                 </syntax>
190         </manager>
191  ***/
192
193 struct msg_data {
194         AST_DECLARE_STRING_FIELDS(
195                 AST_STRING_FIELD(name);
196                 AST_STRING_FIELD(value);
197         );
198         unsigned int send:1; /* Whether to send out on outbound messages */
199 };
200
201 AST_LIST_HEAD_NOLOCK(outhead, msg_data);
202
203 /*!
204  * \brief A message.
205  *
206  * \todo Consider whether stringfields would be an appropriate optimization here.
207  */
208 struct ast_msg {
209         struct ast_str *to;
210         struct ast_str *from;
211         struct ast_str *body;
212         struct ast_str *context;
213         struct ast_str *exten;
214         struct ao2_container *vars;
215 };
216
217 struct ast_msg_tech_holder {
218         const struct ast_msg_tech *tech;
219         /*!
220          * \brief A rwlock for this object
221          *
222          * a read/write lock must be used to protect the wrapper instead
223          * of the ao2 lock. A rdlock must be held to read tech_holder->tech.
224          */
225         ast_rwlock_t tech_lock;
226 };
227
228 static struct ao2_container *msg_techs;
229
230 static struct ast_taskprocessor *msg_q_tp;
231
232 static const char app_msg_send[] = "MessageSend";
233
234 static void msg_ds_destroy(void *data);
235
236 static const struct ast_datastore_info msg_datastore = {
237         .type = "message",
238         .destroy = msg_ds_destroy,
239 };
240
241 static int msg_func_read(struct ast_channel *chan, const char *function,
242                 char *data, char *buf, size_t len);
243 static int msg_func_write(struct ast_channel *chan, const char *function,
244                 char *data, const char *value);
245
246 static struct ast_custom_function msg_function = {
247         .name = "MESSAGE",
248         .read = msg_func_read,
249         .write = msg_func_write,
250 };
251
252 static int msg_data_func_read(struct ast_channel *chan, const char *function,
253                 char *data, char *buf, size_t len);
254 static int msg_data_func_write(struct ast_channel *chan, const char *function,
255                 char *data, const char *value);
256
257 static struct ast_custom_function msg_data_function = {
258         .name = "MESSAGE_DATA",
259         .read = msg_data_func_read,
260         .write = msg_data_func_write,
261 };
262
263 static struct ast_frame *chan_msg_read(struct ast_channel *chan);
264 static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr);
265 static int chan_msg_indicate(struct ast_channel *chan, int condition,
266                 const void *data, size_t datalen);
267 static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit);
268 static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
269                 unsigned int duration);
270
271 /*!
272  * \internal
273  * \brief A bare minimum channel technology
274  *
275  * This will not be registered as we never want anything to try
276  * to create Message channels other than internally in this file.
277  */
278 static const struct ast_channel_tech msg_chan_tech_hack = {
279         .type             = "Message",
280         .description      = "Internal Text Message Processing",
281         .read             = chan_msg_read,
282         .write            = chan_msg_write,
283         .indicate         = chan_msg_indicate,
284         .send_digit_begin = chan_msg_send_digit_begin,
285         .send_digit_end   = chan_msg_send_digit_end,
286 };
287
288 /*!
289  * \internal
290  * \brief ast_channel_tech read callback
291  *
292  * This should never be called.  However, we say that about chan_iax2's
293  * read callback, too, and it seems to randomly get called for some
294  * reason.  If it does, a simple NULL frame will suffice.
295  */
296 static struct ast_frame *chan_msg_read(struct ast_channel *chan)
297 {
298         return &ast_null_frame;
299 }
300
301 /*!
302  * \internal
303  * \brief ast_channel_tech write callback
304  *
305  * Throw all frames away.  We don't care about any of them.
306  */
307 static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr)
308 {
309         return 0;
310 }
311
312 /*!
313  * \internal
314  * \brief ast_channel_tech indicate callback
315  *
316  * The indicate callback is here just so it can return success.
317  * We don't want any callers of ast_indicate() to think something
318  * has failed.  We also don't want ast_indicate() itself to try
319  * to generate inband tones since we didn't tell it that we took
320  * care of it ourselves.
321  */
322 static int chan_msg_indicate(struct ast_channel *chan, int condition,
323                 const void *data, size_t datalen)
324 {
325         return 0;
326 }
327
328 /*!
329  * \internal
330  * \brief ast_channel_tech send_digit_begin callback
331  *
332  * This is here so that just in case a digit comes at a message channel
333  * that the Asterisk core doesn't waste any time trying to generate
334  * inband DTMF in audio.  It's a waste of resources.
335  */
336 static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit)
337 {
338         return 0;
339 }
340
341 /*!
342  * \internal
343  * \brief ast_channel_tech send_digit_end callback
344  *
345  * This is here so that just in case a digit comes at a message channel
346  * that the Asterisk core doesn't waste any time trying to generate
347  * inband DTMF in audio.  It's a waste of resources.
348  */
349 static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
350                 unsigned int duration)
351 {
352         return 0;
353 }
354
355 static void msg_ds_destroy(void *data)
356 {
357         struct ast_msg *msg = data;
358
359         ao2_ref(msg, -1);
360 }
361
362 static int msg_data_hash_fn(const void *obj, const int flags)
363 {
364         const struct msg_data *data = obj;
365         return ast_str_case_hash(data->name);
366 }
367
368 static int msg_data_cmp_fn(void *obj, void *arg, int flags)
369 {
370         const struct msg_data *one = obj, *two = arg;
371         return !strcasecmp(one->name, two->name) ? CMP_MATCH | CMP_STOP : 0;
372 }
373
374 static void msg_data_destructor(void *obj)
375 {
376         struct msg_data *data = obj;
377         ast_string_field_free_memory(data);
378 }
379
380 static void msg_destructor(void *obj)
381 {
382         struct ast_msg *msg = obj;
383
384         ast_free(msg->to);
385         msg->to = NULL;
386
387         ast_free(msg->from);
388         msg->from = NULL;
389
390         ast_free(msg->body);
391         msg->body = NULL;
392
393         ast_free(msg->context);
394         msg->context = NULL;
395
396         ast_free(msg->exten);
397         msg->exten = NULL;
398
399         ao2_ref(msg->vars, -1);
400 }
401
402 struct ast_msg *ast_msg_alloc(void)
403 {
404         struct ast_msg *msg;
405
406         if (!(msg = ao2_alloc(sizeof(*msg), msg_destructor))) {
407                 return NULL;
408         }
409
410         if (!(msg->to = ast_str_create(32))) {
411                 ao2_ref(msg, -1);
412                 return NULL;
413         }
414
415         if (!(msg->from = ast_str_create(32))) {
416                 ao2_ref(msg, -1);
417                 return NULL;
418         }
419
420         if (!(msg->body = ast_str_create(128))) {
421                 ao2_ref(msg, -1);
422                 return NULL;
423         }
424
425         if (!(msg->context = ast_str_create(16))) {
426                 ao2_ref(msg, -1);
427                 return NULL;
428         }
429
430         if (!(msg->exten = ast_str_create(16))) {
431                 ao2_ref(msg, -1);
432                 return NULL;
433         }
434
435         if (!(msg->vars = ao2_container_alloc(1, msg_data_hash_fn, msg_data_cmp_fn))) {
436                 ao2_ref(msg, -1);
437                 return NULL;
438         }
439
440         ast_str_set(&msg->context, 0, "default");
441
442         return msg;
443 }
444
445 struct ast_msg *ast_msg_ref(struct ast_msg *msg)
446 {
447         ao2_ref(msg, 1);
448         return msg;
449 }
450
451 struct ast_msg *ast_msg_destroy(struct ast_msg *msg)
452 {
453         ao2_ref(msg, -1);
454
455         return NULL;
456 }
457
458 int ast_msg_set_to(struct ast_msg *msg, const char *fmt, ...)
459 {
460         va_list ap;
461         int res;
462
463         va_start(ap, fmt);
464         res = ast_str_set_va(&msg->to, 0, fmt, ap);
465         va_end(ap);
466
467         return res < 0 ? -1 : 0;
468 }
469
470 int ast_msg_set_from(struct ast_msg *msg, const char *fmt, ...)
471 {
472         va_list ap;
473         int res;
474
475         va_start(ap, fmt);
476         res = ast_str_set_va(&msg->from, 0, fmt, ap);
477         va_end(ap);
478
479         return res < 0 ? -1 : 0;
480 }
481
482 int ast_msg_set_body(struct ast_msg *msg, const char *fmt, ...)
483 {
484         va_list ap;
485         int res;
486
487         va_start(ap, fmt);
488         res = ast_str_set_va(&msg->body, 0, fmt, ap);
489         va_end(ap);
490
491         return res < 0 ? -1 : 0;
492 }
493
494 int ast_msg_set_context(struct ast_msg *msg, const char *fmt, ...)
495 {
496         va_list ap;
497         int res;
498
499         va_start(ap, fmt);
500         res = ast_str_set_va(&msg->context, 0, fmt, ap);
501         va_end(ap);
502
503         return res < 0 ? -1 : 0;
504 }
505
506 int ast_msg_set_exten(struct ast_msg *msg, const char *fmt, ...)
507 {
508         va_list ap;
509         int res;
510
511         va_start(ap, fmt);
512         res = ast_str_set_va(&msg->exten, 0, fmt, ap);
513         va_end(ap);
514
515         return res < 0 ? -1 : 0;
516 }
517
518 const char *ast_msg_get_body(const struct ast_msg *msg)
519 {
520         return ast_str_buffer(msg->body);
521 }
522
523 static struct msg_data *msg_data_alloc(void)
524 {
525         struct msg_data *data;
526
527         if (!(data = ao2_alloc(sizeof(*data), msg_data_destructor))) {
528                 return NULL;
529         }
530
531         if (ast_string_field_init(data, 32)) {
532                 ao2_ref(data, -1);
533                 return NULL;
534         }
535
536         return data;
537 }
538
539 static struct msg_data *msg_data_find(struct ao2_container *vars, const char *name)
540 {
541         struct msg_data tmp = {
542                 .name = name,
543         };
544         return ao2_find(vars, &tmp, OBJ_POINTER);
545 }
546
547 static int msg_set_var_full(struct ast_msg *msg, const char *name, const char *value, unsigned int outbound)
548 {
549         struct msg_data *data;
550
551         if (!(data = msg_data_find(msg->vars, name))) {
552                 if (ast_strlen_zero(value)) {
553                         return 0;
554                 }
555                 if (!(data = msg_data_alloc())) {
556                         return -1;
557                 };
558
559                 ast_string_field_set(data, name, name);
560                 ast_string_field_set(data, value, value);
561                 data->send = outbound;
562                 ao2_link(msg->vars, data);
563         } else {
564                 if (ast_strlen_zero(value)) {
565                         ao2_unlink(msg->vars, data);
566                 } else {
567                         ast_string_field_set(data, value, value);
568                         data->send = outbound;
569                 }
570         }
571
572         ao2_ref(data, -1);
573
574         return 0;
575 }
576
577 int ast_msg_set_var_outbound(struct ast_msg *msg, const char *name, const char *value)
578 {
579         return msg_set_var_full(msg, name, value, 1);
580 }
581
582 int ast_msg_set_var(struct ast_msg *msg, const char *name, const char *value)
583 {
584         return msg_set_var_full(msg, name, value, 0);
585 }
586
587 const char *ast_msg_get_var(struct ast_msg *msg, const char *name)
588 {
589         struct msg_data *data;
590         const char *val = NULL;
591
592         if (!(data = msg_data_find(msg->vars, name))) {
593                 return NULL;
594         }
595
596         /* Yep, this definitely looks like val would be a dangling pointer
597          * after the ref count is decremented.  As long as the message structure
598          * is used in a thread safe manner, this will not be the case though.
599          * The ast_msg holds a reference to this object in the msg->vars container. */
600         val = data->value;
601         ao2_ref(data, -1);
602
603         return val;
604 }
605
606 struct ast_msg_var_iterator {
607         struct ao2_iterator i;
608         struct msg_data *current_used;
609 };
610
611 struct ast_msg_var_iterator *ast_msg_var_iterator_init(const struct ast_msg *msg)
612 {
613         struct ast_msg_var_iterator *i;
614         if (!(i = ast_calloc(1, sizeof(*i)))) {
615                 return NULL;
616         }
617
618         i->i = ao2_iterator_init(msg->vars, 0);
619
620         return i;
621 }
622
623 int ast_msg_var_iterator_next(const struct ast_msg *msg, struct ast_msg_var_iterator *i, const char **name, const char **value)
624 {
625         struct msg_data *data;
626
627         /* Skip any that aren't marked for sending out */
628         while ((data = ao2_iterator_next(&i->i)) && !data->send) {
629                 ao2_ref(data, -1);
630         }
631
632         if (!data) {
633                 return 0;
634         }
635
636         if (data->send) {
637                 *name = data->name;
638                 *value = data->value;
639         }
640
641         /* Leave the refcount to be cleaned up by the caller with
642          * ast_msg_var_unref_current after they finish with the pointers to the data */
643         i->current_used = data;
644
645         return 1;
646 }
647
648 void ast_msg_var_unref_current(struct ast_msg_var_iterator *i) {
649         if (i->current_used) {
650                 ao2_ref(i->current_used, -1);
651         }
652         i->current_used = NULL;
653 }
654
655 void ast_msg_var_iterator_destroy(struct ast_msg_var_iterator *i)
656 {
657         ao2_iterator_destroy(&i->i);
658         ast_free(i);
659 }
660
661 static struct ast_channel *create_msg_q_chan(void)
662 {
663         struct ast_channel *chan;
664         struct ast_datastore *ds;
665
666         chan = ast_channel_alloc(1, AST_STATE_UP,
667                         NULL, NULL, NULL,
668                         NULL, NULL, NULL, 0,
669                         "%s", "Message/ast_msg_queue");
670
671         if (!chan) {
672                 return NULL;
673         }
674
675         ast_channel_unlink(chan);
676
677         ast_channel_tech_set(chan, &msg_chan_tech_hack);
678
679         if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
680                 ast_hangup(chan);
681                 return NULL;
682         }
683
684         ast_channel_lock(chan);
685         ast_channel_datastore_add(chan, ds);
686         ast_channel_unlock(chan);
687
688         return chan;
689 }
690
691 /*!
692  * \internal
693  * \brief Run the dialplan for message processing
694  *
695  * \pre The message has already been set up on the msg datastore
696  *      on this channel.
697  */
698 static void msg_route(struct ast_channel *chan, struct ast_msg *msg)
699 {
700         struct ast_pbx_args pbx_args;
701
702         ast_explicit_goto(chan, ast_str_buffer(msg->context), AS_OR(msg->exten, "s"), 1);
703
704         memset(&pbx_args, 0, sizeof(pbx_args));
705         pbx_args.no_hangup_chan = 1,
706         ast_pbx_run_args(chan, &pbx_args);
707 }
708
709 /*!
710  * \internal
711  * \brief Clean up ast_channel after each message
712  *
713  * Reset various bits of state after routing each message so the same ast_channel
714  * can just be reused.
715  */
716 static void chan_cleanup(struct ast_channel *chan)
717 {
718         struct ast_datastore *msg_ds, *ds;
719         struct varshead *headp;
720         struct ast_var_t *vardata;
721
722         ast_channel_lock(chan);
723
724         /*
725          * Remove the msg datastore.  Free its data but keep around the datastore
726          * object and just reuse it.
727          */
728         if ((msg_ds = ast_channel_datastore_find(chan, &msg_datastore, NULL)) && msg_ds->data) {
729                 ast_channel_datastore_remove(chan, msg_ds);
730                 ao2_ref(msg_ds->data, -1);
731                 msg_ds->data = NULL;
732         }
733
734         /*
735          * Destroy all other datastores.
736          */
737         while ((ds = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry))) {
738                 ast_datastore_free(ds);
739         }
740
741         /*
742          * Destroy all channel variables.
743          */
744         headp = ast_channel_varshead(chan);
745         while ((vardata = AST_LIST_REMOVE_HEAD(headp, entries))) {
746                 ast_var_delete(vardata);
747         }
748
749         /*
750          * Restore msg datastore.
751          */
752         if (msg_ds) {
753                 ast_channel_datastore_add(chan, msg_ds);
754         }
755
756         ast_channel_unlock(chan);
757 }
758
759 static void destroy_msg_q_chan(void *data)
760 {
761         struct ast_channel **chan = data;
762
763         if (!*chan) {
764                 return;
765         }
766
767         ast_channel_release(*chan);
768 }
769
770 AST_THREADSTORAGE_CUSTOM(msg_q_chan, NULL, destroy_msg_q_chan);
771
772 /*!
773  * \internal
774  * \brief Message queue task processor callback
775  *
776  * \retval 0 success
777  * \retval -1 failure
778  *
779  * \note Even though this returns a value, the taskprocessor code ignores the value.
780  */
781 static int msg_q_cb(void *data)
782 {
783         struct ast_msg *msg = data;
784         struct ast_channel **chan_p, *chan;
785         struct ast_datastore *ds;
786
787         if (!(chan_p = ast_threadstorage_get(&msg_q_chan, sizeof(struct ast_channel *)))) {
788                 return -1;
789         }
790         if (!*chan_p) {
791                 if (!(*chan_p = create_msg_q_chan())) {
792                         return -1;
793                 }
794         }
795         chan = *chan_p;
796
797         ast_channel_lock(chan);
798         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
799                 ast_channel_unlock(chan);
800                 return -1;
801         }
802         ao2_ref(msg, +1);
803         ds->data = msg;
804         ast_channel_unlock(chan);
805
806         msg_route(chan, msg);
807         chan_cleanup(chan);
808
809         ao2_ref(msg, -1);
810
811         return 0;
812 }
813
814 int ast_msg_queue(struct ast_msg *msg)
815 {
816         int res;
817
818         res = ast_taskprocessor_push(msg_q_tp, msg_q_cb, msg);
819         if (res == -1) {
820                 ao2_ref(msg, -1);
821         }
822
823         return res;
824 }
825
826 /*!
827  * \internal
828  * \brief Find or create a message datastore on a channel
829  *
830  * \pre chan is locked
831  *
832  * \param chan the relevant channel
833  *
834  * \return the channel's message datastore, or NULL on error
835  */
836 static struct ast_datastore *msg_datastore_find_or_create(struct ast_channel *chan)
837 {
838         struct ast_datastore *ds;
839
840         if ((ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
841                 return ds;
842         }
843
844         if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
845                 return NULL;
846         }
847
848         if (!(ds->data = ast_msg_alloc())) {
849                 ast_datastore_free(ds);
850                 return NULL;
851         }
852
853         ast_channel_datastore_add(chan, ds);
854
855         return ds;
856 }
857
858 static int msg_func_read(struct ast_channel *chan, const char *function,
859                 char *data, char *buf, size_t len)
860 {
861         struct ast_datastore *ds;
862         struct ast_msg *msg;
863
864         ast_channel_lock(chan);
865
866         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
867                 ast_channel_unlock(chan);
868                 ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
869                 return -1;
870         }
871
872         msg = ds->data;
873         ao2_ref(msg, +1);
874         ast_channel_unlock(chan);
875
876         ao2_lock(msg);
877
878         if (!strcasecmp(data, "to")) {
879                 ast_copy_string(buf, ast_str_buffer(msg->to), len);
880         } else if (!strcasecmp(data, "from")) {
881                 ast_copy_string(buf, ast_str_buffer(msg->from), len);
882         } else if (!strcasecmp(data, "body")) {
883                 ast_copy_string(buf, ast_msg_get_body(msg), len);
884         } else {
885                 ast_log(LOG_WARNING, "Invalid argument to MESSAGE(): '%s'\n", data);
886         }
887
888         ao2_unlock(msg);
889         ao2_ref(msg, -1);
890
891         return 0;
892 }
893
894 static int msg_func_write(struct ast_channel *chan, const char *function,
895                 char *data, const char *value)
896 {
897         struct ast_datastore *ds;
898         struct ast_msg *msg;
899
900         ast_channel_lock(chan);
901
902         if (!(ds = msg_datastore_find_or_create(chan))) {
903                 ast_channel_unlock(chan);
904                 return -1;
905         }
906
907         msg = ds->data;
908         ao2_ref(msg, +1);
909         ast_channel_unlock(chan);
910
911         ao2_lock(msg);
912
913         if (!strcasecmp(data, "to")) {
914                 ast_msg_set_to(msg, "%s", value);
915         } else if (!strcasecmp(data, "from")) {
916                 ast_msg_set_from(msg, "%s", value);
917         } else if (!strcasecmp(data, "body")) {
918                 ast_msg_set_body(msg, "%s", value);
919         } else if (!strcasecmp(data, "custom_data")) {
920                 int outbound = -1;
921                 if (!strcasecmp(value, "mark_all_outbound")) {
922                         outbound = 1;
923                 } else if (!strcasecmp(value, "clear_all_outbound")) {
924                         outbound = 0;
925                 } else {
926                         ast_log(LOG_WARNING, "'%s' is not a valid value for custom_data\n", value);
927                 }
928
929                 if (outbound != -1) {
930                         struct msg_data *hdr_data;
931                         struct ao2_iterator iter = ao2_iterator_init(msg->vars, 0);
932
933                         while ((hdr_data = ao2_iterator_next(&iter))) {
934                                 hdr_data->send = outbound;
935                                 ao2_ref(hdr_data, -1);
936                         }
937                         ao2_iterator_destroy(&iter);
938                 }
939         } else {
940                 ast_log(LOG_WARNING, "'%s' is not a valid write argument.\n", data);
941         }
942
943         ao2_unlock(msg);
944         ao2_ref(msg, -1);
945
946         return 0;
947 }
948
949 static int msg_data_func_read(struct ast_channel *chan, const char *function,
950                 char *data, char *buf, size_t len)
951 {
952         struct ast_datastore *ds;
953         struct ast_msg *msg;
954         const char *val;
955
956         ast_channel_lock(chan);
957
958         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
959                 ast_channel_unlock(chan);
960                 ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
961                 return -1;
962         }
963
964         msg = ds->data;
965         ao2_ref(msg, +1);
966         ast_channel_unlock(chan);
967
968         ao2_lock(msg);
969
970         if ((val = ast_msg_get_var(msg, data))) {
971                 ast_copy_string(buf, val, len);
972         }
973
974         ao2_unlock(msg);
975         ao2_ref(msg, -1);
976
977         return 0;
978 }
979
980 static int msg_data_func_write(struct ast_channel *chan, const char *function,
981                 char *data, const char *value)
982 {
983         struct ast_datastore *ds;
984         struct ast_msg *msg;
985
986         ast_channel_lock(chan);
987
988         if (!(ds = msg_datastore_find_or_create(chan))) {
989                 ast_channel_unlock(chan);
990                 return -1;
991         }
992
993         msg = ds->data;
994         ao2_ref(msg, +1);
995         ast_channel_unlock(chan);
996
997         ao2_lock(msg);
998
999         ast_msg_set_var_outbound(msg, data, value);
1000
1001         ao2_unlock(msg);
1002         ao2_ref(msg, -1);
1003
1004         return 0;
1005 }
1006 static int msg_tech_hash(const void *obj, const int flags)
1007 {
1008         struct ast_msg_tech_holder *tech_holder = (struct ast_msg_tech_holder *) obj;
1009         int res = 0;
1010
1011         ast_rwlock_rdlock(&tech_holder->tech_lock);
1012         if (tech_holder->tech) {
1013                 res = ast_str_case_hash(tech_holder->tech->name);
1014         }
1015         ast_rwlock_unlock(&tech_holder->tech_lock);
1016
1017         return res;
1018 }
1019
1020 static int msg_tech_cmp(void *obj, void *arg, int flags)
1021 {
1022         struct ast_msg_tech_holder *tech_holder = obj;
1023         const struct ast_msg_tech_holder *tech_holder2 = arg;
1024         int res = 1;
1025
1026         ast_rwlock_rdlock(&tech_holder->tech_lock);
1027         /*
1028          * tech_holder2 is a temporary fake tech_holder.
1029          */
1030         if (tech_holder->tech) {
1031                 res = strcasecmp(tech_holder->tech->name, tech_holder2->tech->name) ? 0 : CMP_MATCH | CMP_STOP;
1032         }
1033         ast_rwlock_unlock(&tech_holder->tech_lock);
1034
1035         return res;
1036 }
1037
1038 /*!
1039  * \internal
1040  * \brief MessageSend() application
1041  */
1042 static int msg_send_exec(struct ast_channel *chan, const char *data)
1043 {
1044         struct ast_datastore *ds;
1045         struct ast_msg *msg;
1046         char *tech_name;
1047         struct ast_msg_tech_holder *tech_holder = NULL;
1048         char *parse;
1049         int res = -1;
1050         AST_DECLARE_APP_ARGS(args,
1051                 AST_APP_ARG(to);
1052                 AST_APP_ARG(from);
1053         );
1054
1055         if (ast_strlen_zero(data)) {
1056                 ast_log(LOG_WARNING, "An argument is required to MessageSend()\n");
1057                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
1058                 return 0;
1059         }
1060
1061         parse = ast_strdupa(data);
1062         AST_STANDARD_APP_ARGS(args, parse);
1063
1064         if (ast_strlen_zero(args.to)) {
1065                 ast_log(LOG_WARNING, "A 'to' URI is required for MessageSend()\n");
1066                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
1067                 return 0;
1068         }
1069
1070         ast_channel_lock(chan);
1071
1072         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
1073                 ast_channel_unlock(chan);
1074                 ast_log(LOG_WARNING, "No message data found on channel to send.\n");
1075                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "FAILURE");
1076                 return 0;
1077         }
1078
1079         msg = ds->data;
1080         ao2_ref(msg, +1);
1081         ast_channel_unlock(chan);
1082
1083         tech_name = ast_strdupa(args.to);
1084         tech_name = strsep(&tech_name, ":");
1085
1086         {
1087                 struct ast_msg_tech tmp_msg_tech = {
1088                         .name = tech_name,
1089                 };
1090                 struct ast_msg_tech_holder tmp_tech_holder = {
1091                         .tech = &tmp_msg_tech,
1092                 };
1093
1094                 tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER);
1095         }
1096
1097         if (!tech_holder) {
1098                 ast_log(LOG_WARNING, "No message technology '%s' found.\n", tech_name);
1099                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_PROTOCOL");
1100                 goto exit_cleanup;
1101         }
1102
1103         /*
1104          * The message lock is held here to safely allow the technology
1105          * implementation to access the message fields without worrying
1106          * that they could change.
1107          */
1108         ao2_lock(msg);
1109         ast_rwlock_rdlock(&tech_holder->tech_lock);
1110         if (tech_holder->tech) {
1111                 res = tech_holder->tech->msg_send(msg, S_OR(args.to, ""),
1112                                                         S_OR(args.from, ""));
1113         }
1114         ast_rwlock_unlock(&tech_holder->tech_lock);
1115         ao2_unlock(msg);
1116
1117         pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", res ? "FAILURE" : "SUCCESS");
1118
1119 exit_cleanup:
1120         if (tech_holder) {
1121                 ao2_ref(tech_holder, -1);
1122                 tech_holder = NULL;
1123         }
1124
1125         ao2_ref(msg, -1);
1126
1127         return 0;
1128 }
1129
1130 static int action_messagesend(struct mansession *s, const struct message *m)
1131 {
1132         const char *to = ast_strdupa(astman_get_header(m, "To"));
1133         const char *from = astman_get_header(m, "From");
1134         const char *body = astman_get_header(m, "Body");
1135         const char *base64body = astman_get_header(m, "Base64Body");
1136         char base64decoded[1301] = { 0, };
1137         char *tech_name = NULL;
1138         struct ast_variable *vars = NULL;
1139         struct ast_variable *data = NULL;
1140         struct ast_msg_tech_holder *tech_holder = NULL;
1141         struct ast_msg *msg;
1142         int res = -1;
1143
1144         if (ast_strlen_zero(to)) {
1145                 astman_send_error(s, m, "No 'To' address specified.");
1146                 return -1;
1147         }
1148
1149         if (!ast_strlen_zero(base64body)) {
1150                 ast_base64decode((unsigned char *) base64decoded, base64body, sizeof(base64decoded) - 1);
1151                 body = base64decoded;
1152         }
1153
1154         tech_name = ast_strdupa(to);
1155         tech_name = strsep(&tech_name, ":");
1156         {
1157                 struct ast_msg_tech tmp_msg_tech = {
1158                         .name = tech_name,
1159                 };
1160                 struct ast_msg_tech_holder tmp_tech_holder = {
1161                         .tech = &tmp_msg_tech,
1162                 };
1163
1164                 tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER);
1165         }
1166
1167         if (!tech_holder) {
1168                 astman_send_error(s, m, "Message technology not found.");
1169                 return -1;
1170         }
1171
1172         if (!(msg = ast_msg_alloc())) {
1173                 ao2_ref(tech_holder, -1);
1174                 astman_send_error(s, m, "Internal failure\n");
1175                 return -1;
1176         }
1177
1178         data = astman_get_variables(m);
1179         for (vars = data; vars; vars = vars->next) {
1180                 ast_msg_set_var_outbound(msg, vars->name, vars->value);
1181         }
1182
1183         ast_msg_set_body(msg, "%s", body);
1184
1185         ast_rwlock_rdlock(&tech_holder->tech_lock);
1186         if (tech_holder->tech) {
1187                 res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
1188         }
1189         ast_rwlock_unlock(&tech_holder->tech_lock);
1190
1191         ast_variables_destroy(vars);
1192         ao2_ref(tech_holder, -1);
1193         ao2_ref(msg, -1);
1194
1195         if (res) {
1196                 astman_send_error(s, m, "Message failed to send.");
1197         } else {
1198                 astman_send_ack(s, m, "Message successfully sent");
1199         }
1200         return res;
1201 }
1202
1203 int ast_msg_send(struct ast_msg *msg, const char *to, const char *from)
1204 {
1205         char *tech_name = NULL;
1206         struct ast_msg_tech_holder *tech_holder = NULL;
1207         int res = -1;
1208
1209         if (ast_strlen_zero(to)) {
1210                 ao2_ref(msg, -1);
1211                 return -1;
1212         }
1213
1214         tech_name = ast_strdupa(to);
1215         tech_name = strsep(&tech_name, ":");
1216         {
1217                 struct ast_msg_tech tmp_msg_tech = {
1218                         .name = tech_name,
1219                 };
1220                 struct ast_msg_tech_holder tmp_tech_holder = {
1221                         .tech = &tmp_msg_tech,
1222                 };
1223
1224                 tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER);
1225         }
1226
1227         if (!tech_holder) {
1228                 ao2_ref(msg, -1);
1229                 return -1;
1230         }
1231
1232         ast_rwlock_rdlock(&tech_holder->tech_lock);
1233         if (tech_holder->tech) {
1234                 res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
1235         }
1236         ast_rwlock_unlock(&tech_holder->tech_lock);
1237
1238         ao2_ref(tech_holder, -1);
1239         ao2_ref(msg, -1);
1240
1241         return res;
1242 }
1243
1244 int ast_msg_tech_register(const struct ast_msg_tech *tech)
1245 {
1246         struct ast_msg_tech_holder tmp_tech_holder = {
1247                 .tech = tech,
1248         };
1249         struct ast_msg_tech_holder *tech_holder;
1250
1251         if ((tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER))) {
1252                 ao2_ref(tech_holder, -1);
1253                 ast_log(LOG_ERROR, "Message technology already registered for '%s'\n",
1254                                 tech->name);
1255                 return -1;
1256         }
1257
1258         if (!(tech_holder = ao2_alloc(sizeof(*tech_holder), NULL))) {
1259                 return -1;
1260         }
1261
1262         ast_rwlock_init(&tech_holder->tech_lock);
1263         tech_holder->tech = tech;
1264
1265         ao2_link(msg_techs, tech_holder);
1266
1267         ao2_ref(tech_holder, -1);
1268         tech_holder = NULL;
1269
1270         ast_verb(3, "Message technology handler '%s' registered.\n", tech->name);
1271
1272         return 0;
1273 }
1274
1275 int ast_msg_tech_unregister(const struct ast_msg_tech *tech)
1276 {
1277         struct ast_msg_tech_holder tmp_tech_holder = {
1278                 .tech = tech,
1279         };
1280         struct ast_msg_tech_holder *tech_holder;
1281
1282         tech_holder = ao2_find(msg_techs, &tmp_tech_holder, OBJ_POINTER | OBJ_UNLINK);
1283
1284         if (!tech_holder) {
1285                 ast_log(LOG_ERROR, "No '%s' message technology found.\n", tech->name);
1286                 return -1;
1287         }
1288
1289         ast_rwlock_wrlock(&tech_holder->tech_lock);
1290         tech_holder->tech = NULL;
1291         ast_rwlock_unlock(&tech_holder->tech_lock);
1292
1293         ao2_ref(tech_holder, -1);
1294         tech_holder = NULL;
1295
1296         ast_verb(3, "Message technology handler '%s' unregistered.\n", tech->name);
1297
1298         return 0;
1299 }
1300
1301 /*
1302  * \internal
1303  * \brief Initialize stuff during Asterisk startup.
1304  *
1305  * Cleanup isn't a big deal in this function.  If we return non-zero,
1306  * Asterisk is going to exit.
1307  *
1308  * \retval 0 success
1309  * \retval non-zero failure
1310  */
1311 int ast_msg_init(void)
1312 {
1313         int res;
1314
1315         msg_q_tp = ast_taskprocessor_get("ast_msg_queue", TPS_REF_DEFAULT);
1316         if (!msg_q_tp) {
1317                 return -1;
1318         }
1319
1320         msg_techs = ao2_container_alloc(17, msg_tech_hash, msg_tech_cmp);
1321         if (!msg_techs) {
1322                 return -1;
1323         }
1324
1325         res = __ast_custom_function_register(&msg_function, NULL);
1326         res |= __ast_custom_function_register(&msg_data_function, NULL);
1327         res |= ast_register_application2(app_msg_send, msg_send_exec, NULL, NULL, NULL);
1328         res |= ast_manager_register_xml_core("MessageSend", EVENT_FLAG_MESSAGE, action_messagesend);
1329
1330         return res;
1331 }
1332
1333 void ast_msg_shutdown(void)
1334 {
1335         msg_q_tp = ast_taskprocessor_unreference(msg_q_tp);
1336 }