major redesign of the channel spy infrastructure, increasing efficiency and reducing...
[asterisk/asterisk.git] / channel.c
index 85692e7..c3f3d03 100755 (executable)
--- a/channel.c
+++ b/channel.c
@@ -71,6 +71,17 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/transcap.h"
 #include "asterisk/devicestate.h"
 
+struct channel_spy_trans {
+       int last_format;
+       struct ast_trans_pvt *path;
+};
+
+struct ast_channel_spy_list {
+       struct channel_spy_trans read_translator;
+       struct channel_spy_trans write_translator;
+       AST_LIST_HEAD_NOLOCK(, ast_channel_spy) list;
+};
+
 /* uncomment if you have problems with 'monitoring' synchronized files */
 #if 0
 #define MONITOR_CONSTANT_DELAY
@@ -931,10 +942,8 @@ void ast_channel_free(struct ast_channel *chan)
        /* loop over the variables list, freeing all data and deleting list items */
        /* no need to lock the list, as the channel is already locked */
        
-       while (!AST_LIST_EMPTY(headp)) {           /* List Deletion. */
-                   vardata = AST_LIST_REMOVE_HEAD(headp, entries);
-                   ast_var_delete(vardata);
-       }
+       while ((vardata = AST_LIST_REMOVE_HEAD(headp, entries)))
+               ast_var_delete(vardata);
 
        free(chan);
        ast_mutex_unlock(&chlock);
@@ -942,19 +951,134 @@ void ast_channel_free(struct ast_channel *chan)
        ast_device_state_changed_literal(name);
 }
 
-static void ast_spy_detach(struct ast_channel *chan) 
+int ast_channel_spy_add(struct ast_channel *chan, struct ast_channel_spy *spy)
 {
-       struct ast_channel_spy *chanspy;
+       if (!ast_test_flag(spy, CHANSPY_FORMAT_AUDIO)) {
+               ast_log(LOG_WARNING, "Could not add channel spy '%s' to channel '%s', only audio format spies are supported.\n",
+                       spy->type, chan->name);
+               return -1;
+       }
 
-       /* Marking the spies as done is sufficient.  Chanspy or spy users will get the picture. */
-       for (chanspy = chan->spiers; chanspy; chanspy = chanspy->next) {
-               if (chanspy->status == CHANSPY_RUNNING) {
-                       chanspy->status = CHANSPY_DONE;
+       if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST) && (spy->read_queue.format != AST_FORMAT_SLINEAR)) {
+               ast_log(LOG_WARNING, "Cannot provide volume adjustment on '%s' format spies\n",
+                       ast_getformatname(spy->read_queue.format));
+               return -1;
+       }
+
+       if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST) && (spy->write_queue.format != AST_FORMAT_SLINEAR)) {
+               ast_log(LOG_WARNING, "Cannot provide volume adjustment on '%s' format spies\n",
+                       ast_getformatname(spy->write_queue.format));
+               return -1;
+       }
+
+       if (ast_test_flag(spy, CHANSPY_MIXAUDIO) &&
+           ((spy->read_queue.format != AST_FORMAT_SLINEAR) ||
+            (spy->write_queue.format != AST_FORMAT_SLINEAR))) {
+               ast_log(LOG_WARNING, "Cannot provide audio mixing on '%s'-'%s' format spies\n",
+                       ast_getformatname(spy->read_queue.format), ast_getformatname(spy->write_queue.format));
+               return -1;
+       }
+
+       if (!chan->spies) {
+               if (!(chan->spies = calloc(1, sizeof(*chan->spies)))) {
+                       ast_log(LOG_WARNING, "Memory allocation failure\n");
+                       return -1;
                }
+
+               AST_LIST_HEAD_INIT_NOLOCK(&chan->spies->list);
+               AST_LIST_INSERT_HEAD(&chan->spies->list, spy, list);
+       } else {
+               AST_LIST_INSERT_TAIL(&chan->spies->list, spy, list);
        }
 
