Customer Datalake to Signal Playbook


Objective


The objective of this document is to help users/customers understand how they can provide security posture data from a Data Lake (containing security posture information fetched from cyber security tools) or other data sources to SAFE.

This document introduces the readers to Signals, the basic information units vital for evaluating security risks in enterprise customers' organizations. Each signal comprises two key elements: a reference to a relevant entity (e.g., machine or user) and associated security context. The dedicated sections for Signals will clarify this concept in further detail.


Target


This document is intended for Engineers and Architects who wish to understand how to post security findings to SAFE. This can be done either by integrating with a security data lake or by writing a custom connector for the source tool.


Prerequisites


The requirements to fetch signals from a data lake are as follows:

Step

Requirement

Data lake Identification

Check where the customer has hosted their data lake, and whether there are any APIs to automatically fetch the data. Example: Customers might store their data in ServiceNow, an S3 bucket, or databases such as MS SQL or Snowflake.

Datalake format

Find the format in which the findings are stored in the data lake. This step requires users to:

  1. Identify source fields

  2. Map source fields to the corresponding SAFE signal fields

For example, a vulnerability might be called Plugin or just Name in a customer’s data lake and needs to be translated to the name field required by the signal format.

Signal format

The signal format can be found here: https://github.com/Safe-Security/signal/blob/main/nodejs/src/interfaces/signal.ts

Sample signal templates can be found here: https://github.com/Safe-Security/signal/tree/main/examples/samples

Upload to SAFE

Ensure that the user has an Admin account in the specific SAFE platform and that they have generated their API credentials to authenticate with SAFE’s REST APIs.

If not, an existing SAFE Admin user can generate API credentials for their SAFE user by following the document:

When designing an integration, it is important to consider the number of signals the integration will generate. There are two approaches to submitting signals:

  1. Individual signal submission.

  2. Zipped signal submission.

As a ballpark value, if more than 100 signals are to be generated by the integration in a 24-hour period, the integration must present the signals in zipped bundles.

Note:

Signals and signal ZIPs are discussed in the How to upload a signal to SAFE section.


Implementation


This section details how to implement a connection to SAFE and upload Signals. Mainly, the following SAFE APIs will be used throughout the process:

  • POST /api/v3/auth API

  • POST /api/v3/signals/zip API (Recommended)

To try the above and other APIs SAFE offers, navigate to SAFE’s Swagger documentation. To access SAFE’s Swagger documentation, you can refer to the following document: https://docs.safe.security/docs/accessing-safe-apis


Authentication


To authenticate with SAFE’s APIs, you need an access token. To learn more about generating access tokens and renewing them when they expire, follow the guide below.

Generate access tokens to authenticate SAFE’s API calls


To generate access tokens for SAFE’s API calls, call the POST /api/v3/auth API with your API credentials.

Below is a sample Python 3.x implementation for reference:

import sys
import json  # used by the later snippets in this guide
import getpass as gp
from base64 import b64encode

import requests

# Takes the SAFE subdomain, prompts for the API credentials, and returns an access token.
def getAccessToken(subdomain):
    url = f"https://{subdomain}.safescore.ai/api/v3/auth"
    username = input(f"Enter {subdomain}'s API username: ")
    password = gp.getpass(prompt=f"Enter {subdomain}'s API password: ")
    print(f"Getting Access Token for {subdomain}")
    # HTTP Basic authentication: base64 of "username:password"
    credentials = b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    headers = {
        "Authorization": f"Basic {credentials}",
        "Content-Type": "text/plain"
    }
    response = requests.request("POST", url, headers=headers, data="")
    if response.status_code == 200:
        token = response.json()["accessToken"]
        print(f"Access Token Received for {subdomain}")
        return token
    else:
        print(f"Incorrect API credentials for {subdomain}. Please check.")
        # Exit with a non-zero status so that calling scripts can detect the failure
        sys.exit(1)

example_subdomain = 'companyABC'
access_token = getAccessToken(example_subdomain)

SAFE’s API tokens renewal


SAFE’s API tokens expire within 60 minutes of their generation. Once a token has expired, any call to a SAFE API returns the response code 401.

If a program runs for more than 60 minutes, it can check for this response code and generate a new access token to continue execution.

The following code snippet is an example of how to check for token expiry and regenerate the access token:

# Takes the API method (such as GET/POST/PUT), the API URL, the current access token,
# the SAFE subdomain and an optional payload. Performs the API call and, if the token
# has expired (401), generates a new token and retries the call.
def performApiCalls(method, url, accessToken, subdomain, payload=None):
    print(f'Calling the following {method} API URL: {url}')
    headers = {
          'Authorization': 'Bearer ' + accessToken,
          'Content-Type': 'application/json'
    }
    response = requests.request(method, url, headers=headers, data=payload)
    # Proceed with the execution if API call is successful with response code 200
    if response.status_code == 200:
        return (response.status_code, response.json())
    # Generate a new access token if the API call fails with response code 401
    elif response.status_code == 401:
        print("Token expired, re-enter the credentials")
        newToken = getAccessToken(subdomain)
        print("Access Token successfully refreshed")
        # Retry the call with the refreshed token, not the expired one
        return performApiCalls(method, url, newToken, subdomain, payload)
    # Surface any other response code to the caller instead of returning nothing
    return (response.status_code, None)

Signal Creation

This section explains what a Signal is, when you should use it, and how to create it.


SAFE’s signal architecture


A signal represents the fundamental unit of information, encompassing crucial security details about an enterprise customer. These details empower SAFE to assess and quantify risks for the customer's organization. It contains two fundamental properties:

  • A reference to an entity related to the customer’s organisation. This can be a machine or a user.

  • A security context about the entity.

For example, if the security details are vulnerability findings, one Signal contains the information for exactly one vulnerability finding, as that is the smallest unit of security information.

For more detailed information, see our GitHub page: https://github.com/Safe-Security/signal/tree/main


When to use a signal?

You should use a Signal to:

  • Create new assets/entities in SAFE with their associated findings.

  • Update findings against existing assets/entities in SAFE.

  • Update asset/entity metadata.

  • Update the custom fields associated with any existing asset/entity.


How to create a signal JSON?


To convert crucial security information from your signal source to the SAFE-supported signal format, you can go through the signal schema available here: https://github.com/Safe-Security/signal/blob/main/python/app/dataclass/signal.py

A signal consists of a few mandatory fields and a few optional fields depending on the type of information being uploaded onto the SAFE platform. This information can be in the form of a Configuration Assessment (CA), Vulnerability Assessment (VA) or Endpoint Detection and Response (EDR).

The following example checklist for VA signals can help you understand these fields in detail:

| Does the signal source have the following attribute? | Requirement type | Signal JSON Field |
| --- | --- | --- |
| A unique GUID (randomly generated during submission, for traceability) | Mandatory | id |
| Signal type = va (in this case) | Mandatory | securityContext.type |
| Name of the vulnerability (a unique text line) | Mandatory | name |
| Hostname (or IP), preferably FQDN | Mandatory | entity.name |
| Status of vulnerability (pass/fail) | Mandatory | securityContext.status.complianceStatus |
| Severity (CVSS value) | Mandatory | securityContext.severity.value |
| CVE ID | For SAFE 4.1+: Optional, but recommended. For other SAFE versions: Mandatory if no TTP is provided. | securityContext.standardsMapping (Array of CVEs, Ex: [CVE-2021-44228,CVE-2021-44229]) |
| IP address | Optional | entity.entityAttributes.ipAddresses (Array of IPs) (name should be "Ip" and ipv4 should be the IP address) |
| ATT&CK TTP mapping* | For SAFE 4.1+: Optional. For other SAFE versions: Mandatory if no CVE ID is provided. | securityContexts.standardsMapping - Applicable only for VA, EDR finding types with multiple instances. securityContext.standardsMapping - Applicable for all finding types (including VA and EDR) with a single instance. |
| Description (para) | Optional | signal.description |
| Source tool name (a unique name) | Optional | source.name |
| Next submission interval in minutes | Optional | |
| First Seen time | Optional | firstSeen |
| Last Seen time | Recommended | lastSeen |
| CIA values | Optional | entity.entityAttributes.confidentialityRequirement, entity.entityAttributes.integrityRequirement, entity.entityAttributes.availabilityRequirement |
| Asset criticality | Optional | entity.entityAttributes.criticality |
| Signal Expiry | Optional | signal.expiresAt (Format: YYYY-MM-DDTHH:MM:SS.000Z) |
| Additional tags for entity (Example: dept, location, os) | Recommended | entity.entityAttributes.tags. Example: entity.entityAttributes.tags: { "dept": ["IT"] } |
| CVSS version | Optional | |
| CVSS string | Optional | |
| The status of the signal (Qualified/Failed) | Mandatory | securityContexts.complianceStatus.status - Applicable only for VA, EDR finding types with multiple instances. securityContext.complianceStatus.status - Applicable for all finding types (including VA and EDR) with a single instance. |
| Evidence of the security context. Example: A reference to a screenshot or Windows registry content that proves a missing configuration | Optional field, will work with empty values | securityContexts.evidence - Applicable only for VA, EDR finding types with multiple instances. securityContext.evidence - Applicable for all finding types (including VA and EDR) with a single instance. |

For your reference, sample signal examples have been provided on the following page: https://github.com/Safe-Security/signal/tree/main/examples/samples

The above linked page contains signal examples of varying quality such as Sample and High Quality to give the user a complete view of the requirements around signal creation.

Note:

High Quality Signals are recommended as they contain information for both the entity and the security findings associated with the entity.
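To make the field mapping concrete, the following is a minimal sketch of converting one data lake row into a VA signal dictionary. The source column names (Plugin, Hostname, Status, CVSS, CVE, Dept), the function name build_va_signal and the default source name are hypothetical. The nesting simply follows the field paths listed in the checklist, and the exact shapes (for example, of standardsMapping) should be verified against the linked signal schema before use.

```python
import uuid
from datetime import datetime, timezone

# Hypothetical helper: the column names on the right-hand side belong to an
# imaginary data lake export and must be adapted to your own source.
def build_va_signal(row, source_name="example-datalake"):
    """Convert one vulnerability row (a dict) into a VA signal dict."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    cves = [c.strip() for c in row.get("CVE", "").split(",") if c.strip()]
    signal = {
        "id": str(uuid.uuid4()),                  # unique GUID for traceability
        "name": row["Plugin"],                    # vulnerability name
        "description": row.get("Description", ""),
        "source": {"name": source_name},
        "lastSeen": row.get("LastSeen", now),
        "entity": {
            "type": "machine",
            "name": row["Hostname"],              # preferably an FQDN
            "entityAttributes": {"tags": {}},
        },
        "securityContext": {
            "type": "va",
            "status": {"complianceStatus": row["Status"]},  # pass / fail
            "severity": {"value": float(row["CVSS"])},
            # Assumed shape: a plain list of CVE IDs, as the checklist describes
            "standardsMapping": cves,
        },
    }
    if row.get("Dept"):
        signal["entity"]["entityAttributes"]["tags"]["dept"] = [row["Dept"]]
    return signal
```

Comparing the resulting dictionary with one of the High Quality samples on the linked page is a quick way to spot fields that still need mapping.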


Signal Submission

This section describes the methods for uploading a signal to SAFE, along with best practices.


How to upload a signal to SAFE?


SAFE provides the following two ways to upload a signal:

  • Via the Signals (JSON) API (POST /api/v3/signals/):

    • This API is used to upload a single signal at a time in the form of a JSON.

    • This API has a rate limit of 6000 requests/minute.

    • This is ideal for uploading a single signal or a small number of signals against an asset.

  • Via the Signals (ZIP) API (POST /api/v3/signals/zip):

    • This API is used to upload a ZIP containing multiple signals in JSON form.

    • This API has a size limit of 5 Mb per ZIP, per request.

    • This is ideal for uploading a large number of signals against multiple assets.

Please note that the Signals (ZIP) API approach is the approach recommended by SAFE, as it has a number of advantages over the single Signal (JSON) upload approach. A few of the major advantages are covered below.


Advantages of the Signal (ZIP) API approach


As discussed above, the Signal (ZIP) API approach is highly recommended by SAFE due to the major advantages it has over the Signals (JSON) API approach. A few of these are:

  • SAFE processes signals contained in a ZIP faster than multiple signal JSONs uploaded individually.

    • This acceleration in processing speed is due to the elimination of repetitive tasks performed inside SAFE such as creating new/checking existing assets, adding assessments against assets etc

    • To understand this with an example, suppose you need to upload 500 signals for an asset:

      • The Signals (JSON) API approach would cause SAFE to repeatedly perform the internal activities 500 times i.e., once for each API call.

      • The Signals (ZIP) API approach would cause SAFE to perform the internal activities only once for all 500 signals.

  • The ZIP approach requires significantly fewer API calls as compared to the Signals (JSON) API.

    • To understand this with an example: Suppose you need to upload 500 signals against an asset:

      • The Signals (JSON) API approach would require 500 API calls to be made to SAFE and is subjected to rate-limiting, token expiry, etc.

      • The Signals (ZIP) API approach would require just 1 API call as all 500 signals for the asset are packaged into one ZIP. The only limitation here is that the ZIP size shouldn’t exceed 5 Mb/request.

  • The time taken to process signals in SAFE is directly proportional to the number of API calls made to SAFE, which gives the ZIP approach a clear advantage.

  • Through the ZIP approach, users are provided additional configuration properties which can be useful for custom connectors/requirements.

This guide will focus on the usage of the ZIP API.


Signal ZIP API


Size Limits


SAFE supports a maximum size of 5 Mb per ZIP file containing signals. If your ZIP file exceeds this limit, you must divide the ZIP file into smaller ZIPs so that the size limit is not breached.

Note:

When splitting up a ZIP, you must ensure that the entire assessment for an asset is contained in a single ZIP; otherwise it will result in assessment loss.
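One way to respect both constraints is to group signals by entity first and only then pack the groups into batches. The sketch below is illustrative only: the function name is hypothetical, "5 Mb" is assumed to mean 5 * 1024 * 1024 bytes, and the size is estimated from the uncompressed JSON, which is usually a conservative estimate because the ZIP is compressed.

```python
import json
from collections import defaultdict

MAX_ZIP_BYTES = 5 * 1024 * 1024  # assumed interpretation of the 5 Mb limit

def batch_signals_by_entity(signals, max_bytes=MAX_ZIP_BYTES):
    """Group signals into batches without splitting an entity across batches."""
    per_entity = defaultdict(list)
    for sig in signals:
        per_entity[sig["entity"]["name"]].append(sig)

    batches, current, current_size = [], [], 0
    for name, group in per_entity.items():
        # Estimate the group's size from its uncompressed JSON encoding
        group_size = sum(len(json.dumps(s).encode("utf-8")) for s in group)
        if group_size > max_bytes:
            raise ValueError(f"Signals for {name} alone exceed the size limit")
        # Start a new batch if adding this entity would exceed the limit
        if current and current_size + group_size > max_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.extend(group)
        current_size += group_size
    if current:
        batches.append(current)
    return batches
```

Each returned batch can then be written to its own ZIP, so that all signals for a given asset always travel together.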


Advanced processing options


Signals JSON ZIPs provide users with the ability to instruct SAFE on how they want their Signals to be processed. This is possible due to the ability of ZIPs to be packaged with a configuration file i.e., a config.json (ZipConfig). This ZipConfig can be added as a separate JSON file inside the Signals ZIP that is being submitted to SAFE and can contain the following tags:

{
fullAssessmentForTypes?: SecurityType[];
shouldImportAssets?: boolean;
shouldAllowOverrideAFControl?: boolean;
shouldUpdateExistingAssetsMetadata?: boolean;
}

These tags instruct SAFE to process the signals in the ZIP as follows:

  • fullAssessmentForTypes: This tag allows users to configure:

    • Whether to append the signals in the JSON to the existing assessment of an asset, for the asset types mentioned.

    • Or to replace the entire assessment against the entity with the latest signals present in the ZIP.

    • Possible values for this tag are as follows:

      finding = "finding",
      ca = "ca",
      va = "va",
      edr = "edr",
      uba = "uba",
      waf = "waf",
      others = "others"
    • When should you append vs. replace signals?

      • Appending a signal to an existing assessment data of an asset is preferable in cases where:

        • More than one ZIP is required to provide assessment data for an asset/entity due to ZIP size restrictions (5 Mb/request).

        • Only new findings are to be appended/updated to an existing entity’s assessment data, given that the old assessment data remains true to the entity’s current security posture.

      • Replacing all existing assessment data for an asset is preferable in cases where:

        • The existing assessment data is stale and doesn’t represent the current risk posture of the asset. Having only the latest assessment data is preferred as it contains the currently open findings for the assets.

    • Info:

      • If this flag is not provided, SAFE will replace existing assessment against entities with the assessment data present in the signals (default behaviour).

    Note:

    When listing a finding type in this flag for any asset, you need to ensure that the entire assessment for the asset is in a single ZIP; otherwise it will result in assessment loss.

  • shouldImportAssets: This tag instructs SAFE to import any assets/entities present in the signals which don’t already exist in the SAFE platform.

    • Example: If WindowsVM1 exists as an entity in the signal JSON (based on the data received from a Signal source), but doesn’t exist in SAFE - the shouldImportAssets tag instructs SAFE on how to handle such a case.

      • If the value provided is true, it will onboard the new entity to SAFE.

      • If the value provided is false, it will not onboard the new entity to SAFE and ignore the signals associated with that entity.

    • Note: If this flag is not provided, SAFE will import the assets/entities present in the signals (default behaviour).

  • shouldAllowOverrideAFControl: This tag instructs SAFE to override a Finding’s Accepted Failed status in SAFE. To explain further:

    • SAFE allows users to mark a Finding as Accepted Failed in cases where some security finding is expected.

    • This status of the Finding remains unchanged unless the user manually reverts the Finding status to Failed.

    • This tag overrides the status of such Findings i.e., it can mark Accepted Failed Findings as Failed based on the status of such Findings in the latest Signal JSON upload.

    • Note: If this flag is not provided, SAFE does not override existing Accepted Failed findings (default behaviour).

  • shouldUpdateExistingAssetsMetadata: This tag instructs SAFE as to whether metadata (such as entity name) corresponding to an asset should be updated.
    This tag is useful when assets are being fetched from multiple integrations, but you don’t want all of them to keep updating the metadata corresponding to the assets.

    • Note: If this flag is not provided, SAFE will update metadata for existing assets based on the values provided in the signals (default behaviour).

An example code demonstrating the creation of the signal config JSON is as follows:

import json
import os

# Writes config.json (the ZipConfig) into the folder that will be zipped.
# metaDataFlag (boolean) controls whether existing asset metadata is updated.
def createConfigJson(path, metaDataFlag=False):
    # ZipConfig declares these flags as booleans, so use True/False rather than strings
    config_json = { "shouldImportAssets": True, "fullAssessmentForTypes": ["va"], "shouldUpdateExistingAssetsMetadata": metaDataFlag }
    with open(os.path.join(path, 'config.json'), 'w') as fobj:
        fobj.write(json.dumps(config_json, indent=2))

With metaDataFlag left as False, the above code instructs SAFE to perform the following activity while processing the signals provided in the ZIP:

  • "shouldImportAssets": true - Imports any new asset/entity found in Signal JSON which is not already onboarded to SAFE

  • "fullAssessmentForTypes": ["va"] - Instructs SAFE to replace all existing VA assessments for an entity/asset defined by the signal ZIP with the latest vulnerability signals.

  • "shouldUpdateExistingAssetsMetadata": false - Instructs SAFE not to update existing asset metadata.
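As an illustration of how the config file and the signals could be packaged together, here is a minimal sketch that builds the ZIP in memory. The function name and the one-file-per-signal naming (signal_0.json, signal_1.json, ...) are assumptions, since the layout of signal files inside the ZIP is not prescribed here. The flags are written as JSON booleans because ZipConfig declares them as boolean.

```python
import io
import json
import zipfile

def build_signal_zip(signals, config):
    """Return the bytes of a ZIP holding config.json plus one JSON file per signal."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("config.json", json.dumps(config, indent=2))
        for index, signal in enumerate(signals):
            # File naming is an assumption made for this sketch
            zf.writestr(f"signal_{index}.json", json.dumps(signal))
    return buffer.getvalue()

config = {
    "shouldImportAssets": True,
    "fullAssessmentForTypes": ["va"],
    "shouldUpdateExistingAssetsMetadata": False,
}
zip_bytes = build_signal_zip([{"id": "1", "name": "demo"}], config)
```

The resulting bytes can be checked against the 5 Mb limit with len(zip_bytes) before being submitted.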


Update Frequency for Signals


The frequency at which you update SAFE signals will vary based on how dynamic your data source is. SAFE would normally recommend that a delta sync (e.g. the changes since the last sync) is uploaded once every 24 hours. This is to ensure an up-to-date view of your environment.

To upload only the necessary signals to SAFE, you can decide whether to proceed with a full upload approach, a partial upload approach, or a combination of both as follows:

  • A full upload approach can be used whenever all the signals and asset metadata are uploaded to SAFE for the first time. It is recommended that this is done at the initiation of the integration. You may choose to perform a periodic full upload to fully reconcile SAFE and the data lake.

    • This is recommended to avoid automatic assessment deletion after a duration (Default: 90 days) and asset deletion if there is no data.

      • Please note that the asset deletion is configurable by the customer under settings.

    • You must ensure the full sync is completed before starting any delta syncs.

  • After this, the customer can put automation in place to check their signal source frequently and detect whenever a finding, such as a vulnerability, is updated. This could be a case where a new vulnerability is found or an existing vulnerability is patched (change in status).

In the cases mentioned above, the customer must submit only the active findings instead of all the signals.
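A delta between two syncs can be computed by comparing the statuses seen in the previous run with those seen now. The sketch below is one possible approach, not SAFE’s own logic: the function name and the (entity name, finding name) key are assumptions, and how findings that have disappeared from the source should be treated is deliberately left out.

```python
def compute_delta(previous, current):
    """Return findings that are new or whose status changed since the last sync.

    Both arguments map (entity_name, finding_name) -> status string.
    """
    delta = {}
    for key, status in current.items():
        # New finding (key absent before) or status change (value differs)
        if previous.get(key) != status:
            delta[key] = status
    return delta

prev = {("web01", "CVE-A"): "fail", ("web01", "CVE-B"): "fail"}
curr = {("web01", "CVE-A"): "fail", ("web01", "CVE-B"): "pass", ("db01", "CVE-C"): "fail"}
changed = compute_delta(prev, curr)
```

Only the entries in changed would then need to be converted into signals for the next upload.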


Upload rates, Timeouts and Backoff


As with any API, there is a limit to the number of calls that can be made to SAFE. SAFE responds with a 429 response code whenever the number of API calls exceeds this limit. SAFE then imposes a timeout on subsequent API calls until the system is available for new ingestions.

Hence, whenever an API call returns a 429 response code, you have hit the rate limit. In such cases, refer to the back-off time provided in the error response message.

For your reference, here’s a Python 3.x code snippet which helps in dealing with such timeouts:

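# Assumes requests, time and logger are imported/configured as in the example script.
def performApiCalls(method, API_URL, headers, files, accessToken):
    logger.info(f'Calling the following {method} API URL: {API_URL}')
    response = requests.request(method, API_URL, headers=headers, files=files)
    if response.status_code == 200:
        return (response.status_code, response.json())
    elif response.status_code == 429:
        resp = response.json()
        message = resp["message"]
        logger.info(f"{message}")
        # The back-off time in seconds is read from the 9th word of the message
        sec = message.split(' ')[8]
        time.sleep(float(sec))
        # Return the 429 so that the caller can retry the upload after the back-off
        return (response.status_code, resp)
    # Surface any other response code to the caller
    return (response.status_code, None)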

SAFE recommends that all integrations should implement the following:

  • Backoff handling, in response to 429 errors.

  • The next upload should only proceed having received a response from SAFE (either success or failure).

Note:

Uploads based on a timing schedule (e.g. once every 30 seconds) are not recommended.
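The two recommendations above can be combined in a simple sequential loop: each batch is sent only after the previous one has received a final response, and a 429 triggers a wait before retrying. This is a sketch under assumptions: send is a hypothetical callable that performs the upload and returns (status_code, wait_seconds_or_None), and max_retries and default_wait are illustrative values, not SAFE settings.

```python
import time

def upload_sequentially(batches, send, max_retries=5, default_wait=30.0, sleep=time.sleep):
    """Upload batches one at a time, backing off after each 429 before retrying."""
    results = []
    for batch in batches:
        attempts = 0
        status, wait = send(batch)
        # Retry the same batch while SAFE reports rate limiting
        while status == 429 and attempts < max_retries:
            sleep(wait if wait is not None else default_wait)
            attempts += 1
            status, wait = send(batch)
        # Record the final status before moving on to the next batch
        results.append(status)
    return results
```

Passing sleep as a parameter keeps the function easy to exercise in tests without real delays.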


Custom Fields and Other Metadata


This section describes how to attach custom fields to assets or add other metadata such as department. Please note that this activity is only supported with the Signal (ZIP) API approach.

Initial Steps


Asset/Entity Metadata consists of two types of fields:

  • Default fields: These fields are present by default in SAFE i.e., they are associated with every asset/entity as soon as they get onboarded to SAFE.

    • A few examples of such fields are Department, Location, Asset Types, etc.

    • In case no value is provided for such fields while onboarding an asset/entity to SAFE, a default value is applied.

    • To update/attach values for such predefined fields, just define the options for them via the SAFE APIs such as:

      • To add a new department: POST /api/v3/company/departments

      • To add a new location: POST /api/v3/locations

  • Custom fields: These fields must be defined by the customer beforehand, i.e., they are not built into the system. As the name suggests, they are created by users as per their requirements.

    • As mentioned above, these fields as well as their options need to be explicitly defined in SAFE.

    • Please refer to this guide to know how to create a custom field and define its options: https://docs.safe.security/docs/custom-fields-1

    • Once custom fields are created, the user can send the custom field configuration as part of a signal. For example, the following configuration in a signal can add Dept1 value for a custom field named customDept:

      entity.entityAttributes
      "tags": {
              "customDept": [
                "Dept1"
              ]
            }

Hence, the only major difference between default fields and custom fields is:

  • The default fields are predefined in SAFE and only need to have their options defined by users.

  • The custom fields must be defined by the user, along with the options to be associated with them.

Both types of fields can be attached/updated via signals, as explored below:

Signal JSON to update asset metadata


You can populate the entity dictionary in a signal JSON with the required values for the keys (metadata) which are to be updated for an asset.

The following sample signal JSON, when uploaded to a SAFE platform, attaches/updates the following metadata for the existing asset/entity with hostname signalUpdateAssetMetadata.acme.com:

  • Location: New York (Default field)

  • OS: Windows 11 (Default field)

  • Department: Customer Success (Default field)

  • Custom Field: Domain with value: Production (Custom Field)

{
  "entity": {
    "type": "machine",
    "name": "signalUpdateAssetMetadata.acme.com",
    "entityAttributes": {
      "criticality": "high",
      "confidentialityRequirement": "high",
      "integrityRequirement": "high",
      "availabilityRequirement": "high",
      "tags": {
        "location": [
          "New York"
        ],
        "dept": [
          "Customer Success"
        ],
        "os": [
          "Windows 11"
        ],
        "Domain": [
          "Production"
        ]
      }
    }
  }
}

Make sure to have the shouldUpdateExistingAssetsMetadata tag as true in the ZipConfig to instruct SAFE to update the asset:

{
  "shouldImportAssets": false,
  "shouldUpdateExistingAssetsMetadata": true
}

Updating a finding’s status in SAFE


When the status of a finding changes at the signal source, i.e., from Failed to Qualified or vice versa, the change needs to be reflected in the SAFE platform to accurately measure the security posture of the asset/entity.

To do this, change securityContext.status.complianceStatus in the previously uploaded signal JSON for the asset and finding combination to either pass or fail (depending on the finding’s current status in the signal source), then upload the signal to SAFE.
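As a small illustration, the status change can be applied to a copy of the previously uploaded signal. The helper name is hypothetical, and the field path follows the one named above.

```python
import copy

def with_updated_status(signal, new_status):
    """Return a copy of a previously uploaded signal with a new compliance status."""
    if new_status not in ("pass", "fail"):
        raise ValueError("new_status must be 'pass' or 'fail'")
    # Deep copy so that the stored original signal is left untouched
    updated = copy.deepcopy(signal)
    updated["securityContext"]["status"]["complianceStatus"] = new_status
    return updated

old_signal = {
    "name": "Example finding",
    "entity": {"type": "machine", "name": "web01.acme.com"},
    "securityContext": {"type": "va", "status": {"complianceStatus": "fail"}},
}
patched_signal = with_updated_status(old_signal, "pass")
```

The entity name and finding name are left unchanged so that SAFE can match the signal to the existing finding.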


FAQs


Where can I find SAFE’s APIs?


All of SAFE’s APIs are listed in SAFE’s Swagger documentation. To access Swagger, you can refer to the following document: https://docs.safe.security/docs/accessing-safe-apis

The above document also contains the steps to create your API credentials.


What are common error responses seen when integrating with SAFE?


SAFE returns errors based on HTTP codes. Common errors include:

  • 400 (Bad Request) - This error is observed when SAFE detects an API call as malformed. This is a client error, and the user would need to check their API call for mistakes. Some examples that can trigger this error are as follows:

    • The API’s JSON payload encapsulates strings in single quotes instead of double quotes (JSON standard).

    • The API's JSON payload misses commas when separating keys.

    • Brackets not closed/incorrect in API’s JSON payload.

  • 401 (Unauthorised) - This error is observed when the access token used to authorise the API call has expired or is invalid. The user would need to regenerate the access token to proceed with subsequent API calls.

  • 404 (Not Found) - An invalid API call has been made to SAFE.

  • 429 (Too many requests) -  This error is observed when the API request encounters a rate limit from SAFE. The user is encouraged to refer to the backoff time provided in the response message before making the next API call.

  • 500 (and above - Server side issue) - SAFE is unable to handle API calls due to a scheduled upgrade or an unknown internal server error.


What are common issues seen when integrating with SAFE?


Common issues include:

  • Submitting a high number of signals without using the Signal ZIP format can result in slower ingestion (due to rate limits) and reduced performance (due to increased computational overhead).

  • Not implementing backoff handling when rate limiting is expected can cause API calls to fail.

  • Providing signals with invalid expiry date:

    • If you are providing an expiry date for your signals, make sure to have them in the following format: YYYY-MM-DDTHH:MM:SS.000Z. Providing the expiry date in an invalid format might result in the Findings not being expired at the expected time. Example: 2023-12-21T10:00:00.000Z

    • Make sure that you aren’t providing an expiry date preceding the current date. Providing such a date results in signals not being converted to findings in SAFE.
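Both checks can be applied when the expiry value is generated. The following sketch (the function name is hypothetical) formats a timezone-aware datetime in the required layout and rejects dates that are not in the future.

```python
from datetime import datetime, timezone

def format_expiry(expires_at, now=None):
    """Format an aware datetime as YYYY-MM-DDTHH:MM:SS.000Z, rejecting past dates."""
    now = now or datetime.now(timezone.utc)
    if expires_at.tzinfo is None:
        raise ValueError("expires_at must be timezone-aware")
    if expires_at <= now:
        raise ValueError("expiry date must be in the future")
    # Convert to UTC so that the trailing Z is accurate
    return expires_at.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")

reference_now = datetime(2023, 12, 1, tzinfo=timezone.utc)
expiry = format_expiry(datetime(2023, 12, 21, 10, 0, 0, tzinfo=timezone.utc), now=reference_now)
```

The now parameter exists only to make the check reproducible; in normal use it can be omitted.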


I submitted multiple assets but I see some assets are not onboarded into SAFE.

This is observed when either the zip config is incorrect for the specific assets, or there is an asset matching criteria issue, i.e., the assets have the same hostname, FQDN, etc., leading them to be considered non-unique by the system.


Why am I not seeing a score for my asset?

SAFE determines the risk posture of an asset by referring to the MITRE ATT&CK Techniques. Hence, every signal onboarded to SAFE is recommended to have either or both of the following:

  • A valid CVE (Common Vulnerabilities and Exposures) ID, i.e., the CVE shouldn’t be classified as Reserved, Disputed, or Rejected.

  • A valid TTP (Tactics, Techniques, and Procedures) associated with the signal.

In case any signal doesn't contain the above-mentioned details, it won’t be considered for scoring and breach likelihood calculation in SAFE. Subsequently, if an asset consists completely of such signals, it won’t have any score in SAFE.

For more details, refer to the following document: https://docs.safe.security/docs/risk-scenario


Can I remove/expire Findings in SAFE?

SAFE allows removing/expiring a Finding from an asset/entity if the assessment is no longer valid. A Finding’s assessment will be removed in the following cases:

  1. If the assessment tool (signal source) has defined an expiry date for an assessment and that date has passed.

    1. Example: If the signal source instructs SAFE to expire a Finding against an asset on 21 December 2023, SAFE will remove it on the specified date.

  2. If the assessment tool (signal source) has not defined an expiry date, SAFE will automatically define an expiry date as per the Finding Retirement Policy.

  3. If "fullAssessmentForTypes" (va/ca/edr) is defined in the signal zip config, only the new signals coming as a part of the current assessment will be assigned to the specific asset/entity. All previous assessment data will be retired from SAFE.


How does SAFE verify if an asset/entity present in a signal is already onboarded? How does SAFE update/append findings for an existing asset/entity?

SAFE automatically checks all uploaded signals for entity context (asset metadata) and security context (finding metadata) as follows:

  • SAFE checks if the provided entity context in a signal matches with an existing entity/asset in SAFE:

    • If yes, then SAFE checks if the provided security context in the signal matches any existing findings assigned to the existing entity/asset.

      • If yes, SAFE will update the finding status as defined in the latest security context.

    • If the provided security context doesn’t match any existing findings assigned to the existing entity/asset:

      • SAFE checks if the fullAssessmentForTypes tag is used in ZipConfig.

        • If the flag is provided with the appropriate finding type, all existing assessment data will be replaced with the latest assessment data.

        • If the fullAssessmentForTypes tag is not used in ZipConfig, SAFE will append the finding to the asset/entity’s assessment data.

  • If the provided entity context doesn’t match with an existing entity/asset in SAFE:

    • If shouldImportAssets is true in ZipConfig: A new asset will be onboarded to SAFE.

    • If shouldImportAssets is false in ZipConfig: The signal will be ignored.
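
The checks above can be sketched as a small decision function. This is only an illustration of the documented behavior, not SAFE's implementation: the function name and the returned action labels are hypothetical, and matching an asset by name and a finding by id is a simplifying assumption.

```python
def resolve_signal(known_assets: dict, signal: dict, zip_config: dict) -> str:
    """Return the action the steps above describe for one signal.

    known_assets maps an asset name to the set of finding ids already assigned to it.
    """
    asset_name = signal["entity"]["name"]
    finding_id = signal["id"]
    finding_type = signal["securityContext"]["type"]

    if asset_name not in known_assets:
        # Unknown asset: onboard it only if the ZIP config allows it
        return "onboard_asset" if zip_config.get("shouldImportAssets") else "ignore_signal"

    if finding_id in known_assets[asset_name]:
        # Known asset and known finding: refresh its status
        return "update_finding_status"

    # Known asset, new finding: replace or append depending on the config
    if finding_type in zip_config.get("fullAssessmentForTypes", []):
        return "replace_assessment"
    return "append_finding"

assets = {"web-01": {"19506"}}
sig = {"id": "19506", "entity": {"name": "web-01"}, "securityContext": {"type": "va"}}
print(resolve_signal(assets, sig, {}))  # update_finding_status
```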


Examples


Example Script to upload VA Signal JSON ZIPs to a SAFE platform

Objective of the script: To convert vulnerability findings from a CSV report to signals and upload them to SAFE.

Requirements:

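  • Python 3.9 or later (the script uses argparse.BooleanOptionalAction) and the following modules:

    • csv, json, getpass, time, base64, logging, argparse, requests, datetime, os, zipfile (requests is a third-party package; the rest are part of the standard library)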

  • API credentials for the SAFE platform.

  • A CSV containing vulnerabilities.

    • Sample CSV for template:  

# Import the required modules
import csv
import json
import getpass as gp
import time
from base64 import b64encode
import logging
import argparse
import requests
from datetime import datetime, timezone
import os
import zipfile

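# For logs: write to a file and echo the same messages to the console.
# The log directory must already exist and be writable by the user running the script.
logging.basicConfig(filename="/var/log/csvtojson/csvtojson.log", filemode='w', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')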
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Variables
timenow=(datetime.now(timezone.utc)).isoformat()
timeforfolder = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
csv.field_size_limit(100000000)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--csv', help= "Please enter the complete path of csv")
    parser.add_argument('--destbucket', help= "Please enter the destination S3 bucket name")
    parser.add_argument('--subdomain', help= "Please enter SAFE tenant's subdomain which can help in logging purposes", required=True)
    parser.add_argument('--csvkey', help= "Please enter the complete path of S3 csv")
    parser.add_argument('--skip_info_controls', default=True, action=argparse.BooleanOptionalAction, help= "Skip rows whose CVSS V3 base score is missing or 0.0 (use --no-skip_info_controls to keep them)")
    parser.add_argument('--metaDataUpdate', default=True, action=argparse.BooleanOptionalAction, required=True)
    args = parser.parse_args()
    logger.info(f"Starting with the script - - {args.subdomain}")
    folderName = f"{args.subdomain}-{timeforfolder}"


    logger.info(f"Working on Signal Source: TenableSC - {args.subdomain}")
    logger.info(f"Reading csv file: {args.csv} - {args.subdomain}")
    with open(args.csv, 'r', encoding='utf-8-sig') as fobj:
        # Create a signal for every row in the CSV file
        tenable_sc_json(fobj, args.subdomain, folderName, args.skip_info_controls)
    # Convert the signals to ZIP
    zip_folder(folderName, args.subdomain, args.metaDataUpdate, 3)
    accessToken = getAccessToken(args.subdomain)
    uploadtoSAFE(folderName, args.destbucket, args.subdomain, accessToken)
    
def tenable_sc_json(csv_file: object, subdomain, folder_name, skip_info_controls):
    createfolder(folder_name)
    logFilesCount = 0
    totalJsonCount = 0
    reader = csv.DictReader(csv_file)
    for row in reader:
        signal_json = {}
        # The signal format version is fixed; it is not read from the CSV
        signal_json["version"] = "1.0"
        vulnID = row.get("Plugin")
        signal_json["id"] = vulnID
        signal_json["type"] = "default"
        vulnTitle = row.get("Plugin Name")
        signal_json["name"] = vulnTitle
        signal_json["source"] = {"name": "tenablesc"}
        vulnDesc = row.get("Description")
        entityOSName = row.get("Family")
        signal_json["description"] = vulnDesc
        entityIPAddress = row.get("Asset Name")
        entityDNSName = row.get("DNS Name")
        if (entityIPAddress is None or entityIPAddress == "") and entityDNSName is not None:
            signal_json["entity"] = {"type": "machine", "name": entityDNSName, "entityAttributes": {"ipAddresses": [{"name": "Ip", "ipv4": entityDNSName}], "type": entityOSName}}
        elif entityIPAddress is not None and (entityDNSName is None or entityDNSName == ""):
            signal_json["entity"] = {"type": "machine", "name": entityIPAddress, "entityAttributes": {"ipAddresses": [{"name": "Ip", "ipv4": entityIPAddress}], "type": entityOSName}}
        else:
            signal_json["entity"] = {"type": "machine", "name": entityDNSName, "entityAttributes": {"ipAddresses": [{"name": "Ip", "ipv4": entityIPAddress}], "type": entityOSName}}
        vulnProof = row.get("Plugin Output")
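        vulnBaseScore = row.get("CVSS V3 Base Score")
        # Treat a missing or blank base score as 0.0 so the float() conversions below cannot fail
        if vulnBaseScore is None or vulnBaseScore == "":
            vulnBaseScore = "0.0"
        # Skip informational findings (base score 0.0) when requested
        if float(vulnBaseScore) == 0.0 and skip_info_controls:
            continue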
        vulnTempScore = row.get("CVSS V3 Temporal Score")
        if vulnTempScore == "" or vulnTempScore is None:
            vulnTempScore = "0.0"
        vulnLevel = row.get("Severity")
        vulnVector = row.get("CVSS V3 Vector")
        # A missing CVE column or blank cell yields an empty standardsMapping list
        vulnCVEId = row.get("CVE") or ""
        vulnCVEId = vulnCVEId.replace('[','').replace(']','').replace('\'','').replace('\"','')
        cveArray = [cve.strip() for cve in vulnCVEId.split(',') if cve.strip()]
        cveArrayJson = []
        for cve in cveArray:
            cveArrayJson.append({"name": "cve", "value": cve})
        vulnSolution = row.get("Solution")
        vulnRef = row.get("See Also")
        vulnPort = row.get("Port")
        vulnProtocol = row.get("Protocol")
        if vulnPort is not None and vulnProtocol is not None:
            signal_json["tags"] = {"port": vulnPort, "protocol": vulnProtocol}
        elif vulnPort is not None:
            signal_json["tags"] = {"port": vulnPort}
        elif vulnProtocol is not None:
            signal_json["tags"] = {"protocol": vulnProtocol}
        if vulnRef is None:
            vulnRef = ""
        signal_json["securityContext"] = {"type": "va", "status": {"complianceStatus": "fail"}, "evidence": {"observationText": vulnProof}, "severity": {"type": "cvss", "value": float(vulnBaseScore), "level": vulnLevel, "cvss": {"vector": vulnVector, "baseScore": float(vulnBaseScore), "temporalScore": float(vulnTempScore)}}, "standardsMapping": cveArrayJson, "remediation": {"description": vulnSolution, "reference": vulnRef}, "controlType": "detection"}
        vulnFirstSince = row.get("First Discovered")
        vulnLastSince = row.get("Last Observed")
        expiryDate = row.get("Expiry Date")
        signal_json["firstSeen"] = vulnFirstSince
        signal_json["lastSeen"] = vulnLastSince
        signal_json["expiresAt"] = expiryDate
        signal_json["createdAt"] = timenow
        if vulnID is not None:
            vulnID = "_".join( vulnID.split() )
        createfolder(f'{folder_name}/{entityDNSName}{entityIPAddress}')
        logFilesCount += 1
        totalJsonCount += 1
        with open(f'{folder_name}/{entityDNSName}{entityIPAddress}/tenable-{entityDNSName}-{row["Asset Name"]}-{vulnID}.json','w') as fobj:
            fobj.write(json.dumps(signal_json, indent=2))
            if logFilesCount % 1000 == 0:
                logger.info(f"JSON files created for {totalJsonCount} rows - {subdomain}")
                logFilesCount = 0
    logger.info(f"Total JSON files created - {totalJsonCount} - {subdomain}")

def getAccessToken(subdomain):
    url = f"https://{subdomain}.safescore.ai/api/v3/auth"
    username = input(f"Enter {subdomain}'s API username: ")
    password = gp.getpass(prompt=f"Enter {subdomain}'s API password: ")
    logger.info(f"Getting Access Token for {subdomain}")
    payload="<file contents here>"
    headers = {
         "Authorization": "Basic {}".format(b64encode(bytes(f"{username}:{password}", "utf-8")).decode("ascii")),
         'Content-Type': 'text/plain'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    if response.status_code == 200:
        token = response.json()['accessToken']
        logger.info(f"Access Token Received for {subdomain}")
        return token
    else:
        logger.error(f"Could not get an access token for {subdomain} (HTTP {response.status_code}). Please check the API credentials.")
        # Exit with a non-zero status so callers can detect the failure
        raise SystemExit(1)

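def performApiCalls(method, API_URL, headers, files, accessToken):
    logger.info(f'Calling the following {method} API URL: {API_URL}')
    # Assumption: SAFE expects the access token as a Bearer token in the
    # Authorization header; adjust this if your tenant's API differs.
    authHeaders = {**headers, "Authorization": f"Bearer {accessToken}"}
    # Rewind the file objects so that a retried request re-sends the full file
    for fileTuple in (files or {}).values():
        fileTuple[1].seek(0)
    response = requests.request(method, API_URL, headers=authHeaders, files=files)
    if response.status_code == 200:
        return (response.status_code, response.json())
    elif response.status_code == 401:
        logger.info("Token expired, re-enter the credentials")
        # The tenant subdomain is the first label of the API host name
        subdomain = API_URL.split("//", 1)[-1].split(".", 1)[0]
        accessToken = getAccessToken(subdomain)
        logger.info("Access Token successfully refreshed")
        return performApiCalls(method, API_URL, headers, files, accessToken)
    elif response.status_code == 429:
        message = response.json()["message"]
        logger.info(f"{message}")
        # The wait time in seconds is expected to be the 9th word of the rate-limit message
        sec = message.split(' ')[8]
        time.sleep(float(sec))
        return performApiCalls(method, API_URL, headers, files, accessToken)
    elif response.status_code == 400:
        resp = response.json()
        message = resp["message"]
        logger.error(f"Error: {message}")
        return (response.status_code, resp)
    else:
        # Log the raw body because an error response is not guaranteed to be JSON
        logger.error(f"Calling API {API_URL} failed with status code {response.status_code}. Here is the response:- {response.text}")
        return (response.status_code, response.text)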

def uploadtoSAFE(folder_name, bucket_name, subdomain, accessToken):
    logger.info(f"Starting ZIP upload to SAFE - {subdomain}")
    API_URL = f'https://{subdomain}.safescore.ai/api/v3/signals/zip'
    headers = {'accept': 'application/json'}
    for root, dirs, files in os.walk(folder_name):
        for file in files:
            file_path = os.path.join(root, file)
            # Check if the file is a ZIP file
            if zipfile.is_zipfile(file_path):
                # Open the ZIP in a context manager so the handle is closed after the upload
                with open(file_path, 'rb') as zipFobj:
                    uploadFiles = {'file': (file_path, zipFobj, 'application/zip')}
                    code, response = performApiCalls("POST", API_URL, headers, uploadFiles, accessToken)
                logger.info(response)
    logger.info(f"ZIP upload finished - {subdomain}")
    logger.info(f"Deleting all zip files - {subdomain}")
    removecmd = f"rm -rf {folder_name}/"
    os.system(removecmd)

def createfolder(folder_name):
    if not os.path.exists(f"{folder_name}"):
        os.makedirs(f"{folder_name}")

def zip_folder(path, subdomain, metaDataFlag, max_filesize=3):
    logger.info(f"Starting with zip creation - {subdomain}")
    folders = list(filter(lambda x: os.path.isdir(os.path.join(path, x)), os.listdir(path)))
    assetCount = 0
    totalCount = 0
    for folder in folders:
        assetCount += 1
        totalCount += 1
        createConfigJson(f'{path}/{folder}', metaDataFlag)
        files_written = 1 # file counter
        zip_name = f"{path}/{folder}-{files_written}.zip" # first zip filename
        zipWriter = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) # initialize zip writer
        for root, dirs, files in os.walk(f'{path}/{folder}'):
            for file in files:
                zipWriter.write(os.path.join(root, file), 
                        os.path.relpath(os.path.join(root, file), 
                                        os.path.join(f'{path}/{folder}', '..')))
                
                # current zip size in MB
                current_size = os.path.getsize(zip_name) >> 20
                if current_size > max_filesize: # if the zip exceeds the allowed size
                    zipWriter.close() # close current zip writer
                    files_written += 1 # increment the file counter
                    zip_name = f"{path}/{folder}-{files_written}.zip" # new zip name
                    zipWriter = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) # initialize writer
        zipWriter.close() # finalize the last zip for this folder before it is moved
        removecmd = f"rm -rf {path}/{folder}"
        os.system(removecmd)
        if assetCount % 50 == 0:
            logger.info(f"Zip files created for {totalCount} assets - {subdomain}")
            assetCount = 0
        createfolder(f'{path}/zipfiles')
        cmd = f"mv {path}/*.zip {path}/zipfiles"
        os.system(cmd)
    logger.info(f"Total Zip files created {totalCount} - {subdomain}")
    logger.info(f"Zip files created successfully - {subdomain}")

def createConfigJson(path, metaDataFlag):
    # Use a real boolean so that config.json contains true rather than the string "true"
    config_json = { "shouldImportAssets": True, "fullAssessmentForTypes": ["va"], "shouldUpdateExistingAssetsMetadata": metaDataFlag }
    with open(f'{path}/config.json','w') as fobj:
        fobj.write(json.dumps(config_json, indent=2))

if __name__ == "__main__":
    main()
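
Before running the script, it can help to confirm that the CSV export contains the columns the script reads. The sketch below is a hypothetical pre-flight check and is not part of the script above. The column list is taken from the row.get(...) calls in tenable_sc_json, and the function name is illustrative.

```python
import csv
import io

# Columns read by tenable_sc_json in the script above
EXPECTED_COLUMNS = [
    "Plugin", "Plugin Name", "Description", "Family", "Asset Name", "DNS Name",
    "Plugin Output", "CVSS V3 Base Score", "CVSS V3 Temporal Score", "Severity",
    "CVSS V3 Vector", "CVE", "Solution", "See Also", "Port", "Protocol",
    "First Discovered", "Last Observed", "Expiry Date",
]

def missing_columns(csv_file) -> list:
    """Return the expected columns that are absent from the CSV header row."""
    reader = csv.DictReader(csv_file)
    # fieldnames is None for an empty file, so fall back to an empty list
    present = set(reader.fieldnames or [])
    return [col for col in EXPECTED_COLUMNS if col not in present]

# In-memory sample with only four of the expected columns
sample = io.StringIO("Plugin,Plugin Name,Asset Name,DNS Name\n19506,Example plugin,10.0.0.5,web-01\n")
print(missing_columns(sample))
```

For a real export, open the file the same way the script does, with encoding='utf-8-sig', so that a byte-order mark does not corrupt the first column name.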

