Multiple revisions 399887,400138,400178,400180-400181
authorDavid M. Lee <dlee@digium.com>
Mon, 30 Sep 2013 18:55:27 +0000 (18:55 +0000)
committerDavid M. Lee <dlee@digium.com>
Mon, 30 Sep 2013 18:55:27 +0000 (18:55 +0000)
........
  r399887 | dlee | 2013-09-26 10:41:47 -0500 (Thu, 26 Sep 2013) | 1 line

  Minor performance bump by not allocate manager variable struct if we don't need it
........
  r400138 | dlee | 2013-09-30 10:24:00 -0500 (Mon, 30 Sep 2013) | 23 lines

  Stasis performance improvements

  This patch addresses several performance problems that were found in
  the initial performance testing of Asterisk 12.

  The Stasis dispatch object was allocated as an AO2 object, even though
  it has a very confined lifecycle. This was replaced with a straight
  ast_malloc().

  The Stasis message router was spending an inordinate amount of time
  searching hash tables. In this case, most of our routers had 6 or
  fewer routes in them to begin with. This was replaced with an array
  that's searched linearly for the route.

  We more heavily rely on AO2 objects in Asterisk 12, and the memset()
  in ao2_ref() actually became noticeable on the profile. This was
  #ifdef'ed to only run when AO2_DEBUG was enabled.

  After being misled by an erroneous comment in taskprocessor.c during
  profiling, the wrong comment was removed.

  Review: https://reviewboard.asterisk.org/r/2873/
........
  r400178 | dlee | 2013-09-30 13:26:27 -0500 (Mon, 30 Sep 2013) | 24 lines

  Taskprocessor optimization; switch Stasis to use taskprocessors

  This patch optimizes taskprocessor to use a semaphore for signaling,
  which the OS can do a better job at managing contention and waiting
  that we can with a mutex and condition.

  The taskprocessor execution was also slightly optimized to reduce the
  number of locks taken.

  The only observable difference in the taskprocessor implementation is
  that when the final reference to the taskprocessor goes away, it will
  execute all tasks to completion instead of discarding the unexecuted
  tasks.

  For systems where unnamed semaphores are not supported, a really
  simple semaphore implementation is provided. (Which gives identical
  performance as the original taskprocessor implementation).

  The way we ended up implementing Stasis caused the threadpool to be a
  burden instead of a boost to performance. This was switched to just
  use taskprocessors directly for subscriptions.

  Review: https://reviewboard.asterisk.org/r/2881/
........
  r400180 | dlee | 2013-09-30 13:39:34 -0500 (Mon, 30 Sep 2013) | 28 lines

  Optimize how Stasis forwards are dispatched

  This patch optimizes how forwards are dispatched in Stasis.

  Originally, forwards were dispatched as subscriptions that are invoked
  on the publishing thread. This did not account for the vast number of
  forwards we would end up having in the system, and the amount of work it
  would take to walk though the forward subscriptions.

  This patch modifies Stasis so that rather than walking the tree of
  forwards on every dispatch, when forwards and subscriptions are changed,
  the subscriber list for every topic in the tree is changed.

  This has a couple of benefits. First, this reduces the workload of
  dispatching messages. It also reduces contention when dispatching to
  different topics that happen to forward to the same aggregation topic
  (as happens with all of the channel, bridge and endpoint topics).

  Since forwards are no longer subscriptions, the bulk of this patch is
  simply changing stasis_subscription objects to stasis_forward objects
  (which, admittedly, I should have done in the first place.)

  Since this required me to yet again put in a growing array, I finally
  abstracted that out into a set of ast_vector macros in
  asterisk/vector.h.

  Review: https://reviewboard.asterisk.org/r/2883/
........
  r400181 | dlee | 2013-09-30 13:48:57 -0500 (Mon, 30 Sep 2013) | 28 lines

  Remove dispatch object allocation from Stasis publishing

  While looking for areas for performance improvement, I realized that an
  unused feature in Stasis was negatively impacting performance.

  When a message is sent to a subscriber, a dispatch object is allocated
  for the dispatch, containing the topic the message was published to, the
  subscriber the message is being sent to, and the message itself.

  The topic is actually unused by any subscriber in Asterisk today. And
  the subscriber is associated with the taskprocessor the message is being
  dispatched to.

  First, this patch removes the unused topic parameter from Stasis
  subscription callbacks.

  Second, this patch introduces the concept of taskprocessor local data,
  data that may be set on a taskprocessor and provided along with the data
  pointer when a task is pushed using the ast_taskprocessor_push_local()
  call. This allows the task to have both data specific to that
  taskprocessor, in addition to data specific to that invocation.

  With those two changes, the dispatch object can be removed completely,
  and the message is simply refcounted and sent directly to the
  taskprocessor.

  Review: https://reviewboard.asterisk.org/r/2884/
........

Merged revisions 399887,400138,400178,400180-400181 from http://svn.asterisk.org/svn/asterisk/branches/12

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400186 65c4cc65-6c06-0410-ace0-fbb531ad65f3

62 files changed:
apps/app_meetme.c
apps/app_queue.c
apps/app_voicemail.c
apps/confbridge/confbridge_manager.c
channels/chan_dahdi.c
channels/chan_iax2.c
channels/chan_mgcp.c
channels/chan_sip.c
channels/chan_skinny.c
channels/sig_pri.c
configs/stasis.conf.sample [deleted file]
configure
configure.ac
funcs/func_presencestate.c
include/asterisk/autoconfig.h.in
include/asterisk/sem.h [new file with mode: 0644]
include/asterisk/stasis.h
include/asterisk/stasis_internal.h
include/asterisk/stasis_message_router.h
include/asterisk/taskprocessor.h
include/asterisk/vector.h [new file with mode: 0644]
main/astobj2.c
main/ccss.c
main/cdr.c
main/cel.c
main/channel.c
main/channel_internal_api.c
main/devicestate.c
main/endpoints.c
main/manager.c
main/manager_bridges.c
main/manager_channels.c
main/manager_endpoints.c
main/manager_mwi.c
main/manager_system.c
main/pbx.c
main/sem.c [new file with mode: 0644]
main/sounds_index.c
main/stasis.c
main/stasis_cache.c
main/stasis_cache_pattern.c
main/stasis_config.c [deleted file]
main/stasis_message_router.c
main/stasis_wait.c
main/taskprocessor.c
res/parking/parking_applications.c
res/parking/parking_bridge_features.c
res/parking/parking_manager.c
res/res_agi.c
res/res_chan_stats.c
res/res_jabber.c
res/res_pjsip/include/res_pjsip_private.h
res/res_pjsip_mwi.c
res/res_pjsip_refer.c
res/res_security_log.c
res/res_stasis_test.c
res/res_xmpp.c
res/stasis/app.c
tests/test_devicestate.c
tests/test_stasis.c
tests/test_stasis_endpoints.c
tests/test_taskprocessor.c

index e1fedb4..cd6a4f7 100644 (file)
@@ -1139,7 +1139,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talking_type);
 STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talk_request_type);
 
 static void meetme_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message);
+       struct stasis_message *message);
 
 static void meetme_stasis_cleanup(void)
 {
@@ -1226,7 +1226,7 @@ static int meetme_stasis_init(void)
 }
 
 static void meetme_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *channel_blob = stasis_message_data(message);
        struct stasis_message_type *message_type;
index e0889a5..e72997f 100644 (file)
@@ -1832,7 +1832,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_dump_type);
 STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_ringnoanswer_type);
 
 static void queue_channel_manager_event(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        const char *type = data;
@@ -1858,7 +1858,7 @@ static void queue_channel_manager_event(void *data,
 }
 
 static void queue_multi_channel_manager_event(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        const char *type = data;
@@ -1902,7 +1902,7 @@ static void queue_multi_channel_manager_event(void *data,
 }
 
 static void queue_member_manager_event(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        const char *type = data;
@@ -2140,7 +2140,7 @@ static int is_member_available(struct call_queue *q, struct member *mem)
 }
 
 /*! \brief set a member's status based on device state of that member's interface*/
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ao2_iterator miter, qiter;
        struct ast_device_state_message *dev_state;
@@ -5185,7 +5185,7 @@ static void send_agent_complete(const char *queuename, struct ast_channel_snapsh
 }
 
 static void queue_agent_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct ast_channel_blob *agent_blob;
 
@@ -5401,7 +5401,7 @@ static void log_attended_transfer(struct queue_stasis_data *queue_data, struct a
  * \param msg The stasis message for the bridge enter event
  */
 static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
@@ -5434,7 +5434,7 @@ static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
  * \param msg The stasis message for the blind transfer event
  */
 static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
@@ -5503,7 +5503,7 @@ static void handle_blind_transfer(void *userdata, struct stasis_subscription *su
  * \param msg The stasis message for the attended transfer event.
  */
 static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
@@ -5558,7 +5558,7 @@ static void handle_attended_transfer(void *userdata, struct stasis_subscription
  * subroutines for further processing.
  */
 static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        if (stasis_subscription_final_message(sub, msg)) {
                ao2_cleanup(userdata);
@@ -5578,7 +5578,7 @@ static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
  * \param msg The stasis message for the local optimization begin event
  */
 static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
@@ -5630,7 +5630,7 @@ static void handle_local_optimization_begin(void *userdata, struct stasis_subscr
  * \param msg The stasis message for the local optimization end event
  */
 static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
@@ -5695,7 +5695,7 @@ static void handle_local_optimization_end(void *userdata, struct stasis_subscrip
  * \param msg The stasis message for the hangup event.
  */
 static void handle_hangup(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_channel_blob *channel_blob = stasis_message_data(msg);
@@ -5756,7 +5756,7 @@ static void handle_hangup(void *userdata, struct stasis_subscription *sub,
  * subroutines for further processing.
  */
 static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        if (stasis_subscription_final_message(sub, msg)) {
                ao2_cleanup(userdata);
@@ -10336,7 +10336,7 @@ static const struct ast_data_entry queue_data_providers[] = {
 };
 
 static struct stasis_message_router *agent_router;
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 static int unload_module(void)
 {
@@ -10364,7 +10364,7 @@ static int unload_module(void)
                stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
        }
        stasis_message_router_unsubscribe_and_join(agent_router);
-       topic_forwarder = stasis_unsubscribe(topic_forwarder);
+       topic_forwarder = stasis_forward_cancel(topic_forwarder);
 
        STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
        STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);
index c32b781..1ab1645 100644 (file)
@@ -12606,7 +12606,7 @@ static void mwi_sub_event_cb(struct stasis_subscription_change *change)
        }
 }
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct stasis_subscription_change *change;
        /* Only looking for subscription change notices here */
@@ -12629,7 +12629,7 @@ static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct
 static int dump_cache(void *obj, void *arg, int flags)
 {
        struct stasis_message *msg = obj;
-       mwi_event_cb(NULL, NULL, NULL, msg);
+       mwi_event_cb(NULL, NULL, msg);
        return 0;
 }
 
index 57bf64b..1b8eab2 100644 (file)
@@ -224,63 +224,54 @@ static void confbridge_publish_manager_event(
 }
 
 static void confbridge_start_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeStart", NULL);
 }
 
 static void confbridge_end_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeEnd", NULL);
 }
 
 static void confbridge_leave_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeLeave", NULL);
 }
 
 static void confbridge_join_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeJoin", NULL);
 }
 
 static void confbridge_start_record_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeRecord", NULL);
 }
 
 static void confbridge_stop_record_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeStopRecord", NULL);
 }
 
 static void confbridge_mute_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeMute", NULL);
 }
 
 static void confbridge_unmute_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeUnmute", NULL);
 }
 
 static void confbridge_talking_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, extra_text, NULL, ast_free);
index 5fc9683..f7cfa8c 100644 (file)
@@ -553,7 +553,7 @@ static int restart_monitor(void);
 
 static int dahdi_sendtext(struct ast_channel *c, const char *text);
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        /* This module does not handle MWI in an event-based manner.  However, it
         * subscribes to MWI for each mailbox that is configured so that the core
index fb68131..b880f5d 100644 (file)
@@ -1270,8 +1270,8 @@ static void build_rand_pad(unsigned char *buf, ssize_t len);
 static int get_unused_callno(enum callno_type type, int validated, callno_entry *entry);
 static int replace_callno(const void *obj);
 static void sched_delay_remove(struct sockaddr_in *sin, callno_entry entry);
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 
 static struct ast_channel_tech iax2_tech = {
        .type = "IAX2",
@@ -1331,7 +1331,7 @@ static void iax2_lock_owner(int callno)
        }
 }
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        /* The MWI subscriptions exist just so the core knows we care about those
         * mailboxes.  However, we just grab the events out of the cache when it
@@ -1378,7 +1378,7 @@ static int network_change_sched_cb(const void *data)
 }
 
 static void network_change_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        /* This callback is only concerned with network change messages from the system topic. */
        if (stasis_message_type(message) != ast_network_change_type()) {
@@ -1392,7 +1392,7 @@ static void network_change_stasis_cb(void *data, struct stasis_subscription *sub
 }
 
 static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (stasis_message_type(message) != ast_named_acl_change_type()) {
                return;
index b28fce3..8fcdebf 100644 (file)
@@ -486,7 +486,7 @@ static struct ast_channel_tech mgcp_tech = {
        .func_channel_read = acf_channel_read,
 };
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        /* This module does not handle MWI in an event-based manner.  However, it
         * subscribes to MWI for each mailbox that is configured so that the core
index 9cbfd71..2a821dd 100644 (file)
@@ -1324,9 +1324,9 @@ static int sip_poke_noanswer(const void *data);
 static int sip_poke_peer(struct sip_peer *peer, int force);
 static void sip_poke_all_peers(void);
 static void sip_peer_hold(struct sip_pvt *p, int hold);
-static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_topic *, struct stasis_message *);
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_message *);
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 static void sip_keepalive_all_peers(void);
 
 /*--- Applications, functions, CLI and manager command helpers */
@@ -16825,7 +16825,7 @@ static void sip_peer_hold(struct sip_pvt *p, int hold)
 }
 
 /*! \brief Receive MWI events that we have subscribed to */
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct sip_peer *peer = userdata;
        if (stasis_subscription_final_message(sub, msg)) {
@@ -16872,7 +16872,7 @@ static int network_change_sched_cb(const void *data)
        return 0;
 }
 
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        /* This callback is only concerned with network change messages from the system topic. */
        if (stasis_message_type(message) != ast_network_change_type()) {
@@ -28940,7 +28940,7 @@ static int restart_monitor(void)
 }
 
 static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (stasis_message_type(message) != ast_named_acl_change_type()) {
                return;
index b620796..7cf592a 100644 (file)
@@ -1639,7 +1639,7 @@ static int skinny_indicate(struct ast_channel *ast, int ind, const void *data, s
 static int skinny_fixup(struct ast_channel *oldchan, struct ast_channel *newchan);
 static int skinny_senddigit_begin(struct ast_channel *ast, char digit);
 static int skinny_senddigit_end(struct ast_channel *ast, char digit, unsigned int duration);
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
 static int skinny_dialer_cb(const void *data);
 static int skinny_reload(void);
 
@@ -2300,7 +2300,7 @@ static int skinny_register(struct skinny_req *req, struct skinnysession *s)
                                set_callforwards(l, NULL, SKINNY_CFWD_ALL|SKINNY_CFWD_BUSY|SKINNY_CFWD_NOANSWER);
                                register_exten(l);
                                /* initialize MWI on line and device */
-                               mwi_event_cb(l, NULL, NULL, NULL);
+                               mwi_event_cb(l, NULL, NULL);
                                AST_LIST_TRAVERSE(&l->sublines, subline, list) {
                                        ast_extension_state_add(subline->context, subline->exten, skinny_extensionstate_cb, subline->container);
                                }
@@ -3529,7 +3529,7 @@ static void update_connectedline(struct skinny_subchannel *sub, const void *data
        send_callinfo(sub);
 }
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct skinny_line *l = userdata;
        struct skinny_device *d = l->device;
index a6d134e..9f40077 100644 (file)
@@ -8892,7 +8892,7 @@ static void sig_pri_send_mwi_indication(struct sig_pri_span *pri, const char *vm
  *
  * \return Nothing
  */
-static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct sig_pri_span *pri = userdata;
        const char *mbox_context;
diff --git a/configs/stasis.conf.sample b/configs/stasis.conf.sample
deleted file mode 100644 (file)
index c6466b4..0000000
+++ /dev/null
@@ -1,8 +0,0 @@
-[threadpool]
-;initial_size = 0      ; Initial size of the threadpool
-
-;idle_timeout_sec = 20 ; Number of seconds a thread should be idle before dying
-;                      ; 0 means threads never time out
-
-;max_size = 200         ; Maximum number of threads in the threadpool
-;                      ; 0 means no limit to the threads in the threadpool
index 7ead7bc..ff7a01d 100755 (executable)
--- a/configure
+++ b/configure
@@ -1,14 +1,12 @@
 #! /bin/sh
-# From configure.ac Revision: 397990 .
+# From configure.ac Revision: 400078 .
 # Guess values for system-dependent variables and create Makefiles.
-# Generated by GNU Autoconf 2.68 for asterisk trunk.
+# Generated by GNU Autoconf 2.69 for asterisk trunk.
 #
 # Report bugs to <https://issues.asterisk.org>.
 #
 #
-# Copyright (C) 1992, 1993, 1994, 1995, 1996, 1998, 1999, 2000, 2001,
-# 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010 Free Software
-# Foundation, Inc.
+# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
 #
 #
 # This configure script is free software; the Free Software Foundation
@@ -139,6 +137,31 @@ export LANGUAGE
 # CDPATH.
 (unset CDPATH) >/dev/null 2>&1 && unset CDPATH
 
+# Use a proper internal environment variable to ensure we don't fall
+  # into an infinite loop, continuously re-executing ourselves.
+  if test x"${_as_can_reexec}" != xno && test "x$CONFIG_SHELL" != x; then
+    _as_can_reexec=no; export _as_can_reexec;
+    # We cannot yet assume a decent shell, so we have to provide a
+# neutralization value for shells without unset; and this also
+# works around shells that cannot unset nonexistent variables.
+# Preserve -v and -x to the replacement shell.
+BASH_ENV=/dev/null
+ENV=/dev/null
+(unset BASH_ENV) >/dev/null 2>&1 && unset BASH_ENV ENV
+case $- in # ((((
+  *v*x* | *x*v* ) as_opts=-vx ;;
+  *v* ) as_opts=-v ;;
+  *x* ) as_opts=-x ;;
+  * ) as_opts= ;;
+esac
+exec $CONFIG_SHELL $as_opts "$as_myself" ${1+"$@"}
+# Admittedly, this is quite paranoid, since all the known shells bail
+# out after a failed `exec'.
+$as_echo "$0: could not re-execute with $CONFIG_SHELL" >&2
+as_fn_exit 255
+  fi
+  # We don't want this to propagate to other subprocesses.
+          { _as_can_reexec=; unset _as_can_reexec;}
 if test "x$CONFIG_SHELL" = x; then
   as_bourne_compatible="if test -n \"\${ZSH_VERSION+set}\" && (emulate sh) >/dev/null 2>&1; then :
   emulate sh
