From 19c3ee85ea45c5b5c2a479af17b86ba65d52673b Mon Sep 17 00:00:00 2001 From: "M.Shulhan" Date: Sun, 5 Apr 2009 17:52:57 +0700 Subject: fix sort process. type/vos_TStmtJoin: - deleted, merged into vos_TStmtMeta.h op/vos_File: - file_open: fix memory leak on open failure. - file_raw_copy: new, function to copy a file. - file_raw_get_dirname: new, function to get the directory name from a path. op/vos_Stmt: - stmt_find_by_name: re-structure the switch. - stmt_update_meta: new, update meta filename to the last output name. op/vos_StmtSort: - stmtsort_create: remove the free of sort->in. - stmtsort_init_output: split into stmtsort_init. - stmtsort_init: new. - stmtsort_destroy: remove the unused temporary sort file. proc/vos_create: - vos_process_create: update meta object before processing. proc/vos_join: - changes required by the split of stmtsort_init_output. proc/vos_parser: - use 'filename' as an alias if no alias is set in the statement. proc/vos_sort: - vos_process_sort: fix bug in sort process. when the input file is split across several threads and the first split ends precisely on a new-line character, one row could be lost in the output file. - sort_write: use 'get_tmp_dir()' to get the temporary directory. proc/vos_sort_merge: - vos_sort_merge: simplify the merge process when there is only one file to merge. - single_merge: new. vos: - get_tmp_dir: new, function to get a temporary directory. 
--- op/vos_File.c | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++ op/vos_File.h | 2 + op/vos_Stmt.c | 78 +++++++++++++++--------------- op/vos_Stmt.h | 1 + op/vos_StmtJoin.h | 1 - op/vos_StmtSort.c | 66 +++++++++++++++++++------- op/vos_StmtSort.h | 1 + proc/vos_create.c | 2 + proc/vos_create.h | 1 + proc/vos_join.c | 4 +- proc/vos_parser.c | 34 ++++++++++---- proc/vos_sort.c | 30 +++++------- proc/vos_sort_merge.c | 24 +++++++++- type/vos_TStmtJoin.h | 17 ------- type/vos_TStmtMeta.h | 13 +++++ vos.c | 32 +++++++++++++ vos.h | 1 + 17 files changed, 333 insertions(+), 102 deletions(-) delete mode 100644 type/vos_TStmtJoin.h diff --git a/op/vos_File.c b/op/vos_File.c index dfb80cb..f6ccd07 100644 --- a/op/vos_File.c +++ b/op/vos_File.c @@ -32,6 +32,8 @@ int file_open(struct File **F, const char *f, int flag) if ((*F)->d < 0) { str_raw_copy(f, &_vos.e_sparm0); + free((*F)); + (*F) = 0; switch (errno) { case ENOENT: @@ -249,3 +251,129 @@ int file_raw_is_exist(const char *file) return s; } + +/** + * @desc: copy the content of file 'from' to file 'to'; 'to' is created if it does not exist. Returns 0 on success, or E_FILE_OPEN, E_MEM, E_FILE_READ or E_FILE_WRITE on failure. + */ +int file_raw_copy(const char *from, const char *to) +{ + int s; + int fdin; + int fdout; + long int nread = 0; + long int nwrite = 0; + long int ntot = 0; + char *buf; + + fdin = open(from, O_RDONLY); + if (fdin < 0) + return E_FILE_OPEN; + + fdout = open(to, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR); /* NOTE(review): no O_TRUNC, so an existing longer 'to' would keep its old tail; confirm this is intended */ + if (fdout < 0) { + close(fdin); + return E_FILE_OPEN; + } + + buf = (char *) calloc(_vos.file_buf_size, sizeof(char)); + if (! 
buf) { + close(fdin); + close(fdout); + return E_MEM; + } + + nread = read(fdin, buf, _vos.file_buf_size); + if (nread < 0) { + s = E_FILE_READ; + goto err; + } + while (nread > 0) { + do { + nwrite = write(fdout, &buf[ntot], nread); + if (nwrite < 0) { + s = E_FILE_WRITE; + goto err; + } + ntot += nwrite; + nread -= nwrite; + } while (nread > 0); + + nwrite = 0; + ntot = 0; + + nread = read(fdin, buf, _vos.file_buf_size); + if (nread < 0) { + s = E_FILE_READ; + goto err; + } + } + + s = 0; +err: + close(fdout); + close(fdin); + free(buf); + return s; +} + +/** + * @desc: get the directory name from a path; the result is newly allocated and stored in 'dirname'. + * path => dirname + * "/usr/lib" => "/usr" + * "/usr/" => "/" + * "usr" => "." + * "/" => "/" + * "." => "." + * ".." => "." + * + * @return: + * < 0 : success. + * < E_MEM : fail. + */ +int file_raw_get_dirname(const char *path, char **dirname) +{ + int l = 0; + char *d = 0; + + if (! path + || (path && strcmp(path, "..") == 0)) { + d = (char *) calloc(2, sizeof(char)); + if (d) + d[0] = '.'; + goto out; + } + + l = strlen(path) - 1; + + /* path : '/' */ + if (l == 0 && path[0] == '/') { + d = (char *) calloc(2, sizeof(char)); + if (d) + d[0] = '/'; + goto out; + } + + /* path : '/path///', skip the trailing slashes first */ + while (l >= 0 && path[l] == '/') + l--; + + while (l >= 0 && path[l] != '/') + l--; + + if (l < 0) { + d = (char *) calloc(2, sizeof(char)); + if (d) + d[0] = '.'; + } else { + if (l == 0) + l++; + d = (char *) calloc(l + 1, sizeof(char)); + if (d) + memcpy(d, path, l); + } + +out: + (*dirname) = d; + + return d ? 
0 : E_MEM; +} diff --git a/op/vos_File.h b/op/vos_File.h index 90f8a22..6d98166 100644 --- a/op/vos_File.h +++ b/op/vos_File.h @@ -23,5 +23,7 @@ void file_close(struct File **F); int file_raw_get_size(const char *file, unsigned long *fsize); int file_raw_is_exist(const char *file); +int file_raw_copy(const char *from, const char *to); +int file_raw_get_dirname(const char *path, char **dirname); #endif diff --git a/op/vos_Stmt.c b/op/vos_Stmt.c index 41fc13c..0ad0bbd 100644 --- a/op/vos_Stmt.c +++ b/op/vos_Stmt.c @@ -26,59 +26,26 @@ void stmt_add(struct Stmt **stmt, struct Stmt *new_stmt) (*stmt)->last = new_stmt; } -struct Stmt * stmt_find_by_name(struct Stmt *stmt, const char *name) +struct Stmt * stmt_find_by_name(struct Stmt *p, const char *name) { - int s = 0; - struct Stmt *p = stmt->last; - while (p) { switch (p->type) { case STMT_LOAD: - s = strcasecmp(p->in->filename, name); - if (s == 0) - return p; - - if (p->in->alias) { - s = strcasecmp(p->in->alias, name); - if (s == 0) - return p; - } - break; - case STMT_SORT: - s = strcasecmp(p->in->filename, name); - if (s == 0) + if (strcasecmp(p->in->filename, name) == 0) return p; - if (p->in->alias) { - s = strcasecmp(p->in->alias, name); - if (s == 0) - return p; - } - break; - - case STMT_CREATE: - s = strcasecmp(p->out->filename, name); - if (s == 0) + if (strcasecmp(p->in->alias, name) == 0) /* NOTE(review): the NULL check on alias was dropped here; presumably the parser now always sets an alias; confirm for STMT_SORT */ return p; - - if (p->out->alias) { - s = strcasecmp(p->out->alias, name); - if (s == 0) - return p; - } break; + case STMT_CREATE: case STMT_JOIN: - s = strcasecmp(p->out->filename, name); - if (s == 0) + if (strcasecmp(p->out->filename, name) == 0) return p; - if (p->out->alias) { - s = strcasecmp(p->out->alias, name); - if (s == 0) - return p; - } + if (strcasecmp(p->out->alias, name) == 0) + return p; break; } p = p->prev; @@ -87,6 +54,37 @@ struct Stmt * stmt_find_by_name(struct Stmt 
*stmt, const char *name) return 0; } +/** + * @desc: update the filename in 'smeta' to point to the sort output. 
+ * + * some statements use a sort output as input; since the sort output is sometimes + * not defined in the script (without an INTO clause), we need to update + * the filename before processing. + * + * @return: + * < 0 : success. + * < E_FILE_NOT_EXIST : fail, could not find file alias in 'stmt'. + */ +int stmt_update_meta(struct Stmt *stmt, struct StmtMeta *smeta) +{ + struct Stmt *p; + + if (smeta->filename) + return 0; + + p = stmt_find_by_name(stmt, smeta->alias); + if (! p) { + str_raw_copy(smeta->alias, &_vos.e_sparm0); + return E_FILE_NOT_EXIST; + } + + if (p->type == STMT_SORT) { + str_raw_copy(p->out->filename, &smeta->filename); + } + + return 0; +} + void stmt_print(struct Stmt *stmt) { while (stmt) { diff --git a/op/vos_Stmt.h b/op/vos_Stmt.h index cbd589b..17a001a 100644 --- a/op/vos_Stmt.h +++ b/op/vos_Stmt.h @@ -13,6 +13,7 @@ extern const char *_stmt_type[N_STMT_TYPE]; void stmt_add(struct Stmt **stmt, struct Stmt *new_stmt); struct Stmt * stmt_find_by_name(struct Stmt *stmt, const char *name); +int stmt_update_meta(struct Stmt *stmt, struct StmtMeta *smeta); void stmt_print(struct Stmt *stmt); void stmt_destroy(struct Stmt **stmt); diff --git a/op/vos_StmtJoin.h b/op/vos_StmtJoin.h index a360f91..7267492 100644 --- a/op/vos_StmtJoin.h +++ b/op/vos_StmtJoin.h @@ -1,7 +1,6 @@ #ifndef _VOS_STMTJOIN_H #define _VOS_STMTJOIN_H 1 -#include "type/vos_TStmtJoin.h" #include "type/vos_TStmt.h" #include "op/vos_StmtMeta.h" #include "op/vos_File.h" diff --git a/op/vos_StmtSort.c b/op/vos_StmtSort.c index a2d4bc8..437feeb 100644 --- a/op/vos_StmtSort.c +++ b/op/vos_StmtSort.c @@ -8,7 +8,6 @@ int stmtsort_create(struct Stmt **sort) (*sort)->out = (struct StmtMeta *) calloc(1, sizeof(struct StmtMeta)); if (! 
(*sort)->out) { - free((*sort)->in); free((*sort)); (*sort) = 0; return E_MEM; @@ -17,9 +16,9 @@ int stmtsort_create(struct Stmt **sort) return 0; } -int stmtsort_init_output(struct Stmt *sort) +int stmtsort_init(struct Stmt *sort) { - int s = E_MEM; + int s = 0; struct Field *fld_in = 0; struct Field *fld_out = 0; @@ -30,18 +29,7 @@ int stmtsort_init_output(struct Stmt *sort) return E_MEM; } - if (! sort->out->filename) { - do { - s = str_raw_randomize(VOS_SORT_OUT_FORMAT, - &sort->out->filename); - if (s) - return s; - - s = file_raw_is_exist(sort->out->filename); - if (s) - free(sort->out->filename); - } while (s); - } else { + if (sort->out->filename) { /* check if file output is exist */ s = file_raw_is_exist(sort->out->filename); if (s) { @@ -79,6 +67,49 @@ int stmtsort_init_output(struct Stmt *sort) return 0; } +/** + * @stmtsort_init_output: generate a temporary file name for the sort output, unless a filename is already set; a generated name is flagged with SORT_TMP. + * + * @return: + * < 0 : success. + * < !0 : fail. + */ +int stmtsort_init_output(struct Stmt *sort) +{ + int s; + char *rndm_name = 0; + struct String *tmp = 0; + + /* filename already declared by an INTO clause */ + if (sort->out->filename) + return 0; + + str_create(&tmp); + + do { + str_append(tmp, get_tmp_dir(0)); + + s = str_raw_randomize(VOS_SORT_OUT_FORMAT, &rndm_name); + if (s) + goto err; + + str_append(tmp, rndm_name); + + s = file_raw_is_exist(tmp->buf); + if (s) + str_prune(tmp); + + free(rndm_name); + } while (s); + + sort->out->flag |= SORT_TMP; + sort->out->filename = tmp->buf; + tmp->buf = 0; +err: + str_destroy(&tmp); + return s; +} + void stmtsort_print(struct Stmt *sort) { if (! 
sort) @@ -99,8 +130,11 @@ void stmtsort_destroy(struct Stmt **sort) stmtmeta_soft_destroy(&(*sort)->in); if ((*sort)->out) { - if ((*sort)->out->filename) + if ((*sort)->out->filename) { + if ((*sort)->out->flag & SORT_TMP) + unlink((*sort)->out->filename); free((*sort)->out->filename); + } if ((*sort)->out->alias) free((*sort)->out->alias); field_soft_destroy(&(*sort)->out->fields); diff --git a/op/vos_StmtSort.h b/op/vos_StmtSort.h index e167876..4edbaff 100644 --- a/op/vos_StmtSort.h +++ b/op/vos_StmtSort.h @@ -8,6 +8,7 @@ #define sort_get_idx(T) get_token_idx(_fflag_sort, N_FFLAG_SORT, T) int stmtsort_create(struct Stmt **sort); +int stmtsort_init(struct Stmt *sort); int stmtsort_init_output(struct Stmt *sort); void stmtsort_print(struct Stmt *sort); void stmtsort_destroy(struct Stmt **sort); diff --git a/proc/vos_create.c b/proc/vos_create.c index 14134ab..d7c6730 100644 --- a/proc/vos_create.c +++ b/proc/vos_create.c @@ -176,6 +176,8 @@ int vos_process_create(struct Stmt *create) pin = create->in; for (i = 0; i < n_in; i++) { + stmt_update_meta(create->prev, pin); + cproc[i].in = pin; cproc[i].status = CPROC_START; cproc[i].buckets = buckets; diff --git a/proc/vos_create.h b/proc/vos_create.h index 92def4e..9e83de0 100644 --- a/proc/vos_create.h +++ b/proc/vos_create.h @@ -4,6 +4,7 @@ #include #include "type/vos_TProcCreate.h" #include "op/vos_Bucket.h" +#include "op/vos_Stmt.h" #define THREAD_TIME_WAIT 0.1 diff --git a/proc/vos_join.c b/proc/vos_join.c index 1da9736..e5b48bb 100644 --- a/proc/vos_join.c +++ b/proc/vos_join.c @@ -14,7 +14,7 @@ static int join_do_sort(struct Stmt *join) join->in->next = 0; sort->in = join->in; - s = stmtsort_init_output(sort); + s = stmtsort_init(sort); if (s) goto err; @@ -38,7 +38,7 @@ static int join_do_sort(struct Stmt *join) sort->in = join->in->next; - s = stmtsort_init_output(sort); + s = stmtsort_init(sort); if (s) goto err; diff --git a/proc/vos_parser.c b/proc/vos_parser.c index dfad97e..590a4bc 100644 --- 
a/proc/vos_parser.c +++ b/proc/vos_parser.c @@ -108,7 +108,7 @@ static int stmt_get_output_from(struct StmtMeta **smeta, struct Stmt *stmt, int s; struct Stmt *p = 0; - p = stmt_find_by_name(stmt, name); + p = stmt_find_by_name(stmt->last, name); if (! p) return E_PARSER_INV_VALUE; @@ -507,13 +507,19 @@ static int parsing_LOAD(struct Stmt **load, struct LL **ptok) s = E_PARSER_UNX_TOKEN; goto err; } + + /* use filename as an alias if alias is not set */ + if (!(*load)->in->alias) { + str_raw_copy((*load)->in->filename, + &(*load)->in->alias); + } s = PLOAD_DONE; break; case PLOAD_AS_NAME: (*load)->in->alias = (*ptok)->str; (*ptok)->str = 0; - s = PLOAD_END; + s = PLOAD_END; break; } (*ptok) = (*ptok)->next; @@ -629,7 +635,7 @@ static int parsing_SORT(struct Stmt *stmt, struct Stmt **sort, } if (s == PSORT_DONE) - s = stmtsort_init_output((*sort)); + s = stmtsort_init((*sort)); else s = E_PARSER_INV_STMT; err: @@ -831,6 +837,13 @@ static int parsing_CREATE(struct Stmt *stmt, struct Stmt **create, s = E_PARSER_UNX_TOKEN; goto err; } + + /* use filename as an alias if alias is not set */ + if (! (*create)->out->alias) { + str_raw_copy((*create)->out->filename, + &(*create)->out->alias); + } + s = PCREATE_DONE; break; @@ -1070,12 +1083,17 @@ static int parsing_JOIN(struct Stmt *stmt, struct Stmt **join, } case PJOIN_END: - if ((*ptok)->str[0] == ';') { - s = PJOIN_DONE; - break; + if ((*ptok)->str[0] != ';') { + s = E_PARSER_UNK_TOKEN; + goto err; } - s = E_PARSER_UNK_TOKEN; - goto err; + /* use filename as an alias if alias is not set */ + if (! 
(*join)->out->alias) { + str_raw_copy((*join)->out->filename, + &(*join)->out->alias); + } + s = PJOIN_DONE; + break; case PJOIN_INTO: (*join)->out->filename = (*ptok)->str; diff --git a/proc/vos_sort.c b/proc/vos_sort.c index 6e2029c..a013bc4 100644 --- a/proc/vos_sort.c +++ b/proc/vos_sort.c @@ -128,22 +128,7 @@ static int sort_write(struct ProcSort *psort, struct Record *rows) str_create(&tmp); do { - /* get a path to temporary directory */ - if (_vos.proc_max > 1) { - do { - s = pthread_mutex_trylock(&_vos.proc_tmp_dir_lock); - } while (s); - } - - str_append(tmp, _vos.p_proc_tmp_dir->str); - - _vos.p_proc_tmp_dir = _vos.p_proc_tmp_dir->next; - if (! _vos.p_proc_tmp_dir) - _vos.p_proc_tmp_dir = _vos.proc_tmp_dir; - - if (_vos.proc_max > 1) { - pthread_mutex_unlock(&_vos.proc_tmp_dir_lock); - } + str_append(tmp, get_tmp_dir(1)); /* get random file name */ s = str_raw_randomize(VOS_SORT_TMP_FORMAT, &rndm_name); @@ -218,6 +203,10 @@ static void *sort_process(void *parm) goto err; } + if (_vos.debug & DBG_SORT) { + printf("(%lu) pos start : %ld\n", psort->tid, FCURP(F)); + } + /* phase 1: create & fill rows */ while (FCURP(F) < psort->pos_end) { s = record_read(&R, F, psort->sort->in->fields); @@ -230,6 +219,10 @@ static void *sort_process(void *parm) break; } + if (_vos.debug & DBG_SORT) { + printf("(%lu) pos end : %ld\n", psort->tid, FCURP(F)); + } + /* check if error, but not end of file */ if (s && s != E_FILE_END) goto err; @@ -359,7 +352,10 @@ int vos_process_sort(struct Stmt *sort) /* create thread for sort */ for (i = 0; i < _vos.proc_max; i++) { - sort_proc[i].pos_start = i * esize; + if (i == 0) + sort_proc[i].pos_start = i * esize; + else + sort_proc[i].pos_start = (i * esize) - 1; if ((i + 1) == _vos.proc_max) sort_proc[i].pos_end = fsize; diff --git a/proc/vos_sort_merge.c b/proc/vos_sort_merge.c index 30bddf2..97660f1 100644 --- a/proc/vos_sort_merge.c +++ b/proc/vos_sort_merge.c @@ -123,6 +123,24 @@ static struct MNode * mnode_get_loser(struct MNode 
**root, return loser; } +static int single_merge(const char *from, const char *to) +{ + int s; + char *from_dir = 0; + char *to_dir = 0; + + file_raw_get_dirname(from, &from_dir); + file_raw_get_dirname(to, &to_dir); /* NOTE(review): return values are not checked, and from_dir/to_dir are never freed in this function */ + + if (strcmp(from_dir, to_dir) == 0) { + s = rename(from, to); + } else { + s = file_raw_copy(from, to); + } + + return s; +} + int vos_sort_merge(struct Stmt *sort, struct LL *lsfile, struct Record **_all_rows, unsigned long all_n_row) { @@ -136,9 +154,13 @@ int vos_sort_merge(struct Stmt *sort, struct LL *lsfile, struct Record *R = 0; struct Record *all_rows = (*_all_rows); + s = stmtsort_init_output(sort); + if (s) + return s; + /* in case of only one file to merge */ if (! lsfile->next) { - s = rename(lsfile->str, sort->out->filename); + s = single_merge(lsfile->str, sort->out->filename); return s; } diff --git a/type/vos_TStmtJoin.h b/type/vos_TStmtJoin.h deleted file mode 100644 index aae8fc3..0000000 --- a/type/vos_TStmtJoin.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef _VOS_TYPE_STMTJOIN_H -#define _VOS_TYPE_STMTJOIN_H 1 - -enum _stmt_join_flag { - JOIN_NORMAL = 0, - JOIN_OUTER = 1, - JOIN_ANTI = 2, - N_JOIN_FLAG -}; - -enum _stmt_join_sort_flag { - JOIN_UNSORTED = 4, - JOIN_SORTED = 8, - N_JOIN_SORT_FLAG -}; - -#endif diff --git a/type/vos_TStmtMeta.h b/type/vos_TStmtMeta.h index 5e37c3b..6377988 100644 --- a/type/vos_TStmtMeta.h +++ b/type/vos_TStmtMeta.h @@ -3,6 +3,19 @@ #include "type/vos_TField.h" +enum _stmtmeta_flag { + JOIN_NORMAL = 0, + JOIN_OUTER = 1, + JOIN_ANTI = 2, + N_JOIN_FLAG = 3, + + JOIN_UNSORTED = 4, + JOIN_SORTED = 8, + N_JOIN_SORT_FLAG = 9, + + SORT_TMP = 16 +}; + struct StmtMeta { int flag; char *filename; diff --git a/vos.c b/vos.c index c71cb27..12b29b5 100644 --- a/vos.c +++ b/vos.c @@ -34,6 +34,38 @@ int get_token_idx(const char **ls, const unsigned int n, const char *tok) return i; } +/** + * @desc: get one of the 
temporary directories from the list of temporary + * directories. 
+ * + * @return: + * < tmp_dir : the path of the current temporary directory; the list cursor then moves to the next entry, wrapping to the first one (sequential, not random). The cursor update is guarded by _vos.proc_tmp_dir_lock only when 'lock' is non-zero and _vos.proc_max > 1. + */ +char *get_tmp_dir(const int lock) +{ + int s; + char *tmp_dir = 0; + + if (_vos.proc_max > 1 && lock) { + do { + s = pthread_mutex_trylock(&_vos.proc_tmp_dir_lock); + } while (s); + } + + tmp_dir = _vos.p_proc_tmp_dir->str; + + /* move to the next temporary dir, wrapping to the head of the list */ + _vos.p_proc_tmp_dir = _vos.p_proc_tmp_dir->next; + if (! _vos.p_proc_tmp_dir) + _vos.p_proc_tmp_dir = _vos.proc_tmp_dir; + + if (_vos.proc_max > 1 && lock) { + pthread_mutex_unlock(&_vos.proc_tmp_dir_lock); + } + + return tmp_dir; +} + static int vos_init(int argc, char **argv) { int i = 1; diff --git a/vos.h b/vos.h index f29a3ab..fa2568e 100644 --- a/vos.h +++ b/vos.h @@ -16,5 +16,6 @@ struct Vos _vos; int get_token_idx(const char **ls, const unsigned int n, const char *tok); +char *get_tmp_dir(const int lock); #endif -- cgit v1.3