Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 78 additions & 73 deletions cmd/stream.go → cmd/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,42 @@ package cmd

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
internalHTTP "pb/pkg/http"
"strconv"
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/spf13/cobra"
)

// StreamStatsData is the data structure for stream stats
type StreamStatsData struct {
// DatasetStatsData is the data structure for dataset stats
type DatasetStatsData struct {
Ingestion struct {
Count int `json:"count"`
Format string `json:"format"`
Size string `json:"size"`
Size uint64 `json:"size"`
} `json:"ingestion"`
Storage struct {
Format string `json:"format"`
Size string `json:"size"`
Size uint64 `json:"size"`
} `json:"storage"`
Stream string `json:"stream"`
Time time.Time `json:"time"`
}

type StreamListItem struct {
type DatasetListItem struct {
Name string
}

func (item *StreamListItem) Render() string {
func (item *DatasetListItem) Render() string {
render := StandardStyle.Render(item.Name)
return ItemOuter.Render(render)
}

// StreamRetentionData is the data structure for stream retention
type StreamRetentionData []struct {
// DatasetRetentionData is the data structure for dataset retention
type DatasetRetentionData []struct {
Description string `json:"description"`
Action string `json:"action"`
Duration string `json:"duration"`
Expand Down Expand Up @@ -105,11 +102,11 @@ type RuleConfig struct {
Repeats int `json:"repeats"`
}

// AddStreamCmd is the parent command for stream
var AddStreamCmd = &cobra.Command{
Use: "add stream-name",
Example: " pb stream add backend_logs",
Short: "Create a new stream",
// AddDatasetCmd is the parent command for dataset
var AddDatasetCmd = &cobra.Command{
Use: "add dataset-name",
Example: " pb dataset add backend_logs",
Short: "Create a new dataset",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
// Capture start time
Expand Down Expand Up @@ -139,7 +136,7 @@ var AddStreamCmd = &cobra.Command{
cmd.Annotations["executionTime"] = time.Since(startTime).String()

if resp.StatusCode == 200 {
fmt.Printf("Created stream %s\n", StyleBold.Render(name))
fmt.Printf("Created dataset %s\n", StyleBold.Render(name))
} else {
bytes, err := io.ReadAll(resp.Body)
if err != nil {
Expand All @@ -155,11 +152,11 @@ var AddStreamCmd = &cobra.Command{
},
}

// StatStreamCmd is the stat command for stream
var StatStreamCmd = &cobra.Command{
Use: "info stream-name",
Example: " pb stream info backend_logs",
Short: "Get statistics for a stream",
// StatDatasetCmd is the stat command for dataset
var StatDatasetCmd = &cobra.Command{
Use: "info dataset-name",
Example: " pb dataset info backend_logs",
Short: "Get statistics for a dataset",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
// Capture start time
Expand All @@ -181,9 +178,12 @@ var StatStreamCmd = &cobra.Command{
}

ingestionCount := stats.Ingestion.Count
ingestionSize, _ := strconv.Atoi(strings.TrimRight(stats.Ingestion.Size, " Bytes"))
storageSize, _ := strconv.Atoi(strings.TrimRight(stats.Storage.Size, " Bytes"))
compressionRatio := 100 - (float64(storageSize) / float64(ingestionSize) * 100)
ingestionSize := stats.Ingestion.Size
storageSize := stats.Storage.Size
var compressionRatio float64
if ingestionSize > 0 {
compressionRatio = 100 - (float64(storageSize) / float64(ingestionSize) * 100)
}

// Fetch retention data
retention, err := fetchRetention(&client, name)
Expand All @@ -201,8 +201,8 @@ var StatStreamCmd = &cobra.Command{
return err
}

// Fetch stream type
streamType, err := fetchInfo(&client, name)
// Fetch dataset type
datasetType, err := fetchInfo(&client, name)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
Expand All @@ -220,9 +220,9 @@ var StatStreamCmd = &cobra.Command{
"storage_size": humanize.Bytes(uint64(storageSize)),
"compression_ratio": fmt.Sprintf("%.2f%%", compressionRatio),
},
"retention": retention,
"alerts": alertsData.Alerts,
"stream_type": streamType,
"retention": retention,
"alerts": alertsData.Alerts,
"dataset_type": datasetType,
}

jsonData, err := json.MarshalIndent(data, "", " ")
Expand All @@ -243,7 +243,7 @@ var StatStreamCmd = &cobra.Command{
fmt.Printf(" %-18s %s\n", "Ingestion Size:", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" %-18s %s\n", "Storage Size:", humanize.Bytes(uint64(storageSize)))
fmt.Printf(" %-18s %.2f%s\n", "Compression Ratio:", compressionRatio, "%")
fmt.Printf(" %-18s %s\n", "Stream Type:", streamType)
fmt.Printf(" %-18s %s\n", "Dataset Type:", datasetType)
fmt.Println()

if isRetentionSet {
Expand All @@ -254,7 +254,7 @@ var StatStreamCmd = &cobra.Command{
fmt.Println()
}
} else {
fmt.Println(StyleBold.Render("No retention period set on stream\n"))
fmt.Println(StyleBold.Render("No retention period set on dataset\n"))
}

if isAlertsSet {
Expand All @@ -276,7 +276,7 @@ var StatStreamCmd = &cobra.Command{
fmt.Print("\n\n")
}
} else {
fmt.Println(StyleBold.Render("No alerts set on stream\n"))
fmt.Println(StyleBold.Render("No alerts set on dataset\n"))
}
}

Expand All @@ -285,14 +285,14 @@ var StatStreamCmd = &cobra.Command{
}

func init() {
StatStreamCmd.Flags().StringVarP(&outputFormat, "output", "o", "", "Output format (text|json)")
StatDatasetCmd.Flags().StringVarP(&outputFormat, "output", "o", "", "Output format (text|json)")
}

var RemoveStreamCmd = &cobra.Command{
Use: "remove stream-name",
var RemoveDatasetCmd = &cobra.Command{
Use: "remove dataset-name",
Aliases: []string{"rm"},
Example: " pb stream remove backend_logs",
Short: "Delete a stream",
Example: " pb dataset remove backend_logs",
Short: "Delete a dataset",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
// Capture start time
Expand Down Expand Up @@ -322,7 +322,7 @@ var RemoveStreamCmd = &cobra.Command{
cmd.Annotations["executionTime"] = time.Since(startTime).String()

if resp.StatusCode == 200 {
fmt.Printf("Successfully deleted stream %s\n", StyleBold.Render(name))
fmt.Printf("Successfully deleted dataset %s\n", StyleBold.Render(name))
} else {
bytes, err := io.ReadAll(resp.Body)
if err != nil {
Expand All @@ -338,11 +338,11 @@ var RemoveStreamCmd = &cobra.Command{
},
}

// ListStreamCmd is the list command for streams
var ListStreamCmd = &cobra.Command{
// ListDatasetCmd is the list command for datasets
var ListDatasetCmd = &cobra.Command{
Use: "list",
Example: " pb stream list",
Short: "List all streams",
Example: " pb dataset list",
Short: "List all datasets",
RunE: func(cmd *cobra.Command, _ []string) error {
// Capture start time
startTime := time.Now()
Expand All @@ -366,23 +366,23 @@ var ListStreamCmd = &cobra.Command{
return err
}

var streams []StreamListItem
var datasets []DatasetListItem
if resp.StatusCode == 200 {
bytes, err := io.ReadAll(resp.Body)
if err != nil {
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}
if err := json.Unmarshal(bytes, &streams); err != nil {
if err := json.Unmarshal(bytes, &datasets); err != nil {
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

for _, stream := range streams {
fmt.Println(stream.Render())
for _, dataset := range datasets {
fmt.Println(dataset.Render())
}
} else {
fmt.Printf("Failed to fetch streams. Status Code: %s\n", resp.Status)
fmt.Printf("Failed to fetch datasets. Status Code: %s\n", resp.Status)
}

return nil
Expand All @@ -391,10 +391,10 @@ var ListStreamCmd = &cobra.Command{

func init() {
// Add the --output flag with default value "text"
ListStreamCmd.Flags().StringP("output", "o", "text", "Output format: 'text' or 'json'")
ListDatasetCmd.Flags().StringP("output", "o", "text", "Output format: 'text' or 'json'")
}

func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsData, err error) {
func fetchStats(client *internalHTTP.HTTPClient, name string) (data DatasetStatsData, err error) {
req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/stats", name), nil)
if err != nil {
return
Expand All @@ -411,17 +411,18 @@ func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsD
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
switch resp.StatusCode {
case http.StatusOK:
err = json.Unmarshal(bytes, &data)
} else {
body := string(bytes)
body = fmt.Sprintf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body)
err = errors.New(body)
case http.StatusNotFound:
// stream exists but has no stats yet (empty stream)
default:
err = fmt.Errorf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, string(bytes))
}
return
}

func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRetentionData, err error) {
func fetchRetention(client *internalHTTP.HTTPClient, name string) (data DatasetRetentionData, err error) {
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/retention", name), nil)
if err != nil {
return
Expand All @@ -438,12 +439,13 @@ func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRe
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
switch resp.StatusCode {
case http.StatusOK:
err = json.Unmarshal(bytes, &data)
} else {
body := string(bytes)
body = fmt.Sprintf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body)
err = errors.New(body)
case http.StatusNotFound:
// no retention configured
default:
err = fmt.Errorf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, string(bytes))
}
return
}
Expand All @@ -465,17 +467,18 @@ func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
switch resp.StatusCode {
case http.StatusOK:
err = json.Unmarshal(bytes, &data)
} else {
body := string(bytes)
body = fmt.Sprintf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body)
err = errors.New(body)
case http.StatusNotFound:
// no alerts configured
default:
err = fmt.Errorf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, string(bytes))
}
return
}

func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, err error) {
func fetchInfo(client *internalHTTP.HTTPClient, name string) (datasetType string, err error) {
// Create a new HTTP GET request
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/info", name), nil)
if err != nil {
Expand All @@ -496,7 +499,8 @@ func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string,
}

// Check for successful status code
if resp.StatusCode == http.StatusOK {
switch resp.StatusCode {
case http.StatusOK:
// Define a struct to parse the response
var response struct {
StreamType string `json:"stream_type"`
Expand All @@ -509,10 +513,11 @@ func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string,

// Return the extracted stream_type
return response.StreamType, nil
case http.StatusNotFound:
// endpoint not available on this server version or stream has no type info
return "unknown", nil
default:
// Handle non-200 responses
return "", fmt.Errorf("Request Failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, string(bytes))
}

// Handle non-200 responses
body := string(bytes)
errMsg := fmt.Sprintf("Request failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, body)
return "", errors.New(errMsg)
}
14 changes: 7 additions & 7 deletions cmd/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,17 @@ var GenerateSchemaCmd = &cobra.Command{

var CreateSchemaCmd = &cobra.Command{
Use: "create",
Short: "Create Schema for a Parseable stream",
Example: "pb schema create --stream=my_stream --file=schema.json",
Short: "Create Schema for a Parseable dataset",
Example: "pb schema create --dataset=my_dataset --file=schema.json",
RunE: func(cmd *cobra.Command, _ []string) error {
// Get the stream name from the `--stream` flag
streamName, err := cmd.Flags().GetString("stream")
// Get the dataset name from the `--dataset` flag
streamName, err := cmd.Flags().GetString("dataset")
if err != nil {
return fmt.Errorf(common.Red+"failed to read stream flag: %w"+common.Reset, err)
return fmt.Errorf(common.Red+"failed to read dataset flag: %w"+common.Reset, err)
}

if streamName == "" {
return fmt.Errorf(common.Red + "stream flag is required" + common.Reset)
return fmt.Errorf(common.Red + "dataset flag is required" + common.Reset)
}

// Get the file path from the `--file` flag
Expand Down Expand Up @@ -171,6 +171,6 @@ var CreateSchemaCmd = &cobra.Command{
func init() {
// Add the `--file` flag to the command
GenerateSchemaCmd.Flags().StringP("file", "f", "", "Path to the JSON file to generate schema")
CreateSchemaCmd.Flags().StringP("stream", "s", "", "Name of the stream to associate with the schema")
CreateSchemaCmd.Flags().StringP("dataset", "s", "", "Name of the dataset to associate with the schema")
CreateSchemaCmd.Flags().StringP("file", "f", "", "Path to the JSON file to create schema")
}
Loading
Loading