Refactored KubernetesSeedProvider and added unit tests. Updated Docker image and bumped Cassandra version

This commit is contained in:
chrislovecnm
2016-04-28 13:26:45 -06:00
parent 7a725418af
commit e8ce426093
13 changed files with 642 additions and 290 deletions

1
examples/cassandra/java/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
target

View File

@@ -1,47 +1,93 @@
<!--
Copyright (C) 2015 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>io.k8s.cassandra</groupId>
<artifactId>kubernetes-cassandra</artifactId>
<version>0.0.5</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>2.0.11</version>
</dependency>
</dependencies>
<modelVersion>4.0.0</modelVersion>
<groupId>io.k8s.cassandra</groupId>
<artifactId>kubernetes-cassandra</artifactId>
<version>1.0.0</version>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<logback.version>1.1.3</logback.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.6.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.6.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>3.4</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,175 +0,0 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.k8s.cassandra;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.net.URL;
import java.net.URLConnection;
import java.security.cert.X509Certificate;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.cassandra.locator.SeedProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KubernetesSeedProvider implements SeedProvider {
@JsonIgnoreProperties(ignoreUnknown = true)
static class Address {
public String ip;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Subset {
public List<Address> addresses;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Endpoints {
public List<Subset> subsets;
}
private static String getEnvOrDefault(String var, String def) {
String val = System.getenv(var);
if (val == null) {
val = def;
}
return val;
}
private static String getServiceAccountToken() throws IOException {
String file = "/var/run/secrets/kubernetes.io/serviceaccount/token";
return new String(Files.readAllBytes(Paths.get(file)));
}
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class);
private List defaultSeeds;
private TrustManager[] trustAll;
private HostnameVerifier trustAllHosts;
public KubernetesSeedProvider(Map<String, String> params) {
// Taken from SimpleSeedProvider.java
// These are used as a fallback, if we get nothing from k8s.
String[] hosts = params.get("seeds").split(",", -1);
defaultSeeds = new ArrayList<InetAddress>(hosts.length);
for (String host : hosts)
{
try {
defaultSeeds.add(InetAddress.getByName(host.trim()));
}
catch (UnknownHostException ex)
{
// not fatal... DD will bark if there end up being zero seeds.
logger.warn("Seed provider couldn't lookup host " + host);
}
}
// TODO: Load the CA cert when it is available on all platforms.
trustAll = new TrustManager[] {
new X509TrustManager() {
public void checkServerTrusted(X509Certificate[] certs, String authType) {}
public void checkClientTrusted(X509Certificate[] certs, String authType) {}
public X509Certificate[] getAcceptedIssuers() { return null; }
}
};
trustAllHosts = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
};
}
public List<InetAddress> getSeeds() {
List<InetAddress> list = new ArrayList<InetAddress>();
//String host = "https://kubernetes.default.svc.cluster.local";
String proto = "https://";
String host = getEnvOrDefault("KUBERNETES_PORT_443_TCP_ADDR", "kubernetes.default.svc.cluster.local");
String port = getEnvOrDefault("KUBERNETES_PORT_443_TCP_PORT", "443");
String serviceName = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra");
String podNamespace = getEnvOrDefault("POD_NAMESPACE", "default");
String path = String.format("/api/v1/namespaces/%s/endpoints/", podNamespace);
try {
String token = getServiceAccountToken();
SSLContext ctx = SSLContext.getInstance("SSL");
ctx.init(null, trustAll, new SecureRandom());
URL url = new URL(proto + host + ":" + port + path + serviceName);
logger.info("Getting endpoints from " + url);
HttpsURLConnection conn = (HttpsURLConnection)url.openConnection();
// TODO: Remove this once the CA cert is propagated everywhere, and replace
// with loading the CA cert.
conn.setSSLSocketFactory(ctx.getSocketFactory());
conn.setHostnameVerifier(trustAllHosts);
conn.addRequestProperty("Authorization", "Bearer " + token);
ObjectMapper mapper = new ObjectMapper();
Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class);
if (endpoints != null) {
// Here is a problem point, endpoints.subsets can be null in first node cases.
if (endpoints.subsets != null && !endpoints.subsets.isEmpty()){
for (Subset subset : endpoints.subsets) {
if (subset.addresses != null && !subset.addresses.isEmpty()) {
for (Address address : subset.addresses) {
list.add(InetAddress.getByName(address.ip));
}
}
}
}
logger.info("Available endpoints: " + list);
} else {
logger.warn("Endpoints are not available");
}
} catch (IOException | NoSuchAlgorithmException | KeyManagementException ex) {
logger.warn("Request to kubernetes apiserver failed", ex);
}
if (list.size() == 0) {
// If we got nothing, we might be the first instance, in that case
// fall back on the seeds that were passed in cassandra.yaml.
return defaultSeeds;
}
return list;
}
// Simple main to test the implementation
public static void main(String[] args) {
SeedProvider provider = new KubernetesSeedProvider(new HashMap<String, String>());
System.out.println(provider.getSeeds());
}
}

