fix(server): restore country metrics (#1242)

This commit is contained in:
Vinicius Fortuna 2022-12-02 11:13:59 -05:00 committed by GitHub
parent d1f5d9b726
commit c324848cb8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 192 additions and 100 deletions

View file

@ -1,4 +1,4 @@
runtime: nodejs10
runtime: nodejs16
service: dev
handlers:
- url: /.*

View file

@ -1,4 +1,4 @@
runtime: nodejs10
runtime: nodejs16
service: prod
handlers:
- url: /.*

View file

@ -41,6 +41,16 @@ describe('postConnectionMetrics', () => {
countries: ['EC'],
bytesTransferred: 456,
},
{
userId: '',
countries: ['BR'],
bytesTransferred: 789,
},
{
userId: 'uid1',
countries: [],
bytesTransferred: 555,
},
];
const report = {serverId: 'id', startUtcMs: 1, endUtcMs: 2, userReports};
await postConnectionMetrics(table, report);
@ -61,6 +71,22 @@ describe('postConnectionMetrics', () => {
bytesTransferred: userReports[1].bytesTransferred,
countries: userReports[1].countries,
},
{
serverId: report.serverId,
startTimestamp: new Date(report.startUtcMs).toISOString(),
endTimestamp: new Date(report.endUtcMs).toISOString(),
userId: undefined,
bytesTransferred: userReports[2].bytesTransferred,
countries: userReports[2].countries,
},
{
serverId: report.serverId,
startTimestamp: new Date(report.startUtcMs).toISOString(),
endTimestamp: new Date(report.endUtcMs).toISOString(),
userId: userReports[3].userId,
bytesTransferred: userReports[3].bytesTransferred,
countries: userReports[3].countries,
},
];
expect(table.rows).toEqual(rows);
});
@ -69,8 +95,11 @@ describe('postConnectionMetrics', () => {
describe('isValidConnectionMetricsReport', () => {
it('returns true for valid report', () => {
const userReports = [
{userId: 'uid0', countries: ['US', 'UK'], bytesTransferred: 123},
{userId: 'uid1', countries: ['EC'], bytesTransferred: 456},
{userId: 'uid0', countries: ['AA'], bytesTransferred: 111},
{userId: 'uid1', bytesTransferred: 222},
{userId: 'uid2', countries: [], bytesTransferred: 333},
{countries: ['BB'], bytesTransferred: 444},
{userId: '', countries: ['CC'], bytesTransferred: 555},
];
const report = {serverId: 'id', startUtcMs: 1, endUtcMs: 2, userReports};
expect(isValidConnectionMetricsReport(report)).toBeTruthy();
@ -160,36 +189,20 @@ describe('isValidConnectionMetricsReport', () => {
expect(isValidConnectionMetricsReport(invalidReport5)).toBeFalsy();
});
it('returns false for missing user report fields', () => {
const userReports = [
const invalidReport1 = {serverId: 'id', startUtcMs: 1, endUtcMs: 2, userReports: [
{
// Missing `userId`
countries: ['US', 'UK'],
// Missing `userId` and `countries`
bytesTransferred: 123,
},
{userId: 'uid1', countries: ['EC'], bytesTransferred: 456},
];
const invalidReport = {serverId: 'id', startUtcMs: 1, endUtcMs: 2, userReports};
expect(isValidConnectionMetricsReport(invalidReport)).toBeFalsy();
]};
expect(isValidConnectionMetricsReport(invalidReport1)).toBeFalsy();
const userReports2 = [
{
// Missing `countries`
userId: 'uid0',
bytesTransferred: 123,
},
];
const invalidReport2 = {serverId: 'id', startUtcMs: 1, endUtcMs: 2, userReports: userReports2};
const invalidReport2 = {serverId: 'id', startUtcMs: 1, endUtcMs: 2, userReports: {
// Missing `bytesTransferred`
userId: 'uid0',
countries: ['US', 'UK'],
}};
expect(isValidConnectionMetricsReport(invalidReport2)).toBeFalsy();
const userReports3 = [
{
// Missing `bytesTransferred`
userId: 'uid0',
countries: ['US', 'UK'],
},
];
const invalidReport3 = {serverId: 'id', startUtcMs: 1, endUtcMs: 2, userReports: userReports3};
expect(isValidConnectionMetricsReport(invalidReport3)).toBeFalsy();
});
it('returns false for incorrect report field types', () => {
const invalidReport = {

View file

@ -20,9 +20,9 @@ export interface ConnectionRow {
serverId: string;
startTimestamp: string; // ISO formatted string.
endTimestamp: string; // ISO formatted string.
userId: string;
userId?: string;
bytesTransferred: number;
countries: string[];
countries?: string[];
}
export class BigQueryConnectionsTable implements InsertableTable<ConnectionRow> {
@ -49,9 +49,9 @@ function getConnectionRowsFromReport(report: HourlyConnectionMetricsReport): Con
serverId: report.serverId,
startTimestamp: startTimestampStr,
endTimestamp: endTimestampStr,
userId: userReport.userId,
userId: userReport.userId || undefined,
bytesTransferred: userReport.bytesTransferred,
countries: userReport.countries,
countries: userReport.countries || [],
});
}
return rows;
@ -93,18 +93,15 @@ export function isValidConnectionMetricsReport(
return false;
}
const requiredUserReportFields = ['userId', 'countries', 'bytesTransferred'];
const MIN_BYTES_TRANSFERRED = 0;
const MAX_BYTES_TRANSFERRED = 1 * Math.pow(2, 40); // 1 TB.
for (const userReport of testObject.userReports) {
// Test that each `userReport` contains the required fields.
for (const fieldName of requiredUserReportFields) {
if (!userReport[fieldName]) {
return false;
}
// We require at least the userId or the country to be set.
if (!userReport.userId && (userReport.countries?.length ?? 0) === 0) {
return false;
}
// Check that `userId` is a string.
if (typeof userReport.userId !== 'string') {
if (userReport.userId && typeof userReport.userId !== 'string') {
return false;
}
@ -118,9 +115,11 @@ export function isValidConnectionMetricsReport(
}
// Check that `countries` are strings.
for (const country of userReport.countries) {
if (typeof country !== 'string') {
return false;
if (userReport.countries) {
for (const country of userReport.countries) {
if (typeof country !== 'string') {
return false;
}
}
}
}

View file

@ -24,5 +24,6 @@ npm run action metrics_server/build
cp "${SRC_DIR}/app_dev.yaml" "${BUILD_DIR}/app.yaml"
cp "${SRC_DIR}/config_dev.json" "${BUILD_DIR}/config.json"
cp "${SRC_DIR}/package.json" "${BUILD_DIR}/"
cp "./package-lock.json" "${BUILD_DIR}/"
gcloud app deploy "${SRC_DIR}/dispatch.yaml" "${BUILD_DIR}" --project uproxysite --verbosity info --promote --stop-previous-version

View file

@ -24,5 +24,6 @@ npm run action metrics_server/build
cp "${SRC_DIR}/app_prod.yaml" "${BUILD_DIR}/app.yaml"
cp "${SRC_DIR}/config_prod.json" "${BUILD_DIR}/config.json"
cp "${SRC_DIR}/package.json" "${BUILD_DIR}/"
cp "./package-lock.json" "${BUILD_DIR}/"
gcloud app deploy "${SRC_DIR}/dispatch.yaml" "${BUILD_DIR}" --project uproxysite --verbosity info --no-promote --no-stop-previous-version

View file

@ -1,7 +1,7 @@
{
"name": "outline-metrics-server",
"private": true,
"version": "0.1.0",
"version": "1.0.1",
"description": "Outline metrics server",
"author": "Outline",
"license": "Apache",

View file

@ -1,3 +1,7 @@
# Unreleased
- Fixes
- Fix reporting of country metrics and improve logging output (https://github.com/Jigsaw-Code/outline-server/pull/1242)
# 1.7.1
- Fixes
- Corner case of isPortUsed that could result in infinite restart loop (https://github.com/Jigsaw-Code/outline-server/pull/1238)

View file

@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
export SB_PUBLIC_IP="${SB_PUBLIC_IP:-$(curl https://ipinfo.io/ip)}"
export SB_PUBLIC_IP="${SB_PUBLIC_IP:-$(curl --silent https://ipinfo.io/ip)}"
export SB_METRICS_URL="${SB_METRICS_URL:-https://prod.metrics.getoutline.org}"
# Make sure we don't leak readable files to other users.

View file

@ -100,7 +100,8 @@ async function spawnPrometheusSubprocess(
processArgs: string[],
prometheusEndpoint: string
): Promise<child_process.ChildProcess> {
logging.info(`Starting Prometheus with args [${processArgs}]`);
logging.info('======== Starting Prometheus ========');
logging.info(`${binaryFilename} ${processArgs.map(a => `"${a}"`).join(' ')}`);
const runProcess = child_process.spawn(binaryFilename, processArgs);
runProcess.on('error', (error) => {
logging.error(`Error spawning Prometheus: ${error}`);

View file

@ -27,6 +27,7 @@ import * as logging from '../infrastructure/logging';
import {PrometheusClient, startPrometheus} from '../infrastructure/prometheus_scraper';
import {RolloutTracker} from '../infrastructure/rollout';
import {AccessKeyId} from '../model/access_key';
import {version} from '../package.json';
import {PrometheusManagerMetrics} from './manager_metrics';
import {bindService, ShadowsocksManagerService} from './manager_service';
@ -80,6 +81,9 @@ function createRolloutTracker(
async function main() {
const verbose = process.env.LOG_LEVEL === 'debug';
logging.info('======== Outline Server main() ========');
logging.info(`Version is ${version}`);
const portProvider = new PortProvider();
const accessKeyConfig = json_config.loadFileConfig<AccessKeyConfigJson>(
getPersistentFilename('shadowbox_config.json')
@ -113,12 +117,8 @@ async function main() {
process.exit(1);
}
logging.debug(`=== Config ===`);
logging.debug(`Hostname: ${proxyHostname}`);
logging.debug(`SB_METRICS_URL: ${metricsCollectorUrl}`);
logging.debug(`==============`);
logging.info('Starting...');
logging.info(`Hostname: ${proxyHostname}`);
logging.info(`SB_METRICS_URL: ${metricsCollectorUrl}`);
const prometheusPort = await portProvider.reserveFirstFreePort(9090);
// Use 127.0.0.1 instead of localhost for Prometheus because it's resolving incorrectly for some users.

View file

@ -98,6 +98,8 @@ export class OutlineShadowsocksServer implements ShadowsocksServer {
if (this.isReplayProtectionEnabled) {
commandArguments.push('--replay_history=10000');
}
logging.info('======== Starting Outline Shadowsocks Service ========');
logging.info(`${this.binaryFilename} ${commandArguments.map(a => `"${a}"`).join(' ')}`);
this.ssProcess = child_process.spawn(this.binaryFilename, commandArguments);
this.ssProcess.on('error', (error) => {
logging.error(`Error spawning outline-ss-server: ${error}`);

View file

@ -20,6 +20,7 @@ import {AccessKeyConfigJson} from './server_access_key';
import {ServerConfigJson} from './server_config';
import {
CountryUsage,
DailyFeatureMetricsReportJson,
HourlyServerMetricsReportJson,
KeyUsage,
@ -82,10 +83,17 @@ describe('OutlineSharedMetricsPublisher', () => {
);
publisher.startSharing();
usageMetrics.usage = [
{accessKeyId: 'user-0', inboundBytes: 11, countries: ['AA', 'BB']},
{accessKeyId: 'user-1', inboundBytes: 22, countries: ['CC']},
{accessKeyId: 'user-0', inboundBytes: 33, countries: ['AA', 'DD']},
usageMetrics.keyUsage = [
{accessKeyId: 'user-0', inboundBytes: 11},
{accessKeyId: 'user-1', inboundBytes: 22},
{accessKeyId: 'user-0', inboundBytes: 33},
];
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'BB', inboundBytes: 11},
{country: 'CC', inboundBytes: 22},
{country: 'AA', inboundBytes: 33},
{country: 'DD', inboundBytes: 33},
];
clock.nowMs += 60 * 60 * 1000;
@ -95,16 +103,25 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{userId: 'M(user-0)', bytesTransferred: 11, countries: ['AA', 'BB']},
{userId: 'M(user-1)', bytesTransferred: 22, countries: ['CC']},
{userId: 'M(user-0)', bytesTransferred: 33, countries: ['AA', 'DD']},
{userId: 'M(user-0)', bytesTransferred: 11},
{userId: 'M(user-1)', bytesTransferred: 22},
{userId: 'M(user-0)', bytesTransferred: 33},
{bytesTransferred: 11, countries: ['AA']},
{bytesTransferred: 11, countries: ['BB']},
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 33, countries: ['AA']},
{bytesTransferred: 33, countries: ['DD']},
],
});
startTime = clock.nowMs;
usageMetrics.usage = [
{accessKeyId: 'user-0', inboundBytes: 44, countries: ['EE']},
{accessKeyId: 'user-2', inboundBytes: 55, countries: ['FF']},
usageMetrics.keyUsage = [
{accessKeyId: 'user-0', inboundBytes: 44},
{accessKeyId: 'user-2', inboundBytes: 55},
];
usageMetrics.countryUsage = [
{country: 'EE', inboundBytes: 44},
{country: 'FF', inboundBytes: 55},
];
clock.nowMs += 60 * 60 * 1000;
@ -114,8 +131,10 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{userId: 'M(user-0)', bytesTransferred: 44, countries: ['EE']},
{userId: 'M(user-2)', bytesTransferred: 55, countries: ['FF']},
{userId: 'M(user-0)', bytesTransferred: 44},
{userId: 'M(user-2)', bytesTransferred: 55},
{bytesTransferred: 44, countries: ['EE']},
{bytesTransferred: 55, countries: ['FF']},
],
});
@ -138,10 +157,17 @@ describe('OutlineSharedMetricsPublisher', () => {
);
publisher.startSharing();
usageMetrics.usage = [
{accessKeyId: 'user-0', inboundBytes: 11, countries: ['AA', 'SY']},
{accessKeyId: 'user-1', inboundBytes: 22, countries: ['CC']},
{accessKeyId: 'user-0', inboundBytes: 33, countries: ['AA', 'DD']},
usageMetrics.keyUsage = [
{accessKeyId: 'user-0', inboundBytes: 11},
{accessKeyId: 'user-1', inboundBytes: 22},
{accessKeyId: 'user-0', inboundBytes: 33},
];
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'SY', inboundBytes: 11},
{country: 'CC', inboundBytes: 22},
{country: 'AA', inboundBytes: 33},
{country: 'DD', inboundBytes: 33},
];
clock.nowMs += 60 * 60 * 1000;
@ -151,8 +177,13 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{userId: 'M(user-1)', bytesTransferred: 22, countries: ['CC']},
{userId: 'M(user-0)', bytesTransferred: 33, countries: ['AA', 'DD']},
{userId: 'M(user-0)', bytesTransferred: 11},
{userId: 'M(user-1)', bytesTransferred: 22},
{userId: 'M(user-0)', bytesTransferred: 33},
{bytesTransferred: 11, countries: ['AA']},
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 33, countries: ['AA']},
{bytesTransferred: 33, countries: ['DD']},
],
});
publisher.stopSharing();
@ -258,11 +289,19 @@ class FakeMetricsCollector implements MetricsCollectorClient {
}
class ManualUsageMetrics implements UsageMetrics {
public usage = [] as KeyUsage[];
getUsage(): Promise<KeyUsage[]> {
return Promise.resolve(this.usage);
public keyUsage = [] as KeyUsage[];
public countryUsage = [] as CountryUsage[];
getKeyUsage(): Promise<KeyUsage[]> {
return Promise.resolve(this.keyUsage);
}
getCountryUsage(): Promise<CountryUsage[]> {
return Promise.resolve(this.countryUsage)
}
reset() {
this.usage = [] as KeyUsage[];
this.keyUsage = [] as KeyUsage[];
this.countryUsage = [] as CountryUsage[];
}
}

View file

@ -30,7 +30,11 @@ const SANCTIONED_COUNTRIES = new Set(['CU', 'KP', 'SY']);
// Used internally to track key usage.
export interface KeyUsage {
accessKeyId: string;
countries: string[];
inboundBytes: number;
}
export interface CountryUsage {
country: string;
inboundBytes: number;
}
@ -46,8 +50,8 @@ export interface HourlyServerMetricsReportJson {
// JSON format for the published report.
// Field renames will break backwards-compatibility.
export interface HourlyUserMetricsReportJson {
userId: string;
countries: string[];
userId?: string;
countries?: string[];
bytesTransferred: number;
}
@ -74,7 +78,8 @@ export interface SharedMetricsPublisher {
}
export interface UsageMetrics {
getUsage(): Promise<KeyUsage[]>;
getKeyUsage(): Promise<KeyUsage[]>;
getCountryUsage(): Promise<CountryUsage[]>;
reset();
}
@ -84,22 +89,34 @@ export class PrometheusUsageMetrics implements UsageMetrics {
constructor(private prometheusClient: PrometheusClient) {}
async getUsage(): Promise<KeyUsage[]> {
async getKeyUsage(): Promise<KeyUsage[]> {
const timeDeltaSecs = Math.round((Date.now() - this.resetTimeMs) / 1000);
// We measure the traffic to and from the target, since that's what we are protecting.
const result = await this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes{dir=~"p>t|p<t"}[${timeDeltaSecs}s])) by (location, access_key)`
`sum(increase(shadowsocks_data_bytes{dir=~"p>t|p<t"}[${timeDeltaSecs}s])) by (access_key)`
);
const usage = [] as KeyUsage[];
for (const entry of result.result) {
const accessKeyId = entry.metric['access_key'] || '';
let countries = [];
const countriesStr = entry.metric['location'] || '';
if (countriesStr) {
countries = countriesStr.split(',').map((e) => e.trim());
}
const inboundBytes = Math.round(parseFloat(entry.value[1]));
usage.push({accessKeyId, countries, inboundBytes});
if (inboundBytes > 0) {
usage.push({accessKeyId, inboundBytes});
}
}
return usage;
}
async getCountryUsage(): Promise<CountryUsage[]> {
const timeDeltaSecs = Math.round((Date.now() - this.resetTimeMs) / 1000);
// We measure the traffic to and from the target, since that's what we are protecting.
const result = await this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes_per_location{dir=~"p>t|p<t"}[${timeDeltaSecs}s])) by (location)`
);
const usage = [] as CountryUsage[];
for (const entry of result.result) {
const country = entry.metric['location'] || '';
const inboundBytes = Math.round(parseFloat(entry.value[1]));
usage.push({country, inboundBytes});
}
return usage;
}
@ -132,7 +149,7 @@ export class RestMetricsCollectorClient {
body: reportJson,
};
const url = `${this.serviceUrl}${urlPath}`;
logging.info(`Posting metrics to ${url} with options ${JSON.stringify(options)}`);
logging.debug(`Posting metrics to ${url} with options ${JSON.stringify(options)}`);
try {
const response = await follow_redirects.requestFollowRedirectsWithSameMethodAndBody(
url,
@ -174,7 +191,9 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {
return;
}
try {
await this.reportServerUsageMetrics(await usageMetrics.getUsage());
const keyUsagePromise = usageMetrics.getKeyUsage()
const countryUsagePromise = usageMetrics.getCountryUsage()
await this.reportServerUsageMetrics(await keyUsagePromise, await countryUsagePromise);
usageMetrics.reset();
} catch (err) {
logging.error(`Failed to report server usage metrics: ${err}`);
@ -208,21 +227,39 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {
return this.serverConfig.data().metricsEnabled || false;
}
private async reportServerUsageMetrics(usageMetrics: KeyUsage[]): Promise<void> {
private async reportServerUsageMetrics(keyUsageMetrics: KeyUsage[], countryUsageMetrics: CountryUsage[]): Promise<void> {
const reportEndTimestampMs = this.clock.now();
const userReports = [] as HourlyUserMetricsReportJson[];
for (const keyUsage of usageMetrics) {
// HACK! We use the same backend reporting endpoint for key and country usage.
// A row with empty country is for key usage, a row with empty userId is for country usage.
// Note that this reports usage twice. If you want the total, filter to rows with non empty countries.
for (const keyUsage of keyUsageMetrics) {
if (keyUsage.inboundBytes === 0) {
continue;
}
if (hasSanctionedCountry(keyUsage.countries)) {
const userId = this.toMetricsId(keyUsage.accessKeyId);
if (!userId) {
continue;
}
userReports.push({
userId: this.toMetricsId(keyUsage.accessKeyId) || '',
userId,
bytesTransferred: keyUsage.inboundBytes,
countries: [...keyUsage.countries],
});
}
for (const countryUsage of countryUsageMetrics) {
if (countryUsage.inboundBytes === 0) {
continue;
}
if (isSanctionedCountry(countryUsage.country)) {
continue;
}
// Make sure to always set the country to differentiate the row
// from key usage rows.
const country = countryUsage.country || 'ZZ';
userReports.push({
bytesTransferred: countryUsage.inboundBytes,
countries: [country],
});
}
const report = {
@ -254,11 +291,6 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {
}
}
function hasSanctionedCountry(countries: string[]) {
for (const country of countries) {
if (SANCTIONED_COUNTRIES.has(country)) {
return true;
}
}
return false;
function isSanctionedCountry(country: string) {
return SANCTIONED_COUNTRIES.has(country);
}