4
0

Compare commits

...

20 Commits
v1.1.0 ... main

Author SHA1 Message Date
dc.To
6ed8f159e7 修改依赖路径 2025-08-27 15:03:16 +08:00
dc.To
c9133edfd9 update dependency library 2025-08-07 10:16:58 +08:00
dc.To
fed21bfb86 update etcd variable 2025-08-07 10:03:10 +08:00
dc.To
981e1e0cf0 update invoke add atomic with context config 2025-08-07 09:52:32 +08:00
dc.To
7db81a88e4 添加配置调用 2025-08-06 17:51:00 +08:00
dc.To
ffdfa9c94f remove debug code 2025-07-10 11:10:17 +08:00
dc.To
fe25f60b5d 更新超时链 2025-07-10 11:02:46 +08:00
dc.To
ccf5df87e7 update timeout invoker 2025-06-27 14:37:24 +08:00
dc.To
c680bd2d5e update grpc name 2025-06-19 20:09:45 +08:00
dc.To
1c5ad5f20b add bss client 2025-04-29 18:27:54 +08:00
dc.To
0110dee434 return server 2025-04-27 15:36:20 +08:00
dc.To
db0378c430 remove Resolve 2025-04-27 15:28:14 +08:00
dc.To
a569f303f3 fix bugs 2025-04-27 15:24:43 +08:00
dc.To
e3ec5b5e99 fix package 2025-04-27 15:22:56 +08:00
dc.To
5a310fa8c6 rename to grpcx 2025-04-27 14:43:08 +08:00
dc.To
6648f2d530 update grpcx 2025-04-27 14:38:09 +08:00
dc.To
7827731423 rename to registry 2025-04-27 14:16:23 +08:00
dc.To
8cdc0939bb rename to config 2025-04-27 14:11:35 +08:00
dc.To
c6cc969dea rename to etcd 2025-04-27 14:11:09 +08:00
dc.To
586662aa65 add etcd config 2025-04-27 14:10:45 +08:00
4 changed files with 209 additions and 16 deletions

70
apapter/atomic.go Normal file
View File

@ -0,0 +1,70 @@
package apapter
import (
"context"
"errors"
"sync/atomic"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/encoding/gbase64"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"google.golang.org/grpc/metadata"
)
/**
* 原子钟接口配置
* @author dc.To
* @version 20250807
*/
type AtomicAdapter struct {
store atomic.Value
adapter string
}
func NewAtomicAdapter(ctx context.Context, adapter string) *AtomicAdapter {
a := &AtomicAdapter{adapter: adapter}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if c := md.Get(a.adapter); len(c) > 0 {
a.SetContent(gjson.New(gbase64.MustDecodeToString(c[0])).Map())
} else {
g.Log("cfg").Error(ctx, "No Metadata from incoming cfg context")
}
}
return a
}
func (a *AtomicAdapter) SetContent(content map[string]interface{}) error {
a.store.Store(content)
return nil
}
func (a *AtomicAdapter) Available(ctx context.Context, resource ...string) (ok bool) {
return a.store.Load() != nil
}
func (a *AtomicAdapter) Get(ctx context.Context, pattern string) (value interface{}, err error) {
raw := a.store.Load()
if raw == nil {
return nil, errors.New("config not loaded")
}
data := raw.(map[string]interface{})
if v, ok := data[pattern]; ok {
return gvar.New(v), nil
}
return gvar.New(nil), nil
}
func (a *AtomicAdapter) Data(ctx context.Context) (data map[string]interface{}, err error) {
raw := a.store.Load()
if raw == nil {
return nil, errors.New("config not loaded")
}
original := raw.(map[string]interface{})
clone := make(map[string]interface{}, len(original))
for k, v := range original {
clone[k] = v
}
return clone, nil
}

50
apapter/context.go Normal file
View File

@ -0,0 +1,50 @@
package apapter
import (
"context"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/encoding/gbase64"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"google.golang.org/grpc/metadata"
)
/**
* @Description: ContextAdapter
* @author dc.To
* @version 20250807
*/
type ContextAdapter struct {
ctx context.Context
adapter string
}
func NewContextAdapter(ctx context.Context, adapter string) *ContextAdapter {
return &ContextAdapter{ctx: ctx, adapter: adapter}
}
func (a *ContextAdapter) ctxWithConfig(ctx context.Context) *gvar.Var {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if c := md.Get(a.adapter); len(c) > 0 {
return gvar.New(gbase64.MustDecodeToString(c[0]))
}
}
g.Log("cfg").Error(ctx, "No Metadata from ["+a.adapter+"] cfg context")
return gvar.New(nil)
}
func (a *ContextAdapter) Available(ctx context.Context, resource ...string) (ok bool) {
return a.ctxWithConfig(ctx) != nil
}
func (a *ContextAdapter) Get(ctx context.Context, pattern string) (value interface{}, err error) {
return gjson.New(a.ctxWithConfig(ctx)).Get(pattern).Val(), nil
}
func (a *ContextAdapter) Data(ctx context.Context) (data map[string]interface{}, err error) {
return gjson.New(a.ctxWithConfig(ctx)).Map(), nil
}

