brpc

v0.0.20
Published: Oct 29, 2025 License: BSD-3-Clause Imports: 19 Imported by: 0

README

bRPC

Overview

brpc is a framework that enables servers to initiate Remote Procedure Calls (RPCs) directly on clients, flipping the traditional client-to-server communication model.

Key Features
  • Server-initiated RPCs: Enables bidirectional communication where servers can initiate RPC calls directly on clients.
  • Centralized Communication Hub: Provides a single central service (socket) for managing interactions with various client extensions (plugins).
  • Extension Independence: The central service is designed to be independent of individual plugins, enabling seamless integration of new extensions or modifications to existing ones without needing server-side changes. This maintains flexibility and reduces complexity.
  • Single Port Operation: The entire framework requires only one port for managing multiple extensions, significantly reducing the risk of port congestion and simplifying network configuration.

Technical Approach

To ensure independence between the central service and plugins, JSON is used as the primary data protocol, while GOB handles framework-level communication. This architecture allows for lightweight, extensible interactions with clients and easy data serialization across different extensions.
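The split between the two encodings can be illustrated with a minimal sketch. It assumes that the JSON-encoded data travels inside the Payload field of a gob-encoded Envelope; the page does not confirm the actual wire format, and the encode and decode helpers are hypothetical names, not part of brpc.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
)

// Envelope mirrors the exported fields of brpc.Envelope as listed in the
// reference section of this page. It is a local copy for illustration only.
type Envelope struct {
	Seq     uint64
	Trace   string
	Method  string
	Error   string
	Payload []byte
}

// encode (hypothetical helper) marshals the payload as JSON and wraps it
// in a gob-encoded envelope.
func encode(seq uint64, method string, payload any) ([]byte, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	env := Envelope{Seq: seq, Method: method, Payload: body}
	if err = gob.NewEncoder(&buf).Encode(env); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode (hypothetical helper) reverses encode: gob for the envelope,
// JSON for the payload.
func decode(raw []byte, out any) (Envelope, error) {
	var env Envelope
	if err := gob.NewDecoder(bytes.NewReader(raw)).Decode(&env); err != nil {
		return env, err
	}
	return env, json.Unmarshal(env.Payload, out)
}

func main() {
	raw, err := encode(1, "ExampleMethod", map[string]string{"greeting": "hello"})
	if err != nil {
		panic(err)
	}
	var payload map[string]string
	env, err := decode(raw, &payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(env.Seq, env.Method, payload["greeting"])
}
```

Because the central service only sees opaque JSON bytes in this sketch, it would not need to know any plugin-specific Go types, which is the independence property described above.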

Installation

brpc can be installed like any other Go library through go get:

  go get github.com/easysy/brpc@latest

Getting Started

Socket (example)
package main

import (
	"fmt"
	"net"
	"time"

	"github.com/easysy/brpc"
)

func main() {
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		panic(err)
	}

	s := new(brpc.Socket)
	s.Serve(lis)
	s.RegisterCallback(func(info *brpc.PluginInfo, graceful bool) {
		fmt.Printf("Callback: plugin %s disconnected, gracefully: '%v'\n", info.Name, graceful)
	})

	// Read async messages
	go func() {
		for {
			async, e := s.Async()
			if e != nil {
				return
			}
			fmt.Println("Async:", async)
		}
	}()

	pn := "your_plugin_name"

	// Wait for your plugin to be connected
	if !s.WaitFor(pn, time.Second*30) {
		panic(pn + " not connected before timeout")
	}

	var resp any
	if resp, err = s.Call("", pn, "ExampleMethod", nil); err != nil {
		panic(err)
	}

	fmt.Println("Response:", resp)

	// Get a list of connected plugins
	fmt.Println("List:", s.Connected(false))

	// If you need, you can stop any plugin
	s.Unplug("1", pn)

	fmt.Println("List:", s.Connected(false))

	// Shutdown the socket
	if err = s.Shutdown("2"); err != nil {
		panic(err)
	}
}

Plugin (example)
package main

import (
	"context"
	"net"
	"time"

	"github.com/easysy/brpc"
)

type Plugin struct {
	hook chan any
}

func (p *Plugin) UseAsyncHook(hook chan any) {
	p.hook = hook
}

func (p *Plugin) ExampleMethod(_ context.Context, _ struct{}) (string, error) {
	p.hook <- "ExampleMethod started"
	time.Sleep(time.Second * 5)
	p.hook <- "ExampleMethod ended"
	return "Message", nil
}

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:8080")
	if err != nil {
		panic(err)
	}

	p := new(brpc.Plugin)
	info := new(brpc.PluginInfo)
	info.Name = "your_plugin_name" // The name must be unique for each plugin
	info.Version = "your_plugin_version"
	if err = p.Start(new(Plugin), info, conn, ""); err != nil {
		panic(err)
	}
}

Documentation

Constants

const (
	MethodAsync    = "Async"
	MethodShutdown = "Shutdown"
)

Variables

var (
	ErrShutdown       = errors.New("connection is shut down")
	ErrMethodNotFound = errors.New("method not found")
)

Functions

This section is empty.

Types

type AsyncData

type AsyncData struct {
	Name    string `json:"name,omitempty"`
	Payload any    `json:"payload,omitempty"`
}

AsyncData represents data received asynchronously from a plugin.
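Since Payload is declared as any, a consumer usually has to inspect its dynamic type. The following is a rough sketch of one way to do that. It uses a local copy of the struct, assumes that Name identifies the sending plugin, and assumes that payloads arrive as generic JSON values (strings, maps and so on); none of this is confirmed by the page, and describe is a hypothetical helper.

```go
package main

import "fmt"

// AsyncData is a local copy of brpc.AsyncData for illustration.
type AsyncData struct {
	Name    string `json:"name,omitempty"`
	Payload any    `json:"payload,omitempty"`
}

// describe (hypothetical helper) turns an async message into a log line
// by switching on the dynamic type of the payload.
func describe(d *AsyncData) string {
	switch p := d.Payload.(type) {
	case nil:
		return d.Name + ": (empty)"
	case string:
		return d.Name + ": " + p
	case map[string]any:
		return fmt.Sprintf("%s: object with %d fields", d.Name, len(p))
	default:
		return fmt.Sprintf("%s: %v", d.Name, p)
	}
}

func main() {
	fmt.Println(describe(&AsyncData{Name: "your_plugin_name", Payload: "ExampleMethod started"}))
	fmt.Println(describe(&AsyncData{Name: "your_plugin_name"}))
}
```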

type Entity added in v0.0.10

type Entity struct {
	Name      string   `json:"name,omitempty"`
	Type      string   `json:"type,omitempty"`
	Mandatory bool     `json:"mandatory,omitempty"`
	Fields    []Entity `json:"fields,omitempty"`
}

func TypeDescription added in v0.0.10

func TypeDescription(v any) *Entity

func (*Entity) DeepCopy added in v0.0.11

func (e *Entity) DeepCopy() *Entity

type Envelope

type Envelope struct {
	Seq     uint64
	Trace   string
	Method  string
	Error   string
	Payload []byte
}

Envelope represents a message structure used as an RPC call/return. It is used internally.

type Function added in v0.0.10

type Function struct {
	Name   string  `json:"name,omitempty"`
	Input  *Entity `json:"input,omitempty"`
	Output *Entity `json:"output,omitempty"`
}

func (*Function) DeepCopy added in v0.0.11

func (f *Function) DeepCopy() Function

type Options added in v0.0.12

type Options interface {
	// contains filtered or unexported methods
}

func WithKeySequencer added in v0.0.12

func WithKeySequencer(fn func(name string) string, attempts uint) Options

WithKeySequencer returns an option that applies a function to modify repeated plugin names, enabling multiple connections with the same base name by generating unique variations.

The `fn` parameter is used to transform the name when a conflict is detected, and `attempts` defines the maximum number of times the function will be applied before failing to register the plugin.
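As an illustration, a sequencer function might append or increment a numeric suffix. The sketch below is not taken from the library: nextName is a hypothetical function with the required func(name string) string shape, and resolve only simulates the retry behaviour described above against an in-memory set of taken names.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// nextName is a hypothetical sequencer: "worker" becomes "worker-1",
// "worker-1" becomes "worker-2", and so on.
func nextName(name string) string {
	if i := strings.LastIndex(name, "-"); i >= 0 {
		if n, err := strconv.Atoi(name[i+1:]); err == nil {
			return name[:i+1] + strconv.Itoa(n+1)
		}
	}
	return name + "-1"
}

// resolve simulates the documented behaviour: fn is applied at most
// `attempts` times while the name is still taken. The boolean reports
// whether a free name was found.
func resolve(name string, taken map[string]bool, fn func(string) string, attempts uint) (string, bool) {
	for i := uint(0); taken[name] && i < attempts; i++ {
		name = fn(name)
	}
	return name, !taken[name]
}

func main() {
	taken := map[string]bool{"worker": true, "worker-1": true}
	fmt.Println(resolve("worker", taken, nextName, 3))
}
```

Given the signatures on this page, such a function would presumably be passed to the socket as s.Serve(lis, brpc.WithKeySequencer(nextName, 3)).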

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

Plugin represents a plugin instance that handles communication via a connection. It manages a map of registered methods and uses reflection to invoke them based on incoming requests.

func (*Plugin) Start

func (p *Plugin) Start(v any, info *PluginInfo, conn io.ReadWriteCloser, ctxKey any) error

Start registers the provided plugin receiver. It uses reflection to inspect the receiver's methods and ensure that they follow the expected signature. It checks for methods that look schematically like:

func (t *T) MethodName(ctx context.Context, in T1) (out T2, err error)

and / or

func (t *T) UseAsyncHook(hook chan any)

where T1 and T2 are valid exported (or builtin) types. T1 must not be a channel, function or non-empty interface type. Similarly, T2 must not be a channel or function type.

Methods that do not match the required signatures are ignored.

If the plugin has at least one valid method, it establishes a connection, sends a handshake containing plugin information to the socket, and begins listening for incoming requests.

If no valid methods are found, it returns an error.

If the plugin uses the async hook, a goroutine is started to handle hook-related values.
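The signature rules above can be approximated with the standard reflect package. This is a simplified sketch, not the library's implementation: validMethods and allowed are hypothetical helpers, the check for exported or builtin types is omitted, and UseAsyncHook is simply treated as a non-RPC method.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// validMethods (hypothetical) lists the methods of v shaped like
// func (t *T) Name(ctx context.Context, in T1) (out T2, err error).
func validMethods(v any) []string {
	var names []string
	t := reflect.TypeOf(v)
	for i := 0; i < t.NumMethod(); i++ {
		m := t.Method(i)
		ft := m.Type // input 0 is the receiver
		if ft.NumIn() != 3 || ft.NumOut() != 2 {
			continue
		}
		if ft.In(1) != ctxType || ft.Out(1) != errType {
			continue
		}
		if !allowed(ft.In(2), true) || !allowed(ft.Out(0), false) {
			continue
		}
		names = append(names, m.Name)
	}
	return names
}

// allowed rejects channels and functions, and for inputs also
// non-empty interface types.
func allowed(t reflect.Type, input bool) bool {
	switch t.Kind() {
	case reflect.Chan, reflect.Func:
		return false
	case reflect.Interface:
		return !input || t.NumMethod() == 0
	}
	return true
}

// demo is a hypothetical receiver with one valid and two ignored methods.
type demo struct{}

func (*demo) Echo(_ context.Context, in string) (string, error) {
	return in, nil
}

func (*demo) Bad(in chan int) error {
	return nil
}

func (*demo) UseAsyncHook(hook chan any) {}

func main() {
	fmt.Println(validMethods(new(demo)))
}
```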

type PluginInfo

type PluginInfo struct {
	Name      string              `json:"name,omitempty"`
	Version   string              `json:"version,omitempty"`
	Functions map[string]Function `json:"functions,omitempty"`
}

PluginInfo holds metadata about a plugin.

func (*PluginInfo) DeepCopy added in v0.0.11

func (p *PluginInfo) DeepCopy(full bool) *PluginInfo

DeepCopy creates a copy of the PluginInfo instance. If `full` is true, it performs a deep copy of all Functions; otherwise, only Name and Version are copied.

type Socket

type Socket struct {
	// contains filtered or unexported fields
}

Socket represents a server that manages plugin connections and communications.

func (*Socket) Async

func (s *Socket) Async() (*AsyncData, error)

Async retrieves the next message from the async channel. It returns a connection-closed error when the socket is stopped.

func (*Socket) Broadcast

func (s *Socket) Broadcast(id, method string, payload any) (map[string]any, map[string]error)

Broadcast invokes the named method with the provided payload on all running plugins, waits for them to complete, and returns their results and errors.
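Handling the two returned maps together might look like the sketch below. It assumes that both maps are keyed by plugin name, which the page does not state explicitly; summarize and errDemo are hypothetical names used only for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// errDemo is a hypothetical error used only in this example.
var errDemo = errors.New("timeout")

// summarize (hypothetical helper) merges the two maps returned by
// Broadcast into sorted report lines.
// Assumption: both maps are keyed by plugin name.
func summarize(results map[string]any, errs map[string]error) []string {
	lines := make([]string, 0, len(results)+len(errs))
	for name, res := range results {
		if _, failed := errs[name]; failed {
			continue // the error entry is reported instead
		}
		lines = append(lines, fmt.Sprintf("%s: ok %v", name, res))
	}
	for name, err := range errs {
		lines = append(lines, fmt.Sprintf("%s: error %v", name, err))
	}
	sort.Strings(lines)
	return lines
}

func main() {
	// Stand-ins for the values returned by s.Broadcast(id, method, payload).
	results := map[string]any{"alpha": "pong"}
	errs := map[string]error{"beta": errDemo}
	for _, line := range summarize(results, errs) {
		fmt.Println(line)
	}
}
```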

func (*Socket) Call

func (s *Socket) Call(id, name, method string, payload any) (any, error)

Call invokes the named method of the specified plugin with the provided payload, waits for it to complete, and returns its result and error.
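Because the result is returned as any, callers often want to convert it into a concrete type. One possible approach, sketched below, is a JSON round trip. It assumes the response is a generic JSON-decoded value such as map[string]any; the helper as and the status type are hypothetical and not part of brpc.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// as (hypothetical helper) converts a loosely typed response into T by
// marshalling it to JSON and unmarshalling it into the target type.
func as[T any](resp any) (T, error) {
	var out T
	raw, err := json.Marshal(resp)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(raw, &out)
	return out, err
}

// status is a hypothetical response type.
type status struct {
	State string `json:"state"`
	Count int    `json:"count"`
}

func main() {
	// Stand-in for a value returned by s.Call; the shape is an assumption.
	var resp any = map[string]any{"state": "ready", "count": 2.0}

	st, err := as[status](resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(st.State, st.Count)
}
```

The round trip costs an extra encode and decode, but it keeps the calling code free of manual type assertions on nested maps.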

func (*Socket) Connected

func (s *Socket) Connected(full bool) map[string]*PluginInfo

Connected returns the connected plugins as a map. If `full` is true, all Functions are included; otherwise, only Name and Version are returned.

func (*Socket) PluginInfo added in v0.0.11

func (s *Socket) PluginInfo(name string) *PluginInfo

PluginInfo returns the full plugin info for the plugin with the given name.

func (*Socket) RegisterCallback added in v0.0.14

func (s *Socket) RegisterCallback(callback func(info *PluginInfo, graceful bool))

RegisterCallback registers a callback function that will be called whenever a plugin is disconnected.

callback: a function that receives plugin information and a graceful flag.

  • info: details about the disconnected plugin;
  • graceful: indicates whether the plugin was disconnected normally (true) via a stop request or unexpectedly (false) due to a crash or termination.
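A callback could use the graceful flag to track only unexpected disconnects, for example to schedule restarts. The sketch below uses a local stand-in for brpc.PluginInfo and a hypothetical tracker type; the mutex is there on the assumption that the callback may be invoked from a different goroutine than the one reading the list.

```go
package main

import (
	"fmt"
	"sync"
)

// PluginInfo is a local stand-in for the Name and Version fields of
// brpc.PluginInfo.
type PluginInfo struct {
	Name    string
	Version string
}

// tracker (hypothetical) records plugins that disconnected unexpectedly.
type tracker struct {
	mu      sync.Mutex
	crashed []string
}

// onDisconnect has the callback shape expected by RegisterCallback.
func (tr *tracker) onDisconnect(info *PluginInfo, graceful bool) {
	if graceful {
		return // stopped on request, nothing to record
	}
	tr.mu.Lock()
	defer tr.mu.Unlock()
	tr.crashed = append(tr.crashed, info.Name)
}

// list returns a copy of the recorded names.
func (tr *tracker) list() []string {
	tr.mu.Lock()
	defer tr.mu.Unlock()
	return append([]string(nil), tr.crashed...)
}

func main() {
	tr := new(tracker)
	tr.onDisconnect(&PluginInfo{Name: "alpha"}, true)
	tr.onDisconnect(&PluginInfo{Name: "beta"}, false)
	fmt.Println(tr.list())
}
```

With the real package the parameter type would be *brpc.PluginInfo, and the method value could presumably be registered as s.RegisterCallback(tr.onDisconnect).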

func (*Socket) Serve

func (s *Socket) Serve(listener net.Listener, opts ...Options)

Serve initializes the Socket and starts listening for incoming connections.

func (*Socket) Shutdown

func (s *Socket) Shutdown(id string) error

Shutdown gracefully stops all running plugins and stops the Socket. An optional `id` can be provided to trace the request.

func (*Socket) Unplug

func (s *Socket) Unplug(id string, name string)

Unplug sends a stop request to a plugin based on its name. An optional `id` can be provided to trace the request.

func (*Socket) WaitFor added in v0.0.8

func (s *Socket) WaitFor(name string, timeout time.Duration) bool

WaitFor waits for the specified plugin to connect within a given timeout duration. If the plugin connects within the timeout, it returns `true`. If the timeout expires before the plugin connects, it returns `false`.
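When several plugins must be present before work starts, the calls can share a single deadline. The sketch below is one way to do that against a small interface that matches the WaitFor signature above; waiter, waitAll and fakeSocket are hypothetical names, and fakeSocket exists only so the example runs without a network.

```go
package main

import (
	"fmt"
	"time"
)

// waiter is the subset of *brpc.Socket used here; the method signature
// matches the one documented above.
type waiter interface {
	WaitFor(name string, timeout time.Duration) bool
}

// waitAll (hypothetical helper) waits for every named plugin under one
// shared deadline and returns the names that did not connect in time.
func waitAll(w waiter, names []string, total time.Duration) []string {
	deadline := time.Now().Add(total)
	var missing []string
	for _, name := range names {
		left := time.Until(deadline)
		// Once the shared budget is spent, remaining names count as missing.
		if left <= 0 || !w.WaitFor(name, left) {
			missing = append(missing, name)
		}
	}
	return missing
}

// fakeSocket is a stand-in that reports a fixed set of connected plugins.
type fakeSocket map[string]bool

func (f fakeSocket) WaitFor(name string, _ time.Duration) bool {
	return f[name]
}

func main() {
	s := fakeSocket{"alpha": true}
	fmt.Println(waitAll(s, []string{"alpha", "beta"}, time.Second))
}
```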
