aboutsummaryrefslogtreecommitdiff
path: root/proc
diff options
context:
space:
mode:
Diffstat (limited to 'proc')
-rw-r--r--proc/vos_create.c2
-rw-r--r--proc/vos_create.h1
-rw-r--r--proc/vos_join.c4
-rw-r--r--proc/vos_parser.c34
-rw-r--r--proc/vos_sort.c30
-rw-r--r--proc/vos_sort_merge.c24
6 files changed, 67 insertions, 28 deletions
diff --git a/proc/vos_create.c b/proc/vos_create.c
index 14134ab..d7c6730 100644
--- a/proc/vos_create.c
+++ b/proc/vos_create.c
@@ -176,6 +176,8 @@ int vos_process_create(struct Stmt *create)
pin = create->in;
for (i = 0; i < n_in; i++) {
+ stmt_update_meta(create->prev, pin);
+
cproc[i].in = pin;
cproc[i].status = CPROC_START;
cproc[i].buckets = buckets;
diff --git a/proc/vos_create.h b/proc/vos_create.h
index 92def4e..9e83de0 100644
--- a/proc/vos_create.h
+++ b/proc/vos_create.h
@@ -4,6 +4,7 @@
#include <pthread.h>
#include "type/vos_TProcCreate.h"
#include "op/vos_Bucket.h"
+#include "op/vos_Stmt.h"
#define THREAD_TIME_WAIT 0.1
diff --git a/proc/vos_join.c b/proc/vos_join.c
index 1da9736..e5b48bb 100644
--- a/proc/vos_join.c
+++ b/proc/vos_join.c
@@ -14,7 +14,7 @@ static int join_do_sort(struct Stmt *join)
join->in->next = 0;
sort->in = join->in;
- s = stmtsort_init_output(sort);
+ s = stmtsort_init(sort);
if (s)
goto err;
@@ -38,7 +38,7 @@ static int join_do_sort(struct Stmt *join)
sort->in = join->in->next;
- s = stmtsort_init_output(sort);
+ s = stmtsort_init(sort);
if (s)
goto err;
diff --git a/proc/vos_parser.c b/proc/vos_parser.c
index dfad97e..590a4bc 100644
--- a/proc/vos_parser.c
+++ b/proc/vos_parser.c
@@ -108,7 +108,7 @@ static int stmt_get_output_from(struct StmtMeta **smeta, struct Stmt *stmt,
int s;
struct Stmt *p = 0;
- p = stmt_find_by_name(stmt, name);
+ p = stmt_find_by_name(stmt->last, name);
if (! p)
return E_PARSER_INV_VALUE;
@@ -507,13 +507,19 @@ static int parsing_LOAD(struct Stmt **load, struct LL **ptok)
s = E_PARSER_UNX_TOKEN;
goto err;
}
+
+ /* use filename as an alias if alias is not set */
+ if (!(*load)->in->alias) {
+ str_raw_copy((*load)->in->filename,
+ &(*load)->in->alias);
+ }
s = PLOAD_DONE;
break;
case PLOAD_AS_NAME:
(*load)->in->alias = (*ptok)->str;
(*ptok)->str = 0;
- s = PLOAD_END;
+ s = PLOAD_END;
break;
}
(*ptok) = (*ptok)->next;
@@ -629,7 +635,7 @@ static int parsing_SORT(struct Stmt *stmt, struct Stmt **sort,
}
if (s == PSORT_DONE)
- s = stmtsort_init_output((*sort));
+ s = stmtsort_init((*sort));
else
s = E_PARSER_INV_STMT;
err:
@@ -831,6 +837,13 @@ static int parsing_CREATE(struct Stmt *stmt, struct Stmt **create,
s = E_PARSER_UNX_TOKEN;
goto err;
}
+
+ /* use filename as an alias if alias is not set */
+ if (! (*create)->out->alias) {
+ str_raw_copy((*create)->out->filename,
+ &(*create)->out->alias);
+ }
+
s = PCREATE_DONE;
break;
@@ -1070,12 +1083,17 @@ static int parsing_JOIN(struct Stmt *stmt, struct Stmt **join,
}
case PJOIN_END:
- if ((*ptok)->str[0] == ';') {
- s = PJOIN_DONE;
- break;
+ if ((*ptok)->str[0] != ';') {
+ s = E_PARSER_UNK_TOKEN;
+ goto err;
}
- s = E_PARSER_UNK_TOKEN;
- goto err;
+ /* use filename as an alias if alias is not set */
+ if (! (*join)->out->alias) {
+ str_raw_copy((*join)->out->filename,
+ &(*join)->out->alias);
+ }
+ s = PJOIN_DONE;
+ break;
case PJOIN_INTO:
(*join)->out->filename = (*ptok)->str;
diff --git a/proc/vos_sort.c b/proc/vos_sort.c
index 6e2029c..a013bc4 100644
--- a/proc/vos_sort.c
+++ b/proc/vos_sort.c
@@ -128,22 +128,7 @@ static int sort_write(struct ProcSort *psort, struct Record *rows)
str_create(&tmp);
do {
- /* get a path to temporary directory */
- if (_vos.proc_max > 1) {
- do {
- s = pthread_mutex_trylock(&_vos.proc_tmp_dir_lock);
- } while (s);
- }
-
- str_append(tmp, _vos.p_proc_tmp_dir->str);
-
- _vos.p_proc_tmp_dir = _vos.p_proc_tmp_dir->next;
- if (! _vos.p_proc_tmp_dir)
- _vos.p_proc_tmp_dir = _vos.proc_tmp_dir;
-
- if (_vos.proc_max > 1) {
- pthread_mutex_unlock(&_vos.proc_tmp_dir_lock);
- }
+ str_append(tmp, get_tmp_dir(1));
/* get random file name */
s = str_raw_randomize(VOS_SORT_TMP_FORMAT, &rndm_name);
@@ -218,6 +203,10 @@ static void *sort_process(void *parm)
goto err;
}
+ if (_vos.debug & DBG_SORT) {
+ printf("(%lu) pos start : %ld\n", psort->tid, FCURP(F));
+ }
+
/* phase 1: create & fill rows */
while (FCURP(F) < psort->pos_end) {
s = record_read(&R, F, psort->sort->in->fields);
@@ -230,6 +219,10 @@ static void *sort_process(void *parm)
break;
}
+ if (_vos.debug & DBG_SORT) {
+ printf("(%lu) pos end : %ld\n", psort->tid, FCURP(F));
+ }
+
/* check if error, but not end of file */
if (s && s != E_FILE_END)
goto err;
@@ -359,7 +352,10 @@ int vos_process_sort(struct Stmt *sort)
/* create thread for sort */
for (i = 0; i < _vos.proc_max; i++) {
- sort_proc[i].pos_start = i * esize;
+ if (i == 0)
+ sort_proc[i].pos_start = i * esize;
+ else
+ sort_proc[i].pos_start = (i * esize) - 1;
if ((i + 1) == _vos.proc_max)
sort_proc[i].pos_end = fsize;
diff --git a/proc/vos_sort_merge.c b/proc/vos_sort_merge.c
index 30bddf2..97660f1 100644
--- a/proc/vos_sort_merge.c
+++ b/proc/vos_sort_merge.c
@@ -123,6 +123,24 @@ static struct MNode * mnode_get_loser(struct MNode **root,
return loser;
}
+static int single_merge(const char *from, const char *to)
+{
+ int s;
+ char *from_dir = 0;
+ char *to_dir = 0;
+
+ file_raw_get_dirname(from, &from_dir);
+ file_raw_get_dirname(to, &to_dir);
+
+ if (strcmp(from_dir, to_dir) == 0) {
+ s = rename(from, to);
+ } else {
+ s = file_raw_copy(from, to);
+ }
+
+ return s;
+}
+
int vos_sort_merge(struct Stmt *sort, struct LL *lsfile,
struct Record **_all_rows, unsigned long all_n_row)
{
@@ -136,9 +154,13 @@ int vos_sort_merge(struct Stmt *sort, struct LL *lsfile,
struct Record *R = 0;
struct Record *all_rows = (*_all_rows);
+ s = stmtsort_init_output(sort);
+ if (s)
+ return s;
+
/* in case of only one file to merge */
if (! lsfile->next) {
- s = rename(lsfile->str, sort->out->filename);
+ s = single_merge(lsfile->str, sort->out->filename);
return s;
}