View File

@ -3,11 +3,9 @@ package chain
import ( import (
"context" "context"
"strings" "strings"
"time"
"github.com/gogf/gf/v2/encoding/gbase64" "git.blueorigin.work/blueorigin/invoke/apapter"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcfg"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@ -18,19 +16,16 @@ import (
* @version 20250409 * @version 20250409
*/ */
func ClientTimeoutInvokerChain(ctx context.Context, method string, req, res interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { func ClientTimeoutInvokerChain(ctx context.Context, method string, req, res interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
return invoker(ctx, method, req, res, cc, opts...) return invoker(ctx, method, req, res, cc, opts...)
} }
/** /**
* 上下文拦截 * 上下文调用
* @param void * @param void
* @author dc.To * @author dc.To
* @version 20250424 * @version 20250424
*/ */
func ClientContextInvokerChain(ctx context.Context, method string, req, res interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { func ClientContextInvokerChain(ctx context.Context, method string, req, res interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(ctx, method, req, nil, cc, opts...) return invoker(ctx, method, req, nil, cc, opts...)
} }
@ -41,7 +36,6 @@ func ClientContextInvokerChain(ctx context.Context, method string, req, res inte
* @version 20250425 * @version 20250425
*/ */
func ClientContextUnaryChain(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { func ClientContextUnaryChain(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(ctx, req) return handler(ctx, req)
} }
@ -51,20 +45,16 @@ func ClientContextUnaryChain(ctx context.Context, req interface{}, info *grpc.Un
* @version 20250418 * @version 20250418
*/ */
func ClientMetadataUnaryChain(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { func ClientMetadataUnaryChain(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx) md, ok := metadata.FromIncomingContext(ctx)
if ok { if ok {
for k, v := range md { for k, v := range md {
if strings.HasPrefix(k, "x-cfg-") && len(v) > 0 { if strings.HasPrefix(k, "x-cfg-") && len(v) > 0 {
c := strings.Join(v, "") g.Cfg(strings.TrimPrefix(k, "x-cfg-")).SetAdapter(apapter.NewContextAdapter(ctx, k))
if len(c) > 0 {
adapter, err := gcfg.NewAdapterContent(gbase64.MustDecodeToString(v[0]))
if err != nil {
g.Log("unary").Error(ctx, err)
}
g.Cfg(strings.TrimPrefix(k, "x-cfg-")).SetAdapter(adapter)
}
} }
} }
} else {
g.Log("unary").Error(ctx, "No Metadata from incoming cfg context")
} }
return handler(ctx, req) return handler(ctx, req)
} }

83
grpcx/grpcx.go Normal file
View File

@ -0,0 +1,83 @@
package grpcx
import (
"os"
"path/filepath"
"git.blueorigin.work/blueorigin/invoke/chain"
"git.blueorigin.work/blueorigin/protobuf"
"github.com/gogf/gf/contrib/registry/etcd/v2"
"github.com/gogf/gf/contrib/rpc/grpcx/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
var Endpoints = os.Getenv("ETCD_ENDPOINTS")
var GrpcConfig *grpcx.GrpcServerConfig = grpcx.Server.NewConfig()
func Etcd() *etcd.Registry {
return etcd.New(Endpoints)
}
func SetOptions(o ...grpc.ServerOption) *grpcx.GrpcServerConfig {
GrpcConfig.Options = append(GrpcConfig.Options, o...)
return GrpcConfig
}
/**
* 服务注册
* @param void
* @author dc.To
* @version 20250427
*/
func Registry(f ...func(s *grpcx.GrpcServer)) *grpcx.GrpcServer {
GrpcConfig.Options = append(GrpcConfig.Options, []grpc.ServerOption{
grpcx.Server.ChainUnary(
grpcx.Server.UnaryValidate,
chain.ClientContextUnaryChain,
chain.ClientMetadataUnaryChain,
)}...,
)
if Endpoints != "" {
grpcx.Resolver.Register(Etcd())
}
if GrpcConfig.Name == "default" || GrpcConfig.Name == "" {
GrpcConfig.Name = filepath.Base(os.Args[0])
}
s := grpcx.Server.New(GrpcConfig)
for _, fn := range f {
fn(s)
}
Reflect(s.Server)
return s
}
/**
* grpc reflect
* @author dc.To
* @version 20250427
*/
func Reflect(s reflection.GRPCServer, f ...func()) {
if Endpoints == "" {
reflection.Register(s)
}
for _, fn := range f {
fn()
}
}
/**
* BSS 双向通信流
* @author dc.To
* @version 20250410
*/
func BssClient() protobuf.BssClient {
return protobuf.NewBssClient(grpcx.Client.MustNewGrpcClientConn("api"))
}