aboutsummaryrefslogtreecommitdiff
path: root/internal/postgres/versionstate.go
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2020-02-21 06:47:42 -0500
committerJulie Qiu <julie@golang.org>2020-04-06 15:50:52 -0400
commit3460441186bf5722823520bd7cf0e20e08ec8826 (patch)
treec1fe19f27135d341819fe86892f0f5838444c1fd /internal/postgres/versionstate.go
parent66ad1f1dbeac0aaaaadca678cb404c742729f8a4 (diff)
downloadgo-x-pkgsite-3460441186bf5722823520bd7cf0e20e08ec8826.tar.xz
internal/postgres: improve GetNextVersionsToFetch
One of the queries in GetNextVersionsToFetch was taking 30s to run. I couldn't find a way to speed it up. Finally I gave up and rewrote the method to do something much more straightforward from the DB's point of view. First I run a query to get the latest versions of everything in module_version_states. Then I scan the entire table for rows eligible for reprocessing, and group them by priority. Despite the apparent slowness of these queries, the whole thing actually runs much faster than the previous version. Change-Id: Ia88bc26849f0df131d8c00d07bf6a7d160c6b364 Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/672575 CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com> Reviewed-by: Julie Qiu <julieqiu@google.com>
Diffstat (limited to 'internal/postgres/versionstate.go')
-rw-r--r--internal/postgres/versionstate.go199
1 files changed, 102 insertions, 97 deletions
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go
index c3d8176c..9555d42f 100644
--- a/internal/postgres/versionstate.go
+++ b/internal/postgres/versionstate.go
@@ -19,6 +19,7 @@ import (
"golang.org/x/discovery/internal/derrors"
"golang.org/x/discovery/internal/log"
"golang.org/x/discovery/internal/version"
+ "golang.org/x/mod/semver"
)
// InsertIndexVersions inserts new versions into the module_version_states
@@ -233,119 +234,123 @@ func (db *DB) queryModuleVersionStates(ctx context.Context, queryFormat string,
// GetNextVersionsToFetch returns the next batch of versions that must be
// processed.
func (db *DB) GetNextVersionsToFetch(ctx context.Context, limit int) (_ []*internal.ModuleVersionState, err error) {
- // We want to prioritize the latest versions over other ones, and we want to leave time-consuming
- // modules until the end. So we run several queries in succession, appending their results until
- // we reach the limit: latest release versions, latest non-release versions, everything else except
- // slow modules, and finally the slow modules.
+ // We want to prioritize the latest versions over other ones, and we want to
+ // leave time-consuming modules until the end.
+ // We run two queries: the first gets the latest versions of everything; the second
+ // runs through all eligible modules, organizing them by priority.
defer derrors.Wrap(&err, "GetNextVersionsToFetch(ctx, %d)", limit)
- // Query for getting latest release or non-release versions. The first
- // argument to Sprintf will be the columns, the second the operator. If the
- // operator is "=", we will select all the release versions; if "!=", then
- // the non-release versions.
- //
- // Adding the nonsensical (and never true) "OR right(sort_version, 2) = ''"
- // generates a much faster query plan for the '=' case, and preserves the fast
- // query plan for the "!=" case. Go fig.
- maxQuery := `
- WITH max_versions AS (
- SELECT module_path, max(sort_version) AS max_sv
- FROM module_version_states
- -- Compare the last character of sort_version to '~'.
- WHERE right(sort_version, 1) %[2]s '~' OR right(sort_version, 2) = ''
- GROUP BY 1
- )
- SELECT
- %[1]s
- FROM
- module_version_states s
- INNER JOIN
- max_versions m
- ON
- s.module_path = m.module_path
- AND
- s.sort_version = m.max_sv
- WHERE
- (status=0 OR status >= 500)
- AND next_processed_after < CURRENT_TIMESTAMP
- ORDER BY
- s.sort_version DESC
- LIMIT $1
- `
- // Query for including or excluding a list of module path patterns.
- // The first argument to Sprintf will be the columns, the
- // second the operator.
- // If the operator is "NOT", then we will exclude the module_paths;
- // if it is the empty string, we will include them.
- // We need to double the percents in the LIKE expressions or Sprintf
- // will try to interpret them.
- pathQuery := `
+ latestVersions, err := db.getLatestVersionsFromModuleVersionStates(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ // isBig reports whether the module path refers to a big module that takes a
+ // long time to process.
+ isBig := func(path string) bool {
+ for _, s := range []string{"kubernetes", "aws-sdk-go"} {
+ if strings.HasSuffix(path, s) {
+ return true
+ }
+ }
+ return false
+ }
+
+ // Create prioritized lists of modules to process. From high to low:
+ // 0: latest version, release
+ // 1: latest version, non-release
+ // 2: not a large zip
+ // 3: the rest
+ mvs := make([][]*internal.ModuleVersionState, 4)
+
+ query := fmt.Sprintf(`
SELECT
- %[1]s
+ %s
FROM
- module_version_states s
+ module_version_states
WHERE
(status=0 OR status >= 500)
- AND next_processed_after < CURRENT_TIMESTAMP
- AND %[2]s (
- module_path LIKE '%%/kubernetes'
- OR
- module_path LIKE '%%/aws-sdk-go'
- )
+ AND
+ next_processed_after < CURRENT_TIMESTAMP
ORDER BY
- right(sort_version, 1) = '~' DESC,
sort_version DESC
- LIMIT $1
- `
- // Prepend "s." to columns, because in maxQuery the bare column name is
- // ambiguous. All the queries use `s` as an alias for module_version_states.
- columnSlice := strings.Split(moduleVersionStateColumns, ",")
- for i, c := range columnSlice {
- columnSlice[i] = "s." + strings.TrimSpace(c)
+ `, moduleVersionStateColumns)
+
+ err = db.db.RunQuery(ctx, query, func(rows *sql.Rows) error {
+ // If the highest-priority list is full, we're done.
+ if len(mvs[0]) >= limit {
+ return io.EOF
+ }
+ mv, err := scanModuleVersionState(rows.Scan)
+ if err != nil {
+ return err
+ }
+ var prio int
+ switch {
+ case mv.Version == latestVersions[mv.ModulePath]:
+ if semver.Prerelease(mv.Version) == "" {
+ prio = 0 // latest release version
+ } else {
+ prio = 1 // latest non-release version
+ }
+ case !isBig(mv.ModulePath):
+ prio = 2
+ default:
+ prio = 3
+ }
+ mvs[prio] = append(mvs[prio], mv)
+ return nil
+ })
+ if err != nil && err != io.EOF {
+ return nil, err
}
- columns := strings.Join(columnSlice, ", ")
- queries := []string{
- // latest release versions
- fmt.Sprintf(maxQuery, columns, "="),
- // latest non-release versions
- fmt.Sprintf(maxQuery, columns, "!="),
- // all other versions in order, except matching module paths
- fmt.Sprintf(pathQuery, columns, "NOT"),
- // all the module paths previously excluded
- fmt.Sprintf(pathQuery, columns, ""),
+ // Combine the four prioritized lists into one.
+ var r []*internal.ModuleVersionState
+ for _, mv := range mvs {
+ if len(r)+len(mv) > limit {
+ return append(r, mv[:limit-len(r)]...), nil
+ }
+ r = append(r, mv...)
}
+ return r, nil
+}
- var mvs []*internal.ModuleVersionState
- // Keep track of rows we've seen to dedup, because queries overlap.
- seen := map[[2]string]bool{}
- for i, q := range queries {
- err := db.db.RunQuery(ctx, q, func(rows *sql.Rows) error {
- if len(mvs) >= limit {
- return io.EOF // signal that we're done
- }
- mv, err := scanModuleVersionState(rows.Scan)
- if err != nil {
+// getLatestVersionsFromModuleVersionStates returns a map from module path to latest version in module_version_states.
+func (db *DB) getLatestVersionsFromModuleVersionStates(ctx context.Context) (map[string]string, error) {
+ m := map[string]string{}
+ // We want to prefer release to non-release versions. A sort_version will end in '~' if it
+ // is a release, and that is larger than any other character that can occur in a sort_version.
+ // So if we sort first by the last character in sort_version, then by sort_version itself,
+ // we will get releases before non-releases.
+ // To implement that two-level ordering in a MAX, we construct an array of the two strings.
+ // Arrays are ordered lexicographically, so MAX will do just what we want.
+ err := db.db.RunQuery(ctx, `
+ SELECT
+ s.module_path, s.version
+ FROM
+ module_version_states s
+ INNER JOIN (
+ SELECT module_path,
+ MAX(ARRAY[right(sort_version, 1), sort_version]) AS mv
+ FROM module_version_states
+ GROUP BY 1) m
+ ON
+ s.module_path = m.module_path
+ AND
+ s.sort_version = m.mv[2]`,
+ func(rows *sql.Rows) error {
+ var mp, v string
+ if err := rows.Scan(&mp, &v); err != nil {
return err
}
- key := [2]string{mv.ModulePath, mv.Version}
- if !seen[key] {
- mvs = append(mvs, mv)
- seen[key] = true
- }
+ m[mp] = v
return nil
- }, limit) // Do not reduce the limit on each iteration, because the queries overlap.
- switch err {
- case io.EOF:
- log.Infof(ctx, "GetNextVersionsToFetch: finished with query #%d", i)
- return mvs, nil
- case nil:
- continue
- default:
- return nil, err
- }
+ })
+ if err != nil {
+ return nil, err
}
- return mvs, nil
+ return m, nil
}
// GetRecentFailedVersions returns versions that have most recently failed.