2 * Asterisk -- An open source telephony toolkit.
6 * Steve Murphy - adapted to CEL, from:
7 * Matthew D. Hardeman <mhardemn@papersoft.com>
8 * Adapted from the MySQL CDR logger originally by James Sharp
10 * Modified April, 2007; Dec, 2008
11 * Steve Murphy <murf@digium.com>
13 * Modified September 2003
14 * Matthew D. Hardeman <mhardemn@papersoft.com>
16 * See http://www.asterisk.org for more information about
17 * the Asterisk project. Please do not directly contact
18 * any of the maintainers of this project for assistance;
19 * the project provides a web site, mailing lists and IRC
20 * channels for your use.
22 * This program is free software, distributed under the terms of
23 * the GNU General Public License Version 2. See the LICENSE file
24 * at the top of the source tree.
29 * \brief PostgreSQL CEL logger
31 * \author Steve Murphy <murf@digium.com>
32 * PostgreSQL http://www.postgresql.org/
35 * \arg \ref Config_cel
36 * PostgreSQL http://www.postgresql.org/
37 * \ingroup cel_drivers
41 <depend>pgsql</depend>
42 <support_level>extended</support_level>
47 ASTERISK_REGISTER_FILE()
51 #include "asterisk/config.h"
52 #include "asterisk/options.h"
53 #include "asterisk/channel.h"
54 #include "asterisk/cel.h"
55 #include "asterisk/module.h"
56 #include "asterisk/logger.h"
/* Format string handed to ast_strftime() when an event timestamp is rendered as text. */
59 #define DATE_FORMAT "%Y-%m-%d %T.%6q"
/* Name used both to register and to unregister this backend with the CEL core. */
61 #define PGSQL_BACKEND_NAME "CEL PGSQL backend"
/* PQserverVersion() results at or above this value select the schema-aware column query. */
63 #define PGSQL_MIN_VERSION_SCHEMA 70300
/* Configuration file name passed to ast_config_load(). */
65 static char *config = "cel_pgsql.conf";
/* Connection settings; each is an ast_strdup() copy of a value from the [global] config section. */
67 static char *pghostname;
68 static char *pgdbname;
69 static char *pgdbuser;
70 static char *pgpassword;
71 static char *pgappname;
72 static char *pgdbport;
/* Tested by pgsql_log() before it looks at the connection status. */
76 static int connected = 0;
77 /* Optimization to reduce number of memory allocations */
/* Initial sizes for the "sql" and "sql2" buffers; raised after a statement grows past them. */
78 static int maxsize = 512, maxsize2 = 512;
/* Non-zero makes timestamp conversion use the "GMT" zone instead of the default (NULL) zone. */
79 static int usegmtime = 0;
81 /*! \brief show_user_def is off by default */
82 #define CEL_SHOW_USERDEF_DEFAULT 0
84 /*! TRUE if we should set the eventtype field to USER_DEFINED on user events. */
85 static unsigned char cel_show_user_def;
/* Held by pgsql_log() while it formats and submits a record. */
87 AST_MUTEX_DEFINE_STATIC(pgsql_lock);
/* Shared libpq connection handle (set from PQconnectdb()) and most recent query result (set from PQexec()). */
89 static PGconn *conn = NULL;
90 static PGresult *result = NULL;
96 unsigned int notnull:1;
97 unsigned int hasdefault:1;
98 AST_RWLIST_ENTRY(columns) list;
/* List of table columns: appended to by process_my_load_module(), traversed under a read lock by pgsql_log(). */
101 static AST_RWLIST_HEAD_STATIC(psql_columns, columns);
/*
 * Make room in the "sql" string when appending size + 1 more bytes would
 * exceed its current capacity. The capacity requested from
 * ast_str_make_space() is a multiple of 512. When that call returns non-zero
 * an error naming pghostname and table is logged and the psql_columns list
 * lock is released.
 */
