about | summary | refs | log | tree | commit | diff
path: root/internal/postgres/postgres.go
diff options
context:
space:
mode:
author: Jonathan Amsterdam <jba@google.com> 2019-11-19 19:04:01 -0500
committer: Julie Qiu <julie@golang.org> 2020-03-27 16:46:48 -0400
commit: 65e2b7a804ce25942210f8db0e9552db9d7d6ff5 (patch)
tree: b7e330862db226a7f431eeda083312dc52d96556 /internal/postgres/postgres.go
parent: c77f28bf038ee25dce5a0cd513b721683bf940cc (diff)
download: go-x-pkgsite-65e2b7a804ce25942210f8db0e9552db9d7d6ff5.tar.xz
internal/database, internal/testing/dbtest: site-agnostic DB functionality
Extract into a separate package the core functionality from internal/postgres that doesn't depend on our particular schema. This makes it available for other uses, like devtools commands and etl autocomplete.

Do the same for testing functionality.

We now have three packages where before we had only one:

- internal/postgres: discovery-specific DB operations and test support
- internal/database: discovery-agnostic DB operations
- internal/testing/dbtest: discovery-agnostic DB test support

Change-Id: I54c59aee328dae71ba6c77170a72e7a83da7c785
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/602327
Reviewed-by: Robert Findley <rfindley@google.com>
Diffstat (limited to 'internal/postgres/postgres.go')
-rw-r--r-- internal/postgres/postgres.go 302
1 files changed, 15 insertions, 287 deletions
diff --git a/internal/postgres/postgres.go b/internal/postgres/postgres.go
index 3a958c8c..d437838a 100644
--- a/internal/postgres/postgres.go
+++ b/internal/postgres/postgres.go
@@ -5,310 +5,38 @@
package postgres
import (
- "context"
"database/sql"
- "fmt"
- "regexp"
- "strings"
- "sync/atomic"
- "time"
- "unicode"
- "golang.org/x/discovery/internal/config"
- "golang.org/x/discovery/internal/derrors"
- "golang.org/x/discovery/internal/log"
+ "golang.org/x/discovery/internal/database"
)
-// DB wraps a sql.DB to provide an API for interacting with discovery data
-// stored in Postgres.
type DB struct {
- db *sql.DB
+ db *database.DB
}
-// GetSQLDB returns the underlying SQL database for the postgres instance. This
-// allows the ETL to perform streaming operations on the database.
-func (db *DB) GetSQLDB() *sql.DB {
- return db.db
-}
-
-func (db *DB) exec(ctx context.Context, query string, args ...interface{}) (res sql.Result, err error) {
- defer logQuery(query, args)(&err)
-
- return db.db.ExecContext(ctx, query, args...)
-}
-
-func execTx(ctx context.Context, tx *sql.Tx, query string, args ...interface{}) (res sql.Result, err error) {
- defer logQuery(query, args)(&err)
-
- return tx.ExecContext(ctx, query, args...)
-}
-
-func (db *DB) query(ctx context.Context, query string, args ...interface{}) (_ *sql.Rows, err error) {
- defer logQuery(query, args)(&err)
- return db.db.QueryContext(ctx, query, args...)
-}
-
-func (db *DB) queryRow(ctx context.Context, query string, args ...interface{}) *sql.Row {
- defer logQuery(query, args)(nil)
- return db.db.QueryRowContext(ctx, query, args...)
-}
-
-var (
- queryCounter int64 // atomic: per-process counter for unique query IDs
- queryLoggingDisabled bool // For use in tests only: not concurrency-safe.
-)
-
-type queryEndLogEntry struct {
- ID string
- Query string
- Args string
- DurationSeconds float64
- Error string `json:",omitempty"`
-}
-
-func logQuery(query string, args []interface{}) func(*error) {
- if queryLoggingDisabled {
- return func(*error) {}
- }
- const maxlen = 300 // maximum length of displayed query
-
- // To make the query more compact and readable, replace newlines with spaces
- // and collapse adjacent whitespace.
- var r []rune
- for _, c := range query {
- if c == '\n' {
- c = ' '
- }
- if len(r) == 0 || !unicode.IsSpace(r[len(r)-1]) || !unicode.IsSpace(c) {
- r = append(r, c)
- }
- }
- query = string(r)
- if len(query) > maxlen {
- query = query[:maxlen] + "..."
- }
-
- instanceID := config.InstanceID()
- if instanceID == "" {
- instanceID = "local"
- } else {
- // Instance IDs are long strings. The low-order part seems quite random, so
- // shortening the ID will still likely result in something unique.
- instanceID = instanceID[len(instanceID)-4:]
- }
- n := atomic.AddInt64(&queryCounter, 1)
- uid := fmt.Sprintf("%s-%d", instanceID, n)
-
- // Construct a short string of the args.
- const (
- maxArgs = 20
- maxArgLen = 50
- )
- var argStrings []string
- for i := 0; i < len(args) && i < maxArgs; i++ {
- s := fmt.Sprint(args[i])
- if len(s) > maxArgLen {
- s = s[:maxArgLen] + "..."
- }
- argStrings = append(argStrings, s)
- }
- if len(args) > maxArgs {
- argStrings = append(argStrings, "...")
- }
- argString := strings.Join(argStrings, ", ")
-
- log.Debugf("%s %s args=%s", uid, query, argString)
- start := time.Now()
- return func(errp *error) {
- dur := time.Since(start)
- if errp == nil { // happens with queryRow
- log.Debugf("%s done", uid)
- } else {
- derrors.Wrap(errp, "DB running query %s", uid)
- entry := queryEndLogEntry{
- ID: uid,
- Query: query,
- Args: argString,
- DurationSeconds: dur.Seconds(),
- }
- if *errp == nil {
- log.Debug(entry)
- } else {
- entry.Error = (*errp).Error()
- log.Error(entry)
- }
- }
- }
-}
-
-// Open creates a new DB for the given Postgres connection string.
-func Open(driverName, dbinfo string) (_ *DB, err error) {
- defer derrors.Wrap(&err, "postgres.Open(%q, %q)",
- driverName, redactPassword(dbinfo))
-
- db, err := sql.Open(driverName, dbinfo)
+// Open opens a new postgres DB.
+// TODO(jba): take a *sql.DB.
+func Open(driverName, dbinfo string) (*DB, error) {
+ db, err := database.Open(driverName, dbinfo)
if err != nil {
return nil, err
}
-
- if err = db.Ping(); err != nil {
- return nil, err
- }
return &DB{db}, nil
}
-var passwordRegexp = regexp.MustCompile(`password=\S+`)
-
-func redactPassword(dbinfo string) string {
- return passwordRegexp.ReplaceAllLiteralString(dbinfo, "password=REDACTED")
-}
-
-// Transact executes the given function in the context of a SQL transaction,
-// rolling back the transaction if the function panics or returns an error.
-func (db *DB) Transact(txFunc func(*sql.Tx) error) (err error) {
- tx, err := db.db.Begin()
- if err != nil {
- return fmt.Errorf("db.Begin(): %v", err)
- }
-
- defer func() {
- if p := recover(); p != nil {
- tx.Rollback()
- panic(p)
- } else if err != nil {
- tx.Rollback()
- } else {
- if err = tx.Commit(); err != nil {
- err = fmt.Errorf("tx.Commit(): %v", err)
- }
- }
- }()
-
- if err := txFunc(tx); err != nil {
- return fmt.Errorf("txFunc(tx): %v", err)
- }
- return nil
-}
-
-const onConflictDoNothing = "ON CONFLICT DO NOTHING"
-
-// bulkInsert constructs and executes a multi-value insert statement. The
-// query is constructed using the format: INSERT TO <table> (<columns>) VALUES
-// (<placeholders-for-each-item-in-values>) If conflictNoAction is true, it
-// append ON CONFLICT DO NOTHING to the end of the query. The query is executed
-// using a PREPARE statement with the provided values.
-func bulkInsert(ctx context.Context, tx *sql.Tx, table string, columns []string, values []interface{}, conflictAction string) (err error) {
- defer derrors.Wrap(&err, "bulkInsert(ctx, tx, %q, %v, [%d values], %q)",
- table, columns, len(values), conflictAction)
-
- if remainder := len(values) % len(columns); remainder != 0 {
- return fmt.Errorf("modulus of len(values) and len(columns) must be 0: got %d", remainder)
- }
-
- // Postgres supports up to 65535 parameters, but stop well before that
- // so we don't construct humongous queries.
- const maxParameters = 1000
- stride := (maxParameters / len(columns)) * len(columns)
- if stride == 0 {
- // This is a pathological case (len(columns) > maxParameters), but we
- // handle it cautiously.
- return fmt.Errorf("too many columns to insert: %d", len(columns))
- }
- for leftBound := 0; leftBound < len(values); leftBound += stride {
- rightBound := leftBound + stride
- if rightBound > len(values) {
- rightBound = len(values)
- }
- valueSlice := values[leftBound:rightBound]
- query, err := buildInsertQuery(table, columns, valueSlice, conflictAction)
- if err != nil {
- return fmt.Errorf("buildInsertQuery(%q, %v, values[%d:%d], %q): %v", table, columns, leftBound, rightBound, conflictAction, err)
- }
-
- if _, err := execTx(ctx, tx, query, valueSlice...); err != nil {
- return fmt.Errorf("tx.ExecContext(ctx, [bulk insert query], values[%d:%d]): %v", leftBound, rightBound, err)
- }
- }
- return nil
-}
-
-// buildInsertQuery builds an multi-value insert query, following the format:
-// INSERT TO <table> (<columns>) VALUES
-// (<placeholders-for-each-item-in-values>) If conflictNoAction is true, it
-// append ON CONFLICT DO NOTHING to the end of the query.
-//
-// When calling buildInsertQuery, it must be true that
-// len(values) % len(columns) == 0
-func buildInsertQuery(table string, columns []string, values []interface{}, conflictAction string) (string, error) {
- var b strings.Builder
- fmt.Fprintf(&b, "INSERT INTO %s", table)
- fmt.Fprintf(&b, "(%s) VALUES", strings.Join(columns, ", "))
-
- var placeholders []string
- for i := 1; i <= len(values); i++ {
- // Construct the full query by adding placeholders for each
- // set of values that we want to insert.
- placeholders = append(placeholders, fmt.Sprintf("$%d", i))
- if i%len(columns) != 0 {
- continue
- }
-
- // When the end of a set is reached, write it to the query
- // builder and reset placeholders.
- fmt.Fprintf(&b, "(%s)", strings.Join(placeholders, ", "))
- placeholders = []string{}
-
- // Do not add a comma delimiter after the last set of values.
- if i == len(values) {
- break
- }
- b.WriteString(", ")
- }
- if conflictAction != "" {
- b.WriteString(" " + conflictAction)
- }
-
- return b.String(), nil
-}
-
-// Close closes the database connection.
+// Close closes a DB.
func (db *DB) Close() error {
return db.db.Close()
}
-// runQuery executes query, then calls f on each row.
-func (db *DB) runQuery(ctx context.Context, query string, f func(*sql.Rows) error, params ...interface{}) error {
- rows, err := db.query(ctx, query, params...)
- if err != nil {
- return err
- }
- defer rows.Close()
-
- for rows.Next() {
- if err := f(rows); err != nil {
- return err
- }
- }
- return rows.Err()
-}
-
-// emptyStringScanner wraps the functionality of sql.NullString to just write
-// an empty string if the value is NULL.
-type emptyStringScanner struct {
- ptr *string
-}
-
-func (e emptyStringScanner) Scan(value interface{}) error {
- var ns sql.NullString
- if err := ns.Scan(value); err != nil {
- return err
- }
- *e.ptr = ns.String
- return nil
+// Underlying returns the *database.DB inside db.
+func (db *DB) Underlying() *database.DB {
+ return db.db
}
-// nullIsEmpty returns a sql.Scanner that writes the empty string to s if the
-// sql.Value is NULL.
-func nullIsEmpty(s *string) sql.Scanner {
- return emptyStringScanner{s}
+// TODO(jba): remove.
+// GetSQLDB returns the underlying SQL database for the postgres instance. This
+// allows the ETL to perform streaming operations on the database.
+func (db *DB) GetSQLDB() *sql.DB {
+ return db.db.Underlying()
}