@@ -172,7 +195,8 @@ if ( set x; as_fn_ret_success y && test x = \"\$1\" ); then :
 else
   exitcode=1; echo positional parameters were not saved.
 fi
-test x\$exitcode = x0 || exit 1"
+test x\$exitcode = x0 || exit 1
+test -x / || exit 1"
   as_suggested="  as_lineno_1=";as_suggested=$as_suggested$LINENO;as_suggested=$as_suggested" as_lineno_1a=\$LINENO
   as_lineno_2=";as_suggested=$as_suggested$LINENO;as_suggested=$as_suggested" as_lineno_2a=\$LINENO
   eval 'test \"x\$as_lineno_1'\$as_run'\" != \"x\$as_lineno_2'\$as_run'\" &&
@@ -217,21 +241,25 @@ IFS=$as_save_IFS
 
 
       if test "x$CONFIG_SHELL" != x; then :
-  # We cannot yet assume a decent shell, so we have to provide a
-       # neutralization value for shells without unset; and this also
-       # works around shells that cannot unset nonexistent variables.
-       # Preserve -v and -x to the replacement shell.
-       BASH_ENV=/dev/null
-       ENV=/dev/null
-       (unset BASH_ENV) >/dev/null 2>&1 && unset BASH_ENV ENV
-       export CONFIG_SHELL
-       case $- in # ((((
-         *v*x* | *x*v* ) as_opts=-vx ;;
-         *v* ) as_opts=-v ;;
-         *x* ) as_opts=-x ;;
-         * ) as_opts= ;;
-       esac
-       exec "$CONFIG_SHELL" $as_opts "$as_myself" ${1+"$@"}
+  export CONFIG_SHELL
+             # We cannot yet assume a decent shell, so we have to provide a
+# neutralization value for shells without unset; and this also
+# works around shells that cannot unset nonexistent variables.
+# Preserve -v and -x to the replacement shell.
+BASH_ENV=/dev/null
+ENV=/dev/null
+(unset BASH_ENV) >/dev/null 2>&1 && unset BASH_ENV ENV
+case $- in # ((((
+  *v*x* | *x*v* ) as_opts=-vx ;;
+  *v* ) as_opts=-v ;;
+  *x* ) as_opts=-x ;;
+  * ) as_opts= ;;
+esac
+exec $CONFIG_SHELL $as_opts "$as_myself" ${1+"$@"}
+# Admittedly, this is quite paranoid, since all the known shells bail
+# out after a failed `exec'.
+$as_echo "$0: could not re-execute with $CONFIG_SHELL" >&2
+exit 255
 fi
 
     if test x$as_have_required = xno; then :
@@ -334,6 +362,14 @@ $as_echo X"$as_dir" |
 
 
 } # as_fn_mkdir_p
+
+# as_fn_executable_p FILE
+# -----------------------
+# Test if FILE is an executable regular file.
+as_fn_executable_p ()
+{
+  test -f "$1" && test -x "$1"
+} # as_fn_executable_p
 # as_fn_append VAR VALUE
 # ----------------------
 # Append the text in VALUE to the end of the definition contained in VAR. Take
@@ -455,6 +491,10 @@ as_cr_alnum=$as_cr_Letters$as_cr_digits
   chmod +x "$as_me.lineno" ||
     { $as_echo "$as_me: error: cannot create $as_me.lineno; rerun with a POSIX shell" >&2; as_fn_exit 1; }
 
+  # If we had to re-execute with $CONFIG_SHELL, we're ensured to have
+  # already done that, so ensure we don't try to do so again and fall
+  # in an infinite loop.  This has already happened in practice.
+  _as_can_reexec=no; export _as_can_reexec
   # Don't try to exec as it changes $[0], causing all sort of problems
   # (the dirname of $[0] is not the place where we might find the
   # original and so on.  Autoconf is especially sensitive to this).
@@ -489,16 +529,16 @@ if (echo >conf$$.file) 2>/dev/null; then
     # ... but there are two gotchas:
     # 1) On MSYS, both `ln -s file dir' and `ln file dir' fail.
     # 2) DJGPP < 2.04 has no symlinks; `ln -s' creates a wrapper executable.
-    # In both cases, we have to default to `cp -p'.
+    # In both cases, we have to default to `cp -pR'.
     ln -s conf$$.file conf$$.dir 2>/dev/null && test ! -f conf$$.exe ||
-      as_ln_s='cp -p'
+      as_ln_s='cp -pR'
   elif ln conf$$.file conf$$ 2>/dev/null; then
     as_ln_s=ln
   else
-    as_ln_s='cp -p'
+    as_ln_s='cp -pR'
   fi
 else
-  as_ln_s='cp -p'
+  as_ln_s='cp -pR'
 fi
 rm -f conf$$ conf$$.exe conf$$.dir/conf$$.file conf$$.file
 rmdir conf$$.dir 2>/dev/null
@@ -510,28 +550,8 @@ else
   as_mkdir_p=false
 fi
 
-if test -x / >/dev/null 2>&1; then
-  as_test_x='test -x'
-else
-  if ls -dL / >/dev/null 2>&1; then
-    as_ls_L_option=L
-  else
-    as_ls_L_option=
-  fi
-  as_test_x='
-    eval sh -c '\''
-      if test -d "$1"; then
-       test -d "$1/.";
-      else
-       case $1 in #(
-       -*)set "./$1";;
-       esac;
-       case `ls -ld'$as_ls_L_option' "$1" 2>/dev/null` in #((
-       ???[sx]*):;;*)false;;esac;fi
-    '\'' sh
-  '
-fi
-as_executable_p=$as_test_x
+as_test_x='test -x'
+as_executable_p=as_fn_executable_p
 
 # Sed expression to map a string onto a valid CPP name.
 as_tr_cpp="eval sed 'y%*$as_cr_letters%P$as_cr_LETTERS%;s%[^_$as_cr_alnum]%_%g'"
@@ -1794,8 +1814,6 @@ target=$target_alias
 if test "x$host_alias" != x; then
   if test "x$build_alias" = x; then
     cross_compiling=maybe
-    $as_echo "$as_me: WARNING: if you wanted to set the --build type, don't use --host.
-    If a cross compiler is detected then cross compile mode will be used" >&2
   elif test "x$build_alias" != "x$host_alias"; then
     cross_compiling=yes
   fi
@@ -2149,9 +2167,9 @@ test -n "$ac_init_help" && exit $ac_status
 if $ac_init_version; then
   cat <<\_ACEOF
 asterisk configure trunk
-generated by GNU Autoconf 2.68
+generated by GNU Autoconf 2.69
 
-Copyright (C) 2010 Free Software Foundation, Inc.
+Copyright (C) 2012 Free Software Foundation, Inc.
 This configure script is free software; the Free Software Foundation
 gives unlimited permission to copy, distribute and modify it.
 
@@ -2505,7 +2523,7 @@ $as_echo "$ac_try_echo"; } >&5
         test ! -s conftest.err
        } && test -s conftest$ac_exeext && {
         test "$cross_compiling" = yes ||
-        $as_test_x conftest$ac_exeext
+        test -x conftest$ac_exeext
        }; then :
   ac_retval=0
 else
