Covert Stackdriver Logging load tests to soak tests

This commit is contained in:
Mik Vyatskov
2017-06-29 13:45:10 +02:00
parent c9ad8dcde0
commit 625192b3a2
7 changed files with 250 additions and 235 deletions

View File

@@ -28,14 +28,14 @@ import (
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
gcl "google.golang.org/api/logging/v2beta1"
sd "google.golang.org/api/logging/v2beta1"
pubsub "google.golang.org/api/pubsub/v1"
)
const (
// The amount of time to wait before considering
// Stackdriver Logging sink operational
sinkInitialDelay = 1 * time.Minute
// The amount of time to wait for Stackdriver Logging
// sink to become operational
sinkStartupTimeout = 10 * time.Minute
// The limit on the number of messages to pull from PubSub
maxPullLogMessages = 100 * 1000
@@ -44,26 +44,26 @@ const (
maxCacheSize = 10 * 1000
// PubSub topic with log entries polling interval
gclLoggingPollInterval = 100 * time.Millisecond
sdLoggingPollInterval = 100 * time.Millisecond
)
type gclLogsProvider struct {
GclService *gcl.Service
type sdLogsProvider struct {
SdService *sd.Service
PubsubService *pubsub.Service
Framework *framework.Framework
Topic *pubsub.Topic
Subscription *pubsub.Subscription
LogSink *gcl.LogSink
LogSink *sd.LogSink
LogEntryCache map[string]chan logEntry
EventCache chan map[string]interface{}
CacheMutex *sync.Mutex
PollingStopChannel chan struct{}
}
func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
func newSdLogsProvider(f *framework.Framework) (*sdLogsProvider, error) {
ctx := context.Background()
hc, err := google.DefaultClient(ctx, gcl.CloudPlatformScope)
gclService, err := gcl.New(hc)
hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope)
sdService, err := sd.New(hc)
if err != nil {
return nil, err
}
@@ -73,8 +73,8 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
return nil, err
}
provider := &gclLogsProvider{
GclService: gclService,
provider := &sdLogsProvider{
SdService: sdService,
PubsubService: pubsubService,
Framework: f,
LogEntryCache: map[string]chan logEntry{},
@@ -85,131 +85,144 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
return provider, nil
}
func (gclLogsProvider *gclLogsProvider) Init() error {
func (sdLogsProvider *sdLogsProvider) Init() error {
projectId := framework.TestContext.CloudConfig.ProjectID
nsName := gclLogsProvider.Framework.Namespace.Name
nsName := sdLogsProvider.Framework.Namespace.Name
topic, err := gclLogsProvider.createPubSubTopic(projectId, nsName)
topic, err := sdLogsProvider.createPubSubTopic(projectId, nsName)
if err != nil {
return fmt.Errorf("failed to create PubSub topic: %v", err)
}
gclLogsProvider.Topic = topic
sdLogsProvider.Topic = topic
subs, err := gclLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name)
subs, err := sdLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name)
if err != nil {
return fmt.Errorf("failed to create PubSub subscription: %v", err)
}
gclLogsProvider.Subscription = subs
sdLogsProvider.Subscription = subs
logSink, err := gclLogsProvider.createGclLogSink(projectId, nsName, nsName, topic.Name)
logSink, err := sdLogsProvider.createSink(projectId, nsName, nsName, topic.Name)
if err != nil {
return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
}
gclLogsProvider.LogSink = logSink
sdLogsProvider.LogSink = logSink
if err = gclLogsProvider.authorizeGclLogSink(); err != nil {
if err = sdLogsProvider.authorizeSink(); err != nil {
return fmt.Errorf("failed to authorize log sink: %v", err)
}
framework.Logf("Waiting for log sink to become operational")
// TODO: Replace with something more intelligent
time.Sleep(sinkInitialDelay)
if err = sdLogsProvider.waitSinkInit(); err != nil {
return fmt.Errorf("failed to wait for sink to become operational: %v", err)
}
go gclLogsProvider.pollLogs()
go sdLogsProvider.pollLogs()
return nil
}
func (gclLogsProvider *gclLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) {
func (sdLogsProvider *sdLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) {
topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName)
topic := &pubsub.Topic{
Name: topicFullName,
}
return gclLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do()
return sdLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do()
}
func (gclLogsProvider *gclLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) {
func (sdLogsProvider *sdLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) {
subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName)
subs := &pubsub.Subscription{
Name: subsFullName,
Topic: topicName,
}
return gclLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
return sdLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
}
func (gclLogsProvider *gclLogsProvider) createGclLogSink(projectId, nsName, sinkName, topicName string) (*gcl.LogSink, error) {
func (sdLogsProvider *sdLogsProvider) createSink(projectId, nsName, sinkName, topicName string) (*sd.LogSink, error) {
projectDst := fmt.Sprintf("projects/%s", projectId)
filter := fmt.Sprintf("(resource.type=\"gke_cluster\" AND jsonPayload.kind=\"Event\" AND jsonPayload.metadata.namespace=\"%s\") OR "+
"(resource.type=\"container\" AND resource.labels.namespace_id=\"%s\")", nsName, nsName)
framework.Logf("Using the following filter for entries: %s", filter)
sink := &gcl.LogSink{
sink := &sd.LogSink{
Name: sinkName,
Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
Filter: filter,
}
return gclLogsProvider.GclService.Projects.Sinks.Create(projectDst, sink).Do()
return sdLogsProvider.SdService.Projects.Sinks.Create(projectDst, sink).Do()
}
func (gclLogsProvider *gclLogsProvider) authorizeGclLogSink() error {
topicsService := gclLogsProvider.PubsubService.Projects.Topics
policy, err := topicsService.GetIamPolicy(gclLogsProvider.Topic.Name).Do()
func (sdLogsProvider *sdLogsProvider) authorizeSink() error {
topicsService := sdLogsProvider.PubsubService.Projects.Topics
policy, err := topicsService.GetIamPolicy(sdLogsProvider.Topic.Name).Do()
if err != nil {
return err
}
binding := &pubsub.Binding{
Role: "roles/pubsub.publisher",
Members: []string{gclLogsProvider.LogSink.WriterIdentity},
Members: []string{sdLogsProvider.LogSink.WriterIdentity},
}
policy.Bindings = append(policy.Bindings, binding)
req := &pubsub.SetIamPolicyRequest{Policy: policy}
if _, err = topicsService.SetIamPolicy(gclLogsProvider.Topic.Name, req).Do(); err != nil {
if _, err = topicsService.SetIamPolicy(sdLogsProvider.Topic.Name, req).Do(); err != nil {
return err
}
return nil
}
func (gclLogsProvider *gclLogsProvider) pollLogs() {
wait.PollUntil(gclLoggingPollInterval, func() (bool, error) {
subsName := gclLogsProvider.Subscription.Name
subsService := gclLogsProvider.PubsubService.Projects.Subscriptions
req := &pubsub.PullRequest{
ReturnImmediately: true,
MaxMessages: maxPullLogMessages,
}
resp, err := subsService.Pull(subsName, req).Do()
func (sdLogsProvider *sdLogsProvider) waitSinkInit() error {
framework.Logf("Waiting for log sink to become operational")
return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) {
err := publish(sdLogsProvider.PubsubService, sdLogsProvider.Topic, "embrace eternity")
if err != nil {
framework.Logf("Failed to pull messaged from PubSub due to %v", err)
framework.Logf("Failed to push message to PubSub due to %v", err)
}
messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription)
if err != nil {
framework.Logf("Failed to pull messages from PubSub due to %v", err)
return false, nil
}
if len(messages) > 0 {
framework.Logf("Sink %s is operational", sdLogsProvider.LogSink.Name)
return true, nil
}
return false, nil
})
}
func (sdLogsProvider *sdLogsProvider) pollLogs() {
wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription)
if err != nil {
framework.Logf("Failed to pull messages from PubSub due to %v", err)
return false, nil
}
ids := []string{}
for _, msg := range resp.ReceivedMessages {
ids = append(ids, msg.AckId)
for _, msg := range messages {
logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
if err != nil {
framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
continue
}
var gclLogEntry gcl.LogEntry
if err := json.Unmarshal(logEntryEncoded, &gclLogEntry); err != nil {
var sdLogEntry sd.LogEntry
if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
continue
}
switch gclLogEntry.Resource.Type {
switch sdLogEntry.Resource.Type {
case "container":
podName := gclLogEntry.Resource.Labels["pod_id"]
ch := gclLogsProvider.getCacheChannel(podName)
ch <- logEntry{Payload: gclLogEntry.TextPayload}
podName := sdLogEntry.Resource.Labels["pod_id"]
ch := sdLogsProvider.getCacheChannel(podName)
ch <- logEntry{Payload: sdLogEntry.TextPayload}
break
case "gke_cluster":
jsonPayloadRaw, err := gclLogEntry.JsonPayload.MarshalJSON()
jsonPayloadRaw, err := sdLogEntry.JsonPayload.MarshalJSON()
if err != nil {
framework.Logf("Failed to get jsonPayload from LogEntry %v", gclLogEntry)
framework.Logf("Failed to get jsonPayload from LogEntry %v", sdLogEntry)
break
}
var eventObject map[string]interface{}
@@ -218,55 +231,48 @@ func (gclLogsProvider *gclLogsProvider) pollLogs() {
framework.Logf("Failed to deserialize jsonPayload as json object %s", string(jsonPayloadRaw[:]))
break
}
gclLogsProvider.EventCache <- eventObject
sdLogsProvider.EventCache <- eventObject
break
default:
framework.Logf("Received LogEntry with unexpected resource type: %s", gclLogEntry.Resource.Type)
framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
break
}
}
if len(ids) > 0 {
ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
if _, err = subsService.Acknowledge(subsName, ackReq).Do(); err != nil {
framework.Logf("Failed to ack: %v", err)
}
}
return false, nil
}, gclLogsProvider.PollingStopChannel)
}, sdLogsProvider.PollingStopChannel)
}
func (gclLogsProvider *gclLogsProvider) Cleanup() {
gclLogsProvider.PollingStopChannel <- struct{}{}
func (sdLogsProvider *sdLogsProvider) Cleanup() {
sdLogsProvider.PollingStopChannel <- struct{}{}
if gclLogsProvider.LogSink != nil {
if sdLogsProvider.LogSink != nil {
projectId := framework.TestContext.CloudConfig.ProjectID
sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, gclLogsProvider.LogSink.Name)
sinksService := gclLogsProvider.GclService.Projects.Sinks
sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, sdLogsProvider.LogSink.Name)
sinksService := sdLogsProvider.SdService.Projects.Sinks
if _, err := sinksService.Delete(sinkNameId).Do(); err != nil {
framework.Logf("Failed to delete LogSink: %v", err)
}
}
if gclLogsProvider.Subscription != nil {
subsService := gclLogsProvider.PubsubService.Projects.Subscriptions
if _, err := subsService.Delete(gclLogsProvider.Subscription.Name).Do(); err != nil {
if sdLogsProvider.Subscription != nil {
subsService := sdLogsProvider.PubsubService.Projects.Subscriptions
if _, err := subsService.Delete(sdLogsProvider.Subscription.Name).Do(); err != nil {
framework.Logf("Failed to delete PubSub subscription: %v", err)
}
}
if gclLogsProvider.Topic != nil {
topicsService := gclLogsProvider.PubsubService.Projects.Topics
if _, err := topicsService.Delete(gclLogsProvider.Topic.Name).Do(); err != nil {
if sdLogsProvider.Topic != nil {
topicsService := sdLogsProvider.PubsubService.Projects.Topics
if _, err := topicsService.Delete(sdLogsProvider.Topic.Name).Do(); err != nil {
framework.Logf("Failed to delete PubSub topic: %v", err)
}
}
}
func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
func (sdLogsProvider *sdLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
var entries []logEntry
ch := gclLogsProvider.getCacheChannel(pod.Name)
ch := sdLogsProvider.getCacheChannel(pod.Name)
polling_loop:
for {
select {
@@ -279,16 +285,16 @@ polling_loop:
return entries
}
func (logsProvider *gclLogsProvider) FluentdApplicationName() string {
func (logsProvider *sdLogsProvider) FluentdApplicationName() string {
return "fluentd-gcp"
}
func (gclLogsProvider *gclLogsProvider) ReadEvents() []map[string]interface{} {
func (sdLogsProvider *sdLogsProvider) ReadEvents() []map[string]interface{} {
var events []map[string]interface{}
polling_loop:
for {
select {
case event := <-gclLogsProvider.EventCache:
case event := <-sdLogsProvider.EventCache:
events = append(events, event)
default:
break polling_loop
@@ -297,15 +303,54 @@ polling_loop:
return events
}
func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan logEntry {
gclLogsProvider.CacheMutex.Lock()
defer gclLogsProvider.CacheMutex.Unlock()
func (sdLogsProvider *sdLogsProvider) getCacheChannel(podName string) chan logEntry {
sdLogsProvider.CacheMutex.Lock()
defer sdLogsProvider.CacheMutex.Unlock()
if ch, ok := gclLogsProvider.LogEntryCache[podName]; ok {
if ch, ok := sdLogsProvider.LogEntryCache[podName]; ok {
return ch
}
newCh := make(chan logEntry, maxCacheSize)
gclLogsProvider.LogEntryCache[podName] = newCh
sdLogsProvider.LogEntryCache[podName] = newCh
return newCh
}
func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) {
subsService := service.Projects.Subscriptions
req := &pubsub.PullRequest{
ReturnImmediately: true,
MaxMessages: maxPullLogMessages,
}
resp, err := subsService.Pull(subs.Name, req).Do()
if err != nil {
return nil, err
}
var ids []string
for _, msg := range resp.ReceivedMessages {
ids = append(ids, msg.AckId)
}
if len(ids) > 0 {
ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil {
framework.Logf("Failed to ack poll: %v", err)
}
}
return resp.ReceivedMessages, nil
}
func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error {
topicsService := service.Projects.Topics
req := &pubsub.PublishRequest{
Messages: []*pubsub.PubsubMessage{
{
Data: base64.StdEncoding.EncodeToString([]byte(msg)),
},
},
}
_, err := topicsService.Publish(topic.Name, req).Do()
return err
}