Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic and super-experimental support for distributed execution #3205

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions cmd/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package cmd

import (
"bytes"
"encoding/json"

"github.com/spf13/afero"
"github.com/spf13/cobra"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/distributed"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/guregu/null.v3"
)

// TODO: a whole lot of cleanup, refactoring, error handling and hardening
func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
c := &cmdsRunAndAgent{gs: gs}

c.loadConfiguredTest = func(cmd *cobra.Command, args []string) (
*loadedAndConfiguredTest, execution.Controller, error,
) {
// TODO: add some gRPC authentication
conn, err := grpc.Dial(args[0], grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, err
}
c.testEndHook = func(err error) {
gs.Logger.Debugf("k6 agent run ended with err=%s", err)
_ = conn.Close()
}

client := distributed.NewDistributedTestClient(conn)

resp, err := client.Register(gs.Ctx, &distributed.RegisterRequest{})
if err != nil {
return nil, nil, err
}

controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger)
if err != nil {
return nil, nil, err
}

var options lib.Options
if err = json.Unmarshal(resp.Options, &options); err != nil {
return nil, nil, err
}

arc, err := lib.ReadArchive(bytes.NewReader(resp.Archive))
if err != nil {
return nil, nil, err
}

registry := metrics.NewRegistry()
piState := &lib.TestPreInitState{
Logger: gs.Logger,
RuntimeOptions: lib.RuntimeOptions{
NoThresholds: null.BoolFrom(true),
NoSummary: null.BoolFrom(true),
Env: arc.Env,
CompatibilityMode: null.StringFrom(arc.CompatibilityMode),
},
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
}

initRunner, err := js.NewFromArchive(piState, arc)
if err != nil {
return nil, nil, err
}

test := &loadedTest{
pwd: arc.Pwd,
sourceRootPath: arc.Filename,
source: &loader.SourceData{
Data: resp.Archive,
URL: arc.FilenameURL,
},
fs: afero.NewMemMapFs(), // TODO: figure out what should be here
fileSystems: arc.Filesystems,
preInitState: piState,
initRunner: initRunner,
}

pseudoConsoldatedConfig := applyDefault(Config{Options: options})
for _, thresholds := range pseudoConsoldatedConfig.Thresholds {
if err = thresholds.Parse(); err != nil {
return nil, nil, err
}
}
derivedConfig, err := deriveAndValidateConfig(pseudoConsoldatedConfig, initRunner.IsExecutable, gs.Logger)
if err != nil {
return nil, nil, err
}

configuredTest := &loadedAndConfiguredTest{
loadedTest: test,
consolidatedConfig: pseudoConsoldatedConfig,
derivedConfig: derivedConfig,
}

gs.Flags.Address = "" // TODO: fix, this is a hack so agents don't start an API server

return configuredTest, controller, nil // TODO
}

agentCmd := &cobra.Command{
Use: "agent",
Short: "Join a distributed load test",
Long: `TODO`,
Args: exactArgsWithMsg(1, "arg should either the IP and port of the controller k6 instance"),
RunE: c.run,
Hidden: true, // TODO: remove when officially released
}

// TODO: add flags

return agentCmd
}
85 changes: 85 additions & 0 deletions cmd/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package cmd

import (
"net"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/execution/distributed"
"google.golang.org/grpc"
)

// cmdCoordinator handles the `k6 coordinator` sub-command
type cmdCoordinator struct {
gs *state.GlobalState
gRPCAddress string
instanceCount int
}

func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) {
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}

coordinator, err := distributed.NewCoordinatorServer(
c.instanceCount, test.initRunner.MakeArchive(), c.gs.Logger,
)
if err != nil {
return err
}

c.gs.Logger.Infof("Starting gRPC server on %s", c.gRPCAddress)
listener, err := net.Listen("tcp", c.gRPCAddress)
if err != nil {
return err
}