-       chan->spiers = NULL;
-       return;
+       if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE) {
+               ast_cond_init(&spy->trigger, NULL);
+               ast_set_flag(spy, CHANSPY_TRIGGER_READ);
+               ast_clear_flag(spy, CHANSPY_TRIGGER_WRITE);
+       }
+
+       ast_log(LOG_DEBUG, "Spy %s added to channel %s\n",
+               spy->type, chan->name);
+
+       return 0;
+}
+
+void ast_channel_spy_stop_by_type(struct ast_channel *chan, const char *type)
+{
+       struct ast_channel_spy *spy;
+       
+       if (!chan->spies)
+               return;
+
+       AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+               if ((spy->type == type) && (spy->status == CHANSPY_RUNNING))
+                       spy->status = CHANSPY_DONE;
+       }
+}
+
+void ast_channel_spy_trigger_wait(struct ast_channel_spy *spy)
+{
+       ast_cond_wait(&spy->trigger, &spy->lock);
+}
+
+void ast_channel_spy_remove(struct ast_channel *chan, struct ast_channel_spy *spy)
+{
+       struct ast_frame *f;
+
+       if (!chan->spies)
+               return;
+
+       AST_LIST_REMOVE(&chan->spies->list, spy, list);
+
+       ast_mutex_lock(&spy->lock);
+
+       for (f = spy->read_queue.head; f; f = spy->read_queue.head) {
+               spy->read_queue.head = f->next;
+               ast_frfree(f);
+       }
+       for (f = spy->write_queue.head; f; f = spy->write_queue.head) {
+               spy->write_queue.head = f->next;
+               ast_frfree(f);
+       }
+
+       if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE)
+               ast_cond_destroy(&spy->trigger);
+
+       ast_mutex_unlock(&spy->lock);
+
+       ast_log(LOG_DEBUG, "Spy %s removed from channel %s\n",
+               spy->type, chan->name);
+
+       if (AST_LIST_EMPTY(&chan->spies->list)) {
+               if (chan->spies->read_translator.path)
+                       ast_translator_free_path(chan->spies->read_translator.path);
+               if (chan->spies->write_translator.path)
+                       ast_translator_free_path(chan->spies->write_translator.path);
+               free(chan->spies);
+               chan->spies = NULL;
+       }
+}
+
+static void detach_spies(struct ast_channel *chan) 
+{
+       struct ast_channel_spy *spy;
+
+       if (!chan->spies)
+               return;
+
+       /* Marking the spies as done is sufficient.  Chanspy or spy users will get the picture. */
+       AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+               ast_mutex_lock(&spy->lock);
+               if (spy->status == CHANSPY_RUNNING)
+                       spy->status = CHANSPY_DONE;
+               if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE)
+                       ast_cond_signal(&spy->trigger);
+               ast_mutex_unlock(&spy->lock);
+       }
+
+       AST_LIST_TRAVERSE_SAFE_BEGIN(&chan->spies->list, spy, list)
+               ast_channel_spy_remove(chan, spy);
+       AST_LIST_TRAVERSE_SAFE_END;
 }
 
 /*--- ast_softhangup_nolock: Softly hangup a channel, don't lock */
@@ -983,40 +1107,136 @@ int ast_softhangup(struct ast_channel *chan, int cause)
        return res;
 }
 
