// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cuetools
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"github.com/pkg/errors"
godiff "github.com/pmezard/go-difflib/difflib"
)
const (
// ExternalDiffEnvVar is the name of the environment variable that printDiffs
// consults. When it is set to a non-empty value, that value is treated as an
// external diff command (plus optional arguments) to run instead of the
// built-in unified diff.
ExternalDiffEnvVar = "XP_FUNCTION_CUE_DIFF"
)
// printNativeDiffs computes a unified diff (3 lines of context) between the
// expected and actual strings using the in-process diff library and writes it
// to TestOutput, prefixed with "diffs found:". It returns an error only when
// the diff itself cannot be computed.
func printNativeDiffs(expectedString, actualString string) error {
	diffText, err := godiff.GetUnifiedDiffString(godiff.UnifiedDiff{
		A:        godiff.SplitLines(expectedString),
		B:        godiff.SplitLines(actualString),
		FromFile: "expected",
		ToFile:   "actual",
		Context:  3,
	})
	if err != nil {
		return errors.Wrap(err, "diff expected against actual")
	}
	// Writing the report is best-effort; a failed write is deliberately ignored.
	_, _ = fmt.Fprintf(TestOutput, "diffs found:\n%s\n", strings.TrimSpace(diffText))
	return nil
}
// printDiffs writes a diff of the expected and actual strings to TestOutput.
//
// When the environment variable named by ExternalDiffEnvVar is unset, empty or
// contains only whitespace, the built-in unified diff is used. Otherwise the
// value is split on whitespace: the first field is the command to run and the
// remaining fields are passed as arguments, provided they consist solely of
// the characters [a-zA-Z0-9-=] (anything else is dropped). The two inputs are
// written to expected.yaml and actual.yaml in a temporary directory that is
// removed on return, and the file paths are appended as the final arguments.
//
// An exit status of 1 from the external command is not treated as a failure:
// that is how diff(1)-style tools report "differences found", which is the
// only situation in which this function is called. Any other failure to run
// the command is returned.
func printDiffs(expectedString, actualString string) error {
	// strings.Fields tolerates leading, trailing and repeated whitespace, which
	// would otherwise produce an empty command name.
	fields := strings.Fields(os.Getenv(ExternalDiffEnvVar))
	if len(fields) == 0 {
		return printNativeDiffs(expectedString, actualString)
	}
	// use the logic of external kubectl diffs to run the command
	cmd, args := fields[0], fields[1:]
	isValidArg := regexp.MustCompile(`^[a-zA-Z0-9-=]+$`).MatchString
	realArgs := make([]string, 0, len(args)+2)
	for _, arg := range args {
		if isValidArg(arg) {
			realArgs = append(realArgs, arg)
		}
	}

	dir, err := os.MkdirTemp("", "diff*")
	if err != nil {
		return err
	}
	defer func() {
		// best-effort cleanup of the scratch directory
		_ = os.RemoveAll(dir)
	}()

	eFile := filepath.Join(dir, "expected.yaml")
	if err := os.WriteFile(eFile, []byte(expectedString), 0o644); err != nil {
		return err
	}
	aFile := filepath.Join(dir, "actual.yaml")
	if err := os.WriteFile(aFile, []byte(actualString), 0o644); err != nil {
		return err
	}

	realArgs = append(realArgs, eFile, aFile)
	c := exec.Command(cmd, realArgs...)
	c.Stdout = TestOutput
	c.Stderr = TestOutput
	err = c.Run()
	// Exit status 1 means "inputs differ" for diff-like tools, not a failure.
	if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == 1 {
		return nil
	}
	return err
}
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cuetools
import (
"bytes"
"encoding/json"
"fmt"
"io"
"strings"
"cuelang.org/go/cue/cuecontext"
"cuelang.org/go/cue/format"
"cuelang.org/go/encoding/openapi"
"github.com/ghodss/yaml"
"github.com/pkg/errors"
)
// expectedXRDVersion is the subset of one entry of spec.versions in a
// CRD/XRD-like object that ExtractSchema reads.
type expectedXRDVersion struct {
// Name is the version name; ExtractSchema appends it (first byte upper-cased)
// to the kind to form the generated schema name.
Name string `json:"name"`
Schema struct {
// OpenAPIV3Schema is the untyped openAPI schema declared for this version.
OpenAPIV3Schema any `json:"openAPIV3Schema"`
} `json:"schema"`
}
// expectedXRD is the subset of a CRD/XRD-like object that ExtractSchema
// unmarshals from its input: the kind name and the list of versions.
type expectedXRD struct {
Spec struct {
Names struct {
// Kind is used as the prefix of every generated schema name.
Kind string `json:"kind"`
} `json:"names"`
// Versions holds one entry per declared version; ExtractSchema emits one
// schema per entry.
Versions []*expectedXRDVersion `json:"versions"`
} `json:"spec"`
}
// openapiSchema is the minimal openAPI document that ExtractSchema marshals to
// JSON and hands to the cue openapi extractor.
type openapiSchema struct {
Info struct {
// Title is set to a fixed string by ExtractSchema.
Title string `json:"title"`
// Description is not set by the code visible in this file.
Description string `json:"description"`
} `json:"info"`
Components struct {
// Schemas maps a generated schema name to its untyped openAPI schema.
Schemas map[string]any `json:"schemas"`
} `json:"components"`
}
// ExtractSchema extracts an openAPI schema from a CRD/ XRD-like object and
// returns the equivalent cue types.
//
// The reader must supply a YAML (or JSON) document with spec.names.kind and
// spec.versions[].{name,schema.openAPIV3Schema}. For every version a cue
// definition named <Kind><Version> (first byte of the version upper-cased) is
// generated and appended to the output, each followed by a newline. When pkg
// is non-empty, the output starts with a "package <pkg>" declaration.
//
// An error is returned when the input cannot be read or unmarshaled, when a
// version entry is null, or when any step of converting a version's schema to
// cue source fails.
func ExtractSchema(reader io.Reader, pkg string) ([]byte, error) {
	b, err := io.ReadAll(reader)
	if err != nil {
		return nil, errors.Wrap(err, "read bytes")
	}
	var xrd expectedXRD
	if err := yaml.Unmarshal(b, &xrd); err != nil {
		return nil, errors.Wrap(err, "unmarshal crd/xrd")
	}

	var pkgDecl string
	if pkg != "" {
		pkgDecl = fmt.Sprintf("package %s\n\n", pkg)
	}
	out := bytes.NewBufferString(pkgDecl)

	for i, version := range xrd.Spec.Versions {
		// A literal null in the versions list unmarshals to a nil pointer;
		// report it instead of panicking on the dereference below.
		if version == nil {
			return nil, fmt.Errorf("version at index %d is null", i)
		}

		var cueSchema openapiSchema
		cueSchema.Info.Title = "generated cue schema"
		v := version.Name
		if len(v) > 0 {
			v = strings.ToUpper(v[:1]) + v[1:]
		}
		cueSchema.Components.Schemas = map[string]any{
			fmt.Sprintf("%s%s", xrd.Spec.Names.Kind, v): version.Schema.OpenAPIV3Schema,
		}
		jsonBytes, err := json.MarshalIndent(cueSchema, "", " ")
		if err != nil {
			return nil, errors.Wrap(err, "marshal schema")
		}

		runtime := cuecontext.New()
		val := runtime.CompileBytes(jsonBytes)
		if val.Err() != nil {
			return nil, errors.Wrap(val.Err(), "compile generated schema object")
		}
		astFile, err := openapi.Extract(val, &openapi.Config{SelfContained: true})
		if err != nil {
			return nil, errors.Wrap(err, "extract openAPI schema")
		}
		formatted, err := format.Node(astFile, format.Simplify())
		if err != nil {
			// Wrap the formatting error itself. Wrapping val.Err() (nil at this
			// point) would yield a nil error and silently drop the failure.
			return nil, errors.Wrap(err, "format source")
		}
		if _, err := out.Write(formatted); err != nil {
			return nil, errors.Wrap(err, "write bytes")
		}
		_, _ = out.Write([]byte("\n"))
	}
	return out.Bytes(), nil
}
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cuetools
import (
"bytes"
"encoding/json"
"fmt"
"cuelang.org/go/cue/format"
"cuelang.org/go/encoding/openapi"
"github.com/pkg/errors"
)
// GenerateOpenAPISchema loads the single cue instance found in dir, generates
// a self-contained openAPI document (references expanded) from it and returns
// that document as formatted cue source. When pkg is non-empty the result is
// prefixed with a "package <pkg>" declaration.
func GenerateOpenAPISchema(dir, pkg string) ([]byte, error) {
	iv, err := loadSingleInstanceValue(dir, nil)
	if err != nil {
		return nil, err
	}

	genConfig := &openapi.Config{
		Info: map[string]any{
			"title":       "XRD schemas",
			"description": fmt.Sprintf("Generated by %s, DO NOT EDIT", generator),
			"version":     "0.1.0",
		},
		SelfContained:    true,
		ExpandReferences: true,
	}
	schemaJSON, err := openapi.Gen(iv.value, genConfig)
	if err != nil {
		return nil, errors.Wrap(err, "generate openAPI")
	}

	// Assemble the optional package clause followed by the indented JSON, then
	// let the cue formatter turn the whole thing into simplified cue source.
	var out bytes.Buffer
	if pkg != "" {
		_, _ = fmt.Fprintf(&out, "package %s\n\n", pkg)
	}
	if err := json.Indent(&out, schemaJSON, "", " "); err != nil {
		return nil, errors.Wrap(err, "write output")
	}

	formatted, err := format.Source(out.Bytes(), format.Simplify())
	if err != nil {
		return nil, errors.Wrap(err, "cue format")
	}
	return formatted, nil
}
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cuetools
import (
"fmt"
"cuelang.org/go/cue"
"cuelang.org/go/cue/build"
"cuelang.org/go/cue/cuecontext"
"cuelang.org/go/cue/load"
"github.com/pkg/errors"
)
// instanceValue pairs a loaded cue build instance with the value built from it,
// as returned by loadSingleInstanceValue.
type instanceValue struct {
// instance is the build instance produced by the cue loader.
instance *build.Instance
// value is the cue value built from instance.
value cue.Value
}
// loadSingleInstanceValue loads the cue package located at dir using the
// optional loader configuration and builds it into a value. It fails when the
// loader does not yield exactly one instance, when that instance carries a
// load error, or when building the instance produces an error.
func loadSingleInstanceValue(dir string, cfg *load.Config) (*instanceValue, error) {
	instances := load.Instances([]string{dir}, cfg)
	if n := len(instances); n != 1 {
		return nil, fmt.Errorf("expected exactly one instance, got %d", n)
	}

	inst := instances[0]
	if loadErr := inst.Err; loadErr != nil {
		return nil, errors.Wrap(loadErr, "load instance")
	}

	built := cuecontext.New().BuildInstance(inst)
	if buildErr := built.Err(); buildErr != nil {
		return nil, errors.Wrap(buildErr, "build instance")
	}
	return &instanceValue{instance: inst, value: built}, nil
}
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cuetools
import (
"bytes"
"encoding/json"
"fmt"
"cuelang.org/go/cmd/cue/cmd"
"github.com/pkg/errors"
)
// generator is the tool name embedded in the "generated by ..." markers of
// produced cue code and openAPI descriptions.
const generator = "xp-function-cue"
// runDefCommand runs the equivalent of `cue def --inline-imports <dir>` through
// the cue command library rather than by forking a cue process, and returns
// whatever the command wrote to its output.
//
// The semantics of doing this are iffy at best - it is generally unsafe to do
// this more than once in a single process because of the state of cobra flags
// that is not cleared correctly across invocations.
//
// A panic raised while the command executes is recovered and converted into an
// error whose message carries the output captured so far.
func runDefCommand(dir string) (_ []byte, finalErr error) {
	defCmd, err := cmd.New([]string{"def", "--inline-imports", dir})
	if err != nil {
		return nil, errors.Wrap(err, "create def command")
	}

	var captured bytes.Buffer
	defCmd.SetOut(&captured)

	defer func() {
		if recover() == nil {
			return
		}
		finalErr = fmt.Errorf("cue def: %s", captured.String())
	}()

	execErr := defCmd.Execute()
	if execErr != nil {
		return nil, errors.Wrap(execErr, "execute def command")
	}
	return captured.Bytes(), nil
}
// OutputFormat selects the form of the output produced by PackageScript.
type OutputFormat string
const (
// FormatRaw makes PackageScript return the generated definitions unchanged.
FormatRaw OutputFormat = "raw"
// FormatCue makes PackageScript wrap the definitions as a string field in cue
// code. PackageScript treats every value other than FormatRaw this way.
FormatCue OutputFormat = "cue"
)
// PackageScriptOpts configures the output of PackageScript. OutputPackage and
// VarName are only consulted when Format is not FormatRaw.
type PackageScriptOpts struct {
Format OutputFormat // output format
OutputPackage string // package to declare for cue output
VarName string // variable name to use for cue output, default _script
}
// PackageScript generates self-contained definitions from the supplied
// directory. With FormatRaw the definitions are returned as-is. Otherwise the
// result is cue code that assigns the definitions, encoded as a JSON string, to
// a field named opts.VarName (default _script), preceded by a package
// declaration when opts.OutputPackage is set.
func PackageScript(dir string, opts PackageScriptOpts) (_ []byte, finalErr error) {
	defs, err := runDefCommand(dir)
	if err != nil {
		return nil, err
	}
	if opts.Format == FormatRaw {
		return defs, nil
	}

	var header string
	if pkg := opts.OutputPackage; pkg != "" {
		header = "package " + pkg
	}

	varName := "_script"
	if opts.VarName != "" {
		varName = opts.VarName
	}

	// The marshal error is ignored: the input is a plain Go string.
	quoted, _ := json.Marshal(string(defs))

	outputCode := fmt.Sprintf(`%s
// generated by %s, DO NOT EDIT
%s: %s
`, header, generator, varName, quoted)
	return []byte(outputCode), nil
}
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cuetools
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"cuelang.org/go/cue"
"cuelang.org/go/cue/load"
"cuelang.org/go/cue/parser"
fnv1beta1 "github.com/crossplane/function-sdk-go/proto/v1beta1"
"github.com/elastic/crossplane-function-cue/internal/fn"
"github.com/ghodss/yaml"
"github.com/pkg/errors"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
// TestOutput is the writer that receives test progress, PASS/FAIL lines and
// diff output. It defaults to os.Stderr.
var TestOutput io.Writer = os.Stderr
// TestConfig is the configuration for a Tester.
type TestConfig struct {
// Package is the package under test; it is required.
Package string
// TestPackage is the package holding the tests; defaults to <Package>/tests.
TestPackage string
// TestTags are the tags to run, one test per tag. When empty, tags are
// discovered from the names of the *.cue files in TestPackage.
TestTags []string
// RequestVar is the expression that yields the request; defaults to "request".
RequestVar string
// ResponseVar is the expression that yields the response; defaults to
// "response". The value "." selects the whole evaluated value.
ResponseVar string
// LegacyDesiredOnlyResponse is passed to the evaluator as its
// DesiredOnlyResponse option.
LegacyDesiredOnlyResponse bool
// Debug enables debug output in the function executor and evaluation.
Debug bool
}
// Tester runs the tests described by a TestConfig. Create one with NewTester.
type Tester struct {
// config is the tester's own copy of the configuration, with defaults applied
// by init.
config *TestConfig
}
// NewTester builds a Tester from the supplied configuration, applying defaults
// and auto-discovering test tags from test file names when none were given.
// It returns an error when the configuration cannot be initialized.
func NewTester(config TestConfig) (*Tester, error) {
	tester := &Tester{config: &config}
	err := tester.init()
	if err != nil {
		return nil, err
	}
	return tester, nil
}
// init validates the configuration and fills in defaults: the test package
// defaults to <package>/tests, and test tags are discovered (and sorted) when
// none were supplied. It fails when no package is configured, when discovery
// fails, or when there are still no tags afterwards. The tags that will run are
// announced on TestOutput.
func (t *Tester) init() error {
	cfg := t.config
	if cfg.Package == "" {
		return fmt.Errorf("package was not specified")
	}
	if cfg.TestPackage == "" {
		cfg.TestPackage = fmt.Sprintf("%s/%s", strings.TrimSuffix(cfg.Package, "/"), "tests")
	}

	// discover test tags from filenames
	if len(cfg.TestTags) == 0 {
		if err := t.discoverTags(); err != nil {
			return errors.Wrap(err, "discover tags")
		}
		sort.Strings(cfg.TestTags)
	}
	if len(cfg.TestTags) == 0 {
		return fmt.Errorf("no test tags found even after auto-discovery")
	}

	tagList := strings.Join(cfg.TestTags, ", ")
	_, _ = fmt.Fprintf(TestOutput, "running test tags: %s\n", tagList)
	return nil
}
// discoverTags appends one test tag per distinct *.cue file prefix found
// directly in the test package directory. The tag is the part of the file's
// base name before its first dot (or the whole base name when that part would
// be empty). Several files that share a prefix, such as "a.cue" and
// "a.extra.cue", contribute a single tag so that the same test is not run more
// than once. It returns an error only when the glob pattern is malformed.
func (t *Tester) discoverTags() error {
	pattern := fmt.Sprintf("%s/*.cue", strings.TrimSuffix(t.config.TestPackage, "/"))
	matches, err := filepath.Glob(pattern)
	if err != nil {
		return errors.Wrapf(err, "glob %s", pattern)
	}

	// Track tags already present so that each tag is recorded once.
	seen := make(map[string]struct{}, len(matches)+len(t.config.TestTags))
	for _, tag := range t.config.TestTags {
		seen[tag] = struct{}{}
	}
	for _, name := range matches {
		tag := filepath.Base(name)
		if prefix, _, found := strings.Cut(tag, "."); found && prefix != "" {
			tag = prefix
		}
		if _, dup := seen[tag]; dup {
			continue
		}
		seen[tag] = struct{}{}
		t.config.TestTags = append(t.config.TestTags, tag)
	}
	return nil
}
// evalPackage loads the cue package pkg with the given tag set, optionally
// evaluates the expression expr in the scope of the loaded value, and decodes
// the resulting JSON into the supplied proto message. An empty expr selects the
// whole package value.
func evalPackage(pkg string, tag string, expr string, into proto.Message) (finalErr error) {
	iv, err := loadSingleInstanceValue(pkg, &load.Config{Tags: []string{tag}})
	if err != nil {
		return err
	}

	target := iv.value
	if expr != "" {
		parsed, parseErr := parser.ParseExpr("expression", expr)
		if parseErr != nil {
			return errors.Wrap(parseErr, "parse expression")
		}
		target = iv.value.Context().BuildExpr(
			parsed,
			cue.Scope(iv.value),
			cue.ImportPath(iv.instance.ID()),
			cue.InferBuiltins(true),
		)
		if buildErr := target.Err(); buildErr != nil {
			return errors.Wrap(buildErr, "build expression")
		}
	}

	jsonBytes, err := target.MarshalJSON()
	if err != nil {
		return errors.Wrap(err, "marshal json")
	}
	if err := protojson.Unmarshal(jsonBytes, into); err != nil {
		return errors.Wrap(err, "proto unmarshal")
	}
	return nil
}
// Run executes the test for every configured tag against the definitions
// generated from the package under test. Individual failures are reported on
// TestOutput by runTest; the returned error only summarizes how many of the
// tests failed. Failing to create the executor or to generate the package
// script aborts the run immediately.
func (t *Tester) Run() error {
	function, err := fn.New(fn.Options{Debug: t.config.Debug})
	if err != nil {
		return errors.Wrap(err, "create function executor")
	}
	codeBytes, err := runDefCommand(t.config.Package)
	if err != nil {
		return errors.Wrap(err, "create package script")
	}

	// Only the number of failures feeds into the summary error.
	failures := 0
	for _, tag := range t.config.TestTags {
		if testErr := t.runTest(function, codeBytes, tag); testErr != nil {
			failures++
		}
	}
	if failures == 0 {
		return nil
	}
	return fmt.Errorf("%d of %d tests had errors", failures, len(t.config.TestTags))
}
// canonicalYAML renders a proto message as YAML by going through its proto-JSON
// form and an untyped value, so that two messages can be compared as text.
func canonicalYAML(in proto.Message) (string, error) {
	jsonBytes, err := protojson.Marshal(in)
	if err != nil {
		return "", err
	}

	var generic any
	if err := json.Unmarshal(jsonBytes, &generic); err != nil {
		return "", err
	}

	yamlBytes, err := yaml.Marshal(generic)
	if err != nil {
		return "", err
	}
	return string(yamlBytes), nil
}
// runTest runs the single test identified by tag: it evaluates the expected
// response and the request from the test package, evaluates the package script
// against that request, and compares the canonical YAML of both responses. A
// PASS or FAIL line is always written to TestOutput; on mismatch a diff is
// printed as well and an error is returned.
func (t *Tester) runTest(f *fn.Cue, codeBytes []byte, tag string) (finalErr error) {
	_, _ = fmt.Fprintf(TestOutput, "> run test %q\n", tag)
	defer func() {
		if finalErr == nil {
			_, _ = fmt.Fprintf(TestOutput, "PASS %s\n", tag)
			return
		}
		_, _ = fmt.Fprintf(TestOutput, "FAIL %s: %s\n", tag, finalErr)
	}()

	requestVar := t.config.RequestVar
	if requestVar == "" {
		requestVar = "request"
	}

	// "." selects the whole value (empty expression); empty means the default.
	responseVar := t.config.ResponseVar
	switch responseVar {
	case ".":
		responseVar = ""
	case "":
		responseVar = "response"
	}

	var expected fnv1beta1.RunFunctionResponse
	if err := evalPackage(t.config.TestPackage, tag, responseVar, &expected); err != nil {
		return errors.Wrap(err, "evaluate expected")
	}

	var req fnv1beta1.RunFunctionRequest
	if err := evalPackage(t.config.TestPackage, tag, requestVar, &req); err != nil {
		return errors.Wrap(err, "evaluate request")
	}

	evalOpts := fn.EvalOptions{
		RequestVar:          requestVar,
		ResponseVar:         responseVar,
		DesiredOnlyResponse: t.config.LegacyDesiredOnlyResponse,
		Debug:               fn.DebugOptions{Enabled: t.config.Debug},
	}
	actual, err := f.Eval(&req, string(codeBytes), evalOpts)
	if err != nil {
		return errors.Wrap(err, "evaluate package with test request")
	}

	expectedString, err := canonicalYAML(&expected)
	if err != nil {
		return errors.Wrap(err, "serialize expected")
	}
	actualString, err := canonicalYAML(actual)
	if err != nil {
		return errors.Wrap(err, "serialize actual")
	}
	if expectedString == actualString {
		return nil
	}

	// A failure to render the diff is reported but does not replace the
	// mismatch error.
	if diffErr := printDiffs(expectedString, actualString); diffErr != nil {
		_, _ = fmt.Fprintln(TestOutput, "error in running diff:", diffErr)
	}
	return fmt.Errorf("expected did not match actual")
}
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package fn
import (
"encoding/json"
"fmt"
"cuelang.org/go/cue/format"
)
// connectionDetailsKey is the object key whose map values walkDelete replaces
// with a redaction marker before debug output is rendered.
const connectionDetailsKey = "connectionDetails"
// start debugging routines