@@ -2719,7 +2737,8 @@ int
 main ()
 {
 static int test_array [1 - 2 * !(($2) >= 0)];
-test_array [0] = 0
+test_array [0] = 0;
+return test_array [0];
 
   ;
   return 0;
@@ -2735,7 +2754,8 @@ int
 main ()
 {
 static int test_array [1 - 2 * !(($2) <= $ac_mid)];
-test_array [0] = 0
+test_array [0] = 0;
+return test_array [0];
 
   ;
   return 0;
@@ -2761,7 +2781,8 @@ int
 main ()
 {
 static int test_array [1 - 2 * !(($2) < 0)];
-test_array [0] = 0
+test_array [0] = 0;
+return test_array [0];
 
   ;
   return 0;
@@ -2777,7 +2798,8 @@ int
 main ()
 {
 static int test_array [1 - 2 * !(($2) >= $ac_mid)];
-test_array [0] = 0
+test_array [0] = 0;
+return test_array [0];
 
   ;
   return 0;
@@ -2811,7 +2833,8 @@ int
 main ()
 {
 static int test_array [1 - 2 * !(($2) <= $ac_mid)];
-test_array [0] = 0
+test_array [0] = 0;
+return test_array [0];
 
   ;
   return 0;
@@ -2998,7 +3021,7 @@ $as_echo "$ac_try_echo"; } >&5
         test ! -s conftest.err
        } && test -s conftest$ac_exeext && {
         test "$cross_compiling" = yes ||
-        $as_test_x conftest$ac_exeext
+        test -x conftest$ac_exeext
        }; then :
   ac_retval=0
 else
@@ -3052,7 +3075,7 @@ This file contains any messages produced by compilers while
 running configure, to aid debugging if configure makes a mistake.
 
 It was created by asterisk $as_me trunk, which was
-generated by GNU Autoconf 2.68.  Invocation command line was
+generated by GNU Autoconf 2.69.  Invocation command line was
 
   $ $0 $@
 
@@ -3584,7 +3607,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_CC="$ac_tool_prefix$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -3628,7 +3651,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_CC="$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -4072,8 +4095,7 @@ cat confdefs.h - <<_ACEOF >conftest.$ac_ext
 /* end confdefs.h.  */
 #include <stdarg.h>
 #include <stdio.h>
-#include <sys/types.h>
-#include <sys/stat.h>
+struct stat;
 /* Most of the following tests are stolen from RCS 5.7's src/conf.sh.  */
 struct buf { int x; };
 FILE * (*rcsopen) (struct buf *, struct stat *, int);
@@ -4314,7 +4336,7 @@ do
     for ac_prog in grep ggrep; do
     for ac_exec_ext in '' $ac_executable_extensions; do
       ac_path_GREP="$as_dir/$ac_prog$ac_exec_ext"
-      { test -f "$ac_path_GREP" && $as_test_x "$ac_path_GREP"; } || continue
+      as_fn_executable_p "$ac_path_GREP" || continue
 # Check for GNU ac_path_GREP and select it if it is found.
   # Check for GNU $ac_path_GREP
 case `"$ac_path_GREP" --version 2>&1` in
@@ -4380,7 +4402,7 @@ do
     for ac_prog in egrep; do
     for ac_exec_ext in '' $ac_executable_extensions; do
       ac_path_EGREP="$as_dir/$ac_prog$ac_exec_ext"
-      { test -f "$ac_path_EGREP" && $as_test_x "$ac_path_EGREP"; } || continue
+      as_fn_executable_p "$ac_path_EGREP" || continue
 # Check for GNU ac_path_EGREP and select it if it is found.
   # Check for GNU $ac_path_EGREP
 case `"$ac_path_EGREP" --version 2>&1` in
@@ -4587,8 +4609,8 @@ else
   cat confdefs.h - <<_ACEOF >conftest.$ac_ext
 /* end confdefs.h.  */
 
-#        define __EXTENSIONS__ 1
-         $ac_includes_default
+#         define __EXTENSIONS__ 1
+          $ac_includes_default
 int
 main ()
 {
@@ -4791,7 +4813,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_UNAME="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -4834,7 +4856,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_UNAME="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -4899,7 +4921,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_CC="${ac_tool_prefix}gcc"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -4939,7 +4961,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_CC="gcc"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -4991,7 +5013,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_CXX="${ac_tool_prefix}g++"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -5031,7 +5053,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_CXX="g++"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -5083,7 +5105,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_LD="${ac_tool_prefix}ld"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -5123,7 +5145,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_LD="ld"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -5175,7 +5197,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_RANLIB="${ac_tool_prefix}ranlib"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -5215,7 +5237,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_RANLIB="ranlib"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -5281,7 +5303,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_CXX="$ac_tool_prefix$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -5325,7 +5347,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_CXX="$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -5804,7 +5826,7 @@ do
     for ac_prog in sed gsed; do
     for ac_exec_ext in '' $ac_executable_extensions; do
       ac_path_SED="$as_dir/$ac_prog$ac_exec_ext"
-      { test -f "$ac_path_SED" && $as_test_x "$ac_path_SED"; } || continue
+      as_fn_executable_p "$ac_path_SED" || continue
 # Check for GNU ac_path_SED and select it if it is found.
   # Check for GNU $ac_path_SED
 case `"$ac_path_SED" --version 2>&1` in
@@ -5989,7 +6011,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_AWK="$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6050,7 +6072,7 @@ case $as_dir/ in #((
     # by default.
     for ac_prog in ginstall scoinst install; do
       for ac_exec_ext in '' $ac_executable_extensions; do
-       if { test -f "$as_dir/$ac_prog$ac_exec_ext" && $as_test_x "$as_dir/$ac_prog$ac_exec_ext"; }; then
+       if as_fn_executable_p "$as_dir/$ac_prog$ac_exec_ext"; then
          if test $ac_prog = install &&
            grep dspmsg "$as_dir/$ac_prog$ac_exec_ext" >/dev/null 2>&1; then
            # AIX install.  It has an incompatible calling convention.
@@ -6134,7 +6156,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_RANLIB="${ac_tool_prefix}ranlib"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6174,7 +6196,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_RANLIB="ranlib"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6256,7 +6278,7 @@ do
     for ac_prog in egrep; do
     for ac_exec_ext in '' $ac_executable_extensions; do
       ac_path_EGREP="$as_dir/$ac_prog$ac_exec_ext"
-      { test -f "$ac_path_EGREP" && $as_test_x "$ac_path_EGREP"; } || continue
+      as_fn_executable_p "$ac_path_EGREP" || continue
 # Check for GNU ac_path_EGREP and select it if it is found.
   # Check for GNU $ac_path_EGREP
 case `"$ac_path_EGREP" --version 2>&1` in
@@ -6324,7 +6346,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_STRIP="$ac_tool_prefix$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6368,7 +6390,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_STRIP="$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6424,7 +6446,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_AR="$ac_tool_prefix$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6468,7 +6490,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_AR="$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6530,7 +6552,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_BISON="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6571,7 +6593,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CMP="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6612,7 +6634,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_FLEX="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6653,7 +6675,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_GREP="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6694,7 +6716,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_PYTHON="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6735,7 +6757,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_FIND="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6776,7 +6798,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_COMPRESS="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6817,7 +6839,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_BASENAME="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6858,7 +6880,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_DIRNAME="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6899,7 +6921,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_SHELL="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6940,7 +6962,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_LN="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -6981,7 +7003,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_DOXYGEN="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7022,7 +7044,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_DOT="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7063,7 +7085,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_WGET="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7104,7 +7126,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CURL="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7145,7 +7167,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_RUBBER="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7186,7 +7208,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CATDVI="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7227,7 +7249,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_KPATHSEA="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7268,7 +7290,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_XMLLINT="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7309,7 +7331,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_XMLSTARLET="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7350,7 +7372,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_GIT="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7396,7 +7418,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_FETCH="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7441,7 +7463,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_LDCONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7482,7 +7504,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_SHA1SUM="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7523,7 +7545,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_OPENSSL="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7615,7 +7637,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_SOXMIX="${ac_tool_prefix}soxmix"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7655,7 +7677,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_ac_ct_SOXMIX="soxmix"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7714,7 +7736,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_MD5="$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -7880,7 +7902,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_acx_pthread_config="yes"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -8036,7 +8058,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_PTHREAD_CC="$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -9145,7 +9167,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_prog_AWK="$ac_prog"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -9195,7 +9217,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path__libcurl_config="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -9237,7 +9259,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path__libcurl_config="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -11421,23 +11443,20 @@ else
 /* end confdefs.h.  */
 $ac_includes_default
 int
-find_stack_direction ()
+find_stack_direction (int *addr, int depth)
 {
-  static char *addr = 0;
-  auto char dummy;
-  if (addr == 0)
-    {
-      addr = &dummy;
-      return find_stack_direction ();
-    }
-  else
-    return (&dummy > addr) ? 1 : -1;
+  int dir, dummy = 0;
+  if (! addr)
+    addr = &dummy;
+  *addr = addr < &dummy ? 1 : addr == &dummy ? 0 : -1;
+  dir = depth ? find_stack_direction (addr, depth - 1) : 0;
+  return dir + dummy;
 }
 
 int
-main ()
+main (int argc, char **argv)
 {
-  return find_stack_direction () < 0;
+  return find_stack_direction (0, argc + !argv + 20) < 0;
 }
 _ACEOF
 if ac_fn_c_try_run "$LINENO"; then :
@@ -12709,7 +12728,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CONFIG_LIBXML2="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -12753,7 +12772,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_CONFIG_LIBXML2="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -13190,6 +13209,8 @@ _ACEOF
 esac
 rm -rf conftest*
   fi
+
+
 fi
 
 
@@ -13202,60 +13223,60 @@ else
   cat confdefs.h - <<_ACEOF >conftest.$ac_ext
 /* end confdefs.h.  */
 
-#include <stdbool.h>
-#ifndef bool
- "error: bool is not defined"
-#endif
-#ifndef false
- "error: false is not defined"
-#endif
-#if false
- "error: false is not 0"
-#endif
-#ifndef true
- "error: true is not defined"
-#endif
-#if true != 1
- "error: true is not 1"
-#endif
-#ifndef __bool_true_false_are_defined
- "error: __bool_true_false_are_defined is not defined"
-#endif
-
-       struct s { _Bool s: 1; _Bool t; } s;
-
-       char a[true == 1 ? 1 : -1];
-       char b[false == 0 ? 1 : -1];
-       char c[__bool_true_false_are_defined == 1 ? 1 : -1];
-       char d[(bool) 0.5 == true ? 1 : -1];
-       /* See body of main program for 'e'.  */
-       char f[(_Bool) 0.0 == false ? 1 : -1];
-       char g[true];
-       char h[sizeof (_Bool)];
-       char i[sizeof s.t];
-       enum { j = false, k = true, l = false * true, m = true * 256 };
-       /* The following fails for
-          HP aC++/ANSI C B3910B A.05.55 [Dec 04 2003]. */
-       _Bool n[m];
-       char o[sizeof n == m * sizeof n[0] ? 1 : -1];
-       char p[-1 - (_Bool) 0 < 0 && -1 - (bool) 0 < 0 ? 1 : -1];
-       /* Catch a bug in an HP-UX C compiler.  See
-          http://gcc.gnu.org/ml/gcc-patches/2003-12/msg02303.html
-          http://lists.gnu.org/archive/html/bug-coreutils/2005-11/msg00161.html
-        */
-       _Bool q = true;
-       _Bool *pq = &q;
-
-int
-main ()
-{
-
-       bool e = &s;
-       *pq |= q;
-       *pq |= ! q;
-       /* Refer to every declared value, to avoid compiler optimizations.  */
-       return (!a + !b + !c + !d + !e + !f + !g + !h + !i + !!j + !k + !!l
-               + !m + !n + !o + !p + !q + !pq);
+             #include <stdbool.h>
+             #ifndef bool
+              "error: bool is not defined"
+             #endif
+             #ifndef false
+              "error: false is not defined"
+             #endif
+             #if false
+              "error: false is not 0"
+             #endif
+             #ifndef true
+              "error: true is not defined"
+             #endif
+             #if true != 1
+              "error: true is not 1"
+             #endif
+             #ifndef __bool_true_false_are_defined
+              "error: __bool_true_false_are_defined is not defined"
+             #endif
+
+             struct s { _Bool s: 1; _Bool t; } s;
+
+             char a[true == 1 ? 1 : -1];
+             char b[false == 0 ? 1 : -1];
+             char c[__bool_true_false_are_defined == 1 ? 1 : -1];
+             char d[(bool) 0.5 == true ? 1 : -1];
+             /* See body of main program for 'e'.  */
+             char f[(_Bool) 0.0 == false ? 1 : -1];
+             char g[true];
+             char h[sizeof (_Bool)];
+             char i[sizeof s.t];
+             enum { j = false, k = true, l = false * true, m = true * 256 };
+             /* The following fails for
+                HP aC++/ANSI C B3910B A.05.55 [Dec 04 2003]. */
+             _Bool n[m];
+             char o[sizeof n == m * sizeof n[0] ? 1 : -1];
+             char p[-1 - (_Bool) 0 < 0 && -1 - (bool) 0 < 0 ? 1 : -1];
+             /* Catch a bug in an HP-UX C compiler.  See
+                http://gcc.gnu.org/ml/gcc-patches/2003-12/msg02303.html
+                http://lists.gnu.org/archive/html/bug-coreutils/2005-11/msg00161.html
+              */
+             _Bool q = true;
+             _Bool *pq = &q;
+
+int
+main ()
+{
+
+             bool e = &s;
+             *pq |= q;
+             *pq |= ! q;
+             /* Refer to every declared value, to avoid compiler optimizations.  */
+             return (!a + !b + !c + !d + !e + !f + !g + !h + !i + !!j + !k + !!l
+                     + !m + !n + !o + !p + !q + !pq);
 
   ;
   return 0;
@@ -13270,7 +13291,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
 fi
 { $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_header_stdbool_h" >&5
 $as_echo "$ac_cv_header_stdbool_h" >&6; }
-ac_fn_c_check_type "$LINENO" "_Bool" "ac_cv_type__Bool" "$ac_includes_default"
+   ac_fn_c_check_type "$LINENO" "_Bool" "ac_cv_type__Bool" "$ac_includes_default"
 if test "x$ac_cv_type__Bool" = xyes; then :
 
 cat >>confdefs.h <<_ACEOF
@@ -13280,6 +13301,7 @@ _ACEOF
 
 fi
 
+
 if test $ac_cv_header_stdbool_h = yes; then
 
 $as_echo "#define HAVE_STDBOOL_H 1" >>confdefs.h
@@ -13297,11 +13319,11 @@ else
 int
 main ()
 {
-/* FIXME: Include the comments suggested by Paul. */
+
 #ifndef __cplusplus
-  /* Ultrix mips cc rejects this.  */
+  /* Ultrix mips cc rejects this sort of thing.  */
   typedef int charset[2];
-  const charset cs;
+  const charset cs = { 0, 0 };
   /* SunOS 4.1.1 cc rejects this.  */
   char const *const *pcpcc;
   char **ppc;
@@ -13318,8 +13340,9 @@ main ()
   ++pcpcc;
   ppc = (char**) pcpcc;
   pcpcc = (char const *const *) ppc;
-  { /* SCO 3.2v4 cc rejects this.  */
-    char *t;
+  { /* SCO 3.2v4 cc rejects this sort of thing.  */
+    char tx;
+    char *t = &tx;
     char const *s = 0 ? (char *) 0 : (char const *) 0;
 
     *t++ = 0;
@@ -13335,10 +13358,10 @@ main ()
     iptr p = 0;
     ++p;
   }
-  { /* AIX XL C 1.02.0.0 rejects this saying
+  { /* AIX XL C 1.02.0.0 rejects this sort of thing, saying
        "k.c", line 2.27: 1506-025 (S) Operand must be a modifiable lvalue. */
-    struct s { int j; const int *ap[3]; };
-    struct s *b; b->j = 5;
+    struct s { int j; const int *ap[3]; } bx;
+    struct s *b = &bx; b->j = 5;
   }
   { /* ULTRIX-32 V3.1 (Rev 9) vcc rejects this */
     const int foo = 10;
@@ -13468,7 +13491,8 @@ static int test_array [1 - 2 * !((0 < ((DBL_MAX_EXP < LDBL_MAX_EXP)
                   - (LDBL_MANT_DIG < DBL_MANT_DIG)))
            && (int) LDBL_EPSILON == 0
          )];
-test_array [0] = 0
+test_array [0] = 0;
+return test_array [0];
 
   ;
   return 0;
 rm -f core conftest.err conftest.$ac_objext \
     conftest$ac_exeext conftest.$ac_ext
 
-{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for pthread_rwlock_timedwrlock() in pthread.h" >&5
-$as_echo_n "checking for pthread_rwlock_timedwrlock() in pthread.h... " >&6; }
 save_LIBS="$LIBS"
 save_CFLAGS="$CFLAGS"
 LIBS="$PTHREAD_LIBS $LIBS"
 CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for pthread_rwlock_timedwrlock() in pthread.h" >&5
+$as_echo_n "checking for pthread_rwlock_timedwrlock() in pthread.h... " >&6; }
 cat confdefs.h - <<_ACEOF >conftest.$ac_ext
 /* end confdefs.h.  */
 #include <pthread.h>
@@ -15688,6 +15713,43 @@ $as_echo "no" >&6; }
 fi
 rm -f core conftest.err conftest.$ac_objext \
     conftest$ac_exeext conftest.$ac_ext
+
+# Some platforms define sem_init(), but only support sem_open(). joyous.
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for working unnamed semaphores" >&5
+$as_echo_n "checking for working unnamed semaphores... " >&6; }
+if test "$cross_compiling" = yes; then :
+  { { $as_echo "$as_me:${as_lineno-$LINENO}: error: in \`$ac_pwd':" >&5
+$as_echo "$as_me: error: in \`$ac_pwd':" >&2;}
+as_fn_error $? "cannot run test program while cross compiling
+See \`config.log' for more details" "$LINENO" 5; }
+else
+  cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+#include <semaphore.h>
+int
+main ()
+{
+sem_t sem; return sem_init(&sem, 0, 0);
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_run "$LINENO"; then :
+  { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5
+$as_echo "yes" >&6; }
+
+$as_echo "#define HAS_WORKING_SEMAPHORE 1" >>confdefs.h
+
+else
+  { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5
+$as_echo "no" >&6; }
+
+fi
+rm -f core *.core core.conftest.* gmon.out bb.out conftest$ac_exeext \
+  conftest.$ac_objext conftest.beam conftest.$ac_ext
+fi
+
+
 LIBS="$save_LIBS"
 CFLAGS="$save_CFLAGS"
 if test "${ac_cv_pthread_rwlock_timedwrlock}" = "yes"; then
@@ -17737,6 +17799,7 @@ LIBS=${old_LIBS}
 
 
 
+
 if test "x$ac_cv_env_PKG_CONFIG_set" != "xset"; then
        if test -n "$ac_tool_prefix"; then
   # Extract the first word of "${ac_tool_prefix}pkg-config", so it can be a program name with args.
@@ -17757,7 +17820,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_PKG_CONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -17800,7 +17863,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_PKG_CONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -18943,6 +19006,7 @@ if test -n "$ILBC_CFLAGS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_ILBC_CFLAGS=`$PKG_CONFIG --cflags "libilbc" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -18959,6 +19023,7 @@ if test -n "$ILBC_LIBS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_ILBC_LIBS=`$PKG_CONFIG --libs "libilbc" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -18978,9 +19043,9 @@ else
         _pkg_short_errors_supported=no
 fi
         if test $_pkg_short_errors_supported = yes; then
-               ILBC_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors "libilbc" 2>&1`
+               ILBC_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libilbc" 2>&1`
         else
-               ILBC_PKG_ERRORS=`$PKG_CONFIG --print-errors "libilbc" 2>&1`
+               ILBC_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libilbc" 2>&1`
         fi
        # Put the nasty error message in config.log where it belongs
        echo "$ILBC_PKG_ERRORS" >&5
@@ -19048,6 +19113,7 @@ if test -n "$LIBEDIT_CFLAGS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_LIBEDIT_CFLAGS=`$PKG_CONFIG --cflags "libedit" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -19064,6 +19130,7 @@ if test -n "$LIBEDIT_LIBS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_LIBEDIT_LIBS=`$PKG_CONFIG --libs "libedit" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -19083,9 +19150,9 @@ else
         _pkg_short_errors_supported=no
 fi
         if test $_pkg_short_errors_supported = yes; then
-               LIBEDIT_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors "libedit" 2>&1`
+               LIBEDIT_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libedit" 2>&1`
         else
-               LIBEDIT_PKG_ERRORS=`$PKG_CONFIG --print-errors "libedit" 2>&1`
+               LIBEDIT_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libedit" 2>&1`
         fi
        # Put the nasty error message in config.log where it belongs
        echo "$LIBEDIT_PKG_ERRORS" >&5
@@ -21374,7 +21441,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CONFIG_MYSQLCLIENT="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -21418,7 +21485,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_CONFIG_MYSQLCLIENT="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -21630,7 +21697,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CONFIG_NEON="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -21674,7 +21741,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_CONFIG_NEON="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -21781,7 +21848,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CONFIG_NEON29="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -21825,7 +21892,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_CONFIG_NEON29="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -21934,7 +22001,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CONFIG_NETSNMP="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -21978,7 +22045,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_CONFIG_NETSNMP="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -23033,7 +23100,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_PG_CONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -23076,7 +23143,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_PG_CONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -23145,7 +23212,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_PG_CONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -23188,7 +23255,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_PG_CONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -23357,6 +23424,7 @@ if test -n "$PJPROJECT_CFLAGS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_PJPROJECT_CFLAGS=`$PKG_CONFIG --cflags "libpjproject" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -23373,6 +23441,7 @@ if test -n "$PJPROJECT_LIBS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_PJPROJECT_LIBS=`$PKG_CONFIG --libs "libpjproject" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -23392,9 +23461,9 @@ else
         _pkg_short_errors_supported=no
 fi
         if test $_pkg_short_errors_supported = yes; then
-               PJPROJECT_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors "libpjproject" 2>&1`
+               PJPROJECT_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libpjproject" 2>&1`
         else
-               PJPROJECT_PKG_ERRORS=`$PKG_CONFIG --print-errors "libpjproject" 2>&1`
+               PJPROJECT_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libpjproject" 2>&1`
         fi
        # Put the nasty error message in config.log where it belongs
        echo "$PJPROJECT_PKG_ERRORS" >&5
@@ -26388,7 +26457,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_PTLIB_CONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -26450,7 +26519,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_PTLIB_CONFIG="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -29044,6 +29113,7 @@ if test -n "$GMIME_CFLAGS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_GMIME_CFLAGS=`$PKG_CONFIG --cflags "gmime-$ver" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -29060,6 +29130,7 @@ if test -n "$GMIME_LIBS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_GMIME_LIBS=`$PKG_CONFIG --libs "gmime-$ver" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -29079,9 +29150,9 @@ else
         _pkg_short_errors_supported=no
 fi
         if test $_pkg_short_errors_supported = yes; then
-               GMIME_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors "gmime-$ver" 2>&1`
+               GMIME_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "gmime-$ver" 2>&1`
         else
-               GMIME_PKG_ERRORS=`$PKG_CONFIG --print-errors "gmime-$ver" 2>&1`
+               GMIME_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "gmime-$ver" 2>&1`
         fi
        # Put the nasty error message in config.log where it belongs
        echo "$GMIME_PKG_ERRORS" >&5
@@ -30098,7 +30169,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_CONFIG_SDL="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -30142,7 +30213,7 @@ do
   IFS=$as_save_IFS
   test -z "$as_dir" && as_dir=.
     for ac_exec_ext in '' $ac_executable_extensions; do
-  if { test -f "$as_dir/$ac_word$ac_exec_ext" && $as_test_x "$as_dir/$ac_word$ac_exec_ext"; }; then
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
     ac_cv_path_ac_pt_CONFIG_SDL="$as_dir/$ac_word$ac_exec_ext"
     $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
     break 2
@@ -30711,6 +30782,7 @@ if test -n "$GTK2_CFLAGS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_GTK2_CFLAGS=`$PKG_CONFIG --cflags "gtk+-2.0" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -30727,6 +30799,7 @@ if test -n "$GTK2_LIBS"; then
   $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
   test $ac_status = 0; }; then
   pkg_cv_GTK2_LIBS=`$PKG_CONFIG --libs "gtk+-2.0" 2>/dev/null`
+                     test "x$?" != "x0" && pkg_failed=yes
 else
   pkg_failed=yes
 fi
@@ -30746,9 +30819,9 @@ else
         _pkg_short_errors_supported=no
 fi
         if test $_pkg_short_errors_supported = yes; then
-               GTK2_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors "gtk+-2.0" 2>&1`
+               GTK2_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "gtk+-2.0" 2>&1`
         else
-               GTK2_PKG_ERRORS=`$PKG_CONFIG --print-errors "gtk+-2.0" 2>&1`
+               GTK2_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "gtk+-2.0" 2>&1`
         fi
        # Put the nasty error message in config.log where it belongs
        echo "$GTK2_PKG_ERRORS" >&5
@@ -31767,16 +31840,16 @@ if (echo >conf$$.file) 2>/dev/null; then
     # ... but there are two gotchas:
     # 1) On MSYS, both `ln -s file dir' and `ln file dir' fail.
     # 2) DJGPP < 2.04 has no symlinks; `ln -s' creates a wrapper executable.
-    # In both cases, we have to default to `cp -p'.
+    # In both cases, we have to default to `cp -pR'.
     ln -s conf$$.file conf$$.dir 2>/dev/null && test ! -f conf$$.exe ||
-      as_ln_s='cp -p'
+      as_ln_s='cp -pR'
   elif ln conf$$.file conf$$ 2>/dev/null; then
     as_ln_s=ln
   else
-    as_ln_s='cp -p'
+    as_ln_s='cp -pR'
   fi
 else
-  as_ln_s='cp -p'
+  as_ln_s='cp -pR'
 fi
 rm -f conf$$ conf$$.exe conf$$.dir/conf$$.file conf$$.file
 rmdir conf$$.dir 2>/dev/null
@@ -31836,28 +31909,16 @@ else
   as_mkdir_p=false
 fi
 
-if test -x / >/dev/null 2>&1; then
-  as_test_x='test -x'
-else
-  if ls -dL / >/dev/null 2>&1; then
-    as_ls_L_option=L
-  else
-    as_ls_L_option=
-  fi
-  as_test_x='
-    eval sh -c '\''
-      if test -d "$1"; then
-       test -d "$1/.";
-      else
-       case $1 in #(
-       -*)set "./$1";;
-       esac;
-       case `ls -ld'$as_ls_L_option' "$1" 2>/dev/null` in #((
-       ???[sx]*):;;*)false;;esac;fi
-    '\'' sh
-  '
-fi
-as_executable_p=$as_test_x
+
+# as_fn_executable_p FILE
+# -----------------------
+# Test if FILE is an executable regular file.
+as_fn_executable_p ()
+{
+  test -f "$1" && test -x "$1"
+} # as_fn_executable_p
+as_test_x='test -x'
+as_executable_p=as_fn_executable_p
 
 # Sed expression to map a string onto a valid CPP name.
 as_tr_cpp="eval sed 'y%*$as_cr_letters%P$as_cr_LETTERS%;s%[^_$as_cr_alnum]%_%g'"
@@ -31879,7 +31940,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
 # values after options handling.
 ac_log="
 This file was extended by asterisk $as_me trunk, which was
-generated by GNU Autoconf 2.68.  Invocation command line was
+generated by GNU Autoconf 2.69.  Invocation command line was
 
   CONFIG_FILES    = $CONFIG_FILES
   CONFIG_HEADERS  = $CONFIG_HEADERS
@@ -31941,10 +32002,10 @@ cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
 ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
 ac_cs_version="\\
 asterisk config.status trunk
-configured by $0, generated by GNU Autoconf 2.68,
+configured by $0, generated by GNU Autoconf 2.69,
   with options \\"\$ac_cs_config\\"
 
-Copyright (C) 2010 Free Software Foundation, Inc.
+Copyright (C) 2012 Free Software Foundation, Inc.
 This config.status script is free software; the Free Software Foundation
 gives unlimited permission to copy, distribute and modify it."
 
@@ -32034,7 +32095,7 @@ fi
 _ACEOF
 cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
 if \$ac_cs_recheck; then
-  set X '$SHELL' '$0' $ac_configure_args \$ac_configure_extra_args --no-create --no-recursion
+  set X $SHELL '$0' $ac_configure_args \$ac_configure_extra_args --no-create --no-recursion
   shift
   \$as_echo "running CONFIG_SHELL=$SHELL \$*" >&6
   CONFIG_SHELL='$SHELL'
index ba8581b..765f080 100644 (file)
@@ -808,11 +808,12 @@ AC_DEFINE([HAVE_PTHREAD_MUTEX_RECURSIVE_NP], 1, [Define to 1 if your system defi
 AC_MSG_RESULT(no)
 )
 
-AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
 save_LIBS="$LIBS"
 save_CFLAGS="$CFLAGS"
 LIBS="$PTHREAD_LIBS $LIBS"
 CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
 AC_LINK_IFELSE(
   [AC_LANG_PROGRAM(
     [#include <pthread.h>
@@ -826,6 +827,17 @@ AC_LINK_IFELSE(
     ac_cv_pthread_rwlock_timedwrlock="no"
   ]
 )
+
+# Some platforms define sem_init(), but only support sem_open(). joyous.
+AC_MSG_CHECKING(for working unnamed semaphores)
+AC_RUN_IFELSE(
+       [AC_LANG_PROGRAM([#include <semaphore.h>],
+               [sem_t sem; return sem_init(&sem, 0, 0);])],
+       AC_MSG_RESULT(yes)
+       AC_DEFINE([HAS_WORKING_SEMAPHORE], 1, [Define to 1 if anonymous semaphores work.]),
+       AC_MSG_RESULT(no)
+)
+
 LIBS="$save_LIBS"
 CFLAGS="$save_CFLAGS"
 if test "${ac_cv_pthread_rwlock_timedwrlock}" = "yes"; then
index 75cef8a..49f8e78 100644 (file)
@@ -649,7 +649,7 @@ struct test_cb_data {
        sem_t sem;
 };
 
-static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct test_cb_data *cb_data = userdata;
        if (stasis_message_type(msg) != ast_presence_state_message_type()) {
index 87a769e..559b69a 100644 (file)
@@ -29,6 +29,9 @@
 /* Define to 1 if using `alloca.c'. */
 #undef C_ALLOCA
 
+/* Define to 1 if anonymous semaphores work. */
+#undef HAS_WORKING_SEMAPHORE
+
 /* Define to 1 if you have the `acos' function. */
 #undef HAVE_ACOS
 
diff --git a/include/asterisk/sem.h b/include/asterisk/sem.h
new file mode 100644 (file)
index 0000000..8f6356c
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef ASTERISK_SEMAPHORE_H
+#define ASTERISK_SEMAPHORE_H
+
+/*!
+ * \file Asterisk semaphore API
+ *
+ * This API is a thin wrapper around the POSIX semaphore API (when available),
+ * so see the POSIX documentation for further details.
+ */
+
+#ifdef HAS_WORKING_SEMAPHORE
+/* Working semaphore implementation detected */
+
+#include <semaphore.h>
+
+struct ast_sem {
+       sem_t real_sem;
+};
+
+#define AST_SEM_VALUE_MAX SEM_VALUE_MAX
+
+/* These are thin wrappers; might as well inline them */
+
+static force_inline int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
+{
+       return sem_init(&sem->real_sem, pshared, value);
+}
+
+static force_inline int ast_sem_destroy(struct ast_sem *sem)
+{
+       return sem_destroy(&sem->real_sem);
+}
+
+static force_inline int ast_sem_post(struct ast_sem *sem)
+{
+       return sem_post(&sem->real_sem);
+}
+
+static force_inline int ast_sem_wait(struct ast_sem *sem)
+{
+       return sem_wait(&sem->real_sem);
+}
+
+static force_inline int ast_sem_getvalue(struct ast_sem *sem, int *sval)
+{
+       return sem_getvalue(&sem->real_sem, sval);
+}
+
+#else
+/* Unnamed semaphores don't work. Rolling our own, I guess... */
+
+#include "asterisk/lock.h"
+
+#include <limits.h>
+
+struct ast_sem {
+       /*! Current count of this semaphore */
+       int count;
+       /*! Number of threads currently waiting for this semaphore */
+       int waiters;
+       /*! Mutual exclusion */
+       ast_mutex_t mutex;
+       /*! Condition for singalling waiters */
+       ast_cond_t cond;
+};
+
+#define AST_SEM_VALUE_MAX INT_MAX
+
+/*!
+ * \brief Initialize a semaphore.
+ *
+ * \param sem Semaphore to initialize.
+ * \param pshared Pass true (nonzero) to share this thread between processes.
+ *                Not be supported on all platforms, so be wary!
+ *                But leave the parameter, to be compatible with the POSIX ABI
+ *                in case we need to add support in the future.
+ * \param value Initial value of the semaphore.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value);
+
+/*!
+ * \brief Destroy a semaphore.
+ *
+ * Destroying a semaphore that other threads are currently blocked on produces
+ * undefined behavior.
+ *
+ * \param sem Semaphore to destroy.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_destroy(struct ast_sem *sem);
+
+/*!
+ * \brief Increments the semaphore, unblocking a waiter if necessary.
+ *
+ * \param sem Semaphore to increment.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_post(struct ast_sem *sem);
+
+/*!
+ * \brief Decrements the semaphore.
+ *
+ * If the semaphore's current value is zero, this function blocks until another
+ * thread posts (ast_sem_post()) to the semaphore (or is interrupted by a signal
+ * handler, which sets errno to EINTR).
+ *
+ * \param sem Semaphore to decrement.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_wait(struct ast_sem *sem);
+
+/*!
+ * \brief Gets the current value of the semaphore.
+ *
+ * If threads are blocked on this semaphore, POSIX allows the return value to be
+ * either 0 or a negative number whose absolute value is the number of threads
+ * blocked. Don't assume that it will give you one or the other; Asterisk has
+ * been ported to just about everything.
+ *
+ * \param sem Semaphore to query.
+ * \param[out] sval Output value.
+ *
+ * \return 0 on success.
+ * \return -1 on error, errno set to indicate error.
+ */
+int ast_sem_getvalue(struct ast_sem *sem, int *sval);
+
+#endif
+
+#endif /* ASTERISK_SEMAPHORE_H */
index 1a3dae0..529aa12 100644 (file)
@@ -348,18 +348,6 @@ const char *stasis_topic_name(const struct stasis_topic *topic);
 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
 
 /*!
- * \brief Publish a message from a specified topic to all the subscribers of a
- * possibly different topic.
- * \param topic Topic to publish message to.
- * \param topic Original topic message was from.
- * \param message Message
- * \since 12
- */
-void stasis_forward_message(struct stasis_topic *topic,
-                           struct stasis_topic *publisher_topic,
-                           struct stasis_message *message);
-
-/*!
  * \brief Wait for all pending messages on a given topic to be processed.
  * \param topic Topic to await pending messages on.
  * \return 0 on success.
@@ -381,11 +369,10 @@ struct stasis_subscription;
 /*!
  * \brief Callback function type for Stasis subscriptions.
  * \param data Data field provided with subscription.
- * \param topic Topic to which the message was published.
  * \param message Published message.
  * \since 12
  */
-typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 
 /*!
  * \brief Create a subscription.
@@ -464,6 +451,8 @@ int stasis_subscription_is_done(struct stasis_subscription *subscription);
 struct stasis_subscription *stasis_unsubscribe_and_join(
        struct stasis_subscription *subscription);
 
+struct stasis_forward;
+
 /*!
  * \brief Create a subscription which forwards all messages from one topic to
  * another.
@@ -477,9 +466,11 @@ struct stasis_subscription *stasis_unsubscribe_and_join(
  * \return \c NULL on error.
  * \since 12
  */
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
        struct stasis_topic *to_topic);
 
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
+
 /*!
  * \brief Get the unique ID for the subscription.
  *
@@ -579,8 +570,6 @@ struct stasis_message_type *stasis_cache_update_type(void);
  * \since 12
  */
 struct stasis_cache_update {
-       /*! \brief Topic that published \c new_snapshot */
-       struct stasis_topic *topic;
        /*! \brief Convenience reference to snapshot type */
        struct stasis_message_type *type;
        /*! \brief Old value from the cache */
@@ -884,16 +873,6 @@ int stasis_config_init(void);
  */
 int stasis_wait_init(void);
 
-struct ast_threadpool_options;
-
-/*!
- * \internal
- * \brief Retrieves the Stasis threadpool configuration.
- * \param[out] threadpool_options Filled with Stasis threadpool options.
- */
-void stasis_config_get_threadpool_options(
-       struct ast_threadpool_options *threadpool_options);
-
 /*! @} */
 
 /*!
index 67ab88f..01e5812 100644 (file)
@@ -62,7 +62,7 @@ struct stasis_message;
  */
 struct stasis_subscription *internal_stasis_subscribe(
        struct stasis_topic *topic,
-       void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message),
+       void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message),
        void *data,
        int needs_mailbox);
 
index b14868b..3209adb 100644 (file)
@@ -100,6 +100,9 @@ int stasis_message_router_is_done(struct stasis_message_router *router);
  * updates for types not handled by routes added with
  * stasis_message_router_add_cache_update().
  *
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
+ *
  * \param router Router to add the route to.
  * \param message_type Type of message to route.
  * \param callback Callback to forard messages of \a message_type to.
@@ -121,6 +124,9 @@ int stasis_message_router_add(struct stasis_message_router *router,
  * These are distinct from regular routes, so one could have both a regular
  * route and a cache route for the same \a message_type.
  *
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
+ *
  * \param router Router to add the route to.
  * \param message_type Subtype of cache update to route.
  * \param callback Callback to forard messages of \a message_type to.
@@ -138,6 +144,11 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router,
 /*!
  * \brief Remove a route from a message router.
  *
+ * If a route is removed from another thread, there is no notification that
+ * all messages using this route have been processed. This typically means that
+ * the associated \c data pointer for this route must be kept until the
+ * route itself is disposed of.
+ *
  * \param router Router to remove the route from.
  * \param message_type Type of message to route.
  *
@@ -149,6 +160,11 @@ void stasis_message_router_remove(struct stasis_message_router *router,
 /*!
  * \brief Remove a cache route from a message router.
  *
+ * If a route is removed from another thread, there is no notification that
+ * all messages using this route have been processed. This typically means that
+ * the associated \c data pointer for this route must be kept until the
+ * route itself is disposed of.
+ *
  * \param router Router to remove the route from.
  * \param message_type Type of message to route.
  *
index 2191663..ab52329 100644 (file)
@@ -109,6 +109,7 @@ struct ast_taskprocessor_listener_callbacks {
         * \param listener The listener
         */
        void (*shutdown)(struct ast_taskprocessor_listener *listener);
+       void (*dtor)(struct ast_taskprocessor_listener *listener);
 };
 
 /*!
@@ -175,6 +176,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
 struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
 
 /*!
+ * \brief Sets the local data associated with a taskprocessor.
+ *
+ * \since 12.0.0
+ *
+ * See ast_taskprocessor_push_local().
+ *
+ * \param tps Task processor.
+ * \param local_data Local data to associate with \a tps.
+ */
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data);
+
+/*!
  * \brief Unreference the specified taskprocessor and its reference count will decrement.
  *
  * Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy
@@ -196,6 +209,32 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
  */
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
 
+/*! \brief Local data parameter */
+struct ast_taskprocessor_local {
+       /*! Local data, associated with the taskprocessor. */
+       void *local_data;
+       /*! Data pointer passed with this task. */
+       void *data;
+};
+
+/*!
+ * \brief Push a task into the specified taskprocessor queue and signal the
+ * taskprocessor thread.
+ *
+ * The callback receives a \ref ast_taskprocessor_local struct, which contains
+ * both the provided \a datap pointer, and any local data set on the
+ * taskprocessor with ast_taskprocessor_set_local().
+ *
+ * \param tps The taskprocessor structure
+ * \param task_exe The task handling function to push into the taskprocessor queue
+ * \param datap The data to be used by the task handling function
+ * \retval 0 success
+ * \retval -1 failure
+ * \since 12.0.0
+ */
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
+       int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);
+
 /*!
  * \brief Pop a task off the taskprocessor and execute it.
  *
diff --git a/include/asterisk/vector.h b/include/asterisk/vector.h
new file mode 100644 (file)
index 0000000..f5d3e9a
--- /dev/null
@@ -0,0 +1,193 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_VECTOR_H
+#define _ASTERISK_VECTOR_H
+
+/*! \file
+ *
+ * \brief Vector container support.
+ *
+ * A vector is a variable length array, with properties that can be useful when
+ * order doesn't matter.
+ *  - Appends are asymptotically constant time.
+ *  - Unordered removes are constant time.
+ *  - Search is linear time
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ * \since 12
+ */
+
+/*! \brief Define a vector structure */
+#define ast_vector(type)                       \
+       struct {                                \
+               type *elems;                    \
+               size_t max;                     \
+               size_t current;                 \
+       }
+
+/*!
+ * \brief Initialize a vector
+ *
+ * If \a size is 0, then no space will be allocated until the vector is
+ * appended to.
+ *
+ * \param vec Vector to initialize.
+ * \param size Initial size of the vector.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_init(vec, size) ({                                  \
+       size_t __size = (size);                                         \
+       size_t alloc_size = __size * sizeof(*(vec).elems);              \
+       (vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL;       \
+       (vec).current = 0;                                              \
+       if ((vec).elems) {                                              \
+               (vec).max = __size;                                     \
+       } else {                                                        \
+               (vec).max = 0;                                          \
+       }                                                               \
+       alloc_size == 0 || (vec).elems != NULL ? 0 : -1;                \
+})
+
+/*!
+ * \brief Deallocates this vector.
+ *
+ * If any code to free the elements of this vector need to be run, that should
+ * be done prior to this call.
+ *
+ * \param vec Vector to deallocate.
+ */
+#define ast_vector_free(vec) do {              \
+       ast_free((vec).elems);                  \
+       (vec).elems = NULL;                     \
+       (vec).max = 0;                          \
+       (vec).current = 0;                      \
+} while (0)
+
+/*!
+ * \brief Append an element to a vector, growing the vector if needed.
+ *
+ * \param vec Vector to append to.
+ * \param elem Element to append.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_append(vec, elem) ({                                        \
+       int res = 0;                                                    \
+                                                                       \
+       if ((vec).current + 1 > (vec).max) {                            \
+               size_t new_max = (vec).max ? 2 * (vec).max : 1;         \
+               typeof((vec).elems) new_elems = ast_realloc(            \
+                       (vec).elems, new_max * sizeof(*new_elems));     \
+               if (new_elems) {                                        \
+                       (vec).elems = new_elems;                                \
+                       (vec).max = new_max;                            \
+               } else {                                                \
+                       res = -1;                                       \
+               }                                                       \
+       }                                                               \
+                                                                       \
+       if (res == 0) {                                                 \
+               (vec).elems[(vec).current++] = (elem);                  \
+       }                                                               \
+       res;                                                            \
+})
+
+/*!
+ * \brief Remove an element from a vector by index.
+ *
+ * Note that elements in the vector may be reordered, so that the remove can
+ * happen in constant time.
+ *
+ * \param vec Vector to remove from.
+ * \param idx Index of the element to remove.
+ * \return The element that was removed.
+ */
+#define ast_vector_remove_unordered(vec, idx) ({               \
+       typeof((vec).elems[0]) res;                             \
+       size_t __idx = (idx);                                   \
+       ast_assert(__idx < (vec).current);                      \
+       res = (vec).elems[__idx];                               \
+       (vec).elems[__idx] = (vec).elems[--(vec).current];      \
+       res;                                                    \
+})
+
+
+/*!
+ * \brief Remove an element from a vector that matches the given comparison
+ *
+ * \param vec Vector to remove from.
+ * \param value Value to pass into comparator.
+ * \param cmp Comparator function/macros (called as \c cmp(elem, value))
+ * \return 0 if element was removed.
+ * \return Non-zero if element was not in the vector.
+ */
+#define ast_vector_remove_cmp_unordered(vec, value, cmp) ({            \
+       int res = -1;                                                   \
+       size_t idx;                                                     \
+       typeof(value) __value = (value);                                \
+       for (idx = 0; idx < (vec).current; ++idx) {                     \
+               if (cmp((vec).elems[idx], __value)) {                   \
+                       ast_vector_remove_unordered((vec), idx);        \
+                       res = 0;                                        \
+                       break;                                          \
+               }                                                       \
+       }                                                               \
+       res;                                                            \
+})
+
+/*! \brief Default comparator for ast_vector_remove_elem_unordered() */
+#define AST_VECTOR_DEFAULT_CMP(a, b) ((a) == (b))
+
+/*!
+ * \brief Remove an element from a vector.
+ *
+ * \param vec Vector to remove from.
+ * \param elem Element to remove
+ * \return 0 if element was removed.
+ * \return Non-zero if element was not in the vector.
+ */
+#define ast_vector_remove_elem_unordered(vec, elem) ({ \
+       ast_vector_remove_cmp_unordered((vec), (elem),  \
+               AST_VECTOR_DEFAULT_CMP);                \
+})
+
+/*!
+ * \brief Get the number of elements in a vector.
+ *
+ * \param vec Vector to query.
+ * \return Number of elements in the vector.
+ */
+#define ast_vector_size(vec) (vec).current
+
+/*!
+ * \brief Get an element from a vector.
+ *
+ * \param vec Vector to query.
+ * \param idx Index of the element to get.
+ */
+#define ast_vector_get(vec, idx) ({            \
+       size_t __idx = (idx);                   \
+       ast_assert(__idx < (vec).current);      \
+       (vec).elems[__idx];                     \
+})
+
+#endif /* _ASTERISK_VECTOR_H */
index 88a6a6a..88801bd 100644 (file)
@@ -478,38 +478,23 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
        ast_atomic_fetchadd_int(&ao2.total_objects, -1);
 #endif
 