-static void ast_queue_spy_frame(struct ast_channel_spy *spy, struct ast_frame *f, int pos) 
+enum spy_direction {
+       SPY_READ,
+       SPY_WRITE,
+};
+
+#define SPY_QUEUE_SAMPLE_LIMIT 4000                    /* half of one second */
+
+static void queue_frame_to_spies(struct ast_channel *chan, struct ast_frame *f, enum spy_direction dir)
 {
-       struct ast_frame *tmpf = NULL;
-       int count = 0;
+       struct ast_frame *translated_frame = NULL;
+       struct ast_channel_spy *spy;
+       struct ast_channel_spy_queue *queue;
+       struct ast_channel_spy_queue *other_queue;
+       struct channel_spy_trans *trans;
+       struct ast_frame *last;
 
-       ast_mutex_lock(&spy->lock);
-       for (tmpf=spy->queue[pos]; tmpf && tmpf->next; tmpf=tmpf->next) {
-               count++;
-       }
-       if (count > 1000) {
-               struct ast_frame *freef, *headf;
-
-               ast_log(LOG_ERROR, "Too many frames queued at once, flushing cache.\n");
-               headf = spy->queue[pos];
-               /* deref the queue right away so it looks empty */
-               spy->queue[pos] = NULL;
-               tmpf = headf;
-               /* free the wasted frames */
-               while (tmpf) {
-                       freef = tmpf;
-                       tmpf = tmpf->next;
-                       ast_frfree(freef);
+       trans = (dir == SPY_READ) ? &chan->spies->read_translator : &chan->spies->write_translator;
+
+       AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+               ast_mutex_lock(&spy->lock);
+
+               queue = (dir == SPY_READ) ? &spy->read_queue : &spy->write_queue;
+
+               if ((queue->format == AST_FORMAT_SLINEAR) && (f->subclass != AST_FORMAT_SLINEAR)) {
+                       if (!translated_frame) {
+                               if (trans->path && (trans->last_format != f->subclass)) {
+                                       ast_translator_free_path(trans->path);
+                                       trans->path = NULL;
+                               }
+                               if (!trans->path) {
+                                       ast_log(LOG_DEBUG, "Building translator from %s to SLINEAR for spies on channel %s\n",
+                                               ast_getformatname(f->subclass), chan->name);
+                                       if ((trans->path = ast_translator_build_path(AST_FORMAT_SLINEAR, f->subclass)) == NULL) {
+                                               ast_log(LOG_WARNING, "Cannot build a path from %s to %s\n",
+                                                       ast_getformatname(f->subclass), ast_getformatname(AST_FORMAT_SLINEAR));
+                                               ast_mutex_unlock(&spy->lock);
+                                               continue;
+                                       } else {
+                                               trans->last_format = f->subclass;
+                                       }
+                               }
+                               translated_frame = ast_translate(trans->path, f, 0);
+                       }
+
+                       for (last = queue->head; last && last->next; last = last->next);
+                       if (last)
+                               last->next = ast_frdup(translated_frame);
+                       else
+                               queue->head = ast_frdup(translated_frame);
+               } else {
+                       if (f->subclass != queue->format) {
+                               ast_log(LOG_WARNING, "Spy '%s' on channel '%s' wants format '%s', but frame is '%s', dropping\n",
+                                       spy->type, chan->name,
+                                       ast_getformatname(queue->format), ast_getformatname(f->subclass));
+                               ast_mutex_unlock(&spy->lock);
+                               continue;
+                       }
+
+                       for (last = queue->head; last && last->next; last = last->next);
+                       if (last)
+                               last->next = ast_frdup(f);
+                       else
+                               queue->head = ast_frdup(f);
                }
-               ast_mutex_unlock(&spy->lock);
-               return;
-       }
 
-       if (tmpf) {
-               tmpf->next = ast_frdup(f);
-       } else {
-               spy->queue[pos] = ast_frdup(f);
+               queue->samples += f->samples;
+
+               if (queue->samples > SPY_QUEUE_SAMPLE_LIMIT) {
+                       if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE) {
+                               other_queue = (dir == SPY_WRITE) ? &spy->read_queue : &spy->write_queue;
+
+                               if (other_queue->samples == 0) {
+                                       switch (ast_test_flag(spy, CHANSPY_TRIGGER_MODE)) {
+                                       case CHANSPY_TRIGGER_READ:
+                                               if (dir == SPY_WRITE) {
+                                                       ast_set_flag(spy, CHANSPY_TRIGGER_WRITE);
+                                                       ast_clear_flag(spy, CHANSPY_TRIGGER_READ);
+                                                       if (option_debug)
+                                                               ast_log(LOG_DEBUG, "Switching spy '%s' on '%s' to write-trigger mode\n",
+                                                                       spy->type, chan->name);
+                                               }
+                                               break;
+                                       case CHANSPY_TRIGGER_WRITE:
+                                               if (dir == SPY_READ) {
+                                                       ast_set_flag(spy, CHANSPY_TRIGGER_READ);
+                                                       ast_clear_flag(spy, CHANSPY_TRIGGER_WRITE);
+                                                       if (option_debug)
+                                                               ast_log(LOG_DEBUG, "Switching spy '%s' on '%s' to read-trigger mode\n",
+                                                                       spy->type, chan->name);
+                                               }
+                                               break;
+                                       }
+                                       if (option_debug)
+                                               ast_log(LOG_DEBUG, "Triggering queue flush for spy '%s' on '%s'\n",
+                                                       spy->type, chan->name);
+                                       ast_set_flag(spy, CHANSPY_TRIGGER_FLUSH);
+                                       ast_cond_signal(&spy->trigger);
+                                       ast_mutex_unlock(&spy->lock);
+                                       continue;
+                               }
+                       }
+
+                       if (option_debug)
+                               ast_log(LOG_DEBUG, "Spy '%s' on channel '%s' %s queue too long, dropping frames\n",
+                                       spy->type, chan->name, (dir == SPY_READ) ? "read" : "write");
+                       while (queue->samples > SPY_QUEUE_SAMPLE_LIMIT) {
+                               struct ast_frame *drop = queue->head;
+
+                               queue->samples -= drop->samples;
+                               queue->head = drop->next;
+                               ast_frfree(drop);
+                       }
+               } else {
+                       switch (ast_test_flag(spy, CHANSPY_TRIGGER_MODE)) {
+                       case CHANSPY_TRIGGER_READ:
+                               if (dir == SPY_READ)
+                                       ast_cond_signal(&spy->trigger);
+                               break;
+                       case CHANSPY_TRIGGER_WRITE:
+                               if (dir == SPY_WRITE)
+                                       ast_cond_signal(&spy->trigger);
+                               break;
+                       }
+               }
+
+               ast_mutex_unlock(&spy->lock);
        }
 
-       ast_mutex_unlock(&spy->lock);
+       if (translated_frame)
+               ast_frfree(translated_frame);
 }
 
 static void free_translation(struct ast_channel *clone)
