res_pjsip_pubsub: Solidify lifetime and ownership of objects.
[asterisk/asterisk.git] / res / res_pjsip_mwi.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend>pjproject</depend>
21         <depend>res_pjsip</depend>
22         <depend>res_pjsip_pubsub</depend>
23         <support_level>core</support_level>
24  ***/
25
26 #include "asterisk.h"
27
28 #include <pjsip.h>
29 #include <pjsip_simple.h>
30 #include <pjlib.h>
31
32 #include "asterisk/res_pjsip.h"
33 #include "asterisk/res_pjsip_pubsub.h"
34 #include "asterisk/res_pjsip_body_generator_types.h"
35 #include "asterisk/module.h"
36 #include "asterisk/logger.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/sorcery.h"
39 #include "asterisk/stasis.h"
40 #include "asterisk/app.h"
41
42 struct mwi_subscription;
43 static struct ao2_container *unsolicited_mwi;
44
45 #define STASIS_BUCKETS 13
46 #define MWI_BUCKETS 53
47
48 #define MWI_TYPE "application"
49 #define MWI_SUBTYPE "simple-message-summary"
50
51 #define MWI_DATASTORE "MWI datastore"
52
53 static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
54 static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
55 static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint,
56                 const char *resource);
57 static int mwi_subscription_established(struct ast_sip_subscription *sub);
58 static void *mwi_get_notify_data(struct ast_sip_subscription *sub);
59
60 static struct ast_sip_notifier mwi_notifier = {
61         .default_accept = MWI_TYPE"/"MWI_SUBTYPE,
62         .new_subscribe = mwi_new_subscribe,
63         .subscription_established = mwi_subscription_established,
64         .get_notify_data = mwi_get_notify_data,
65 };
66
67 static struct ast_sip_subscription_handler mwi_handler = {
68         .event_name = "message-summary",
69         .body_type = AST_SIP_MESSAGE_ACCUMULATOR,
70         .accept = { MWI_TYPE"/"MWI_SUBTYPE, },
71         .subscription_shutdown = mwi_subscription_shutdown,
72         .to_ami = mwi_to_ami,
73         .notifier = &mwi_notifier,
74 };
75
76 /*!
77  * \brief Wrapper for stasis subscription
78  *
79  * An MWI subscription has a container of these. This
80  * represents a stasis subscription for MWI state.
81  */
82 struct mwi_stasis_subscription {
83         /*! The MWI stasis subscription */
84         struct stasis_subscription *stasis_sub;
85         /*! The mailbox corresponding with the MWI subscription. Used as a hash key */
86         char mailbox[1];
87 };
88
89 /*!
90  * \brief A subscription for MWI
91  *
92  * This subscription is the basis for MWI for an endpoint. Each
93  * endpoint that uses MWI will have a corresponding mwi_subscription.
94  *
95  * This structure acts as the owner for the underlying SIP subscription.
96  * When the mwi_subscription is destroyed, the SIP subscription dies, too.
97  * The mwi_subscription's lifetime is governed by its underlying stasis
98  * subscriptions. When all stasis subscriptions are destroyed, the
99  * mwi_subscription is destroyed as well.
100  */
101 struct mwi_subscription {
102         /*! Container of \ref mwi_stasis_subscription structures.
103          * A single MWI subscription may be for multiple mailboxes, thus
104          * requiring multiple stasis subscriptions
105          */
106         struct ao2_container *stasis_subs;
107         /*! The SIP subscription. Unsolicited MWI does not use this */
108         struct ast_sip_subscription *sip_sub;
109         /*! AORs we should react to for unsolicited MWI NOTIFY */
110         char *aors;
111         /*! Is the MWI solicited (i.e. Initiated with an external SUBSCRIBE) ? */
112         unsigned int is_solicited;
113         /*! Identifier for the subscription.
114          * The identifier is the same as the corresponding endpoint's stasis ID.
115          * Used as a hash key
116          */
117         char id[1];
118 };
119
120 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
121                 struct stasis_message *msg);
122
123 static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char *mailbox, struct mwi_subscription *mwi_sub)
124 {
125         struct mwi_stasis_subscription *mwi_stasis_sub;
126         struct stasis_topic *topic;
127
128         if (!mwi_sub) {
129                 return NULL;
130         }
131
132         mwi_stasis_sub = ao2_alloc(sizeof(*mwi_stasis_sub) + strlen(mailbox), NULL);
133         if (!mwi_stasis_sub) {
134                 return NULL;
135         }
136
137         topic = ast_mwi_topic(mailbox);
138
139         /* Safe strcpy */
140         strcpy(mwi_stasis_sub->mailbox, mailbox);
141
142         ast_debug(3, "Creating stasis MWI subscription to mailbox %s for endpoint %s\n",
143                 mailbox, mwi_sub->id);
144         ao2_ref(mwi_sub, +1);
145         mwi_stasis_sub->stasis_sub = stasis_subscribe_pool(topic, mwi_stasis_cb, mwi_sub);
146         if (!mwi_stasis_sub->stasis_sub) {
147                 /* Failed to subscribe. */
148                 ao2_ref(mwi_stasis_sub, -1);
149                 ao2_ref(mwi_sub, -1);
150                 mwi_stasis_sub = NULL;
151         }
152         return mwi_stasis_sub;
153 }
154
155 static int stasis_sub_hash(const void *obj, const int flags)
156 {
157         const struct mwi_stasis_subscription *object;
158         const char *key;
159
160         switch (flags & OBJ_SEARCH_MASK) {
161         case OBJ_SEARCH_KEY:
162                 key = obj;
163                 break;
164         case OBJ_SEARCH_OBJECT:
165                 object = obj;
166                 key = object->mailbox;
167                 break;
168         default:
169                 ast_assert(0);
170                 return 0;
171         }
172         return ast_str_hash(key);
173 }
174
175 static int stasis_sub_cmp(void *obj, void *arg, int flags)
176 {
177         const struct mwi_stasis_subscription *sub_left = obj;
178         const struct mwi_stasis_subscription *sub_right = arg;
179         const char *right_key = arg;
180         int cmp;
181
182         switch (flags & OBJ_SEARCH_MASK) {
183         case OBJ_SEARCH_OBJECT:
184                 right_key = sub_right->mailbox;
185                 /* Fall through */
186         case OBJ_SEARCH_KEY:
187                 cmp = strcmp(sub_left->mailbox, right_key);
188                 break;
189         case OBJ_SEARCH_PARTIAL_KEY:
190                 cmp = strncmp(sub_left->mailbox, right_key, strlen(right_key));
191                 break;
192         default:
193                 cmp = 0;
194                 break;
195         }
196         if (cmp) {
197                 return 0;
198         }
199         return CMP_MATCH;
200 }
201
202 static void mwi_subscription_destructor(void *obj)
203 {
204         struct mwi_subscription *sub = obj;
205
206         ast_debug(3, "Destroying MWI subscription for endpoint %s\n", sub->id);
207         if (sub->is_solicited) {
208                 ast_sip_subscription_destroy(sub->sip_sub);
209         }
210         ao2_cleanup(sub->stasis_subs);
211         ast_free(sub->aors);
212 }
213
214 static struct mwi_subscription *mwi_subscription_alloc(struct ast_sip_endpoint *endpoint,
215                 unsigned int is_solicited, struct ast_sip_subscription *sip_sub)
216 {
217         struct mwi_subscription *sub;
218         const char *endpoint_id = ast_sorcery_object_get_id(endpoint);
219
220         sub = ao2_alloc(sizeof(*sub) + strlen(endpoint_id),
221                         mwi_subscription_destructor);
222
223         if (!sub) {
224                 return NULL;
225         }
226
227         /* Safe strcpy */
228         strcpy(sub->id, endpoint_id);
229
230         /* Unsolicited MWI doesn't actually result in a SIP subscription being
231          * created. This is because a SIP subscription associates with a dialog.
232          * Most devices expect unsolicited MWI NOTIFYs to appear out of dialog. If
233          * they receive an in-dialog MWI NOTIFY (i.e. with a to-tag), then they
234          * will reject the NOTIFY with a 481, thus resulting in message-waiting
235          * state not being updated on the device
236          */
237         if (is_solicited) {
238                 sub->sip_sub = sip_sub;
239         }
240
241         sub->stasis_subs = ao2_container_alloc(STASIS_BUCKETS, stasis_sub_hash, stasis_sub_cmp);
242         if (!sub->stasis_subs) {
243                 ao2_cleanup(sub);
244                 return NULL;
245         }
246         sub->is_solicited = is_solicited;
247
248         if (!is_solicited && !ast_strlen_zero(endpoint->aors)) {
249                 sub->aors = ast_strdup(endpoint->aors);
250                 if (!sub->aors) {
251                         ao2_ref(sub, -1);
252                         return NULL;
253                 }
254         }
255
256         ast_debug(3, "Created %s MWI subscription for endpoint %s\n", is_solicited ? "solicited" : "unsolicited", sub->id);
257
258         return sub;
259 }
260
261 static int mwi_sub_hash(const void *obj, const int flags)
262 {
263         const struct mwi_subscription *object;
264         const char *key;
265
266         switch (flags & OBJ_SEARCH_MASK) {
267         case OBJ_SEARCH_KEY:
268                 key = obj;
269                 break;
270         case OBJ_SEARCH_OBJECT:
271                 object = obj;
272                 key = object->id;
273                 break;
274         default:
275                 ast_assert(0);
276                 return 0;
277         }
278         return ast_str_hash(key);
279 }
280
281 static int mwi_sub_cmp(void *obj, void *arg, int flags)
282 {
283         const struct mwi_subscription *sub_left = obj;
284         const struct mwi_subscription *sub_right = arg;
285         const char *right_key = arg;
286         int cmp;
287
288         switch (flags & OBJ_SEARCH_MASK) {
289         case OBJ_SEARCH_OBJECT:
290                 right_key = sub_right->id;
291                 /* Fall through */
292         case OBJ_SEARCH_KEY:
293                 cmp = strcmp(sub_left->id, right_key);
294                 break;
295         case OBJ_SEARCH_PARTIAL_KEY:
296                 cmp = strncmp(sub_left->id, right_key, strlen(right_key));
297                 break;
298         default:
299                 cmp = 0;
300                 break;
301         }
302         if (cmp) {
303                 return 0;
304         }
305         return CMP_MATCH;
306 }
307
308 static int get_message_count(void *obj, void *arg, int flags)
309 {
310         struct stasis_message *msg;
311         struct mwi_stasis_subscription *mwi_stasis = obj;
312         struct ast_sip_message_accumulator *counter = arg;
313         struct ast_mwi_state *mwi_state;
314
315         msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), mwi_stasis->mailbox);
316         if (!msg) {
317                 return 0;
318         }
319
320         mwi_state = stasis_message_data(msg);
321         counter->old_msgs += mwi_state->old_msgs;
322         counter->new_msgs += mwi_state->new_msgs;
323
324         ao2_ref(msg, -1);
325
326         return 0;
327 }
328
329 struct unsolicited_mwi_data {
330         struct mwi_subscription *sub;
331         struct ast_sip_endpoint *endpoint;
332         pjsip_evsub_state state;
333         const struct ast_sip_body *body;
334 };
335
336 static int send_unsolicited_mwi_notify_to_contact(void *obj, void *arg, int flags)
337 {
338         struct unsolicited_mwi_data *mwi_data = arg;
339         struct mwi_subscription *sub = mwi_data->sub;
340         struct ast_sip_endpoint *endpoint = mwi_data->endpoint;
341         pjsip_evsub_state state = mwi_data->state;
342         const struct ast_sip_body *body = mwi_data->body;
343         struct ast_sip_contact *contact = obj;
344         const char *state_name;
345         pjsip_tx_data *tdata;
346         pjsip_sub_state_hdr *sub_state;
347         pjsip_event_hdr *event;
348         const pjsip_hdr *allow_events = pjsip_evsub_get_allow_events_hdr(NULL);
349
350         if (ast_sip_create_request("NOTIFY", NULL, endpoint, NULL, contact, &tdata)) {
351                 ast_log(LOG_WARNING, "Unable to create unsolicited NOTIFY request to endpoint %s URI %s\n", sub->id, contact->uri);
352                 return 0;
353         }
354
355         if (!ast_strlen_zero(endpoint->subscription.mwi.fromuser)) {
356                 pjsip_fromto_hdr *from = pjsip_msg_find_hdr(tdata->msg, PJSIP_H_FROM, NULL);
357                 pjsip_name_addr *from_name_addr = (pjsip_name_addr *) from->uri;
358                 pjsip_sip_uri *from_uri = pjsip_uri_get_uri(from_name_addr->uri);
359
360                 pj_strdup2(tdata->pool, &from_uri->user, endpoint->subscription.mwi.fromuser);
361         }
362
363         switch (state) {
364         case PJSIP_EVSUB_STATE_ACTIVE:
365                 state_name = "active";
366                 break;
367         case PJSIP_EVSUB_STATE_TERMINATED:
368         default:
369                 state_name = "terminated";
370                 break;
371         }
372
373         sub_state = pjsip_sub_state_hdr_create(tdata->pool);
374         pj_cstr(&sub_state->sub_state, state_name);
375         pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) sub_state);
376
377         event = pjsip_event_hdr_create(tdata->pool);
378         pj_cstr(&event->event_type, "message-summary");
379         pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) event);
380
381         pjsip_msg_add_hdr(tdata->msg, pjsip_hdr_shallow_clone(tdata->pool, allow_events));
382         ast_sip_add_body(tdata, body);
383         ast_sip_send_request(tdata, NULL, endpoint, NULL, NULL);
384
385         return 0;
386 }
387
388 static void send_unsolicited_mwi_notify(struct mwi_subscription *sub,
389                 struct ast_sip_message_accumulator *counter)
390 {
391         RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(),
392                                 "endpoint", sub->id), ao2_cleanup);
393         char *endpoint_aors;
394         char *aor_name;
395         struct ast_sip_body body;
396         struct ast_str *body_text;
397         struct ast_sip_body_data body_data = {
398                 .body_type = AST_SIP_MESSAGE_ACCUMULATOR,
399                 .body_data = counter,
400         };
401
402         if (!endpoint) {
403                 ast_log(LOG_WARNING, "Unable to send unsolicited MWI to %s because endpoint does not exist\n",
404                                 sub->id);
405                 return;
406         }
407         if (ast_strlen_zero(endpoint->aors)) {
408                 ast_log(LOG_WARNING, "Unable to send unsolicited MWI to %s because the endpoint has no"
409                                 " configured AORs\n", sub->id);
410                 return;
411         }
412
413         body.type = MWI_TYPE;
414         body.subtype = MWI_SUBTYPE;
415
416         body_text = ast_str_create(64);
417
418         if (!body_text) {
419                 return;
420         }
421
422         if (ast_sip_pubsub_generate_body_content(body.type, body.subtype, &body_data, &body_text)) {
423                 ast_log(LOG_WARNING, "Unable to generate SIP MWI NOTIFY body.\n");
424                 ast_free(body_text);
425                 return;
426         }
427
428         body.body_text = ast_str_buffer(body_text);
429
430         endpoint_aors = ast_strdupa(endpoint->aors);
431
432         ast_debug(5, "Sending unsolicited MWI NOTIFY to endpoint %s, new messages: %d, old messages: %d\n",
433                         sub->id, counter->new_msgs, counter->old_msgs);
434
435         while ((aor_name = strsep(&endpoint_aors, ","))) {
436                 RAII_VAR(struct ast_sip_aor *, aor, ast_sip_location_retrieve_aor(aor_name), ao2_cleanup);
437                 RAII_VAR(struct ao2_container *, contacts, NULL, ao2_cleanup);
438                 struct unsolicited_mwi_data mwi_data = {
439                         .sub = sub,
440                         .endpoint = endpoint,
441                         .body = &body,
442                 };
443
444                 if (!aor) {
445                         ast_log(LOG_WARNING, "Unable to locate AOR %s for unsolicited MWI\n", aor_name);
446                         continue;
447                 }
448
449                 contacts = ast_sip_location_retrieve_aor_contacts(aor);
450                 if (!contacts || (ao2_container_count(contacts) == 0)) {
451                         ast_log(LOG_NOTICE, "No contacts bound to AOR %s. Cannot send unsolicited MWI until a contact registers.\n", aor_name);
452                         continue;
453                 }
454
455                 ao2_callback(contacts, OBJ_NODATA, send_unsolicited_mwi_notify_to_contact, &mwi_data);
456         }
457
458         ast_free(body_text);
459 }
460
461 static void send_mwi_notify(struct mwi_subscription *sub)
462 {
463         struct ast_sip_message_accumulator counter = {
464                 .old_msgs = 0,
465                 .new_msgs = 0,
466         };
467         struct ast_sip_body_data data = {
468                 .body_type = AST_SIP_MESSAGE_ACCUMULATOR,
469                 .body_data = &counter,
470         };
471
472         ao2_callback(sub->stasis_subs, OBJ_NODATA, get_message_count, &counter);
473
474         if (sub->is_solicited) {
475                 ast_sip_subscription_notify(sub->sip_sub, &data, 0);
476                 return;
477         }
478
479         send_unsolicited_mwi_notify(sub, &counter);
480 }
481
482 static int unsubscribe_stasis(void *obj, void *arg, int flags)
483 {
484         struct mwi_stasis_subscription *mwi_stasis = obj;
485         if (mwi_stasis->stasis_sub) {
486                 ast_debug(3, "Removing stasis subscription to mailbox %s\n", mwi_stasis->mailbox);
487                 mwi_stasis->stasis_sub = stasis_unsubscribe_and_join(mwi_stasis->stasis_sub);
488         }
489         return CMP_MATCH;
490 }
491
492 static void mwi_subscription_shutdown(struct ast_sip_subscription *sub)
493 {
494         struct mwi_subscription *mwi_sub;
495         struct ast_datastore *mwi_datastore;
496
497         mwi_datastore = ast_sip_subscription_get_datastore(sub, MWI_DATASTORE);
498         if (!mwi_datastore) {
499                 return;
500         }
501
502         mwi_sub = mwi_datastore->data;
503         ao2_callback(mwi_sub->stasis_subs, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe_stasis, NULL);
504         ast_sip_subscription_remove_datastore(sub, MWI_DATASTORE);
505
506         ao2_ref(mwi_datastore, -1);
507 }
508
509 static void mwi_ds_destroy(void *data)
510 {
511         struct mwi_subscription *sub = data;
512
513         ao2_ref(sub, -1);
514 }
515
516 static struct ast_datastore_info mwi_ds_info = {
517         .destroy = mwi_ds_destroy,
518 };
519
520 static int add_mwi_datastore(struct mwi_subscription *sub)
521 {
522         struct ast_datastore *mwi_datastore;
523         int res;
524
525         mwi_datastore = ast_sip_subscription_alloc_datastore(&mwi_ds_info, MWI_DATASTORE);
526         if (!mwi_datastore) {
527                 return -1;
528         }
529         ao2_ref(sub, +1);
530         mwi_datastore->data = sub;
531
532         /*
533          * NOTE:  Adding the datastore to the subscription creates a ref loop
534          * that must be manually broken.
535          */
536         res = ast_sip_subscription_add_datastore(sub->sip_sub, mwi_datastore);
537         ao2_ref(mwi_datastore, -1);
538         return res;
539 }
540
541 /*!
542  * \brief Determines if an endpoint is receiving unsolicited MWI for a particular mailbox.
543  *
544  * \param endpoint The endpoint to check
545  * \param mailbox The candidate mailbox
546  * \retval 0 The endpoint does not receive unsolicited MWI for this mailbox
547  * \retval 1 The endpoint receives unsolicited MWI for this mailbox
548  */
549 static int endpoint_receives_unsolicited_mwi_for_mailbox(struct ast_sip_endpoint *endpoint,
550                 const char *mailbox)
551 {
552         struct ao2_iterator *mwi_subs;
553         struct mwi_subscription *mwi_sub;
554         const char *endpoint_id = ast_sorcery_object_get_id(endpoint);
555         int ret = 0;
556
557         mwi_subs = ao2_find(unsolicited_mwi, endpoint_id, OBJ_SEARCH_KEY | OBJ_MULTIPLE);
558
559         if (!mwi_subs) {
560                 return 0;
561         }
562
563         for (; (mwi_sub = ao2_iterator_next(mwi_subs)) && !ret; ao2_cleanup(mwi_sub)) {
564                 struct mwi_stasis_subscription *mwi_stasis;
565
566                 mwi_stasis = ao2_find(mwi_sub->stasis_subs, mailbox, OBJ_SEARCH_KEY);
567                 if (mwi_stasis) {
568                         ret = 1;
569                         ao2_cleanup(mwi_stasis);
570                 }
571         }
572
573         ao2_iterator_destroy(mwi_subs);
574         return ret;
575 }
576
577 /*!
578  * \brief Determine if an endpoint is a candidate to be able to subscribe for MWI
579  *
580  * Currently, this just makes sure that the endpoint is not already receiving unsolicted
581  * MWI for any of an AOR's configured mailboxes.
582  *
583  * \param obj The AOR to which the endpoint is subscribing.
584  * \param arg The endpoint that is attempting to subscribe.
585  * \param flags Unused.
586  * \retval 0 Endpoint is a candidate to subscribe to MWI on the AOR.
587  * \retval -1 The endpoint cannot subscribe to MWI on the AOR.
588  */
589 static int mwi_validate_for_aor(void *obj, void *arg, int flags)
590 {
591         struct ast_sip_aor *aor = obj;
592         struct ast_sip_endpoint *endpoint = arg;
593         char *mailboxes;
594         char *mailbox;
595
596         if (ast_strlen_zero(aor->mailboxes)) {
597                 return 0;
598         }
599
600         mailboxes = ast_strdupa(aor->mailboxes);
601         while ((mailbox = strsep(&mailboxes, ","))) {
602                 if (endpoint_receives_unsolicited_mwi_for_mailbox(endpoint, mailbox)) {
603                         ast_log(LOG_NOTICE, "Endpoint '%s' already configured for unsolicited MWI for mailbox '%s'. "
604                                         "Denying MWI subscription to %s\n", ast_sorcery_object_get_id(endpoint), mailbox,
605                                         ast_sorcery_object_get_id(aor));
606                         return -1;
607                 }
608         }
609
610         return 0;
611 }
612
613 static int mwi_on_aor(void *obj, void *arg, int flags)
614 {
615         struct ast_sip_aor *aor = obj;
616         struct mwi_subscription *sub = arg;
617         char *mailboxes;
618         char *mailbox;
619
620         if (ast_strlen_zero(aor->mailboxes)) {
621                 return 0;
622         }
623
624         mailboxes = ast_strdupa(aor->mailboxes);
625         while ((mailbox = strsep(&mailboxes, ","))) {
626                 struct mwi_stasis_subscription *mwi_stasis_sub;
627
628                 mwi_stasis_sub = mwi_stasis_subscription_alloc(mailbox, sub);
629                 if (!mwi_stasis_sub) {
630                         continue;
631                 }
632
633                 ao2_link(sub->stasis_subs, mwi_stasis_sub);
634                 ao2_ref(mwi_stasis_sub, -1);
635         }
636
637         return 0;
638 }
639
640 static struct mwi_subscription *mwi_create_subscription(
641         struct ast_sip_endpoint *endpoint, struct ast_sip_subscription *sip_sub)
642 {
643         struct mwi_subscription *sub = mwi_subscription_alloc(endpoint, 1, sip_sub);
644
645         if (!sub) {
646                 return NULL;
647         }
648
649         if (add_mwi_datastore(sub)) {
650                 ast_log(LOG_WARNING, "Unable to add datastore for MWI subscription to %s\n",
651                         sub->id);
652                 ao2_ref(sub, -1);
653                 return NULL;
654         }
655
656         return sub;
657 }
658
659 static struct mwi_subscription *mwi_subscribe_single(
660         struct ast_sip_endpoint *endpoint, struct ast_sip_subscription *sip_sub, const char *name)
661 {
662         struct ast_sip_aor *aor;
663         struct mwi_subscription *sub;
664
665         aor = ast_sip_location_retrieve_aor(name);
666         if (!aor) {
667                 /*! I suppose it's possible for the AOR to disappear on us
668                  * between accepting the subscription and sending the first
669                  * NOTIFY...
670                  */
671                 ast_log(LOG_WARNING, "Unable to locate aor %s. MWI subscription failed.\n",
672                         name);
673                 return NULL;
674         }
675
676         sub = mwi_create_subscription(endpoint, sip_sub);
677         if (sub) {
678                 mwi_on_aor(aor, sub, 0);
679         }
680
681         ao2_ref(aor, -1);
682         return sub;
683 }
684
685 static struct mwi_subscription *mwi_subscribe_all(
686         struct ast_sip_endpoint *endpoint, struct ast_sip_subscription *sip_sub)
687 {
688         struct mwi_subscription *sub;
689
690         sub = mwi_create_subscription(endpoint, sip_sub);
691         if (!sub) {
692                 return NULL;
693         }
694
695         ast_sip_for_each_aor(endpoint->aors, mwi_on_aor, sub);
696         return sub;
697 }
698
699 static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint,
700                 const char *resource)
701 {
702         RAII_VAR(struct ast_sip_aor *, aor, NULL, ao2_cleanup);
703
704         if (ast_strlen_zero(resource)) {
705                 if (ast_sip_for_each_aor(endpoint->aors, mwi_validate_for_aor, endpoint)) {
706                         return 500;
707                 }
708                 return 200;
709         }
710
711         aor = ast_sip_location_retrieve_aor(resource);
712         if (!aor) {
713                 ast_log(LOG_WARNING, "Unable to locate aor %s. MWI subscription failed.\n",
714                         resource);
715                 return 404;
716         }
717
718         if (ast_strlen_zero(aor->mailboxes)) {
719                 ast_log(LOG_NOTICE, "AOR %s has no configured mailboxes. MWI subscription failed.\n",
720                         resource);
721                 return 404;
722         }
723
724         if (mwi_validate_for_aor(aor, endpoint, 0)) {
725                 return 500;
726         }
727
728         return 200;
729 }
730
731 static int mwi_subscription_established(struct ast_sip_subscription *sip_sub)
732 {
733         const char *resource = ast_sip_subscription_get_resource_name(sip_sub);
734         struct mwi_subscription *sub;
735         struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sip_sub);
736
737         /* no aor in uri? subscribe to all on endpoint */
738         if (ast_strlen_zero(resource)) {
739                 sub = mwi_subscribe_all(endpoint, sip_sub);
740         } else {
741                 sub = mwi_subscribe_single(endpoint, sip_sub, resource);
742         }
743         if (!sub) {
744                 ao2_cleanup(endpoint);
745                 return -1;
746         }
747
748         if (!ao2_container_count(sub->stasis_subs)) {
749                 /*
750                  * We setup no MWI subscriptions so remove the MWI datastore
751                  * to break the ref loop.
752                  */
753                 ast_sip_subscription_remove_datastore(sip_sub, MWI_DATASTORE);
754         }
755
756         ao2_cleanup(sub);
757         ao2_cleanup(endpoint);
758         return 0;
759 }
760
761 static void *mwi_get_notify_data(struct ast_sip_subscription *sub)
762 {
763         struct ast_sip_message_accumulator *counter;
764         struct mwi_subscription *mwi_sub;
765         struct ast_datastore *mwi_datastore;
766
767         mwi_datastore = ast_sip_subscription_get_datastore(sub, MWI_DATASTORE);
768         if (!mwi_datastore) {
769                 return NULL;
770         }
771         mwi_sub = mwi_datastore->data;
772
773         counter = ao2_alloc(sizeof(*counter), NULL);
774         if (!counter) {
775                 ao2_cleanup(mwi_datastore);
776                 return NULL;
777         }
778
779         ao2_callback(mwi_sub->stasis_subs, OBJ_NODATA, get_message_count, counter);
780         ao2_cleanup(mwi_datastore);
781         return counter;
782 }
783
784 static void mwi_subscription_mailboxes_str(struct ao2_container *stasis_subs,
785                                            struct ast_str **str)
786 {
787         int is_first = 1;
788         struct mwi_stasis_subscription *node;
789         struct ao2_iterator i = ao2_iterator_init(stasis_subs, 0);
790
791         while ((node = ao2_iterator_next(&i))) {
792                 if (is_first) {
793                         is_first = 0;
794                         ast_str_append(str, 0, "%s", node->mailbox);
795                 } else {
796                         ast_str_append(str, 0, ",%s", node->mailbox);
797                 }
798                 ao2_ref(node, -1);
799         }
800         ao2_iterator_destroy(&i);
801 }
802
803 static void mwi_to_ami(struct ast_sip_subscription *sub,
804                        struct ast_str **buf)
805 {
806         struct mwi_subscription *mwi_sub;
807         struct ast_datastore *mwi_datastore;
808
809         mwi_datastore = ast_sip_subscription_get_datastore(sub, MWI_DATASTORE);
810         if (!mwi_datastore) {
811                 return;
812         }
813
814         mwi_sub = mwi_datastore->data;
815
816         ast_str_append(buf, 0, "SubscriptionType: mwi\r\n");
817         ast_str_append(buf, 0, "Mailboxes: ");
818         mwi_subscription_mailboxes_str(mwi_sub->stasis_subs, buf);
819         ast_str_append(buf, 0, "\r\n");
820
821         ao2_ref(mwi_datastore, -1);
822 }
823
824 static int serialized_notify(void *userdata)
825 {
826         struct mwi_subscription *mwi_sub = userdata;
827
828         send_mwi_notify(mwi_sub);
829         ao2_ref(mwi_sub, -1);
830         return 0;
831 }
832
833 static int serialized_cleanup(void *userdata)
834 {
835         struct mwi_subscription *mwi_sub = userdata;
836
837         /* This is getting rid of the reference that was added
838          * just before this serialized task was pushed.
839          */
840         ao2_cleanup(mwi_sub);
841         /* This is getting rid of the reference held by the
842          * stasis subscription
843          */
844         ao2_cleanup(mwi_sub);
845         return 0;
846 }
847
848 static int send_notify(void *obj, void *arg, int flags)
849 {
850         struct mwi_subscription *mwi_sub = obj;
851         struct ast_taskprocessor *serializer = mwi_sub->is_solicited
852                 ? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
853                 : NULL;
854
855         if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
856                 ao2_ref(mwi_sub, -1);
857         }
858
859         return 0;
860 }
861
862 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
863                 struct stasis_message *msg)
864 {
865         struct mwi_subscription *mwi_sub = userdata;
866
867         if (stasis_subscription_final_message(sub, msg)) {
868                 if (ast_sip_push_task(NULL, serialized_cleanup, ao2_bump(mwi_sub))) {
869                         ao2_ref(mwi_sub, -1);
870                 }
871                 return;
872         }
873
874         if (ast_mwi_state_type() == stasis_message_type(msg)) {
875                 send_notify(mwi_sub, NULL, 0);
876         }
877 }
878
879 /*! \note Called with the unsolicited_mwi conainer lock held. */
880 static int create_mwi_subscriptions_for_endpoint(void *obj, void *arg, int flags)
881 {
882         RAII_VAR(struct mwi_subscription *, aggregate_sub, NULL, ao2_cleanup);
883         struct ast_sip_endpoint *endpoint = obj;
884         char *endpoint_aors, *aor_name, *mailboxes, *mailbox;
885         struct ao2_container *contacts = NULL;
886
887         if (ast_strlen_zero(endpoint->subscription.mwi.mailboxes)) {
888                 return 0;
889         }
890
891         endpoint_aors = ast_strdupa(endpoint->aors);
892
893         while ((aor_name = strsep(&endpoint_aors, ","))) {
894                 RAII_VAR(struct ast_sip_aor *, aor, ast_sip_location_retrieve_aor(aor_name), ao2_cleanup);
895
896                 if (!aor) {
897                         continue;
898                 }
899
900                 contacts = ast_sip_location_retrieve_aor_contacts(aor);
901                 if (!contacts || (ao2_container_count(contacts) == 0)) {
902                         ao2_cleanup(contacts);
903                         contacts = NULL;
904                         continue;
905                 }
906
907                 break;
908         }
909
910         if (!contacts) {
911                 return 0;
912         }
913
914         ao2_ref(contacts, -1);
915
916         if (endpoint->subscription.mwi.aggregate) {
917                 aggregate_sub = mwi_subscription_alloc(endpoint, 0, NULL);
918                 if (!aggregate_sub) {
919                         return 0;
920                 }
921         }
922
923         mailboxes = ast_strdupa(endpoint->subscription.mwi.mailboxes);
924         while ((mailbox = strsep(&mailboxes, ","))) {
925                 struct mwi_subscription *sub = aggregate_sub ?:
926                         mwi_subscription_alloc(endpoint, 0, NULL);
927                 struct mwi_stasis_subscription *mwi_stasis_sub;
928
929                 mwi_stasis_sub = mwi_stasis_subscription_alloc(mailbox, sub);
930                 if (mwi_stasis_sub) {
931                         ao2_link(sub->stasis_subs, mwi_stasis_sub);
932                         ao2_ref(mwi_stasis_sub, -1);
933                 }
934                 if (!aggregate_sub && sub) {
935                         ao2_link_flags(unsolicited_mwi, sub, OBJ_NOLOCK);
936                         ao2_ref(sub, -1);
937                 }
938         }
939         if (aggregate_sub) {
940                 ao2_link_flags(unsolicited_mwi, aggregate_sub, OBJ_NOLOCK);
941         }
942         return 0;
943 }
944
945 static int unsubscribe(void *obj, void *arg, int flags)
946 {
947         struct mwi_subscription *mwi_sub = obj;
948
949         ao2_callback(mwi_sub->stasis_subs, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe_stasis, NULL);
950
951         return CMP_MATCH;
952 }
953
954 static void create_mwi_subscriptions(void)
955 {
956         struct ao2_container *endpoints;
957
958         endpoints = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "endpoint",
959                 AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
960         if (!endpoints) {
961                 return;
962         }
963
964         /* We remove all the old stasis subscriptions first before applying the new configuration. This
965          * prevents a situation where there might be multiple overlapping stasis subscriptions for an
966          * endpoint for mailboxes. Though there may be mailbox changes during the gap between unsubscribing
967          * and resubscribing, up-to-date mailbox state will be sent out to the endpoint when the
968          * new stasis subscription is established
969          */
970         ao2_lock(unsolicited_mwi);
971         ao2_callback(unsolicited_mwi, OBJ_NOLOCK | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
972         ao2_callback(endpoints, OBJ_NODATA, create_mwi_subscriptions_for_endpoint, NULL);
973         ao2_unlock(unsolicited_mwi);
974
975         ao2_ref(endpoints, -1);
976 }
977
978 /*! \brief Function called to send MWI NOTIFY on any unsolicited mailboxes relating to this AOR */
979 static int send_contact_notify(void *obj, void *arg, int flags)
980 {
981         struct mwi_subscription *mwi_sub = obj;
982         const char *aor = arg;
983
984         if (!mwi_sub->aors || !strstr(mwi_sub->aors, aor)) {
985                 return 0;
986         }
987
988         if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) {
989                 ao2_ref(mwi_sub, -1);
990         }
991
992         return 0;
993 }
994
995 /*! \brief Function called when a contact is updated */
996 static void mwi_contact_updated(const void *object)
997 {
998         char *id = ast_strdupa(ast_sorcery_object_get_id(object)), *aor = NULL;
999
1000         aor = strsep(&id, ";@");
1001
1002         ao2_callback(unsolicited_mwi, OBJ_NODATA, send_contact_notify, aor);
1003 }
1004
1005 /*! \brief Function called when a contact is added */
1006 static void mwi_contact_added(const void *object)
1007 {
1008         const struct ast_sip_contact *contact = object;
1009         struct ao2_iterator *mwi_subs;
1010         struct mwi_subscription *mwi_sub;
1011         const char *endpoint_id = ast_sorcery_object_get_id(contact->endpoint);
1012
1013         if (ast_strlen_zero(contact->endpoint->subscription.mwi.mailboxes)) {
1014                 return;
1015         }
1016
1017         ao2_lock(unsolicited_mwi);
1018
1019         mwi_subs = ao2_find(unsolicited_mwi, endpoint_id, OBJ_SEARCH_KEY | OBJ_MULTIPLE | OBJ_NOLOCK | OBJ_UNLINK);
1020         if (mwi_subs) {
1021                 for (; (mwi_sub = ao2_iterator_next(mwi_subs)); ao2_cleanup(mwi_sub)) {
1022                         unsubscribe(mwi_sub, NULL, 0);
1023                 }
1024                 ao2_iterator_destroy(mwi_subs);
1025         }
1026
1027         create_mwi_subscriptions_for_endpoint(contact->endpoint, NULL, 0);
1028
1029         ao2_unlock(unsolicited_mwi);
1030
1031         mwi_contact_updated(object);
1032 }
1033
1034 /*! \brief Observer for contacts so unsolicited MWI is sent when a contact changes */
1035 static const struct ast_sorcery_observer mwi_contact_observer = {
1036         .created = mwi_contact_added,
1037         .updated = mwi_contact_updated,
1038 };
1039
1040 /*! \brief Task invoked to send initial MWI NOTIFY for unsolicited */
1041 static int send_initial_notify_all(void *obj)
1042 {
1043         ao2_callback(unsolicited_mwi, OBJ_NODATA, send_notify, NULL);
1044
1045         return 0;
1046 }
1047
1048 /*! \brief Event callback which fires initial unsolicited MWI NOTIFY messages when we're fully booted */
1049 static void mwi_startup_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1050 {
1051         struct ast_json_payload *payload;
1052         const char *type;
1053
1054         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1055                 return;
1056         }
1057
1058         payload = stasis_message_data(message);
1059         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1060
1061         if (strcmp(type, "FullyBooted")) {
1062                 return;
1063         }
1064
1065         ast_sip_push_task(NULL, send_initial_notify_all, NULL);
1066
1067         stasis_unsubscribe(sub);
1068 }
1069
1070 static int reload(void)
1071 {
1072         create_mwi_subscriptions();
1073         return 0;
1074 }
1075
1076 static int load_module(void)
1077 {
1078         CHECK_PJSIP_MODULE_LOADED();
1079
1080         if (ast_sip_register_subscription_handler(&mwi_handler)) {
1081                 return AST_MODULE_LOAD_DECLINE;
1082         }
1083
1084         unsolicited_mwi = ao2_container_alloc(MWI_BUCKETS, mwi_sub_hash, mwi_sub_cmp);
1085         if (!unsolicited_mwi) {
1086                 ast_sip_unregister_subscription_handler(&mwi_handler);
1087                 return AST_MODULE_LOAD_DECLINE;
1088         }
1089
1090         create_mwi_subscriptions();
1091         ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
1092
1093         if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
1094                 ast_sip_push_task(NULL, send_initial_notify_all, NULL);
1095         } else {
1096                 stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
1097         }
1098
1099         return AST_MODULE_LOAD_SUCCESS;
1100 }
1101
1102 static int unload_module(void)
1103 {
1104         ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
1105         ao2_ref(unsolicited_mwi, -1);
1106         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
1107         ast_sip_unregister_subscription_handler(&mwi_handler);
1108         return 0;
1109 }
1110
1111 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP MWI resource",
1112         .support_level = AST_MODULE_SUPPORT_CORE,
1113         .load = load_module,
1114         .unload = unload_module,
1115         .reload = reload,
1116         .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1117 );