Fix DEBUG_THREADS when lock is acquired in __constructor__
[asterisk/asterisk.git] / main / message.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2010, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Out-of-call text message support
22  *
23  * \author Russell Bryant <russell@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/_private.h"
35
36 #include "asterisk/module.h"
37 #include "asterisk/datastore.h"
38 #include "asterisk/pbx.h"
39 #include "asterisk/manager.h"
40 #include "asterisk/strings.h"
41 #include "asterisk/astobj2.h"
42 #include "asterisk/app.h"
43 #include "asterisk/taskprocessor.h"
44 #include "asterisk/message.h"
45
46 /*** DOCUMENTATION
47         <function name="MESSAGE" language="en_US">
48                 <synopsis>
49                         Create a message or read fields from a message.
50                 </synopsis>
51                 <syntax argsep="/">
52                         <parameter name="argument" required="true">
53                         <para>Field of the message to get or set.</para>
54                         <enumlist>
55                                 <enum name="to">
56                                         <para>Read-only.  The destination of the message.  When processing an
57                                         incoming message, this will be set to the destination listed as
58                                         the recipient of the message that was received by Asterisk.</para>
59                                 </enum>
60                                 <enum name="from">
61                                         <para>Read-only.  The source of the message.  When processing an
62                                         incoming message, this will be set to the source of the message.</para>
63                                 </enum>
64                                 <enum name="custom_data">
65                                         <para>Write-only.  Mark or unmark all message headers for an outgoing
66                                         message.  The following values can be set:</para>
67                                         <enumlist>
68                                                 <enum name="mark_all_outbound">
69                                                         <para>Mark all headers for an outgoing message.</para>
70                                                 </enum>
71                                                 <enum name="clear_all_outbound">
72                                                         <para>Unmark all headers for an outgoing message.</para>
73                                                 </enum>
74                                         </enumlist>
75                                 </enum>
76                                 <enum name="body">
77                                         <para>Read/Write.  The message body.  When processing an incoming
78                                         message, this includes the body of the message that Asterisk
79                                         received.  When MessageSend() is executed, the contents of this
80                                         field are used as the body of the outgoing message.  The body
81                                         will always be UTF-8.</para>
82                                 </enum>
83                         </enumlist>
84                         </parameter>
85                 </syntax>
86                 <description>
87                         <para>This function will read from or write a value to a text message.
88                         It is used both to read the data out of an incoming message, as well as
89                         modify or create a message that will be sent outbound.</para>
90                 </description>
91                 <see-also>
92                         <ref type="application">MessageSend</ref>
93                 </see-also>
94         </function>
95         <function name="MESSAGE_DATA" language="en_US">
96                 <synopsis>
97                         Read or write custom data attached to a message.
98                 </synopsis>
99                 <syntax argsep="/">
100                         <parameter name="argument" required="true">
101                         <para>Field of the message to get or set.</para>
102                         </parameter>
103                 </syntax>
104                 <description>
105                         <para>This function will read from or write a value to a text message.
106                         It is used both to read the data out of an incoming message, as well as
107                         modify a message that will be sent outbound.</para>
108                         <note>
109                                 <para>If you want to set an outbound message to carry data in the
110                                 current message, do
111                                 Set(MESSAGE_DATA(<replaceable>key</replaceable>)=${MESSAGE_DATA(<replaceable>key</replaceable>)}).</para>
112                         </note>
113                 </description>
114                 <see-also>
115                         <ref type="application">MessageSend</ref>
116                 </see-also>
117         </function>
118         <application name="MessageSend" language="en_US">
119                 <synopsis>
120                         Send a text message.
121                 </synopsis>
122                 <syntax>
123                         <parameter name="to" required="true">
124                                 <para>A To URI for the message.</para>
125                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageToInfo'])" />
126                                 <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageToInfo'])" />
127                         </parameter>
128                         <parameter name="from" required="false">
129                                 <para>A From URI for the message if needed for the
130                                 message technology being used to send this message.</para>
131                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageFromInfo'])" />
132                                 <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageFromInfo'])" />
133                         </parameter>
134                 </syntax>
135                 <description>
136                         <para>Send a text message.  The body of the message that will be
137                         sent is what is currently set to <literal>MESSAGE(body)</literal>.
138                           The technology chosen for sending the message is determined
139                         based on a prefix to the <literal>to</literal> parameter.</para>
140                         <para>This application sets the following channel variables:</para>
141                         <variablelist>
142                                 <variable name="MESSAGE_SEND_STATUS">
143                                         <para>This is the message delivery status returned by this application.</para>
144                                         <value name="INVALID_PROTOCOL">
145                                                 No handler for the technology part of the URI was found.
146                                         </value>
147                                         <value name="INVALID_URI">
148                                                 The protocol handler reported that the URI was not valid.
149                                         </value>
150                                         <value name="SUCCESS">
151                                                 Successfully passed on to the protocol handler, but delivery has not necessarily been guaranteed.
152                                         </value>
153                                         <value name="FAILURE">
154                                                 The protocol handler reported that it was unabled to deliver the message for some reason.
155                                         </value>
156                                 </variable>
157                         </variablelist>
158                 </description>
159         </application>
160         <manager name="MessageSend" language="en_US">
161                 <synopsis>
162                         Send an out of call message to an endpoint.
163                 </synopsis>
164                 <syntax>
165                         <xi:include xpointer="xpointer(/docs/manager[@name='Login']/syntax/parameter[@name='ActionID'])" />
166                         <parameter name="To" required="true">
167                                 <para>The URI the message is to be sent to.</para>
168                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageToInfo'])" />
169                                 <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageToInfo'])" />
170                         </parameter>
171                         <parameter name="From">
172                                 <para>A From URI for the message if needed for the
173                                 message technology being used to send this message.</para>
174                                 <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageFromInfo'])" />
175                                 <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageFromInfo'])" />
176                         </parameter>
177                         <parameter name="Body">
178                                 <para>The message body text.  This must not contain any newlines as that
179                                 conflicts with the AMI protocol.</para>
180                         </parameter>
181                         <parameter name="Base64Body">
182                                 <para>Text bodies requiring the use of newlines have to be base64 encoded
183                                 in this field.  Base64Body will be decoded before being sent out.
184                                 Base64Body takes precedence over Body.</para>
185                         </parameter>
186                         <parameter name="Variable">
187                                 <para>Message variable to set, multiple Variable: headers are
188                                 allowed.  The header value is a comma separated list of
189                                 name=value pairs.</para>
190                         </parameter>
191                 </syntax>
192         </manager>
193  ***/
194
195 struct msg_data {
196         AST_DECLARE_STRING_FIELDS(
197                 AST_STRING_FIELD(name);
198                 AST_STRING_FIELD(value);
199         );
200         unsigned int send:1; /* Whether to send out on outbound messages */
201 };
202
203 AST_LIST_HEAD_NOLOCK(outhead, msg_data);
204
205 /*!
206  * \brief A message.
207  *
208  * \todo Consider whether stringfields would be an appropriate optimization here.
209  */
210 struct ast_msg {
211         struct ast_str *to;
212         struct ast_str *from;
213         struct ast_str *body;
214         struct ast_str *context;
215         struct ast_str *exten;
216         struct ao2_container *vars;
217 };
218
219 struct ast_msg_tech_holder {
220         const struct ast_msg_tech *tech;
221         /*!
222          * \brief A rwlock for this object
223          *
224          * a read/write lock must be used to protect the wrapper instead
225          * of the ao2 lock. A rdlock must be held to read tech_holder->tech.
226          */
227         ast_rwlock_t tech_lock;
228 };
229
230 static struct ao2_container *msg_techs;
231
232 static struct ast_taskprocessor *msg_q_tp;
233
234 static const char app_msg_send[] = "MessageSend";
235
236 static void msg_ds_destroy(void *data);
237
238 static const struct ast_datastore_info msg_datastore = {
239         .type = "message",
240         .destroy = msg_ds_destroy,
241 };
242
243 static int msg_func_read(struct ast_channel *chan, const char *function,
244                 char *data, char *buf, size_t len);
245 static int msg_func_write(struct ast_channel *chan, const char *function,
246                 char *data, const char *value);
247
248 static struct ast_custom_function msg_function = {
249         .name = "MESSAGE",
250         .read = msg_func_read,
251         .write = msg_func_write,
252 };
253
254 static int msg_data_func_read(struct ast_channel *chan, const char *function,
255                 char *data, char *buf, size_t len);
256 static int msg_data_func_write(struct ast_channel *chan, const char *function,
257                 char *data, const char *value);
258
259 static struct ast_custom_function msg_data_function = {
260         .name = "MESSAGE_DATA",
261         .read = msg_data_func_read,
262         .write = msg_data_func_write,
263 };
264
265 static struct ast_frame *chan_msg_read(struct ast_channel *chan);
266 static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr);
267 static int chan_msg_indicate(struct ast_channel *chan, int condition,
268                 const void *data, size_t datalen);
269 static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit);
270 static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
271                 unsigned int duration);
272
273 /*!
274  * \internal
275  * \brief A bare minimum channel technology
276  *
277  * This will not be registered as we never want anything to try
278  * to create Message channels other than internally in this file.
279  */
280 static const struct ast_channel_tech msg_chan_tech_hack = {
281         .type             = "Message",
282         .description      = "Internal Text Message Processing",
283         .read             = chan_msg_read,
284         .write            = chan_msg_write,
285         .indicate         = chan_msg_indicate,
286         .send_digit_begin = chan_msg_send_digit_begin,
287         .send_digit_end   = chan_msg_send_digit_end,
288 };
289
290 /*!
291  * \internal
292  * \brief ast_channel_tech read callback
293  *
294  * This should never be called.  However, we say that about chan_iax2's
295  * read callback, too, and it seems to randomly get called for some
296  * reason.  If it does, a simple NULL frame will suffice.
297  */
298 static struct ast_frame *chan_msg_read(struct ast_channel *chan)
299 {
300         return &ast_null_frame;
301 }
302
303 /*!
304  * \internal
305  * \brief ast_channel_tech write callback
306  *
307  * Throw all frames away.  We don't care about any of them.
308  */
309 static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr)
310 {
311         return 0;
312 }
313
314 /*!
315  * \internal
316  * \brief ast_channel_tech indicate callback
317  *
318  * The indicate callback is here just so it can return success.
319  * We don't want any callers of ast_indicate() to think something
320  * has failed.  We also don't want ast_indicate() itself to try
321  * to generate inband tones since we didn't tell it that we took
322  * care of it ourselves.
323  */
324 static int chan_msg_indicate(struct ast_channel *chan, int condition,
325                 const void *data, size_t datalen)
326 {
327         return 0;
328 }
329
330 /*!
331  * \internal
332  * \brief ast_channel_tech send_digit_begin callback
333  *
334  * This is here so that just in case a digit comes at a message channel
335  * that the Asterisk core doesn't waste any time trying to generate
336  * inband DTMF in audio.  It's a waste of resources.
337  */
338 static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit)
339 {
340         return 0;
341 }
342
343 /*!
344  * \internal
345  * \brief ast_channel_tech send_digit_end callback
346  *
347  * This is here so that just in case a digit comes at a message channel
348  * that the Asterisk core doesn't waste any time trying to generate
349  * inband DTMF in audio.  It's a waste of resources.
350  */
351 static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
352                 unsigned int duration)
353 {
354         return 0;
355 }
356
357 static void msg_ds_destroy(void *data)
358 {
359         struct ast_msg *msg = data;
360
361         ao2_ref(msg, -1);
362 }
363
364 static int msg_data_hash_fn(const void *obj, const int flags)
365 {
366         const struct msg_data *data = obj;
367         return ast_str_case_hash(data->name);
368 }
369
370 static int msg_data_cmp_fn(void *obj, void *arg, int flags)
371 {
372         const struct msg_data *one = obj, *two = arg;
373         return !strcasecmp(one->name, two->name) ? CMP_MATCH | CMP_STOP : 0;
374 }
375
376 static void msg_data_destructor(void *obj)
377 {
378         struct msg_data *data = obj;
379         ast_string_field_free_memory(data);
380 }
381
382 static void msg_destructor(void *obj)
383 {
384         struct ast_msg *msg = obj;
385
386         ast_free(msg->to);
387         msg->to = NULL;
388
389         ast_free(msg->from);
390         msg->from = NULL;
391
392         ast_free(msg->body);
393         msg->body = NULL;
394
395         ast_free(msg->context);
396         msg->context = NULL;
397
398         ast_free(msg->exten);
399         msg->exten = NULL;
400
401         ao2_ref(msg->vars, -1);
402 }
403
404 struct ast_msg *ast_msg_alloc(void)
405 {
406         struct ast_msg *msg;
407
408         if (!(msg = ao2_alloc(sizeof(*msg), msg_destructor))) {
409                 return NULL;
410         }
411
412         if (!(msg->to = ast_str_create(32))) {
413                 ao2_ref(msg, -1);
414                 return NULL;
415         }
416
417         if (!(msg->from = ast_str_create(32))) {
418                 ao2_ref(msg, -1);
419                 return NULL;
420         }
421
422         if (!(msg->body = ast_str_create(128))) {
423                 ao2_ref(msg, -1);
424                 return NULL;
425         }
426
427         if (!(msg->context = ast_str_create(16))) {
428                 ao2_ref(msg, -1);
429                 return NULL;
430         }
431
432         if (!(msg->exten = ast_str_create(16))) {
433                 ao2_ref(msg, -1);
434                 return NULL;
435         }
436
437         if (!(msg->vars = ao2_container_alloc(1, msg_data_hash_fn, msg_data_cmp_fn))) {
438                 ao2_ref(msg, -1);
439                 return NULL;
440         }
441
442         ast_str_set(&msg->context, 0, "default");
443
444         return msg;
445 }
446
447 struct ast_msg *ast_msg_ref(struct ast_msg *msg)
448 {
449         ao2_ref(msg, 1);
450         return msg;
451 }
452
453 struct ast_msg *ast_msg_destroy(struct ast_msg *msg)
454 {
455         ao2_ref(msg, -1);
456
457         return NULL;
458 }
459
460 int ast_msg_set_to(struct ast_msg *msg, const char *fmt, ...)
461 {
462         va_list ap;
463         int res;
464
465         va_start(ap, fmt);
466         res = ast_str_set_va(&msg->to, 0, fmt, ap);
467         va_end(ap);
468
469         return res < 0 ? -1 : 0;
470 }
471
472 int ast_msg_set_from(struct ast_msg *msg, const char *fmt, ...)
473 {
474         va_list ap;
475         int res;
476
477         va_start(ap, fmt);
478         res = ast_str_set_va(&msg->from, 0, fmt, ap);
479         va_end(ap);
480
481         return res < 0 ? -1 : 0;
482 }
483
484 int ast_msg_set_body(struct ast_msg *msg, const char *fmt, ...)
485 {
486         va_list ap;
487         int res;
488
489         va_start(ap, fmt);
490         res = ast_str_set_va(&msg->body, 0, fmt, ap);
491         va_end(ap);
492
493         return res < 0 ? -1 : 0;
494 }
495
496 int ast_msg_set_context(struct ast_msg *msg, const char *fmt, ...)
497 {
498         va_list ap;
499         int res;
500
501         va_start(ap, fmt);
502         res = ast_str_set_va(&msg->context, 0, fmt, ap);
503         va_end(ap);
504
505         return res < 0 ? -1 : 0;
506 }
507
508 int ast_msg_set_exten(struct ast_msg *msg, const char *fmt, ...)
509 {
510         va_list ap;
511         int res;
512
513         va_start(ap, fmt);
514         res = ast_str_set_va(&msg->exten, 0, fmt, ap);
515         va_end(ap);
516
517         return res < 0 ? -1 : 0;
518 }
519
520 const char *ast_msg_get_body(const struct ast_msg *msg)
521 {
522         return ast_str_buffer(msg->body);
523 }
524
525 static struct msg_data *msg_data_alloc(void)
526 {
527         struct msg_data *data;
528
529         if (!(data = ao2_alloc(sizeof(*data), msg_data_destructor))) {
530                 return NULL;
531         }
532
533         if (ast_string_field_init(data, 32)) {
534                 ao2_ref(data, -1);
535                 return NULL;
536         }
537
538         return data;
539 }
540
541 static struct msg_data *msg_data_find(struct ao2_container *vars, const char *name)
542 {
543         struct msg_data tmp = {
544                 .name = name,
545         };
546         return ao2_find(vars, &tmp, OBJ_POINTER);
547 }
548
549 static int msg_set_var_full(struct ast_msg *msg, const char *name, const char *value, unsigned int outbound)
550 {
551         struct msg_data *data;
552
553         if (!(data = msg_data_find(msg->vars, name))) {
554                 if (ast_strlen_zero(value)) {
555                         return 0;
556                 }
557                 if (!(data = msg_data_alloc())) {
558                         return -1;
559                 };
560
561                 ast_string_field_set(data, name, name);
562                 ast_string_field_set(data, value, value);
563                 data->send = outbound;
564                 ao2_link(msg->vars, data);
565         } else {
566                 if (ast_strlen_zero(value)) {
567                         ao2_unlink(msg->vars, data);
568                 } else {
569                         ast_string_field_set(data, value, value);
570                         data->send = outbound;
571                 }
572         }
573
574         ao2_ref(data, -1);
575
576         return 0;
577 }
578
579 int ast_msg_set_var_outbound(struct ast_msg *msg, const char *name, const char *value)
580 {
581         return msg_set_var_full(msg, name, value, 1);
582 }
583
584 int ast_msg_set_var(struct ast_msg *msg, const char *name, const char *value)
585 {
586         return msg_set_var_full(msg, name, value, 0);
587 }
588
589 const char *ast_msg_get_var(struct ast_msg *msg, const char *name)
590 {
591         struct msg_data *data;
592         const char *val = NULL;
593
594         if (!(data = msg_data_find(msg->vars, name))) {
595                 return NULL;
596         }
597
598         /* Yep, this definitely looks like val would be a dangling pointer
599          * after the ref count is decremented.  As long as the message structure
600          * is used in a thread safe manner, this will not be the case though.
601          * The ast_msg holds a reference to this object in the msg->vars container. */
602         val = data->value;
603         ao2_ref(data, -1);
604
605         return val;
606 }
607
608 struct ast_msg_var_iterator {
609         struct ao2_iterator i;
610         struct msg_data *current_used;
611 };
612
613 struct ast_msg_var_iterator *ast_msg_var_iterator_init(const struct ast_msg *msg)
614 {
615         struct ast_msg_var_iterator *i;
616         if (!(i = ast_calloc(1, sizeof(*i)))) {
617                 return NULL;
618         }
619
620         i->i = ao2_iterator_init(msg->vars, 0);
621
622         return i;
623 }
624
625 int ast_msg_var_iterator_next(const struct ast_msg *msg, struct ast_msg_var_iterator *i, const char **name, const char **value)
626 {
627         struct msg_data *data;
628
629         /* Skip any that aren't marked for sending out */
630         while ((data = ao2_iterator_next(&i->i)) && !data->send) {
631                 ao2_ref(data, -1);
632         }
633
634         if (!data) {
635                 return 0;
636         }
637
638         if (data->send) {
639                 *name = data->name;
640                 *value = data->value;
641         }
642
643         /* Leave the refcount to be cleaned up by the caller with
644          * ast_msg_var_unref_current after they finish with the pointers to the data */
645         i->current_used = data;
646
647         return 1;
648 }
649
650 void ast_msg_var_unref_current(struct ast_msg_var_iterator *i) {
651         if (i->current_used) {
652                 ao2_ref(i->current_used, -1);
653         }
654         i->current_used = NULL;
655 }
656
657 void ast_msg_var_iterator_destroy(struct ast_msg_var_iterator *i)
658 {
659         ao2_iterator_destroy(&i->i);
660         ast_free(i);
661 }
662
663 static struct ast_channel *create_msg_q_chan(void)
664 {
665         struct ast_channel *chan;
666         struct ast_datastore *ds;
667
668         chan = ast_channel_alloc(1, AST_STATE_UP,
669                         NULL, NULL, NULL,
670                         NULL, NULL, NULL, 0,
671                         "%s", "Message/ast_msg_queue");
672
673         if (!chan) {
674                 return NULL;
675         }
676
677         ast_channel_unlink(chan);
678
679         ast_channel_tech_set(chan, &msg_chan_tech_hack);
680
681         if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
682                 ast_hangup(chan);
683                 return NULL;
684         }
685
686         ast_channel_lock(chan);
687         ast_channel_datastore_add(chan, ds);
688         ast_channel_unlock(chan);
689
690         return chan;
691 }
692
693 /*!
694  * \internal
695  * \brief Run the dialplan for message processing
696  *
697  * \pre The message has already been set up on the msg datastore
698  *      on this channel.
699  */
700 static void msg_route(struct ast_channel *chan, struct ast_msg *msg)
701 {
702         struct ast_pbx_args pbx_args;
703
704         ast_explicit_goto(chan, ast_str_buffer(msg->context), AS_OR(msg->exten, "s"), 1);
705
706         memset(&pbx_args, 0, sizeof(pbx_args));
707         pbx_args.no_hangup_chan = 1,
708         ast_pbx_run_args(chan, &pbx_args);
709 }
710
711 /*!
712  * \internal
713  * \brief Clean up ast_channel after each message
714  *
715  * Reset various bits of state after routing each message so the same ast_channel
716  * can just be reused.
717  */
718 static void chan_cleanup(struct ast_channel *chan)
719 {
720         struct ast_datastore *msg_ds, *ds;
721         struct varshead *headp;
722         struct ast_var_t *vardata;
723
724         ast_channel_lock(chan);
725
726         /*
727          * Remove the msg datastore.  Free its data but keep around the datastore
728          * object and just reuse it.
729          */
730         if ((msg_ds = ast_channel_datastore_find(chan, &msg_datastore, NULL)) && msg_ds->data) {
731                 ast_channel_datastore_remove(chan, msg_ds);
732                 ao2_ref(msg_ds->data, -1);
733                 msg_ds->data = NULL;
734         }
735
736         /*
737          * Destroy all other datastores.
738          */
739         while ((ds = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry))) {
740                 ast_datastore_free(ds);
741         }
742
743         /*
744          * Destroy all channel variables.
745          */
746         headp = ast_channel_varshead(chan);
747         while ((vardata = AST_LIST_REMOVE_HEAD(headp, entries))) {
748                 ast_var_delete(vardata);
749         }
750
751         /*
752          * Restore msg datastore.
753          */
754         if (msg_ds) {
755                 ast_channel_datastore_add(chan, msg_ds);
756         }
757         /*
758          * Clear softhangup flags.
759          */
760         ast_channel_clear_softhangup(chan, AST_SOFTHANGUP_ALL);
761
762         ast_channel_unlock(chan);
763 }
764
765 static void destroy_msg_q_chan(void *data)
766 {
767         struct ast_channel **chan = data;
768
769         if (!*chan) {
770                 return;
771         }
772
773         ast_channel_release(*chan);
774 }
775
776 AST_THREADSTORAGE_CUSTOM(msg_q_chan, NULL, destroy_msg_q_chan);
777
778 /*!
779  * \internal
780  * \brief Message queue task processor callback
781  *
782  * \retval 0 success
783  * \retval -1 failure
784  *
785  * \note Even though this returns a value, the taskprocessor code ignores the value.
786  */
787 static int msg_q_cb(void *data)
788 {
789         struct ast_msg *msg = data;
790         struct ast_channel **chan_p, *chan;
791         struct ast_datastore *ds;
792
793         if (!(chan_p = ast_threadstorage_get(&msg_q_chan, sizeof(struct ast_channel *)))) {
794                 return -1;
795         }
796         if (!*chan_p) {
797                 if (!(*chan_p = create_msg_q_chan())) {
798                         return -1;
799                 }
800         }
801         chan = *chan_p;
802
803         ast_channel_lock(chan);
804         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
805                 ast_channel_unlock(chan);
806                 return -1;
807         }
808         ao2_ref(msg, +1);
809         ds->data = msg;
810         ast_channel_unlock(chan);
811
812         msg_route(chan, msg);
813         chan_cleanup(chan);
814
815         ao2_ref(msg, -1);
816
817         return 0;
818 }
819
820 int ast_msg_queue(struct ast_msg *msg)
821 {
822         int res;
823
824         res = ast_taskprocessor_push(msg_q_tp, msg_q_cb, msg);
825         if (res == -1) {
826                 ao2_ref(msg, -1);
827         }
828
829         return res;
830 }
831
832 /*!
833  * \internal
834  * \brief Find or create a message datastore on a channel
835  *
836  * \pre chan is locked
837  *
838  * \param chan the relevant channel
839  *
840  * \return the channel's message datastore, or NULL on error
841  */
842 static struct ast_datastore *msg_datastore_find_or_create(struct ast_channel *chan)
843 {
844         struct ast_datastore *ds;
845
846         if ((ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
847                 return ds;
848         }
849
850         if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
851                 return NULL;
852         }
853
854         if (!(ds->data = ast_msg_alloc())) {
855                 ast_datastore_free(ds);
856                 return NULL;
857         }
858
859         ast_channel_datastore_add(chan, ds);
860
861         return ds;
862 }
863
864 static int msg_func_read(struct ast_channel *chan, const char *function,
865                 char *data, char *buf, size_t len)
866 {
867         struct ast_datastore *ds;
868         struct ast_msg *msg;
869
870         ast_channel_lock(chan);
871
872         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
873                 ast_channel_unlock(chan);
874                 ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
875                 return -1;
876         }
877
878         msg = ds->data;
879         ao2_ref(msg, +1);
880         ast_channel_unlock(chan);
881
882         ao2_lock(msg);
883
884         if (!strcasecmp(data, "to")) {
885                 ast_copy_string(buf, ast_str_buffer(msg->to), len);
886         } else if (!strcasecmp(data, "from")) {
887                 ast_copy_string(buf, ast_str_buffer(msg->from), len);
888         } else if (!strcasecmp(data, "body")) {
889                 ast_copy_string(buf, ast_msg_get_body(msg), len);
890         } else {
891                 ast_log(LOG_WARNING, "Invalid argument to MESSAGE(): '%s'\n", data);
892         }
893
894         ao2_unlock(msg);
895         ao2_ref(msg, -1);
896
897         return 0;
898 }
899
900 static int msg_func_write(struct ast_channel *chan, const char *function,
901                 char *data, const char *value)
902 {
903         struct ast_datastore *ds;
904         struct ast_msg *msg;
905
906         ast_channel_lock(chan);
907
908         if (!(ds = msg_datastore_find_or_create(chan))) {
909                 ast_channel_unlock(chan);
910                 return -1;
911         }
912
913         msg = ds->data;
914         ao2_ref(msg, +1);
915         ast_channel_unlock(chan);
916
917         ao2_lock(msg);
918
919         if (!strcasecmp(data, "to")) {
920                 ast_msg_set_to(msg, "%s", value);
921         } else if (!strcasecmp(data, "from")) {
922                 ast_msg_set_from(msg, "%s", value);
923         } else if (!strcasecmp(data, "body")) {
924                 ast_msg_set_body(msg, "%s", value);
925         } else if (!strcasecmp(data, "custom_data")) {
926                 int outbound = -1;
927                 if (!strcasecmp(value, "mark_all_outbound")) {
928                         outbound = 1;
929                 } else if (!strcasecmp(value, "clear_all_outbound")) {
930                         outbound = 0;
931                 } else {
932                         ast_log(LOG_WARNING, "'%s' is not a valid value for custom_data\n", value);
933                 }
934
935                 if (outbound != -1) {
936                         struct msg_data *hdr_data;
937                         struct ao2_iterator iter = ao2_iterator_init(msg->vars, 0);
938
939                         while ((hdr_data = ao2_iterator_next(&iter))) {
940                                 hdr_data->send = outbound;
941                                 ao2_ref(hdr_data, -1);
942                         }
943                         ao2_iterator_destroy(&iter);
944                 }
945         } else {
946                 ast_log(LOG_WARNING, "'%s' is not a valid write argument.\n", data);
947         }
948
949         ao2_unlock(msg);
950         ao2_ref(msg, -1);
951
952         return 0;
953 }
954
955 static int msg_data_func_read(struct ast_channel *chan, const char *function,
956                 char *data, char *buf, size_t len)
957 {
958         struct ast_datastore *ds;
959         struct ast_msg *msg;
960         const char *val;
961
962         ast_channel_lock(chan);
963
964         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
965                 ast_channel_unlock(chan);
966                 ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
967                 return -1;
968         }
969
970         msg = ds->data;
971         ao2_ref(msg, +1);
972         ast_channel_unlock(chan);
973
974         ao2_lock(msg);
975
976         if ((val = ast_msg_get_var(msg, data))) {
977                 ast_copy_string(buf, val, len);
978         }
979
980         ao2_unlock(msg);
981         ao2_ref(msg, -1);
982
983         return 0;
984 }
985
986 static int msg_data_func_write(struct ast_channel *chan, const char *function,
987                 char *data, const char *value)
988 {
989         struct ast_datastore *ds;
990         struct ast_msg *msg;
991
992         ast_channel_lock(chan);
993
994         if (!(ds = msg_datastore_find_or_create(chan))) {
995                 ast_channel_unlock(chan);
996                 return -1;
997         }
998
999         msg = ds->data;
1000         ao2_ref(msg, +1);
1001         ast_channel_unlock(chan);
1002
1003         ao2_lock(msg);
1004
1005         ast_msg_set_var_outbound(msg, data, value);
1006
1007         ao2_unlock(msg);
1008         ao2_ref(msg, -1);
1009
1010         return 0;
1011 }
1012 static int msg_tech_hash(const void *obj, const int flags)
1013 {
1014         struct ast_msg_tech_holder *tech_holder = (struct ast_msg_tech_holder *) obj;
1015         int res = 0;
1016
1017         ast_rwlock_rdlock(&tech_holder->tech_lock);
1018         if (tech_holder->tech) {
1019                 res = ast_str_case_hash(tech_holder->tech->name);
1020         }
1021         ast_rwlock_unlock(&tech_holder->tech_lock);
1022
1023         return res;
1024 }
1025
1026 static int msg_tech_cmp(void *obj, void *arg, int flags)
1027 {
1028         struct ast_msg_tech_holder *tech_holder = obj;
1029         const struct ast_msg_tech_holder *tech_holder2 = arg;
1030         int res = 1;
1031
1032         ast_rwlock_rdlock(&tech_holder->tech_lock);
1033         /*
1034          * tech_holder2 is a temporary fake tech_holder.
1035          */
1036         if (tech_holder->tech) {
1037                 res = strcasecmp(tech_holder->tech->name, tech_holder2->tech->name) ? 0 : CMP_MATCH | CMP_STOP;
1038         }
1039         ast_rwlock_unlock(&tech_holder->tech_lock);
1040
1041         return res;
1042 }
1043
1044 static struct ast_msg_tech_holder *msg_find_by_tech(const struct ast_msg_tech *msg_tech, int ao2_flags)
1045 {
1046         struct ast_msg_tech_holder *tech_holder;
1047         struct ast_msg_tech_holder tmp_tech_holder = {
1048                 .tech = msg_tech,
1049         };
1050
1051         ast_rwlock_init(&tmp_tech_holder.tech_lock);
1052         tech_holder = ao2_find(msg_techs, &tmp_tech_holder, ao2_flags);
1053         ast_rwlock_destroy(&tmp_tech_holder.tech_lock);
1054         return tech_holder;
1055 }
1056
1057 static struct ast_msg_tech_holder *msg_find_by_tech_name(const char *tech_name, int ao2_flags)
1058 {
1059         struct ast_msg_tech tmp_msg_tech = {
1060                 .name = tech_name,
1061         };
1062         return msg_find_by_tech(&tmp_msg_tech, ao2_flags);
1063 }
1064
1065 /*!
1066  * \internal
1067  * \brief MessageSend() application
1068  */
1069 static int msg_send_exec(struct ast_channel *chan, const char *data)
1070 {
1071         struct ast_datastore *ds;
1072         struct ast_msg *msg;
1073         char *tech_name;
1074         struct ast_msg_tech_holder *tech_holder = NULL;
1075         char *parse;
1076         int res = -1;
1077         AST_DECLARE_APP_ARGS(args,
1078                 AST_APP_ARG(to);
1079                 AST_APP_ARG(from);
1080         );
1081
1082         if (ast_strlen_zero(data)) {
1083                 ast_log(LOG_WARNING, "An argument is required to MessageSend()\n");
1084                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
1085                 return 0;
1086         }
1087
1088         parse = ast_strdupa(data);
1089         AST_STANDARD_APP_ARGS(args, parse);
1090
1091         if (ast_strlen_zero(args.to)) {
1092                 ast_log(LOG_WARNING, "A 'to' URI is required for MessageSend()\n");
1093                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
1094                 return 0;
1095         }
1096
1097         ast_channel_lock(chan);
1098
1099         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
1100                 ast_channel_unlock(chan);
1101                 ast_log(LOG_WARNING, "No message data found on channel to send.\n");
1102                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "FAILURE");
1103                 return 0;
1104         }
1105
1106         msg = ds->data;
1107         ao2_ref(msg, +1);
1108         ast_channel_unlock(chan);
1109
1110         tech_name = ast_strdupa(args.to);
1111         tech_name = strsep(&tech_name, ":");
1112
1113         tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
1114
1115         if (!tech_holder) {
1116                 ast_log(LOG_WARNING, "No message technology '%s' found.\n", tech_name);
1117                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_PROTOCOL");
1118                 goto exit_cleanup;
1119         }
1120
1121         /*
1122          * The message lock is held here to safely allow the technology
1123          * implementation to access the message fields without worrying
1124          * that they could change.
1125          */
1126         ao2_lock(msg);
1127         ast_rwlock_rdlock(&tech_holder->tech_lock);
1128         if (tech_holder->tech) {
1129                 res = tech_holder->tech->msg_send(msg, S_OR(args.to, ""),
1130                                                         S_OR(args.from, ""));
1131         }
1132         ast_rwlock_unlock(&tech_holder->tech_lock);
1133         ao2_unlock(msg);
1134
1135         pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", res ? "FAILURE" : "SUCCESS");
1136
1137 exit_cleanup:
1138         if (tech_holder) {
1139                 ao2_ref(tech_holder, -1);
1140                 tech_holder = NULL;
1141         }
1142
1143         ao2_ref(msg, -1);
1144
1145         return 0;
1146 }
1147
1148 static int action_messagesend(struct mansession *s, const struct message *m)
1149 {
1150         const char *to = ast_strdupa(astman_get_header(m, "To"));
1151         const char *from = astman_get_header(m, "From");
1152         const char *body = astman_get_header(m, "Body");
1153         const char *base64body = astman_get_header(m, "Base64Body");
1154         char base64decoded[1301] = { 0, };
1155         char *tech_name = NULL;
1156         struct ast_variable *vars = NULL;
1157         struct ast_variable *data = NULL;
1158         struct ast_msg_tech_holder *tech_holder = NULL;
1159         struct ast_msg *msg;
1160         int res = -1;
1161
1162         if (ast_strlen_zero(to)) {
1163                 astman_send_error(s, m, "No 'To' address specified.");
1164                 return -1;
1165         }
1166
1167         if (!ast_strlen_zero(base64body)) {
1168                 ast_base64decode((unsigned char *) base64decoded, base64body, sizeof(base64decoded) - 1);
1169                 body = base64decoded;
1170         }
1171
1172         tech_name = ast_strdupa(to);
1173         tech_name = strsep(&tech_name, ":");
1174
1175         tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
1176
1177         if (!tech_holder) {
1178                 astman_send_error(s, m, "Message technology not found.");
1179                 return -1;
1180         }
1181
1182         if (!(msg = ast_msg_alloc())) {
1183                 ao2_ref(tech_holder, -1);
1184                 astman_send_error(s, m, "Internal failure\n");
1185                 return -1;
1186         }
1187
1188         data = astman_get_variables(m);
1189         for (vars = data; vars; vars = vars->next) {
1190                 ast_msg_set_var_outbound(msg, vars->name, vars->value);
1191         }
1192
1193         ast_msg_set_body(msg, "%s", body);
1194
1195         ast_rwlock_rdlock(&tech_holder->tech_lock);
1196         if (tech_holder->tech) {
1197                 res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
1198         }
1199         ast_rwlock_unlock(&tech_holder->tech_lock);
1200
1201         ast_variables_destroy(vars);
1202         ao2_ref(tech_holder, -1);
1203         ao2_ref(msg, -1);
1204
1205         if (res) {
1206                 astman_send_error(s, m, "Message failed to send.");
1207         } else {
1208                 astman_send_ack(s, m, "Message successfully sent");
1209         }
1210         return res;
1211 }
1212
1213 int ast_msg_send(struct ast_msg *msg, const char *to, const char *from)
1214 {
1215         char *tech_name = NULL;
1216         struct ast_msg_tech_holder *tech_holder = NULL;
1217         int res = -1;
1218
1219         if (ast_strlen_zero(to)) {
1220                 ao2_ref(msg, -1);
1221                 return -1;
1222         }
1223
1224         tech_name = ast_strdupa(to);
1225         tech_name = strsep(&tech_name, ":");
1226
1227         tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
1228
1229         if (!tech_holder) {
1230                 ao2_ref(msg, -1);
1231                 return -1;
1232         }
1233
1234         ast_rwlock_rdlock(&tech_holder->tech_lock);
1235         if (tech_holder->tech) {
1236                 res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
1237         }
1238         ast_rwlock_unlock(&tech_holder->tech_lock);
1239
1240         ao2_ref(tech_holder, -1);
1241         ao2_ref(msg, -1);
1242
1243         return res;
1244 }
1245
1246 int ast_msg_tech_register(const struct ast_msg_tech *tech)
1247 {
1248         struct ast_msg_tech_holder *tech_holder;
1249
1250         if ((tech_holder = msg_find_by_tech(tech, OBJ_POINTER))) {
1251                 ao2_ref(tech_holder, -1);
1252                 ast_log(LOG_ERROR, "Message technology already registered for '%s'\n",
1253                                 tech->name);
1254                 return -1;
1255         }
1256
1257         if (!(tech_holder = ao2_alloc(sizeof(*tech_holder), NULL))) {
1258                 return -1;
1259         }
1260
1261         ast_rwlock_init(&tech_holder->tech_lock);
1262         tech_holder->tech = tech;
1263
1264         ao2_link(msg_techs, tech_holder);
1265
1266         ao2_ref(tech_holder, -1);
1267         tech_holder = NULL;
1268
1269         ast_verb(3, "Message technology handler '%s' registered.\n", tech->name);
1270
1271         return 0;
1272 }
1273
1274 int ast_msg_tech_unregister(const struct ast_msg_tech *tech)
1275 {
1276         struct ast_msg_tech_holder *tech_holder;
1277
1278         tech_holder = msg_find_by_tech(tech, OBJ_POINTER | OBJ_UNLINK);
1279
1280         if (!tech_holder) {
1281                 ast_log(LOG_ERROR, "No '%s' message technology found.\n", tech->name);
1282                 return -1;
1283         }
1284
1285         ast_rwlock_wrlock(&tech_holder->tech_lock);
1286         tech_holder->tech = NULL;
1287         ast_rwlock_unlock(&tech_holder->tech_lock);
1288
1289         ao2_ref(tech_holder, -1);
1290         tech_holder = NULL;
1291
1292         ast_verb(3, "Message technology handler '%s' unregistered.\n", tech->name);
1293
1294         return 0;
1295 }
1296
1297 void ast_msg_shutdown(void)
1298 {
1299         if (msg_q_tp) {
1300                 msg_q_tp = ast_taskprocessor_unreference(msg_q_tp);
1301         }
1302 }
1303
1304 /*!
1305  * \internal
1306  * \brief Clean up other resources on Asterisk shutdown
1307  *
1308  * \note This does not include the msg_q_tp object, which must be disposed
1309  * of prior to Asterisk checking for channel destruction in its shutdown
1310  * sequence.  The atexit handlers are executed after this occurs.
1311  */
1312 static void message_shutdown(void)
1313 {
1314         ast_custom_function_unregister(&msg_function);
1315         ast_custom_function_unregister(&msg_data_function);
1316         ast_unregister_application(app_msg_send);
1317         ast_manager_unregister("MessageSend");
1318
1319         if (msg_techs) {
1320                 ao2_ref(msg_techs, -1);
1321                 msg_techs = NULL;
1322         }
1323 }
1324
1325 /*
1326  * \internal
1327  * \brief Initialize stuff during Asterisk startup.
1328  *
1329  * Cleanup isn't a big deal in this function.  If we return non-zero,
1330  * Asterisk is going to exit.
1331  *
1332  * \retval 0 success
1333  * \retval non-zero failure
1334  */
1335 int ast_msg_init(void)
1336 {
1337         int res;
1338
1339         msg_q_tp = ast_taskprocessor_get("ast_msg_queue", TPS_REF_DEFAULT);
1340         if (!msg_q_tp) {
1341                 return -1;
1342         }
1343
1344         msg_techs = ao2_container_alloc(17, msg_tech_hash, msg_tech_cmp);
1345         if (!msg_techs) {
1346                 return -1;
1347         }
1348
1349         res = __ast_custom_function_register(&msg_function, NULL);
1350         res |= __ast_custom_function_register(&msg_data_function, NULL);
1351         res |= ast_register_application2(app_msg_send, msg_send_exec, NULL, NULL, NULL);
1352         res |= ast_manager_register_xml_core("MessageSend", EVENT_FLAG_MESSAGE, action_messagesend);
1353
1354         ast_register_atexit(message_shutdown);
1355
1356         return res;
1357 }