package main
import (
"log/slog"
"os"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/cruxd/internal"
"github.com/cruciblehq/cruxd/internal/cli"
)
// The entry point for the cruxd daemon.
//
// Sets up logging, emits startup diagnostics at debug level, and runs the
// root command. A failure during execution terminates the process with exit
// code 1.
func main() {
	slog.SetDefault(logger())
	slog.Debug("build", "version", internal.VersionString())
	slog.Debug("cruxd is running", "pid", os.Getpid(), "cwd", cwd(), "args", os.Args)
	err := cli.Execute()
	if err == nil {
		return
	}
	slog.Error(err.Error())
	os.Exit(1)
}
// Creates a buffered logger seeded from build-time linker flags.
//
// The logger is reconfigured after flag parsing via cli.Execute.
func logger() *slog.Logger {
	h := crex.NewHandler()
	h.SetLevel(logLevel())
	grouped := h.WithGroup(internal.Name)
	return slog.New(grouped)
}
// Returns the log level derived from build-time linker flags.
//
// Debug takes precedence over quiet; info is the default.
func logLevel() slog.Level {
	switch {
	case internal.IsDebug():
		return slog.LevelDebug
	case internal.IsQuiet():
		return slog.LevelWarn
	default:
		return slog.LevelInfo
	}
}
// Returns the current working directory or "(unknown)".
func cwd() string {
cwd, err := os.Getwd()
if err != nil {
return "(unknown)"
}
return cwd
}
package build
import (
"context"
"log/slog"
"os"
goruntime "runtime"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/cruxd/internal/runtime"
"github.com/cruciblehq/spec/manifest"
"github.com/cruciblehq/spec/paths"
)
// Controls recipe execution.
//
// Recipe is required; the remaining fields configure where inputs are
// resolved from and where the exported image is written.
type Options struct {
Recipe *manifest.Recipe // Recipe to execute. Must be non-nil.
Resource string // Resource name, used as a prefix for container IDs.
Output string // Directory for the exported image.
Root string // Project root, for resolving copy sources.
Entrypoint []string // OCI entrypoint for the output image (services only).
Platforms []string // Target platforms (e.g., ["linux/amd64"]). Defaults to host.
}
// Returned after successful recipe execution.
//
// Output echoes the directory passed in [Options].Output; multi-platform
// builds place each platform in a subdirectory beneath it.
type Result struct {
Output string // Directory containing the exported image.
}
// Executes a recipe against the container runtime.
//
// Stages are built in declaration order. Each stage starts a container from
// its base image, executes the stage's steps, and the non-transient stage is
// exported as the final image to the output directory.
//
// When opts.Platforms is empty, the build targets the host architecture on
// linux. A nil opts.Recipe is reported as an error rather than panicking on
// the Stages dereference below.
func Run(ctx context.Context, rt *runtime.Runtime, opts Options) (*Result, error) {
	if opts.Recipe == nil {
		return nil, crex.Wrapf(ErrBuild, "no recipe provided")
	}
	if len(opts.Platforms) == 0 {
		// Build containers are always linux; only the architecture follows
		// the host.
		opts.Platforms = []string{"linux/" + goruntime.GOARCH}
	}
	slog.Info("executing recipe",
		"resource", opts.Resource,
		"output", opts.Output,
		"stages", len(opts.Recipe.Stages),
		"platforms", opts.Platforms,
	)
	if err := os.MkdirAll(opts.Output, paths.DefaultDirMode); err != nil {
		return nil, crex.Wrap(ErrFileSystemOperation, err)
	}
	return newRecipe(rt, opts).build(ctx, opts.Recipe.Stages)
}
package build
import (
"archive/tar"
"context"
"io"
"os"
"path/filepath"
"strings"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/cruxd/internal/runtime"
)
// Executes a copy operation, transferring files into the container.
//
// The copy string has the format "src dest" for host copies, or "stage:src
// dest" for cross-stage copies. Host sources are resolved relative to the
// build context. Cross-stage sources are read from a named stage container's
// filesystem.
func executeCopy(ctx context.Context, ctr *runtime.Container, copyStr, workdir, buildCtx string, stages map[string]*runtime.Container) error {
	src, dest, err := parseCopy(copyStr, workdir)
	if err != nil {
		return crex.Wrap(ErrCopy, err)
	}
	// Make sure the destination's parent directory exists before extracting.
	if parent := filepath.Dir(dest); parent != "" {
		if err := ctr.MkdirAll(ctx, parent); err != nil {
			return crex.Wrap(ErrCopy, err)
		}
	}
	// Dispatch on the source form: "stage:path" copies from another stage's
	// container; anything else is a host path.
	stage, path, crossStage := parseStageCopy(src)
	if crossStage {
		return executeStageCopy(ctx, ctr, stages, stage, path, dest)
	}
	return executeHostCopy(ctx, ctr, src, dest, buildCtx)
}
// Copies a file or directory from the host into the container.
//
// The source is packed into a tar stream on a background goroutine and piped
// into the container via CopyTo, so large trees are never buffered in memory.
// Relative sources are resolved against the build context directory.
func executeHostCopy(ctx context.Context, ctr *runtime.Container, src, dest, buildCtx string) error {
	if !filepath.IsAbs(src) {
		src = filepath.Join(buildCtx, src)
	}
	info, err := os.Stat(src)
	if err != nil {
		return crex.Wrap(ErrCopy, err)
	}
	pr, pw := io.Pipe()
	go func() {
		tw := tar.NewWriter(pw)
		var writeErr error
		if info.IsDir() {
			writeErr = writeDirToTar(tw, src, filepath.Base(dest))
		} else {
			writeErr = writeFileToTar(tw, src, filepath.Base(dest))
		}
		// A failed footer write also means a truncated archive; do not let a
		// Close failure be masked by a nil write error.
		if closeErr := tw.Close(); writeErr == nil {
			writeErr = closeErr
		}
		pw.CloseWithError(writeErr)
	}()
	if err := ctr.CopyTo(ctx, pr, filepath.Dir(dest)); err != nil {
		// Unblock the producer goroutine if CopyTo aborted mid-stream;
		// otherwise it would block forever writing into the pipe.
		pr.CloseWithError(err)
		return crex.Wrap(ErrCopy, err)
	}
	return nil
}
// Copies a path from a named stage container into the target container.
//
// The tar stream is piped directly from the source container's CopyFrom
// to the target container's CopyTo; nothing is buffered on the host.
func executeStageCopy(ctx context.Context, ctr *runtime.Container, stages map[string]*runtime.Container, stage, path, dest string) error {
	srcCtr, ok := stages[stage]
	if !ok {
		return crex.Wrapf(ErrCopy, "unknown stage %q", stage)
	}
	pr, pw := io.Pipe()
	errc := make(chan error, 1)
	go func() {
		err := srcCtr.CopyFrom(ctx, pw, path)
		// Propagate the archive error through the pipe so CopyTo does not
		// mistake a truncated stream for a clean EOF.
		pw.CloseWithError(err)
		errc <- err
	}()
	if err := ctr.CopyTo(ctx, pr, filepath.Dir(dest)); err != nil {
		// Unblock the producer if it is still writing; the buffered errc
		// send guarantees the goroutine exits even though we return early.
		pr.CloseWithError(err)
		return crex.Wrap(ErrCopy, err)
	}
	if err := <-errc; err != nil {
		return crex.Wrap(ErrCopy, err)
	}
	return nil
}
// Parses a cross-stage copy source of the form "stage:path".
//
// Returns the stage name, the path within the stage, and true if the source
// matches the cross-stage format. Returns false if it is a regular host path.
func parseStageCopy(src string) (stage, path string, ok bool) {
	prefix, rest, found := strings.Cut(src, ":")
	// No colon at all, or a leading colon, is not a stage prefix.
	if !found || prefix == "" {
		return "", "", false
	}
	// A colon after a path separator is not a stage prefix (e.g. "/foo:bar").
	if strings.ContainsRune(prefix, '/') {
		return "", "", false
	}
	return prefix, rest, true
}
// Parses a copy string into source and destination paths.
//
// The string must contain exactly two whitespace-separated tokens. If dest
// is not absolute, it is joined with workdir. Paths containing spaces are
// therefore not supported.
func parseCopy(s, workdir string) (src, dest string, err error) {
	tokens := strings.Fields(s)
	if len(tokens) != 2 {
		return "", "", crex.Wrapf(ErrCopy, "missing source or destination in %q", s)
	}
	src, dest = tokens[0], tokens[1]
	if filepath.IsAbs(dest) {
		return src, dest, nil
	}
	if workdir == "" {
		return "", "", crex.Wrapf(ErrCopy, "relative dest %q requires workdir", dest)
	}
	return src, filepath.Join(workdir, dest), nil
}
// Writes a single file to a tar writer with the given archive name.
func writeFileToTar(tw *tar.Writer, hostPath, name string) error {
info, err := os.Stat(hostPath)
if err != nil {
return err
}
header, err := tar.FileInfoHeader(info, "")
if err != nil {
return err
}
header.Name = name
if err := tw.WriteHeader(header); err != nil {
return err
}
f, err := os.Open(hostPath)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(tw, f)
return err
}
// Writes a directory tree to a tar writer rooted at the given archive prefix.
//
// Entry names inside the archive are slash-separated regardless of the host
// path separator.
func writeDirToTar(tw *tar.Writer, hostDir, prefix string) error {
	walk := func(path string, d os.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		rel, err := filepath.Rel(hostDir, path)
		if err != nil {
			return err
		}
		name := filepath.ToSlash(filepath.Join(prefix, rel))
		return writeTarEntry(tw, path, name, d)
	}
	return filepath.WalkDir(hostDir, walk)
}
// Writes a single file or directory entry to a tar writer.
func writeTarEntry(tw *tar.Writer, hostPath, archivePath string, d os.DirEntry) error {
info, err := d.Info()
if err != nil {
return err
}
header, err := tar.FileInfoHeader(info, "")
if err != nil {
return err
}
header.Name = archivePath
if err := tw.WriteHeader(header); err != nil {
return err
}
if info.Mode().IsRegular() {
f, err := os.Open(hostPath)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(tw, f)
return err
}
return nil
}
package build
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/cruxd/internal/runtime"
"github.com/cruciblehq/spec/manifest"
"github.com/cruciblehq/spec/paths"
"github.com/cruciblehq/spec/protocol"
)
// Holds shared state for building all stages of a recipe.
//
// A recipe is created per Run invocation; the containers slice accumulates
// every stage container started across all platforms so build can destroy
// them in one pass when it finishes.
type recipe struct {
rt *runtime.Runtime // Container runtime for image and container operations.
resource string // Resource name, used as a prefix for container IDs.
output string // Output directory for the final build artifact.
context string // Directory containing the manifest, root for resolving copy sources.
entrypoint []string // OCI entrypoint to set on the output image (services only).
platforms []string // Target platforms to build for.
containers []*runtime.Container // All stage containers across all platforms, destroyed after the build completes.
}
// Creates a new [recipe] from the given options.
func newRecipe(rt *runtime.Runtime, opts Options) *recipe {
	r := &recipe{rt: rt}
	r.resource = opts.Resource
	r.output = opts.Output
	r.context = opts.Root
	r.entrypoint = opts.Entrypoint
	r.platforms = opts.Platforms
	return r
}
// Builds the recipe end-to-end against the container runtime.
//
// Each target platform is built independently. Stages are built in
// declaration order for each platform, and the non-transient stage is
// exported as the final image to the platform's output directory. All stage
// containers are destroyed once the build completes.
func (r *recipe) build(ctx context.Context, recipeStages []manifest.Stage) (*Result, error) {
	// Cleanup runs on a background context so containers are destroyed even
	// when the parent context has been cancelled (e.g., client disconnect).
	defer r.destroyContainers(context.Background())
	for _, p := range r.platforms {
		err := r.buildPlatform(ctx, recipeStages, p)
		if err != nil {
			return nil, err
		}
	}
	return &Result{Output: r.output}, nil
}
// Builds all stages of the recipe for a single platform.
//
// Each platform maintains its own map of named stage containers for
// cross-stage copy lookups. The output is written to a platform-specific
// subdirectory when building for multiple platforms.
func (r *recipe) buildPlatform(ctx context.Context, recipeStages []manifest.Stage, platform string) error {
	slog.Info("building platform", "platform", platform)
	outDir := r.platformOutput(platform)
	if err := os.MkdirAll(outDir, paths.DefaultDirMode); err != nil {
		return crex.Wrap(ErrFileSystemOperation, err)
	}
	// Named stage containers for this platform only.
	named := make(map[string]*runtime.Container)
	for i, st := range recipeStages {
		err := r.buildStage(ctx, st, i, platform, outDir, named)
		if err == nil {
			continue
		}
		return crex.Wrapf(ErrBuild, "platform %s, stage %s: %w", platform, stageLabel(st.Name, i), err)
	}
	return nil
}
// Builds a single stage of a recipe for a specific platform.
//
// Resolves the stage's base image, starts a build container, and executes
// the stage's steps. Non-transient stages are then exported to the output
// directory; transient stages exist only to feed later stages.
func (r *recipe) buildStage(ctx context.Context, stage manifest.Stage, index int, platform, output string, stages map[string]*runtime.Container) error {
	slog.Info(fmt.Sprintf("building stage %s", stageLabel(stage.Name, index)), "platform", platform)
	ctr, err := r.startStageContainer(ctx, stage, index, platform)
	if err != nil {
		return err
	}
	// Track the container for cleanup, and register named stages so later
	// stages can copy from them.
	r.containers = append(r.containers, ctr)
	if stage.Name != "" {
		stages[stage.Name] = ctr
	}
	if err := executeSteps(ctx, ctr, stage.Steps, newStepState(), r.context, stages); err != nil {
		return err
	}
	if stage.Transient {
		return nil
	}
	return r.exportStage(ctx, ctr, output)
}
// Resolves the base image source and starts the stage container.
//
// Runtime failures are wrapped in [runtime.ErrRuntime]; an unrecognized
// source type is reported as an [ErrBuild].
func (r *recipe) startStageContainer(ctx context.Context, stage manifest.Stage, index int, platform string) (*runtime.Container, error) {
	src, err := r.resolveImageSource(stage)
	if err != nil {
		return nil, err
	}
	id := r.containerID(stage.Name, index, platform)
	var (
		ctr      *runtime.Container
		startErr error
	)
	switch src.Type {
	case manifest.SourceFile:
		ctr, startErr = r.rt.StartContainer(ctx, src.Value, id, platform)
	case manifest.SourceOCI:
		ctr, startErr = r.rt.StartContainerFromOCI(ctx, src.Value, id, platform)
	default:
		return nil, crex.Wrapf(ErrBuild, "unsupported source type %q", src.Type)
	}
	if startErr != nil {
		return nil, crex.Wrap(runtime.ErrRuntime, startErr)
	}
	return ctr, nil
}
// Resolves the stage's base image source.
//
// For file sources, relative paths are resolved against the build context
// directory. OCI references (single-token image names like "alpine:3.21")
// are returned as-is for the runtime to pull from a container registry.
func (r *recipe) resolveImageSource(stage manifest.Stage) (manifest.Source, error) {
	src, err := stage.ParseFrom()
	if err != nil {
		return manifest.Source{}, err
	}
	if src.Type != manifest.SourceFile || filepath.IsAbs(src.Value) {
		return src, nil
	}
	src.Value = filepath.Join(r.context, src.Value)
	return src, nil
}
// Stops the container and exports it as the final image.
//
// The container is stopped first, then exported with the recipe's
// entrypoint applied.
func (r *recipe) exportStage(ctx context.Context, ctr *runtime.Container, output string) error {
	if err := ctr.Stop(ctx); err != nil {
		return crex.Wrap(runtime.ErrRuntime, err)
	}
	err := ctr.Export(ctx, output, r.entrypoint)
	if err != nil {
		return crex.Wrap(runtime.ErrRuntime, err)
	}
	return nil
}
// Destroys all stage containers tracked by this recipe.
func (r *recipe) destroyContainers(ctx context.Context) {
	for i := range r.containers {
		r.containers[i].Destroy(ctx)
	}
}
// Returns a unique container ID for a stage, scoped to this resource and platform.
//
// If resource names contain any slashes (e.g., "crucible/runtime-go"), they
// are replaced with dashes to ensure the resulting container ID is valid.
// The stage name is included when available for readability; otherwise, the
// 1-based stage index is used.
func (r *recipe) containerID(name string, index int, platform string) string {
	prefix := fmt.Sprintf("%s-%s-stage-", protocol.ContainerID(r.resource), platformSlug(platform))
	if name == "" {
		return prefix + fmt.Sprintf("%d", index+1)
	}
	return prefix + name
}
// Returns the output directory for a specific platform.
//
// When building for a single platform, the output directory is left as-is
// to preserve the existing {output}/image.tar convention. For multi-platform
// builds, each platform gets a subdirectory (e.g., {output}/linux-amd64).
func (r *recipe) platformOutput(platform string) string {
	single := len(r.platforms) == 1
	if single {
		return r.output
	}
	return filepath.Join(r.output, platformSlug(platform))
}
// Converts a platform string to a filesystem-safe slug.
//
// Every slash becomes a dash (e.g., "linux/amd64" becomes "linux-amd64").
func platformSlug(platform string) string {
	return strings.Map(func(r rune) rune {
		if r == '/' {
			return '-'
		}
		return r
	}, platform)
}
// Returns a label for a stage, preferring the quoted name when available and
// falling back to the 1-based index.
func stageLabel(name string, index int) string {
	if name == "" {
		return fmt.Sprintf("%d", index+1)
	}
	return fmt.Sprintf("%q", name)
}
package build
import (
"context"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/cruxd/internal/runtime"
"github.com/cruciblehq/spec/manifest"
)
// Executes a list of steps in order against the build container.
//
// The first failing step aborts the run, wrapped with its 1-based position.
func executeSteps(ctx context.Context, ctr *runtime.Container, steps []manifest.Step, state *stepState, buildCtx string, stages map[string]*runtime.Container) error {
	for i := range steps {
		err := executeStep(ctx, ctr, steps[i], state, buildCtx, stages)
		if err != nil {
			return crex.Wrapf(ErrBuild, "step %d: %w", i+1, err)
		}
	}
	return nil
}
// Executes a single step, dispatching to group recursion, operation
// execution, or state mutation depending on the step's fields.
func executeStep(ctx context.Context, ctr *runtime.Container, step manifest.Step, state *stepState, buildCtx string, stages map[string]*runtime.Container) error {
	switch {
	case len(step.Steps) > 0:
		// Platform group: apply group-level modifiers, then recurse.
		state.apply(step)
		return executeSteps(ctx, ctr, step.Steps, state, buildCtx, stages)
	case step.Run != "" || step.Copy != "":
		// Operation with optional scoped modifiers.
		return executeOperation(ctx, ctr, step, state, buildCtx, stages)
	default:
		// Standalone modifier(s): persist in state.
		state.apply(step)
		return nil
	}
}
// Executes a run or copy operation with scoped modifier overrides.
//
// Step-level modifiers override the persistent state for this operation
// only; the persistent state is never modified here.
func executeOperation(ctx context.Context, ctr *runtime.Container, step manifest.Step, state *stepState, buildCtx string, stages map[string]*runtime.Container) error {
	scoped := state.resolve(step)
	if scoped.workdir != "" {
		// Make sure the working directory exists before running anything in it.
		if err := ctr.MkdirAll(ctx, scoped.workdir); err != nil {
			return err
		}
	}
	if step.Run != "" {
		res, err := ctr.Exec(ctx, scoped.shell, step.Run, scoped.environ(), scoped.workdir)
		if err != nil {
			return err
		}
		if res.ExitCode != 0 {
			return crex.Wrapf(ErrCommandFailed, "exit code %d: %s", res.ExitCode, res.Stderr)
		}
		return nil
	}
	if step.Copy != "" {
		return executeCopy(ctx, ctr, step.Copy, scoped.workdir, buildCtx, stages)
	}
	return nil
}
package build
import (
"maps"
"github.com/cruciblehq/spec/manifest"
)
// Default shell used for run steps when no shell modifier has been set.
// Overridable per step or persistently via the shell modifier.
const defaultShell = "/bin/sh"
// Tracks accumulated modifiers during step execution.
//
// State flows linearly through the step list. Standalone modifiers update
// the state permanently via apply. Operations read the effective values for
// a single step via resolve without modifying the persistent state.
type stepState struct {
	// Shell used for run steps; seeded with defaultShell.
	shell string
	// Working directory for operations; empty means unset.
	workdir string
	// Accumulated environment variables for run steps.
	env map[string]string
}
// Creates a new [stepState] seeded with the default shell and an empty
// environment.
func newStepState() *stepState {
	s := &stepState{env: map[string]string{}}
	s.shell = defaultShell
	return s
}
// Persists modifier fields from a step into the state.
//
// Called for standalone modifier steps and platform groups. The state is
// mutated permanently, affecting all subsequent steps.
func (s *stepState) apply(step manifest.Step) {
	maps.Copy(s.env, step.Env)
	if sh := step.Shell; sh != "" {
		s.shell = sh
	}
	if wd := step.Workdir; wd != "" {
		s.workdir = wd
	}
}
// Returns a new [stepState] with step-level modifiers overlaid on the
// persistent state. The receiver is not modified.
//
// Step-level modifiers override the corresponding state values for this
// operation only; the environment is merged with step entries winning.
func (s *stepState) resolve(step manifest.Step) *stepState {
	shell, workdir := s.shell, s.workdir
	if step.Shell != "" {
		shell = step.Shell
	}
	if step.Workdir != "" {
		workdir = step.Workdir
	}
	env := make(map[string]string, len(s.env)+len(step.Env))
	maps.Copy(env, s.env)
	maps.Copy(env, step.Env)
	return &stepState{shell: shell, workdir: workdir, env: env}
}
// Formats the environment as a list of "key=value" strings suitable for
// passing to container exec. Ordering is unspecified (map iteration order).
func (s *stepState) environ() []string {
	pairs := make([]string, 0, len(s.env))
	for key, value := range s.env {
		pairs = append(pairs, key+"="+value)
	}
	return pairs
}
package cli
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"github.com/alecthomas/kong"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/cruxd/internal"
)
// Represents the root command for the cruxd daemon.
//
// Field tags drive kong's flag parsing; Start and Version are the available
// subcommands. Flags are read by configureLogger and StartCmd.Run after
// parsing.
var RootCmd struct {
Quiet bool `short:"q" help:"Suppress informational output."`
Verbose bool `short:"v" help:"Enable verbose output."`
Debug bool `short:"d" help:"Enable debug output."`
Socket string `short:"s" help:"Override the default Unix socket path." placeholder:"PATH"`
PIDFile string `help:"Override the default PID file path." placeholder:"PATH"`
ReadyFD int `help:"File descriptor to signal readiness on." default:"-1" placeholder:"FD"`
Start StartCmd `cmd:"" help:"Start the daemon."`
Version VersionCmd `cmd:"" help:"Show version information."`
}
// Parses arguments, configures logging, and runs the selected subcommand.
//
// SIGINT and SIGTERM cancel the context bound to subcommands, allowing a
// graceful shutdown.
func Execute() error {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	parsed := kong.Parse(&RootCmd,
		kong.Name(internal.Name),
		kong.Description("The Crucible daemon.\n\nListens on a Unix domain socket for commands from the crux CLI."),
		kong.UsageOnError(),
		kong.Vars{"version": internal.VersionString()},
		kong.BindTo(ctx, (*context.Context)(nil)),
	)
	// Logging must be reconfigured after parsing so CLI flags take effect.
	configureLogger()
	return parsed.Run()
}
// Configures the global logger based on CLI flags.
//
// CLI flags and build-time linker flags are OR-ed together; debug wins over
// quiet. If the default handler is not a crex.Handler, nothing is changed.
func configureLogger() {
	handler, ok := slog.Default().Handler().(crex.Handler)
	if !ok {
		// Not a crex.Handler; leave the default logger untouched.
		return
	}
	debug := RootCmd.Debug || internal.IsDebug()
	quiet := RootCmd.Quiet || internal.IsQuiet()
	verbose := RootCmd.Verbose || internal.IsVerbose()
	level := slog.LevelInfo
	switch {
	case debug:
		level = slog.LevelDebug
	case quiet:
		level = slog.LevelWarn
	}
	handler.SetLevel(level)
	formatter := crex.NewPrettyFormatter(isatty(os.Stderr))
	formatter.SetVerbose(verbose)
	// Commit the new configuration and flush anything buffered so far.
	handler.SetFormatter(formatter)
	handler.SetStream(os.Stderr)
	handler.Flush()
}
// Whether the given file is an interactive terminal.
func isatty(f *os.File) bool {
info, err := f.Stat()
if err != nil {
return false
}
return (info.Mode() & os.ModeCharDevice) != 0
}
package cli
import (
"context"
"log/slog"
"github.com/cruciblehq/cruxd/internal/server"
)
// Represents the 'cruxd start' command.
//
// Carries no flags of its own; configuration comes from RootCmd.
type StartCmd struct{}
// Executes the start command.
//
// Starts the gRPC server on a Unix domain socket and blocks until the
// context is cancelled (e.g. via SIGINT or SIGTERM), then stops the server.
func (c *StartCmd) Run(ctx context.Context) error {
	cfg := server.Config{
		SocketPath:  RootCmd.Socket,
		PIDFilePath: RootCmd.PIDFile,
		ReadyFD:     RootCmd.ReadyFD,
	}
	srv, err := server.New(cfg)
	if err != nil {
		return err
	}
	if err := srv.Start(); err != nil {
		return err
	}
	slog.Info("cruxd is running")
	// Block until the signal-bound context is cancelled.
	<-ctx.Done()
	slog.Info("shutting down")
	return srv.Stop()
}
package cli
import (
"context"
"fmt"
"github.com/cruciblehq/cruxd/internal"
)
// Represents the 'cruxd version' command.
//
// Carries no flags of its own.
type VersionCmd struct{}
// Executes the version command, printing the version string to stdout.
func (c *VersionCmd) Run(ctx context.Context) error {
	version := internal.VersionString()
	fmt.Println(version)
	return nil
}
package internal
import (
"strconv"
"sync/atomic"
)
// Process-wide logging mode flags, seeded from ldflags in init and
// adjustable at runtime via the Set* functions below.
var (
quietMode atomic.Bool // Indicates whether quiet mode is enabled.
debugMode atomic.Bool // Indicates whether debug logging is enabled.
verboseMode atomic.Bool // Indicates whether verbose logging is enabled.
)
// Parses the linker flags into usable runtime variables.
//
// The rawQuiet, rawDebug, and rawVerbose variables should be set via ldflags
// during the build process. Values that do not parse as booleans leave the
// corresponding flag at its zero value (false).
func init() {
	flags := []struct {
		raw string
		dst *atomic.Bool
	}{
		{rawQuiet, &quietMode},
		{rawDebug, &debugMode},
		{rawVerbose, &verboseMode},
	}
	for _, f := range flags {
		if v, err := strconv.ParseBool(f.raw); err == nil {
			f.dst.Store(v)
		}
	}
}
// Enables or disables quiet mode. Safe for concurrent use.
func SetQuiet(enabled bool) {
quietMode.Store(enabled)
}
// Reports whether quiet mode is enabled.
func IsQuiet() bool {
return quietMode.Load()
}
// Enables or disables debug mode. Safe for concurrent use.
func SetDebug(enabled bool) {
debugMode.Store(enabled)
}
// Reports whether debug mode is enabled.
func IsDebug() bool {
return debugMode.Load()
}
// Enables or disables verbose logging. Safe for concurrent use.
func SetVerbose(enabled bool) {
verboseMode.Store(enabled)
}
// Reports whether verbose logging is enabled.
func IsVerbose() bool {
return verboseMode.Load()
}
package runtime
import (
"context"
"log/slog"
"syscall"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/errdefs"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/spec/protocol"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// A running build container backed by containerd.
//
// The struct holds no live containerd handle; every method loads the
// container by ID on each call, so a Container value stays valid across
// task restarts.
type Container struct {
client *containerd.Client // Containerd client for managing the container.
id string // Unique identifier for the container, used as the containerd container ID.
platform string // OCI platform (e.g., "linux/amd64").
}
// Queries the current state of the container.
//
// Returns [protocol.ContainerRunning] if the task is active,
// [protocol.ContainerStopped] if the container exists but has no running
// task, or [protocol.ContainerNotCreated] if the container does not exist.
func (c *Container) Status(ctx context.Context) (protocol.ContainerState, error) {
	ctr, err := c.client.LoadContainer(ctx, c.id)
	if errdefs.IsNotFound(err) {
		return protocol.ContainerNotCreated, nil
	}
	if err != nil {
		return "", crex.Wrap(ErrRuntime, err)
	}
	task, err := ctr.Task(ctx, nil)
	if errdefs.IsNotFound(err) {
		// Container exists but has no task: it is stopped.
		return protocol.ContainerStopped, nil
	}
	if err != nil {
		return "", crex.Wrap(ErrRuntime, err)
	}
	status, err := task.Status(ctx)
	if err != nil {
		return "", crex.Wrap(ErrRuntime, err)
	}
	// Any task state other than Running is reported as stopped.
	if status.Status == containerd.Running {
		return protocol.ContainerRunning, nil
	}
	return protocol.ContainerStopped, nil
}
// Stops the container's task.
//
// The running task is killed and deleted. The container metadata is
// preserved. Calling Stop on an already-stopped or missing container is not
// an error.
func (c *Container) Stop(ctx context.Context) error {
	ctr, err := c.client.LoadContainer(ctx, c.id)
	if err != nil {
		if errdefs.IsNotFound(err) {
			return nil // Already gone; stopping is a no-op.
		}
		return crex.Wrap(ErrRuntime, err)
	}
	task, err := ctr.Task(ctx, nil)
	if err != nil {
		if errdefs.IsNotFound(err) {
			return nil // No task means the container is already stopped.
		}
		return crex.Wrap(ErrRuntime, err)
	}
	// Best-effort kill; the Delete below forces termination regardless.
	task.Kill(ctx, syscall.SIGKILL)
	_, err = task.Delete(ctx, containerd.WithProcessKill)
	if err != nil && !errdefs.IsNotFound(err) {
		return crex.Wrap(ErrRuntime, err)
	}
	return nil
}
// Removes the container and its resources.
//
// The task is killed and the container is removed from containerd along
// with its snapshot. After destruction the handle is invalid. Failures are
// logged rather than returned, since Destroy runs as best-effort cleanup.
func (c *Container) Destroy(ctx context.Context) {
	ctr, err := c.client.LoadContainer(ctx, c.id)
	if err != nil {
		if !errdefs.IsNotFound(err) {
			slog.Error("failed to load container for destruction", "id", c.id, "error", err)
		}
		// Not found means there is nothing to destroy.
		return
	}
	// Kill and delete any running task first. Errors here are intentionally
	// ignored: the container delete below is the authoritative cleanup.
	if task, err := ctr.Task(ctx, nil); err == nil {
		task.Kill(ctx, syscall.SIGKILL)
		task.Delete(ctx, containerd.WithProcessKill)
	}
	if err := ctr.Delete(ctx, containerd.WithSnapshotCleanup); err != nil && !errdefs.IsNotFound(err) {
		slog.Error("failed to delete container during destruction", "id", c.id, "error", err)
	}
}
// Starts a new task on an existing container.
//
// Any leftover task from a previous run is cleaned up first. The container
// must already exist; use [Container.create] for initial creation.
func (c *Container) Start(ctx context.Context) error {
	ctr, err := c.client.LoadContainer(ctx, c.id)
	if err != nil {
		return err
	}
	// Remove a stale task left behind by a prior run before starting fresh.
	// Errors from the best-effort kill/delete are ignored.
	if stale, taskErr := ctr.Task(ctx, nil); taskErr == nil {
		stale.Kill(ctx, syscall.SIGKILL)
		stale.Delete(ctx, containerd.WithProcessKill)
	}
	return c.startTask(ctx, ctr)
}
// Creates the containerd container with the standard configuration.
//
// Spec options are applied sequentially. Each one mutates the OCI spec in
// place, so extraOpts appended after the base options can override values
// set by WithImageConfig (last writer wins). Build containers use this to
// replace the image entrypoint with "sleep infinity".
func (c *Container) create(ctx context.Context, image containerd.Image, extraOpts ...oci.SpecOpts) (containerd.Container, error) {
	specOpts := []oci.SpecOpts{
		oci.WithDefaultSpecForPlatform(c.platform),
		oci.WithImageConfig(image),
		// Share the host's network namespace and resolv.conf — presumably so
		// build steps get network access without per-container networking.
		oci.WithHostNamespace(specs.NetworkNamespace),
		oci.WithHostResolvconf,
	}
	specOpts = append(specOpts, extraOpts...)
	return c.client.NewContainer(ctx, c.id,
		containerd.WithImage(image),
		containerd.WithSnapshotter(snapshotter),
		containerd.WithNewSnapshot(c.id, image),
		containerd.WithRuntime(ociRuntime, nil),
		containerd.WithNewSpec(specOpts...),
	)
}
// Starts the container's long-running task with no attached IO.
//
// If the task was created but fails to start, it is deleted again so a
// subsequent attempt is possible.
func (c *Container) startTask(ctx context.Context, ctr containerd.Container) error {
	task, err := ctr.NewTask(ctx, cio.NullIO)
	if err != nil {
		return err
	}
	if startErr := task.Start(ctx); startErr != nil {
		// Roll back the created-but-unstarted task.
		task.Delete(ctx)
		return startErr
	}
	return nil
}
// Removes an existing container with this ID, if one exists.
//
// Any running task is killed and the container is deleted along with its
// snapshot. This is a no-op when no container with the ID is found. All
// errors are deliberately ignored — this looks like best-effort cleanup
// before reusing an ID (NOTE(review): confirm intent at call sites).
func (c *Container) remove(ctx context.Context) {
	existing, err := c.client.LoadContainer(ctx, c.id)
	if err != nil {
		// Treat any load failure as "nothing to remove".
		return
	}
	// Kill and delete any running task before removing the container itself.
	if task, err := existing.Task(ctx, nil); err == nil {
		task.Kill(ctx, syscall.SIGKILL)
		task.Delete(ctx, containerd.WithProcessKill)
	}
	existing.Delete(ctx, containerd.WithSnapshotCleanup)
}
package runtime
import (
"context"
"io"
"path/filepath"
"github.com/cruciblehq/crex"
)
// Creates a directory inside the container, including parents.
//
// Implemented by running "mkdir -p" inside the container; the container's
// image must therefore provide a mkdir binary.
func (c *Container) MkdirAll(ctx context.Context, path string) error {
	return c.mustExec(ctx, "mkdir", nil, nil, "mkdir", "-p", path)
}
// Copies a tar stream into the container's filesystem.
//
// The contents of r are extracted into destDir by piping them to "tar xf - -C
// destDir" inside the container; the image must provide a tar binary.
func (c *Container) CopyTo(ctx context.Context, r io.Reader, destDir string) error {
	return c.mustExec(ctx, "tar extract", r, nil, "tar", "xf", "-", "-C", destDir)
}
// Copies a path from the container's filesystem as a tar stream.
//
// The file or directory at path is archived by running "tar cf - -C <dir>
// <base>" inside the container and streaming the output to w, so archive
// entries are named relative to path's parent directory.
func (c *Container) CopyFrom(ctx context.Context, w io.Writer, path string) error {
	return c.mustExec(ctx, "tar archive", nil, w, "tar", "cf", "-", "-C", filepath.Dir(path), filepath.Base(path))
}
// Helper method that runs a command inside the container, returning an error
// that includes desc and the captured stderr if the process exits non-zero.
func (c *Container) mustExec(ctx context.Context, desc string, stdin io.Reader, stdout io.Writer, args ...string) error {
	exitCode, stderr, err := c.execCommand(ctx, stdin, stdout, nil, "", args...)
	switch {
	case err != nil:
		return err
	case exitCode != 0:
		return crex.Wrapf(ErrRuntime, "%s failed with exit code %d (%s)", desc, exitCode, stderr)
	default:
		return nil
	}
}
package runtime
import (
"io"
"sync"
)
// Wraps an [io.Reader] and signals when it returns [io.EOF].
//
// The done channel is closed exactly once on the first EOF (guarded by
// once), making it safe to observe from multiple goroutines.
type doneReader struct {
	r io.Reader
	// Guards the close of done so repeated EOFs do not double-close.
	once sync.Once
	done chan struct{}
}
// Creates a new [doneReader] wrapping the given reader.
//
// The done channel is closed by Read on the first io.EOF.
func newDoneReader(r io.Reader) *doneReader {
	return &doneReader{r: r, done: make(chan struct{})}
}
// Delegates to the underlying reader.
//
// Closes the done channel on the first [io.EOF]. Non-EOF errors are returned
// without closing the channel.
func (d *doneReader) Read(p []byte) (int, error) {
	n, err := d.r.Read(p)
	if err != io.EOF {
		return n, err
	}
	// First EOF: close the done channel exactly once.
	d.once.Do(func() { close(d.done) })
	return n, err
}
package runtime
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"sync/atomic"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/cruciblehq/crex"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// Sequence counter for generating unique exec process identifiers.
// Typed atomic avoids the address-of dance required by atomic.AddUint64.
var execSeq atomic.Uint64

// Returns a unique exec process identifier of the form "exec-N".
//
// Safe for concurrent use; each call yields a distinct identifier.
func nextExecID() string {
	return fmt.Sprintf("exec-%d", execSeq.Add(1))
}
// Output of a command execution inside a container.
//
// Stdout and Stderr are fully buffered in memory, so this is intended for
// build-step-sized output rather than unbounded streams.
type ExecResult struct {
ExitCode int // Exit code of the process.
Stdout string // Captured standard output.
Stderr string // Captured standard error.
}
// Runs a command inside the container.
//
// The command is passed to the shell as a single argument via "shell -c
// command". Environment variables and working directory override the
// container's OCI spec for this execution only.
func (c *Container) Exec(ctx context.Context, shell, command string, env []string, workdir string) (*ExecResult, error) {
	var out bytes.Buffer
	code, errOut, err := c.execCommand(ctx, nil, &out, env, workdir, shell, "-c", command)
	if err != nil {
		return nil, err
	}
	result := &ExecResult{ExitCode: code}
	result.Stdout = out.String()
	result.Stderr = errOut
	return result, nil
}
// Runs a command and arguments directly inside the container.
//
// Unlike [Exec], which passes a command string to a shell, ExecArgs runs the
// command directly without shell wrapping. This is suitable for CLI-invoked
// exec where the user provides the full command line.
func (c *Container) ExecArgs(ctx context.Context, args []string) (*ExecResult, error) {
	spec, err := c.buildProcessSpec(ctx, nil, "", args...)
	if err != nil {
		return nil, err
	}
	var outBuf, errBuf bytes.Buffer
	code, err := c.execProcess(ctx, spec, nil, &outBuf, &errBuf)
	if err != nil {
		return nil, err
	}
	result := &ExecResult{
		ExitCode: code,
		Stdout:   outBuf.String(),
		Stderr:   errBuf.String(),
	}
	return result, nil
}
// Builds an OCI process spec for running a command inside the container.
//
// A process spec defines everything needed to start a process: the command
// and arguments, environment variables, working directory, and terminal mode.
// The base values are copied from the container's own OCI spec, then env and
// workdir are overridden if provided.
func (c *Container) buildProcessSpec(ctx context.Context, env []string, workdir string, args ...string) (*specs.Process, error) {
	ctr, err := c.client.LoadContainer(ctx, c.id)
	if err != nil {
		return nil, err
	}
	spec, err := ctr.Spec(ctx)
	if err != nil {
		return nil, err
	}
	// Copy the container's process spec so overrides stay per-execution.
	proc := *spec.Process
	proc.Terminal = false
	proc.Args = args
	if len(env) > 0 {
		proc.Env = mergeEnv(proc.Env, env)
	}
	if workdir != "" {
		proc.Cwd = workdir
	}
	return &proc, nil
}
// Merges override env vars on top of a base env slice.
//
// Entries are KEY=VALUE strings; entries without "=" are dropped. The result
// is deterministic: keys appear in order of first appearance (base first,
// then new override keys), and the last value for a repeated key wins. The
// previous implementation emitted entries in random map-iteration order,
// which made the exec environment nondeterministic between runs.
func mergeEnv(base, overrides []string) []string {
	merged := make(map[string]string, len(base)+len(overrides))
	var order []string
	// Record first-appearance order while letting later values overwrite.
	put := func(entry string) {
		k, v, ok := strings.Cut(entry, "=")
		if !ok {
			return
		}
		if _, exists := merged[k]; !exists {
			order = append(order, k)
		}
		merged[k] = v
	}
	for _, entry := range base {
		put(entry)
	}
	for _, entry := range overrides {
		put(entry)
	}
	result := make([]string, 0, len(order))
	for _, k := range order {
		result = append(result, k+"="+merged[k])
	}
	return result
}
// Runs a command inside the container, returning the exit code and captured
// stderr. Builds the process spec from args, then delegates to execProcess.
// A non-zero exit code is not treated as an error; the caller decides.
func (c *Container) execCommand(ctx context.Context, stdin io.Reader, stdout io.Writer, env []string, workdir string, args ...string) (int, string, error) {
	spec, err := c.buildProcessSpec(ctx, env, workdir, args...)
	if err != nil {
		return 0, "", crex.Wrap(ErrRuntime, err)
	}
	var errBuf bytes.Buffer
	code, err := c.execProcess(ctx, spec, stdin, stdout, &errBuf)
	if err != nil {
		return 0, "", err
	}
	return code, errBuf.String(), nil
}
// Starts a process inside the container's running task, waits for it to exit,
// and returns the exit code.
//
// The process is attached to the task as an additional exec, not as the
// primary process. This requires the task to already be running (started by
// [Container.startTask] during container creation). stdin, stdout, and stderr
// are connected to the process. Nil streams are replaced with io.Discard
// (stdout/stderr) or left disconnected (stdin). A non-zero exit code is not
// treated as an error; the caller decides how to handle it.
//
// When stdin is provided, the container's stdin is explicitly closed after the
// reader returns EOF so the exec process receives the EOF signal. This is
// required because the containerd shim holds both ends of the stdin FIFO open
// and will not propagate EOF on its own.
func (c *Container) execProcess(ctx context.Context, pspec *specs.Process, stdin io.Reader, stdout, stderr io.Writer) (int, error) {
	task, err := c.loadTask(ctx)
	if err != nil {
		return 0, err
	}
	// Normalize nil output streams; cio requires concrete writers.
	if stdout == nil {
		stdout = io.Discard
	}
	if stderr == nil {
		stderr = io.Discard
	}
	// Wrap stdin to detect when the reader returns EOF. stdinDone stays nil
	// when no stdin is supplied, which disables the close-on-EOF behavior in
	// awaitProcess.
	var stdinDone <-chan struct{}
	if stdin != nil {
		dr := newDoneReader(stdin)
		stdin = dr
		stdinDone = dr.done
	}
	process, err := task.Exec(ctx, nextExecID(), pspec, cio.NewCreator(
		cio.WithStreams(stdin, stdout, stderr),
	))
	if err != nil {
		return 0, crex.Wrap(ErrRuntime, err)
	}
	return awaitProcess(ctx, process, stdinDone)
}
// Loads the container's running task.
//
// Fails with ErrRuntime when the container record or its task cannot be
// resolved from containerd.
func (c *Container) loadTask(ctx context.Context) (containerd.Task, error) {
	ctr, err := c.client.LoadContainer(ctx, c.id)
	if err == nil {
		var task containerd.Task
		if task, err = ctr.Task(ctx, nil); err == nil {
			return task, nil
		}
	}
	return nil, crex.Wrap(ErrRuntime, err)
}
// Waits for an exec process to exit and returns the exit code.
//
// The process is started, then the function blocks until it exits. If
// stdinDone is non-nil, the process stdin is closed when the channel fires
// so the exec process receives EOF. The process is always deleted before
// returning. Delete errors on the early-failure paths are ignored; the
// returned error already describes the root cause.
func awaitProcess(ctx context.Context, process containerd.Process, stdinDone <-chan struct{}) (int, error) {
	statusC, err := process.Wait(ctx)
	if err != nil {
		process.Delete(ctx)
		return 0, crex.Wrap(ErrRuntime, err)
	}
	if err := process.Start(ctx); err != nil {
		process.Delete(ctx)
		return 0, crex.Wrap(ErrRuntime, err)
	}
	// Close the container's stdin after the reader is exhausted. Without this
	// the shim keeps its write end of the stdin FIFO open and the exec process
	// never receives EOF. The exited channel releases the goroutine when the
	// process terminates before stdin is exhausted; previously the goroutine
	// blocked on stdinDone forever in that case and leaked.
	if stdinDone != nil {
		exited := make(chan struct{})
		defer close(exited)
		go func() {
			select {
			case <-stdinDone:
				process.CloseIO(ctx, containerd.WithStdinCloser)
			case <-exited:
			}
		}()
	}
	exitStatus := <-statusC
	process.Delete(ctx)
	code, _, err := exitStatus.Result()
	if err != nil {
		return 0, crex.Wrap(ErrRuntime, err)
	}
	return int(code), nil
}
package runtime
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/images/archive"
"github.com/containerd/containerd/v2/pkg/rootfs"
"github.com/containerd/platforms"
"github.com/cruciblehq/crex"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// Filename of the OCI archive produced by Export, created inside the
// caller-supplied output directory.
const exportFilename = "image.tar"
// Commits the container's filesystem changes and exports the result as an
// OCI archive.
//
// The diff between the container's snapshot and its parent is stored as a
// new layer. If entrypoint is non-empty it is set on the image config. The
// resulting image is written to output/image.tar. The stored image record
// in containerd is never modified. The mutated manifest, config, and index
// are written to the content store as ephemeral blobs and referenced only
// during the export. A content lease protects these blobs from garbage
// collection until the export completes.
func (c *Container) Export(ctx context.Context, output string, entrypoint []string) error {
	loaded, err := c.client.LoadContainer(ctx, c.id)
	if err != nil {
		return crex.Wrap(ErrRuntime, err)
	}
	info, err := loaded.Info(ctx)
	if err != nil {
		return crex.Wrap(ErrRuntime, err)
	}
	// Capture the filesystem changes as a layer before taking the lease;
	// the diff itself is stored as regular (non-ephemeral) content.
	layer, diffID, err := c.snapshotDiff(ctx, info)
	if err != nil {
		return crex.Wrap(ErrRuntime, err)
	}
	// Acquire a content lease so the ephemeral blobs written by
	// buildExportTarget survive until the archive export finishes.
	// Without a lease, containerd's GC scheduler may collect them
	// between the write and the export.
	ctx, done, err := c.client.WithLease(ctx)
	if err != nil {
		return crex.Wrap(ErrRuntime, err)
	}
	// Release the lease with a fresh context so cleanup still runs if ctx
	// is already cancelled.
	defer done(context.Background())
	target, err := c.buildExportTarget(ctx, info.Image, func(manifest *ocispec.Manifest, config *ocispec.Image) {
		// The new layer and its uncompressed digest must be appended in
		// matching positions of manifest and config.
		manifest.Layers = append(manifest.Layers, layer)
		config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, diffID)
		if len(entrypoint) > 0 {
			config.Config.Entrypoint = entrypoint
			config.Config.Cmd = nil
		}
	})
	if err != nil {
		return crex.Wrap(ErrRuntime, err)
	}
	exportPath := filepath.Join(output, exportFilename)
	if err := c.exportImage(ctx, target, info.Image, exportPath); err != nil {
		return crex.Wrap(ErrRuntime, err)
	}
	slog.Info("image exported", "path", exportPath)
	return nil
}
// Computes the diff between the container's snapshot and its parent, returning
// the layer descriptor and its diff ID without modifying the image.
func (c *Container) snapshotDiff(ctx context.Context, info containers.Container) (ocispec.Descriptor, digest.Digest, error) {
	snapshots := c.client.SnapshotService(info.Snapshotter)
	layer, err := rootfs.CreateDiff(ctx, info.SnapshotKey, snapshots, c.client.DiffService())
	if err != nil {
		return ocispec.Descriptor{}, "", err
	}
	// The diff ID is the digest of the uncompressed layer, required for the
	// image config's RootFS entry.
	diffID, err := images.GetDiffID(ctx, c.client.ContentStore(), layer)
	if err != nil {
		return ocispec.Descriptor{}, "", err
	}
	return layer, diffID, nil
}
// Writes the image to an OCI tar archive at the given path.
//
// The target descriptor is exported directly via [archive.WithManifest]
// rather than looking up the image by name. This allows the caller to
// export ephemeral content (e.g., a mutated manifest with an extra layer)
// without modifying the stored image record. The image name is attached
// as the OCI reference annotation on the archive entry. When the target
// is a multi-platform index, only the manifest matching the container's
// platform is included.
func (c *Container) exportImage(ctx context.Context, target ocispec.Descriptor, imageName, path string) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// Surface close errors on the success path: a failed close can mean the
	// archive was not fully flushed to disk. Previously the deferred Close
	// error was silently dropped.
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()
	p, err := platforms.Parse(c.platform)
	if err != nil {
		return err
	}
	return c.client.Export(ctx, f,
		archive.WithManifest(target, imageName),
		archive.WithPlatform(platforms.Only(p)),
	)
}
// Builds the export target descriptor by applying a mutation to the image's
// manifest and config.
//
// The mutated manifest, config, and (when the root is an index) a new
// single-entry index are written to the content store as ephemeral blobs.
// The stored image record is never modified, so subsequent builds always
// see the original, clean image pulled from the registry.
func (c *Container) buildExportTarget(ctx context.Context, imageName string, mutate func(*ocispec.Manifest, *ocispec.Image)) (ocispec.Descriptor, error) {
	img, err := c.client.ImageService().Get(ctx, imageName)
	if err != nil {
		return ocispec.Descriptor{}, err
	}
	manifestDesc, index, pos, err := c.resolveManifestDescriptor(ctx, img.Target, imageName)
	if err != nil {
		return ocispec.Descriptor{}, err
	}
	mutated, err := c.mutateManifest(ctx, manifestDesc, imageName, mutate)
	if err != nil {
		return ocispec.Descriptor{}, err
	}
	return c.buildImageTarget(ctx, img.Target, index, pos, mutated, imageName)
}
// Resolves the image root descriptor to a platform-specific manifest.
//
// If the root is an OCI Image Index, the index is read and walked to find
// the manifest matching the container's platform. Returns the manifest
// descriptor, the index (nil when the root is already a manifest), and the
// position of the manifest within the index.
//
// Some registries (notably Docker Hub) serve index entries without explicit
// platform metadata. When a descriptor lacks a platform field, the manifest
// and its config are read to extract the platform from the image config, the
// same fallback that containerd's images.Manifest uses internally.
func (c *Container) resolveManifestDescriptor(ctx context.Context, root ocispec.Descriptor, imageName string) (ocispec.Descriptor, *ocispec.Index, int, error) {
	// A bare manifest needs no resolution; there is no index to rewrite.
	if !images.IsIndexType(root.MediaType) {
		return root, nil, 0, nil
	}
	idx, err := c.readIndex(ctx, root)
	if err != nil {
		return ocispec.Descriptor{}, nil, 0, err
	}
	p, err := platforms.Parse(c.platform)
	if err != nil {
		return ocispec.Descriptor{}, nil, 0, err
	}
	i, ok := c.matchManifest(ctx, idx, platforms.OnlyStrict(p))
	if ok {
		return idx.Manifests[i], &idx, i, nil
	}
	if len(idx.Manifests) == 0 {
		return ocispec.Descriptor{}, nil, 0, crex.Wrapf(ErrEmptyIndex, "%s", imageName)
	}
	// No platform match: fall back to the first entry rather than failing.
	return idx.Manifests[0], &idx, 0, nil
}
// Searches the index for a manifest matching the given platform.
//
// Descriptors with an explicit platform field are checked first. If none
// match, descriptors without a platform field are probed by reading the
// image config to discover the platform (the "ConfigPlatform" fallback).
// Returns the index position and true when a match is found.
func (c *Container) matchManifest(ctx context.Context, idx ocispec.Index, matcher platforms.MatchComparer) (int, bool) {
	// First pass: descriptors that declare their platform explicitly.
	for i, desc := range idx.Manifests {
		if desc.Platform == nil {
			continue
		}
		if matcher.Match(*desc.Platform) {
			return i, true
		}
	}
	// Second pass: probe platform-less manifest entries via their config.
	for i, desc := range idx.Manifests {
		if desc.Platform != nil || !images.IsManifestType(desc.MediaType) {
			continue
		}
		if p, ok := c.configPlatform(ctx, desc); ok && matcher.Match(p) {
			return i, true
		}
	}
	return 0, false
}
// Reads the image config referenced by a manifest descriptor and returns the
// platform declared in the config.
//
// Returns false when the manifest or config cannot be read.
func (c *Container) configPlatform(ctx context.Context, desc ocispec.Descriptor) (ocispec.Platform, bool) {
	m, err := c.readManifest(ctx, desc)
	if err != nil {
		return ocispec.Platform{}, false
	}
	cfg, err := c.readConfig(ctx, m.Config)
	if err != nil {
		return ocispec.Platform{}, false
	}
	p := ocispec.Platform{
		OS:           cfg.OS,
		Architecture: cfg.Architecture,
		Variant:      cfg.Variant,
	}
	return p, true
}
// Reads the manifest and config, applies the mutation, and writes the
// updated blobs back to the content store.
//
// The config blob is written first so its new digest can be embedded in the
// manifest before the manifest itself is serialized. The manifest blob
// carries GC reference labels pointing at its config and layers so containerd
// can trace reachability. Returns the descriptor of the new manifest.
func (c *Container) mutateManifest(ctx context.Context, target ocispec.Descriptor, imageName string, mutate func(*ocispec.Manifest, *ocispec.Image)) (ocispec.Descriptor, error) {
	manifest, err := c.readManifest(ctx, target)
	if err != nil {
		return ocispec.Descriptor{}, err
	}
	config, err := c.readConfig(ctx, manifest.Config)
	if err != nil {
		return ocispec.Descriptor{}, err
	}
	// The callback may change both documents; they must stay in sync
	// (e.g. layers and diff IDs appended pairwise).
	mutate(&manifest, &config)
	newConfigDesc, err := c.writeBlob(ctx, manifest.Config.MediaType, config, imageName+"-config")
	if err != nil {
		return ocispec.Descriptor{}, err
	}
	manifest.Config = newConfigDesc
	return c.writeBlob(ctx, target.MediaType, manifest, imageName+"-manifest", content.WithLabels(manifestGCLabels(manifest)))
}
// Produces the final image target descriptor after a manifest update.
//
// When the image was resolved through an index, a new single-entry index is
// written containing only the updated manifest. Entries for other platforms
// are dropped because their layer blobs are typically not present in the
// content store (only the target platform's layers are fetched).
//
// NOTE(review): manifestIdx is accepted for symmetry with
// resolveManifestDescriptor but is currently unused here.
func (c *Container) buildImageTarget(ctx context.Context, root ocispec.Descriptor, index *ocispec.Index, manifestIdx int, newManifest ocispec.Descriptor, imageName string) (ocispec.Descriptor, error) {
	if index == nil {
		// The root was already a single manifest; export it directly.
		return newManifest, nil
	}
	index.Manifests = []ocispec.Descriptor{newManifest}
	labels := indexGCLabels(*index)
	return c.writeBlob(ctx, root.MediaType, index, imageName+"-index", content.WithLabels(labels))
}
// Loads an OCI manifest from the content store.
func (c *Container) readManifest(ctx context.Context, desc ocispec.Descriptor) (ocispec.Manifest, error) {
	var m ocispec.Manifest
	raw, err := content.ReadBlob(ctx, c.client.ContentStore(), desc)
	if err == nil {
		err = json.Unmarshal(raw, &m)
	}
	if err != nil {
		return ocispec.Manifest{}, err
	}
	return m, nil
}
// Loads an OCI image index from the content store.
func (c *Container) readIndex(ctx context.Context, desc ocispec.Descriptor) (ocispec.Index, error) {
	var idx ocispec.Index
	raw, err := content.ReadBlob(ctx, c.client.ContentStore(), desc)
	if err == nil {
		err = json.Unmarshal(raw, &idx)
	}
	if err != nil {
		return ocispec.Index{}, err
	}
	return idx, nil
}
// Loads an OCI image config from the content store.
func (c *Container) readConfig(ctx context.Context, desc ocispec.Descriptor) (ocispec.Image, error) {
	var img ocispec.Image
	raw, err := content.ReadBlob(ctx, c.client.ContentStore(), desc)
	if err == nil {
		err = json.Unmarshal(raw, &img)
	}
	if err != nil {
		return ocispec.Image{}, err
	}
	return img, nil
}
// Serializes a value and writes it to the content store, returning the
// descriptor that references the stored blob.
func (c *Container) writeBlob(ctx context.Context, mediaType string, v any, ref string, opts ...content.Opt) (ocispec.Descriptor, error) {
	encoded, err := json.Marshal(v)
	if err != nil {
		return ocispec.Descriptor{}, err
	}
	// The descriptor is content-addressed: digest and size come from the
	// serialized bytes.
	desc := ocispec.Descriptor{
		MediaType: mediaType,
		Digest:    digest.FromBytes(encoded),
		Size:      int64(len(encoded)),
	}
	store := c.client.ContentStore()
	if err := content.WriteBlob(ctx, store, ref, bytes.NewReader(encoded), desc, opts...); err != nil {
		return ocispec.Descriptor{}, err
	}
	return desc, nil
}
// Computes containerd GC reference labels for a manifest's children.
//
// These labels allow containerd's garbage collector to trace reachability
// from the manifest blob to its config and layer blobs.
func manifestGCLabels(m ocispec.Manifest) map[string]string {
	labels := make(map[string]string, len(m.Layers)+1)
	labels["containerd.io/gc.ref.content.config"] = m.Config.Digest.String()
	for i, layer := range m.Layers {
		labels[fmt.Sprintf("containerd.io/gc.ref.content.l.%d", i)] = layer.Digest.String()
	}
	return labels
}
// Computes containerd GC reference labels for an index's children.
//
// One label per manifest entry, so the GC can trace from the index blob to
// each referenced manifest.
func indexGCLabels(idx ocispec.Index) map[string]string {
	labels := make(map[string]string, len(idx.Manifests))
	for i, m := range idx.Manifests {
		labels[fmt.Sprintf("containerd.io/gc.ref.content.m.%d", i)] = m.Digest.String()
	}
	return labels
}
package runtime
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"log/slog"
"os"
goruntime "runtime"
"syscall"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/transfer/archive"
timage "github.com/containerd/containerd/v2/core/transfer/image"
tregistry "github.com/containerd/containerd/v2/core/transfer/registry"
"github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/errdefs"
"github.com/containerd/platforms"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/spec/protocol"
dref "github.com/distribution/reference"
)
const (
	// Snapshotter used for container filesystems. containerd runs as root
	// inside the VM, so the native overlayfs kernel module is available.
	snapshotter = "overlayfs"
	// OCI runtime shim for running containers.
	// NOTE(review): ociRuntime is declared here but not referenced in this
	// section; presumably used elsewhere in the package.
	ociRuntime = "io.containerd.runc.v2"
)
// Manages the containerd client and provides image and container operations.
//
// All methods share the single client connection created by [New].
type Runtime struct {
	client *containerd.Client // Containerd client for managing containers and images.
}
// Creates a runtime connected to the containerd socket at the given address.
//
// The namespace scopes all containerd operations to a single tenant. The
// runtime must be closed when no longer needed.
func New(address, namespace string) (*Runtime, error) {
	c, err := containerd.New(address, containerd.WithDefaultNamespace(namespace))
	if err != nil {
		return nil, crex.Wrap(ErrRuntime, err)
	}
	rt := &Runtime{client: c}
	return rt, nil
}
// Closes the containerd client connection.
//
// The runtime must not be used after Close returns.
func (rt *Runtime) Close() error {
	return rt.client.Close()
}
// Imports an OCI archive, unpacks it for the target platform, and starts
// a container.
//
// The archive is transferred server-side into containerd's content store,
// tagged with a deterministic name derived from the path, and the layers
// for the target platform are unpacked into the snapshotter. A container
// is created with a fresh snapshot and a long-running task (sleep infinity)
// is started so that subsequent Exec calls have a running process to attach
// to. Any existing container with the same ID is removed before the new one
// is created. Building for a platform other than the host requires
// QEMU / binfmt_misc support in the kernel.
func (rt *Runtime) StartContainer(ctx context.Context, path string, id string, platform string) (*Container, error) {
	tag := imageTag(path)
	if err := rt.transferImage(ctx, path, tag, platform); err != nil {
		return nil, crex.Wrap(ErrRuntime, err)
	}
	image, err := rt.resolveImage(ctx, tag, platform)
	if err != nil {
		return nil, crex.Wrap(ErrRuntime, err)
	}
	return rt.startSleeper(ctx, image, id, platform)
}

