// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package cuetools import ( "fmt" "os" "os/exec" "path/filepath" "regexp" "strings" "github.com/pkg/errors" godiff "github.com/pmezard/go-difflib/difflib" ) const ( ExternalDiffEnvVar = "XP_FUNCTION_CUE_DIFF" ) func printNativeDiffs(expectedString, actualString string) error { ud := godiff.UnifiedDiff{ A: godiff.SplitLines(expectedString), B: godiff.SplitLines(actualString), FromFile: "expected", ToFile: "actual", Context: 3, } s, err := godiff.GetUnifiedDiffString(ud) if err != nil { return errors.Wrap(err, "diff expected against actual") } _, _ = fmt.Fprintf(TestOutput, "diffs found:\n%s\n", strings.TrimSpace(s)) return nil } func printDiffs(expectedString, actualString string) error { externalDiff := os.Getenv(ExternalDiffEnvVar) if externalDiff == "" { return printNativeDiffs(expectedString, actualString) } // use the logic of external kubectl diffs to run the command args := strings.Split(externalDiff, " ") cmd := args[0] args = args[1:] var realArgs []string isValidChar := regexp.MustCompile(`^[a-zA-Z0-9-=]+$`).MatchString for _, arg := range args { if isValidChar(arg) { realArgs = append(realArgs, arg) } } dir, err := os.MkdirTemp("", "diff*") if err != nil { return err } defer func() { _ = os.RemoveAll(dir) }() eFile := filepath.Join(dir, "expected.yaml") err = os.WriteFile(eFile, []byte(expectedString), 0o644) if err != nil { return err } aFile := filepath.Join(dir, "actual.yaml") err = os.WriteFile(aFile, []byte(actualString), 0o644) if err != nil { return err } realArgs = append(realArgs, eFile, aFile) c := exec.Command(cmd, realArgs...) c.Stdout = TestOutput c.Stderr = TestOutput return c.Run() }
// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package cuetools import ( "bytes" "encoding/json" "fmt" "io" "strings" "cuelang.org/go/cue/cuecontext" "cuelang.org/go/cue/format" "cuelang.org/go/encoding/openapi" "github.com/ghodss/yaml" "github.com/pkg/errors" ) type expectedXRDVersion struct { Name string `json:"name"` Schema struct { OpenAPIV3Schema any `json:"openAPIV3Schema"` } `json:"schema"` } type expectedXRD struct { Spec struct { Names struct { Kind string `json:"kind"` } `json:"names"` Versions []*expectedXRDVersion `json:"versions"` } `json:"spec"` } type openapiSchema struct { Info struct { Title string `json:"title"` Description string `json:"description"` } `json:"info"` Components struct { Schemas map[string]any `json:"schemas"` } `json:"components"` } // ExtractSchema extracts an openAPI schema from a CRD/ XRD-like object and // returns the equivalent cue types. func ExtractSchema(reader io.Reader, pkg string) ([]byte, error) { b, err := io.ReadAll(reader) if err != nil { return nil, errors.Wrap(err, "read bytes") } var xrd expectedXRD err = yaml.Unmarshal(b, &xrd) if err != nil { return nil, errors.Wrap(err, "unmarshal crd/xrd") } var pkgDecl string if pkg != "" { pkgDecl = fmt.Sprintf("package %s\n\n", pkg) } out := bytes.NewBufferString(pkgDecl) for _, version := range xrd.Spec.Versions { var cueSchema openapiSchema cueSchema.Info.Title = "generated cue schema" v := version.Name if len(v) > 0 { v = strings.ToUpper(v[:1]) + v[1:] } cueSchema.Components.Schemas = map[string]any{ fmt.Sprintf("%s%s", xrd.Spec.Names.Kind, v): version.Schema.OpenAPIV3Schema, } jsonBytes, err := json.MarshalIndent(cueSchema, "", " ") if err != nil { return nil, errors.Wrap(err, "marshal schema") } runtime := cuecontext.New() val := runtime.CompileBytes(jsonBytes) if val.Err() != nil { return nil, errors.Wrap(val.Err(), "compile generated schema object") } astFile, err := openapi.Extract(val, &openapi.Config{SelfContained: true}) if err != nil { return nil, errors.Wrap(err, "extract openAPI schema") } b, err = format.Node(astFile, format.Simplify()) if err != nil { return nil, errors.Wrap(val.Err(), "format source") } _, err = out.Write(b) if err != nil { return nil, errors.Wrap(err, "write bytes") } _, _ = out.Write([]byte("\n")) } return out.Bytes(), nil }
// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package cuetools import ( "bytes" "encoding/json" "fmt" "cuelang.org/go/cue/format" "cuelang.org/go/encoding/openapi" "github.com/pkg/errors" ) // GenerateOpenAPISchema generates an openapi schema using the contents of the supplied directory // and returns it prefixing it with a package declaration for the supplied package. func GenerateOpenAPISchema(dir, pkg string) ([]byte, error) { iv, err := loadSingleInstanceValue(dir, nil) if err != nil { return nil, err } b, err := openapi.Gen(iv.value, &openapi.Config{ Info: map[string]any{ "title": "XRD schemas", "description": fmt.Sprintf("Generated by %s, DO NOT EDIT", generator), "version": "0.1.0", }, SelfContained: true, ExpandReferences: true, }) if err != nil { return nil, errors.Wrap(err, "generate openAPI") } var out bytes.Buffer if pkg != "" { _, _ = out.Write([]byte(fmt.Sprintf("package %s\n\n", pkg))) } err = json.Indent(&out, b, "", " ") if err != nil { return nil, errors.Wrap(err, "write output") } ret, err := format.Source(out.Bytes(), format.Simplify()) if err != nil { return nil, errors.Wrap(err, "cue format") } return ret, nil }
// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package cuetools import ( "fmt" "cuelang.org/go/cue" "cuelang.org/go/cue/build" "cuelang.org/go/cue/cuecontext" "cuelang.org/go/cue/load" "github.com/pkg/errors" ) type instanceValue struct { instance *build.Instance value cue.Value } // loadSingleInstanceValue loads the package at the specific directory and returns the associated instance and value. func loadSingleInstanceValue(dir string, cfg *load.Config) (*instanceValue, error) { configs := load.Instances([]string{dir}, cfg) if len(configs) != 1 { return nil, fmt.Errorf("expected exactly one instance, got %d", len(configs)) } config := configs[0] if config.Err != nil { return nil, errors.Wrap(config.Err, "load instance") } runtime := cuecontext.New() val := runtime.BuildInstance(config) if val.Err() != nil { return nil, errors.Wrap(val.Err(), "build instance") } return &instanceValue{instance: config, value: val}, nil }
// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package cuetools import ( "bytes" "encoding/json" "fmt" "cuelang.org/go/cmd/cue/cmd" "github.com/pkg/errors" ) const generator = "xp-function-cue" // runDefCommand runs the equivalent of `cue def` using the cue command library instead of forking a cue process. // The semantics of doing this are iffy at best - it is generally unsafe to do this more than once in a single process // because of the state of cobra flags that is not cleared correctly across invocations. func runDefCommand(dir string) (_ []byte, finalErr error) { command, err := cmd.New([]string{"def", "--inline-imports", dir}) if err != nil { return nil, errors.Wrap(err, "create def command") } var buf bytes.Buffer command.SetOut(&buf) defer func() { if r := recover(); r != nil { finalErr = fmt.Errorf("cue def: %s", buf.String()) } }() if err := command.Execute(); err != nil { return nil, errors.Wrap(err, "execute def command") } return buf.Bytes(), nil } type OutputFormat string const ( FormatRaw OutputFormat = "raw" FormatCue OutputFormat = "cue" ) type PackageScriptOpts struct { Format OutputFormat // output format OutputPackage string // package to declare for cue output VarName string // variable name to use for cue output, default _script } // PackageScript generates self-contained definitions from the supplied directory and returns cue code for an object // with a _script property that contains the code as a string. The returned object has a package declaration // for the package supplied. func PackageScript(dir string, opts PackageScriptOpts) (_ []byte, finalErr error) { defs, err := runDefCommand(dir) if err != nil { return nil, err } if opts.Format == FormatRaw { return defs, nil } header := "" if opts.OutputPackage != "" { header = "package " + opts.OutputPackage } jsonString, _ := json.Marshal(string(defs)) varName := opts.VarName if varName == "" { varName = "_script" } outputCode := fmt.Sprintf(`%s // generated by %s, DO NOT EDIT %s: %s `, header, generator, varName, jsonString) return []byte(outputCode), nil }
// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package cuetools import ( "encoding/json" "fmt" "io" "os" "path/filepath" "sort" "strings" "cuelang.org/go/cue" "cuelang.org/go/cue/load" "cuelang.org/go/cue/parser" fnv1beta1 "github.com/crossplane/function-sdk-go/proto/v1beta1" "github.com/elastic/crossplane-function-cue/internal/fn" "github.com/ghodss/yaml" "github.com/pkg/errors" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) var TestOutput io.Writer = os.Stderr type TestConfig struct { Package string TestPackage string TestTags []string RequestVar string ResponseVar string LegacyDesiredOnlyResponse bool Debug bool } type Tester struct { config *TestConfig } // NewTester returns a test for the supplied configuration. It auto-discovers tags from test file names if needed. func NewTester(config TestConfig) (*Tester, error) { ret := &Tester{config: &config} if err := ret.init(); err != nil { return nil, err } return ret, nil } func (t *Tester) init() error { if t.config.Package == "" { return fmt.Errorf("package was not specified") } if t.config.TestPackage == "" { t.config.TestPackage = fmt.Sprintf("%s/%s", strings.TrimSuffix(t.config.Package, "/"), "tests") } // discover test tags from filenames if len(t.config.TestTags) == 0 { err := t.discoverTags() if err != nil { return errors.Wrap(err, "discover tags") } sort.Strings(t.config.TestTags) } if len(t.config.TestTags) == 0 { return fmt.Errorf("no test tags found even after auto-discovery") } _, _ = fmt.Fprintf(TestOutput, "running test tags: %s\n", strings.Join(t.config.TestTags, ", ")) return nil } func (t *Tester) discoverTags() error { pattern := fmt.Sprintf("%s/*.cue", strings.TrimSuffix(t.config.TestPackage, "/")) matches, err := filepath.Glob(pattern) if err != nil { return errors.Wrapf(err, "glob %s", pattern) } for _, name := range matches { base := filepath.Base(name) pos := strings.Index(base, ".") tag := base if pos > 0 { tag = base[:pos] } t.config.TestTags = append(t.config.TestTags, tag) } return nil } func evalPackage(pkg string, tag string, expr string, into proto.Message) (finalErr error) { iv, err := loadSingleInstanceValue(pkg, &load.Config{Tags: []string{tag}}) if err != nil { return err } val := iv.value if expr != "" { e, err := parser.ParseExpr("expression", expr) if err != nil { return errors.Wrap(err, "parse expression") } val = iv.value.Context().BuildExpr(e, cue.Scope(iv.value), cue.ImportPath(iv.instance.ID()), cue.InferBuiltins(true), ) if val.Err() != nil { return errors.Wrap(val.Err(), "build expression") } } b, err := val.MarshalJSON() if err != nil { return errors.Wrap(err, "marshal json") } err = protojson.Unmarshal(b, into) if err != nil { return errors.Wrap(err, "proto unmarshal") } return nil } // Run runs all tests and returns a consolidated error. func (t *Tester) Run() error { var errs []error function, err := fn.New(fn.Options{Debug: t.config.Debug}) if err != nil { return errors.Wrap(err, "create function executor") } codeBytes, err := runDefCommand(t.config.Package) if err != nil { return errors.Wrap(err, "create package script") } for _, tag := range t.config.TestTags { err := t.runTest(function, codeBytes, tag) if err != nil { errs = append(errs, errors.Wrapf(err, "test %s", tag)) } } if len(errs) > 0 { return fmt.Errorf("%d of %d tests had errors", len(errs), len(t.config.TestTags)) } return nil } func canonicalYAML(in proto.Message) (string, error) { b, err := protojson.Marshal(in) if err != nil { return "", err } var ret any err = json.Unmarshal(b, &ret) if err != nil { return "", err } b, err = yaml.Marshal(ret) if err != nil { return "", err } return string(b), nil } func (t *Tester) runTest(f *fn.Cue, codeBytes []byte, tag string) (finalErr error) { _, _ = fmt.Fprintf(TestOutput, "> run test %q\n", tag) defer func() { if finalErr != nil { _, _ = fmt.Fprintf(TestOutput, "FAIL %s: %s\n", tag, finalErr) } else { _, _ = fmt.Fprintf(TestOutput, "PASS %s\n", tag) } }() requestVar := "request" if t.config.RequestVar != "" { requestVar = t.config.RequestVar } var responseVar string switch t.config.ResponseVar { case ".": responseVar = "" case "": responseVar = "response" default: responseVar = t.config.ResponseVar } var expected fnv1beta1.RunFunctionResponse err := evalPackage(t.config.TestPackage, tag, responseVar, &expected) if err != nil { return errors.Wrap(err, "evaluate expected") } var req fnv1beta1.RunFunctionRequest err = evalPackage(t.config.TestPackage, tag, requestVar, &req) if err != nil { return errors.Wrap(err, "evaluate request") } actual, err := f.Eval(&req, string(codeBytes), fn.EvalOptions{ RequestVar: requestVar, ResponseVar: responseVar, DesiredOnlyResponse: t.config.LegacyDesiredOnlyResponse, Debug: fn.DebugOptions{Enabled: t.config.Debug}, }) if err != nil { return errors.Wrap(err, "evaluate package with test request") } expectedString, err := canonicalYAML(&expected) if err != nil { return errors.Wrap(err, "serialize expected") } actualString, err := canonicalYAML(actual) if err != nil { return errors.Wrap(err, "serialize actual") } if expectedString == actualString { return nil } err = printDiffs(expectedString, actualString) if err != nil { _, _ = fmt.Fprintln(TestOutput, "error in running diff:", err) } return fmt.Errorf("expected did not match actual") }
// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package fn import ( "encoding/json" "fmt" "cuelang.org/go/cue/format" ) const connectionDetailsKey = "connectionDetails" // start debugging routines // noise that people typically wouldn't want to see when looking at inputs. var systemAttributes = []struct{ parent, name string }{ {"annotations", "kubectl.kubernetes.io/last-applied-configuration"}, {"metadata", "managedFields"}, {"metadata", "creationTimestamp"}, {"metadata", "generation"}, {"metadata", "resourceVersion"}, {"metadata", "uid"}, } var systemAttrsLookup map[string]map[string]bool // init reformulates system attributes as a map for easier lookup. func init() { systemAttrsLookup = map[string]map[string]bool{} for _, attr := range systemAttributes { attrMap, ok := systemAttrsLookup[attr.parent] if !ok { attrMap = map[string]bool{} systemAttrsLookup[attr.parent] = attrMap } attrMap[attr.name] = true } } // walkDelete performs a recursive walk on the supplied object to remove system generated // attributes. func walkDelete(input any, parent string) { switch input := input.(type) { case []any: for _, v := range input { walkDelete(v, parent) } case map[string]any: if parent == connectionDetailsKey { for k := range input { input[k] = []byte("<redacted>") } } attrMap := systemAttrsLookup[parent] for k, v := range input { if attrMap != nil && attrMap[k] { delete(input, k) continue } walkDelete(v, k) } } } func (f *Cue) reserialize(jsonBytes []byte, raw bool) ([]byte, error) { var input any err := json.Unmarshal(jsonBytes, &input) if err != nil { f.log.Info(fmt.Sprintf("JSON unmarshal error: %v", err)) return jsonBytes, err } if !raw { walkDelete(input, "") } b, err := json.MarshalIndent(input, "", " ") if err != nil { f.log.Info(fmt.Sprintf("JSON marshal error: %v", err)) return jsonBytes, err } return b, nil } // getDebugString modifies the supplied JSON bytes to remove k8s and crossplane generated metadata // and returns its serialized form as a formatted cue object for a better user experience. // In case of any errors, it returns the input bytes as a string. func (f *Cue) getDebugString(jsonBytes []byte, raw bool) string { var err error jsonBytes, err = f.reserialize(jsonBytes, raw) if err != nil { return string(jsonBytes) } out, err := format.Source(jsonBytes, format.Simplify(), format.TabIndent(false), format.UseSpaces(2)) if err != nil { f.log.Info(fmt.Sprintf("cue formatting error: %v", err)) return string(jsonBytes) } return string(out) }
// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package fn import ( "context" "fmt" "log" "time" "cuelang.org/go/cue" "cuelang.org/go/cue/cuecontext" "cuelang.org/go/cue/parser" "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/function-sdk-go" fnv1beta1 "github.com/crossplane/function-sdk-go/proto/v1beta1" "github.com/crossplane/function-sdk-go/request" "github.com/crossplane/function-sdk-go/response" input "github.com/elastic/crossplane-function-cue/pkg/input/v1beta1" "github.com/pkg/errors" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/structpb" ) const debugAnnotation = "crossplane-function-cue/debug" // Options are options for the cue runner. type Options struct { Logger logging.Logger Debug bool } // Cue runs cue scripts that adhere to a specific interface. type Cue struct { fnv1beta1.UnimplementedFunctionRunnerServiceServer log logging.Logger debug bool } // New creates a cue runner. func New(opts Options) (*Cue, error) { if opts.Logger == nil { var err error opts.Logger, err = function.NewLogger(opts.Debug) if err != nil { return nil, err } } return &Cue{ log: opts.Logger, debug: opts.Debug, }, nil } // DebugOptions are per-eval debug options. type DebugOptions struct { Enabled bool // enable input/ output debugging Raw bool // do not remove any "noise" attributes in the input object Script bool // render the final script as a debug output } type EvalOptions struct { RequestVar string ResponseVar string DesiredOnlyResponse bool Debug DebugOptions } // Eval evaluates the supplied script with an additional script that includes the supplied request and returns the // response. func (f *Cue) Eval(in *fnv1beta1.RunFunctionRequest, script string, opts EvalOptions) (*fnv1beta1.RunFunctionResponse, error) { // input request only contains properties as documented in the interface, not the whole object req := &fnv1beta1.RunFunctionRequest{ Observed: in.GetObserved(), Desired: in.GetDesired(), Context: in.GetContext(), } // extract request as object reqBytes, err := protojson.MarshalOptions{Indent: " "}.Marshal(req) if err != nil { return nil, errors.Wrap(err, "proto json marshal") } preamble := fmt.Sprintf("%s: ", opts.RequestVar) if opts.Debug.Enabled { log.Printf("[request:begin]\n%s %s\n[request:end]\n", preamble, f.getDebugString(reqBytes, opts.Debug.Raw)) } finalScript := fmt.Sprintf("%s\n%s %s\n", script, preamble, reqBytes) if opts.Debug.Script { log.Printf("[script:begin]\n%s\n[script:end]\n", finalScript) } runtime := cuecontext.New() val := runtime.CompileBytes([]byte(finalScript)) if val.Err() != nil { return nil, errors.Wrap(val.Err(), "compile cue code") } if opts.ResponseVar != "" { e, err := parser.ParseExpr("expression", opts.ResponseVar) if err != nil { return nil, errors.Wrap(err, "parse response expression") } val = val.Context().BuildExpr(e, cue.Scope(val), cue.InferBuiltins(true), ) if val.Err() != nil { return nil, errors.Wrap(val.Err(), "build response expression") } } resBytes, err := val.MarshalJSON() // this can fail if value is not concrete if err != nil { return nil, errors.Wrap(err, "marshal cue output") } if opts.Debug.Enabled { log.Printf("[response:begin]\n%s\n[response:end]\n", f.getDebugString(resBytes, opts.Debug.Raw)) } var ret fnv1beta1.RunFunctionResponse if opts.DesiredOnlyResponse { var state fnv1beta1.State err = protojson.Unmarshal(resBytes, &state) if err == nil { ret.Desired = &state } } else { err = protojson.Unmarshal(resBytes, &ret) } if err != nil { return nil, errors.Wrap(err, "unmarshal cue output using proto json") } return &ret, nil } // RunFunction runs the function. It expects a single script that is complete, except for a request // variable that the function runner supplies. func (f *Cue) RunFunction(_ context.Context, req *fnv1beta1.RunFunctionRequest) (outRes *fnv1beta1.RunFunctionResponse, finalErr error) { // setup response with desired state set up upstream functions res := response.To(req, response.DefaultTTL) logger := f.log // automatically handle errors and response logging defer func() { if finalErr == nil { logger.Info("cue module executed successfully") response.Normal(outRes, "cue module executed successfully") return } logger.Info(finalErr.Error()) response.Fatal(res, finalErr) outRes = res }() // setup logging and debugging oxr, err := request.GetObservedCompositeResource(req) if err != nil { return nil, errors.Wrap(err, "get observed composite") } tag := req.GetMeta().GetTag() if tag != "" { logger = f.log.WithValues("tag", tag) } logger = logger.WithValues( "xr-version", oxr.Resource.GetAPIVersion(), "xr-kind", oxr.Resource.GetKind(), "xr-name", oxr.Resource.GetName(), ) logger.Info("Running Function") debugThis := false annotations := oxr.Resource.GetAnnotations() if annotations != nil && annotations[debugAnnotation] == "true" { debugThis = true } // get inputs in := &input.CueFunctionParams{} if err := request.GetInput(req, in); err != nil { return nil, errors.Wrap(err, "unable to get input") } if in.Spec.Script == "" { return nil, fmt.Errorf("input script was not specified") } if in.Spec.DebugNew { if len(req.GetObserved().GetResources()) == 0 { debugThis = true } } if in.Spec.TTL != "" { d, err := time.ParseDuration(in.Spec.TTL) if err != nil { logger.Info(fmt.Sprintf("invalid TTL: %s, %v", in.Spec.TTL, err)) } else { res.GetMeta().Ttl = durationpb.New(d) } } // set up the request and response variables requestVar := "#request" if in.Spec.RequestVar != "" { requestVar = in.Spec.RequestVar } var responseVar string switch in.Spec.ResponseVar { case ".": responseVar = "" case "": responseVar = "response" default: responseVar = in.Spec.ResponseVar } state, err := f.Eval(req, in.Spec.Script, EvalOptions{ RequestVar: requestVar, ResponseVar: responseVar, DesiredOnlyResponse: in.Spec.LegacyDesiredOnlyResponse, Debug: DebugOptions{ Enabled: f.debug || in.Spec.Debug || debugThis, Raw: in.Spec.DebugRaw, Script: in.Spec.DebugScript, }, }) if err != nil { return res, errors.Wrap(err, "eval script") } return f.mergeResponse(res, state) } func (f *Cue) mergeResponse(res *fnv1beta1.RunFunctionResponse, cueResponse *fnv1beta1.RunFunctionResponse) (*fnv1beta1.RunFunctionResponse, error) { // selectively add returned resources without deleting any previous desired state if res.Desired == nil { res.Desired = &fnv1beta1.State{} } if res.Desired.Resources == nil { res.Desired.Resources = map[string]*fnv1beta1.Resource{} } // only set desired composite if the cue script actually returns it // TODO: maybe use fieldpath.Pave to only extract status if cueResponse.Desired.GetComposite() != nil { res.Desired.Composite = cueResponse.Desired.GetComposite() } // set desired resources from cue output for k, v := range cueResponse.Desired.GetResources() { res.Desired.Resources[k] = v } // merge the context if cueResponse has something in it if cueResponse.Context != nil { ctxMap := map[string]interface{}{} // set up base map, if found if res.Context != nil { ctxMap = res.Context.AsMap() } // merge values from cueResponse for k, v := range cueResponse.Context.AsMap() { ctxMap[k] = v } s, err := structpb.NewStruct(ctxMap) if err != nil { return nil, errors.Wrap(err, "set response context") } res.Context = s } // TODO: allow the cue layer to set warnings in cueResponse? return res, nil }