kubernetes

Version: v3.115.4
Warning

This package is not in the latest version of its module.

Published: Jan 13, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package kubernetes provides coordination between containers in a Kubernetes pod when running Buildkite jobs.

Architecture

In Kubernetes, a job runs across multiple containers within a single pod:

  • Agent container: runs `buildkite-agent start --kubernetes-exec`, receives jobs from Buildkite, and acts as a coordinator.
  • Checkout container: clones the repository.
  • Command container(s): execute the build commands.

These containers need coordination because:

  1. Environment sharing: The agent container receives job details (API tokens, build metadata, plugin configs) from Buildkite. Other containers need this information, but Kubernetes doesn't share environment variables between containers.

  2. Sequential execution: The checkout container must complete before command containers start. By default, Kubernetes starts all of a pod's regular containers together and does not wait for one to finish before starting the next.

  3. Log aggregation: All container output must be collected and streamed to Buildkite as a single job log.

  4. Cancellation: When a job is cancelled in Buildkite, all containers must be notified to gracefully shut down.

  5. Exit status: The agent must collect exit statuses from all containers to report the final job result.

Components

Runner implements the server side, running in the agent container. It creates a Unix socket and exposes an RPC API for other containers to connect to.

Client implements the client side, used by `kubernetes-bootstrap` running in checkout and command containers. It connects to the socket, receives environment variables, and reports logs and exit status.
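The Runner/Client split can be modelled with Go's net/rpc package over a Unix socket. The mention of rpc.ServerError under Client.StatusLoop suggests this package uses net/rpc, but the sketch below is a stdlib-only illustration, not its code: `Coordinator` is a hypothetical stand-in for Runner whose Register and Exit methods have the same shapes as Runner.Register and Runner.Exit, the message types are local copies of the documented ones, and the socket name and `EXAMPLE_VAR` are made up.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"os"
	"path/filepath"
	"sync"
)

// Local copies of the documented RPC message types.
type RegisterResponse struct{ Env []string }

type ExitCode struct {
	ID         int
	ExitStatus int
}

type Empty struct{}

// Coordinator is a hypothetical stand-in for Runner: it hands out the job
// environment and records each client's exit status.
type Coordinator struct {
	env   []string
	mu    sync.Mutex
	exits map[int]int // client ID -> exit status
}

// Register replies with a copy of the job environment.
func (c *Coordinator) Register(id int, reply *RegisterResponse) error {
	reply.Env = append([]string(nil), c.env...)
	return nil
}

// Exit stores the exit status reported by a client.
func (c *Coordinator) Exit(args ExitCode, reply *Empty) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.exits[args.ID] = args.ExitStatus
	return nil
}

// roundTrip serves a Coordinator on a temporary Unix socket, then acts as
// client 1: it registers, reports exit status 3, and returns the received
// environment and the status the server recorded.
func roundTrip() ([]string, int, error) {
	dir, err := os.MkdirTemp("", "coord")
	if err != nil {
		return nil, 0, err
	}
	defer os.RemoveAll(dir)
	sock := filepath.Join(dir, "agent.sock")

	coord := &Coordinator{env: []string{"EXAMPLE_VAR=from-agent"}, exits: map[int]int{}}
	srv := rpc.NewServer()
	if err := srv.Register(coord); err != nil {
		return nil, 0, err
	}
	ln, err := net.Listen("unix", sock)
	if err != nil {
		return nil, 0, err
	}
	defer ln.Close()
	go srv.Accept(ln) // serves connections until the listener is closed

	client, err := rpc.Dial("unix", sock)
	if err != nil {
		return nil, 0, err
	}
	defer client.Close()

	var resp RegisterResponse
	if err := client.Call("Coordinator.Register", 1, &resp); err != nil {
		return nil, 0, err
	}
	if err := client.Call("Coordinator.Exit", ExitCode{ID: 1, ExitStatus: 3}, &Empty{}); err != nil {
		return nil, 0, err
	}
	coord.mu.Lock()
	defer coord.mu.Unlock()
	return resp.Env, coord.exits[1], nil
}

func main() {
	env, status, err := roundTrip()
	fmt.Println(env, status, err) // [EXAMPLE_VAR=from-agent] 3 <nil>
}
```

net/rpc only exposes methods of the form `func (t *T) Name(args A, reply *R) error`, which matches the shape of Runner's RPC methods such as Register, Status, Exit and WriteLogs.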

Flow

1. Agent container starts Runner, which listens on a Unix socket.

2. Each container runs `kubernetes-bootstrap`, which creates a Client and calls Client.Connect to register with the runner and receive environment variables.

3. The client calls Client.StatusLoop, which blocks until the runner signals it can start (ensuring sequential execution).

4. The container executes `buildkite-agent bootstrap`, streaming logs through the socket via Client.Write.

5. On completion, the container calls Client.Exit to report its exit status.

6. If the job is cancelled, the runner broadcasts an interrupt to all connected clients via the status polling mechanism.
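The steps above do not say how the runner decides which waiting client may start. As a purely hypothetical illustration, not taken from this package, suppose client IDs are assigned in execution order starting at 0 (so the checkout container is 0). A policy that answers Status with Start only once every lower ID has exited then gives the sequential behaviour of step 3, and an interrupt overrides everything as in step 6. `nextState` is an invented name; the RunState values are local copies of the documented constants.

```go
package main

import "fmt"

type RunState int

const (
	RunStateWait RunState = iota
	RunStateStart
	RunStateInterrupt
)

// nextState is a hypothetical ordering policy: IDs are assumed to run in
// ascending order, and a client may start only when every lower ID has exited.
func nextState(id int, exited map[int]bool, interrupted bool) RunState {
	if interrupted {
		return RunStateInterrupt
	}
	for lower := 0; lower < id; lower++ {
		if !exited[lower] {
			return RunStateWait
		}
	}
	return RunStateStart
}

func main() {
	exited := map[int]bool{}
	fmt.Println(nextState(0, exited, false)) // 1 (Start): nothing runs before client 0
	fmt.Println(nextState(1, exited, false)) // 0 (Wait): client 0 is still running
	exited[0] = true
	fmt.Println(nextState(1, exited, false)) // 1 (Start)
	fmt.Println(nextState(1, exited, true))  // 2 (Interrupt)
}
```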

Index

Constants

This section is empty.

Variables

var ErrInterruptBeforeStart = errors.New("job interrupted before starting")

ErrInterruptBeforeStart is returned by StatusLoop when the job state goes directly from Wait to Interrupt (do not pass Go, do not collect $200).
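Because ErrInterruptBeforeStart is a sentinel value, a caller of StatusLoop can detect it with errors.Is, even after it has been wrapped with `%w`. The helper `describe` and the strings it returns are hypothetical; only the sentinel's message comes from this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented sentinel error.
var ErrInterruptBeforeStart = errors.New("job interrupted before starting")

// describe is a hypothetical helper that tells "cancelled before it ever
// ran" apart from other StatusLoop outcomes.
func describe(err error) string {
	switch {
	case err == nil:
		return "start"
	case errors.Is(err, ErrInterruptBeforeStart):
		return "cancelled before start"
	default:
		return "failed: " + err.Error()
	}
}

// wrapped simulates a caller adding context to the sentinel.
var wrapped = fmt.Errorf("waiting for start: %w", ErrInterruptBeforeStart)

func main() {
	fmt.Println(describe(nil))                     // start
	fmt.Println(describe(wrapped))                 // cancelled before start
	fmt.Println(describe(errors.New("rpc: boom"))) // failed: rpc: boom
}
```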

Functions

func Umask

func Umask(mask int) (old int, err error)

Umask is a wrapper for `unix.Umask()` on non-Windows platforms.
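Umask is normally used in a save-and-restore pattern. This sketch calls the standard library's syscall.Umask directly, which returns only the old mask (this package's Umask also returns an error), to show the effect on a newly created file. `permsUnderUmask` is a hypothetical helper. The umask is process-wide, so changing it affects files created by every goroutine until it is restored.

```go
//go:build !windows

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"syscall"
)

// permsUnderUmask creates a file requesting mode 0666 while mask is the
// process umask, restores the previous umask, and returns the file's
// resulting permission bits.
func permsUnderUmask(mask int) (os.FileMode, error) {
	dir, err := os.MkdirTemp("", "umask")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	old := syscall.Umask(mask)
	defer syscall.Umask(old) // the umask is process-wide, so always restore it

	f, err := os.OpenFile(filepath.Join(dir, "file"), os.O_CREATE|os.O_WRONLY, 0o666)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	info, err := f.Stat()
	if err != nil {
		return 0, err
	}
	return info.Mode().Perm(), nil
}

func main() {
	perm, err := permsUnderUmask(0o077)
	fmt.Println(perm, err) // -rw------- <nil>
}
```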

Types

type Client

type Client struct {
	ID         int
	SocketPath string
	// contains filtered or unexported fields
}

func (*Client) Close

func (c *Client) Close()

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) (*RegisterResponse, error)

Connect establishes a connection to the agent container in the same k8s pod and registers the client. Because k8s might start the containers "out of order", the server socket might not exist yet, so this method retries the connection at 1-second intervals until it succeeds or the context is done. Callers should use context.WithTimeout to bound how long Connect keeps trying.
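The retry behaviour described for Connect amounts to a dial loop bounded by a context. The sketch below reproduces that idea for a raw Unix socket connection, using a 100 ms interval instead of 1 second so the demo runs quickly. `dialWithRetry`, `lateServer`, `noServer` and the socket names are hypothetical and not part of this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"time"
)

// dialWithRetry tries to connect to a Unix socket every interval until it
// succeeds or ctx is done.
func dialWithRetry(ctx context.Context, path string, interval time.Duration) (net.Conn, error) {
	var d net.Dialer
	for {
		conn, err := d.DialContext(ctx, "unix", path)
		if err == nil {
			return conn, nil
		}
		select {
		case <-ctx.Done():
			return nil, fmt.Errorf("connecting to %s: %w (last error: %v)", path, ctx.Err(), err)
		case <-time.After(interval):
		}
	}
}

// lateServer simulates the agent container coming up after the client:
// the socket only starts listening after a delay.
func lateServer() error {
	dir, err := os.MkdirTemp("", "retry")
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir)
	sock := filepath.Join(dir, "agent.sock")

	go func() {
		time.Sleep(300 * time.Millisecond)
		ln, err := net.Listen("unix", sock)
		if err != nil {
			return
		}
		defer ln.Close()
		if c, err := ln.Accept(); err == nil {
			c.Close()
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := dialWithRetry(ctx, sock, 100*time.Millisecond)
	if err != nil {
		return err
	}
	return conn.Close()
}

// noServer reports whether dialing a socket that never appears ends with
// the context's deadline error.
func noServer() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	missing := filepath.Join(os.TempDir(), "no-such-dir-for-demo", "agent.sock")
	_, err := dialWithRetry(ctx, missing, 50*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(lateServer(), noServer()) // <nil> true
}
```

Wrapping `ctx.Err()` with `%w` lets the caller distinguish "gave up waiting" from other failures, while the last dial error is kept in the message for debugging.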

func (*Client) Exit

func (c *Client) Exit(exitStatus int) error

func (*Client) StatusLoop added in v3.97.0

func (c *Client) StatusLoop(ctx context.Context, onInterrupt func(error)) error

StatusLoop starts a goroutine that periodically polls the server for the job status. StatusLoop blocks until the job reaches the Start state or is interrupted early (with or without an error). After StatusLoop returns, the goroutine keeps polling until the context is done, calling onInterrupt with nil when the job becomes interrupted, or with the error if it encounters one (such as an rpc.ServerError).
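The StatusLoop contract, together with RunState and ErrInterruptBeforeStart, can be modelled as a two-phase poller. This is a simplified sketch, not the package's implementation: `PollFunc` is a hypothetical stand-in for the Status RPC, and the wait phase runs synchronously rather than inside the goroutine that the real StatusLoop starts, which should look the same to a caller.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented RunState values and sentinel error.
type RunState int

const (
	RunStateWait RunState = iota
	RunStateStart
	RunStateInterrupt
)

var ErrInterruptBeforeStart = errors.New("job interrupted before starting")

// PollFunc stands in for one Status RPC round trip.
type PollFunc func() (RunState, error)

// waitForStart polls until the job may start (nil), is interrupted before
// starting (ErrInterruptBeforeStart), polling fails, or ctx is done.
func waitForStart(ctx context.Context, poll PollFunc, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		state, err := poll()
		switch {
		case err != nil:
			return err
		case state == RunStateStart:
			return nil
		case state == RunStateInterrupt:
			return ErrInterruptBeforeStart
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// statusLoop blocks in waitForStart; after a successful start it keeps
// polling in a goroutine until ctx is done, calling onInterrupt(nil) on an
// interrupt or onInterrupt(err) when polling fails.
func statusLoop(ctx context.Context, poll PollFunc, interval time.Duration, onInterrupt func(error)) error {
	if err := waitForStart(ctx, poll, interval); err != nil {
		return err
	}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			state, err := poll()
			if err != nil {
				onInterrupt(err)
				return
			}
			if state == RunStateInterrupt {
				onInterrupt(nil)
				return
			}
		}
	}()
	return nil
}

// replay returns a PollFunc that yields states in order, then repeats the last.
func replay(states ...RunState) PollFunc {
	i := 0
	return func() (RunState, error) {
		s := states[i]
		if i < len(states)-1 {
			i++
		}
		return s, nil
	}
}

// interruptBeforeStart replays Wait -> Interrupt.
func interruptBeforeStart() error {
	return statusLoop(context.Background(), replay(RunStateWait, RunStateInterrupt), time.Millisecond, nil)
}

// interruptAfterStart replays Wait -> Start -> Interrupt and reports whether
// onInterrupt was later called with nil.
func interruptAfterStart() (bool, error) {
	got := make(chan error, 1)
	err := statusLoop(context.Background(), replay(RunStateWait, RunStateStart, RunStateInterrupt),
		time.Millisecond, func(e error) { got <- e })
	if err != nil {
		return false, err
	}
	return <-got == nil, nil
}

func main() {
	fmt.Println(errors.Is(interruptBeforeStart(), ErrInterruptBeforeStart)) // true
	fmt.Println(interruptAfterStart())                                       // true <nil>
}
```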

func (*Client) Write

func (c *Client) Write(p []byte) (int, error)

Write implements io.Writer, sending p to the runner as job log output.
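Because Client implements io.Writer, it can be used anywhere a writer is expected, for example as an exec.Cmd's Stdout. The hypothetical `logForwarder` below shows the general shape of such a writer: each Write becomes a Logs message handed to a `send` function that stands in for an RPC call. It copies p first because the io.Writer contract forbids implementations from retaining it.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Logs is a local copy of the documented RPC message.
type Logs struct{ Data []byte }

// logForwarder is a hypothetical io.Writer that wraps each Write in a Logs
// message and passes it to send, which stands in for an RPC call.
type logForwarder struct {
	send func(Logs) error
}

// Compile-time check that logForwarder satisfies io.Writer.
var _ io.Writer = (*logForwarder)(nil)

func (w *logForwarder) Write(p []byte) (int, error) {
	// io.Writer implementations must not retain p, so send a copy.
	msg := Logs{Data: append([]byte(nil), p...)}
	if err := w.send(msg); err != nil {
		return 0, err
	}
	return len(p), nil
}

// collect writes lines through a logForwarder and returns what send received.
func collect(lines ...string) string {
	var received strings.Builder
	w := &logForwarder{send: func(l Logs) error {
		received.Write(l.Data)
		return nil
	}}
	for _, line := range lines {
		fmt.Fprintln(w, line)
	}
	return received.String()
}

func main() {
	fmt.Print(collect("~~~ Checking out", "done"))
}
```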

type ClientState added in v3.84.0

type ClientState int
const (
	StateNotYetConnected ClientState = iota
	StateConnected
	StateExited
	StateLost
)

type Empty

type Empty struct{}

Empty is an empty RPC message.

type ExitCode

type ExitCode struct {
	ID         int
	ExitStatus int
}

ExitCode is an RPC message that specifies an exit status for a client ID.

type Logs

type Logs struct {
	Data []byte
}

Logs is an RPC message that contains log data.

type RegisterResponse

type RegisterResponse struct {
	Env []string
}

RegisterResponse is the RPC message sent to registering clients, containing the information they need to run.

type RunState

type RunState int

RunState is an RPC message that describes to a client whether the job should continue waiting before running, start running, or stop running.

const (
	// RunStateWait means the job is not ready to start executing yet.
	RunStateWait RunState = iota

	// RunStateStart means the job can begin.
	RunStateStart

	// RunStateInterrupt means the job is cancelled or should be terminated for
	// some other reason.
	RunStateInterrupt
)

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner implements the agent's jobRunner interface, but instead of directly managing a subprocess, it runs a socket server that clients in other containers connect to.

func NewRunner added in v3.84.0

func NewRunner(l logger.Logger, c RunnerConfig) *Runner

NewRunner returns a runner, implementing the agent's jobRunner interface.

func (*Runner) AnyClientIn added in v3.84.0

func (r *Runner) AnyClientIn(state ClientState) bool

AnyClientIn reports whether any of the clients are in a particular state.
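AnyClientIn is an existence check over per-client states. The sketch below models it with a hypothetical map from client ID to ClientState and builds an illustrative `allSettled` check on top of it; whether the Runner combines states this way is an assumption, not something documented here.

```go
package main

import "fmt"

// Local copy of the documented ClientState values.
type ClientState int

const (
	StateNotYetConnected ClientState = iota
	StateConnected
	StateExited
	StateLost
)

// anyClientIn models AnyClientIn over a hypothetical map of client ID to state.
func anyClientIn(clients map[int]ClientState, state ClientState) bool {
	for _, s := range clients {
		if s == state {
			return true
		}
	}
	return false
}

// allSettled is a hypothetical check built on anyClientIn: true once no
// client is still waiting to connect or still running.
func allSettled(clients map[int]ClientState) bool {
	return !anyClientIn(clients, StateNotYetConnected) && !anyClientIn(clients, StateConnected)
}

func main() {
	clients := map[int]ClientState{0: StateExited, 1: StateConnected}
	fmt.Println(allSettled(clients)) // false
	clients[1] = StateLost
	fmt.Println(allSettled(clients))              // true
	fmt.Println(anyClientIn(clients, StateLost)) // true
}
```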

func (*Runner) Done

func (r *Runner) Done() <-chan struct{}

Done returns a channel that is closed when the job is completed.

func (*Runner) Exit

func (r *Runner) Exit(args ExitCode, reply *Empty) error

Exit is called by a client when it exits, to report its exit status.

func (*Runner) Interrupt

func (r *Runner) Interrupt() error

Interrupt interrupts all clients, triggering a graceful shutdown.

func (*Runner) Register

func (r *Runner) Register(id int, reply *RegisterResponse) error

Register is called when the client registers with the runner. The reply contains the env vars that would normally be in the environment of the bootstrap subcommand, in particular the agent session token.
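A client has to apply the variables from RegisterResponse.Env to the process it launches. One plausible way, sketched here with os/exec and not taken from this package, is to append them after the container's own environment: exec.Cmd documents that only the last value of a duplicated key is used, so the received values take precedence. `EXAMPLE_TOKEN`, `commandWithEnv` and `demo` are hypothetical, and the demo assumes `sh` is available.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"strings"
)

// commandWithEnv builds a command whose environment is base followed by the
// variables received from Register. exec.Cmd uses only the last value for a
// duplicated key, so received values override base ones.
func commandWithEnv(base, received []string, name string, args ...string) *exec.Cmd {
	cmd := exec.Command(name, args...)
	cmd.Env = append(append([]string(nil), base...), received...)
	return cmd
}

// demo sets EXAMPLE_TOKEN both locally and in a (hypothetical) Register
// reply, then asks a shell which value it sees.
func demo() (string, error) {
	base := append(os.Environ(), "EXAMPLE_TOKEN=container-default")
	received := []string{"EXAMPLE_TOKEN=from-register"}
	out, err := commandWithEnv(base, received, "sh", "-c", `echo "$EXAMPLE_TOKEN"`).Output()
	return strings.TrimSpace(string(out)), err
}

func main() {
	fmt.Println(demo()) // from-register <nil>
}
```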

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run runs the socket server.

func (*Runner) Started

func (r *Runner) Started() <-chan struct{}

Started returns a channel that is closed when the job has started running. (At least one client container has connected.)
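Started and Done are exposed as channels that are closed once, which lets any number of goroutines wait on them with a select. The hypothetical `lifecycle` type below sketches that pattern with sync.Once so that repeated signals cannot cause a double close. A timed wait such as `closedWithin` is one way a start deadline like RunnerConfig.ClientStartTimeout could be enforced, but how the package actually uses that field is not documented on this page.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lifecycle is a hypothetical model of the Started and Done channels: each
// is closed exactly once, so any number of goroutines can wait on it.
type lifecycle struct {
	startOnce, doneOnce sync.Once
	started, done       chan struct{}
}

func newLifecycle() *lifecycle {
	return &lifecycle{started: make(chan struct{}), done: make(chan struct{})}
}

// markStarted would run when the first client connects; extra calls are no-ops.
func (l *lifecycle) markStarted() { l.startOnce.Do(func() { close(l.started) }) }

// markDone would run when the job completes; extra calls are no-ops.
func (l *lifecycle) markDone() { l.doneOnce.Do(func() { close(l.done) }) }

func (l *lifecycle) Started() <-chan struct{} { return l.started }

func (l *lifecycle) Done() <-chan struct{} { return l.done }

// closedWithin reports whether ch is closed within ms milliseconds.
func closedWithin(ch <-chan struct{}, ms int) bool {
	select {
	case <-ch:
		return true
	case <-time.After(time.Duration(ms) * time.Millisecond):
		return false
	}
}

func main() {
	l := newLifecycle()
	fmt.Println(closedWithin(l.Started(), 10)) // false: no client yet
	l.markStarted()
	l.markStarted() // safe: sync.Once prevents a double close
	fmt.Println(closedWithin(l.Started(), 10), closedWithin(l.Done(), 10)) // true false
}
```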

func (*Runner) Status

func (r *Runner) Status(id int, reply *RunState) error

Status is called by the client to check the status of the job, so that it can pack things up if the job is cancelled. If the client stops calling Status before calling Exit, we assume it is lost.
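The lost-client rule can be expressed as a deadline on the time since each client's last Status call. The sketch below is hypothetical: it assumes a fixed timeout, which may correspond to RunnerConfig.ClientLostTimeout, and it uses no locking, which a real RPC server handling concurrent calls would need. `lostTracker` and `lostAfter` are invented names.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// lostTracker is a hypothetical model of lost-client detection: a client is
// lost if it has not called Status within timeout and has not called Exit.
type lostTracker struct {
	timeout  time.Duration
	lastSeen map[int]time.Time
	exited   map[int]bool
}

func newLostTracker(timeout time.Duration) *lostTracker {
	return &lostTracker{timeout: timeout, lastSeen: map[int]time.Time{}, exited: map[int]bool{}}
}

// status records a Status call from client id at time now.
func (t *lostTracker) status(id int, now time.Time) { t.lastSeen[id] = now }

// exit records that client id called Exit.
func (t *lostTracker) exit(id int) { t.exited[id] = true }

// lost returns, in ascending order, the IDs of clients considered lost at now.
func (t *lostTracker) lost(now time.Time) []int {
	var ids []int
	for id, seen := range t.lastSeen {
		if !t.exited[id] && now.Sub(seen) > t.timeout {
			ids = append(ids, id)
		}
	}
	sort.Ints(ids)
	return ids
}

// lostAfter runs a fixed scenario: clients 1 and 2 poll at t0, client 2
// exits, and the tracker is checked elapsedSeconds later with a 2s timeout.
func lostAfter(elapsedSeconds int) []int {
	t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	tr := newLostTracker(2 * time.Second)
	tr.status(1, t0)
	tr.status(2, t0)
	tr.exit(2)
	return tr.lost(t0.Add(time.Duration(elapsedSeconds) * time.Second))
}

func main() {
	fmt.Println(lostAfter(1), lostAfter(3)) // [] [1]
}
```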

func (*Runner) Terminate

func (r *Runner) Terminate() error

Terminate allows Run to return immediately, halting the RPC server.

func (*Runner) WaitStatus

func (r *Runner) WaitStatus() process.WaitStatus

WaitStatus returns a wait status that represents all the clients.

func (*Runner) WriteLogs

func (r *Runner) WriteLogs(args Logs, reply *Empty) error

WriteLogs is called to pass logs on to Buildkite.

type RunnerConfig added in v3.84.0

type RunnerConfig struct {
	SocketPath         string
	ClientCount        int
	Stdout, Stderr     io.Writer
	Env                []string
	ClientStartTimeout time.Duration
	ClientLostTimeout  time.Duration
}
