MESSAGE: Flush Message/ast_msg_queue channel alert pipe.
[asterisk/asterisk.git] / main / message.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2010, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Out-of-call text message support
22  *
23  * \author Russell Bryant <russell@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/_private.h"
33
34 #include "asterisk/module.h"
35 #include "asterisk/datastore.h"
36 #include "asterisk/pbx.h"
37 #include "asterisk/manager.h"
38 #include "asterisk/strings.h"
39 #include "asterisk/astobj2.h"
40 #include "asterisk/vector.h"
41 #include "asterisk/app.h"
42 #include "asterisk/taskprocessor.h"
43 #include "asterisk/message.h"
44
45 /*** DOCUMENTATION
46         <function name="MESSAGE" language="en_US">
47                 <synopsis>
48                         Create a message or read fields from a message.
49                 </synopsis>
50                 <syntax argsep="/">
51                         <parameter name="argument" required="true">
52                         <para>Field of the message to get or set.</para>
53                         <enumlist>
54                                 <enum name="to">
55                                         <para>Read-only.  The destination of the message.  When processing an
56                                         incoming message, this will be set to the destination listed as
57                                         the recipient of the message that was received by Asterisk.</para>
58                                 </enum>
59                                 <enum name="from">
60                                         <para>Read-only.  The source of the message.  When processing an
61                                         incoming message, this will be set to the source of the message.</para>
62                                 </enum>
63                                 <enum name="custom_data">
64                                         <para>Write-only.  Mark or unmark all message headers for an outgoing
65                                         message.  The following values can be set:</para>
66                                         <enumlist>
67                                                 <enum name="mark_all_outbound">
68                                                         <para>Mark all headers for an outgoing message.</para>
69                                                 </enum>
70                                                 <enum name="clear_all_outbound">
71                                                         <para>Unmark all headers for an outgoing message.</para>
72                                                 </enum>
73                                         </enumlist>
74                                 </enum>
75                                 <enum name="body">
76                                         <para>Read/Write.  The message body.  When processing an incoming
77                                         message, this includes the body of the message that Asterisk
78                                         received.  When MessageSend() is executed, the contents of this
79                                         field are used as the body of the outgoing message.  The body
80                                         will always be UTF-8.</para>
81                                 </enum>
82                         </enumlist>
83                         </parameter>
84                 </syntax>
85                 <description>
86                         <para>This function will read from or write a value to a text message.
87                         It is used both to read the data out of an incoming message, as well as
88                         modify or create a message that will be sent outbound.</para>
89                 </description>
90                 <see-also>
91                         <ref type="application">MessageSend</ref>
92                 </see-also>
93         </function>
94         <function name="MESSAGE_DATA" language="en_US">
95                 <synopsis>
96                         Read or write custom data attached to a message.
97                 </synopsis>
98                 <syntax argsep="/">
99                         <parameter name="argument" required="true">
100                         <para>Field of the message to get or set.</para>
101                         </parameter>
102                 </syntax>
103                 <description>
104                         <para>This function will read from or write a value to a text message.
105                         It is used both to read the data out of an incoming message, as well as
106                         modify a message that will be sent outbound.</para>
107                         <note>
108                                 <para>If you want to set an outbound message to carry data in the
109                                 current message, do
110                                 Set(MESSAGE_DATA(<replaceable>key</replaceable>)=${MESSAGE_DATA(<replaceable>key</replaceable>)}).</para>
111                         </note>
112                 </description>
113                 <see-also>
114                         <ref type="application">MessageSend</ref>
115                 </see-also>
116         </function>
117         <application name="MessageSend" language="en_US">
118                 <synopsis>
119                         Send a text message.
120                 </synopsis>
121                 <syntax>
122                         <parameter name="to" required="true">
123                                 <para>A To URI for the message.</para>
124                                 <xi:include xpointer="xpointer(/docs/info[@name='MessageToInfo'])" />
125                         </parameter>
126                         <parameter name="from" required="false">
127                                 <para>A From URI for the message if needed for the
128                                 message technology being used to send this message.</para>
129                                 <xi:include xpointer="xpointer(/docs/info[@name='MessageFromInfo'])" />
130                         </parameter>
131                 </syntax>
132                 <description>
133                         <para>Send a text message.  The body of the message that will be
134                         sent is what is currently set to <literal>MESSAGE(body)</literal>.
135                           The technology chosen for sending the message is determined
136                         based on a prefix to the <literal>to</literal> parameter.</para>
137                         <para>This application sets the following channel variables:</para>
138                         <variablelist>
139                                 <variable name="MESSAGE_SEND_STATUS">
140                                         <para>This is the message delivery status returned by this application.</para>
141                                         <value name="INVALID_PROTOCOL">
142                                                 No handler for the technology part of the URI was found.
143                                         </value>
144                                         <value name="INVALID_URI">
145                                                 The protocol handler reported that the URI was not valid.
146                                         </value>
147                                         <value name="SUCCESS">
148                                                 Successfully passed on to the protocol handler, but delivery has not necessarily been guaranteed.
149                                         </value>
150                                         <value name="FAILURE">
151                                                 The protocol handler reported that it was unabled to deliver the message for some reason.
152                                         </value>
153                                 </variable>
154                         </variablelist>
155                 </description>
156         </application>
157         <manager name="MessageSend" language="en_US">
158                 <synopsis>
159                         Send an out of call message to an endpoint.
160                 </synopsis>
161                 <syntax>
162                         <xi:include xpointer="xpointer(/docs/manager[@name='Login']/syntax/parameter[@name='ActionID'])" />
163                         <parameter name="To" required="true">
164                                 <para>The URI the message is to be sent to.</para>
165                                 <xi:include xpointer="xpointer(/docs/info[@name='MessageToInfo'])" />
166                         </parameter>
167                         <parameter name="From">
168                                 <para>A From URI for the message if needed for the
169                                 message technology being used to send this message.</para>
170                                 <xi:include xpointer="xpointer(/docs/info[@name='MessageFromInfo'])" />
171                         </parameter>
172                         <parameter name="Body">
173                                 <para>The message body text.  This must not contain any newlines as that
174                                 conflicts with the AMI protocol.</para>
175                         </parameter>
176                         <parameter name="Base64Body">
177                                 <para>Text bodies requiring the use of newlines have to be base64 encoded
178                                 in this field.  Base64Body will be decoded before being sent out.
179                                 Base64Body takes precedence over Body.</para>
180                         </parameter>
181                         <parameter name="Variable">
182                                 <para>Message variable to set, multiple Variable: headers are
183                                 allowed.  The header value is a comma separated list of
184                                 name=value pairs.</para>
185                         </parameter>
186                 </syntax>
187         </manager>
188  ***/
189
190 struct msg_data {
191         AST_DECLARE_STRING_FIELDS(
192                 AST_STRING_FIELD(name);
193                 AST_STRING_FIELD(value);
194         );
195         unsigned int send; /* Whether to send out on outbound messages */
196 };
197
198 AST_LIST_HEAD_NOLOCK(outhead, msg_data);
199
200 /*!
201  * \brief A message.
202  */
203 struct ast_msg {
204         AST_DECLARE_STRING_FIELDS(
205                 /*! Where the message is going */
206                 AST_STRING_FIELD(to);
207                 /*! Where we "say" the message came from */
208                 AST_STRING_FIELD(from);
209                 /*! The text to send */
210                 AST_STRING_FIELD(body);
211                 /*! The dialplan context for the message */
212                 AST_STRING_FIELD(context);
213                 /*! The dialplan extension for the message */
214                 AST_STRING_FIELD(exten);
215                 /*! An endpoint associated with this message */
216                 AST_STRING_FIELD(endpoint);
217                 /*! The technology of the endpoint associated with this message */
218                 AST_STRING_FIELD(tech);
219         );
220         /*! Technology/dialplan specific variables associated with the message */
221         struct ao2_container *vars;
222 };
223
224 /*! \brief Lock for \c msg_techs vector */
225 static ast_rwlock_t msg_techs_lock;
226
227 /*! \brief Vector of message technologies */
228 AST_VECTOR(, const struct ast_msg_tech *) msg_techs;
229
230 /*! \brief Lock for \c msg_handlers vector */
231 static ast_rwlock_t msg_handlers_lock;
232
233 /*! \brief Vector of received message handlers */
234 AST_VECTOR(, const struct ast_msg_handler *) msg_handlers;
235
236 static struct ast_taskprocessor *msg_q_tp;
237
238 static const char app_msg_send[] = "MessageSend";
239
240 static void msg_ds_destroy(void *data);
241
242 static const struct ast_datastore_info msg_datastore = {
243         .type = "message",
244         .destroy = msg_ds_destroy,
245 };
246
247 static int msg_func_read(struct ast_channel *chan, const char *function,
248                 char *data, char *buf, size_t len);
249 static int msg_func_write(struct ast_channel *chan, const char *function,
250                 char *data, const char *value);
251
252 static struct ast_custom_function msg_function = {
253         .name = "MESSAGE",
254         .read = msg_func_read,
255         .write = msg_func_write,
256 };
257
258 static int msg_data_func_read(struct ast_channel *chan, const char *function,
259                 char *data, char *buf, size_t len);
260 static int msg_data_func_write(struct ast_channel *chan, const char *function,
261                 char *data, const char *value);
262
263 static struct ast_custom_function msg_data_function = {
264         .name = "MESSAGE_DATA",
265         .read = msg_data_func_read,
266         .write = msg_data_func_write,
267 };
268
269 static struct ast_frame *chan_msg_read(struct ast_channel *chan);
270 static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr);
271 static int chan_msg_indicate(struct ast_channel *chan, int condition,
272                 const void *data, size_t datalen);
273 static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit);
274 static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
275                 unsigned int duration);
276
277 /*!
278  * \internal
279  * \brief A bare minimum channel technology
280  *
281  * This will not be registered as we never want anything to try
282  * to create Message channels other than internally in this file.
283  */
284 static const struct ast_channel_tech msg_chan_tech_hack = {
285         .type             = "Message",
286         .description      = "Internal Text Message Processing",
287         .read             = chan_msg_read,
288         .write            = chan_msg_write,
289         .indicate         = chan_msg_indicate,
290         .send_digit_begin = chan_msg_send_digit_begin,
291         .send_digit_end   = chan_msg_send_digit_end,
292 };
293
294 /*!
295  * \internal
296  * \brief ast_channel_tech read callback
297  *
298  * This should never be called.  However, we say that about chan_iax2's
299  * read callback, too, and it seems to randomly get called for some
300  * reason.  If it does, a simple NULL frame will suffice.
301  */
302 static struct ast_frame *chan_msg_read(struct ast_channel *chan)
303 {
304         return &ast_null_frame;
305 }
306
307 /*!
308  * \internal
309  * \brief ast_channel_tech write callback
310  *
311  * Throw all frames away.  We don't care about any of them.
312  */
313 static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr)
314 {
315         return 0;
316 }
317
318 /*!
319  * \internal
320  * \brief ast_channel_tech indicate callback
321  *
322  * The indicate callback is here just so it can return success.
323  * We don't want any callers of ast_indicate() to think something
324  * has failed.  We also don't want ast_indicate() itself to try
325  * to generate inband tones since we didn't tell it that we took
326  * care of it ourselves.
327  */
328 static int chan_msg_indicate(struct ast_channel *chan, int condition,
329                 const void *data, size_t datalen)
330 {
331         return 0;
332 }
333
334 /*!
335  * \internal
336  * \brief ast_channel_tech send_digit_begin callback
337  *
338  * This is here so that just in case a digit comes at a message channel
339  * that the Asterisk core doesn't waste any time trying to generate
340  * inband DTMF in audio.  It's a waste of resources.
341  */
342 static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit)
343 {
344         return 0;
345 }
346
347 /*!
348  * \internal
349  * \brief ast_channel_tech send_digit_end callback
350  *
351  * This is here so that just in case a digit comes at a message channel
352  * that the Asterisk core doesn't waste any time trying to generate
353  * inband DTMF in audio.  It's a waste of resources.
354  */
355 static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
356                 unsigned int duration)
357 {
358         return 0;
359 }
360
361 static void msg_ds_destroy(void *data)
362 {
363         struct ast_msg *msg = data;
364
365         ao2_ref(msg, -1);
366 }
367
368 static int msg_data_hash_fn(const void *obj, const int flags)
369 {
370         const struct msg_data *data = obj;
371         return ast_str_case_hash(data->name);
372 }
373
374 static int msg_data_cmp_fn(void *obj, void *arg, int flags)
375 {
376         const struct msg_data *one = obj, *two = arg;
377         return !strcasecmp(one->name, two->name) ? CMP_MATCH | CMP_STOP : 0;
378 }
379
380 static void msg_data_destructor(void *obj)
381 {
382         struct msg_data *data = obj;
383         ast_string_field_free_memory(data);
384 }
385
386 static void msg_destructor(void *obj)
387 {
388         struct ast_msg *msg = obj;
389
390         ast_string_field_free_memory(msg);
391         ao2_cleanup(msg->vars);
392 }
393
394 struct ast_msg *ast_msg_alloc(void)
395 {
396         struct ast_msg *msg;
397
398         if (!(msg = ao2_alloc(sizeof(*msg), msg_destructor))) {
399                 return NULL;
400         }
401
402         if (ast_string_field_init(msg, 128)) {
403                 ao2_ref(msg, -1);
404                 return NULL;
405         }
406
407         if (!(msg->vars = ao2_container_alloc(1, msg_data_hash_fn, msg_data_cmp_fn))) {
408                 ao2_ref(msg, -1);
409                 return NULL;
410         }
411         ast_string_field_set(msg, context, "default");
412
413         return msg;
414 }
415
416 struct ast_msg *ast_msg_ref(struct ast_msg *msg)
417 {
418         ao2_ref(msg, 1);
419         return msg;
420 }
421
422 struct ast_msg *ast_msg_destroy(struct ast_msg *msg)
423 {
424         ao2_ref(msg, -1);
425         return NULL;
426 }
427
428 int ast_msg_set_to(struct ast_msg *msg, const char *fmt, ...)
429 {
430         va_list ap;
431
432         va_start(ap, fmt);
433         ast_string_field_build_va(msg, to, fmt, ap);
434         va_end(ap);
435
436         return 0;
437 }
438
439 int ast_msg_set_from(struct ast_msg *msg, const char *fmt, ...)
440 {
441         va_list ap;
442
443         va_start(ap, fmt);
444         ast_string_field_build_va(msg, from, fmt, ap);
445         va_end(ap);
446
447         return 0;
448 }
449
450 int ast_msg_set_body(struct ast_msg *msg, const char *fmt, ...)
451 {
452         va_list ap;
453
454         va_start(ap, fmt);
455         ast_string_field_build_va(msg, body, fmt, ap);
456         va_end(ap);
457
458         return 0;
459 }
460
461 int ast_msg_set_context(struct ast_msg *msg, const char *fmt, ...)
462 {
463         va_list ap;
464
465         va_start(ap, fmt);
466         ast_string_field_build_va(msg, context, fmt, ap);
467         va_end(ap);
468
469         return 0;
470 }
471
472 int ast_msg_set_exten(struct ast_msg *msg, const char *fmt, ...)
473 {
474         va_list ap;
475
476         va_start(ap, fmt);
477         ast_string_field_build_va(msg, exten, fmt, ap);
478         va_end(ap);
479
480         return 0;
481 }
482
483 int ast_msg_set_tech(struct ast_msg *msg, const char *fmt, ...)
484 {
485         va_list ap;
486
487         va_start(ap, fmt);
488         ast_string_field_build_va(msg, tech, fmt, ap);
489         va_end(ap);
490
491         return 0;
492 }
493
494 int ast_msg_set_endpoint(struct ast_msg *msg, const char *fmt, ...)
495 {
496         va_list ap;
497
498         va_start(ap, fmt);
499         ast_string_field_build_va(msg, endpoint, fmt, ap);
500         va_end(ap);
501
502         return 0;
503 }
504
505 const char *ast_msg_get_body(const struct ast_msg *msg)
506 {
507         return msg->body;
508 }
509
510 const char *ast_msg_get_from(const struct ast_msg *msg)
511 {
512         return msg->from;
513 }
514
515 const char *ast_msg_get_to(const struct ast_msg *msg)
516 {
517         return msg->to;
518 }
519
520 const char *ast_msg_get_tech(const struct ast_msg *msg)
521 {
522         return msg->tech;
523 }
524
525 const char *ast_msg_get_endpoint(const struct ast_msg *msg)
526 {
527         return msg->endpoint;
528 }
529
530 static struct msg_data *msg_data_alloc(void)
531 {
532         struct msg_data *data;
533
534         if (!(data = ao2_alloc(sizeof(*data), msg_data_destructor))) {
535                 return NULL;
536         }
537
538         if (ast_string_field_init(data, 32)) {
539                 ao2_ref(data, -1);
540                 return NULL;
541         }
542
543         return data;
544 }
545
546 static struct msg_data *msg_data_find(struct ao2_container *vars, const char *name)
547 {
548         struct msg_data tmp = {
549                 .name = name,
550         };
551         return ao2_find(vars, &tmp, OBJ_POINTER);
552 }
553
554 static int msg_set_var_full(struct ast_msg *msg, const char *name, const char *value, unsigned int outbound)
555 {
556         struct msg_data *data;
557
558         if (!(data = msg_data_find(msg->vars, name))) {
559                 if (ast_strlen_zero(value)) {
560                         return 0;
561                 }
562                 if (!(data = msg_data_alloc())) {
563                         return -1;
564                 };
565
566                 ast_string_field_set(data, name, name);
567                 ast_string_field_set(data, value, value);
568                 data->send = outbound;
569                 ao2_link(msg->vars, data);
570         } else {
571                 if (ast_strlen_zero(value)) {
572                         ao2_unlink(msg->vars, data);
573                 } else {
574                         ast_string_field_set(data, value, value);
575                         data->send = outbound;
576                 }
577         }
578
579         ao2_ref(data, -1);
580
581         return 0;
582 }
583
584 int ast_msg_set_var_outbound(struct ast_msg *msg, const char *name, const char *value)
585 {
586         return msg_set_var_full(msg, name, value, 1);
587 }
588
589 int ast_msg_set_var(struct ast_msg *msg, const char *name, const char *value)
590 {
591         return msg_set_var_full(msg, name, value, 0);
592 }
593
594 const char *ast_msg_get_var(struct ast_msg *msg, const char *name)
595 {
596         struct msg_data *data;
597         const char *val = NULL;
598
599         if (!(data = msg_data_find(msg->vars, name))) {
600                 return NULL;
601         }
602
603         /* Yep, this definitely looks like val would be a dangling pointer
604          * after the ref count is decremented.  As long as the message structure
605          * is used in a thread safe manner, this will not be the case though.
606          * The ast_msg holds a reference to this object in the msg->vars container. */
607         val = data->value;
608         ao2_ref(data, -1);
609
610         return val;
611 }
612
613 struct ast_msg_var_iterator {
614         struct ao2_iterator iter;
615         struct msg_data *current_used;
616 };
617
618 struct ast_msg_var_iterator *ast_msg_var_iterator_init(const struct ast_msg *msg)
619 {
620         struct ast_msg_var_iterator *iter;
621
622         iter = ast_calloc(1, sizeof(*iter));
623         if (!iter) {
624                 return NULL;
625         }
626
627         iter->iter = ao2_iterator_init(msg->vars, 0);
628
629         return iter;
630 }
631
632 int ast_msg_var_iterator_next(const struct ast_msg *msg, struct ast_msg_var_iterator *iter, const char **name, const char **value)
633 {
634         struct msg_data *data;
635
636         if (!iter) {
637                 return 0;
638         }
639
640         /* Skip any that aren't marked for sending out */
641         while ((data = ao2_iterator_next(&iter->iter)) && !data->send) {
642                 ao2_ref(data, -1);
643         }
644
645         if (!data) {
646                 return 0;
647         }
648
649         if (data->send) {
650                 *name = data->name;
651                 *value = data->value;
652         }
653
654         /* Leave the refcount to be cleaned up by the caller with
655          * ast_msg_var_unref_current after they finish with the pointers to the data */
656         iter->current_used = data;
657
658         return 1;
659 }
660
661 void ast_msg_var_unref_current(struct ast_msg_var_iterator *iter)
662 {
663         ao2_cleanup(iter->current_used);
664         iter->current_used = NULL;
665 }
666
667 void ast_msg_var_iterator_destroy(struct ast_msg_var_iterator *iter)
668 {
669         if (iter) {
670                 ao2_iterator_destroy(&iter->iter);
671                 ast_msg_var_unref_current(iter);
672                 ast_free(iter);
673         }
674 }
675
676 static struct ast_channel *create_msg_q_chan(void)
677 {
678         struct ast_channel *chan;
679         struct ast_datastore *ds;
680
681         chan = ast_channel_alloc(1, AST_STATE_UP,
682                         NULL, NULL, NULL,
683                         NULL, NULL, NULL, NULL, 0,
684                         "%s", "Message/ast_msg_queue");
685
686         if (!chan) {
687                 return NULL;
688         }
689
690         ast_channel_tech_set(chan, &msg_chan_tech_hack);
691         ast_channel_unlock(chan);
692         ast_channel_unlink(chan);
693
694         if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
695                 ast_hangup(chan);
696                 return NULL;
697         }
698
699         ast_channel_lock(chan);
700         ast_channel_datastore_add(chan, ds);
701         ast_channel_unlock(chan);
702
703         return chan;
704 }
705
706 /*!
707  * \internal
708  * \brief Run the dialplan for message processing
709  *
710  * \pre The message has already been set up on the msg datastore
711  *      on this channel.
712  */
713 static void msg_route(struct ast_channel *chan, struct ast_msg *msg)
714 {
715         struct ast_pbx_args pbx_args;
716
717         ast_explicit_goto(chan, msg->context, S_OR(msg->exten, "s"), 1);
718
719         memset(&pbx_args, 0, sizeof(pbx_args));
720         pbx_args.no_hangup_chan = 1,
721         ast_pbx_run_args(chan, &pbx_args);
722 }
723
724 /*!
725  * \internal
726  * \brief Clean up ast_channel after each message
727  *
728  * Reset various bits of state after routing each message so the same ast_channel
729  * can just be reused.
730  */
731 static void chan_cleanup(struct ast_channel *chan)
732 {
733         struct ast_datastore *msg_ds, *ds;
734         struct varshead *headp;
735         struct ast_var_t *vardata;
736         struct ast_frame *cur;
737
738         ast_channel_lock(chan);
739
740         /*
741          * Remove the msg datastore.  Free its data but keep around the datastore
742          * object and just reuse it.
743          */
744         if ((msg_ds = ast_channel_datastore_find(chan, &msg_datastore, NULL)) && msg_ds->data) {
745                 ast_channel_datastore_remove(chan, msg_ds);
746                 ao2_ref(msg_ds->data, -1);
747                 msg_ds->data = NULL;
748         }
749
750         /*
751          * Destroy all other datastores.
752          */
753         while ((ds = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry))) {
754                 ast_datastore_free(ds);
755         }
756
757         /*
758          * Destroy all channel variables.
759          */
760         headp = ast_channel_varshead(chan);
761         while ((vardata = AST_LIST_REMOVE_HEAD(headp, entries))) {
762                 ast_var_delete(vardata);
763         }
764
765         /*
766          * Remove frames from read queue
767          */
768         while ((cur = AST_LIST_REMOVE_HEAD(ast_channel_readq(chan), frame_list))) {
769                 ast_frfree(cur);
770         }
771
772         /*
773          * Restore msg datastore.
774          */
775         if (msg_ds) {
776                 ast_channel_datastore_add(chan, msg_ds);
777         }
778
779         /*
780          * Clear softhangup flags.
781          */
782         ast_channel_clear_softhangup(chan, AST_SOFTHANGUP_ALL);
783
784         /*
785          * Flush the alert pipe in case we miscounted somewhere when
786          * messing with frames on the read queue, we had to flush the
787          * read queue above, or we had an "Exceptionally long queue
788          * length" event.
789          */
790         ast_channel_internal_alert_flush(chan);
791
792         ast_channel_unlock(chan);
793 }
794
795 static void destroy_msg_q_chan(void *data)
796 {
797         struct ast_channel **chan = data;
798
799         if (!*chan) {
800                 return;
801         }
802
803         ast_channel_release(*chan);
804 }
805
806 AST_THREADSTORAGE_CUSTOM(msg_q_chan, NULL, destroy_msg_q_chan);
807
808 /*! \internal \brief Handle a message bound for the dialplan */
809 static int dialplan_handle_msg_cb(struct ast_msg *msg)
810 {
811         struct ast_channel **chan_p, *chan;
812         struct ast_datastore *ds;
813
814         if (!(chan_p = ast_threadstorage_get(&msg_q_chan, sizeof(struct ast_channel *)))) {
815                 return -1;
816         }
817         if (!*chan_p) {
818                 if (!(*chan_p = create_msg_q_chan())) {
819                         return -1;
820                 }
821         }
822         chan = *chan_p;
823
824         ast_channel_lock(chan);
825         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
826                 ast_channel_unlock(chan);
827                 return -1;
828         }
829         ao2_ref(msg, +1);
830         ds->data = msg;
831         ast_channel_unlock(chan);
832
833         msg_route(chan, msg);
834         chan_cleanup(chan);
835
836         return 0;
837 }
838
839 /*! \internal \brief Determine if a message has a destination in the dialplan */
840 static int dialplan_has_destination_cb(const struct ast_msg *msg)
841 {
842         if (ast_strlen_zero(msg->context)) {
843                 return 0;
844         }
845
846         return ast_exists_extension(NULL, msg->context, S_OR(msg->exten, "s"), 1, NULL);
847 }
848
849 static struct ast_msg_handler dialplan_msg_handler = {
850         .name = "dialplan",
851         .handle_msg = dialplan_handle_msg_cb,
852         .has_destination = dialplan_has_destination_cb,
853 };
854
855 /*!
856  * \internal
857  * \brief Message queue task processor callback
858  *
859  * \retval 0 success
860  * \retval non-zero failure
861  *
862  * \note Even though this returns a value, the taskprocessor code ignores the value.
863  */
864 static int msg_q_cb(void *data)
865 {
866         struct ast_msg *msg = data;
867         int res = 1;
868         int i;
869
870         ast_rwlock_rdlock(&msg_handlers_lock);
871         for (i = 0; i < AST_VECTOR_SIZE(&msg_handlers); i++) {
872                 const struct ast_msg_handler *handler = AST_VECTOR_GET(&msg_handlers, i);
873
874                 if (!handler->has_destination(msg)) {
875                         ast_debug(5, "Handler %s doesn't want message, moving on\n", handler->name);
876                         continue;
877                 }
878
879                 ast_debug(5, "Dispatching message to %s handler\n", handler->name);
880                 res &= handler->handle_msg(msg);
881         }
882         ast_rwlock_unlock(&msg_handlers_lock);
883
884         if (res != 0) {
885                 ast_log(LOG_WARNING, "No handler processed message from %s to %s\n",
886                         S_OR(msg->from, "<unknown>"), S_OR(msg->to, "<unknown>"));
887         }
888
889         ao2_ref(msg, -1);
890
891         return res;
892 }
893
894 int ast_msg_has_destination(const struct ast_msg *msg)
895 {
896         int i;
897         int result = 0;
898
899         ast_rwlock_rdlock(&msg_handlers_lock);
900         for (i = 0; i < AST_VECTOR_SIZE(&msg_handlers); i++) {
901                 const struct ast_msg_handler *handler = AST_VECTOR_GET(&msg_handlers, i);
902
903                 ast_debug(5, "Seeing if %s can handle message\n", handler->name);
904                 if (handler->has_destination(msg)) {
905                         ast_debug(5, "%s can handle message\n", handler->name);
906                         result = 1;
907                         break;
908                 }
909         }
910         ast_rwlock_unlock(&msg_handlers_lock);
911
912         return result;
913 }
914
915 int ast_msg_queue(struct ast_msg *msg)
916 {
917         int res;
918         res = ast_taskprocessor_push(msg_q_tp, msg_q_cb, msg);
919         if (res == -1) {
920                 ao2_ref(msg, -1);
921         }
922
923         return res;
924 }
925
926 /*!
927  * \internal
928  * \brief Find or create a message datastore on a channel
929  *
930  * \pre chan is locked
931  *
932  * \param chan the relevant channel
933  *
934  * \return the channel's message datastore, or NULL on error
935  */
936 static struct ast_datastore *msg_datastore_find_or_create(struct ast_channel *chan)
937 {
938         struct ast_datastore *ds;
939
940         if ((ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
941                 return ds;
942         }
943
944         if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
945                 return NULL;
946         }
947
948         if (!(ds->data = ast_msg_alloc())) {
949                 ast_datastore_free(ds);
950                 return NULL;
951         }
952
953         ast_channel_datastore_add(chan, ds);
954
955         return ds;
956 }
957
958 static int msg_func_read(struct ast_channel *chan, const char *function,
959                 char *data, char *buf, size_t len)
960 {
961         struct ast_datastore *ds;
962         struct ast_msg *msg;
963
964         if (!chan) {
965                 ast_log(LOG_WARNING, "No channel was provided to %s function.\n", function);
966                 return -1;
967         }
968
969         ast_channel_lock(chan);
970
971         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
972                 ast_channel_unlock(chan);
973                 ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
974                 return -1;
975         }
976
977         msg = ds->data;
978         ao2_ref(msg, +1);
979         ast_channel_unlock(chan);
980
981         ao2_lock(msg);
982
983         if (!strcasecmp(data, "to")) {
984                 ast_copy_string(buf, msg->to, len);
985         } else if (!strcasecmp(data, "from")) {
986                 ast_copy_string(buf, msg->from, len);
987         } else if (!strcasecmp(data, "body")) {
988                 ast_copy_string(buf, msg->body, len);
989         } else {
990                 ast_log(LOG_WARNING, "Invalid argument to MESSAGE(): '%s'\n", data);
991         }
992
993         ao2_unlock(msg);
994         ao2_ref(msg, -1);
995
996         return 0;
997 }
998
999 static int msg_func_write(struct ast_channel *chan, const char *function,
1000                 char *data, const char *value)
1001 {
1002         struct ast_datastore *ds;
1003         struct ast_msg *msg;
1004
1005         if (!chan) {
1006                 ast_log(LOG_WARNING, "No channel was provided to %s function.\n", function);
1007                 return -1;
1008         }
1009
1010         ast_channel_lock(chan);
1011
1012         if (!(ds = msg_datastore_find_or_create(chan))) {
1013                 ast_channel_unlock(chan);
1014                 return -1;
1015         }
1016
1017         msg = ds->data;
1018         ao2_ref(msg, +1);
1019         ast_channel_unlock(chan);
1020
1021         ao2_lock(msg);
1022
1023         if (!strcasecmp(data, "to")) {
1024                 ast_msg_set_to(msg, "%s", value);
1025         } else if (!strcasecmp(data, "from")) {
1026                 ast_msg_set_from(msg, "%s", value);
1027         } else if (!strcasecmp(data, "body")) {
1028                 ast_msg_set_body(msg, "%s", value);
1029         } else if (!strcasecmp(data, "custom_data")) {
1030                 int outbound = -1;
1031                 if (!strcasecmp(value, "mark_all_outbound")) {
1032                         outbound = 1;
1033                 } else if (!strcasecmp(value, "clear_all_outbound")) {
1034                         outbound = 0;
1035                 } else {
1036                         ast_log(LOG_WARNING, "'%s' is not a valid value for custom_data\n", value);
1037                 }
1038
1039                 if (outbound != -1) {
1040                         struct msg_data *hdr_data;
1041                         struct ao2_iterator iter = ao2_iterator_init(msg->vars, 0);
1042
1043                         while ((hdr_data = ao2_iterator_next(&iter))) {
1044                                 hdr_data->send = outbound;
1045                                 ao2_ref(hdr_data, -1);
1046                         }
1047                         ao2_iterator_destroy(&iter);
1048                 }
1049         } else {
1050                 ast_log(LOG_WARNING, "'%s' is not a valid write argument.\n", data);
1051         }
1052
1053         ao2_unlock(msg);
1054         ao2_ref(msg, -1);
1055
1056         return 0;
1057 }
1058
1059 static int msg_data_func_read(struct ast_channel *chan, const char *function,
1060                 char *data, char *buf, size_t len)
1061 {
1062         struct ast_datastore *ds;
1063         struct ast_msg *msg;
1064         const char *val;
1065
1066         if (!chan) {
1067                 ast_log(LOG_WARNING, "No channel was provided to %s function.\n", function);
1068                 return -1;
1069         }
1070
1071         ast_channel_lock(chan);
1072
1073         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
1074                 ast_channel_unlock(chan);
1075                 ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
1076                 return -1;
1077         }
1078
1079         msg = ds->data;
1080         ao2_ref(msg, +1);
1081         ast_channel_unlock(chan);
1082
1083         ao2_lock(msg);
1084
1085         if ((val = ast_msg_get_var(msg, data))) {
1086                 ast_copy_string(buf, val, len);
1087         }
1088
1089         ao2_unlock(msg);
1090         ao2_ref(msg, -1);
1091
1092         return 0;
1093 }
1094
1095 static int msg_data_func_write(struct ast_channel *chan, const char *function,
1096                 char *data, const char *value)
1097 {
1098         struct ast_datastore *ds;
1099         struct ast_msg *msg;
1100
1101         if (!chan) {
1102                 ast_log(LOG_WARNING, "No channel was provided to %s function.\n", function);
1103                 return -1;
1104         }
1105
1106         ast_channel_lock(chan);
1107
1108         if (!(ds = msg_datastore_find_or_create(chan))) {
1109                 ast_channel_unlock(chan);
1110                 return -1;
1111         }
1112
1113         msg = ds->data;
1114         ao2_ref(msg, +1);
1115         ast_channel_unlock(chan);
1116
1117         ao2_lock(msg);
1118
1119         ast_msg_set_var_outbound(msg, data, value);
1120
1121         ao2_unlock(msg);
1122         ao2_ref(msg, -1);
1123
1124         return 0;
1125 }
1126
1127 /*!
1128  * \internal \brief Find a \c ast_msg_tech by its technology name
1129  *
1130  * \param tech_name The name of the message technology
1131  *
1132  * \note \c msg_techs should be locked via \c msg_techs_lock prior to
1133  *       calling this function
1134  *
1135  * \retval NULL if no \c ast_msg_tech has been registered
1136  * \retval \c ast_msg_tech if registered
1137  */
1138 static const struct ast_msg_tech *msg_find_by_tech_name(const char *tech_name)
1139 {
1140         const struct ast_msg_tech *current;
1141         int i;
1142
1143         for (i = 0; i < AST_VECTOR_SIZE(&msg_techs); i++) {
1144                 current = AST_VECTOR_GET(&msg_techs, i);
1145                 if (!strcmp(current->name, tech_name)) {
1146                         return current;
1147                 }
1148         }
1149
1150         return NULL;
1151 }
1152
1153 /*!
1154  * \internal \brief Find a \c ast_msg_handler by its technology name
1155  *
1156  * \param tech_name The name of the message technology
1157  *
1158  * \note \c msg_handlers should be locked via \c msg_handlers_lock
1159  *       prior to calling this function
1160  *
1161  * \retval NULL if no \c ast_msg_handler has been registered
1162  * \retval \c ast_msg_handler if registered
1163  */
1164 static const struct ast_msg_handler *msg_handler_find_by_tech_name(const char *tech_name)
1165 {
1166         const struct ast_msg_handler *current;
1167         int i;
1168
1169         for (i = 0; i < AST_VECTOR_SIZE(&msg_handlers); i++) {
1170                 current = AST_VECTOR_GET(&msg_handlers, i);
1171                 if (!strcmp(current->name, tech_name)) {
1172                         return current;
1173                 }
1174         }
1175
1176         return NULL;
1177 }
1178
1179 /*!
1180  * \internal
1181  * \brief MessageSend() application
1182  */
1183 static int msg_send_exec(struct ast_channel *chan, const char *data)
1184 {
1185         struct ast_datastore *ds;
1186         struct ast_msg *msg;
1187         char *tech_name;
1188         const struct ast_msg_tech *msg_tech;
1189         char *parse;
1190         int res = -1;
1191         AST_DECLARE_APP_ARGS(args,
1192                 AST_APP_ARG(to);
1193                 AST_APP_ARG(from);
1194         );
1195
1196         if (ast_strlen_zero(data)) {
1197                 ast_log(LOG_WARNING, "An argument is required to MessageSend()\n");
1198                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
1199                 return 0;
1200         }
1201
1202         parse = ast_strdupa(data);
1203         AST_STANDARD_APP_ARGS(args, parse);
1204
1205         if (ast_strlen_zero(args.to)) {
1206                 ast_log(LOG_WARNING, "A 'to' URI is required for MessageSend()\n");
1207                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
1208                 return 0;
1209         }
1210
1211         ast_channel_lock(chan);
1212
1213         if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
1214                 ast_channel_unlock(chan);
1215                 ast_log(LOG_WARNING, "No message data found on channel to send.\n");
1216                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "FAILURE");
1217                 return 0;
1218         }
1219
1220         msg = ds->data;
1221         ao2_ref(msg, +1);
1222         ast_channel_unlock(chan);
1223
1224         tech_name = ast_strdupa(args.to);
1225         tech_name = strsep(&tech_name, ":");
1226
1227         ast_rwlock_rdlock(&msg_techs_lock);
1228         msg_tech = msg_find_by_tech_name(tech_name);
1229
1230         if (!msg_tech) {
1231                 ast_log(LOG_WARNING, "No message technology '%s' found.\n", tech_name);
1232                 pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_PROTOCOL");
1233                 goto exit_cleanup;
1234         }
1235
1236         /*
1237          * The message lock is held here to safely allow the technology
1238          * implementation to access the message fields without worrying
1239          * that they could change.
1240          */
1241         ao2_lock(msg);
1242         res = msg_tech->msg_send(msg, S_OR(args.to, ""), S_OR(args.from, ""));
1243         ao2_unlock(msg);
1244
1245         pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", res ? "FAILURE" : "SUCCESS");
1246
1247 exit_cleanup:
1248         ast_rwlock_unlock(&msg_techs_lock);
1249         ao2_ref(msg, -1);
1250
1251         return 0;
1252 }
1253
1254 static int action_messagesend(struct mansession *s, const struct message *m)
1255 {
1256         const char *to = ast_strdupa(astman_get_header(m, "To"));
1257         const char *from = astman_get_header(m, "From");
1258         const char *body = astman_get_header(m, "Body");
1259         const char *base64body = astman_get_header(m, "Base64Body");
1260         char base64decoded[1301] = { 0, };
1261         char *tech_name = NULL;
1262         struct ast_variable *vars = NULL;
1263         struct ast_variable *data = NULL;
1264         const struct ast_msg_tech *msg_tech;
1265         struct ast_msg *msg;
1266         int res = -1;
1267
1268         if (ast_strlen_zero(to)) {
1269                 astman_send_error(s, m, "No 'To' address specified.");
1270                 return 0;
1271         }
1272
1273         if (!ast_strlen_zero(base64body)) {
1274                 ast_base64decode((unsigned char *) base64decoded, base64body, sizeof(base64decoded) - 1);
1275                 body = base64decoded;
1276         }
1277
1278         tech_name = ast_strdupa(to);
1279         tech_name = strsep(&tech_name, ":");
1280
1281         ast_rwlock_rdlock(&msg_techs_lock);
1282         msg_tech = msg_find_by_tech_name(tech_name);
1283         if (!msg_tech) {
1284                 ast_rwlock_unlock(&msg_techs_lock);
1285                 astman_send_error(s, m, "Message technology not found.");
1286                 return 0;
1287         }
1288
1289         if (!(msg = ast_msg_alloc())) {
1290                 ast_rwlock_unlock(&msg_techs_lock);
1291                 astman_send_error(s, m, "Internal failure\n");
1292                 return 0;
1293         }
1294
1295         data = astman_get_variables_order(m, ORDER_NATURAL);
1296         for (vars = data; vars; vars = vars->next) {
1297                 ast_msg_set_var_outbound(msg, vars->name, vars->value);
1298         }
1299
1300         ast_msg_set_body(msg, "%s", body);
1301
1302         res = msg_tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
1303
1304         ast_rwlock_unlock(&msg_techs_lock);
1305
1306         ast_variables_destroy(vars);
1307         ao2_ref(msg, -1);
1308
1309         if (res) {
1310                 astman_send_error(s, m, "Message failed to send.");
1311         } else {
1312                 astman_send_ack(s, m, "Message successfully sent");
1313         }
1314         return 0;
1315 }
1316
1317 int ast_msg_send(struct ast_msg *msg, const char *to, const char *from)
1318 {
1319         char *tech_name = NULL;
1320         const struct ast_msg_tech *msg_tech;
1321         int res = -1;
1322
1323         if (ast_strlen_zero(to)) {
1324                 ao2_ref(msg, -1);
1325                 return -1;
1326         }
1327
1328         tech_name = ast_strdupa(to);
1329         tech_name = strsep(&tech_name, ":");
1330
1331         ast_rwlock_rdlock(&msg_techs_lock);
1332         msg_tech = msg_find_by_tech_name(tech_name);
1333
1334         if (!msg_tech) {
1335                 ast_log(LOG_ERROR, "Unknown message tech: %s\n", tech_name);
1336                 ast_rwlock_unlock(&msg_techs_lock);
1337                 return -1;
1338         }
1339
1340         res = msg_tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
1341
1342         ast_rwlock_unlock(&msg_techs_lock);
1343
1344         ao2_ref(msg, -1);
1345
1346         return res;
1347 }
1348
1349 int ast_msg_tech_register(const struct ast_msg_tech *tech)
1350 {
1351         const struct ast_msg_tech *match;
1352
1353         ast_rwlock_wrlock(&msg_techs_lock);
1354
1355         match = msg_find_by_tech_name(tech->name);
1356         if (match) {
1357                 ast_log(LOG_ERROR, "Message technology already registered for '%s'\n",
1358                         tech->name);
1359                 ast_rwlock_unlock(&msg_techs_lock);
1360                 return -1;
1361         }
1362
1363         AST_VECTOR_APPEND(&msg_techs, tech);
1364         ast_verb(3, "Message technology '%s' registered.\n", tech->name);
1365
1366         ast_rwlock_unlock(&msg_techs_lock);
1367
1368         return 0;
1369 }
1370
1371 /*!
1372  * \brief Comparison callback for \c ast_msg_tech vector removal
1373  *
1374  * \param vec_elem The element in the vector being compared
1375  * \param srch The element being looked up
1376  *
1377  * \retval non-zero The items are equal
1378  * \retval 0 The items are not equal
1379  */
1380 static int msg_tech_cmp(const struct ast_msg_tech *vec_elem, const struct ast_msg_tech *srch)
1381 {
1382         return !strcmp(vec_elem->name, srch->name);
1383 }
1384
1385 int ast_msg_tech_unregister(const struct ast_msg_tech *tech)
1386 {
1387         int match;
1388
1389         ast_rwlock_wrlock(&msg_techs_lock);
1390         match = AST_VECTOR_REMOVE_CMP_UNORDERED(&msg_techs, tech, msg_tech_cmp,
1391                                                 AST_VECTOR_ELEM_CLEANUP_NOOP);
1392         ast_rwlock_unlock(&msg_techs_lock);
1393
1394         if (match) {
1395                 ast_log(LOG_ERROR, "No '%s' message technology found.\n", tech->name);
1396                 return -1;
1397         }
1398
1399         ast_verb(2, "Message technology '%s' unregistered.\n", tech->name);
1400
1401         return 0;
1402 }
1403
1404 int ast_msg_handler_register(const struct ast_msg_handler *handler)
1405 {
1406         const struct ast_msg_handler *match;
1407
1408         ast_rwlock_wrlock(&msg_handlers_lock);
1409
1410         match = msg_handler_find_by_tech_name(handler->name);
1411         if (match) {
1412                 ast_log(LOG_ERROR, "Message handler already registered for '%s'\n",
1413                         handler->name);
1414                 ast_rwlock_unlock(&msg_handlers_lock);
1415                 return -1;
1416         }
1417
1418         AST_VECTOR_APPEND(&msg_handlers, handler);
1419         ast_verb(2, "Message handler '%s' registered.\n", handler->name);
1420
1421         ast_rwlock_unlock(&msg_handlers_lock);
1422
1423         return 0;
1424
1425 }
1426
1427 /*!
1428  * \brief Comparison callback for \c ast_msg_handler vector removal
1429  *
1430  * \param vec_elem The element in the vector being compared
1431  * \param srch The element being looked up
1432  *
1433  * \retval non-zero The items are equal
1434  * \retval 0 The items are not equal
1435  */
1436 static int msg_handler_cmp(const struct ast_msg_handler *vec_elem, const struct ast_msg_handler *srch)
1437 {
1438         return !strcmp(vec_elem->name, srch->name);
1439 }
1440
1441 int ast_msg_handler_unregister(const struct ast_msg_handler *handler)
1442 {
1443         int match;
1444
1445         ast_rwlock_wrlock(&msg_handlers_lock);
1446         match = AST_VECTOR_REMOVE_CMP_UNORDERED(&msg_handlers, handler, msg_handler_cmp,
1447                                                 AST_VECTOR_ELEM_CLEANUP_NOOP);
1448         ast_rwlock_unlock(&msg_handlers_lock);
1449
1450         if (match) {
1451                 ast_log(LOG_ERROR, "No '%s' message handler found.\n", handler->name);
1452                 return -1;
1453         }
1454
1455         ast_verb(3, "Message handler '%s' unregistered.\n", handler->name);
1456         return 0;
1457 }
1458
1459 void ast_msg_shutdown(void)
1460 {
1461         if (msg_q_tp) {
1462                 msg_q_tp = ast_taskprocessor_unreference(msg_q_tp);
1463         }
1464 }
1465
1466 /*!
1467  * \internal
1468  * \brief Clean up other resources on Asterisk shutdown
1469  *
1470  * \note This does not include the msg_q_tp object, which must be disposed
1471  * of prior to Asterisk checking for channel destruction in its shutdown
1472  * sequence.  The atexit handlers are executed after this occurs.
1473  */
1474 static void message_shutdown(void)
1475 {
1476         ast_msg_handler_unregister(&dialplan_msg_handler);
1477
1478         ast_custom_function_unregister(&msg_function);
1479         ast_custom_function_unregister(&msg_data_function);
1480         ast_unregister_application(app_msg_send);
1481         ast_manager_unregister("MessageSend");
1482
1483         AST_VECTOR_FREE(&msg_techs);
1484         ast_rwlock_destroy(&msg_techs_lock);
1485
1486         AST_VECTOR_FREE(&msg_handlers);
1487         ast_rwlock_destroy(&msg_handlers_lock);
1488 }
1489
1490 /*
1491  * \internal
1492  * \brief Initialize stuff during Asterisk startup.
1493  *
1494  * Cleanup isn't a big deal in this function.  If we return non-zero,
1495  * Asterisk is going to exit.
1496  *
1497  * \retval 0 success
1498  * \retval non-zero failure
1499  */
1500 int ast_msg_init(void)
1501 {
1502         int res;
1503
1504         msg_q_tp = ast_taskprocessor_get("ast_msg_queue", TPS_REF_DEFAULT);
1505         if (!msg_q_tp) {
1506                 return -1;
1507         }
1508
1509         ast_rwlock_init(&msg_techs_lock);
1510         if (AST_VECTOR_INIT(&msg_techs, 8)) {
1511                 return -1;
1512         }
1513
1514         ast_rwlock_init(&msg_handlers_lock);
1515         if (AST_VECTOR_INIT(&msg_handlers, 4)) {
1516                 return -1;
1517         }
1518
1519         res = ast_msg_handler_register(&dialplan_msg_handler);
1520
1521         res |= __ast_custom_function_register(&msg_function, NULL);
1522         res |= __ast_custom_function_register(&msg_data_function, NULL);
1523         res |= ast_register_application2(app_msg_send, msg_send_exec, NULL, NULL, NULL);
1524         res |= ast_manager_register_xml_core("MessageSend", EVENT_FLAG_MESSAGE, action_messagesend);
1525
1526         ast_register_cleanup(message_shutdown);
1527
1528         return res;
1529 }