aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--op/vos_File.c128
-rw-r--r--op/vos_File.h2
-rw-r--r--op/vos_Stmt.c78
-rw-r--r--op/vos_Stmt.h1
-rw-r--r--op/vos_StmtJoin.h1
-rw-r--r--op/vos_StmtSort.c66
-rw-r--r--op/vos_StmtSort.h1
-rw-r--r--proc/vos_create.c2
-rw-r--r--proc/vos_create.h1
-rw-r--r--proc/vos_join.c4
-rw-r--r--proc/vos_parser.c34
-rw-r--r--proc/vos_sort.c30
-rw-r--r--proc/vos_sort_merge.c24
-rw-r--r--type/vos_TStmtJoin.h17
-rw-r--r--type/vos_TStmtMeta.h13
-rw-r--r--vos.c32
-rw-r--r--vos.h1
17 files changed, 333 insertions, 102 deletions
diff --git a/op/vos_File.c b/op/vos_File.c
index dfb80cb..f6ccd07 100644
--- a/op/vos_File.c
+++ b/op/vos_File.c
@@ -32,6 +32,8 @@ int file_open(struct File **F, const char *f, int flag)
if ((*F)->d < 0) {
str_raw_copy(f, &_vos.e_sparm0);
+ free((*F));
+ (*F) = 0;
switch (errno) {
case ENOENT:
@@ -249,3 +251,129 @@ int file_raw_is_exist(const char *file)
return s;
}
+
+/**
+ * @desc: copy file 'from' to 'to'.
+ */
+int file_raw_copy(const char *from, const char *to)
+{
+ int s;
+ int fdin;
+ int fdout;
+ long int nread = 0;
+ long int nwrite = 0;
+ long int ntot = 0;
+ char *buf;
+
+ fdin = open(from, O_RDONLY);
+ if (fdin < 0)
+ return E_FILE_OPEN;
+
+ fdout = open(to, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
+ if (fdout < 0) {
+ close(fdin);
+ return E_FILE_OPEN;
+ }
+
+ buf = (char *) calloc(_vos.file_buf_size, sizeof(char));
+ if (! buf) {
+ close(fdin);
+ close(fdout);
+ return E_MEM;
+ }
+
+ nread = read(fdin, buf, _vos.file_buf_size);
+ if (nread < 0) {
+ s = E_FILE_READ;
+ goto err;
+ }
+ while (nread > 0) {
+ do {
+ nwrite = write(fdout, &buf[ntot], nread);
+ if (nwrite < 0) {
+ s = E_FILE_WRITE;
+ goto err;
+ }
+ ntot += nwrite;
+ nread -= nwrite;
+ } while (nread > 0);
+
+ nwrite = 0;
+ ntot = 0;
+
+ nread = read(fdin, buf, _vos.file_buf_size);
+ if (nread < 0) {
+ s = E_FILE_READ;
+ goto err;
+ }
+ }
+
+ s = 0;
+err:
+ close(fdout);
+ close(fdin);
+ free(buf);
+ return s;
+}
+
+/**
+ * @desc: get directory name from path.
+ * path => dirname
+ * "/usr/lib" => "/usr"
+ * "/usr/" => "/"
+ * "usr" => "."
+ * "/" => "/"
+ * "." => "."
+ * ".." => ".."
+ *
+ * @return:
+ * < 0 : success.
+ * < E_MEM : fail.
+ */
+int file_raw_get_dirname(const char *path, char **dirname)
+{
+ int l = 0;
+ char *d = 0;
+
+ if (! path
+ || (path && strcmp(path, "..") == 0)) {
+ d = (char *) calloc(2, sizeof(char));
+ if (d)
+ d[0] = '.';
+ goto out;
+ }
+
+ l = strlen(path) - 1;
+
+ /* path : '/' */
+ if (l == 0 && path[0] == '/') {
+ d = (char *) calloc(2, sizeof(char));
+ if (d)
+ d[0] = '/';
+ goto out;
+ }
+
+ /* path : '/path///' */
+ while (l >= 0 && path[l] == '/')
+ l--;
+
+ while (l >= 0 && path[l] != '/')
+ l--;
+
+ if (l < 0) {
+ d = (char *) calloc(2, sizeof(char));
+ if (d)
+ d[0] = '.';
+ } else {
+ if (l == 0)
+ l++;
+ d = (char *) calloc(l + 1, sizeof(char));
+ if (d)
+ memcpy(d, path, l);
+ }
+
+out:
+ (*dirname) = d;
+
+ return d ? 0 : E_MEM;
+}
diff --git a/op/vos_File.h b/op/vos_File.h
index 90f8a22..6d98166 100644
--- a/op/vos_File.h
+++ b/op/vos_File.h
@@ -23,5 +23,7 @@ void file_close(struct File **F);
int file_raw_get_size(const char *file, unsigned long *fsize);
int file_raw_is_exist(const char *file);
+int file_raw_copy(const char *from, const char *to);
+int file_raw_get_dirname(const char *path, char **dirname);
#endif
diff --git a/op/vos_Stmt.c b/op/vos_Stmt.c
index 41fc13c..0ad0bbd 100644
--- a/op/vos_Stmt.c
+++ b/op/vos_Stmt.c
@@ -26,59 +26,26 @@ void stmt_add(struct Stmt **stmt, struct Stmt *new_stmt)
(*stmt)->last = new_stmt;
}
-struct Stmt * stmt_find_by_name(struct Stmt *stmt, const char *name)
+struct Stmt * stmt_find_by_name(struct Stmt *p, const char *name)
{
- int s = 0;
- struct Stmt *p = stmt->last;
-
while (p) {
switch (p->type) {
case STMT_LOAD:
- s = strcasecmp(p->in->filename, name);
- if (s == 0)
- return p;
-
- if (p->in->alias) {
- s = strcasecmp(p->in->alias, name);
- if (s == 0)
- return p;
- }
- break;
-
case STMT_SORT:
- s = strcasecmp(p->in->filename, name);
- if (s == 0)
+ if (strcasecmp(p->in->filename, name) == 0)
return p;
- if (p->in->alias) {
- s = strcasecmp(p->in->alias, name);
- if (s == 0)
- return p;
- }
- break;
-
- case STMT_CREATE:
- s = strcasecmp(p->out->filename, name);
- if (s == 0)
+ if (strcasecmp(p->in->alias, name) == 0)
return p;
-
- if (p->out->alias) {
- s = strcasecmp(p->out->alias, name);
- if (s == 0)
- return p;
- }
break;
+ case STMT_CREATE:
case STMT_JOIN:
- s = strcasecmp(p->out->filename, name);
- if (s == 0)
+ if (strcasecmp(p->out->filename, name) == 0)
return p;
- if (p->out->alias) {
- s = strcasecmp(p->out->alias, name);
- if (s == 0)
- return p;
- }
+ if (strcasecmp(p->out->alias, name) == 0)
+ return p;
break;
}
p = p->prev;
@@ -87,6 +54,37 @@ struct Stmt * stmt_find_by_name(struct Stmt *stmt, const char *name)
return 0;
}
+/**
+ * @desc: update the filename in 'smeta' to point to sort output.
+ *
+ * some statement use sort output as input, since sort output some time
+ * is not defined in script (without INTO clause) then we need to update
+ * the filename before processing.
+ *
+ * @return:
+ * < 0 : success.
+ * < E_FILE_NOT_EXIST : fail, could not find file alias in 'stmt'.
+ */
+int stmt_update_meta(struct Stmt *stmt, struct StmtMeta *smeta)
+{
+ struct Stmt *p;
+
+ if (smeta->filename)
+ return 0;
+
+ p = stmt_find_by_name(stmt, smeta->alias);
+ if (! p) {
+ str_raw_copy(smeta->alias, &_vos.e_sparm0);
+ return E_FILE_NOT_EXIST;
+ }
+
+ if (p->type == STMT_SORT) {
+ str_raw_copy(p->out->filename, &smeta->filename);
+ }
+
+ return 0;
+}
+
void stmt_print(struct Stmt *stmt)
{
while (stmt) {
diff --git a/op/vos_Stmt.h b/op/vos_Stmt.h
index cbd589b..17a001a 100644
--- a/op/vos_Stmt.h
+++ b/op/vos_Stmt.h
@@ -13,6 +13,7 @@ extern const char *_stmt_type[N_STMT_TYPE];
void stmt_add(struct Stmt **stmt, struct Stmt *new_stmt);
struct Stmt * stmt_find_by_name(struct Stmt *stmt, const char *name);
+int stmt_update_meta(struct Stmt *stmt, struct StmtMeta *smeta);
void stmt_print(struct Stmt *stmt);
void stmt_destroy(struct Stmt **stmt);
diff --git a/op/vos_StmtJoin.h b/op/vos_StmtJoin.h
index a360f91..7267492 100644
--- a/op/vos_StmtJoin.h
+++ b/op/vos_StmtJoin.h
@@ -1,7 +1,6 @@
#ifndef _VOS_STMTJOIN_H
#define _VOS_STMTJOIN_H 1
-#include "type/vos_TStmtJoin.h"
#include "type/vos_TStmt.h"
#include "op/vos_StmtMeta.h"
#include "op/vos_File.h"
diff --git a/op/vos_StmtSort.c b/op/vos_StmtSort.c
index a2d4bc8..437feeb 100644
--- a/op/vos_StmtSort.c
+++ b/op/vos_StmtSort.c
@@ -8,7 +8,6 @@ int stmtsort_create(struct Stmt **sort)
(*sort)->out = (struct StmtMeta *) calloc(1, sizeof(struct StmtMeta));
if (! (*sort)->out) {
- free((*sort)->in);
free((*sort));
(*sort) = 0;
return E_MEM;
@@ -17,9 +16,9 @@ int stmtsort_create(struct Stmt **sort)
return 0;
}
-int stmtsort_init_output(struct Stmt *sort)
+int stmtsort_init(struct Stmt *sort)
{
- int s = E_MEM;
+ int s = 0;
struct Field *fld_in = 0;
struct Field *fld_out = 0;
@@ -30,18 +29,7 @@ int stmtsort_init_output(struct Stmt *sort)
return E_MEM;
}
- if (! sort->out->filename) {
- do {
- s = str_raw_randomize(VOS_SORT_OUT_FORMAT,
- &sort->out->filename);
- if (s)
- return s;
-
- s = file_raw_is_exist(sort->out->filename);
- if (s)
- free(sort->out->filename);
- } while (s);
- } else {
+ if (sort->out->filename) {
/* check if file output is exist */
s = file_raw_is_exist(sort->out->filename);
if (s) {
@@ -79,6 +67,49 @@ int stmtsort_init_output(struct Stmt *sort)
return 0;
}
+/**
+ * @stmtsort_init_output: get temporary file name for sort output.
+ *
+ * @return:
+ * < 0 : success.
+ * < !0 : fail.
+ */
+int stmtsort_init_output(struct Stmt *sort)
+{
+ int s;
+ char *rndm_name = 0;
+ struct String *tmp = 0;
+
+ /* filename already declared by INTO statement */
+ if (sort->out->filename)
+ return 0;
+
+ str_create(&tmp);
+
+ do {
+ str_append(tmp, get_tmp_dir(0));
+
+ s = str_raw_randomize(VOS_SORT_OUT_FORMAT, &rndm_name);
+ if (s)
+ goto err;
+
+ str_append(tmp, rndm_name);
+
+ s = file_raw_is_exist(tmp->buf);
+ if (s)
+ str_prune(tmp);
+
+ free(rndm_name);
+ } while (s);
+
+ sort->out->flag |= SORT_TMP;
+ sort->out->filename = tmp->buf;
+ tmp->buf = 0;
+err:
+ str_destroy(&tmp);
+ return s;
+}
+
void stmtsort_print(struct Stmt *sort)
{
if (! sort)
@@ -99,8 +130,11 @@ void stmtsort_destroy(struct Stmt **sort)
stmtmeta_soft_destroy(&(*sort)->in);
if ((*sort)->out) {
- if ((*sort)->out->filename)
+ if ((*sort)->out->filename) {
+ if ((*sort)->out->flag & SORT_TMP)
+ unlink((*sort)->out->filename);
free((*sort)->out->filename);
+ }
if ((*sort)->out->alias)
free((*sort)->out->alias);
field_soft_destroy(&(*sort)->out->fields);
diff --git a/op/vos_StmtSort.h b/op/vos_StmtSort.h
index e167876..4edbaff 100644
--- a/op/vos_StmtSort.h
+++ b/op/vos_StmtSort.h
@@ -8,6 +8,7 @@
#define sort_get_idx(T) get_token_idx(_fflag_sort, N_FFLAG_SORT, T)
int stmtsort_create(struct Stmt **sort);
+int stmtsort_init(struct Stmt *sort);
int stmtsort_init_output(struct Stmt *sort);
void stmtsort_print(struct Stmt *sort);
void stmtsort_destroy(struct Stmt **sort);
diff --git a/proc/vos_create.c b/proc/vos_create.c
index 14134ab..d7c6730 100644
--- a/proc/vos_create.c
+++ b/proc/vos_create.c
@@ -176,6 +176,8 @@ int vos_process_create(struct Stmt *create)
pin = create->in;
for (i = 0; i < n_in; i++) {
+ stmt_update_meta(create->prev, pin);
+
cproc[i].in = pin;
cproc[i].status = CPROC_START;
cproc[i].buckets = buckets;
diff --git a/proc/vos_create.h b/proc/vos_create.h
index 92def4e..9e83de0 100644
--- a/proc/vos_create.h
+++ b/proc/vos_create.h
@@ -4,6 +4,7 @@
#include <pthread.h>
#include "type/vos_TProcCreate.h"
#include "op/vos_Bucket.h"
+#include "op/vos_Stmt.h"
#define THREAD_TIME_WAIT 0.1
diff --git a/proc/vos_join.c b/proc/vos_join.c
index 1da9736..e5b48bb 100644
--- a/proc/vos_join.c
+++ b/proc/vos_join.c
@@ -14,7 +14,7 @@ static int join_do_sort(struct Stmt *join)
join->in->next = 0;
sort->in = join->in;
- s = stmtsort_init_output(sort);
+ s = stmtsort_init(sort);
if (s)
goto err;
@@ -38,7 +38,7 @@ static int join_do_sort(struct Stmt *join)
sort->in = join->in->next;
- s = stmtsort_init_output(sort);
+ s = stmtsort_init(sort);
if (s)
goto err;
diff --git a/proc/vos_parser.c b/proc/vos_parser.c
index dfad97e..590a4bc 100644
--- a/proc/vos_parser.c
+++ b/proc/vos_parser.c
@@ -108,7 +108,7 @@ static int stmt_get_output_from(struct StmtMeta **smeta, struct Stmt *stmt,
int s;
struct Stmt *p = 0;
- p = stmt_find_by_name(stmt, name);
+ p = stmt_find_by_name(stmt->last, name);
if (! p)
return E_PARSER_INV_VALUE;
@@ -507,13 +507,19 @@ static int parsing_LOAD(struct Stmt **load, struct LL **ptok)
s = E_PARSER_UNX_TOKEN;
goto err;
}
+
+ /* use filename as an alias if alias is not set */
+ if (!(*load)->in->alias) {
+ str_raw_copy((*load)->in->filename,
+ &(*load)->in->alias);
+ }
s = PLOAD_DONE;
break;
case PLOAD_AS_NAME:
(*load)->in->alias = (*ptok)->str;
(*ptok)->str = 0;
- s = PLOAD_END;
+ s = PLOAD_END;
break;
}
(*ptok) = (*ptok)->next;
@@ -629,7 +635,7 @@ static int parsing_SORT(struct Stmt *stmt, struct Stmt **sort,
}
if (s == PSORT_DONE)
- s = stmtsort_init_output((*sort));
+ s = stmtsort_init((*sort));
else
s = E_PARSER_INV_STMT;
err:
@@ -831,6 +837,13 @@ static int parsing_CREATE(struct Stmt *stmt, struct Stmt **create,
s = E_PARSER_UNX_TOKEN;
goto err;
}
+
+ /* use filename as an alias if alias is not set */
+ if (! (*create)->out->alias) {
+ str_raw_copy((*create)->out->filename,
+ &(*create)->out->alias);
+ }
+
s = PCREATE_DONE;
break;
@@ -1070,12 +1083,17 @@ static int parsing_JOIN(struct Stmt *stmt, struct Stmt **join,
}
case PJOIN_END:
- if ((*ptok)->str[0] == ';') {
- s = PJOIN_DONE;
- break;
+ if ((*ptok)->str[0] != ';') {
+ s = E_PARSER_UNK_TOKEN;
+ goto err;
}
- s = E_PARSER_UNK_TOKEN;
- goto err;
+ /* use filename as an alias if alias is not set */
+ if (! (*join)->out->alias) {
+ str_raw_copy((*join)->out->filename,
+ &(*join)->out->alias);
+ }
+ s = PJOIN_DONE;
+ break;
case PJOIN_INTO:
(*join)->out->filename = (*ptok)->str;
diff --git a/proc/vos_sort.c b/proc/vos_sort.c
index 6e2029c..a013bc4 100644
--- a/proc/vos_sort.c
+++ b/proc/vos_sort.c
@@ -128,22 +128,7 @@ static int sort_write(struct ProcSort *psort, struct Record *rows)
str_create(&tmp);
do {
- /* get a path to temporary directory */
- if (_vos.proc_max > 1) {
- do {
- s = pthread_mutex_trylock(&_vos.proc_tmp_dir_lock);
- } while (s);
- }
-
- str_append(tmp, _vos.p_proc_tmp_dir->str);
-
- _vos.p_proc_tmp_dir = _vos.p_proc_tmp_dir->next;
- if (! _vos.p_proc_tmp_dir)
- _vos.p_proc_tmp_dir = _vos.proc_tmp_dir;
-
- if (_vos.proc_max > 1) {
- pthread_mutex_unlock(&_vos.proc_tmp_dir_lock);
- }
+ str_append(tmp, get_tmp_dir(1));
/* get random file name */
s = str_raw_randomize(VOS_SORT_TMP_FORMAT, &rndm_name);
@@ -218,6 +203,10 @@ static void *sort_process(void *parm)
goto err;
}
+ if (_vos.debug & DBG_SORT) {
+ printf("(%lu) pos start : %ld\n", psort->tid, FCURP(F));
+ }
+
/* phase 1: create & fill rows */
while (FCURP(F) < psort->pos_end) {
s = record_read(&R, F, psort->sort->in->fields);
@@ -230,6 +219,10 @@ static void *sort_process(void *parm)
break;
}
+ if (_vos.debug & DBG_SORT) {
+ printf("(%lu) pos end : %ld\n", psort->tid, FCURP(F));
+ }
+
/* check if error, but not end of file */
if (s && s != E_FILE_END)
goto err;
@@ -359,7 +352,10 @@ int vos_process_sort(struct Stmt *sort)
/* create thread for sort */
for (i = 0; i < _vos.proc_max; i++) {
- sort_proc[i].pos_start = i * esize;
+ if (i == 0)
+ sort_proc[i].pos_start = i * esize;
+ else
+ sort_proc[i].pos_start = (i * esize) - 1;
if ((i + 1) == _vos.proc_max)
sort_proc[i].pos_end = fsize;
diff --git a/proc/vos_sort_merge.c b/proc/vos_sort_merge.c
index 30bddf2..97660f1 100644
--- a/proc/vos_sort_merge.c
+++ b/proc/vos_sort_merge.c
@@ -123,6 +123,24 @@ static struct MNode * mnode_get_loser(struct MNode **root,
return loser;
}
+static int single_merge(const char *from, const char *to)
+{
+ int s;
+ char *from_dir = 0;
+ char *to_dir = 0;
+
+ file_raw_get_dirname(from, &from_dir);
+ file_raw_get_dirname(to, &to_dir);
+
+ if (strcmp(from_dir, to_dir) == 0) {
+ s = rename(from, to);
+ } else {
+ s = file_raw_copy(from, to);
+ }
+
+ return s;
+}
+
int vos_sort_merge(struct Stmt *sort, struct LL *lsfile,
struct Record **_all_rows, unsigned long all_n_row)
{
@@ -136,9 +154,13 @@ int vos_sort_merge(struct Stmt *sort, struct LL *lsfile,
struct Record *R = 0;
struct Record *all_rows = (*_all_rows);
+ s = stmtsort_init_output(sort);
+ if (s)
+ return s;
+
/* in case of only one file to merge */
if (! lsfile->next) {
- s = rename(lsfile->str, sort->out->filename);
+ s = single_merge(lsfile->str, sort->out->filename);
return s;
}
diff --git a/type/vos_TStmtJoin.h b/type/vos_TStmtJoin.h
deleted file mode 100644
index aae8fc3..0000000
--- a/type/vos_TStmtJoin.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef _VOS_TYPE_STMTJOIN_H
-#define _VOS_TYPE_STMTJOIN_H 1
-
-enum _stmt_join_flag {
- JOIN_NORMAL = 0,
- JOIN_OUTER = 1,
- JOIN_ANTI = 2,
- N_JOIN_FLAG
-};
-
-enum _stmt_join_sort_flag {
- JOIN_UNSORTED = 4,
- JOIN_SORTED = 8,
- N_JOIN_SORT_FLAG
-};
-
-#endif
diff --git a/type/vos_TStmtMeta.h b/type/vos_TStmtMeta.h
index 5e37c3b..6377988 100644
--- a/type/vos_TStmtMeta.h
+++ b/type/vos_TStmtMeta.h
@@ -3,6 +3,19 @@
#include "type/vos_TField.h"
+enum _stmtmeta_flag {
+ JOIN_NORMAL = 0,
+ JOIN_OUTER = 1,
+ JOIN_ANTI = 2,
+ N_JOIN_FLAG = 3,
+
+ JOIN_UNSORTED = 4,
+ JOIN_SORTED = 8,
+ N_JOIN_SORT_FLAG = 9,
+
+ SORT_TMP = 16
+};
+
struct StmtMeta {
int flag;
char *filename;
diff --git a/vos.c b/vos.c
index c71cb27..12b29b5 100644
--- a/vos.c
+++ b/vos.c
@@ -34,6 +34,38 @@ int get_token_idx(const char **ls, const unsigned int n, const char *tok)
return i;
}
+/**
+ * @desc: get one of the temporary directory from the list of temporary
+ * directories.
+ *
+ * @return:
+ * < tmp_dir : a path to random temporary directory.
+ */
+char *get_tmp_dir(const int lock)
+{
+ int s;
+ char *tmp_dir = 0;
+
+ if (_vos.proc_max > 1 && lock) {
+ do {
+ s = pthread_mutex_trylock(&_vos.proc_tmp_dir_lock);
+ } while (s);
+ }
+
+ tmp_dir = _vos.p_proc_tmp_dir->str;
+
+ /* to the next temporary dir */
+ _vos.p_proc_tmp_dir = _vos.p_proc_tmp_dir->next;
+ if (! _vos.p_proc_tmp_dir)
+ _vos.p_proc_tmp_dir = _vos.proc_tmp_dir;
+
+ if (_vos.proc_max > 1 && lock) {
+ pthread_mutex_unlock(&_vos.proc_tmp_dir_lock);
+ }
+
+ return tmp_dir;
+}
+
static int vos_init(int argc, char **argv)
{
int i = 1;
diff --git a/vos.h b/vos.h
index f29a3ab..fa2568e 100644
--- a/vos.h
+++ b/vos.h
@@ -16,5 +16,6 @@
struct Vos _vos;
int get_token_idx(const char **ls, const unsigned int n, const char *tok);
+char *get_tmp_dir(const int lock);
#endif