This is a Go based library to manage and interact with JetStream.
This package is the underlying library for the nats CLI, our Terraform provider, GitHub Actions and Kubernetes CRDs.
It is essentially a direct wrapping of the JetStream API with few user-friendly features, and it requires deep technical
knowledge of the JetStream internals.
For typical end users we suggest the nats.go package.
Initialization
This package is modeled as a Manager instance that receives a NATS Connection and sets default timeouts and validation for
all interactions with JetStream.
Multiple Managers can be used in your application, each with its own timeouts and connection.
For example, jsm.New(nc, jsm.WithTimeout(10*time.Second)) creates a Manager with a 10 second timeout when accessing the JetStream API. All examples below assume a Manager
was created this way.
Schema Registry
All the JetStream API messages, and some events and advisories produced by the NATS Server, have JSON Schemas associated with them. The api package has a Schema Registry and helpers to discover and interact with these.
The Schema Registry can be accessed on the CLI through the nats schemas command, where you can list, search and view schemas and validate data based on schemas.
Example Message
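To retrieve the Stream State for a specific Stream, one accesses the $JS.API.STREAM.INFO.<stream> API. The response is a JSON document whose type field identifies the message, here io.nats.jetstream.api.v1.stream_info_response. The api package can help parse this into the correct format.
Message Schemas
Given a message kind, api.Schema returns the full JSON Schema as bytes. The kind can also be determined from the content of a specific message.
Several other Schema related helpers exist to search Schemas, find URLs and more. See the api package reference.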
Parsing Message Content
JetStream produces advisories and metrics about message acknowledgements, API audits and more. Here we subscribe to the advisory subject and print a specific received message type.
nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg) {
	// ParseMessage detects the message kind and returns the matching typed value.
	kind, event, err := api.ParseMessage(m.Data)
	if err != nil {
		log.Printf("Could not parse message: %s", err)
		return
	}
	log.Printf("Received message of type %s", kind) // io.nats.jetstream.advisory.v1.api_audit

	switch e := event.(type) {
	case advisory.JetStreamAPIAuditV1:
		fmt.Printf("Audit event on subject %s from %s\n", e.Subject, e.Client.Name)
	}
})
Above we gain full access to all contents of the message in its native format, but we need to know in advance what we will get. Messages can also be rendered as text in a generic way:
nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg) {
	kind, event, err := api.ParseMessage(m.Data)
	if err != nil {
		log.Printf("Could not parse message: %s", err)
		return
	}
	if kind == "io.nats.unknown_message" {
		return // a message without metadata or of an unknown format was received
	}

	ne, ok := event.(api.Event)
	if !ok {
		log.Printf("Event %q does not implement the Event interface", kind)
		return
	}

	// The callback has no return value, so failures are logged instead of returned.
	err = api.RenderEvent(os.Stdout, ne, api.TextCompactFormat)
	if err != nil {
		log.Printf("Display failed: %s", err)
	}
})
api.TextCompactFormat is one of several supported formats. The others are api.TextExtendedFormat for a full multi-line format, api.ApplicationCloudEventV1Format for CloudEvents v1 format and api.ApplicationJSONFormat for JSON.
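The pattern used in this section, mapping a message kind to a concrete Go type and then switching on that type, can be sketched with the standard library alone. This is only an illustration of the idea and not how the api package is implemented: auditEvent, constructors, parse and describe are hypothetical names, and the sample payloads are made up and far smaller than real advisories.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// auditEvent is a hypothetical, reduced stand-in for a typed advisory.
type auditEvent struct {
	Type    string `json:"type"`
	Subject string `json:"subject"`
}

// constructors maps a message kind to a function that yields an empty value
// of the matching Go type.
var constructors = map[string]func() any{
	"io.nats.jetstream.advisory.v1.api_audit": func() any { return &auditEvent{} },
}

// parse returns the kind and a typed value. Payloads with a missing or
// unrecognized type field are returned as a generic map.
func parse(data []byte) (string, any, error) {
	var head struct {
		Type string `json:"type"`
	}
	if err := json.Unmarshal(data, &head); err != nil {
		return "", nil, err
	}

	newValue, known := constructors[head.Type]
	if !known {
		generic := map[string]any{}
		err := json.Unmarshal(data, &generic)
		return "io.nats.unknown_message", generic, err
	}

	value := newValue()
	return head.Type, value, json.Unmarshal(data, value)
}

// describe switches on the concrete type, as the subscription handlers do.
func describe(data []byte) string {
	kind, event, err := parse(data)
	if err != nil {
		return "unparsable message"
	}

	switch e := event.(type) {
	case *auditEvent:
		return fmt.Sprintf("audit event on subject %s", e.Subject)
	default:
		return fmt.Sprintf("unhandled message of type %s", kind)
	}
}

func main() {
	fmt.Println(describe([]byte(`{"type":"io.nats.jetstream.advisory.v1.api_audit","subject":"$JS.API.STREAM.INFO.ORDERS"}`)))
	fmt.Println(describe([]byte(`{"id":"abc"}`)))
}
```

The typed branch gives access to every field but must be written per kind, while the default branch is the generic path that only knows the kind. That is the same trade-off as between the two handlers shown in this section.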
API Validation
The data structures sent to JetStream can be validated before submission to NATS which can speed up user feedback and
provide better errors.
// SchemaValidator validates structures against the JSON Schemas in the api package.
type SchemaValidator struct{}

func (v SchemaValidator) ValidateStruct(data any, schemaType string) (ok bool, errs []string) {
	s, err := api.Schema(schemaType)
	if err != nil {
		return false, []string{fmt.Sprintf("unknown schema type %s", schemaType)}
	}

	ls := gojsonschema.NewBytesLoader(s)
	ld := gojsonschema.NewGoLoader(data)
	result, err := gojsonschema.Validate(ls, ld)
	if err != nil {
		return false, []string{fmt.Sprintf("validation failed: %s", err)}
	}

	if result.Valid() {
		return true, nil
	}

	// Collect every schema violation as a human readable string.
	errs = make([]string, len(result.Errors()))
	for i, verr := range result.Errors() {
		errs[i] = verr.String()
	}

	return false, errs
}
This is an api.StructValidator implementation that uses JSON Schema to do deep validation of the structures sent to JetStream.
This can be used by the Manager to validate all API access, for example by passing it to jsm.New with the jsm.WithAPIValidation option.
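To make the shape of a validator concrete without pulling in a JSON Schema library, here is a minimal standalone sketch. It is not part of this package: structValidator is a local stand-in that copies the ValidateStruct signature shown above, streamConfig, nameValidator and submit are hypothetical, and the name check is a simplified rule chosen for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// structValidator is a local stand-in with the same method signature as the
// SchemaValidator above. It is declared here only to keep the sketch standalone.
type structValidator interface {
	ValidateStruct(data any, schemaType string) (ok bool, errs []string)
}

// streamConfig is a hypothetical, heavily reduced request structure.
type streamConfig struct {
	Name     string
	Subjects []string
}

// nameValidator applies hand-written checks instead of a JSON Schema.
type nameValidator struct{}

func (nameValidator) ValidateStruct(data any, schemaType string) (bool, []string) {
	cfg, ok := data.(streamConfig)
	if !ok {
		return false, []string{fmt.Sprintf("unsupported data for schema type %s", schemaType)}
	}

	var errs []string
	if cfg.Name == "" {
		errs = append(errs, "name is required")
	}
	if strings.ContainsAny(cfg.Name, ".*> ") {
		errs = append(errs, "name contains invalid characters")
	}

	return len(errs) == 0, errs
}

// submit validates first and only proceeds when validation passes.
func submit(v structValidator, cfg streamConfig) error {
	// The schema type string is used for illustration only.
	ok, errs := v.ValidateStruct(cfg, "io.nats.jetstream.api.v1.stream_create_request")
	if !ok {
		return fmt.Errorf("validation failed: %s", strings.Join(errs, ", "))
	}
	return nil // a real client would publish the request here
}

func main() {
	fmt.Println(submit(nameValidator{}, streamConfig{Name: "ORDERS"}))
	fmt.Println(submit(nameValidator{}, streamConfig{Name: "bad.name"}))
}
```

Because the caller depends only on the interface, a schema-based validator and a hand-written one are interchangeable, and invalid requests are rejected locally before anything is sent to the server.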
Build tag
This library provides a noexprlang build tag that disables expression matching
for Stream and Consumer queries. The purpose of this build tag is to avoid
the github.com/expr-lang/expr module, which defeats the Go compiler’s dead
code elimination because it uses some types and functions of the reflect package.
The tag is set with the standard -tags flag, for example go build -tags noexprlang.