Issue 9799 - Multirow results for func_odbc
authorTilghman Lesher <tilghman@meg.abyt.es>
Thu, 31 May 2007 15:05:56 +0000 (15:05 +0000)
committerTilghman Lesher <tilghman@meg.abyt.es>
Thu, 31 May 2007 15:05:56 +0000 (15:05 +0000)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@66734 65c4cc65-6c06-0410-ace0-fbb531ad65f3

configs/func_odbc.conf.sample
funcs/func_odbc.c

index 2405952..ff9e1c1 100644 (file)
@@ -67,4 +67,11 @@ writesql=UPDATE presence SET location='${SQL_ESC(${VAL1})}' WHERE id='${SQL_ESC(
 ;escapecommas=no    ; Normally, commas within a field are escaped such that each
                     ; field may be separated into individual variables with ARRAY.
                     ; This option turns that behavior off [default=yes].
+;mode=multirow      ; Enable multirow fetching.  Instead of returning results directly,
+                    ; mode=multirow queries will return a result-id, which can be passed
+                    ; multiple times to ODBC_FETCH, and that function will return each
+                    ; row, in order.  You can add to this the following parameter:
+;rowlimit=5         ; rowlimit will limit the number of rows retrieved and stored from
+                    ; the database.  If not specified, all rows, up to available memory,
+                    ; will be retrieved and stored.
 