+       /* In case someone uses an object after it's been freed */
+       obj->priv_data.magic = 0;
+
        switch (obj->priv_data.options & AO2_ALLOC_OPT_LOCK_MASK) {
        case AO2_ALLOC_OPT_LOCK_MUTEX:
                obj_mutex = INTERNAL_OBJ_MUTEX(user_data);
                ast_mutex_destroy(&obj_mutex->mutex.lock);
 
-               /*
-                * For safety, zero-out the astobj2_lock header and also the
-                * first word of the user-data, which we make sure is always
-                * allocated.
-                */
-               memset(obj_mutex, '\0', sizeof(*obj_mutex) + sizeof(void *) );
                ast_free(obj_mutex);
                break;
        case AO2_ALLOC_OPT_LOCK_RWLOCK:
                obj_rwlock = INTERNAL_OBJ_RWLOCK(user_data);
                ast_rwlock_destroy(&obj_rwlock->rwlock.lock);
 
-               /*
-                * For safety, zero-out the astobj2_rwlock header and also the
-                * first word of the user-data, which we make sure is always
-                * allocated.
-                */
-               memset(obj_rwlock, '\0', sizeof(*obj_rwlock) + sizeof(void *) );
                ast_free(obj_rwlock);
                break;
        case AO2_ALLOC_OPT_LOCK_NOLOCK:
-               /*
-                * For safety, zero-out the astobj2 header and also the first
-                * word of the user-data, which we make sure is always
-                * allocated.
-                */
-               memset(obj, '\0', sizeof(*obj) + sizeof(void *) );
                ast_free(obj);
                break;
        default:
@@ -575,14 +560,6 @@ static void *internal_ao2_alloc(size_t data_size, ao2_destructor_fn destructor_f
        struct astobj2_lock *obj_mutex;
        struct astobj2_rwlock *obj_rwlock;
 
-       if (data_size < sizeof(void *)) {
-               /*
-                * We always alloc at least the size of a void *,
-                * for debugging purposes.
-                */
-               data_size = sizeof(void *);
-       }
-
        switch (options & AO2_ALLOC_OPT_LOCK_MASK) {
        case AO2_ALLOC_OPT_LOCK_MUTEX:
 #if defined(__AST_DEBUG_MALLOC)
index 061c45a..3068c6f 100644 (file)
@@ -1397,7 +1397,7 @@ static void generic_monitor_instance_list_destructor(void *obj)
        ast_free((char *)generic_list->device_name);
 }
 
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
 static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor)
 {
        struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list),
@@ -1471,7 +1471,7 @@ static int generic_monitor_devstate_tp_cb(void *data)
        return 0;
 }
 
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        /* Wow, it's cool that we've picked up on a state change, but we really want
         * the actual work to be done in the core's taskprocessor execution thread
@@ -2750,7 +2750,7 @@ static int cc_generic_agent_stop_ringing(struct ast_cc_agent *agent)
        return 0;
 }
 
-static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_cc_agent *agent = userdata;
        enum ast_device_state new_state;
index fb02d33..ea0f9c0 100644 (file)
@@ -334,13 +334,13 @@ static struct ao2_container *active_cdrs_by_channel;
 static struct stasis_message_router *stasis_router;
 
 /*! \brief Our subscription for bridges */
-static struct stasis_subscription *bridge_subscription;
+static struct stasis_forward *bridge_subscription;
 
 /*! \brief Our subscription for channels */
-static struct stasis_subscription *channel_subscription;
+static struct stasis_forward *channel_subscription;
 
 /*! \brief Our subscription for parking */
-static struct stasis_subscription *parking_subscription;
+static struct stasis_forward *parking_subscription;
 
 /*! \brief The parent topic for all topics we want to aggregate for CDRs */
 static struct stasis_topic *cdr_topic;