grpcServer := grpc.NewServer() // TODO: add auth and a whole bunch of other options
distributed.RegisterDistributedTestServer(grpcServer, coordinator)

go func() {
err := grpcServer.Serve(listener)
c.gs.Logger.Debugf("gRPC server end: %s", err)
}()
coordinator.Wait()
c.gs.Logger.Infof("All done!")
return nil
}

func (c *cmdCoordinator) flagSet() *pflag.FlagSet {
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.SortFlags = false
flags.AddFlagSet(optionFlagSet())
flags.AddFlagSet(runtimeOptionFlagSet(false))

// TODO: add support bi-directional gRPC authentication and authorization
flags.StringVar(&c.gRPCAddress, "grpc-addr", "localhost:6566", "address on which to bind the gRPC server")

// TODO: add some better way to specify the test, e.g. an execution segment
// sequence + some sort of a way to map instances with specific segments
// (e.g. key-value tags that can be matched to every execution segment, with
// each instance advertising its own tags when it connects).
flags.IntVar(&c.instanceCount, "instance-count", 1, "number of distributed instances")
return flags
}

func getCmdCoordnator(gs *state.GlobalState) *cobra.Command {
c := &cmdCoordinator{
gs: gs,
}

coordinatorCmd := &cobra.Command{
Use: "coordinator",
Short: "Start a distributed load test",
Long: `TODO`,
RunE: c.run,
Hidden: true, // TODO: remove when officially released
}

coordinatorCmd.Flags().SortFlags = false
coordinatorCmd.Flags().AddFlagSet(c.flagSet())

return coordinatorCmd
}
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func newRootCommand(gs *state.GlobalState) *rootCommand {
getCmdArchive, getCmdCloud, getCmdNewScript, getCmdInspect,
getCmdLogin, getCmdPause, getCmdResume, getCmdScale, getCmdRun,
getCmdStats, getCmdStatus, getCmdVersion,
getCmdAgent, getCmdCoordnator,
}

for _, sc := range subCommands {
Expand Down
16 changes: 10 additions & 6 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ import (
"go.k6.io/k6/ui/pb"
)

// cmdRun handles the `k6 run` sub-command
type cmdRun struct {
// cmdsRunAndAgent handles the `k6 run` and `k6 agent` sub-commands
type cmdsRunAndAgent struct {
gs *state.GlobalState

// TODO: figure out something more elegant?
loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error)
testEndHook func(err error)
}

const (
Expand All @@ -60,14 +61,17 @@ const (
// TODO: split apart some more
//
//nolint:funlen,gocognit,gocyclo,cyclop
func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
var logger logrus.FieldLogger = c.gs.Logger
defer func() {
if err == nil {
logger.Debug("Everything has finished, exiting k6 normally!")
} else {
logger.WithError(err).Debug("Everything has finished, exiting k6 with an error!")
}
if c.testEndHook != nil {
c.testEndHook(err)
}
}()
printBanner(c.gs)

Expand Down Expand Up @@ -435,7 +439,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
return nil
}

func (c *cmdRun) flagSet() *pflag.FlagSet {
func (c *cmdsRunAndAgent) flagSet() *pflag.FlagSet {
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.SortFlags = false
flags.AddFlagSet(optionFlagSet())
Expand All @@ -444,7 +448,7 @@ func (c *cmdRun) flagSet() *pflag.FlagSet {
return flags
}

func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error {
func (c *cmdsRunAndAgent) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error {
ro := test.preInitState.RuntimeOptions
if ro.TracesOutput.String == "none" {
test.preInitState.TracerProvider = trace.NewNoopTracerProvider()
Expand All @@ -461,7 +465,7 @@ func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfigu
}

func getCmdRun(gs *state.GlobalState) *cobra.Command {
c := &cmdRun{
c := &cmdsRunAndAgent{
gs: gs,
loadConfiguredTest: func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) {
test, err := loadAndConfigureLocalTest(gs, cmd, args, getConfig)
Expand Down
Loading