View File

@@ -0,0 +1,274 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.k8s.cassandra;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.ConfigurationLoader;
import org.apache.cassandra.config.YamlConfigurationLoader;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.locator.SimpleSeedProvider;
import org.apache.cassandra.utils.FBUtilities;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Self discovery {@link SeedProvider} that creates a list of Cassandra Seeds by
* communicating with the Kubernetes API.
* <p>Various System Variable can be used to configure this provider:
* <ul>
* <li>KUBERNETES_PORT_443_TCP_ADDR defaults to kubernetes.default.svc.cluster.local</li>
* <li>KUBERNETES_PORT_443_TCP_PORT defaults to 443</li>
* <li>CASSANDRA_SERVICE defaults to cassandra</li>
* <li>POD_NAMESPACE defaults to 'default'</li>
* <li>CASSANDRA_SERVICE_NUM_SEEDS defaults to 8 seeds</li>
* </ul>
*/
public class KubernetesSeedProvider implements SeedProvider {
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class);
/**
* default seeds to fall back on
*/
private List<InetAddress> defaultSeeds;
private TrustManager[] trustAll;
private HostnameVerifier trustAllHosts;
/**
* Create new Seeds
* @param params
*/
public KubernetesSeedProvider(Map<String, String> params) {
// Create default seeds
defaultSeeds = createDefaultSeeds();
// TODO: Load the CA cert when it is available on all platforms.
trustAll = new TrustManager[] {
new X509TrustManager() {
public void checkServerTrusted(X509Certificate[] certs, String authType) {}
public void checkClientTrusted(X509Certificate[] certs, String authType) {}
public X509Certificate[] getAcceptedIssuers() { return null; }
}
};
trustAllHosts = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
};
}
/**
* Call kubernetes API to collect a list of seed providers
* @return list of seed providers
*/
public List<InetAddress> getSeeds() {
String host = getEnvOrDefault("KUBERNETES_PORT_443_TCP_ADDR", "kubernetes.default.svc.cluster.local");
String port = getEnvOrDefault("KUBERNETES_PORT_443_TCP_PORT", "443");
String serviceName = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra");
String podNamespace = getEnvOrDefault("POD_NAMESPACE", "default");
String path = String.format("/api/v1/namespaces/%s/endpoints/", podNamespace);
String seedSizeVar = getEnvOrDefault("CASSANDRA_SERVICE_NUM_SEEDS", "8");
Integer seedSize = Integer.valueOf(seedSizeVar);
List<InetAddress> seeds = new ArrayList<InetAddress>();
try {
String token = getServiceAccountToken();
SSLContext ctx = SSLContext.getInstance("SSL");
ctx.init(null, trustAll, new SecureRandom());
String PROTO = "https://";
URL url = new URL(PROTO + host + ":" + port + path + serviceName);
logger.info("Getting endpoints from " + url);
HttpsURLConnection conn = (HttpsURLConnection)url.openConnection();
// TODO: Remove this once the CA cert is propagated everywhere, and replace
// with loading the CA cert.
conn.setHostnameVerifier(trustAllHosts);
conn.setSSLSocketFactory(ctx.getSocketFactory());
conn.addRequestProperty("Authorization", "Bearer " + token);
ObjectMapper mapper = new ObjectMapper();
Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class);
if (endpoints != null) {
// Here is a problem point, endpoints.subsets can be null in first node cases.
if (endpoints.subsets != null && !endpoints.subsets.isEmpty()){
for (Subset subset : endpoints.subsets) {
if (subset.addresses != null && !subset.addresses.isEmpty()) {
for (Address address : subset.addresses) {
seeds.add(InetAddress.getByName(address.ip));
if(seeds.size() >= seedSize) {
logger.info("Available num endpoints: " + seeds.size());
return Collections.unmodifiableList(seeds);
}
}
}
}
}
logger.info("Available num endpoints: " + seeds.size());
} else {
logger.warn("Endpoints are not available using default seeds in cassandra.yaml");
return Collections.unmodifiableList(defaultSeeds);
}
} catch (IOException | NoSuchAlgorithmException | KeyManagementException ex) {
logger.warn("Request to kubernetes apiserver failed, using default seeds in cassandra.yaml", ex);
return Collections.unmodifiableList(defaultSeeds);
}
if (seeds.size() == 0) {
// If we got nothing, we might be the first instance, in that case
// fall back on the seeds that were passed in cassandra.yaml.
logger.warn("Seeds are not available using default seeds in cassandra.yaml");
return Collections.unmodifiableList(defaultSeeds);
}
return Collections.unmodifiableList(seeds);
}
/**
* Code taken from {@link SimpleSeedProvider}. This is used as a fall back
* incase we don't find seeds
* @return
*/
protected List<InetAddress> createDefaultSeeds()
{
Config conf;
try
{
conf = loadConfig();
}
catch (Exception e)
{
throw new AssertionError(e);
}
String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1);
List<InetAddress> seeds = new ArrayList<InetAddress>();
for (String host : hosts)
{
try
{
seeds.add(InetAddress.getByName(host.trim()));
}
catch (UnknownHostException ex)
{
// not fatal... DD will bark if there end up being zero seeds.
logger.warn("Seed provider couldn't lookup host {}", host);
}
}
if(seeds.size() == 0) {
try {
seeds.add(InetAddress.getLocalHost());
} catch (UnknownHostException e) {
logger.warn("Seed provider couldn't lookup localhost");
}
}
return Collections.unmodifiableList(seeds);
}
/**
* Code taken from {@link SimpleSeedProvider}
* @return
*/
protected static Config loadConfig() throws ConfigurationException
{
String loaderClass = System.getProperty("cassandra.config.loader");
ConfigurationLoader loader = loaderClass == null
? new YamlConfigurationLoader()
: FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
return loader.loadConfig();
}
private static String getEnvOrDefault(String var, String def) {
String val = System.getenv(var);
if (val == null) {
val = def;
}
return val;
}
private static String getServiceAccountToken() throws IOException {
String file = "/var/run/secrets/kubernetes.io/serviceaccount/token";
try {
return new String(Files.readAllBytes(Paths.get(file)));
} catch (IOException e) {
logger.warn("unable to load service account token");
throw e;
}
}
protected List<InetAddress> getDefaultSeeds() {
return defaultSeeds;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Address {
public String ip;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Subset {
public List<Address> addresses;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Endpoints {
public List<Subset> subsets;
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.k8s.cassandra;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.locator.SeedProvider;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.Matchers.*;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.*;
public class KubernetesSeedProviderTest {
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProviderTest.class);
@Test
@Ignore("has to be run inside of a kube cluster")
public void getSeeds() throws Exception {
SeedProvider provider = new KubernetesSeedProvider(new HashMap<String, String>());
List<InetAddress> seeds = provider.getSeeds();
assertThat(seeds, is(not(empty())));
}
@Test
public void testDefaultSeeds() throws Exception {
KubernetesSeedProvider provider = new KubernetesSeedProvider(new HashMap<String,String>());
List<InetAddress> seeds = provider.getDefaultSeeds();
List<InetAddress> seedsTest = new ArrayList<>();
seedsTest.add(InetAddress.getByName("8.4.4.4"));
seedsTest.add(InetAddress.getByName("8.8.8.8"));
assertThat(seeds, is(not(empty())));
assertThat(seeds, is(seedsTest));
logger.debug("seeds loaded {}", seeds);
}
}

View File

@@ -0,0 +1,57 @@
# Copyright (C) 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
# Warning!
# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
#
cluster_name: Test Cluster
# memtable_allocation_type: heap_buffers
memtable_allocation_type: offheap_objects
commitlog_sync: batch
commitlog_sync_batch_window_in_ms: 1.0
commitlog_segment_size_in_mb: 5
commitlog_directory: target/cassandra/commitlog
hints_directory: target/cassandra/hints
partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
listen_address: 127.0.0.1
storage_port: 7010
rpc_port: 9170
start_native_transport: true
native_transport_port: 9042
column_index_size_in_kb: 4
saved_caches_directory: target/cassandra/saved_caches
data_file_directories:
- target/cassandra/data
disk_access_mode: mmap
seed_provider:
- class_name: io.k8s.cassandra.KubernetesSeedProvider
parameters:
- seeds: "8.4.4.4,8.8.8.8"
endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
dynamic_snitch: true
request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
request_scheduler_id: keyspace
server_encryption_options:
internode_encryption: none
keystore: conf/.keystore
keystore_password: cassandra
truststore: conf/.truststore
truststore_password: cassandra
incremental_backups: true
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 0
row_cache_class_name: org.apache.cassandra.cache.OHCProvider
row_cache_size_in_mb: 16
enable_user_defined_functions: true
enable_scripted_user_defined_functions: true

View File

@@ -0,0 +1,34 @@
<!--
Copyright (C) 2015 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->
<configuration debug="false" scan="true">
<appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
</appender>
<logger name="io.k8s.cassandra" level="DEBUG"/>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>