Permit additional CDR columns to be saved in Postgres. Note that these
authorTilghman Lesher <tilghman@meg.abyt.es>
Mon, 25 Feb 2008 23:04:20 +0000 (23:04 +0000)
committerTilghman Lesher <tilghman@meg.abyt.es>
Mon, 25 Feb 2008 23:04:20 +0000 (23:04 +0000)
changes are backward-compatible, so no changes to UPGRADE.txt are
necessary.
(closes issue #9279)
 Reported by: rottenroddy
 Patches:
       20080125__bug9279.diff.txt uploaded by Corydon76 (license 14)
 Tested by: Corydon76

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@104101 65c4cc65-6c06-0410-ace0-fbb531ad65f3

CHANGES
cdr/cdr_pgsql.c

diff --git a/CHANGES b/CHANGES
index 21ea3c2..c5480df 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -450,6 +450,29 @@ Logger changes
      and to ensure that the oldest log file gets deleted.
   * Added realtime support for the queue log
 
+Call Detail Records 
+-------------------
+  * The cdr_manager module has a [mappings] feature, like cdr_custom,
+    to add fields to the manager event from the CDR variables.
+  * Added cdr_adaptive_odbc, a new module that adapts to the structure of your
+     backend database CDR table.  Specifically, additional, non-standard
+     columns are supported, merely by setting the corresponding CDR variable in
+     your dialplan.  In addition, you may alias any column to another name (for
+     example, if you want the 'src' CDR variable to be column 'ANI' in the DB,
+     simply "alias src => ANI" in the configuration file).  Records may be
+     posted to more than one backend, simply by specifying multiple categories
+     in the configuration file.  And finally, you may filter which CDRs get
+     posted to each backend, by specifying a filter (which the record must
+     match) for the particular category.  Filters are additive (meaning all
+     rules must match to post that CDR).
+  * The Postgres CDR module now supports some features of the cdr_adaptive_odbc
+     module.  Specifically, you may add additional columns into the table and
+     they will be set, if you set the corresponding CDR variable name.  Also,
+     if you omit columns in your database table, they will be silently skipped
+     (but a record will still be inserted, based on what columns remain).  Note
+     that the other two features from cdr_adaptive_odbc (alias and filter) are
+     not currently supported.
+
 Miscellaneous New Modules
 -------------------------
   * Added a new CDR module, cdr_sqlite3_custom.
@@ -494,8 +517,6 @@ Miscellaneous
   * Added maxfiles option to options section of asterisk.conf which allows you to specify
      what Asterisk should set as the maximum number of open files when it loads.
   * Added the jittertargetextra configuration option.
-  * The cdr_manager module has a [mappings] feature, like cdr_custom,
-    to add fields to the manager event from the CDR variables.
   * Added support for setting the CoS for VLAN traffic (802.1p).  See the sample
      configuration files for the IP channel drivers.  The new option is "cos".
      This information is also documented in doc/qos.tex, or the IP Quality of Service
@@ -523,3 +544,4 @@ Miscellaneous
   * Added a compiler flag, CHANNEL_TRACE, which permits channel tracing to be
      turned on, via the CHANNEL(trace) dialplan function.  Could be useful for
      dialplan debugging.
+
index eea4416..8ec1105 100644 (file)
@@ -50,29 +50,68 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/cdr.h"
 #include "asterisk/module.h"
 
-#define DATE_FORMAT "%Y-%m-%d %T"
+#define DATE_FORMAT "'%Y-%m-%d %T'"
 
 static char *name = "pgsql";
 static char *config = "cdr_pgsql.conf";
 static char *pghostname = NULL, *pgdbname = NULL, *pgdbuser = NULL, *pgpassword = NULL, *pgdbport = NULL, *table = NULL;
 static int connected = 0;
+static int maxsize = 512, maxsize2 = 512;
 
 AST_MUTEX_DEFINE_STATIC(pgsql_lock);
 
 static PGconn  *conn = NULL;
 
+struct columns {
+       char *name;
+       char *type;
+       int len;
+       AST_RWLIST_ENTRY(columns) list;
+};
+
+static AST_RWLIST_HEAD_STATIC(psql_columns, columns);
+
+#define LENGTHEN_BUF1(size)                                                                                                            \
+                       do {                                                                                                                            \
+                               /* Lengthen buffer, if necessary */                                                             \
+                               if ((newsize = lensql + (size) + 3) > sizesql) {        \
+                                       if ((tmp = ast_realloc(sql, (newsize / 512 + 1) * 512))) {      \
+                                               sql = tmp;                                                                                              \
+                                               sizesql = (newsize / 512 + 1) * 512;                                    \
+                                       } else {                                                                                                        \
+                                               ast_log(LOG_ERROR, "Unable to allocate sufficient memory.  Insert CDR failed.\n"); \
+                                               ast_free(sql);                                                                                  \
+                                               ast_free(sql2);                                                                                 \
+                                               AST_RWLIST_UNLOCK(&psql_columns);                                               \
+                                               return -1;                                                                                              \
+                                       }                                                                                                                       \
+                               }                                                                                                                               \
+                       } while (0)
+
+#define LENGTHEN_BUF2(size)                                                                                                            \
+                       do {                                                                                                                            \
+                               if ((newsize = lensql2 + (size) + 3) > sizesql2) {                              \
+                                       if ((tmp = ast_realloc(sql2, (newsize / 512 + 1) * 512))) {     \
+                                               sql2 = tmp;                                                                                             \
+                                               sizesql2 = (newsize / 512 + 1) * 512;                                   \
+                                       } else {                                                                                                        \
+                                               ast_log(LOG_ERROR, "Unable to allocate sufficient memory.  Insert CDR failed.\n");      \
+                                               ast_free(sql);                                                                                  \
+                                               ast_free(sql2);                                                                                 \
+                                               AST_RWLIST_UNLOCK(&psql_columns);                                               \
+                                               return -1;                                                                                              \
+                                       }                                                                                                                       \
+                               }                                                                                                                               \
+                       } while (0)
+
 static int pgsql_log(struct ast_cdr *cdr)
 {
        struct ast_tm tm;
-       char sqlcmd[2048] = "", timestr[128];
        char *pgerror;
        PGresult *result;
 
        ast_mutex_lock(&pgsql_lock);
 
-       ast_localtime(&cdr->start, &tm, NULL);
-       ast_strftime(timestr, sizeof(timestr), DATE_FORMAT, &tm);
-
        if ((!connected) && pghostname && pgdbuser && pgpassword && pgdbname) {
                conn = PQsetdbLogin(pghostname, pgdbport, NULL, NULL, pgdbname, pgdbuser, pgpassword);
                if (PQstatus(conn) != CONNECTION_BAD) {
@@ -87,49 +126,135 @@ static int pgsql_log(struct ast_cdr *cdr)
        }
 
        if (connected) {
-               char *clid=NULL, *dcontext=NULL, *channel=NULL, *dstchannel=NULL, *lastapp=NULL, *lastdata=NULL;
-               char *src=NULL, *dst=NULL, *uniqueid=NULL, *userfield=NULL;
-               int pgerr;
-
-               /* Maximum space needed would be if all characters needed to be escaped, plus a trailing NULL */
-               if ((clid = alloca(strlen(cdr->clid) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, clid, cdr->clid, strlen(cdr->clid), &pgerr);
-               if ((dcontext = alloca(strlen(cdr->dcontext) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, dcontext, cdr->dcontext, strlen(cdr->dcontext), &pgerr);
-               if ((channel = alloca(strlen(cdr->channel) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, channel, cdr->channel, strlen(cdr->channel), &pgerr);
-               if ((dstchannel = alloca(strlen(cdr->dstchannel) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, dstchannel, cdr->dstchannel, strlen(cdr->dstchannel), &pgerr);
-               if ((lastapp = alloca(strlen(cdr->lastapp) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, lastapp, cdr->lastapp, strlen(cdr->lastapp), &pgerr);
-               if ((lastdata = alloca(strlen(cdr->lastdata) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, lastdata, cdr->lastdata, strlen(cdr->lastdata), &pgerr);
-               if ((uniqueid = alloca(strlen(cdr->uniqueid) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, uniqueid, cdr->uniqueid, strlen(cdr->uniqueid), &pgerr);
-               if ((userfield = alloca(strlen(cdr->userfield) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, userfield, cdr->userfield, strlen(cdr->userfield), &pgerr);
-               if ((src = alloca(strlen(cdr->src) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, src, cdr->src, strlen(cdr->src), &pgerr);
-               if ((dst = alloca(strlen(cdr->dst) * 2 + 1)) != NULL)
-                       PQescapeStringConn(conn, dst, cdr->dst, strlen(cdr->dst), &pgerr);
-
-               /* Check for all alloca failures above at once */
-               if ((!clid) || (!dcontext) || (!channel) || (!dstchannel) || (!lastapp) || (!lastdata) || (!uniqueid) || (!userfield) || (!src) || (!dst)) {
-                       ast_log(LOG_ERROR, "cdr_pgsql:  Out of memory error (insert fails)\n");
-                       ast_mutex_unlock(&pgsql_lock);
-                       return -1;
-               }
+               struct columns *cur;
+               int lensql, lensql2, sizesql = maxsize, sizesql2 = maxsize2, newsize;
+               char *sql = ast_calloc(sizeof(char), sizesql), *sql2 = ast_calloc(sizeof(char), sizesql2), *tmp, *value;
+               char buf[257], escapebuf[513];
+  
+               lensql = snprintf(sql, sizesql, "INSERT INTO %s (", table);
+               lensql2 = snprintf(sql2, sizesql2, " VALUES (");
+  
+               AST_RWLIST_RDLOCK(&psql_columns);
+               AST_RWLIST_TRAVERSE(&psql_columns, cur, list) {
+                       /* For fields not set, simply skip them */
+                       ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+                       if (!value)
+                               continue;
+                       
+                       LENGTHEN_BUF1(strlen(cur->name));
+                       lensql += snprintf(sql + lensql, sizesql - lensql, "%s,", cur->name);
+
+                       if (strcmp(cur->name, "start") == 0 || strcmp(cur->name, "calldate") == 0) {
+                               if (strncmp(cur->type, "int", 3) == 0) {
+                                       LENGTHEN_BUF2(12);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->start.tv_sec);
+                               } else if (strncmp(cur->type, "float", 5) == 0) {
+                                       LENGTHEN_BUF2(30);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->start.tv_sec + (double)cdr->start.tv_usec / 1000000.0);
+                               } else {
+                                       /* char, hopefully */
+                                       LENGTHEN_BUF2(30);
+                                       ast_localtime(&cdr->start, &tm, NULL);
+                                       lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+                               }
+                       } else if (strcmp(cur->name, "answer") == 0) {
+                               if (strncmp(cur->type, "int", 3) == 0) {
+                                       LENGTHEN_BUF2(12);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->answer.tv_sec);
+                               } else if (strncmp(cur->type, "float", 5) == 0) {
+                                       LENGTHEN_BUF2(30);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->answer.tv_sec + (double)cdr->answer.tv_usec / 1000000.0);
+                               } else {
+                                       /* char, hopefully */
+                                       LENGTHEN_BUF2(30);
+                                       ast_localtime(&cdr->start, &tm, NULL);
+                                       lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+                               }
+                       } else if (strcmp(cur->name, "end") == 0) {
+                               if (strncmp(cur->type, "int", 3) == 0) {
+                                       LENGTHEN_BUF2(12);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->end.tv_sec);
+                               } else if (strncmp(cur->type, "float", 5) == 0) {
+                                       LENGTHEN_BUF2(30);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->end.tv_sec + (double)cdr->end.tv_usec / 1000000.0);
+                               } else {
+                                       /* char, hopefully */
+                                       LENGTHEN_BUF2(30);
+                                       ast_localtime(&cdr->end, &tm, NULL);
+                                       lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+                               }
+                       } else if (strcmp(cur->name, "duration") == 0 || strcmp(cur->name, "billsec") == 0) {
+                               if (cur->type[0] == 'i') {
+                                       /* Get integer, no need to escape anything */
+                                       ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+                                       LENGTHEN_BUF2(12);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%s", value);
+                               } else if (strncmp(cur->type, "float", 5) == 0) {
+                                       struct timeval *tv = cur->name[0] == 'd' ? &cdr->start : &cdr->answer;
+                                       LENGTHEN_BUF2(30);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->end.tv_sec - tv->tv_sec + cdr->end.tv_usec / 1000000.0 - tv->tv_usec / 1000000.0);
+                               } else {
+                                       /* Char field, probably */
+                                       struct timeval *tv = cur->name[0] == 'd' ? &cdr->start : &cdr->answer;
+                                       LENGTHEN_BUF2(30);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%f'", (double)cdr->end.tv_sec - tv->tv_sec + cdr->end.tv_usec / 1000000.0 - tv->tv_usec / 1000000.0);
+                               }
+                       } else if (strcmp(cur->name, "disposition") == 0 || strcmp(cur->name, "amaflags") == 0) {
+                               if (strncmp(cur->type, "int", 3) == 0) {
+                                       /* Integer, no need to escape anything */
+                                       ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 1);
+                                       LENGTHEN_BUF2(12);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%s", value);
+                               } else {
+                                       /* Although this is a char field, there are no special characters in the values for these fields */
+                                       ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+                                       LENGTHEN_BUF2(30);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%s'", value);
+                               }
+                       } else {
+                               /* Arbitrary field, could be anything */
+                               ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+                               if (strncmp(cur->type, "int", 3) == 0) {
+                                       long long whatever;
+                                       if (value && sscanf(value, "%lld", &whatever) == 1) {
+                                               LENGTHEN_BUF2(25);
+                                               lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%lld", whatever);
+                                       } else {
+                                               LENGTHEN_BUF2(1);
+                                               lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "0");
+                                       }
+                               } else if (strncmp(cur->type, "float", 5) == 0) {
+                                       long double whatever;
+                                       if (value && sscanf(value, "%Lf", &whatever) == 1) {
+                                               LENGTHEN_BUF2(50);
+                                               lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%30Lf", whatever);
+                                       } else {
+                                               LENGTHEN_BUF2(1);
+                                               lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "0");
+                                       }
+                               /* XXX Might want to handle dates, times, and other misc fields here XXX */
+                               } else {
+                                       if (value)
+                                               PQescapeStringConn(conn, escapebuf, value, strlen(value), NULL);
+                                       else
+                                               escapebuf[0] = '\0';
+                                       LENGTHEN_BUF2(strlen(escapebuf) + 2);
+                                       lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%s'", escapebuf);
+                               }
+                       }
+                       LENGTHEN_BUF2(1);
+                       strcat(sql2 + lensql2, ",");
+                       lensql2++;
+               }
+               AST_RWLIST_UNLOCK(&psql_columns);
+               LENGTHEN_BUF1(lensql2);
+               sql[lensql - 1] = ')';
+               sql2[lensql2 - 1] = ')';
+               strcat(sql + lensql, sql2);
+               ast_verb(11, "[%s]\n", sql);
 
                ast_debug(2, "cdr_pgsql: inserting a CDR record.\n");
 
-               snprintf(sqlcmd,sizeof(sqlcmd),"INSERT INTO %s (calldate,clid,src,dst,dcontext,channel,dstchannel,"
-                                "lastapp,lastdata,duration,billsec,disposition,amaflags,accountcode,uniqueid,userfield) VALUES"
-                                " ('%s','%s','%s','%s','%s', '%s','%s','%s','%s',%ld,%ld,'%s',%ld,'%s','%s','%s')",
-                                table, timestr, clid, src, dst, dcontext, channel, dstchannel, lastapp, lastdata,
-                                cdr->duration,cdr->billsec,ast_cdr_disp2str(cdr->disposition),cdr->amaflags, cdr->accountcode, uniqueid, userfield);
-               
-               ast_debug(3, "cdr_pgsql: SQL command executed:  %s\n",sqlcmd);
-               
                /* Test to be sure we're still connected... */
                /* If we're connected, and connection is working, good. */
                /* Otherwise, attempt reconnect.  If it fails... sorry... */
@@ -152,7 +277,7 @@ static int pgsql_log(struct ast_cdr *cdr)
                                return -1;
                        }
                }
-               result = PQexec(conn, sqlcmd);
+               result = PQexec(conn, sql);
                if (PQresultStatus(result) != PGRES_COMMAND_OK) {
                        pgerror = PQresultErrorMessage(result);
                        ast_log(LOG_ERROR,"cdr_pgsql: Failed to insert call detail record into database!\n");
@@ -163,7 +288,7 @@ static int pgsql_log(struct ast_cdr *cdr)
                                ast_log(LOG_ERROR, "cdr_pgsql: Connection reestablished.\n");
                                connected = 1;
                                PQclear(result);
-                               result = PQexec(conn, sqlcmd);
+                               result = PQexec(conn, sql);
                                if (PQresultStatus(result) != PGRES_COMMAND_OK) {
                                        pgerror = PQresultErrorMessage(result);
                                        ast_log(LOG_ERROR,"cdr_pgsql: HARD ERROR!  Attempted reconnection failed.  DROPPING CALL RECORD!\n");
@@ -181,8 +306,14 @@ static int pgsql_log(struct ast_cdr *cdr)
 }
 
 static int unload_module(void)
-{ 
+{
+       struct columns *cur;
+       ast_cdr_unregister(name);
+
+       /* Give all threads time to finish */
+       usleep(1);
        PQfinish(conn);
+
        if (pghostname)
                ast_free(pghostname);
        if (pgdbname)
@@ -195,7 +326,13 @@ static int unload_module(void)
                ast_free(pgdbport);
        if (table)
                ast_free(table);
-       ast_cdr_unregister(name);
+
+       AST_RWLIST_WRLOCK(&psql_columns);
+       while ((cur = AST_RWLIST_REMOVE_HEAD(&psql_columns, list))) {
+               ast_free(cur);
+       }
+       AST_RWLIST_UNLOCK(&psql_columns);
+
        return 0;
 }
 
@@ -203,6 +340,8 @@ static int config_module(int reload)
 {
        struct ast_variable *var;
        char *pgerror;
+       struct columns *cur;
+       PGresult *result;
        const char *tmp;
        struct ast_config *cfg;
        struct ast_flags config_flags = { reload ? CONFIG_FLAG_FILEUNCHANGED : 0 };
@@ -304,8 +443,40 @@ static int config_module(int reload)
        
        conn = PQsetdbLogin(pghostname, pgdbport, NULL, NULL, pgdbname, pgdbuser, pgpassword);
        if (PQstatus(conn) != CONNECTION_BAD) {
+               char sqlcmd[256];
+               char *fname, *ftype, *flen;
+               int i, rows;
                ast_debug(1, "Successfully connected to PostgreSQL database.\n");
                connected = 1;
+
+               /* Query the columns */
+               snprintf(sqlcmd, sizeof(sqlcmd), "select a.attname, t.typname, a.attlen from pg_class c, pg_attribute a, pg_type t where c.oid = a.attrelid and a.atttypid = t.oid and (a.attnum > 0) and c.relname = '%s' order by c.relname, attnum", table);
+               result = PQexec(conn, sqlcmd);
+               if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+                       pgerror = PQresultErrorMessage(result);
+                       ast_log(LOG_ERROR, "cdr_pgsql: Failed to query database columns: %s\n", pgerror);
+                       PQclear(result);
+                       unload_module();
+                       return AST_MODULE_LOAD_DECLINE;
+               }
+
+               rows = PQntuples(result);
+               for (i = 0; i < rows; i++) {
+                       fname = PQgetvalue(result, i, 0);
+                       ftype = PQgetvalue(result, i, 1);
+                       flen = PQgetvalue(result, i, 2);
+                       ast_verb(4, "Found column '%s' of type '%s'\n", fname, ftype);
+                       cur = ast_calloc(1, sizeof(*cur) + strlen(fname) + strlen(ftype) + 2);
+                       if (cur) {
+                               sscanf(flen, "%d", &cur->len);
+                               cur->name = (char *)cur + sizeof(*cur);
+                               cur->type = (char *)cur + sizeof(*cur) + strlen(fname) + 1;
+                               strcpy(cur->name, fname);
+                               strcpy(cur->type, ftype);
+                               AST_RWLIST_INSERT_TAIL(&psql_columns, cur, list);
+                       }
+               }
+               PQclear(result);
        } else {
                pgerror = PQerrorMessage(conn);
                ast_log(LOG_ERROR, "cdr_pgsql: Unable to connect to database server %s.  CALLS WILL NOT BE LOGGED!!\n", pghostname);