@@ -1839,7 +1839,7 @@ static int finalized_state_process_party_a(struct cdr_object *cdr, struct ast_ch
  * \param topic The topic this message was published for
  * \param message The message
  */
-static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
        RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
@@ -2020,7 +2020,7 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot,
  * \param topic The topic this message was published for
  * \param message The message
  */
-static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
        RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
@@ -2150,7 +2150,7 @@ static int filter_bridge_messages(struct ast_bridge_snapshot *bridge)
  * \param message The message - hopefully a bridge one!
  */
 static void handle_bridge_leave_message(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_bridge_blob *update = stasis_message_data(message);
        struct ast_bridge_snapshot *bridge = update->bridge;
@@ -2450,7 +2450,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
  * \param message The message - hopefully a bridge one!
  */
 static void handle_bridge_enter_message(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_bridge_blob *update = stasis_message_data(message);
        struct ast_bridge_snapshot *bridge = update->bridge;
@@ -2494,7 +2494,7 @@ static void handle_bridge_enter_message(void *data, struct stasis_subscription *
  * \param message The message about who got parked
  * */
 static void handle_parked_call_message(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_parked_call_payload *payload = stasis_message_data(message);
        struct ast_channel_snapshot *channel = payload->parkee;
@@ -3884,9 +3884,9 @@ static int process_config(int reload)
 
 static void cdr_engine_cleanup(void)
 {
-       channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
-       bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
-       parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
+       channel_subscription = stasis_forward_cancel(channel_subscription);
+       bridge_subscription = stasis_forward_cancel(bridge_subscription);
+       parking_subscription = stasis_forward_cancel(parking_subscription);
        stasis_message_router_unsubscribe_and_join(stasis_router);
        ao2_cleanup(cdr_topic);
        cdr_topic = NULL;
index 6050fac..0d78b5c 100644 (file)
@@ -121,16 +121,16 @@ static struct stasis_topic *cel_topic;
 static struct stasis_topic *cel_aggregation_topic;
 
 /*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_channel_forwarder;
+static struct stasis_forward *cel_channel_forwarder;
 
 /*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_bridge_forwarder;
+static struct stasis_forward *cel_bridge_forwarder;
 
 /*! Subscription for forwarding the parking topic */
-static struct stasis_subscription *cel_parking_forwarder;
+static struct stasis_forward *cel_parking_forwarder;
 
 /*! Subscription for forwarding the CEL-specific topic */
-static struct stasis_subscription *cel_cel_forwarder;
+static struct stasis_forward *cel_cel_forwarder;
 
 struct stasis_message_type *cel_generic_type(void);
 STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
@@ -1019,7 +1019,6 @@ static int cel_filter_channel_snapshot(struct ast_channel_snapshot *snapshot)
 }
 
 static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct stasis_cache_update *update = stasis_message_data(message);
@@ -1082,7 +1081,6 @@ static struct ast_str *cel_generate_peer_str(
 
 static void cel_bridge_enter_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -1110,7 +1108,6 @@ static void cel_bridge_enter_cb(
 
 static void cel_bridge_leave_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -1138,7 +1135,6 @@ static void cel_bridge_leave_cb(
 
 static void cel_parking_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_parked_call_payload *parked_payload = stasis_message_data(message);
@@ -1183,7 +1179,6 @@ static void save_dialstatus(struct ast_multi_channel_blob *blob)
 }
 
 static void cel_dial_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_multi_channel_blob *blob = stasis_message_data(message);
@@ -1218,7 +1213,6 @@ static void cel_dial_cb(void *data, struct stasis_subscription *sub,
 
 static void cel_generic_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
@@ -1241,7 +1235,6 @@ static void cel_generic_cb(
 
 static void cel_blind_transfer_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_bridge_blob *obj = stasis_message_data(message);
@@ -1289,7 +1282,6 @@ static void cel_blind_transfer_cb(
 
 static void cel_attended_transfer_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_attended_transfer_message *xfer = stasis_message_data(message);
@@ -1342,7 +1334,6 @@ static void cel_attended_transfer_cb(
 
 static void cel_pickup_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_multi_channel_blob *obj = stasis_message_data(message);
@@ -1364,7 +1355,6 @@ static void cel_pickup_cb(
 
 static void cel_local_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_multi_channel_blob *obj = stasis_message_data(message);
@@ -1394,10 +1384,10 @@ static void ast_cel_engine_term(void)
        cel_aggregation_topic = NULL;
        ao2_cleanup(cel_topic);
        cel_topic = NULL;
-       cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
-       cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
-       cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
-       cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
+       cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
+       cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
+       cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
+       cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
        ast_cli_unregister(&cli_status);
        ao2_cleanup(cel_dialstatus_store);
        cel_dialstatus_store = NULL;
index 7ba3e6c..e8ce5e0 100644 (file)
@@ -7549,14 +7549,19 @@ struct varshead *ast_channel_get_manager_vars(struct ast_channel *chan)
        RAII_VAR(struct ast_str *, tmp, NULL, ast_free);
        struct manager_channel_variable *mcv;
 
-       ret = ao2_alloc(sizeof(*ret), varshead_dtor);
-       tmp = ast_str_create(16);
-
        if (!ret || !tmp) {
                return NULL;
        }
 
        AST_RWLIST_RDLOCK(&channelvars);
+
+       if (AST_LIST_EMPTY(&channelvars)) {
+               return NULL;
+       }
+
+       ret = ao2_alloc(sizeof(*ret), varshead_dtor);
+       tmp = ast_str_create(16);
+
        AST_LIST_TRAVERSE(&channelvars, mcv, entry) {
                const char *val = NULL;
                struct ast_var_t *var;
index 956816d..de2cc9c 100644 (file)
@@ -207,8 +207,7 @@ struct ast_channel {
        char sending_dtmf_digit;                        /*!< Digit this channel is currently sending out. (zero if not sending) */
        struct timeval sending_dtmf_tv;         /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
        struct stasis_cp_single *topics;                /*!< Topic for all channel's events */
-       struct stasis_subscription *forwarder;          /*!< Subscription for event forwarding to all topic */
-       struct stasis_subscription *endpoint_forward;   /*!< Subscription for event forwarding to endpoint's topic */
+       struct stasis_forward *endpoint_forward;        /*!< Subscription for event forwarding to endpoint's topic */
 };
 
 /*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1429,8 +1428,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
 
        ast_string_field_free_memory(chan);
 
-       chan->forwarder = stasis_unsubscribe(chan->forwarder);
-       chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
+       chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
 
        stasis_cp_single_unsubscribe(chan->topics);
        chan->topics = NULL;
index bcf07ff..158d1f8 100644 (file)
@@ -610,7 +610,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre
        return 1;
 }
 
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        enum ast_device_state aggregate_state;
        char *device;
index b33e33f..bdcf401 100644 (file)
@@ -152,7 +152,7 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
 
 /*! \brief Handler for channel snapshot cache clears */
 static void endpoint_cache_clear(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        struct ast_endpoint *endpoint = data;
@@ -174,7 +174,7 @@ static void endpoint_cache_clear(void *data,
 }
 
 static void endpoint_default(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        struct stasis_endpoint *endpoint = data;
index 00649da..69def4b 100644 (file)
@@ -1126,7 +1126,7 @@ static struct stasis_topic *manager_topic;
 static struct stasis_message_router *stasis_router;
 
 /*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
-static struct stasis_subscription *rtp_topic_forwarder;
+static struct stasis_forward *rtp_topic_forwarder;
 
 #define MGR_SHOW_TERMINAL_WIDTH 80
 
@@ -1151,7 +1151,7 @@ static const struct {
        {{ "restart", "gracefully", NULL }},
 };
 
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 
 static void acl_change_stasis_subscribe(void)
 {
@@ -1427,7 +1427,6 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl
 }
 
 static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
@@ -1444,7 +1443,6 @@ static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_json_payload *payload = stasis_message_data(message);
@@ -7640,7 +7638,6 @@ static void load_channelvars(struct ast_variable *var)
 #ifdef TEST_FRAMEWORK
 
 static void test_suite_event_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic,
                struct stasis_message *message)
 {
        struct ast_test_suite_message_payload *payload;
@@ -7759,7 +7756,7 @@ static void manager_shutdown(void)
                stasis_message_router_unsubscribe_and_join(stasis_router);
                stasis_router = NULL;
        }
-       stasis_unsubscribe_and_join(rtp_topic_forwarder);
+       stasis_forward_cancel(rtp_topic_forwarder);
        rtp_topic_forwarder = NULL;
        ao2_cleanup(manager_topic);
        manager_topic = NULL;
@@ -8344,7 +8341,7 @@ static int __init_manager(int reload, int by_external_config)
 }
 
 static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (stasis_message_type(message) != ast_named_acl_change_type()) {
                return;
index 77d9ff0..fad676b 100644 (file)
@@ -106,7 +106,7 @@ static struct stasis_message_router *bridge_state_router;
 /*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 struct ast_str *ast_manager_build_bridge_state_string_prefix(
        const struct ast_bridge_snapshot *snapshot,
@@ -180,7 +180,6 @@ bridge_snapshot_monitor bridge_monitors[] = {
 };
 
 static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free);
@@ -221,7 +220,6 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
 }
 
 static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_bridge_merge_message *merge_msg = stasis_message_data(message);
@@ -254,7 +252,6 @@ static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_enter_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        static const char *swap_name = "SwapUniqueid: ";
@@ -283,7 +280,6 @@ static void channel_enter_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_leave_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -456,7 +452,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m)
 
 static void manager_bridging_cleanup(void)
 {
-       stasis_unsubscribe(topic_forwarder);
+       stasis_forward_cancel(topic_forwarder);
        topic_forwarder = NULL;
 }
 
index 485841b..0bebb21 100644 (file)
@@ -370,7 +370,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 /*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 struct ast_str *ast_manager_build_channel_state_string_prefix(
                const struct ast_channel_snapshot *snapshot,
@@ -565,7 +565,6 @@ channel_snapshot_monitor channel_monitors[] = {
 };
 
 static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -616,7 +615,7 @@ static int userevent_exclusion_cb(const char *key)
 }
 
 static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -667,7 +666,7 @@ static void publish_basic_channel_event(const char *event, int class, struct ast
 }
 
 static void channel_hangup_request_cb(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
@@ -707,7 +706,7 @@ static void channel_hangup_request_cb(void *data,
 }
 
 static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
        struct ast_channel_snapshot *spyer;
@@ -730,7 +729,7 @@ static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
        RAII_VAR(struct ast_str *, spyee_channel_string, NULL, ast_free);
@@ -765,7 +764,7 @@ static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub
 }
 
 static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -806,7 +805,7 @@ static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -853,7 +852,7 @@ static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_hangup_handler_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
        struct ast_channel_blob *payload = stasis_message_data(message);
@@ -884,7 +883,7 @@ static void channel_hangup_handler_cb(void *data, struct stasis_subscription *su
 }
 
 static void channel_fax_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
        RAII_VAR(struct ast_str *, event_buffer, ast_str_create(256), ast_free);
@@ -957,7 +956,7 @@ static void channel_fax_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_channel_blob *payload = stasis_message_data(message);
        struct ast_json *blob = payload->blob;
@@ -977,7 +976,7 @@ static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_channel_blob *payload = stasis_message_data(message);
 
@@ -985,7 +984,7 @@ static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_channel_blob *payload = stasis_message_data(message);
 
@@ -993,7 +992,7 @@ static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub
 }
 
 static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_channel_blob *payload = stasis_message_data(message);
 
@@ -1004,7 +1003,7 @@ static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
  * \brief Callback processing messages for channel dialing
  */
 static void channel_dial_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_multi_channel_blob *obj = stasis_message_data(message);
        const char *dialstatus;
@@ -1051,7 +1050,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_hold_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        const char *musicclass;
@@ -1083,7 +1082,7 @@ static void channel_hold_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -1100,7 +1099,7 @@ static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
 
 static void manager_channels_shutdown(void)
 {
-       stasis_unsubscribe(topic_forwarder);
+       stasis_forward_cancel(topic_forwarder);
        topic_forwarder = NULL;
 }
 
index 6342837..b5f5b31 100644 (file)
@@ -46,14 +46,9 @@ static void manager_endpoints_shutdown(void)
 }
 
 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
-       /* XXX This looks wrong. Nothing should post or forward to a caching
-        * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
-        * to dig to make sure I don't break anything, though.
-        */
-       stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
+       stasis_publish(ast_manager_get_topic(), message);
 }
 
 int manager_endpoints_init(void)
index 12a3de3..849c315 100644 (file)
@@ -41,7 +41,7 @@ struct stasis_message_router *mwi_state_router;
 /*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 /*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
 static int exclude_event_cb(const char *key)
@@ -54,7 +54,6 @@ static int exclude_event_cb(const char *key)
 
 /*! \brief Generic MWI event callback used for one-off events from voicemail modules */
 static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_mwi_blob *payload = stasis_message_data(message);
@@ -86,7 +85,6 @@ static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void mwi_update_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_mwi_state *mwi_state;
@@ -149,7 +147,7 @@ static void mwi_update_cb(void *data, struct stasis_subscription *sub,
 
 static void manager_mwi_shutdown(void)
 {
-       stasis_unsubscribe(topic_forwarder);
+       stasis_forward_cancel(topic_forwarder);
        topic_forwarder = NULL;
 }
 
index 4fef11d..f4e7e9e 100644 (file)
@@ -34,11 +34,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 /*! \brief The \ref stasis subscription returned by the forwarding of the system topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 static void manager_system_shutdown(void)
 {
-       stasis_unsubscribe(topic_forwarder);
+       stasis_forward_cancel(topic_forwarder);
        topic_forwarder = NULL;
 }
 
index 09f3d95..2a41540 100644 (file)
@@ -5111,7 +5111,7 @@ static void get_device_state_causing_channels(struct ao2_container *c)
        ao2_iterator_destroy(&iter);
 }
 
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_device_state_message *dev_state;
        struct ast_hint *hint;
@@ -11369,7 +11369,7 @@ static int pbx_builtin_sayphonetic(struct ast_channel *chan, const char *data)
        return res;
 }
 
-static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_presence_state_message *presence_state = stasis_message_data(msg);
        struct ast_hint *hint;
diff --git a/main/sem.c b/main/sem.c
new file mode 100644 (file)
index 0000000..e67d9c7
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Asterisk semaphore support.
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/sem.h"
+#include "asterisk/utils.h"
+
+#ifndef HAS_WORKING_SEMAPHORE
+
+/* DIY semaphores! */
+
+int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
+{
+       if (pshared) {
+               /* Don't need it... yet */
+               errno = ENOSYS;
+               return -1;
+       }
+
+       /* Since value is unsigned, this will also catch attempts to init with
+        * a negative value */
+       if (value > AST_SEM_VALUE_MAX) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       sem->count = value;
+       sem->waiters = 0;
+       ast_mutex_init(&sem->mutex);
+       ast_cond_init(&sem->cond, NULL);
+       return 0;
+}
+
+int ast_sem_destroy(struct ast_sem *sem)
+{
+       ast_mutex_destroy(&sem->mutex);
+       ast_cond_destroy(&sem->cond);
+       return 0;
+}
+
+int ast_sem_post(struct ast_sem *sem)
+{
+       SCOPED_MUTEX(lock, &sem->mutex);
+
+       ast_assert(sem->count >= 0);
+
+       if (sem->count == AST_SEM_VALUE_MAX) {
+               errno = EOVERFLOW;
+               return -1;
+       }
+
+       /* Give it up! */
+       ++sem->count;
+
+       /* Release a waiter, if needed */
+       if (sem->waiters) {
+               ast_cond_signal(&sem->cond);
+       }
+
+       return 0;
+}
+
+int ast_sem_wait(struct ast_sem *sem)
+{
+       SCOPED_MUTEX(lock, &sem->mutex);
+
+       ast_assert(sem->count >= 0);
+
+       /* Wait for a non-zero count */
+       ++sem->waiters;
+       while (sem->count == 0) {
+               ast_cond_wait(&sem->cond, &sem->mutex);
+       }
+       --sem->waiters;
+
+       /* Take it! */
+       --sem->count;
+
+       return 0;
+}
+
+int ast_sem_getvalue(struct ast_sem *sem, int *sval)
+{
+       SCOPED_MUTEX(lock, &sem->mutex);
+
+       ast_assert(sem->count >= 0);
+
+       *sval = sem->count;
+
+       return 0;
+}
+
+#endif
index 9f70ef6..2fcd239 100644 (file)
@@ -281,7 +281,7 @@ static void sounds_cleanup(void)
 }
 
 static void format_update_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        ast_sounds_reindex();
 }
index 1a03bb3..42c9017 100644 (file)
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
+#include "asterisk/vector.h"
 
 /*!
  * \page stasis-impl Stasis Implementation Notes
@@ -134,24 +134,23 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 /*! The number of buckets to use for topic pools */
 #define TOPIC_POOL_BUCKETS 57
 
-/*! Threadpool for dispatching notifications to subscribers */
-static struct ast_threadpool *pool;
-
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
 /*! \internal */
 struct stasis_topic {
        char *name;
        /*! Variable length array of the subscribers */
-       struct stasis_subscription **subscribers;
-       /*! Allocated length of the subscribers array */
-       size_t num_subscribers_max;
-       /*! Current size of the subscribers array */
-       size_t num_subscribers_current;
+       ast_vector(struct stasis_subscription *) subscribers;
+
+       /*! Topics forwarding into this topic */
+       ast_vector(struct stasis_topic *) upstream_topics;
 };
 
 /* Forward declarations for the tightly-coupled subscription object */
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+       struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
 
 static void topic_dtor(void *obj)
 {
@@ -159,16 +158,18 @@ static void topic_dtor(void *obj)
 
        /* Subscribers hold a reference to topics, so they should all be
         * unsubscribed before we get here. */
-       ast_assert(topic->num_subscribers_current == 0);
+       ast_assert(ast_vector_size(topic->subscribers) == 0);
        ast_free(topic->name);
        topic->name = NULL;
-       ast_free(topic->subscribers);
-       topic->subscribers = NULL;
+
+       ast_vector_free(topic->subscribers);
+       ast_vector_free(topic->upstream_topics);
 }
 
 struct stasis_topic *stasis_topic_create(const char *name)
 {
        RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       int res = 0;
 
        topic = ao2_alloc(sizeof(*topic), topic_dtor);
 
@@ -181,9 +182,10 @@ struct stasis_topic *stasis_topic_create(const char *name)
                return NULL;
        }
 
-       topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
-       topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
-       if (!topic->subscribers) {
+       res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+       res |= ast_vector_init(topic->upstream_topics, 0);
+
+       if (res != 0) {
                return NULL;
        }
 
@@ -247,7 +249,6 @@ static void subscription_dtor(void *obj)
  * \param message Message to send.
  */
 static void subscription_invoke(struct stasis_subscription *sub,
-                                 struct stasis_topic *topic,
                                  struct stasis_message *message)
 {
        /* Notify that the final message has been received */
@@ -258,7 +259,7 @@ static void subscription_invoke(struct stasis_subscription *sub,
        }
 
        /* Since sub is mostly immutable, no need to lock sub */
-       sub->callback(sub->data, sub, topic, message);
+       sub->callback(sub->data, sub, message);
 
        /* Notify that the final message has been processed */
        if (stasis_subscription_final_message(sub, message)) {
@@ -268,7 +269,8 @@ static void subscription_invoke(struct stasis_subscription *sub,
        }
 }
 
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
 
 struct stasis_subscription *internal_stasis_subscribe(
        struct stasis_topic *topic,
@@ -286,10 +288,21 @@ struct stasis_subscription *internal_stasis_subscribe(
        ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
        if (needs_mailbox) {
-               sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+               /* With a small number of subscribers, a thread-per-sub is
+                * acceptable. If our usage changes so that we have larger
+                * numbers of subscribers, we'll probably want to consider
+                * a threadpool. We had that originally, but with so few
+                * subscribers it was actually a performance loss instead of
+                * a gain.
+                */
+               sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+                       TPS_REF_DEFAULT);
                if (!sub->mailbox) {
                        return NULL;
                }
+               ast_taskprocessor_set_local(sub->mailbox, sub);
+               /* Taskprocessor has a reference */
+               ao2_ref(sub, +1);
        }
 
        ao2_ref(topic, +1);
@@ -302,7 +315,7 @@ struct stasis_subscription *internal_stasis_subscribe(
        if (topic_add_subscription(topic, sub) != 0) {
                return NULL;
        }
-       send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
+       send_subscription_subscribe(topic, sub);
 
        ao2_ref(sub, +1);
        return sub;
@@ -316,29 +329,42 @@ struct stasis_subscription *stasis_subscribe(
        return internal_stasis_subscribe(topic, callback, data, 1);
 }
 
+static int sub_cleanup(void *data)
+{
+       struct stasis_subscription *sub = data;
+       ao2_cleanup(sub);
+       return 0;
+}
+
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 {
-       if (sub) {
-               size_t i;
-               /* The subscription may be the last ref to this topic. Hold
-                * the topic ref open until after the unlock. */
-               RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
-                       ao2_cleanup);
-               SCOPED_AO2LOCK(lock_topic, topic);
+       /* The subscription may be the last ref to this topic. Hold
+        * the topic ref open until after the unlock. */
+       RAII_VAR(struct stasis_topic *, topic,
+               ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
 
-               for (i = 0; i < topic->num_subscribers_current; ++i) {
-                       if (topic->subscribers[i] == sub) {
-                               send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
-                               /* swap [i] with last entry; remove last entry */
-                               topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
-                               /* Unsubscribing unrefs the subscription */
-                               ao2_cleanup(sub);
-                               return NULL;
-                       }
-               }
+       if (!sub) {
+               return NULL;
+       }
+
+       /* We have to remove the subscription first, to ensure the unsubscribe
+        * is the final message */
+       if (topic_remove_subscription(sub->topic, sub) != 0) {
+               ast_log(LOG_ERROR,
+                       "Internal error: subscription has invalid topic\n");
+               return NULL;
+       }
+
+       /* Now let everyone know about the unsubscribe */
+       send_subscription_unsubscribe(topic, sub);
 
-               ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+       /* When all that's done, remove the ref the mailbox has on the sub */
+       if (sub->mailbox) {
+               ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
        }
+
+       /* Unsubscribing unrefs the subscription */
+       ao2_cleanup(sub);
        return NULL;
 }
 
@@ -388,8 +414,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
                struct stasis_topic *topic = sub->topic;
                SCOPED_AO2LOCK(lock_topic, topic);
 
-               for (i = 0; i < topic->num_subscribers_current; ++i) {
-                       if (topic->subscribers[i] == sub) {
+               for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+                       if (ast_vector_get(topic->subscribers, i) == sub) {
                                return 1;
                        }
                }
@@ -431,74 +457,36 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st
  */
 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
-       struct stasis_subscription **subscribers;
+       size_t idx;
        SCOPED_AO2LOCK(lock, topic);
 
-       /* Increase list size, if needed */
-       if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
-               subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
-               if (!subscribers) {
-                       return -1;
-               }
-               topic->subscribers = subscribers;
-               topic->num_subscribers_max *= 2;
-       }
-
        /* The reference from the topic to the subscription is shared with
         * the owner of the subscription, which will explicitly unsubscribe
         * to release it.
         *
         * If we bumped the refcount here, the owner would have to unsubscribe
         * and cleanup, which is a bit awkward. */
-       topic->subscribers[topic->num_subscribers_current++] = sub;
-       return 0;
-}
+       ast_vector_append(topic->subscribers, sub);
 
-/*!
- * \internal
- * \brief Information needed to dispatch a message to a subscription
- */
-struct dispatch {
-       /*! Topic message was published to */
-       struct stasis_topic *topic;
-       /*! The message itself */
-       struct stasis_message *message;
-       /*! Subscription receiving the message */
-       struct stasis_subscription *sub;
-};
+       for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+               topic_add_subscription(
+                       ast_vector_get(topic->upstream_topics, idx), sub);
+       }
 
-static void dispatch_dtor(void *data)
-{
-       struct dispatch *dispatch = data;
-       ao2_cleanup(dispatch->topic);
-       ao2_cleanup(dispatch->message);
-       ao2_cleanup(dispatch->sub);
+       return 0;
 }
 
-static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
-       RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+       size_t idx;
+       SCOPED_AO2LOCK(lock_topic, topic);
 
-       ast_assert(topic != NULL);
-       ast_assert(message != NULL);
-       ast_assert(sub != NULL);
-
-       dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
-       if (!dispatch) {
-               return NULL;
+       for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+               topic_remove_subscription(
+                       ast_vector_get(topic->upstream_topics, idx), sub);
        }
 
-       dispatch->topic = topic;
-       ao2_ref(topic, +1);
-
-       dispatch->message = message;
-       ao2_ref(message, +1);
-
-       dispatch->sub = sub;
-       ao2_ref(sub, +1);
-
-       ao2_ref(dispatch, +1);
-       return dispatch;
+       return ast_vector_remove_elem_unordered(topic->subscribers, sub);
 }
 
 /*!
@@ -506,16 +494,34 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
  * \param data \ref dispatch object
  * \return 0
  */
-static int dispatch_exec(void *data)
+static int dispatch_exec(struct ast_taskprocessor_local *local)
 {
-       RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
+       struct stasis_subscription *sub = local->local_data;
+       struct stasis_message *message = local->data;
 
-       subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
+       subscription_invoke(sub, message);
+       ao2_cleanup(message);
 
        return 0;
 }
 
-void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+static void dispatch_message(struct stasis_subscription *sub,
+       struct stasis_message *message)
+{
+       if (sub->mailbox) {
+               ao2_bump(message);
+               if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
+                       /* Push failed; ugh. */
+                       ast_log(LOG_DEBUG, "Dropping dispatch\n");
+                       ao2_cleanup(message);
+               }
+       } else {
+               /* Dispatch directly */
+               subscription_invoke(sub, message);
+       }
+}
+
+void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
 {
        size_t i;
        /* The topic may be unref'ed by the subscription invocation.
@@ -525,70 +531,104 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu
        SCOPED_AO2LOCK(lock, topic);
 
        ast_assert(topic != NULL);
-       ast_assert(publisher_topic != NULL);
        ast_assert(message != NULL);
 
-       for (i = 0; i < topic->num_subscribers_current; ++i) {
-               struct stasis_subscription *sub = topic->subscribers[i];
+       for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+               struct stasis_subscription *sub =
+                       ast_vector_get(topic->subscribers, i);
 
                ast_assert(sub != NULL);
 
-               if (sub->mailbox) {
-                       RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
-
-                       dispatch = dispatch_create(publisher_topic, message, sub);
-                       if (!dispatch) {
-                               ast_log(LOG_DEBUG, "Dropping dispatch\n");
-                               break;
-                       }
-
-                       if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
-                               /* Ownership transferred to mailbox.
-                                * Don't increment ref, b/c the task processor
-                                * may have already gotten rid of the object.
-                                */
-                               dispatch = NULL;
-                       }
-               } else {
-                       /* Dispatch directly */
-                       subscription_invoke(sub, publisher_topic, message);
-               }
+               dispatch_message(sub, message);
        }
 }
 
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+/*!
+ * \brief Forwarding information
+ *
+ * Any message posted to \a from_topic is forwarded to \a to_topic.
+ *
+ * In cases where both the \a from_topic and \a to_topic need to be locked,
+ * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
+ */
+struct stasis_forward {
+       /*! Originating topic */
+       struct stasis_topic *from_topic;
+       /*! Destination topic */
+       struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
 {
-       stasis_forward_message(topic, topic, message);
+       struct stasis_forward *forward = obj;
+
+       ao2_cleanup(forward->from_topic);
+       forward->from_topic = NULL;
+       ao2_cleanup(forward->to_topic);
+       forward->to_topic = NULL;
 }
 
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
 {
-       struct stasis_topic *to_topic = data;
-       stasis_forward_message(to_topic, topic, message);
+       if (forward) {
+               int idx;
 
-       if (stasis_subscription_final_message(sub, message)) {
-               ao2_cleanup(to_topic);
+               struct stasis_topic *from = forward->from_topic;
+               struct stasis_topic *to = forward->to_topic;
+
+               SCOPED_AO2LOCK(to_lock, to);
+
+               ast_vector_remove_elem_unordered(to->upstream_topics, from);
+
+               ao2_lock(from);
+               for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
+                       topic_remove_subscription(
+                               from, ast_vector_get(to->subscribers, idx));
+               }
+               ao2_unlock(from);
        }
+
+       ao2_cleanup(forward);
+
+       return NULL;
 }
 
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+       struct stasis_topic *to_topic)
 {
-       struct stasis_subscription *sub;
+       RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
+
        if (!from_topic || !to_topic) {
                return NULL;
        }
 
-       /* Forwarding subscriptions should dispatch directly instead of having a
-        * mailbox. Otherwise, messages forwarded to the same topic from
-        * different topics may get reordered. Which is bad.
-        */
-       sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
-       if (sub) {
-               /* hold a ref to to_topic for this forwarding subscription */
-               ao2_ref(to_topic, +1);
+       forward = ao2_alloc(sizeof(*forward), forward_dtor);
+       if (!forward) {
+               return NULL;
        }
-       return sub;
+
+       forward->from_topic = ao2_bump(from_topic);
+       forward->to_topic = ao2_bump(to_topic);
+
+       {
+               SCOPED_AO2LOCK(lock, to_topic);
+               int res;
+
+               res = ast_vector_append(to_topic->upstream_topics, from_topic);
+               if (res != 0) {
+                       return NULL;
+               }
+
+               {
+                       SCOPED_AO2LOCK(lock, from_topic);
+                       size_t idx;
+                       for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
+                               topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
+                       }
+               }
+       }
+
+       return ao2_bump(forward);
 }
 
 static void subscription_change_dtor(void *obj)
@@ -598,7 +638,7 @@ static void subscription_change_dtor(void *obj)
        ao2_cleanup(change->topic);
 }
 
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
 {
        RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
 
@@ -616,12 +656,15 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi
        return change;
 }
 
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
        RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 
-       change = subscription_change_alloc(topic, uniqueid, description);
+       /* This assumes that we have already unsubscribed */
+       ast_assert(stasis_subscription_is_subscribed(sub));
+
+       change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
 
        if (!change) {
                return;
@@ -636,15 +679,42 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u
        stasis_publish(topic, msg);
 }
 
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+       struct stasis_subscription *sub)
+{
+       RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+       /* This assumes that we have already unsubscribed */
+       ast_assert(!stasis_subscription_is_subscribed(sub));
+
+       change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+
+       if (!change) {
+               return;
+       }
+
+       msg = stasis_message_create(stasis_subscription_change_type(), change);
+
+       if (!msg) {
+               return;
+       }
+
+       stasis_publish(topic, msg);
+
+       /* Now we have to dispatch to the subscription itself */
+       dispatch_message(sub, msg);
+}
+
 struct topic_pool_entry {
-       struct stasis_subscription *forward;
+       struct stasis_forward *forward;
        struct stasis_topic *topic;
 };
 
 static void topic_pool_entry_dtor(void *obj)
 {
        struct topic_pool_entry *entry = obj;
-       entry->forward = stasis_unsubscribe(entry->forward);
+       entry->forward = stasis_forward_cancel(entry->forward);
        ao2_cleanup(entry->topic);
        entry->topic = NULL;
 }
@@ -731,13 +801,6 @@ void stasis_log_bad_type_access(const char *name)
        ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
 }
 
-/*! \brief Shutdown function */
-static void stasis_exit(void)
-{
-       ast_threadpool_shutdown(pool);
-       pool = NULL;
-}
-
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
@@ -748,36 +811,14 @@ int stasis_init(void)
 {
        int cache_init;
 
-       struct ast_threadpool_options opts;
-
        /* Be sure the types are cleaned up after the message bus */
        ast_register_cleanup(stasis_cleanup);
-       ast_register_atexit(stasis_exit);
-
-       if (stasis_config_init() != 0) {
-               ast_log(LOG_ERROR, "Stasis configuration failed\n");
-               return -1;
-       }
 
        if (stasis_wait_init() != 0) {
                ast_log(LOG_ERROR, "Stasis initialization failed\n");
                return -1;
        }
 
-       if (pool) {
-               ast_log(LOG_ERROR, "Stasis double-initialized\n");
-               return -1;
-       }
-
-       stasis_config_get_threadpool_options(&opts);
-       ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
-               opts.initial_size, opts.max_size, opts.idle_timeout);
-       pool = ast_threadpool_create("stasis-core", NULL, &opts);
-       if (!pool) {
-               ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
-               return -1;
-       }
-
        cache_init = stasis_cache_init();
        if (cache_init != 0) {
                return -1;
index d437552..279210d 100644 (file)
@@ -339,8 +339,6 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_messa
 static void stasis_cache_update_dtor(void *obj)
 {
        struct stasis_cache_update *update = obj;
-       ao2_cleanup(update->topic);
-       update->topic = NULL;
        ao2_cleanup(update->old_snapshot);
        update->old_snapshot = NULL;
        ao2_cleanup(update->new_snapshot);
@@ -349,12 +347,11 @@ static void stasis_cache_update_dtor(void *obj)
        update->type = NULL;
 }
 
-static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
 {
        RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 
-       ast_assert(topic != NULL);
        ast_assert(old_snapshot != NULL || new_snapshot != NULL);
 
        update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
@@ -363,8 +360,6 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
                return NULL;
        }
 
-       ao2_ref(topic, +1);
-       update->topic = topic;
        if (old_snapshot) {
                ao2_ref(old_snapshot, +1);
                update->old_snapshot = old_snapshot;
@@ -390,7 +385,7 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
 }
 
 static void caching_topic_exec(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
        struct stasis_caching_topic *caching_topic = data;
@@ -418,7 +413,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
                if (clear_id) {
                        old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
                        if (old_snapshot) {
-                               update = update_create(topic, old_snapshot, NULL);
+                               update = update_create(old_snapshot, NULL);
                                stasis_publish(caching_topic->topic, update);
                                return;
                        }
@@ -440,7 +435,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
 
                old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
 
-               update = update_create(topic, old_snapshot, message);
+               update = update_create(old_snapshot, message);
                if (update == NULL) {
                        return;
                }
index 381fdd9..9644028 100644 (file)
@@ -39,15 +39,15 @@ struct stasis_cp_all {
        struct stasis_topic *topic_cached;
        struct stasis_cache *cache;
 
-       struct stasis_subscription *forward_all_to_cached;
+       struct stasis_forward *forward_all_to_cached;
 };
 
 struct stasis_cp_single {
        struct stasis_topic *topic;
        struct stasis_caching_topic *topic_cached;
 
-       struct stasis_subscription *forward_topic_to_all;
-       struct stasis_subscription *forward_cached_to_all;
+       struct stasis_forward *forward_topic_to_all;
+       struct stasis_forward *forward_cached_to_all;
 };
 
 static void all_dtor(void *obj)
@@ -60,7 +60,7 @@ static void all_dtor(void *obj)
        all->topic_cached = NULL;
        ao2_cleanup(all->cache);
        all->cache = NULL;
-       stasis_unsubscribe_and_join(all->forward_all_to_cached);
+       stasis_forward_cancel(all->forward_all_to_cached);
        all->forward_all_to_cached = NULL;
 }
 
@@ -172,9 +172,9 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
                return;
        }
 
-       stasis_unsubscribe(one->forward_topic_to_all);
+       stasis_forward_cancel(one->forward_topic_to_all);
        one->forward_topic_to_all = NULL;
-       stasis_unsubscribe(one->forward_cached_to_all);
+       stasis_forward_cancel(one->forward_cached_to_all);
        one->forward_cached_to_all = NULL;
        stasis_caching_unsubscribe(one->topic_cached);
        one->topic_cached = NULL;
diff --git a/main/stasis_config.c b/main/stasis_config.c
deleted file mode 100644 (file)
index 006df51..0000000
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2013, Digium, Inc.
- *
- * David M. Lee, II <dlee@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
-
-/*! \file
- *
- * \brief Stasis Message Bus configuration API.
- *
- * \author David M. Lee, II <dlee@digium.com>
- */
-
-/*** MODULEINFO
-       <support_level>core</support_level>
- ***/
-
-#include "asterisk.h"
-
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
-#include "asterisk/config_options.h"
-#include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
-
-#include <limits.h>
-
-/*** DOCUMENTATION
-       <configInfo name="stasis" language="en_US">
-               <synopsis>Stasis message bus configuration.</synopsis>
-               <configFile name="stasis.conf">
-                       <configObject name="threadpool">
-                               <synopsis>Threadpool configuration.</synopsis>
-                               <configOption name="initial_size" default="0">
-                                       <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
-                               </configOption>
-                               <configOption name="idle_timeout_sec" default="20">
-                                       <synopsis>Number of seconds for an idle thread to be disposed of.</synopsis>
-                               </configOption>
-                               <configOption name="max_size" default="200">
-                                       <synopsis>Maximum number of threads in the threadpool.</synopsis>
-                               </configOption>
-                       </configObject>
-               </configFile>
-       </configInfo>
- ***/
-
-/*! \brief Locking container for safe configuration access. */
-static AO2_GLOBAL_OBJ_STATIC(confs);
-
-struct stasis_threadpool_conf {
-       int initial_size;
-       int idle_timeout_sec;
-       int max_size;
-};
-
-struct stasis_conf {
-       struct stasis_threadpool_conf *threadpool;
-};
-
-/*! \brief Mapping of the stasis conf struct's globals to the
- *         threadpool context in the config file. */
-static struct aco_type threadpool_option = {
-        .type = ACO_GLOBAL,
-        .name = "threadpool",
-        .item_offset = offsetof(struct stasis_conf, threadpool),
-        .category = "^threadpool$",
-        .category_match = ACO_WHITELIST,
-};
-
-static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
-
-#define CONF_FILENAME "stasis.conf"
-
-/*! \brief The conf file that's processed for the module. */
-static struct aco_file conf_file = {
-        /*! The config file name. */
-        .filename = CONF_FILENAME,
-        /*! The mapping object types to be processed. */
-        .types = ACO_TYPES(&threadpool_option),
-};
-
-static void conf_dtor(void *obj)
-{
-       struct stasis_conf *conf = obj;
-
-       ao2_cleanup(conf->threadpool);
-       conf->threadpool = NULL;
-}
-
-static void *conf_alloc(void)
-{
-       RAII_VAR(struct stasis_conf *, conf, NULL, ao2_cleanup);
-
-       conf = ao2_alloc_options(sizeof(*conf), conf_dtor,
-               AO2_ALLOC_OPT_LOCK_NOLOCK);
-       if (!conf) {
-               return NULL;
-       }
-
-       conf->threadpool = ao2_alloc_options(sizeof(*conf->threadpool), NULL,
-               AO2_ALLOC_OPT_LOCK_NOLOCK);
-       if (!conf->threadpool) {
-               return NULL;
-       }
-
-       aco_set_defaults(&threadpool_option, "threadpool", conf->threadpool);
-
-       ao2_ref(conf, +1);
-       return conf;
-}
-
-CONFIG_INFO_CORE("stasis", cfg_info, confs, conf_alloc,
-       .files = ACO_FILES(&conf_file));
-
-void stasis_config_get_threadpool_options(
-       struct ast_threadpool_options *threadpool_options)
-{
-       RAII_VAR(struct stasis_conf *, conf, NULL, ao2_cleanup);
-
-       conf = ao2_global_obj_ref(confs);
-
-       ast_assert(conf && conf->threadpool);
-
-       {
-               struct ast_threadpool_options newopts = {
-                       .version = AST_THREADPOOL_OPTIONS_VERSION,
-                       .initial_size = conf->threadpool->initial_size,
-                       .auto_increment = 1,
-                       .idle_timeout = conf->threadpool->idle_timeout_sec,
-                       .max_size = conf->threadpool->max_size,
-               };
-
-               *threadpool_options = newopts;
-       }
-}
-
-/*! \brief Load (or reload) configuration. */
-static int process_config(int reload)
-{
-       RAII_VAR(struct stasis_conf *, conf, conf_alloc(), ao2_cleanup);
-
-       switch (aco_process_config(&cfg_info, reload)) {
-       case ACO_PROCESS_ERROR:
-               if (conf && !reload
-                       && !aco_set_defaults(&threadpool_option, "threadpool", conf->threadpool)) {
-                       ast_log(AST_LOG_NOTICE, "Failed to process Stasis configuration; using defaults\n");
-                       ao2_global_obj_replace_unref(confs, conf);
-                       return 0;
-               }
-               return -1;
-       case ACO_PROCESS_OK:
-       case ACO_PROCESS_UNCHANGED:
-               break;
-       }
-
-       return 0;
-}
-
-static void config_exit(void)
-{
-       aco_info_destroy(&cfg_info);
-       ao2_global_obj_release(confs);
-}
-
-int stasis_config_init(void)
-{
-       if (aco_info_init(&cfg_info)) {
-               aco_info_destroy(&cfg_info);
-               return -1;
-       }
-
-       ast_register_atexit(config_exit);
-
-       /* threadpool section */
-       aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
-               threadpool_options, "0", OPT_INT_T, PARSE_IN_RANGE,
-               FLDSET(struct stasis_threadpool_conf, initial_size), 0,
-               INT_MAX);
-       aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
-               threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
-               FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
-               INT_MAX);
-       aco_option_register(&cfg_info, "max_size", ACO_EXACT,
-               threadpool_options, "200", OPT_INT_T, PARSE_IN_RANGE,
-               FLDSET(struct stasis_threadpool_conf, max_size), 0, INT_MAX);
-
-       return process_config(0);
-}
index 26d2f2c..8c82dec 100644 (file)
@@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_message_router.h"
 
-/*! Number of hash buckets for the route table. Keep it prime! */
-#define ROUTE_TABLE_BUCKETS 7
-
 /*! \internal */
 struct stasis_message_route {
        /*! Message type handle by this route. */
@@ -47,29 +44,79 @@ struct stasis_message_route {
        void *data;
 };
 
-static void route_dtor(void *obj)
+struct route_table {
+       /*! Current number of entries in the route table */
+       size_t current_size;
+       /*! Allocated number of entires in the route table */
+       size_t max_size;
+       /*! The route table itself */
+       struct stasis_message_route routes[];
+};
+
+static struct stasis_message_route *table_find_route(struct route_table *table,
+       struct stasis_message_type *message_type)
 {
-       struct stasis_message_route *route = obj;
+       size_t idx;
+
+       /* While a linear search for routes may seem very inefficient, most
+        * route tables have six routes or less. For such small data, it's
+        * hard to beat a linear search. If we start having larger route
+        * tables, then we can look into containers with more efficient
+        * lookups.
+        */
+       for (idx = 0; idx < table->current_size; ++idx) {
+               if (table->routes[idx].message_type == message_type) {
+                       return &table->routes[idx];
+               }
+       }
 
-       ao2_cleanup(route->message_type);
-       route->message_type = NULL;
+       return NULL;
 }
 
-static int route_hash(const void *obj, const int flags)
+static int table_add_route(struct route_table **table_ptr,
+       struct stasis_message_type *message_type,
+       stasis_subscription_cb callback, void *data)
 {
-       const struct stasis_message_route *route = obj;
-       const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type;
+       struct route_table *table = *table_ptr;
+       struct stasis_message_route *route;
+
+       ast_assert(table_find_route(table, message_type) == NULL);
+
+       if (table->current_size + 1 > table->max_size) {
+               size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
+               struct route_table *new_table = ast_realloc(table,
+                       sizeof(*new_table) +
+                       sizeof(new_table->routes[0]) * new_max_size);
+               if (!new_table) {
+                       return -1;
+               }
+               *table_ptr = table = new_table;
+               table->max_size = new_max_size;
+       }
 
-       return ast_str_hash(stasis_message_type_name(message_type));
+       route = &table->routes[table->current_size++];
+
+       route->message_type = ao2_bump(message_type);
+       route->callback = callback;
+       route->data = data;
+
+       return 0;
 }
 
-static int route_cmp(void *obj, void *arg, int flags)
+static int table_remove_route(struct route_table *table,
+       struct stasis_message_type *message_type)
 {
-       const struct stasis_message_route *left = obj;
-       const struct stasis_message_route *right = arg;
-       const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type;
-
-       return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0;
+       size_t idx;
+
+       for (idx = 0; idx < table->current_size; ++idx) {
+               if (table->routes[idx].message_type == message_type) {
+                       ao2_cleanup(message_type);
+                       table->routes[idx] =
+                               table->routes[--table->current_size];
+                       return 0;
+               }
+       }
+       return -1;
 }
 
 /*! \internal */
@@ -77,11 +124,11 @@ struct stasis_message_router {
        /*! Subscription to the upstream topic */
        struct stasis_subscription *subscription;
        /*! Subscribed routes */
-       struct ao2_container *routes;
-       /*! Subscribed routes for \ref stasi_cache_update messages */
-       struct ao2_container *cache_routes;
+       struct route_table *routes;
+       /*! Subscribed routes for \ref stasis_cache_update messages */
+       struct route_table *cache_routes;
        /*! Route of last resort */
-       struct stasis_message_route *default_route;
+       struct stasis_message_route default_route;
 };
 
 static void router_dtor(void *obj)
@@ -92,66 +139,60 @@ static void router_dtor(void *obj)
        ast_assert(stasis_subscription_is_done(router->subscription));
        router->subscription = NULL;
 
-       ao2_cleanup(router->routes);
+       ast_free(router->routes);
        router->routes = NULL;
 
-       ao2_cleanup(router->cache_routes);
+       ast_free(router->cache_routes);
        router->cache_routes = NULL;
-
-       ao2_cleanup(router->default_route);
-       router->default_route = NULL;
 }
 
-static struct stasis_message_route *find_route(
+static int find_route(
        struct stasis_message_router *router,
-       struct stasis_message *message)
+       struct stasis_message *message,
+       struct stasis_message_route *route_out)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+       struct stasis_message_route *route = NULL;
        struct stasis_message_type *type = stasis_message_type(message);
        SCOPED_AO2LOCK(lock, router);
 
+       ast_assert(route_out != NULL);
+
        if (type == stasis_cache_update_type()) {
                /* Find a cache route */
                struct stasis_cache_update *update =
                        stasis_message_data(message);
-               route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+               route = table_find_route(router->cache_routes, update->type);
        }
 
        if (route == NULL) {
                /* Find a regular route */
-               route = ao2_find(router->routes, type, OBJ_KEY);
+               route = table_find_route(router->routes, type);
        }
 
-       if (route == NULL) {
+       if (route == NULL && router->default_route.callback) {
                /* Maybe the default route, then? */
-               if ((route = router->default_route)) {
-                       ao2_ref(route, +1);
-               }
+               route = &router->default_route;
        }
 
-       if (route == NULL) {
-               return NULL;
+       if (!route) {
+               return -1;
        }
 
-       ao2_ref(route, +1);
-       return route;
+       *route_out = *route;
+       return 0;
 }
 
 static void router_dispatch(void *data,
                            struct stasis_subscription *sub,
-                           struct stasis_topic *topic,
                            struct stasis_message *message)
 {
        struct stasis_message_router *router = data;
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+       struct stasis_message_route route;
 
-       route = find_route(router, message);
-
-       if (route) {
-               route->callback(route->data, sub, topic, message);
+       if (find_route(router, message, &route) == 0) {
+               route.callback(route.data, sub, message);
        }
 
-
        if (stasis_subscription_final_message(sub, message)) {
                ao2_cleanup(router);
        }
@@ -167,14 +208,12 @@ struct stasis_message_router *stasis_message_router_create(
                return NULL;
        }
 
-       router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
-               route_cmp);
+       router->routes = ast_calloc(1, sizeof(*router->routes));
        if (!router->routes) {
                return NULL;
        }
 
-       router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
-               route_hash, route_cmp);
+       router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes));
        if (!router->cache_routes) {
                return NULL;
        }
@@ -216,100 +255,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router)
        return stasis_subscription_is_done(router->subscription);
 }
 
-
-static struct stasis_message_route *route_create(
-       struct stasis_message_type *message_type,
-       stasis_subscription_cb callback,
-       void *data)
-{
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
-       route = ao2_alloc(sizeof(*route), route_dtor);
-       if (!route) {
-               return NULL;
-       }
-
-       if (message_type) {
-               ao2_ref(message_type, +1);
-       }
-       route->message_type = message_type;
-       route->callback = callback;
-       route->data = data;
-
-       ao2_ref(route, +1);
-       return route;
-}
-
-static int add_route(struct stasis_message_router *router,
-                    struct stasis_message_route *route)
-{
-       RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
-       SCOPED_AO2LOCK(lock, router);
-
-       existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
-
-       if (existing_route) {
-               ast_log(LOG_ERROR, "Cannot add route; route exists\n");
-               return -1;
-       }
-
-       ao2_link(router->routes, route);
-       return 0;
-}
-
-static int add_cache_route(struct stasis_message_router *router,
-                    struct stasis_message_route *route)
-{
-       RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
-       SCOPED_AO2LOCK(lock, router);
-
-       existing_route = ao2_find(router->cache_routes, route->message_type,
-               OBJ_KEY);
-
-       if (existing_route) {
-               ast_log(LOG_ERROR, "Cannot add route; route exists\n");
-               return -1;
-       }
-
-       ao2_link(router->cache_routes, route);
-       return 0;
-}
-
 int stasis_message_router_add(struct stasis_message_router *router,
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback, void *data)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
-       route = route_create(message_type, callback, data);
-       if (!route) {
-               return -1;
-       }
-
-       return add_route(router, route);
+       SCOPED_AO2LOCK(lock, router);
+       return table_add_route(&router->routes, message_type, callback, data);
 }
 
 int stasis_message_router_add_cache_update(struct stasis_message_router *router,
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback, void *data)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
-       route = route_create(message_type, callback, data);
-       if (!route) {
-               return -1;
-       }
-
-       return add_cache_route(router, route);
+       SCOPED_AO2LOCK(lock, router);
+       return table_add_route(&router->cache_routes, message_type, callback, data);
 }
 
 void stasis_message_router_remove(struct stasis_message_router *router,
        struct stasis_message_type *message_type)
 {
        SCOPED_AO2LOCK(lock, router);
-
-       ao2_find(router->routes, message_type,
-               OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+       table_remove_route(router->routes, message_type);
 }
 
 void stasis_message_router_remove_cache_update(
@@ -317,9 +283,7 @@ void stasis_message_router_remove_cache_update(
        struct stasis_message_type *message_type)
 {
        SCOPED_AO2LOCK(lock, router);
-
-       ao2_find(router->cache_routes, message_type,
-               OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+       table_remove_route(router->cache_routes, message_type);
 }
 
 int stasis_message_router_set_default(struct stasis_message_router *router,
@@ -327,7 +291,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
                                      void *data)
 {
        SCOPED_AO2LOCK(lock, router);
-       ao2_cleanup(router->default_route);
-       router->default_route = route_create(NULL, callback, data);
-       return router->default_route ? 0 : -1;
+       router->default_route.callback = callback;
+       router->default_route.data = data;
+       /* While this implementation can never fail, it used to be able to */
+       return 0;
 }
index e94c686..32b5971 100644 (file)
@@ -55,7 +55,7 @@ static void caching_guarantee_dtor(void *obj)
 }
 
 static void guarantee_handler(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        /* Wait for our particular message */
        if (data == message) {
index a8d1c80..189219d 100644 (file)
@@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/astobj2.h"
 #include "asterisk/cli.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/sem.h"
 
 /*!
  * \brief tps_task structure is queued to a taskprocessor
@@ -47,11 +48,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  */
 struct tps_task {
        /*! \brief The execute() task callback function pointer */
-       int (*execute)(void *datap);
+       union {
+               int (*execute)(void *datap);
+               int (*execute_local)(struct ast_taskprocessor_local *local);
+       } callback;
        /*! \brief The data pointer for the task execute() function */
        void *datap;
        /*! \brief AST_LIST_ENTRY overhead */
        AST_LIST_ENTRY(tps_task) list;
+       unsigned int wants_local:1;
 };
 
 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -68,6 +73,7 @@ struct ast_taskprocessor {
        const char *name;
        /*! \brief Taskprocessor statistics */
        struct tps_taskprocessor_stats *stats;
+       void *local_data;
        /*! \brief Taskprocessor current queue size */
        long tps_queue_size;
        /*! \brief Taskprocessor queue */
@@ -113,9 +119,6 @@ static int tps_hash_cb(const void *obj, const int flags);
 /*! \brief The astobj2 compare callback for taskprocessors */
 static int tps_cmp_cb(void *obj, void *arg, int flags);
 
-/*! \brief The task processing function executed by a taskprocessor */
-static void *tps_processing_function(void *data);
-
 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
 static void tps_taskprocessor_destroy(void *tps);
 
@@ -138,47 +141,56 @@ static struct ast_cli_entry taskprocessor_clis[] = {
 
 struct default_taskprocessor_listener_pvt {
        pthread_t poll_thread;
-       ast_mutex_t lock;
-       ast_cond_t cond;
-       int wake_up;
        int dead;
+       struct ast_sem sem;
 };
 
-
-static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
 {
-       SCOPED_MUTEX(lock, &pvt->lock);
-       pvt->wake_up = 1;
-       pvt->dead = should_die;
-       ast_cond_signal(&pvt->cond);
+       ast_assert(pvt->dead);
+       ast_sem_destroy(&pvt->sem);
+       ast_free(pvt);
 }
 
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
 {
-       SCOPED_MUTEX(lock, &pvt->lock);
-       while (!pvt->wake_up) {
-               ast_cond_wait(&pvt->cond, lock);
-       }
-       pvt->wake_up = 0;
-       return pvt->dead;
+       struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+
+       default_listener_pvt_destroy(pvt);
+
+       listener->user_data = NULL;
 }
 
 /*!
  * \brief Function that processes tasks in the taskprocessor
  * \internal
  */
-static void *tps_processing_function(void *data)
+static void *default_tps_processing_function(void *data)
 {
        struct ast_taskprocessor_listener *listener = data;
        struct ast_taskprocessor *tps = listener->tps;
        struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
-       int dead = 0;
-
-       while (!dead) {
-               if (!ast_taskprocessor_execute(tps)) {
-                       dead = default_tps_idle(pvt);
+       int sem_value;
+       int res;
+
+       while (!pvt->dead) {
+               res = ast_sem_wait(&pvt->sem);
+               if (res != 0 && errno != EINTR) {
+                       ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
+                               strerror(errno));
+                       /* Just give up */
+                       break;
                }
+               ast_taskprocessor_execute(tps);
        }
+
+       /* No posting to a dead taskprocessor! */
+       res = ast_sem_getvalue(&pvt->sem, &sem_value);
+       ast_assert(res == 0 && sem_value == 0);
+
+       /* Free the shutdown reference (see default_listener_shutdown) */
+       ao2_t_ref(listener->tps, -1, "tps-shutdown");
+
        return NULL;
 }
 
@@ -186,7 +198,7 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
 {
        struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
-       if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+       if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
                return -1;
        }
 
@@ -197,33 +209,50 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
 {
        struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
-       ast_assert(!pvt->dead);
-
-       if (was_empty) {
-               default_tps_wake_up(pvt, 0);
+       if (ast_sem_post(&pvt->sem) != 0) {
+               ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
+                       strerror(errno));
        }
 }
 
-static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
+static int default_listener_die(void *data)
 {
-       ast_mutex_destroy(&pvt->lock);
-       ast_cond_destroy(&pvt->cond);
-       ast_free(pvt);
+       struct default_taskprocessor_listener_pvt *pvt = data;
+       pvt->dead = 1;
+       return 0;
 }
 
 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
        struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
-       default_tps_wake_up(pvt, 1);
-       pthread_join(pvt->poll_thread, NULL);
+       int res;
+
+       /* Hold a reference during shutdown */
+       ao2_t_ref(listener->tps, +1, "tps-shutdown");
+
+       ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+
+       if (pthread_self() == pvt->poll_thread) {
+               res = pthread_detach(pvt->poll_thread);
+               if (res != 0) {
+                       ast_log(LOG_ERROR, "pthread_detach(): %s\n",
+                               strerror(errno));
+               }
+       } else {
+               res = pthread_join(pvt->poll_thread, NULL);
+               if (res != 0) {
+                       ast_log(LOG_ERROR, "pthread_join(): %s\n",
+                               strerror(errno));
+               }
+       }
        pvt->poll_thread = AST_PTHREADT_NULL;
-       default_listener_pvt_destroy(pvt);
 }
 
 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
        .start = default_listener_start,
        .task_pushed = default_task_pushed,
        .shutdown = default_listener_shutdown,
+       .dtor = default_listener_pvt_dtor,
 };
 
 /*!
@@ -258,19 +287,48 @@ int ast_tps_init(void)
 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
 {
        struct tps_task *t;
-       if ((t = ast_calloc(1, sizeof(*t)))) {
-               t->execute = task_exe;
-               t->datap = datap;
+       if (!task_exe) {
+               ast_log(LOG_ERROR, "task_exe is NULL!\n");
+               return NULL;
+       }
+
+       t = ast_calloc(1, sizeof(*t));
+       if (!t) {
+               ast_log(LOG_ERROR, "failed to allocate task!\n");
+               return NULL;
+       }
+
+       t->callback.execute = task_exe;
+       t->datap = datap;
+
+       return t;
+}
+
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+       struct tps_task *t;
+       if (!task_exe) {
+               ast_log(LOG_ERROR, "task_exe is NULL!\n");
+               return NULL;
+       }
+
+       t = ast_calloc(1, sizeof(*t));
+       if (!t) {
+               ast_log(LOG_ERROR, "failed to allocate task!\n");
+               return NULL;
        }
+
+       t->callback.execute_local = task_exe;
+       t->datap = datap;
+       t->wants_local = 1;
+
        return t;
 }
 
 /* release task resources */
 static void *tps_task_free(struct tps_task *task)
 {
-       if (task) {
-               ast_free(task);
-       }
+       ast_free(task);
        return NULL;
 }
 
@@ -425,16 +483,10 @@ static void tps_taskprocessor_destroy(void *tps)
        }
        ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
        /* free it */
-       if (t->stats) {
-               ast_free(t->stats);
-               t->stats = NULL;
-       }
+       ast_free(t->stats);
+       t->stats = NULL;
        ast_free((char *) t->name);
        if (t->listener) {
-               /* This code should not be reached since the listener
-                * should have been destroyed before the taskprocessor could
-                * be destroyed
-                */
                ao2_ref(t->listener, -1);
                t->listener = NULL;
        }
@@ -447,7 +499,6 @@ static void tps_taskprocessor_destroy(void *tps)
 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
 {
        struct tps_task *task;
-       SCOPED_AO2LOCK(lock, tps);
 
        if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
                tps->tps_queue_size--;
@@ -476,10 +527,21 @@ static void listener_shutdown(struct ast_taskprocessor_listener *listener)
        ao2_ref(listener->tps, -1);
 }
 
+static void taskprocessor_listener_dtor(void *obj)
+{
+       struct ast_taskprocessor_listener *listener = obj;
+
+       if (listener->callbacks->dtor) {
+               listener->callbacks->dtor(listener);
+       }
+}
+
 struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 {
        RAII_VAR(struct ast_taskprocessor_listener *, listener,
-                       ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
+                       NULL, ao2_cleanup);
+
+       listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
 
        if (!listener) {
                return NULL;
@@ -510,9 +572,12 @@ static void *default_listener_pvt_alloc(void)
        if (!pvt) {
                return NULL;
        }
-       ast_cond_init(&pvt->cond, NULL);
-       ast_mutex_init(&pvt->lock);
        pvt->poll_thread = AST_PTHREADT_NULL;
+       if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
+               ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+               ast_free(pvt);
+               return NULL;
+       }
        return pvt;
 }
 
@@ -594,7 +659,6 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
 
        p = __allocate_taskprocessor(name, listener);
        if (!p) {
-               default_listener_pvt_destroy(pvt);
                ao2_ref(listener, -1);
                return NULL;
        }
@@ -615,6 +679,13 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
        return __allocate_taskprocessor(name, listener);
 }
 
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
+       void *local_data)
+{
+       SCOPED_AO2LOCK(lock, tps);
+       tps->local_data = local_data;
+}
+
 /* decrement the taskprocessor reference count and unlink from the container if necessary */
 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 {
@@ -636,20 +707,21 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 }
 
 /* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
 {
-       struct tps_task *t;
        int previous_size;
        int was_empty;
 
-       if (!tps || !task_exe) {
-               ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
+       if (!tps) {
+               ast_log(LOG_ERROR, "tps is NULL!\n");
                return -1;
        }
-       if (!(t = tps_task_alloc(task_exe, datap))) {
-               ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
+
+       if (!t) {
+               ast_log(LOG_ERROR, "t is NULL!\n");
                return -1;
        }
+
        ao2_lock(tps);
        AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
        previous_size = tps->tps_queue_size++;
@@ -660,21 +732,43 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
        return 0;
 }
 
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+       return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
+}
+
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
+{
+       return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
+}
+
 int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
 {
+       struct ast_taskprocessor_local local;
        struct tps_task *t;
        int size;
 
        ao2_lock(tps);
+       t = tps_taskprocessor_pop(tps);
+       if (!t) {
+               ao2_unlock(tps);
+               return 0;
+       }
+
        tps->executing = 1;
-       ao2_unlock(tps);
 
-       t = tps_taskprocessor_pop(tps);
+       if (t->wants_local) {
+               local.local_data = tps->local_data;
+               local.data = t->datap;
+       }
+       ao2_unlock(tps);
 
-       if (t) {
-               t->execute(t->datap);
-               tps_task_free(t);
+       if (t->wants_local) {
+               t->callback.execute_local(&local);
+       } else {
+               t->callback.execute(t->datap);
        }
+       tps_task_free(t);
 
        ao2_lock(tps);
        /* We need to check size in the same critical section where we reset the
@@ -684,7 +778,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
        tps->executing = 0;
        size = tps_taskprocessor_depth(tps);
        /* If we executed a task, bump the stats */
-       if (t && tps->stats) {
+       if (tps->stats) {
                tps->stats->_tasks_processed_count++;
                if (size > tps->stats->max_qsize) {
                        tps->stats->max_qsize = size;
@@ -693,7 +787,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
        ao2_unlock(tps);
 
        /* If we executed a task, check for the transition to empty */
-       if (t && size == 0 && tps->listener->callbacks->emptied) {
+       if (size == 0 && tps->listener->callbacks->emptied) {
                tps->listener->callbacks->emptied(tps->listener);
        }
        return size > 0;
index fc74ac2..7bd8922 100644 (file)
@@ -739,7 +739,7 @@ announce_cleanup:
        cap_slin = ast_format_cap_destroy(cap_slin);
 }
 
-static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        struct park_announce_subscription_data *pa_data = data;
        char *dial_string = pa_data->dial_string;
index 0e5e05d..6c1d4d6 100644 (file)
@@ -125,7 +125,7 @@ static void parker_parked_call_message_response(struct ast_parked_call_payload *
        }
 }
 
-static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        if (stasis_subscription_final_message(sub, message)) {
                ast_free(data);
index dac1910..0c57701 100644 (file)
@@ -545,7 +545,7 @@ static void parked_call_message_response(struct ast_parked_call_payload *parked_
                );
 }
 
-static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        if (stasis_message_type(message) == ast_parked_call_type()) {
                struct ast_parked_call_payload *parked_call_message = stasis_message_data(message);
index 25251f3..84dcbeb 100644 (file)
@@ -1040,7 +1040,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_exec_type);
 STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_end_type);
 
 static void agi_channel_manager_event(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        const char *type = data;
index a43c564..9d1e8c0 100644 (file)
@@ -57,7 +57,7 @@ static struct stasis_message_router *router;
  * \param message The message itself.
  */
 static void statsmaker(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, metric, NULL, ast_free);
 
@@ -89,7 +89,7 @@ static void statsmaker(void *data, struct stasis_subscription *sub,
  * \param message The message itself.
  */
 static void updates(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        /* Since this came from a message router, we know the type of the
         * message. We can cast the data without checking its type.
@@ -139,7 +139,7 @@ static void updates(void *data, struct stasis_subscription *sub,
  * \param message The message itself.
  */
 static void default_route(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (stasis_subscription_final_message(sub, message)) {
                /* Much like with the regular subscription, you may need to
index b534e80..c1cee9b 100644 (file)
@@ -371,8 +371,8 @@ static void aji_pubsub_purge_nodes(struct aji_client *client,
        const char* collection_name);
 static void aji_publish_mwi(struct aji_client *client, const char *mailbox,
        const char *context, const char *oldmsgs, const char *newmsgs);
-static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
-static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg);
+static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg);
 static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node,
                                       const char *event_type, unsigned int cachable);
 /* No transports in this version */
@@ -3235,7 +3235,7 @@ int ast_aji_disconnect(struct aji_client *client)
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        const char *mailbox;
        const char *context;
@@ -3269,7 +3269,7 @@ static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasi
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct aji_client *client = data;
        struct ast_device_state_message *dev_state;
@@ -3291,7 +3291,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags)
 {
        struct stasis_message *msg = obj;
        struct aji_client *client = arg;
-       aji_devstate_cb(client, device_state_sub, NULL, msg);
+       aji_devstate_cb(client, device_state_sub, msg);
        return 0;
 }
 
index 2b7a82b..c3e6c21 100644 (file)
@@ -9,6 +9,7 @@
 #define RES_PJSIP_PRIVATE_H_
 
 struct ao2_container;
+struct ast_threadpool_options;
 
 /*!
  * \brief Initialize the configuration for res_pjsip
index e2b9b63..17d648b 100644 (file)
@@ -118,7 +118,7 @@ struct mwi_subscription {
 };
 
 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg);
+               struct stasis_message *msg);
 
 static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char *mailbox, struct mwi_subscription *mwi_sub)
 {
@@ -603,7 +603,7 @@ static int serialized_cleanup(void *userdata)
 }
 
 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct mwi_subscription *mwi_sub = userdata;
 
index cca5a7c..91da22f 100644 (file)
@@ -143,7 +143,7 @@ static int refer_progress_notify(void *data)
 }
 
 static void refer_progress_bridge(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct refer_progress *progress = data;
        struct ast_bridge_blob *enter_blob;
index d06f9f7..e56f7f7 100644 (file)
@@ -117,7 +117,7 @@ static void security_event_stasis_cb(struct ast_json *json)
 }
 
 static void security_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_json_payload *payload = stasis_message_data(message);
 
index 58df8c1..099e1af 100644 (file)
@@ -120,7 +120,7 @@ struct stasis_message_sink *stasis_message_sink_create(void)
  * the initial lazy binding will still work as expected.
  */
 static void message_sink_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct stasis_message_sink *sink = data;
 
index 1d8f628..bd66e70 100644 (file)
@@ -1318,7 +1318,7 @@ static void xmpp_pubsub_publish_device_state(struct ast_xmpp_client *client, con
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_xmpp_client *client = data;
        const char *mailbox, *context;
@@ -1351,7 +1351,7 @@ static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, stru
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_xmpp_client *client = data;
        struct ast_device_state_message *dev_state;
@@ -1566,7 +1566,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags)
 {
        struct stasis_message *msg = obj;
        struct ast_xmpp_client *client = arg;
-       xmpp_pubsub_devstate_cb(client, client->device_state_sub, NULL, msg);
+       xmpp_pubsub_devstate_cb(client, client->device_state_sub, msg);
        return 0;
 }
 
index ab46be5..bc1268f 100644 (file)
@@ -58,9 +58,9 @@ struct app_forwards {
        int interested;
 
        /*! Forward for the regular topic */
-       struct stasis_subscription *topic_forward;
+       struct stasis_forward *topic_forward;
        /*! Forward for the caching topic */
-       struct stasis_subscription *topic_cached_forward;
+       struct stasis_forward *topic_cached_forward;
 
        /*! Unique id of the object being forwarded */
        char id[];
@@ -78,9 +78,9 @@ static void forwards_dtor(void *obj)
 
 static void forwards_unsubscribe(struct app_forwards *forwards)
 {
-       stasis_unsubscribe(forwards->topic_forward);
+       stasis_forward_cancel(forwards->topic_forward);
        forwards->topic_forward = NULL;
-       stasis_unsubscribe(forwards->topic_cached_forward);
+       stasis_forward_cancel(forwards->topic_cached_forward);
        forwards->topic_cached_forward = NULL;
 }
 
@@ -129,7 +129,7 @@ static struct app_forwards *forwards_create_channel(struct app *app,
                ast_channel_topic_cached(chan), app->topic);
        if (!forwards->topic_cached_forward) {
                /* Half-subscribed is a bad thing */
-               stasis_unsubscribe(forwards->topic_forward);
+               stasis_forward_cancel(forwards->topic_forward);
                forwards->topic_forward = NULL;
                return NULL;
        }
@@ -163,7 +163,7 @@ static struct app_forwards *forwards_create_bridge(struct app *app,
                ast_bridge_topic_cached(bridge), app->topic);
        if (!forwards->topic_cached_forward) {
                /* Half-subscribed is a bad thing */
-               stasis_unsubscribe(forwards->topic_forward);
+               stasis_forward_cancel(forwards->topic_forward);
                forwards->topic_forward = NULL;
                return NULL;
        }
@@ -220,7 +220,7 @@ static void app_dtor(void *obj)
 }
 
 static void sub_default_handler(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct app *app = data;
        RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@@ -363,7 +363,6 @@ static channel_snapshot_monitor channel_monitors[] = {
 
 static void sub_channel_update_handler(void *data,
                 struct stasis_subscription *sub,
-                struct stasis_topic *topic,
                 struct stasis_message *message)
 {
        struct app *app = data;
@@ -411,7 +410,6 @@ static struct ast_json *simple_bridge_event(
 
 static void sub_bridge_update_handler(void *data,
                 struct stasis_subscription *sub,
-                struct stasis_topic *topic,
                 struct stasis_message *message)
 {
         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@@ -447,7 +445,7 @@ static void sub_bridge_update_handler(void *data,
 }
 
 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct app *app = data;
        struct ast_bridge_merge_message *merge;
@@ -476,7 +474,7 @@ static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
        }
 
        /* Forward the message to the app */
-       stasis_forward_message(app->topic, topic, message);
+       stasis_publish(app->topic, message);
 }
 
 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
index ff5d681..5a3d255 100644 (file)
@@ -309,7 +309,7 @@ static struct consumer *consumer_create(void) {
        return consumer;
 }
 
-static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        struct consumer *consumer = data;
        RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
@@ -342,7 +342,7 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
        }
 }
 
-static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        struct consumer *consumer = data;
 
index da9c508..1e911e0 100644 (file)
@@ -183,7 +183,7 @@ static struct consumer *consumer_create(int ignore_subscriptions) {
        return consumer;
 }
 
-static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        struct consumer *consumer = data;
        RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
@@ -427,7 +427,7 @@ AST_TEST_DEFINE(forward)
        RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
        RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
 
-       RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
+       RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
        RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
        RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
 
@@ -499,8 +499,8 @@ AST_TEST_DEFINE(interleaving)
        RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
 
-       RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
-       RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
+       RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+       RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
        RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
 
        RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
@@ -711,7 +711,6 @@ AST_TEST_DEFINE(cache)
        /* Check for new snapshot messages */
        ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
        actual_update = stasis_message_data(consumer->messages_rxed[0]);
-       ast_test_validate(test, topic == actual_update->topic);
        ast_test_validate(test, NULL == actual_update->old_snapshot);
        ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
        ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
@@ -720,7 +719,6 @@ AST_TEST_DEFINE(cache)
 
        ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
        actual_update = stasis_message_data(consumer->messages_rxed[1]);
-       ast_test_validate(test, topic == actual_update->topic);
        ast_test_validate(test, NULL == actual_update->old_snapshot);
        ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
        ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
@@ -736,7 +734,6 @@ AST_TEST_DEFINE(cache)
        ast_test_validate(test, 3 == actual_len);
 
        actual_update = stasis_message_data(consumer->messages_rxed[2]);
-       ast_test_validate(test, topic == actual_update->topic);
        ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
        ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
        ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
@@ -752,7 +749,6 @@ AST_TEST_DEFINE(cache)
        ast_test_validate(test, 4 == actual_len);
 
        actual_update = stasis_message_data(consumer->messages_rxed[3]);
-       ast_test_validate(test, topic == actual_update->topic);
        ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
        ast_test_validate(test, NULL == actual_update->new_snapshot);
        ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
@@ -867,52 +863,6 @@ AST_TEST_DEFINE(cache_dump)
        return AST_TEST_PASS;
 }
 
-AST_TEST_DEFINE(route_conflicts)
-{
-       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-       RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
-       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
-       RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
-       RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
-       int ret;
-
-       switch (cmd) {
-       case TEST_INIT:
-               info->name = __func__;
-               info->category = test_category;
-               info->summary =
-                       "Multiple routes to the same message_type should fail";
-               info->description =
-                       "Multiple routes to the same message_type should fail";
-               return AST_TEST_NOT_RUN;
-       case TEST_EXECUTE:
-               break;
-       }
-
-       topic = stasis_topic_create("TestTopic");
-       ast_test_validate(test, NULL != topic);
-
-       consumer1 = consumer_create(1);
-       ast_test_validate(test, NULL != consumer1);
-       consumer2 = consumer_create(1);
-       ast_test_validate(test, NULL != consumer2);
-
-       test_message_type = stasis_message_type_create("TestMessage", NULL);
-       ast_test_validate(test, NULL != test_message_type);
-
-       uut = stasis_message_router_create(topic);
-       ast_test_validate(test, NULL != uut);
-
-       ret = stasis_message_router_add(
-               uut, test_message_type, consumer_exec, consumer1);
-       ast_test_validate(test, 0 == ret);
-       ret = stasis_message_router_add(
-               uut, test_message_type, consumer_exec, consumer2);
-       ast_test_validate(test, 0 != ret);
-
-       return AST_TEST_PASS;
-}
-
 AST_TEST_DEFINE(router)
 {
        RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -1272,7 +1222,7 @@ AST_TEST_DEFINE(to_ami)
 }
 
 static void noop(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        /* no-op */
 }
@@ -1373,7 +1323,6 @@ static int unload_module(void)
        AST_TEST_UNREGISTER(cache_filter);
        AST_TEST_UNREGISTER(cache);
        AST_TEST_UNREGISTER(cache_dump);
-       AST_TEST_UNREGISTER(route_conflicts);
        AST_TEST_UNREGISTER(router);
        AST_TEST_UNREGISTER(router_cache_updates);
        AST_TEST_UNREGISTER(interleaving);
@@ -1397,7 +1346,6 @@ static int load_module(void)
        AST_TEST_REGISTER(cache_filter);
        AST_TEST_REGISTER(cache);
        AST_TEST_REGISTER(cache_dump);
-       AST_TEST_REGISTER(route_conflicts);
        AST_TEST_REGISTER(router);
        AST_TEST_REGISTER(router_cache_updates);
        AST_TEST_REGISTER(interleaving);
index c0be07c..bc0f575 100644 (file)
@@ -264,11 +264,14 @@ AST_TEST_DEFINE(channel_messages)
        type = stasis_message_type(msg);
        ast_test_validate(test, ast_channel_snapshot_type() == type);
 
+       /* The ordering of the cache clear and endpoint snapshot are
+        * unspecified */
        msg = sink->messages[3];
-       type = stasis_message_type(msg);
-       ast_test_validate(test, stasis_cache_clear_type() == type);
+       if (stasis_message_type(msg) == stasis_cache_clear_type()) {
+               /* Okay; the next message should be the endpoint snapshot */
+               msg = sink->messages[4];
+       }
 
-       msg = sink->messages[4];
        type = stasis_message_type(msg);
        ast_test_validate(test, ast_endpoint_snapshot_type() == type);
        actual_snapshot = stasis_message_data(msg);
index 70400a9..be48f92 100644 (file)
@@ -48,6 +48,31 @@ struct task_data {
        int task_complete;
 };
 
+static void task_data_dtor(void *obj)
+{
+       struct task_data *task_data = obj;
+
+       ast_mutex_destroy(&task_data->lock);
+       ast_cond_destroy(&task_data->cond);
+}
+
+/*! \brief Create a task_data object */
+static struct task_data *task_data_create(void)
+{
+       struct task_data *task_data =
+               ao2_alloc(sizeof(*task_data), task_data_dtor);
+
+       if (!task_data) {
+               return NULL;
+       }
+
+       ast_cond_init(&task_data->cond, NULL);
+       ast_mutex_init(&task_data->lock);
+       task_data->task_complete = 0;
+
+       return task_data;
+}
+
 /*!
  * \brief Queued task for baseline test.
  *
@@ -65,6 +90,30 @@ static int task(void *data)
 }
 
 /*!
+ * \brief Wait for a task to execute.
+ */
+static int task_wait(struct task_data *task_data)
+{
+       struct timeval start = ast_tvnow();
+       struct timespec end;
+       SCOPED_MUTEX(lock, &task_data->lock);
+
+       end.tv_sec = start.tv_sec + 30;
+       end.tv_nsec = start.tv_usec * 1000;
+
+       while (!task_data->task_complete) {
+               int res;
+               res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
+                       &end);
+               if (res == ETIMEDOUT) {
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+/*!
  * \brief Baseline test for default taskprocessor
  *
  * This test ensures that when a task is added to a taskprocessor that
@@ -73,12 +122,9 @@ static int task(void *data)
  */
 AST_TEST_DEFINE(default_taskprocessor)
 {
-       struct ast_taskprocessor *tps;
-       struct task_data task_data;
-       struct timeval start;
-       struct timespec ts;
-       enum ast_test_result_state res = AST_TEST_PASS;
-       int timedwait_res;
+       RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+       RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
+       int res;
 
        switch (cmd) {
        case TEST_INIT:
@@ -99,36 +145,21 @@ AST_TEST_DEFINE(default_taskprocessor)
                return AST_TEST_FAIL;
        }
 
-       start = ast_tvnow();
-
-       ts.tv_sec = start.tv_sec + 30;
-       ts.tv_nsec = start.tv_usec * 1000;
-
-       ast_cond_init(&task_data.cond, NULL);
-       ast_mutex_init(&task_data.lock);
-       task_data.task_complete = 0;
-
-       ast_taskprocessor_push(tps, task, &task_data);
-       ast_mutex_lock(&task_data.lock);
-       while (!task_data.task_complete) {
-               timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts);
-               if (timedwait_res == ETIMEDOUT) {
-                       break;
-               }
+       task_data = task_data_create();
+       if (!task_data) {
+               ast_test_status_update(test, "Unable to create task_data\n");
+               return AST_TEST_FAIL;
        }
-       ast_mutex_unlock(&task_data.lock);
 
-       if (!task_data.task_complete) {
+       ast_taskprocessor_push(tps, task, task_data);
+
+       res = task_wait(task_data);
+       if (res != 0) {
                ast_test_status_update(test, "Queued task did not execute!\n");
-               res = AST_TEST_FAIL;
-               goto test_end;
+               return AST_TEST_FAIL;
        }
 
-test_end:
-       tps = ast_taskprocessor_unreference(tps);
-       ast_mutex_destroy(&task_data.lock);
-       ast_cond_destroy(&task_data.cond);
-       return res;
+       return AST_TEST_PASS;
 }
 
 #define NUM_TASKS 20000
@@ -631,12 +662,78 @@ AST_TEST_DEFINE(taskprocessor_shutdown)
        return AST_TEST_PASS;
 }
 
+static int local_task_exe(struct ast_taskprocessor_local *local)
+{
+       int *local_data = local->local_data;
+       struct task_data *task_data = local->data;
+
+       *local_data = 1;
+       task(task_data);
+
+       return 0;
+}
+
+AST_TEST_DEFINE(taskprocessor_push_local)
+{
+       RAII_VAR(struct ast_taskprocessor *, tps, NULL,
+               ast_taskprocessor_unreference);
+       struct task_data *task_data;
+       int local_data;
+       int res;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = "/main/taskprocessor/";
+               info->summary = "Test of pushing local data";
+               info->description =
+                       "Ensures that local data is passed along.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+
+       tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
+       if (!tps) {
+               ast_test_status_update(test, "Unable to create test taskprocessor\n");
+               return AST_TEST_FAIL;
+       }
+
+
+       task_data = task_data_create();
+       if (!task_data) {
+               ast_test_status_update(test, "Unable to create task_data\n");
+               return AST_TEST_FAIL;
+       }
+
+       local_data = 0;
+       ast_taskprocessor_set_local(tps, &local_data);
+
+       ast_taskprocessor_push_local(tps, local_task_exe, task_data);
+
+       res = task_wait(task_data);
+       if (res != 0) {
+               ast_test_status_update(test, "Queued task did not execute!\n");
+               return AST_TEST_FAIL;
+       }
+
+       if (local_data != 1) {
+               ast_test_status_update(test,
+                       "Queued task did not set local_data!\n");
+               return AST_TEST_FAIL;
+       }
+
+       return AST_TEST_PASS;
+}
+
 static int unload_module(void)
 {
        ast_test_unregister(default_taskprocessor);
        ast_test_unregister(default_taskprocessor_load);
        ast_test_unregister(taskprocessor_listener);
        ast_test_unregister(taskprocessor_shutdown);
+       ast_test_unregister(taskprocessor_push_local);
        return 0;
 }
 
@@ -646,6 +743,7 @@ static int load_module(void)
        ast_test_register(default_taskprocessor_load);
        ast_test_register(taskprocessor_listener);
        ast_test_register(taskprocessor_shutdown);
+       ast_test_register(taskprocessor_push_local);
        return AST_MODULE_LOAD_SUCCESS;
 }