103 #define LENGTHEN_BUF1(size) \
105 /* Lengthen buffer, if necessary */ \
106 if (ast_str_strlen(sql) + size + 1 > ast_str_size(sql)) { \
107 if (ast_str_make_space(&sql, ((ast_str_size(sql) + size + 3) / 512 + 1) * 512) != 0) { \
108 ast_log(LOG_ERROR, "Unable to allocate sufficient memory. Insert CEL '%s:%s' failed.\n", pghostname, table); \
111 AST_RWLIST_UNLOCK(&psql_columns); \
/*
 * Counterpart of LENGTHEN_BUF1 for the "sql2" string: same capacity test,
 * same rounding to a multiple of 512, and the same log message and
 * psql_columns unlock when ast_str_make_space() returns non-zero.
 */
117 #define LENGTHEN_BUF2(size) \
119 if (ast_str_strlen(sql2) + size + 1 > ast_str_size(sql2)) { \
120 if (ast_str_make_space(&sql2, ((ast_str_size(sql2) + size + 3) / 512 + 1) * 512) != 0) { \
121 ast_log(LOG_ERROR, "Unable to allocate sufficient memory. Insert CEL '%s:%s' failed.\n", pghostname, table); \
124 AST_RWLIST_UNLOCK(&psql_columns); \
/*!
 * \brief Assemble a libpq connection string from the configured settings and connect.
 *
 * host, port, dbname and user are always written into the string;
 * application_name and password are appended only when the corresponding
 * setting is non-empty. The handle returned by PQconnectdb() is stored in
 * the file-level conn pointer.
 */
130 static void pgsql_reconnect(void)
132 struct ast_str *conn_info = ast_str_create(128);
134 ast_log(LOG_ERROR, "Failed to allocate memory for connection string.\n");
/* NOTE(review): values are interpolated without quoting - confirm no setting can contain whitespace. */
143 ast_str_set(&conn_info, 0, "host=%s port=%s dbname=%s user=%s",
144 pghostname, pgdbport, pgdbname, pgdbuser);
146 if (!ast_strlen_zero(pgappname)) {
147 ast_str_append(&conn_info, 0, " application_name=%s", pgappname);
150 if (!ast_strlen_zero(pgpassword)) {
151 ast_str_append(&conn_info, 0, " password=%s", pgpassword);
154 conn = PQconnectdb(ast_str_buffer(conn_info));
/*!
 * \brief CEL backend callback: turn one event into an INSERT statement and execute it.
 *
 * \param event CEL event; unpacked into an ast_cel_event_record with ast_cel_fill_record().
 *
 * The statement is built in two strings: "sql" collects the quoted column
 * names and "sql2" the matching VALUES entries, one per element of
 * psql_columns. Formatting and submission happen with pgsql_lock held.
 * If the INSERT does not return PGRES_COMMAND_OK, the statement is executed
 * a second time once the connection reports CONNECTION_OK.
 */
159 static void pgsql_log(struct ast_event *event)
164 struct ast_cel_event_record record = {
165 .version = AST_CEL_EVENT_RECORD_VERSION,
168 if (ast_cel_fill_record(event, &record)) {
172 ast_mutex_lock(&pgsql_lock);
174 ast_localtime(&record.event_time, &tm, usegmtime ? "GMT" : NULL);
175 ast_strftime(timestr, sizeof(timestr), DATE_FORMAT, &tm);
/* An initial connection is only considered when host, user, password and dbname are all set. */
177 if ((!connected) && pghostname && pgdbuser && pgpassword && pgdbname) {
179 if (PQstatus(conn) != CONNECTION_BAD) {
182 pgerror = PQerrorMessage(conn);
183 ast_log(LOG_ERROR, "cel_pgsql: Unable to connect to database server %s. Calls will not be logged!\n", pghostname);
184 ast_log(LOG_ERROR, "cel_pgsql: Reason: %s\n", pgerror);
/* Buffers start at the largest statement sizes recorded so far (maxsize / maxsize2). */
191 struct ast_str *sql = ast_str_create(maxsize), *sql2 = ast_str_create(maxsize2);
192 char buf[257], escapebuf[513];
197 goto ast_log_cleanup;
200 ast_str_set(&sql, 0, "INSERT INTO %s (", table);
201 ast_str_set(&sql2, 0, " VALUES (");
/* SEP yields "" for the first entry and "," for every later one. */
203 #define SEP (first ? "" : ",")
205 AST_RWLIST_RDLOCK(&psql_columns);
206 AST_RWLIST_TRAVERSE(&psql_columns, cur, list) {
207 LENGTHEN_BUF1(strlen(cur->name) + 2);
208 ast_str_append(&sql, 0, "%s\"%s\"", SEP, cur->name);
/*
 * eventtime, eventtype and amaflags are formatted according to the column's
 * type name; any other column is matched by name to a record field below.
 */
210 if (strcmp(cur->name, "eventtime") == 0) {
211 if (strncmp(cur->type, "int", 3) == 0) {
213 ast_str_append(&sql2, 0, "%s%ld", SEP, (long) record.event_time.tv_sec);
214 } else if (strncmp(cur->type, "float", 5) == 0) {
216 ast_str_append(&sql2, 0, "%s%f",
218 (double) record.event_time.tv_sec +
219 (double) record.event_time.tv_usec / 1000000.0);
221 /* char, hopefully */
223 ast_localtime(&record.event_time, &tm, usegmtime ? "GMT" : NULL);
224 ast_strftime(buf, sizeof(buf), DATE_FORMAT, &tm);
225 ast_str_append(&sql2, 0, "%s'%s'", SEP, buf);
227 } else if (strcmp(cur->name, "eventtype") == 0) {
228 if (cur->type[0] == 'i') {
229 /* Get integer, no need to escape anything */
231 ast_str_append(&sql2, 0, "%s%d", SEP, (int) record.event_type);
232 } else if (strncmp(cur->type, "float", 5) == 0) {
234 ast_str_append(&sql2, 0, "%s%f", SEP, (double) record.event_type);
236 /* Char field, probably */
237 const char *event_name;
/* With show_user_defined off, user-defined events are stored under their own name. */
239 event_name = (!cel_show_user_def
240 && record.event_type == AST_CEL_USER_DEFINED)
241 ? record.user_defined_name : record.event_name;
242 LENGTHEN_BUF2(strlen(event_name) + 1);
/*
 * NOTE(review): event_name is placed between quotes without passing through
 * PQescapeStringConn(), unlike the generic string path further down -
 * confirm user-defined names cannot contain a quote character.
 */
243 ast_str_append(&sql2, 0, "%s'%s'", SEP, event_name);
245 } else if (strcmp(cur->name, "amaflags") == 0) {
246 if (strncmp(cur->type, "int", 3) == 0) {
247 /* Integer, no need to escape anything */
249 ast_str_append(&sql2, 0, "%s%u", SEP, record.amaflag);
251 /* Although this is a char field, there are no special characters in the values for these fields */
253 ast_str_append(&sql2, 0, "%s'%u'", SEP, record.amaflag);
256 /* Arbitrary field, could be anything */
257 if (strcmp(cur->name, "userdeftype") == 0) {
258 value = record.user_defined_name;
259 } else if (strcmp(cur->name, "cid_name") == 0) {
260 value = record.caller_id_name;
261 } else if (strcmp(cur->name, "cid_num") == 0) {
262 value = record.caller_id_num;
263 } else if (strcmp(cur->name, "cid_ani") == 0) {
264 value = record.caller_id_ani;
265 } else if (strcmp(cur->name, "cid_rdnis") == 0) {
266 value = record.caller_id_rdnis;
267 } else if (strcmp(cur->name, "cid_dnid") == 0) {
268 value = record.caller_id_dnid;
269 } else if (strcmp(cur->name, "exten") == 0) {
270 value = record.extension;
271 } else if (strcmp(cur->name, "context") == 0) {
272 value = record.context;
273 } else if (strcmp(cur->name, "channame") == 0) {
274 value = record.channel_name;
275 } else if (strcmp(cur->name, "appname") == 0) {
276 value = record.application_name;
277 } else if (strcmp(cur->name, "appdata") == 0) {
278 value = record.application_data;
279 } else if (strcmp(cur->name, "accountcode") == 0) {
280 value = record.account_code;
281 } else if (strcmp(cur->name, "peeraccount") == 0) {
282 value = record.peer_account;
283 } else if (strcmp(cur->name, "uniqueid") == 0) {
284 value = record.unique_id;
285 } else if (strcmp(cur->name, "linkedid") == 0) {
286 value = record.linked_id;
287 } else if (strcmp(cur->name, "userfield") == 0) {
288 value = record.user_field;
289 } else if (strcmp(cur->name, "peer") == 0) {
291 } else if (strcmp(cur->name, "extra") == 0) {
292 value = record.extra;
298 ast_str_append(&sql2, 0, "%sDEFAULT", SEP);
299 } else if (strncmp(cur->type, "int", 3) == 0) {
/* Integer columns: emit the parsed number, or 0 when the value does not parse. */
301 if (value && sscanf(value, "%30lld", &whatever) == 1) {
303 ast_str_append(&sql2, 0, "%s%lld", SEP, whatever);
306 ast_str_append(&sql2, 0, "%s0", SEP);
308 } else if (strncmp(cur->type, "float", 5) == 0) {
309 long double whatever;
310 if (value && sscanf(value, "%30Lf", &whatever) == 1) {
/* NOTE(review): "%30Lf" is a minimum field width on output, so short values are space-padded - confirm intended. */
312 ast_str_append(&sql2, 0, "%s%30Lf", SEP, whatever);
315 ast_str_append(&sql2, 0, "%s0", SEP);
317 /* XXX Might want to handle dates, times, and other misc fields here XXX */
/*
 * NOTE(review): escapebuf holds 513 bytes, and the libpq documentation
 * requires the destination to hold 2 * length + 1 bytes - confirm value is
 * never longer than 256 characters here.
 */
320 PQescapeStringConn(conn, escapebuf, value, strlen(value), NULL);
324 LENGTHEN_BUF2(strlen(escapebuf) + 3);
325 ast_str_append(&sql2, 0, "%s'%s'", SEP, escapebuf);
330 AST_RWLIST_UNLOCK(&psql_columns);
/*
 * NOTE(review): the list lock was released on the previous line, yet the
 * failure path of LENGTHEN_BUF1 also calls AST_RWLIST_UNLOCK(&psql_columns) -
 * confirm this cannot unlock twice.
 */
331 LENGTHEN_BUF1(ast_str_strlen(sql2) + 2);
/* Close the column list and append the VALUES list to form the full statement. */
332 ast_str_append(&sql, 0, ")%s)", ast_str_buffer(sql2));
334 ast_debug(3, "Inserting a CEL record: [%s].\n", ast_str_buffer(sql));
335 /* Test to be sure we're still connected... */
336 /* If we're connected, and connection is working, good. */
337 /* Otherwise, attempt reconnect. If it fails... sorry... */
338 if (PQstatus(conn) == CONNECTION_OK) {
341 ast_log(LOG_WARNING, "Connection was lost... attempting to reconnect.\n");
343 if (PQstatus(conn) == CONNECTION_OK) {
344 ast_log(LOG_NOTICE, "Connection reestablished.\n");
347 pgerror = PQerrorMessage(conn);
348 ast_log(LOG_ERROR, "Unable to reconnect to database server %s. Calls will not be logged!\n", pghostname);
349 ast_log(LOG_ERROR, "Reason: %s\n", pgerror);
353 goto ast_log_cleanup;
356 result = PQexec(conn, ast_str_buffer(sql));
357 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
358 pgerror = PQresultErrorMessage(result);
359 ast_log(LOG_WARNING, "Failed to insert call detail record into database!\n");
360 ast_log(LOG_WARNING, "Reason: %s\n", pgerror);
361 ast_log(LOG_WARNING, "Connection may have been lost... attempting to reconnect.\n");
363 if (PQstatus(conn) == CONNECTION_OK) {
364 ast_log(LOG_NOTICE, "Connection reestablished.\n");
/* Second and last attempt at the same statement. */
367 result = PQexec(conn, ast_str_buffer(sql));
368 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
369 pgerror = PQresultErrorMessage(result);
370 ast_log(LOG_ERROR, "HARD ERROR! Attempted reconnection failed. DROPPING CALL RECORD!\n");
371 ast_log(LOG_ERROR, "Reason: %s\n", pgerror);
375 goto ast_log_cleanup;
379 /* Next time, just allocate buffers that are that big to start with. */
380 if (ast_str_strlen(sql) > maxsize) {
381 maxsize = ast_str_strlen(sql);
383 if (ast_str_strlen(sql2) > maxsize2) {
384 maxsize2 = ast_str_strlen(sql2);
392 ast_mutex_unlock(&pgsql_lock);
/*!
 * \brief Detach the backend from the CEL core and release module state.
 *
 * Unregisters PGSQL_BACKEND_NAME first, then takes the psql_columns write
 * lock, frees configuration strings and removes every entry from the column
 * list before unlocking.
 */
395 static int my_unload_module(void)
397 struct columns *current;
399 ast_cel_backend_unregister(PGSQL_BACKEND_NAME);
400 AST_RWLIST_WRLOCK(&psql_columns);
406 ast_free(pghostname);
418 ast_free(pgpassword);
/* Pop entries off the head until the list is empty. */
437 while ((current = AST_RWLIST_REMOVE_HEAD(&psql_columns, list))) {
440 AST_RWLIST_UNLOCK(&psql_columns);
/*! \brief Module unload entry point; returns whatever my_unload_module() returns. */
444 static int unload_module(void)
446 return my_unload_module();
/*!
 * \brief Read the [global] section of the config, connect, and cache the table's columns.
 *
 * \param cfg Parsed configuration to read settings from.
 *
 * \return AST_MODULE_LOAD_DECLINE when the [global] section is missing, when
 *         a setting cannot be copied, or when the column query fails;
 *         AST_MODULE_LOAD_SUCCESS at the end of the function.
 *
 * Every setting is duplicated with ast_strdup(). For a missing hostname,
 * dbname, user, password, port or table a warning is logged. After
 * connecting, the system catalogs are queried for the table's columns and
 * one list entry per column is appended to psql_columns.
 */
449 static int process_my_load_module(struct ast_config *cfg)
451 struct ast_variable *var;
457 if (!(var = ast_variable_browse(cfg, "global"))) {
458 ast_log(LOG_WARNING,"CEL pgsql config file missing global section.\n");
459 return AST_MODULE_LOAD_DECLINE;
461 if (!(tmp = ast_variable_retrieve(cfg,"global","hostname"))) {
462 ast_log(LOG_WARNING,"PostgreSQL server hostname not specified. Assuming unix socket connection\n");
463 tmp = ""; /* connect via UNIX-socket by default */
/* Drop any previously stored value before storing the new copy. */
466 ast_free(pghostname);
467 if (!(pghostname = ast_strdup(tmp))) {
468 ast_log(LOG_WARNING,"PostgreSQL Ran out of memory copying host info\n");
469 return AST_MODULE_LOAD_DECLINE;
471 if (!(tmp = ast_variable_retrieve(cfg, "global", "dbname"))) {
/* NOTE(review): the warning says "Assuming asterisk" but the fallback assigned below is "asteriskceldb". */
472 ast_log(LOG_WARNING,"PostgreSQL database not specified. Assuming asterisk\n");
473 tmp = "asteriskceldb";
477 if (!(pgdbname = ast_strdup(tmp))) {
478 ast_log(LOG_WARNING,"PostgreSQL Ran out of memory copying dbname info\n");
479 return AST_MODULE_LOAD_DECLINE;
481 if (!(tmp = ast_variable_retrieve(cfg, "global", "user"))) {
482 ast_log(LOG_WARNING,"PostgreSQL database user not specified. Assuming asterisk\n");
487 if (!(pgdbuser = ast_strdup(tmp))) {
488 ast_log(LOG_WARNING,"PostgreSQL Ran out of memory copying user info\n");
489 return AST_MODULE_LOAD_DECLINE;
491 if (!(tmp = ast_variable_retrieve(cfg, "global", "password"))) {
492 ast_log(LOG_WARNING, "PostgreSQL database password not specified. Assuming blank\n");
496 ast_free(pgpassword);
497 if (!(pgpassword = ast_strdup(tmp))) {
498 ast_log(LOG_WARNING,"PostgreSQL Ran out of memory copying password info\n");
499 return AST_MODULE_LOAD_DECLINE;
501 if (!(tmp = ast_variable_retrieve(cfg, "global", "appname"))) {
507 if (!(pgappname = ast_strdup(tmp))) {
508 ast_log(LOG_WARNING,"PostgreSQL Ran out of memory copying appname info\n");
509 return AST_MODULE_LOAD_DECLINE;
512 if (!(tmp = ast_variable_retrieve(cfg,"global","port"))) {
513 ast_log(LOG_WARNING,"PostgreSQL database port not specified. Using default 5432.\n");
518 if (!(pgdbport = ast_strdup(tmp))) {
519 ast_log(LOG_WARNING,"PostgreSQL Ran out of memory copying port info\n");
520 return AST_MODULE_LOAD_DECLINE;
522 if (!(tmp = ast_variable_retrieve(cfg, "global", "table"))) {
523 ast_log(LOG_WARNING,"CEL table not specified. Assuming cel\n");
528 if (!(table = ast_strdup(tmp))) {
529 return AST_MODULE_LOAD_DECLINE;
/* show_user_defined is reset to its default each time before the config value is applied. */
531 cel_show_user_def = CEL_SHOW_USERDEF_DEFAULT;
532 if ((tmp = ast_variable_retrieve(cfg, "global", "show_user_defined"))) {
533 cel_show_user_def = ast_true(tmp) ? 1 : 0;
535 if ((tmp = ast_variable_retrieve(cfg, "global", "usegmtime"))) {
536 usegmtime = ast_true(tmp);
540 if (!(tmp = ast_variable_retrieve(cfg, "global", "schema"))) {
546 if (!(schema = ast_strdup(tmp))) {
547 ast_log(LOG_WARNING,"PostgreSQL Ran out of memory copying schema info\n");
548 return AST_MODULE_LOAD_DECLINE;
551 if (ast_strlen_zero(pghostname)) {
552 ast_debug(3, "cel_pgsql: using default unix socket\n");
554 ast_debug(3, "cel_pgsql: got hostname of %s\n", pghostname);
556 ast_debug(3, "cel_pgsql: got port of %s\n", pgdbport);
557 ast_debug(3, "cel_pgsql: got user of %s\n", pgdbuser);
558 ast_debug(3, "cel_pgsql: got dbname of %s\n", pgdbname);
/* NOTE(review): this writes the database password to the debug log in clear text - consider removing or masking. */
559 ast_debug(3, "cel_pgsql: got password of %s\n", pgpassword);
560 ast_debug(3, "cel_pgsql: got sql table name of %s\n", table);
561 ast_debug(3, "cel_pgsql: got show_user_defined of %s\n",
562 cel_show_user_def ? "Yes" : "No");
566 if (PQstatus(conn) != CONNECTION_BAD) {
568 char *fname, *ftype, *flen, *fnotnull, *fdef, *tablename, *tmp_tablename;
569 int i, rows, version;
571 ast_debug(1, "Successfully connected to PostgreSQL database.\n");
574 version = PQserverVersion(conn);
575 /* Remove any schema name from the table */
576 if ((tmp_tablename = strrchr(table, '.'))) {
579 tmp_tablename = table;
/* Escaped copies are sized at twice the input length plus one byte. */
581 tablename = ast_alloca(strlen(tmp_tablename) * 2 + 1);
582 PQescapeStringConn(conn, tablename, tmp_tablename, strlen(tmp_tablename), NULL);
583 if (version >= PGSQL_MIN_VERSION_SCHEMA) {
586 lenschema = strlen(schema);
587 schemaname = ast_alloca(lenschema * 2 + 1);
588 PQescapeStringConn(conn, schemaname, schema, lenschema, NULL);
/* An empty schema setting makes the query compare against current_schema() instead of a quoted name. */
590 snprintf(sqlcmd, sizeof(sqlcmd),
591 "SELECT a.attname, t.typname, a.attlen, a.attnotnull, d.adsrc, a.atttypmod "
592 "FROM (((pg_catalog.pg_class c INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
593 "AND c.relname = '%s' AND n.nspname = %s%s%s) "
594 "INNER JOIN pg_catalog.pg_attribute a ON ("
595 "NOT a.attisdropped) AND a.attnum > 0 AND a.attrelid = c.oid) "
596 "INNER JOIN pg_catalog.pg_type t ON t.oid = a.atttypid) "
597 "LEFT OUTER JOIN pg_attrdef d ON a.atthasdef AND d.adrelid = a.attrelid "
598 "AND d.adnum = a.attnum "
599 "ORDER BY n.nspname, c.relname, attnum",
601 lenschema == 0 ? "" : "'", lenschema == 0 ? "current_schema()" : schemaname, lenschema == 0 ? "" : "'");
603 snprintf(sqlcmd, sizeof(sqlcmd),
604 "SELECT a.attname, t.typname, a.attlen, a.attnotnull, d.adsrc, a.atttypmod "
605 "FROM pg_class c, pg_type t, pg_attribute a "
606 "LEFT OUTER JOIN pg_attrdef d ON a.atthasdef AND d.adrelid = a.attrelid "
607 "AND d.adnum = a.attnum WHERE c.oid = a.attrelid AND a.atttypid = t.oid "
608 "AND (a.attnum > 0) AND c.relname = '%s' ORDER BY c.relname, attnum", tablename);
610 /* Query the columns */
611 result = PQexec(conn, sqlcmd);
612 if (PQresultStatus(result) != PGRES_TUPLES_OK) {
613 pgerror = PQresultErrorMessage(result);
614 ast_log(LOG_ERROR, "Failed to query database columns: %s\n", pgerror);
617 return AST_MODULE_LOAD_DECLINE;
620 rows = PQntuples(result);
621 for (i = 0; i < rows; i++) {
622 fname = PQgetvalue(result, i, 0);
623 ftype = PQgetvalue(result, i, 1);
624 flen = PQgetvalue(result, i, 2);
625 fnotnull = PQgetvalue(result, i, 3);
626 fdef = PQgetvalue(result, i, 4);
627 ast_verb(4, "Found column '%s' of type '%s'\n", fname, ftype);
/*
 * One allocation holds the struct followed by the name and type strings;
 * cur->name and cur->type point into that trailing space.
 */
628 cur = ast_calloc(1, sizeof(*cur) + strlen(fname) + strlen(ftype) + 2);
630 sscanf(flen, "%30d", &cur->len);
631 cur->name = (char *)cur + sizeof(*cur);
632 cur->type = (char *)cur + sizeof(*cur) + strlen(fname) + 1;
633 strcpy(cur->name, fname);
634 strcpy(cur->type, ftype);
635 if (*fnotnull == 't') {
640 if (!ast_strlen_zero(fdef)) {
645 AST_RWLIST_INSERT_TAIL(&psql_columns, cur, list);
650 pgerror = PQerrorMessage(conn);
651 ast_log(LOG_ERROR, "cel_pgsql: Unable to connect to database server %s. CALLS WILL NOT BE LOGGED!!\n", pghostname);
652 ast_log(LOG_ERROR, "cel_pgsql: Reason: %s\n", pgerror);
657 return AST_MODULE_LOAD_SUCCESS;
/*!
 * \brief Load the configuration file, apply it, and register the CEL backend.
 *
 * \param reload Non-zero requests CONFIG_FLAG_FILEUNCHANGED, so an unchanged
 *               file short-circuits to AST_MODULE_LOAD_SUCCESS.
 *
 * \return AST_MODULE_LOAD_DECLINE when the config cannot be loaded or is
 *         invalid, or when backend registration fails; otherwise
 *         AST_MODULE_LOAD_SUCCESS.
 */
660 static int my_load_module(int reload)
662 struct ast_config *cfg;
663 struct ast_flags config_flags = { reload ? CONFIG_FLAG_FILEUNCHANGED : 0 };
665 if ((cfg = ast_config_load(config, config_flags)) == NULL || cfg == CONFIG_STATUS_FILEINVALID) {
666 ast_log(LOG_WARNING, "Unable to load config for PostgreSQL CEL's: %s\n", config);
667 return AST_MODULE_LOAD_DECLINE;
668 } else if (cfg == CONFIG_STATUS_FILEUNCHANGED) {
669 return AST_MODULE_LOAD_SUCCESS;
/* NOTE(review): the status returned by process_my_load_module() is discarded here - confirm that is intended. */
676 process_my_load_module(cfg);
677 ast_config_destroy(cfg);
679 if (ast_cel_backend_register(PGSQL_BACKEND_NAME, pgsql_log)) {
680 ast_log(LOG_WARNING, "Unable to subscribe to CEL events for pgsql\n");
681 return AST_MODULE_LOAD_DECLINE;
684 return AST_MODULE_LOAD_SUCCESS;
/*! \brief Module load entry point: runs my_load_module() with reload set to 0. */
687 static int load_module(void)
689 return my_load_module(0);
/*! \brief Module reload entry point: runs my_load_module() with reload set to 1. */
692 static int reload(void)
694 return my_load_module(1);
/*
 * Module descriptor: GPL-keyed, flagged AST_MODFLAG_LOAD_ORDER, described as
 * "PostgreSQL CEL Backend", with extended support level, unload_module as the
 * unload handler and AST_MODPRI_CDR_DRIVER as its load priority.
 */
697 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PostgreSQL CEL Backend",
698 .support_level = AST_MODULE_SUPPORT_EXTENDED,
700 .unload = unload_module,
702 .load_pri = AST_MODPRI_CDR_DRIVER,