index 5506c31..bfb2d37 100644 (file)
@@ -57,6 +57,7 @@ static char *config = "func_odbc.conf";
 
 enum {
        OPT_ESCAPECOMMAS =      (1 << 0),
+       OPT_MULTIROW     =      (1 << 1),
 } odbc_option_flags;
 
 struct acf_odbc_query {
@@ -66,11 +67,47 @@ struct acf_odbc_query {
        char sql_read[2048];
        char sql_write[2048];
        unsigned int flags;
+       int rowlimit;
        struct ast_custom_function *acf;
 };
 
+static void odbc_datastore_free(void *data);
+
+struct ast_datastore_info odbc_info = {
+       .type = "FUNC_ODBC",
+       .destroy = odbc_datastore_free,
+};
+
+/* For storing each result row */
+struct odbc_datastore_row {
+       AST_LIST_ENTRY(odbc_datastore_row) list;
+       char data[0];
+};
+
+/* For storing each result set */
+struct odbc_datastore {
+       AST_LIST_HEAD(, odbc_datastore_row);
+       char names[0];
+};
+
 AST_LIST_HEAD_STATIC(queries, acf_odbc_query);
 
+static int resultcount = 0;
+AST_MUTEX_DEFINE_STATIC(resultlock);
+
+static void odbc_datastore_free(void *data)
+{
+       struct odbc_datastore *result = data;
+       struct odbc_datastore_row *row;
+       AST_LIST_LOCK(result);
+       while ((row = AST_LIST_REMOVE_HEAD(result, list))) {
+               ast_free(row);
+       }
+       AST_LIST_UNLOCK(result);
+       AST_LIST_HEAD_DESTROY(result);
+       ast_free(result);
+}
+
 static SQLHSTMT generic_prepare(struct odbc_obj *obj, void *data)
 {
        int res;
@@ -200,8 +237,8 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
 {
        struct odbc_obj *obj = NULL;
        struct acf_odbc_query *query;
-       char sql[2048] = "", varname[15], colnames[2048] = "";
-       int res, x, buflen = 0, escapecommas, dsn;
+       char sql[2048] = "", varname[15], colnames[2048] = "", rowcount[12] = "-1";
+       int res, x, y, buflen = 0, escapecommas, rowlimit = 1, dsn;
        AST_DECLARE_APP_ARGS(args,
                AST_APP_ARG(field)[100];
        );
@@ -209,6 +246,8 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
        SQLSMALLINT colcount=0;
        SQLLEN indicator;
        SQLSMALLINT collength;
+       struct odbc_datastore *resultset = NULL;
+       struct odbc_datastore_row *row = NULL;
 
        AST_LIST_LOCK(&queries);
        AST_LIST_TRAVERSE(&queries, query, list) {
@@ -220,6 +259,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
        if (!query) {
                ast_log(LOG_ERROR, "No such function '%s'\n", cmd);
                AST_LIST_UNLOCK(&queries);
+               pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
                return -1;
        }
 
@@ -237,9 +277,16 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
                pbx_builtin_setvar_helper(chan, varname, NULL);
        }
 
-       /* Save this flag, so we can release the lock */
+       /* Save these flags, so we can release the lock */
        escapecommas = ast_test_flag(query, OPT_ESCAPECOMMAS);
-
+       if (ast_test_flag(query, OPT_MULTIROW)) {
+               resultset = ast_calloc(1, sizeof(*resultset));
+               AST_LIST_HEAD_INIT(resultset);
+               if (query->rowlimit)
+                       rowlimit = query->rowlimit;
+               else
+                       rowlimit = INT_MAX;
+       }
        AST_LIST_UNLOCK(&queries);
 
        for (dsn = 0; dsn < 5; dsn++) {
@@ -256,6 +303,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
                ast_log(LOG_ERROR, "Unable to execute query [%s]\n", sql);
                if (obj)
                        ast_odbc_release_obj(obj);
+               pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
                return -1;
        }
 
@@ -264,92 +312,152 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
                ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", sql);
                SQLFreeHandle (SQL_HANDLE_STMT, stmt);
                ast_odbc_release_obj(obj);
+               pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
                return -1;
        }
 
-       *buf = '\0';
-
        res = SQLFetch(stmt);
        if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
                int res1 = -1;
                if (res == SQL_NO_DATA) {
-                       if (option_verbose > 3) {
+                       if (option_verbose > 3)
                                ast_verbose(VERBOSE_PREFIX_4 "Found no rows [%s]\n", sql);
-                       }
                        res1 = 0;
-               } else if (option_verbose > 3) {
+                       ast_copy_string(rowcount, "0", sizeof(rowcount));
+               } else {
                        ast_log(LOG_WARNING, "Error %d in FETCH [%s]\n", res, sql);
                }
                SQLFreeHandle(SQL_HANDLE_STMT, stmt);
                ast_odbc_release_obj(obj);
+               pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
                return res1;
        }
 
-       for (x = 0; x < colcount; x++) {
-               int i, namelen;
-               char coldata[256], colname[256];
-
-               res = SQLDescribeCol(stmt, x + 1, (unsigned char *)colname, sizeof(colname), &collength, NULL, NULL, NULL, NULL);
-               if (((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) || collength == 0) {
-                       snprintf(colname, sizeof(colname), "field%d", x);
-               }
-
-               if (!ast_strlen_zero(colnames))
-                       strncat(colnames, ",", sizeof(colnames) - 1);
-               namelen = strlen(colnames);
+       for (y = 0; y < rowlimit; y++) {
+               *buf = '\0';
+               for (x = 0; x < colcount; x++) {
+                       int i;
+                       char coldata[256];
+
+                       if (y == 0) {
+                               char colname[256];
+                               int namelen;
+
+                               res = SQLDescribeCol(stmt, x + 1, (unsigned char *)colname, sizeof(colname), &collength, NULL, NULL, NULL, NULL);
+                               if (((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) || collength == 0) {
+                                       snprintf(colname, sizeof(colname), "field%d", x);
+                               }
+
+                               if (!ast_strlen_zero(colnames))
+                                       strncat(colnames, ",", sizeof(colnames) - 1);
+                               namelen = strlen(colnames);
+
+                               /* Copy data, encoding '\' and ',' for the argument parser */
+                               for (i = 0; i < sizeof(colname); i++) {
+                                       if (escapecommas && (colname[i] == '\\' || colname[i] == ',')) {
+                                               colnames[namelen++] = '\\';
+                                       }
+                                       colnames[namelen++] = colname[i];
+
+                                       if (namelen >= sizeof(colnames) - 2) {
+                                               colnames[namelen >= sizeof(colnames) ? sizeof(colnames) - 1 : namelen] = '\0';
+                                               break;
+                                       }
+
+                                       if (colname[i] == '\0')
+                                               break;
+                               }
+
+                               if (resultset) {
+                                       void *tmp = ast_realloc(resultset, sizeof(*resultset) + strlen(colnames) + 1);
+                                       if (!tmp) {
+                                               ast_log(LOG_ERROR, "No space for a new resultset?\n");
+                                               ast_free(resultset);
+                                               SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+                                               ast_odbc_release_obj(obj);
+                                               pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
+                                               return -1;
+                                       }
+                                       resultset = tmp;
+                                       strcpy((char *)resultset + sizeof(*resultset), colnames);
+                               }
+                       }
 
-               /* Copy data, encoding '\' and ',' for the argument parser */
-               for (i = 0; i < sizeof(colname); i++) {
-                       if (escapecommas && (colname[i] == '\\' || colname[i] == ',')) {
-                               colnames[namelen++] = '\\';
+                       buflen = strlen(buf);
+                       res = SQLGetData(stmt, x + 1, SQL_CHAR, coldata, sizeof(coldata), &indicator);
+                       if (indicator == SQL_NULL_DATA) {
+                               coldata[0] = '\0';
+                               res = SQL_SUCCESS;
                        }
-                       colnames[namelen++] = colname[i];
 
-                       if (namelen >= sizeof(colnames) - 2) {
-                               colnames[namelen >= sizeof(colnames) ? sizeof(colnames) - 1 : namelen] = '\0';
-                               break;
+                       if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
+                               ast_log(LOG_WARNING, "SQL Get Data error!\n[%s]\n\n", sql);
+                               y = -1;
+                               goto end_acf_read;
                        }
 
-                       if (colname[i] == '\0')
-                               break;
-               }
+                       /* Copy data, encoding '\' and ',' for the argument parser */
+                       for (i = 0; i < sizeof(coldata); i++) {
+                               if (escapecommas && (coldata[i] == '\\' || coldata[i] == ',')) {
+                                       buf[buflen++] = '\\';
+                               }
+                               buf[buflen++] = coldata[i];
 
-               buflen = strlen(buf);
-               res = SQLGetData(stmt, x + 1, SQL_CHAR, coldata, sizeof(coldata), &indicator);
-               if (indicator == SQL_NULL_DATA) {
-                       coldata[0] = '\0';
-                       res = SQL_SUCCESS;
-               }
+                               if (buflen >= len - 2)
+                                       break;
 
-               if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
-                       ast_log(LOG_WARNING, "SQL Get Data error!\n[%s]\n\n", sql);
-                       SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-                       ast_odbc_release_obj(obj);
-                       return -1;
-               }
-
-               /* Copy data, encoding '\' and ',' for the argument parser */
-               for (i = 0; i < sizeof(coldata); i++) {
-                       if (escapecommas && (coldata[i] == '\\' || coldata[i] == ',')) {
-                               buf[buflen++] = '\\';
+                               if (coldata[i] == '\0')
+                                       break;
                        }
-                       buf[buflen++] = coldata[i];
-
-                       if (buflen >= len - 2)
-                               break;
 
-                       if (coldata[i] == '\0')
+                       buf[buflen - 1] = ',';
+                       buf[buflen] = '\0';
+               }
+               /* Trim trailing comma */
+               buf[buflen - 1] = '\0';
+
+               if (resultset) {
+                       row = ast_calloc(1, sizeof(*row) + buflen);
+                       if (!row) {
+                               ast_log(LOG_ERROR, "Unable to allocate space for more rows in this resultset.\n");
+                               goto end_acf_read;
+                       }
+                       strcpy((char *)row + sizeof(*row), buf);
+                       AST_LIST_INSERT_TAIL(resultset, row, list);
+
+                       /* Get next row */
+                       res = SQLFetch(stmt);
+                       if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
+                               if (res != SQL_NO_DATA)
+                                       ast_log(LOG_WARNING, "Error %d in FETCH [%s]\n", res, sql);
+                               y++;
                                break;
+                       }
                }
-
-               buf[buflen - 1] = ',';
-               buf[buflen] = '\0';
        }
-       /* Trim trailing comma */
-       buf[buflen - 1] = '\0';
 
+end_acf_read:
+       snprintf(rowcount, sizeof(rowcount), "%d", y);
+       pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
        pbx_builtin_setvar_helper(chan, "~ODBCFIELDS~", colnames);
-
+       if (resultset) {
+               int uid;
+               struct ast_datastore *odbc_store;
+               ast_mutex_lock(&resultlock);
+               uid = ++resultcount;
+               ast_mutex_unlock(&resultlock);
+               snprintf(buf, len, "%d", uid);
+               odbc_store = ast_channel_datastore_alloc(&odbc_info, buf);
+               if (!odbc_store) {
+                       ast_log(LOG_ERROR, "Rows retrieved, but unable to store it in the channel.  Results fail.\n");
+                       odbc_datastore_free(resultset);
+                       SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+                       ast_odbc_release_obj(obj);
+                       return -1;
+               }
+               odbc_store->data = resultset;
+               ast_channel_datastore_add(chan, odbc_store);
+       }
        SQLFreeHandle(SQL_HANDLE_STMT, stmt);
        ast_odbc_release_obj(obj);
        return 0;
@@ -383,6 +491,60 @@ static struct ast_custom_function escape_function = {
        .write = NULL,
 };
 
+static int acf_fetch(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
+{
+       struct ast_datastore *store;
+       struct odbc_datastore *resultset;
+       struct odbc_datastore_row *row;
+       store = ast_channel_datastore_find(chan, &odbc_info, data);
+       if (!store) {
+               return -1;
+       }
+       resultset = store->data;
+       AST_LIST_LOCK(resultset);
+       row = AST_LIST_REMOVE_HEAD(resultset, list);
+       AST_LIST_UNLOCK(resultset);
+       if (!row) {
+               /* Cleanup datastore */
+               ast_channel_datastore_remove(chan, store);
+               ast_channel_datastore_free(store);
+               return -1;
+       }
+       pbx_builtin_setvar_helper(chan, "~ODBCFIELDS~", resultset->names);
+       ast_copy_string(buf, row->data, len);
+       ast_free(row);
+       return 0;
+}
+
+static struct ast_custom_function fetch_function = {
+       .name = "ODBC_FETCH",
+       .synopsis = "Fetch a row from a multirow query",
+       .syntax = "ODBC_FETCH(<result-id>)",
+       .desc =
+"For queries which are marked as mode=multirow, the original query returns a\n"
+"result-id from which results may be fetched.  This function implements the\n"
+"actual fetch of the results.\n",
+       .read = acf_fetch,
+       .write = NULL,
+};
+
+static char *app_odbcfinish = "ODBCFinish";
+static char *syn_odbcfinish = "Clear the resultset of a successful multirow query";
+static char *desc_odbcfinish =
+"ODBCFinish(<result-id>)\n"
+"  Clears any remaining rows of the specified resultset\n";
+
+
+static int exec_odbcfinish(struct ast_channel *chan, void *data)
+{
+       struct ast_datastore *store = ast_channel_datastore_find(chan, &odbc_info, data);
+       if (!store) /* Already freed; no big deal. */
+               return 0;
+       ast_channel_datastore_remove(chan, store);
+       ast_channel_datastore_free(store);
+       return 0;
+}
+
 static int init_acf_query(struct ast_config *cfg, char *catg, struct acf_odbc_query **query)
 {
        const char *tmp;
@@ -459,6 +621,13 @@ static int init_acf_query(struct ast_config *cfg, char *catg, struct acf_odbc_qu
                        ast_clear_flag((*query), OPT_ESCAPECOMMAS);
        }
 
+       if ((tmp = ast_variable_retrieve(cfg, catg, "mode"))) {
+               if (strcasecmp(tmp, "multirow") == 0)
+                       ast_set_flag((*query), OPT_MULTIROW);
+               if ((tmp = ast_variable_retrieve(cfg, catg, "rowlimit")))
+                       sscanf(tmp, "%d", &((*query)->rowlimit));
+       }
+
        (*query)->acf = ast_calloc(1, sizeof(struct ast_custom_function));
        if (! (*query)->acf) {
                free(*query);
@@ -569,6 +738,8 @@ static int load_module(void)
        struct ast_config *cfg;
        char *catg;
 
+       res |= ast_custom_function_register(&fetch_function);
+       res |= ast_register_application(app_odbcfinish, exec_odbcfinish, syn_odbcfinish, desc_odbcfinish);
        AST_LIST_LOCK(&queries);
 
        cfg = ast_config_load(config);
@@ -617,6 +788,8 @@ static int unload_module(void)
        }
 
        res |= ast_custom_function_unregister(&escape_function);
+       res |= ast_custom_function_unregister(&fetch_function);
+       res |= ast_unregister_application(app_odbcfinish);
 
        /* Allow any threads waiting for this lock to pass (avoids a race) */
        AST_LIST_UNLOCK(&queries);