diff options
Diffstat (limited to 'proc')
| -rw-r--r-- | proc/vos_create.c | 2 | ||||
| -rw-r--r-- | proc/vos_create.h | 1 | ||||
| -rw-r--r-- | proc/vos_join.c | 4 | ||||
| -rw-r--r-- | proc/vos_parser.c | 34 | ||||
| -rw-r--r-- | proc/vos_sort.c | 30 | ||||
| -rw-r--r-- | proc/vos_sort_merge.c | 24 |
6 files changed, 67 insertions, 28 deletions
diff --git a/proc/vos_create.c b/proc/vos_create.c index 14134ab..d7c6730 100644 --- a/proc/vos_create.c +++ b/proc/vos_create.c @@ -176,6 +176,8 @@ int vos_process_create(struct Stmt *create) pin = create->in; for (i = 0; i < n_in; i++) { + stmt_update_meta(create->prev, pin); + cproc[i].in = pin; cproc[i].status = CPROC_START; cproc[i].buckets = buckets; diff --git a/proc/vos_create.h b/proc/vos_create.h index 92def4e..9e83de0 100644 --- a/proc/vos_create.h +++ b/proc/vos_create.h @@ -4,6 +4,7 @@ #include <pthread.h> #include "type/vos_TProcCreate.h" #include "op/vos_Bucket.h" +#include "op/vos_Stmt.h" #define THREAD_TIME_WAIT 0.1 diff --git a/proc/vos_join.c b/proc/vos_join.c index 1da9736..e5b48bb 100644 --- a/proc/vos_join.c +++ b/proc/vos_join.c @@ -14,7 +14,7 @@ static int join_do_sort(struct Stmt *join) join->in->next = 0; sort->in = join->in; - s = stmtsort_init_output(sort); + s = stmtsort_init(sort); if (s) goto err; @@ -38,7 +38,7 @@ static int join_do_sort(struct Stmt *join) sort->in = join->in->next; - s = stmtsort_init_output(sort); + s = stmtsort_init(sort); if (s) goto err; diff --git a/proc/vos_parser.c b/proc/vos_parser.c index dfad97e..590a4bc 100644 --- a/proc/vos_parser.c +++ b/proc/vos_parser.c @@ -108,7 +108,7 @@ static int stmt_get_output_from(struct StmtMeta **smeta, struct Stmt *stmt, int s; struct Stmt *p = 0; - p = stmt_find_by_name(stmt, name); + p = stmt_find_by_name(stmt->last, name); if (! p) return E_PARSER_INV_VALUE; @@ -507,13 +507,19 @@ static int parsing_LOAD(struct Stmt **load, struct LL **ptok) s = E_PARSER_UNX_TOKEN; goto err; } + + /* use filename as an alias if alias is not set */ + if (!(*load)->in->alias) { + str_raw_copy((*load)->in->filename, + &(*load)->in->alias); + } s = PLOAD_DONE; break; case PLOAD_AS_NAME: (*load)->in->alias = (*ptok)->str; (*ptok)->str = 0; - s = PLOAD_END; + s = PLOAD_END; break; } (*ptok) = (*ptok)->next; @@ -629,7 +635,7 @@ static int parsing_SORT(struct Stmt *stmt, struct Stmt **sort, } if (s == PSORT_DONE) - s = stmtsort_init_output((*sort)); + s = stmtsort_init((*sort)); else s = E_PARSER_INV_STMT; err: @@ -831,6 +837,13 @@ static int parsing_CREATE(struct Stmt *stmt, struct Stmt **create, s = E_PARSER_UNX_TOKEN; goto err; } + + /* use filename as an alias if alias is not set */ + if (! (*create)->out->alias) { + str_raw_copy((*create)->out->filename, + &(*create)->out->alias); + } + s = PCREATE_DONE; break; @@ -1070,12 +1083,17 @@ static int parsing_JOIN(struct Stmt *stmt, struct Stmt **join, } case PJOIN_END: - if ((*ptok)->str[0] == ';') { - s = PJOIN_DONE; - break; + if ((*ptok)->str[0] != ';') { + s = E_PARSER_UNK_TOKEN; + goto err; } - s = E_PARSER_UNK_TOKEN; - goto err; + /* use filename as an alias if alias is not set */ + if (! (*join)->out->alias) { + str_raw_copy((*join)->out->filename, + &(*join)->out->alias); + } + s = PJOIN_DONE; + break; case PJOIN_INTO: (*join)->out->filename = (*ptok)->str; diff --git a/proc/vos_sort.c b/proc/vos_sort.c index 6e2029c..a013bc4 100644 --- a/proc/vos_sort.c +++ b/proc/vos_sort.c @@ -128,22 +128,7 @@ static int sort_write(struct ProcSort *psort, struct Record *rows) str_create(&tmp); do { - /* get a path to temporary directory */ - if (_vos.proc_max > 1) { - do { - s = pthread_mutex_trylock(&_vos.proc_tmp_dir_lock); - } while (s); - } - - str_append(tmp, _vos.p_proc_tmp_dir->str); - - _vos.p_proc_tmp_dir = _vos.p_proc_tmp_dir->next; - if (! _vos.p_proc_tmp_dir) - _vos.p_proc_tmp_dir = _vos.proc_tmp_dir; - - if (_vos.proc_max > 1) { - pthread_mutex_unlock(&_vos.proc_tmp_dir_lock); - } + str_append(tmp, get_tmp_dir(1)); /* get random file name */ s = str_raw_randomize(VOS_SORT_TMP_FORMAT, &rndm_name); @@ -218,6 +203,10 @@ static void *sort_process(void *parm) goto err; } + if (_vos.debug & DBG_SORT) { + printf("(%lu) pos start : %ld\n", psort->tid, FCURP(F)); + } + /* phase 1: create & fill rows */ while (FCURP(F) < psort->pos_end) { s = record_read(&R, F, psort->sort->in->fields); @@ -230,6 +219,10 @@ static void *sort_process(void *parm) break; } + if (_vos.debug & DBG_SORT) { + printf("(%lu) pos end : %ld\n", psort->tid, FCURP(F)); + } + /* check if error, but not end of file */ if (s && s != E_FILE_END) goto err; @@ -359,7 +352,10 @@ int vos_process_sort(struct Stmt *sort) /* create thread for sort */ for (i = 0; i < _vos.proc_max; i++) { - sort_proc[i].pos_start = i * esize; + if (i == 0) + sort_proc[i].pos_start = i * esize; + else + sort_proc[i].pos_start = (i * esize) - 1; if ((i + 1) == _vos.proc_max) sort_proc[i].pos_end = fsize; diff --git a/proc/vos_sort_merge.c b/proc/vos_sort_merge.c index 30bddf2..97660f1 100644 --- a/proc/vos_sort_merge.c +++ b/proc/vos_sort_merge.c @@ -123,6 +123,24 @@ static struct MNode * mnode_get_loser(struct MNode **root, return loser; } +static int single_merge(const char *from, const char *to) +{ + int s; + char *from_dir = 0; + char *to_dir = 0; + + file_raw_get_dirname(from, &from_dir); + file_raw_get_dirname(to, &to_dir); + + if (strcmp(from_dir, to_dir) == 0) { + s = rename(from, to); + } else { + s = file_raw_copy(from, to); + } + + return s; +} + int vos_sort_merge(struct Stmt *sort, struct LL *lsfile, struct Record **_all_rows, unsigned long all_n_row) { @@ -136,9 +154,13 @@ int vos_sort_merge(struct Stmt *sort, struct LL *lsfile, struct Record *R = 0; struct Record *all_rows = (*_all_rows); + s = stmtsort_init_output(sort); + if (s) + return s; + /* in case of only one file to merge */ if (! lsfile->next) { - s = rename(lsfile->str, sort->out->filename); + s = single_merge(lsfile->str, sort->out->filename); return s; } |