// systemAttributes lists (parent key, attribute name) pairs that are noise that
// people typically wouldn't want to see when looking at inputs. An attribute is
// removed by walkDelete when it appears directly under the named parent key.
var systemAttributes = []struct{ parent, name string }{
{"annotations", "kubectl.kubernetes.io/last-applied-configuration"},
{"metadata", "managedFields"},
{"metadata", "creationTimestamp"},
{"metadata", "generation"},
{"metadata", "resourceVersion"},
{"metadata", "uid"},
}
// systemAttrsLookup indexes systemAttributes as parent key -> attribute name ->
// true. It is populated by the package init function.
var systemAttrsLookup map[string]map[string]bool
// init converts the systemAttributes list into the nested systemAttrsLookup
// map so that walkDelete can test membership by parent key and attribute name.
func init() {
	systemAttrsLookup = make(map[string]map[string]bool)
	for _, attr := range systemAttributes {
		if systemAttrsLookup[attr.parent] == nil {
			systemAttrsLookup[attr.parent] = make(map[string]bool)
		}
		systemAttrsLookup[attr.parent][attr.name] = true
	}
}
// walkDelete recursively walks the supplied decoded-JSON value and mutates it
// in place. Entries of a map are deleted when their key is registered in
// systemAttrsLookup under the key the map was found at (parent). Every value of
// a map found under connectionDetailsKey is replaced by the bytes "<redacted>".
// List elements are visited with the parent key of the list itself; values that
// are neither lists nor maps are left untouched.
func walkDelete(input any, parent string) {
	if items, isList := input.([]any); isList {
		for i := range items {
			walkDelete(items[i], parent)
		}
		return
	}

	obj, isMap := input.(map[string]any)
	if !isMap {
		return
	}

	if parent == connectionDetailsKey {
		for key := range obj {
			obj[key] = []byte("<redacted>")
		}
	}

	// Indexing a nil inner map yields false, so an unregistered parent simply
	// deletes nothing.
	ignored := systemAttrsLookup[parent]
	for key, child := range obj {
		if ignored[key] {
			delete(obj, key)
			continue
		}
		walkDelete(child, key)
	}
}
// reserialize decodes the JSON input, strips system attributes from it unless
// raw is set, and re-encodes it with indentation. On a decode or encode failure
// the error is logged and the original bytes are returned together with the
// error.
func (f *Cue) reserialize(jsonBytes []byte, raw bool) ([]byte, error) {
	var decoded any
	if err := json.Unmarshal(jsonBytes, &decoded); err != nil {
		f.log.Info(fmt.Sprintf("JSON unmarshal error: %v", err))
		return jsonBytes, err
	}

	if !raw {
		walkDelete(decoded, "")
	}

	pretty, err := json.MarshalIndent(decoded, "", " ")
	if err != nil {
		f.log.Info(fmt.Sprintf("JSON marshal error: %v", err))
		return jsonBytes, err
	}
	return pretty, nil
}
// getDebugString produces the text shown in debug output for the supplied JSON
// bytes: the JSON is reserialized (system attributes removed unless raw is set)
// and then formatted as cue source for readability. If reserializing fails the
// input bytes are returned as a string; if cue formatting fails the error is
// logged and the reserialized JSON is returned instead.
func (f *Cue) getDebugString(jsonBytes []byte, raw bool) string {
	cleaned, err := f.reserialize(jsonBytes, raw)
	if err != nil {
		return string(cleaned)
	}

	formatted, err := format.Source(
		cleaned,
		format.Simplify(),
		format.TabIndent(false),
		format.UseSpaces(2),
	)
	if err != nil {
		f.log.Info(fmt.Sprintf("cue formatting error: %v", err))
		return string(cleaned)
	}
	return string(formatted)
}
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package fn
import (
"context"
"fmt"
"log"
"time"
"cuelang.org/go/cue"
"cuelang.org/go/cue/cuecontext"
"cuelang.org/go/cue/parser"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/function-sdk-go"
fnv1beta1 "github.com/crossplane/function-sdk-go/proto/v1beta1"
"github.com/crossplane/function-sdk-go/request"
"github.com/crossplane/function-sdk-go/response"
input "github.com/elastic/crossplane-function-cue/pkg/input/v1beta1"
"github.com/pkg/errors"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/structpb"
)
// debugAnnotation is the annotation on the observed composite resource that,
// when set to "true", turns on debug output for that resource in RunFunction.
const debugAnnotation = "crossplane-function-cue/debug"
// Options are options for the cue runner.
type Options struct {
// Logger is the logger to use; when nil, New creates one via function.NewLogger.
Logger logging.Logger
// Debug is passed to function.NewLogger when New creates the logger, and turns
// on debug output for every RunFunction call.
Debug bool
}
// Cue runs cue scripts that adhere to a specific interface.
// Create one with New.
type Cue struct {
fnv1beta1.UnimplementedFunctionRunnerServiceServer
// log is the base logger; RunFunction derives a per-request logger from it.
log logging.Logger
// debug enables debug output for all requests.
debug bool
}
// New creates a cue runner from the supplied options. When no logger is given,
// one is created with function.NewLogger using the debug setting; an error from
// that call is returned as-is.
func New(opts Options) (*Cue, error) {
	logger := opts.Logger
	if logger == nil {
		created, err := function.NewLogger(opts.Debug)
		if err != nil {
			return nil, err
		}
		logger = created
	}
	runner := &Cue{
		log:   logger,
		debug: opts.Debug,
	}
	return runner, nil
}
// DebugOptions are per-eval debug options. They control what Eval writes to the
// standard logger.
type DebugOptions struct {
Enabled bool // enable input/ output debugging
Raw bool // do not remove any "noise" attributes in the input object
Script bool // render the final script as a debug output
}
// EvalOptions are the options for a single call to Eval.
type EvalOptions struct {
// RequestVar is the name under which the serialized request is appended to
// the script.
RequestVar string
// ResponseVar is the expression evaluated to obtain the response; when empty,
// the whole compiled value is used.
ResponseVar string
// DesiredOnlyResponse makes Eval decode the output as a desired State rather
// than as a full RunFunctionResponse.
DesiredOnlyResponse bool
// Debug holds the debug output settings for this evaluation.
Debug DebugOptions
}
// Eval compiles the supplied script together with an appended field, named by
// opts.RequestVar, that holds the request's observed state, desired state and
// context as JSON. It then evaluates opts.ResponseVar (or the whole value when
// that is empty) and decodes the result into a RunFunctionResponse. With
// opts.DesiredOnlyResponse the result is decoded as a State and returned as the
// response's desired state. Debug options control logging of the request, the
// final script and the response.
func (f *Cue) Eval(in *fnv1beta1.RunFunctionRequest, script string, opts EvalOptions) (*fnv1beta1.RunFunctionResponse, error) {
	const unmarshalMsg = "unmarshal cue output using proto json"

	// input request only contains properties as documented in the interface, not the whole object
	trimmed := &fnv1beta1.RunFunctionRequest{
		Observed: in.GetObserved(),
		Desired:  in.GetDesired(),
		Context:  in.GetContext(),
	}

	// extract request as object
	reqBytes, err := protojson.MarshalOptions{Indent: " "}.Marshal(trimmed)
	if err != nil {
		return nil, errors.Wrap(err, "proto json marshal")
	}

	debug := opts.Debug
	preamble := opts.RequestVar + ": "
	if debug.Enabled {
		log.Printf("[request:begin]\n%s %s\n[request:end]\n", preamble, f.getDebugString(reqBytes, debug.Raw))
	}

	finalScript := script + "\n" + preamble + " " + string(reqBytes) + "\n"
	if debug.Script {
		log.Printf("[script:begin]\n%s\n[script:end]\n", finalScript)
	}

	val := cuecontext.New().CompileBytes([]byte(finalScript))
	if compileErr := val.Err(); compileErr != nil {
		return nil, errors.Wrap(compileErr, "compile cue code")
	}

	if opts.ResponseVar != "" {
		expr, parseErr := parser.ParseExpr("expression", opts.ResponseVar)
		if parseErr != nil {
			return nil, errors.Wrap(parseErr, "parse response expression")
		}
		val = val.Context().BuildExpr(expr, cue.Scope(val), cue.InferBuiltins(true))
		if buildErr := val.Err(); buildErr != nil {
			return nil, errors.Wrap(buildErr, "build response expression")
		}
	}

	// this can fail if value is not concrete
	resBytes, err := val.MarshalJSON()
	if err != nil {
		return nil, errors.Wrap(err, "marshal cue output")
	}
	if debug.Enabled {
		log.Printf("[response:begin]\n%s\n[response:end]\n", f.getDebugString(resBytes, debug.Raw))
	}

	ret := &fnv1beta1.RunFunctionResponse{}
	if opts.DesiredOnlyResponse {
		state := &fnv1beta1.State{}
		if err := protojson.Unmarshal(resBytes, state); err != nil {
			return nil, errors.Wrap(err, unmarshalMsg)
		}
		ret.Desired = state
		return ret, nil
	}
	if err := protojson.Unmarshal(resBytes, ret); err != nil {
		return nil, errors.Wrap(err, unmarshalMsg)
	}
	return ret, nil
}
// RunFunction runs the function. It expects a single script that is complete, except for a request
// variable that the function runner supplies.
//
// The response is seeded from the request with response.To, the function input
// is read as CueFunctionParams, the script is evaluated with Eval and the
// result is merged into the seeded response with mergeResponse. A deferred
// handler records a normal result on success, or logs the error and records a
// fatal result on failure.
//
// NOTE(review): on failure the fatal response is returned together with the
// non-nil error (finalErr is not cleared) - confirm that callers expect both.
func (f *Cue) RunFunction(_ context.Context, req *fnv1beta1.RunFunctionRequest) (outRes *fnv1beta1.RunFunctionResponse, finalErr error) {
// set up the response with the desired state produced by upstream functions
res := response.To(req, response.DefaultTTL)
logger := f.log
// automatically handle errors and response logging; the closure reads logger
// when it runs, so it uses the enriched logger assigned further below
defer func() {
if finalErr == nil {
logger.Info("cue module executed successfully")
response.Normal(outRes, "cue module executed successfully")
return
}
logger.Info(finalErr.Error())
response.Fatal(res, finalErr)
// replace whatever was returned with the seeded response carrying the fatal result
outRes = res
}()
// setup logging and debugging
oxr, err := request.GetObservedCompositeResource(req)
if err != nil {
return nil, errors.Wrap(err, "get observed composite")
}
tag := req.GetMeta().GetTag()
if tag != "" {
logger = f.log.WithValues("tag", tag)
}
logger = logger.WithValues(
"xr-version", oxr.Resource.GetAPIVersion(),
"xr-kind", oxr.Resource.GetKind(),
"xr-name", oxr.Resource.GetName(),
)
logger.Info("Running Function")
// debugging can be requested per composite resource through an annotation
debugThis := false
annotations := oxr.Resource.GetAnnotations()
if annotations != nil && annotations[debugAnnotation] == "true" {
debugThis = true
}
// get inputs
in := &input.CueFunctionParams{}
if err := request.GetInput(req, in); err != nil {
return nil, errors.Wrap(err, "unable to get input")
}
if in.Spec.Script == "" {
return nil, fmt.Errorf("input script was not specified")
}
// DebugNew turns on debugging when the request has no observed resources
if in.Spec.DebugNew {
if len(req.GetObserved().GetResources()) == 0 {
debugThis = true
}
}
// an invalid TTL is logged and otherwise ignored, leaving the default TTL in place
if in.Spec.TTL != "" {
d, err := time.ParseDuration(in.Spec.TTL)
if err != nil {
logger.Info(fmt.Sprintf("invalid TTL: %s, %v", in.Spec.TTL, err))
} else {
res.GetMeta().Ttl = durationpb.New(d)
}
}
// set up the request and response variables
requestVar := "#request"
if in.Spec.RequestVar != "" {
requestVar = in.Spec.RequestVar
}
// "." selects the whole evaluated value (empty expression); empty means the default
var responseVar string
switch in.Spec.ResponseVar {
case ".":
responseVar = ""
case "":
responseVar = "response"
default:
responseVar = in.Spec.ResponseVar
}
state, err := f.Eval(req, in.Spec.Script, EvalOptions{
RequestVar: requestVar,
ResponseVar: responseVar,
DesiredOnlyResponse: in.Spec.LegacyDesiredOnlyResponse,
Debug: DebugOptions{
// debug output is on if enabled globally, by the input, or for this resource
Enabled: f.debug || in.Spec.Debug || debugThis,
Raw: in.Spec.DebugRaw,
Script: in.Spec.DebugScript,
},
})
if err != nil {
return res, errors.Wrap(err, "eval script")
}
return f.mergeResponse(res, state)
}
// mergeResponse folds the response produced by the cue script into res and
// returns res. Desired resources from the script are added to (or overwrite by
// name) the existing desired resources; the desired composite is replaced only
// when the script returns one; context keys from the script are merged over the
// existing context. An error is returned only when the merged context cannot be
// converted back into a struct.
func (f *Cue) mergeResponse(res *fnv1beta1.RunFunctionResponse, cueResponse *fnv1beta1.RunFunctionResponse) (*fnv1beta1.RunFunctionResponse, error) {
	// selectively add returned resources without deleting any previous desired state
	if res.Desired == nil {
		res.Desired = &fnv1beta1.State{}
	}
	desired := res.Desired
	if desired.Resources == nil {
		desired.Resources = map[string]*fnv1beta1.Resource{}
	}

	// only set desired composite if the cue script actually returns it
	// TODO: maybe use fieldpath.Pave to only extract status
	if composite := cueResponse.Desired.GetComposite(); composite != nil {
		desired.Composite = composite
	}

	// set desired resources from cue output
	for name, resource := range cueResponse.Desired.GetResources() {
		desired.Resources[name] = resource
	}

	// TODO: allow the cue layer to set warnings in cueResponse?

	// nothing to merge into the context when the script did not return one
	if cueResponse.Context == nil {
		return res, nil
	}

	// start from the existing context, if any, and overlay the script's values
	merged := map[string]any{}
	if res.Context != nil {
		merged = res.Context.AsMap()
	}
	for key, value := range cueResponse.Context.AsMap() {
		merged[key] = value
	}
	ctxStruct, err := structpb.NewStruct(merged)
	if err != nil {
		return nil, errors.Wrap(err, "set response context")
	}
	res.Context = ctxStruct
	return res, nil
}