diff --git a/runtime/v2/shim/publisher.go b/runtime/v2/shim/publisher.go index d1f2a0c28..3dbd0e045 100644 --- a/runtime/v2/shim/publisher.go +++ b/runtime/v2/shim/publisher.go @@ -41,13 +41,13 @@ type item struct { count int } -func newPublisher(address string) (*remoteEventsPublisher, error) { +func NewPublisher(address string) (*RemoteEventsPublisher, error) { client, err := ttrpcutil.NewClient(address) if err != nil { return nil, err } - l := &remoteEventsPublisher{ + l := &RemoteEventsPublisher{ client: client, closed: make(chan struct{}), requeue: make(chan *item, queueSize), @@ -57,18 +57,18 @@ func newPublisher(address string) (*remoteEventsPublisher, error) { return l, nil } -type remoteEventsPublisher struct { +type RemoteEventsPublisher struct { client *ttrpcutil.Client closed chan struct{} closer sync.Once requeue chan *item } -func (l *remoteEventsPublisher) Done() <-chan struct{} { +func (l *RemoteEventsPublisher) Done() <-chan struct{} { return l.closed } -func (l *remoteEventsPublisher) Close() (err error) { +func (l *RemoteEventsPublisher) Close() (err error) { err = l.client.Close() l.closer.Do(func() { close(l.closed) @@ -76,7 +76,7 @@ func (l *remoteEventsPublisher) Close() (err error) { return err } -func (l *remoteEventsPublisher) processQueue() { +func (l *RemoteEventsPublisher) processQueue() { for i := range l.requeue { if i.count > maxRequeue { logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic) @@ -91,7 +91,7 @@ func (l *remoteEventsPublisher) processQueue() { } } -func (l *remoteEventsPublisher) queue(i *item) { +func (l *RemoteEventsPublisher) queue(i *item) { go func() { i.count++ // re-queue after a short delay @@ -100,7 +100,7 @@ func (l *remoteEventsPublisher) queue(i *item) { }() } -func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { +func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -127,7 +127,7 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event return nil } -func (l *remoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error { +func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error { _, err := l.client.EventsService().Forward(ctx, req) if err == nil { return nil diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 2b527c48b..d540aa87e 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -169,7 +169,7 @@ func run(id string, initFunc Init, config Config) error { ttrpcAddress := os.Getenv(ttrpcAddressEnv) - publisher, err := newPublisher(ttrpcAddress) + publisher, err := NewPublisher(ttrpcAddress) if err != nil { return err }