Documentation
¶
Overview ¶
Package streams implements a client library for the Data Streams API providing a domain oriented abstraction for both report Streams and point in time retrieval with fault tolerant capabilities.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
ErrStreamClosed = fmt.Errorf("client: use of closed Stream")
)
Functions ¶
Types ¶
type Client ¶
type Client interface {
// GetFeeds lists all feeds available to this client.
GetFeeds(ctx context.Context) (r []*feed.Feed, err error)
// GetLatestReport fetches the latest report available for the given feedID.
GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResponse, err error)
// GetReports fetches the reports for the given feedIDs and timestamp.
GetReports(ctx context.Context, ids []feed.ID, timestamp uint64) ([]*ReportResponse, error)
// GetReportPage paginates the reports for the given feedID and start timestamp.
GetReportPage(ctx context.Context, id feed.ID, startTS uint64) (*ReportPage, error)
// Stream creates realtime report stream for the given feedIDs.
Stream(ctx context.Context, feedIDs []feed.ID) (Stream, error)
// Stream creates realtime report stream for the given feedIDs.
StreamWithStatusCallback(ctx context.Context, feedIDs []feed.ID,
connStatusCallback func(isConnected bool, host string, origin string)) (Stream, error)
}
Client is the data streams client interface.
Example ¶
package main
import (
"context"
"os"
"time"
streams "github.com/smartcontractkit/data-streams-sdk/go"
"github.com/smartcontractkit/data-streams-sdk/go/feed"
streamsReport "github.com/smartcontractkit/data-streams-sdk/go/report"
v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3"
)
func main() {
cfg := streams.Config{
ApiKey: "mykey",
ApiSecret: "mysecret",
RestURL: "https://streams.url",
WsURL: "https://streams.url",
Logger: streams.LogPrintf,
}
streamsClient, err := streams.New(cfg)
if err != nil {
streams.LogPrintf("error creating client: %s", err)
os.Exit(1)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
availableFeeds, err := streamsClient.GetFeeds(ctx)
cancel()
if err != nil {
streams.LogPrintf("error fetching feeds: %s", err)
os.Exit(1)
}
f := availableFeeds[0]
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
report, err := streamsClient.GetLatestReport(ctx, f.FeedID)
cancel()
if err != nil {
streams.LogPrintf("error fetching latest report: %s", err)
os.Exit(1)
}
streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
reports, err := streamsClient.GetReports(
ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID}, uint64(time.Now().Unix())-3)
cancel()
if err != nil {
streams.LogPrintf("error fetching reports: %s", err)
os.Exit(1)
}
for _, report = range reports {
decoded, err := streamsReport.Decode[v3.Data](report.FullReport)
if err != nil {
streams.LogPrintf("error decoding decoded: %s", err)
os.Exit(1)
}
streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)
streams.LogPrintf(
"FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
decoded.Data.FeedID.String(),
decoded.Data.FeedID.Version(),
decoded.Data.Bid.String(),
decoded.Data.Ask.String(),
decoded.Data.BenchmarkPrice.String(),
decoded.Data.LinkFee.String(),
decoded.Data.NativeFee.String(),
decoded.Data.ValidFromTimestamp,
decoded.Data.ExpiresAt,
)
}
}
type Config ¶
type Config struct {
ApiKey string // Client Api key
ApiSecret string // Client Api secret
RestURL string // Rest Api url
WsURL string // Websocket Api url
WsHA bool // Use concurrent connections to multiple Streams servers
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Logger func(format string, a ...any) // Logger function
// InspectHttp intercepts http responses for rest requests.
// The response object must not be modified.
InspectHttpResponse func(*http.Response)
// contains filtered or unexported fields
}
Config specifies the client configuration and dependencies. If specified the Logger function will be used to log informational client activity.
type CtxKey ¶
type CtxKey string
CtxKey type for context values
const ( // CustomHeadersCtxKey is used as key in the context.Context object // to pass in a custom http headers in a http.Header to be used by the client. // Custom header values will overwrite client headers if they have the same key. CustomHeadersCtxKey CtxKey = "CustomHeaders" )
type ReportPage ¶
type ReportPage struct {
Reports []*ReportResponse
NextPageTS uint64
}
ReportPage implements the server pagination response. NextPageTS is the timestamp to be used when requesting the next page.
type ReportResponse ¶
type ReportResponse struct {
FeedID feed.ID `json:"feedID"`
FullReport []byte `json:"fullReport"`
ValidFromTimestamp uint64 `json:"validFromTimestamp"`
ObservationsTimestamp uint64 `json:"observationsTimestamp"`
}
ReportResponse implements the report envelope that contains the full report payload, its FeedID and timestamps. For decoding the Report Payload use report.Decode().
func (*ReportResponse) MarshalJSON ¶
func (r *ReportResponse) MarshalJSON() ([]byte, error)
func (*ReportResponse) String ¶
func (r *ReportResponse) String() (s string)
func (*ReportResponse) UnmarshalJSON ¶
func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
type Stats ¶
type Stats struct {
Accepted uint64 // Total number of accepted reports
Deduplicated uint64 // Total number of deduplicated reports when in HA
TotalReceived uint64 // Total number of received reports
PartialReconnects uint64 // Total number of partial reconnects when in HA
FullReconnects uint64 // Total number of full reconnects
ConfiguredConnections uint64 // Number of configured connections if in HA
ActiveConnections uint64 // Current number of active connections
}
Stats for the Stream
type Stream ¶
type Stream interface {
// Read the next available report on the Stream.
// Read blocks until a report is received, the context is canceled or
// all underlying connections are in a error state.
Read(context.Context) (*ReportResponse, error)
// Stats return basic stats about the Stream.
Stats() Stats
// Close the Stream. Is the caller responsibility to call close when
// the stream is no longer needed.
Close() error
}
Stream represents a realtime report stream. Safe for concurrent usage.
The Stream will maintain at least 2 concurrent connections to different instances to ensure high availability, fault tolerance and minimize the risk of report gaps.
Example ¶
package main
import (
"context"
"os"
"time"
streams "github.com/smartcontractkit/data-streams-sdk/go"
"github.com/smartcontractkit/data-streams-sdk/go/feed"
streamsReport "github.com/smartcontractkit/data-streams-sdk/go/report"
v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3"
)
func main() {
cfg := streams.Config{
ApiKey: "mykey",
ApiSecret: "mysecret",
RestURL: "https://streams.url",
WsURL: "https://streams.url",
Logger: streams.LogPrintf,
}
client, err := streams.New(cfg)
if err != nil {
streams.LogPrintf("error creating client: %s", err)
os.Exit(1)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
availableFeeds, err := client.GetFeeds(ctx)
cancel()
if err != nil {
streams.LogPrintf("error fetching feeds: %s", err)
os.Exit(1)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
stream, err := client.StreamWithStatusCallback(
ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID},
func(isConnected bool, host string, origin string) {
streams.LogPrintf("Host: %s, Origin: %s, isConnected: %s", host, origin, isConnected)
})
cancel()
if err != nil {
streams.LogPrintf("error subscribing: %s", err)
os.Exit(1)
}
defer stream.Close()
for stream.Stats().Accepted < 100 {
report, err := stream.Read(context.Background())
if err != nil {
streams.LogPrintf("error reading from stream: %s", err)
os.Exit(1)
}
streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)
decoded, err := streamsReport.Decode[v3.Data](report.FullReport)
if err != nil {
streams.LogPrintf("error decoding decoded: %s", err)
os.Exit(1)
}
streams.LogPrintf(
"FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
decoded.Data.FeedID.String(),
decoded.Data.FeedID.Version(),
decoded.Data.Bid.String(),
decoded.Data.Ask.String(),
decoded.Data.BenchmarkPrice.String(),
decoded.Data.LinkFee.String(),
decoded.Data.NativeFee.String(),
decoded.Data.ValidFromTimestamp,
decoded.Data.ExpiresAt,
)
streams.LogPrintf("stream stats: %s", stream.Stats().String())
}
}
Directories
¶
| Path | Synopsis |
|---|---|
|
Package report implements the Streams feed and feed schema version types.
|
Package report implements the Streams feed and feed schema version types. |
|
Package report implements the Streams report schema, type and has sub packages that implements the report data schema and types.
|
Package report implements the Streams report schema, type and has sub packages that implements the report data schema and types. |