// Pulls a remote OCI image and starts a container from it.
//
// The reference is a single-token OCI image name such as "alpine:3.21" or
// "docker.io/library/alpine:3.21", normalized to include the default
// registry and tag when omitted. The image is pulled into containerd's
// content store, unpacked for the target platform, and a container with a
// long-running task is started. Any existing container with the same ID is
// removed before the new one is created.
func (rt *Runtime) StartContainerFromOCI(ctx context.Context, ref string, id string, platform string) (*Container, error) {
	image, err := rt.pullImage(ctx, ref, platform)
	if err != nil {
		return nil, crex.Wrap(ErrRuntime, err)
	}
	return rt.startSleeper(ctx, image, id, platform)
}

// Creates a container for the image and starts its long-running task.
//
// Shared tail of StartContainer and StartContainerFromOCI, which previously
// duplicated this sequence. Any stale container with the same ID is removed
// first. The task runs "sleep infinity" so subsequent Exec calls have a
// running process to attach to. On task start failure the container and its
// snapshot are cleaned up.
func (rt *Runtime) startSleeper(ctx context.Context, image containerd.Image, id, platform string) (*Container, error) {
	c := &Container{
		client:   rt.client,
		id:       id,
		platform: platform,
	}
	// Remove any stale container from a previous build with the same ID.
	c.remove(ctx)
	ctr, err := c.create(ctx, image, oci.WithProcessArgs("sleep", "infinity"))
	if err != nil {
		return nil, crex.Wrap(ErrRuntime, err)
	}
	if err := c.startTask(ctx, ctr); err != nil {
		ctr.Delete(ctx, containerd.WithSnapshotCleanup)
		return nil, crex.Wrap(ErrRuntime, err)
	}
	return c, nil
}
// Pulls a remote OCI image from a container registry.
//
// The reference is a single-token image name. Bare names like "alpine:3.21"
// are normalized to "docker.io/library/alpine:3.21", and untagged names
// receive the "latest" tag. This differs from Crucible references, which
// are space-separated name and version strings resolved by the CLI before
// reaching the daemon. The image is stored in containerd's content store
// and unpacked into the snapshotter for the specified platform.
//
// Uses the containerd transfer service rather than the lower-level Pull or
// Fetch APIs. The transfer service handles multi-platform index resolution
// correctly, including index entries whose descriptors lack explicit platform
// metadata (as seen in some Docker Official Images).
//
// If the image is already present and unpacked for the target platform the
// pull is skipped, avoiding unnecessary registry requests (e.g. when
// Docker Hub rate limits are in effect).
func (rt *Runtime) pullImage(ctx context.Context, ref string, platform string) (containerd.Image, error) {
	named, err := dref.ParseNormalizedNamed(ref)
	if err != nil {
		return nil, err
	}
	// TagNameOnly appends ":latest" only when the reference has no tag.
	fullRef := dref.TagNameOnly(named).String()
	p, err := platforms.Parse(platform)
	if err != nil {
		return nil, err
	}
	// Fast path: reuse an image that is already unpacked locally. Any error
	// here simply falls through to a fresh pull.
	if img, err := rt.resolveImage(ctx, fullRef, platform); err == nil {
		unpacked, err := img.IsUnpacked(ctx, snapshotter)
		if err == nil && unpacked {
			slog.Info("image already unpacked, skipping pull", "ref", fullRef, "platform", platform)
			return img, nil
		}
	}
	slog.Info("pulling image", "ref", fullRef, "platform", platform)
	src, err := tregistry.NewOCIRegistry(ctx, fullRef)
	if err != nil {
		return nil, err
	}
	dest := timage.NewStore(fullRef,
		timage.WithPlatforms(p),
		timage.WithUnpack(p, snapshotter),
	)
	if err := rt.client.Transfer(ctx, src, dest); err != nil {
		return nil, err
	}
	// Re-resolve so the returned image is platform-scoped like the fast path.
	return rt.resolveImage(ctx, fullRef, platform)
}
// Transfers an OCI archive into containerd's content store server-side.
//
// The archive is streamed to containerd which imports it, stores it under
// the given tag, and unpacks the layers for the target platform into the
// snapshotter. The entire operation runs inside the containerd process,
// so cruxd does not need mount privileges.
func (rt *Runtime) transferImage(ctx context.Context, path, tag, platform string) error {
	fh, err := os.Open(path)
	if err != nil {
		return err
	}
	defer fh.Close()
	p, err := platforms.Parse(platform)
	if err != nil {
		return err
	}
	return rt.client.Transfer(ctx,
		archive.NewImageImportStream(fh, ""),
		timage.NewStore(tag, timage.WithUnpack(p, snapshotter)),
	)
}
// Looks up a tagged image and selects the manifest for the given platform.
//
// Multi-platform images contain manifests for multiple architectures. This
// method selects one, so that subsequent operations target the correct
// architecture.
func (rt *Runtime) resolveImage(ctx context.Context, tag, platform string) (containerd.Image, error) {
	p, err := platforms.Parse(platform)
	if err != nil {
		return nil, err
	}
	record, err := rt.client.ImageService().Get(ctx, tag)
	if err != nil {
		return nil, err
	}
	matcher := platforms.Only(p)
	return containerd.NewImageWithPlatform(rt.client, record, matcher), nil
}
// Produces a containerd image tag from an archive path.
//
// The path is hashed to produce a tag that is always valid for OCI references
// regardless of which characters the path contains.
func imageTag(path string) string {
	sum := sha256.Sum256([]byte(path))
	return "import/" + hex.EncodeToString(sum[:]) + ":latest"
}
// Returns the default OCI platform for the host architecture.
//
// The OS component is fixed to linux; only the architecture tracks the host.
func defaultPlatform() string {
	return "linux/" + goruntime.GOARCH
}
// Imports an OCI archive, tags it under the given name, and unpacks it for
// the host platform.
//
// The archive is transferred server-side into containerd's content store,
// tagged with the provided name, and the layers are unpacked into the
// snapshotter.
func (rt *Runtime) ImportImage(ctx context.Context, path, tag string) error {
	if err := rt.transferImage(ctx, path, tag, defaultPlatform()); err != nil {
		return crex.Wrap(ErrRuntime, err)
	}
	return nil
}
// Starts a container from a previously imported image tag.
//
// The operation is idempotent: if the container is already running it is
// left untouched; if the container exists but has no active task a new
// task is started on the existing snapshot; otherwise a new container is
// created from the image.
func (rt *Runtime) StartFromTag(ctx context.Context, tag, id string) (*Container, error) {
	platform := defaultPlatform()
	c := &Container{
		client:   rt.client,
		id:       id,
		platform: platform,
	}
	status, err := c.Status(ctx)
	if err != nil {
		return nil, crex.Wrap(ErrRuntime, err)
	}
	if status == protocol.ContainerRunning {
		// Already running: nothing to do.
		return c, nil
	}
	if status == protocol.ContainerStopped {
		// Container exists without an active task: restart on its snapshot.
		if err := c.Start(ctx); err != nil {
			return nil, crex.Wrap(ErrRuntime, err)
		}
		return c, nil
	}
	// No usable container: create a fresh one from the tagged image.
	image, err := rt.resolveImage(ctx, tag, platform)
	if err != nil {
		return nil, crex.Wrap(ErrRuntime, err)
	}
	ctr, err := c.create(ctx, image)
	if err != nil {
		return nil, crex.Wrap(ErrRuntime, err)
	}
	if err := c.startTask(ctx, ctr); err != nil {
		ctr.Delete(ctx, containerd.WithSnapshotCleanup)
		return nil, crex.Wrap(ErrRuntime, err)
	}
	return c, nil
}
// Removes an image and all containers created from it.
//
// Containers are discovered by querying containerd for records whose image
// field matches the tag. Each container's task is killed before the container
// and its snapshot are deleted.
func (rt *Runtime) DestroyImage(ctx context.Context, tag string) error {
	ctrs, err := rt.client.Containers(ctx, fmt.Sprintf("image==%s", tag))
	if err != nil {
		return crex.Wrap(ErrRuntime, err)
	}
	for _, ctr := range ctrs {
		// Task kill/delete errors are ignored — presumably deliberate
		// best-effort teardown so a dead task cannot block container
		// removal. NOTE(review): confirm this is intended.
		if task, taskErr := ctr.Task(ctx, nil); taskErr == nil {
			task.Kill(ctx, syscall.SIGKILL)
			task.Delete(ctx, containerd.WithProcessKill)
		}
		// An already-removed container is not an error.
		if err := ctr.Delete(ctx, containerd.WithSnapshotCleanup); err != nil && !errdefs.IsNotFound(err) {
			return crex.Wrap(ErrRuntime, err)
		}
	}
	// Finally drop the image record; NotFound means it is already gone.
	if err := rt.client.ImageService().Delete(ctx, tag); err != nil && !errdefs.IsNotFound(err) {
		return crex.Wrap(ErrRuntime, err)
	}
	return nil
}
// Returns a handle for an existing container.
//
// The container is not loaded or verified; the handle is a lightweight
// reference that resolves the container lazily on subsequent calls.
func (rt *Runtime) Container(id string) *Container {
	c := Container{
		client:   rt.client,
		id:       id,
		platform: defaultPlatform(),
	}
	return &c
}
package server
import (
"context"
"encoding/json"
"net"
"os"
"time"
"github.com/cruciblehq/cruxd/internal"
"github.com/cruciblehq/cruxd/internal/build"
"github.com/cruciblehq/spec/protocol"
)
// Handles a build command.
//
// Receives a recipe from crux and executes it against the container runtime.
func (s *Server) handleBuild(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	req, err := protocol.DecodePayload[protocol.BuildRequest](payload)
	if err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	opts := build.Options{
		Recipe:     req.Recipe,
		Resource:   req.Resource,
		Output:     req.Output,
		Root:       req.Root,
		Entrypoint: req.Entrypoint,
		Platforms:  req.Platforms,
	}
	result, err := build.Run(ctx, s.runtime, opts)
	if err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	// Count the successful build under the lock shared with handleStatus.
	s.mu.Lock()
	s.builds++
	s.mu.Unlock()
	s.respond(conn, protocol.CmdOK, &protocol.BuildResult{Output: result.Output})
}
// Handles a status command.
//
// Reports daemon liveness, version, pid, uptime, and the number of
// successful builds so far.
func (s *Server) handleStatus(_ context.Context, conn net.Conn) {
	s.mu.Lock()
	builds := s.builds
	s.mu.Unlock()
	result := &protocol.StatusResult{
		Running: true,
		Version: internal.VersionString(),
		Pid:     os.Getpid(),
		Uptime:  time.Since(s.startedAt).Truncate(time.Second).String(),
		Builds:  builds,
	}
	s.respond(conn, protocol.CmdOK, result)
}
// Handles an image-import command.
func (s *Server) handleImageImport(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	req, err := protocol.DecodePayload[protocol.ImageImportRequest](payload)
	if err == nil {
		err = s.runtime.ImportImage(ctx, req.Path, protocol.ImageTag(req.Ref, req.Version))
	}
	if err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	s.respond(conn, protocol.CmdOK, nil)
}
// Handles an image-start command.
func (s *Server) handleImageStart(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	req, err := protocol.DecodePayload[protocol.ImageStartRequest](payload)
	if err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	// Fall back to the image ref when no explicit container ID was given.
	name := req.ID
	if name == "" {
		name = req.Ref
	}
	tag := protocol.ImageTag(req.Ref, req.Version)
	if _, err := s.runtime.StartFromTag(ctx, tag, protocol.ContainerID(name)); err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	s.respond(conn, protocol.CmdOK, nil)
}
// Handles an image-destroy command.
func (s *Server) handleImageDestroy(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	req, err := protocol.DecodePayload[protocol.ImageDestroyRequest](payload)
	if err == nil {
		err = s.runtime.DestroyImage(ctx, protocol.ImageTag(req.Ref, req.Version))
	}
	if err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	s.respond(conn, protocol.CmdOK, nil)
}
// Handles a container-stop command.
func (s *Server) handleContainerStop(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	req, err := protocol.DecodePayload[protocol.ContainerStopRequest](payload)
	if err == nil {
		err = s.runtime.Container(protocol.ContainerID(req.ID)).Stop(ctx)
	}
	if err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	s.respond(conn, protocol.CmdOK, nil)
}
// Handles a container-destroy command.
func (s *Server) handleContainerDestroy(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	req, err := protocol.DecodePayload[protocol.ContainerDestroyRequest](payload)
	if err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	// Destroy's outcome is not reported to the client — presumably
	// best-effort teardown. NOTE(review): confirm ignoring the result
	// is intended.
	target := s.runtime.Container(protocol.ContainerID(req.ID))
	target.Destroy(ctx)
	s.respond(conn, protocol.CmdOK, nil)
}
// Handles a container-status command.
//
// Looks up the container by its normalized ID, queries its status, and
// returns it to the client in a ContainerStatusResult.
func (s *Server) handleContainerStatus(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	request, decodeErr := protocol.DecodePayload[protocol.ContainerStatusRequest](payload)
	if decodeErr != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: decodeErr.Error()})
		return
	}
	container := s.runtime.Container(protocol.ContainerID(request.ID))
	status, statusErr := container.Status(ctx)
	if statusErr != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: statusErr.Error()})
		return
	}
	s.respond(conn, protocol.CmdOK, &protocol.ContainerStatusResult{Status: status})
}
// Handles a container-exec command.
//
// Runs the requested argument vector inside the container and returns the
// exit code together with the captured stdout and stderr.
func (s *Server) handleContainerExec(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	request, decodeErr := protocol.DecodePayload[protocol.ContainerExecRequest](payload)
	if decodeErr != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: decodeErr.Error()})
		return
	}
	container := s.runtime.Container(protocol.ContainerID(request.ID))
	execResult, execErr := container.ExecArgs(ctx, request.Command)
	if execErr != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: execErr.Error()})
		return
	}
	reply := &protocol.ContainerExecResult{
		ExitCode: execResult.ExitCode,
		Stdout:   execResult.Stdout,
		Stderr:   execResult.Stderr,
	}
	s.respond(conn, protocol.CmdOK, reply)
}
// Handles a container-update command.
//
// Stops the running container, imports the replacement image under the same
// tag, and restarts the container from it. Each step's failure is reported to
// the client as a CmdError envelope.
func (s *Server) handleContainerUpdate(ctx context.Context, conn net.Conn, payload json.RawMessage) {
	req, err := protocol.DecodePayload[protocol.ContainerUpdateRequest](payload)
	if err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	tag := protocol.ImageTag(req.Ref, req.Version)
	// Normalize once and reuse for both stop and restart. Previously the raw
	// req.ID was passed to StartFromTag while every other container path
	// (including the Stop just below) used protocol.ContainerID, so the
	// restarted container got a different ID than the one stop/status/exec
	// address.
	id := protocol.ContainerID(req.ID)
	ctr := s.runtime.Container(id)
	if err := ctr.Stop(ctx); err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	if err := s.runtime.ImportImage(ctx, req.Path, tag); err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	if _, err := s.runtime.StartFromTag(ctx, tag, id); err != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: err.Error()})
		return
	}
	s.respond(conn, protocol.CmdOK, nil)
}
package server
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net"
"os"
"os/user"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/cruciblehq/crex"
"github.com/cruciblehq/cruxd/internal/runtime"
"github.com/cruciblehq/spec/paths"
"github.com/cruciblehq/spec/protocol"
)
const (
	// Default containerd socket address used when Config.ContainerdAddress
	// is empty.
	DefaultContainerdAddress = "/run/containerd/containerd.sock"
	// Default containerd namespace for images and containers, used when
	// Config.ContainerdNamespace is empty.
	DefaultContainerdNamespace = "cruxd"
	// Group name used to grant socket access. Members of this group can
	// connect to the daemon socket without owning the process.
	socketGroup = "cruxd"
	// File mode (octal) applied to the Unix socket. Owner and group get
	// read-write (required for connect); others get no access.
	socketMode = 0660
)
// Holds server configuration.
//
// Zero-valued fields are replaced with defaults by New, so the zero Config
// is usable.
type Config struct {
	SocketPath string // Override for the Unix socket path. Empty uses the default.
	PIDFilePath string // Override for the PID file path. Empty uses the default.
	ContainerdAddress string // Containerd socket address. Empty uses [DefaultContainerdAddress].
	ContainerdNamespace string // Containerd namespace for images and containers. Empty uses [DefaultContainerdNamespace].
	ReadyFD int // File descriptor to signal readiness on. Negative means disabled.
}
// Listens on a Unix domain socket and dispatches commands.
//
// Create with New, open the socket with Start, and shut down with Stop.
// Each accepted connection is handled in its own goroutine.
type Server struct {
	socketPath string // Path to the Unix socket file.
	pidFilePath string // Path to the PID file.
	readyFD int // File descriptor for readiness signaling (-1 = disabled).
	runtime *runtime.Runtime // Containerd-backed container runtime.
	listener net.Listener // Listener for incoming connections.
	startedAt time.Time // Timestamp when the server started.
	builds int // Total number of build commands processed.
	done chan struct{} // Channel to signal server shutdown; closed by Stop.
	mu sync.Mutex // Mutex to protect shared state.
}
// Creates a new server instance.
//
// Empty Config fields fall back to their defaults. The socket is not opened
// until [Start] is called.
func New(cfg Config) (*Server, error) {
	socket := cfg.SocketPath
	if socket == "" {
		socket = paths.Socket("default")
	}
	pidFile := cfg.PIDFilePath
	if pidFile == "" {
		pidFile = paths.PIDFile("default")
	}
	address := cfg.ContainerdAddress
	if address == "" {
		address = DefaultContainerdAddress
	}
	namespace := cfg.ContainerdNamespace
	if namespace == "" {
		namespace = DefaultContainerdNamespace
	}
	rt, err := runtime.New(address, namespace)
	if err != nil {
		return nil, crex.Wrap(ErrServer, err)
	}
	srv := &Server{
		socketPath:  socket,
		pidFilePath: pidFile,
		readyFD:     cfg.ReadyFD,
		runtime:     rt,
		done:        make(chan struct{}),
	}
	return srv, nil
}
// Opens the Unix socket and begins accepting connections.
//
// A failure to write the PID file is logged but does not prevent startup;
// readiness is signaled and the accept loop runs in a background goroutine.
func (s *Server) Start() error {
	l, err := listen(s.socketPath)
	if err != nil {
		return err
	}
	s.listener = l
	s.startedAt = time.Now()
	if pidErr := writePID(s.pidFilePath); pidErr != nil {
		slog.Error("failed to write PID file", "error", pidErr)
	}
	slog.Info("server listening on socket", "path", s.socketPath)
	s.signalReady()
	go s.accept()
	return nil
}
// Signals readiness to the parent process via the ready-fd.
//
// The ready-fd is a bootstrap channel that solves a sequencing problem: crux
// needs to know when the Unix socket is bound and accepting connections, but
// it cannot use the socket itself for that signal because the socket does not
// exist yet when crux needs to start waiting. The ready-fd (a pipe on Linux,
// stdout on Darwin) bridges the gap between process start and socket readiness.
//
// Writes a [protocol.CmdOK] envelope followed by a newline. If the fd is not
// a standard stream (stdin, stdout, stderr), it is closed after writing so
// the reader receives EOF. Standard streams are left open because on Darwin
// the ready-fd is stdout, and closing it would tear down the limactl SSH
// session that keeps cruxd alive.
func (s *Server) signalReady() {
	if s.readyFD < 0 {
		return
	}
	file := os.NewFile(uintptr(s.readyFD), "ready-fd")
	if file == nil {
		slog.Error("invalid file descriptor", "fd", s.readyFD)
		return
	}
	msg, err := protocol.Encode(protocol.CmdOK, nil)
	if err != nil {
		slog.Error("failed to encode ready message", "fd", s.readyFD, "error", err)
		return
	}
	msg = append(msg, '\n')
	if _, err = file.Write(msg); err != nil {
		slog.Error("failed to signal readiness", "fd", s.readyFD, "error", err)
	}
	// Close only non-standard descriptors (fd > 2); closing stdout on Darwin
	// would tear down the limactl SSH session.
	if s.readyFD > 2 {
		file.Close()
	}
}
// Creates the Unix socket listener, removes any stale socket from a previous
// run, and applies permissions.
//
// Returns an ErrServer-wrapped error if the socket directory cannot be
// created or the listener cannot be bound.
func listen(socketPath string) (net.Listener, error) {
	dir := filepath.Dir(socketPath)
	if err := os.MkdirAll(dir, paths.DefaultDirMode); err != nil {
		return nil, crex.Wrap(ErrServer, err)
	}
	// Best-effort removal of a stale socket left by a previous run; any real
	// problem surfaces from the Listen call below.
	os.Remove(socketPath)
	listener, err := net.Listen("unix", socketPath)
	if err != nil {
		// Include the underlying error: the previous message dropped it,
		// hiding the actual bind failure (e.g. permission denied vs
		// address already in use).
		return nil, crex.Wrapf(ErrServer, "failed to listen on %s: %v", socketPath, err)
	}
	setSocketPermissions(socketPath)
	return listener, nil
}
// Restricts socket access to owner and group where supported.
//
// On virtiofs mounts (used by Lima on Darwin), permission changes may fail
// because the host filesystem controls access. This is non-fatal since the
// socket is already usable by the creating process.
func setSocketPermissions(socketPath string) {
	if err := os.Chmod(socketPath, socketMode); err != nil {
		slog.Debug("failed to chmod socket, filesystem may not support it", "path", socketPath, "error", err)
		return
	}
	grp, lookupErr := user.LookupGroup(socketGroup)
	if lookupErr != nil {
		slog.Warn("socket group not found, socket accessible to owner only", "group", socketGroup)
		return
	}
	gid, parseErr := strconv.Atoi(grp.Gid)
	if parseErr != nil {
		// An unparseable GID leaves the group ownership untouched, same as
		// the prior behavior.
		return
	}
	if err := os.Chown(socketPath, -1, gid); err != nil {
		slog.Warn("failed to chgrp socket", "group", socketGroup, "error", err)
	}
}
// Shuts down the server and cleans up resources.
//
// Closes the done channel (unblocking Wait and terminating the accept loop),
// closes the listener and runtime if present, and removes the socket and PID
// files. Always returns nil.
func (s *Server) Stop() error {
	close(s.done)
	if l := s.listener; l != nil {
		l.Close()
	}
	if rt := s.runtime; rt != nil {
		rt.Close()
	}
	os.Remove(s.socketPath)
	os.Remove(s.pidFilePath)
	return nil
}
// Blocks until the server stops.
//
// Returns once [Stop] closes the done channel. Because a closed channel
// unblocks every receiver, multiple goroutines may Wait concurrently.
func (s *Server) Wait() {
	<-s.done
}
// Accepts connections in a loop until the server shuts down.
//
// Each connection is served on its own goroutine. Accept errors are logged
// and the loop continues, unless the done channel indicates shutdown (Stop
// closes the listener, which makes Accept fail).
func (s *Server) accept() {
	for {
		conn, err := s.listener.Accept()
		if err == nil {
			go s.handle(conn)
			continue
		}
		select {
		case <-s.done:
			// Shutting down; the failure came from the closed listener.
			return
		default:
		}
		slog.Error("accept error", "error", err)
	}
}
// Processes a single connection.
//
// Reads one newline-delimited JSON message, dispatches the command, and
// writes the response. The connection is closed after one exchange. The
// buffered reader is handed to contextWithDisconnect so the handler's
// context is cancelled if the peer goes away mid-command.
func (s *Server) handle(conn net.Conn) {
	defer conn.Close()
	buffered := bufio.NewReader(conn)
	message, readErr := buffered.ReadBytes('\n')
	if readErr != nil {
		slog.Error("read error", "error", readErr)
		return
	}
	envelope, payload, decodeErr := protocol.Decode(message)
	if decodeErr != nil {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{Message: decodeErr.Error()})
		return
	}
	slog.Info("command received", "command", envelope.Command)
	ctx, cancel := contextWithDisconnect(context.Background(), buffered)
	defer cancel()
	s.dispatch(ctx, conn, envelope.Command, payload)
}
// Routes a command to the appropriate handler.
//
// Unknown commands are answered with a CmdError envelope naming the command.
func (s *Server) dispatch(ctx context.Context, conn net.Conn, cmd protocol.Command, payload json.RawMessage) {
	handlers := map[protocol.Command]func(context.Context, net.Conn, json.RawMessage){
		protocol.CmdBuild:            s.handleBuild,
		protocol.CmdImageImport:      s.handleImageImport,
		protocol.CmdImageStart:       s.handleImageStart,
		protocol.CmdImageDestroy:     s.handleImageDestroy,
		protocol.CmdContainerStop:    s.handleContainerStop,
		protocol.CmdContainerDestroy: s.handleContainerDestroy,
		protocol.CmdContainerStatus:  s.handleContainerStatus,
		protocol.CmdContainerExec:    s.handleContainerExec,
		protocol.CmdContainerUpdate:  s.handleContainerUpdate,
		// handleStatus takes no payload; adapt its signature here.
		protocol.CmdStatus: func(ctx context.Context, conn net.Conn, _ json.RawMessage) {
			s.handleStatus(ctx, conn)
		},
	}
	handler, known := handlers[cmd]
	if !known {
		s.respond(conn, protocol.CmdError, &protocol.ErrorResult{
			Message: fmt.Sprintf("unknown command: %s", cmd),
		})
		return
	}
	handler(ctx, conn, payload)
}
// Writes a JSON envelope response to the connection.
//
// Encoding and write failures are logged rather than returned: the caller
// closes the connection after one exchange, so there is no recovery path.
func (s *Server) respond(conn net.Conn, cmd protocol.Command, payload any) {
	data, err := protocol.Encode(cmd, payload)
	if err != nil {
		slog.Error("encode response failed", "error", err)
		return
	}
	data = append(data, '\n')
	// The write error was previously discarded; log it so silently dropped
	// responses are at least visible in the daemon log.
	if _, err := conn.Write(data); err != nil {
		slog.Error("write response failed", "error", err)
	}
}
// Writes the daemon PID to the PID file so the CLI can detect whether the
// daemon is already running and send it signals.
//
// Creates the parent directory if needed. Returns the first error from
// directory creation or the file write.
func writePID(pidFilePath string) error {
	if err := os.MkdirAll(filepath.Dir(pidFilePath), paths.DefaultDirMode); err != nil {
		return err
	}
	// strconv.Itoa is the direct conversion; fmt.Sprintf("%d", ...) boxes
	// the argument and goes through reflection for the same result.
	return os.WriteFile(pidFilePath, []byte(strconv.Itoa(os.Getpid())), paths.DefaultFileMode)
}
// Returns a derived context that is cancelled when the remote end of the
// connection closes.
//
// Detection works by reading from r in a background goroutine. The read blocks
// until the peer closes the connection, at which point it returns an error and
// the derived context is cancelled. The caller must ensure that no further data
// is expected on r for the lifetime of the returned context. If data arrives
// unexpectedly, it will be discarded and the context will be cancelled
// prematurely. The returned [context.CancelFunc] must always be called to
// release resources, even if the connection closes on its own.
func contextWithDisconnect(parent context.Context, r io.Reader) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(parent)
go func() {
buf := make([]byte, 1)
r.Read(buf)
cancel()
}()
return ctx, cancel
}
package internal
import (
"fmt"
"runtime"
"strings"
)
const (
	// Placeholder returned when a build-time variable was never set.
	defaultUndefined = "(undefined)"
	// Placeholder version string for a local (non-pipeline) build.
	defaultLocalBuild = "(local)"
	// Main branch name; builds from this branch omit the "+stage" suffix in
	// version strings (see VersionString).
	mainBranch = "main"
)
var (
	// Build metadata, expected to be injected via linker flags at build time
	// (see VersionString and the accessors below); all default to "unset".
	version = "" // Version number (e.g., "1.2.3")
	stage = "" // Development stage or git branch (e.g., "staging", "main")
	gitCommit = "" // Git commit hash (e.g., "a1b2c3d4")
	rawQuiet = "false" // Whether to enable quiet mode
	rawDebug = "false" // Whether to enable debug mode
	rawVerbose = "false" // Whether to enable verbose logging
)
// Returns the current version.
//
// If the version is not set, returns "(undefined)". If the version includes a
// "v" or "V" prefix (e.g., "v1.0.0"), it is stripped.
func Version() string {
	trimmed := strings.TrimSpace(version)
	if trimmed == "" {
		return defaultUndefined
	}
	// Lowercasing first means a single TrimPrefix handles both "v" and "V".
	return strings.TrimPrefix(strings.ToLower(trimmed), "v")
}
// Returns the development stage (e.g., "alpha").
//
// The development should correspond to the git branch name used during the
// build. If it is not set, returns "(undefined)".
func Stage() string {
	if trimmed := strings.TrimSpace(stage); trimmed != "" {
		return strings.ToLower(trimmed)
	}
	return defaultUndefined
}
// Returns the git commit hash.
//
// If the commit hash is not set, returns "(undefined)".
func GitCommit() string {
	if hash := strings.TrimSpace(gitCommit); hash != "" {
		return hash
	}
	return defaultUndefined
}
// Returns the build architecture.
//
// Reports the architecture the binary was compiled for via runtime.GOARCH
// (e.g., "amd64", "arm64").
func Arch() string {
	return runtime.GOARCH
}
// Returns true if this is a local (non-pipeline) build.
//
// A build is considered local if any of the version, git commit, or stage
// variables are unset. Pipeline builds should set all three variables via
// linker flags.
func IsLocal() bool {
	for _, value := range []string{version, gitCommit, stage} {
		if strings.TrimSpace(value) == "" {
			return true
		}
	}
	return false
}
// Returns a detailed version string.
//
// If this is a local build, returns "(local)". Otherwise, returns a string
// formatted as "<version>+<stage> <git-commit> [<arch>]"; builds from the
// main branch omit the "+<stage>" part.
func VersionString() string {
	if IsLocal() {
		return defaultLocalBuild
	}
	suffix := ""
	if st := Stage(); st != mainBranch {
		suffix = "+" + st
	}
	return fmt.Sprintf("%s%s %s [%s]", Version(), suffix, GitCommit(), Arch())
}