@@ -1040,7 +1260,7 @@ int ast_hangup(struct ast_channel *chan)
           if someone is going to masquerade as us */
        ast_mutex_lock(&chan->lock);
 
-       ast_spy_detach(chan);           /* get rid of spies */
+       detach_spies(chan);             /* get rid of spies */
 
        if (chan->masq) {
                if (ast_do_masquerade(chan)) 
@@ -1174,20 +1394,28 @@ static int generator_force(void *data)
 int ast_activate_generator(struct ast_channel *chan, struct ast_generator *gen, void *params)
 {
        int res = 0;
+
        ast_mutex_lock(&chan->lock);
+
        if (chan->generatordata) {
                if (chan->generator && chan->generator->release)
                        chan->generator->release(chan, chan->generatordata);
                chan->generatordata = NULL;
        }
+
        ast_prod(chan);
-       if ((chan->generatordata = gen->alloc(chan, params))) {
+       if (gen->alloc) {
+               if (!(chan->generatordata = gen->alloc(chan, params)))
+                       res = -1;
+       }
+       
+       if (!res) {
                ast_settimeout(chan, 160, generator_force, chan);
                chan->generator = gen;
-       } else {
-               res = -1;
        }
+
        ast_mutex_unlock(&chan->lock);
+
        return res;
 }
 
@@ -1661,12 +1889,9 @@ struct ast_frame *ast_read(struct ast_channel *chan)
                        ast_frfree(f);
                        f = &null_frame;
                } else {
-                       if (chan->spiers) {
-                               struct ast_channel_spy *spying;
-                               for (spying = chan->spiers; spying; spying=spying->next) {
-                                       ast_queue_spy_frame(spying, f, 0);
-                               }
-                       }
+                       if (chan->spies)
+                               queue_frame_to_spies(chan, f, SPY_READ);
+
                        if (chan->monitor && chan->monitor->read_stream ) {
 #ifndef MONITOR_CONSTANT_DELAY
                                int jump = chan->outsmpl - chan->insmpl - 2 * f->samples;
@@ -2007,17 +2232,10 @@ int ast_write(struct ast_channel *chan, struct ast_frame *fr)
                break;
        default:
                if (chan->tech->write) {
-                       if (chan->writetrans) 
-                               f = ast_translate(chan->writetrans, fr, 0);
-                       else
-                               f = fr;
+                       f = (chan->writetrans) ? ast_translate(chan->writetrans, fr, 0) : fr;
                        if (f) {
-                               if (f->frametype == AST_FRAME_VOICE && chan->spiers) {
-                                       struct ast_channel_spy *spying;
-                                       for (spying = chan->spiers; spying; spying=spying->next) {
-                                               ast_queue_spy_frame(spying, f, 1);
-                                       }
-                               }
+                               if (f->frametype == AST_FRAME_VOICE && chan->spies)
+                                       queue_frame_to_spies(chan, f, SPY_WRITE);
 
                                if( chan->monitor && chan->monitor->write_stream &&
                                                f && ( f->frametype == AST_FRAME_VOICE ) ) {
@@ -3207,8 +3425,9 @@ enum ast_bridge_result ast_channel_bridge(struct ast_channel *c0, struct ast_cha
                if (c0->tech->bridge &&
                    (config->timelimit == 0) &&
                    (c0->tech->bridge == c1->tech->bridge) &&
-                   !nativefailed && !c0->monitor && !c1->monitor && !c0->spiers && !c1->spiers) {
-                       /* Looks like they share a bridge method */
+                   !nativefailed && !c0->monitor && !c1->monitor &&
+                   !c0->spies && !c1->spies) {
+                       /* Looks like they share a bridge method and nothing else is in the way */
                        if (option_verbose > 2) 
                                ast_verbose(VERBOSE_PREFIX_3 "Attempting native bridge of %s and %s\n", c0->name, c1->name);
                        ast_set_flag(c0, AST_FLAG_NBRIDGE);
@@ -3237,6 +3456,7 @@ enum ast_bridge_result ast_channel_bridge(struct ast_channel *c0, struct ast_cha
                        } else {
                                ast_clear_flag(c0, AST_FLAG_NBRIDGE);
                                ast_clear_flag(c1, AST_FLAG_NBRIDGE);
+                               ast_verbose(VERBOSE_PREFIX_3 "Native bridge of %s and %s was unsuccessful\n", c0->name, c1->name);
                        }
                        if (res == AST_BRIDGE_RETRY)
                                continue;
@@ -3570,3 +3790,134 @@ void ast_set_variables(struct ast_channel *chan, struct ast_variable *vars)
        for (cur = vars; cur; cur = cur->next)
                pbx_builtin_setvar_helper(chan, cur->name, cur->value); 
 }
+
+static void copy_data_from_queue(struct ast_channel_spy_queue *queue, short *buf, unsigned int samples)
+{
+       struct ast_frame *f;
+       int tocopy;
+       int bytestocopy;
+
+       while (samples) {
+               f = queue->head;
+
+               if (!f) {
+                       ast_log(LOG_ERROR, "Ran out of frames before buffer filled!\n");
+                       break;
+               }
+
+               tocopy = (f->samples > samples) ? samples : f->samples;
+               bytestocopy = ast_codec_get_len(queue->format, samples);
+               memcpy(buf, f->data, bytestocopy);
+               samples -= tocopy;
+               buf += tocopy;
+               f->samples -= tocopy;
+               f->data += bytestocopy;
+               f->datalen -= bytestocopy;
+               f->offset += bytestocopy;
+               queue->samples -= tocopy;
+               if (!f->samples) {
+                       queue->head = f->next;
+                       ast_frfree(f);
+               }
+       }
+}
+
+struct ast_frame *ast_channel_spy_read_frame(struct ast_channel_spy *spy, unsigned int samples)
+{
+       struct ast_frame *result;
+       /* buffers are allocated to hold SLINEAR, which is the largest format */
+        short read_buf[samples];
+        short write_buf[samples];
+       struct ast_frame *read_frame;
+       struct ast_frame *write_frame;
+       int need_dup;
+       struct ast_frame stack_read_frame = { .frametype = AST_FRAME_VOICE,
+                                             .subclass = spy->read_queue.format,
+                                             .data = read_buf,
+                                             .samples = samples,
+                                             .datalen = ast_codec_get_len(spy->read_queue.format, samples),
+       };
+       struct ast_frame stack_write_frame = { .frametype = AST_FRAME_VOICE,
+                                              .subclass = spy->write_queue.format,
+                                              .data = write_buf,
+                                              .samples = samples,
+                                              .datalen = ast_codec_get_len(spy->write_queue.format, samples),
+       };
+
+       /* if a flush has been requested, dump everything in whichever queue is larger */
+       if (ast_test_flag(spy, CHANSPY_TRIGGER_FLUSH)) {
+               if (spy->read_queue.samples > spy->write_queue.samples) {
+                       if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST)) {
+                               for (result = spy->read_queue.head; result; result = result->next)
+                                       ast_frame_adjust_volume(result, spy->read_vol_adjustment);
+                       }
+                       result = spy->read_queue.head;
+                       spy->read_queue.head = NULL;
+                       spy->read_queue.samples = 0;
+                       ast_clear_flag(spy, CHANSPY_TRIGGER_FLUSH);
+                       return result;
+               } else {
+                       if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST)) {
+                               for (result = spy->write_queue.head; result; result = result->next)
+                                       ast_frame_adjust_volume(result, spy->write_vol_adjustment);
+                       }
+                       result = spy->write_queue.head;
+                       spy->write_queue.head = NULL;
+                       spy->write_queue.samples = 0;
+                       ast_clear_flag(spy, CHANSPY_TRIGGER_FLUSH);
+                       return result;
+               }
+       }
+
+       if ((spy->read_queue.samples < samples) || (spy->write_queue.samples < samples))
+               return NULL;
+
+       /* short-circuit if both head frames have exactly what we want */
+       if ((spy->read_queue.head->samples == samples) &&
+           (spy->write_queue.head->samples == samples)) {
+               read_frame = spy->read_queue.head;
+               spy->read_queue.head = read_frame->next;
+               read_frame->next = NULL;
+
+               write_frame = spy->write_queue.head;
+               spy->write_queue.head = write_frame->next;
+               write_frame->next = NULL;
+
+               spy->read_queue.samples -= samples;
+               spy->write_queue.samples -= samples;
+
+               need_dup = 0;
+       } else {
+               copy_data_from_queue(&spy->read_queue, read_buf, samples);
+               copy_data_from_queue(&spy->write_queue, write_buf, samples);
+
+               read_frame = &stack_read_frame;
+               write_frame = &stack_write_frame;
+               need_dup = 1;
+       }
+       
+       if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST))
+               ast_frame_adjust_volume(read_frame, spy->read_vol_adjustment);
+
+       if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST))
+               ast_frame_adjust_volume(write_frame, spy->write_vol_adjustment);
+
+       if (ast_test_flag(spy, CHANSPY_MIXAUDIO)) {
+               ast_frame_slinear_sum(read_frame, write_frame);
+
+               if (need_dup)
+                       result = ast_frdup(read_frame);
+               else
+                       result = read_frame;
+       } else {
+               if (need_dup) {
+                       result = ast_frdup(read_frame);
+                       result->next = ast_frdup(write_frame);
+               } else {
+                       result = read_frame;
+                       result->next = write_frame;
+               }
+       }
+
+       return result;
+}