flow/flow_learn_span/flow_learn_span_example.cpp

Reference Documentation

Platform
Intel® PAC
Napatech SmartNIC
Content Type
Reference Information
Capture Software Version
Link™ Capture Software 12.10
Napatech Software Suite: flow/flow_learn_span/flow_learn_span_example.cpp
flow/flow_learn_span/flow_learn_span_example.cpp

Description

This example sets up an application that receives incoming IPv4 UDP packets, and then programs / learns them as new flows. The flows is learn with sorted IP addresses and port, enabling a single physical port to handle packets in both directions of the flow. See SPAN port.

This application also maintains a lookup table of programmed flows, to ensure that the same flow are not learns more than once. The application relies heavily on the C++11 std library to do this in a concise way.

This application does not transmit packets, and does not monitor traffic in a significant way. Running this application alongside of the NT tools "pktgen" and "monitoring" will help making the functionality of this application clear.

The example program uses the following NTAPI functions:

This example is build for adapters with flowmatcher functionality.

/*
*
* Copyright 2023 Napatech A/S. All Rights Reserved.
*
* 1. Copying, modification, and distribution of this file, or executable
* versions of this file, is governed by the terms of the Napatech Software
* license agreement under which this file was made available. If you do not
* agree to the terms of the license do not install, copy, access or
* otherwise use this file.
*
* 2. Under the Napatech Software license agreement you are granted a
* limited, non-exclusive, non-assignable, copyright license to copy, modify
* and distribute this file in conjunction with Napatech SmartNIC's and
* similar hardware manufactured or supplied by Napatech A/S.
*
* 3. The full Napatech Software license agreement is included in this
* distribution, please see "NP-0405 Napatech Software license
* agreement.pdf"
*
* 4. Redistributions of source code must retain this copyright notice,
* list of conditions and the following disclaimer.
*
* THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT ANY WARRANTIES, EXPRESS OR
* IMPLIED, AND NAPATECH DISCLAIMS ALL IMPLIED WARRANTIES INCLUDING ANY
* IMPLIED WARRANTY OF TITLE, MERCHANTABILITY, NONINFRINGEMENT, OR OF
* FITNESS FOR A PARTICULAR PURPOSE. TO THE EXTENT NOT PROHIBITED BY
* APPLICABLE LAW, IN NO EVENT SHALL NAPATECH BE LIABLE FOR PERSONAL INJURY,
* OR ANY INCIDENTAL, SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES WHATSOEVER,
* INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF PROFITS, CORRUPTION OR
* LOSS OF DATA, FAILURE TO TRANSMIT OR RECEIVE ANY DATA OR INFORMATION,
* BUSINESS INTERRUPTION OR ANY OTHER COMMERCIAL DAMAGES OR LOSSES, ARISING
* OUT OF OR RELATED TO YOUR USE OR INABILITY TO USE NAPATECH SOFTWARE OR
* SERVICES OR ANY THIRD PARTY SOFTWARE OR APPLICATIONS IN CONJUNCTION WITH
* THE NAPATECH SOFTWARE OR SERVICES, HOWEVER CAUSED, REGARDLESS OF THE THEORY
* OF LIABILITY (CONTRACT, TORT OR OTHERWISE) AND EVEN IF NAPATECH HAS BEEN
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. SOME JURISDICTIONS DO NOT ALLOW
* THE EXCLUSION OR LIMITATION OF LIABILITY FOR PERSONAL INJURY, OR OF
* INCIDENTAL OR CONSEQUENTIAL DAMAGES, SO THIS LIMITATION MAY NOT APPLY TO YOU.
*
*
*/
/**
* @example flow/flow_learn_span/flow_learn_span_example.cpp
* @section flow_learn_span_example Description
*
* This example sets up an application that receives incoming IPv4 UDP packets,
* and then programs / learns them as new flows. The flows is learn with sorted
* IP addresses and port, enabling a single physical port to handle packets in
* both directions of the flow. See SPAN port.
*
* This application also maintains a lookup table of programmed flows, to ensure
* that the same flow are not learns more than once. The application relies
* heavily on the C++11 std library to do this in a concise way.
*
* This application does not transmit packets, and does not monitor traffic in a
* significant way. Running this application alongside of the NT tools "pktgen"
* and "monitoring" will help making the functionality of this application clear.
*
* The example program uses the following NTAPI functions:
* - @ref NT_Init()
* - @ref NT_Done()
* - @ref NT_ExplainError()
* - @ref NT_ConfigOpen()
* - @ref NT_ConfigClose()
* - @ref NT_NTPL()
* - @ref NT_NetRxOpen()
* - @ref NT_NetRxClose()
* - @ref NT_NetRxGetNextPacket()
* - @ref NT_NET_GET_PKT_DESCR_PTR_DYN4()
* - @ref NT_FlowOpenAttrInit()
* - @ref NT_FlowOpenAttrSetAdapterNo()
* - @ref NT_FlowOpen_Attr()
* - @ref NT_FlowClose()
* - @ref NT_FlowWrite()
*
* This example is build for adapters with flowmatcher functionality.
*/
// Include this in order to access the Napatech API
#include <nt.h>
#include <algorithm>
#include <array>
#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#define STR_INNER(A) #A
#define STR(A) STR_INNER(A)
namespace {
////////////////////////////////////////////////////////////////////////////////
// Utility functions
////////////////////////////////////////////////////////////////////////////////
/**
* Handle errors by printing an error and exit the application.
*/
void handle_error_status(int status, const char* message)
{
if (status != NT_SUCCESS) {
char error_buffer[NT_ERRBUF_SIZE];
NT_ExplainError(status, error_buffer, sizeof(error_buffer));
std::cerr << message << ": " << error_buffer << std::endl;
std::exit(EXIT_FAILURE);
}
}
/**
* Opens a config stream and programs the input NTPL.
* When the NTPL has been programmed, the config stream is closed again.
*/
void ntpl_multicall(const std::vector<std::string>& ntpls)
{
int status;
NtConfigStream_t config_stream;
status = NT_ConfigOpen(&config_stream, "flow_learn_span NT_ConfigOpen");
handle_error_status(status, "NT_ConfigOpen() failed");
for (const auto& ntpl : ntpls) {
NtNtplInfo_t ntpl_info;
status = NT_NTPL(config_stream, ntpl.c_str(), &ntpl_info, NT_NTPL_PARSER_VALIDATE_NORMAL);
handle_error_status(status, "NT_NTPL() failed");
}
status = NT_ConfigClose(config_stream);
handle_error_status(status, "NT_ConfigClose() failed");
}
/**
* Quick and dirty hashing function for some fields in NtFlow_t.
* The hash uniqueness in this function is not great, but since std::unordered_multimap
* is used to store the flows, the application is able to handle duplicate hashes.
*/
uint64_t flow_hash(const NtFlow_t* flow)
{
const uint64_t* ptr = reinterpret_cast<const uint64_t*>(flow->keyData);
uint64_t meta = (flow->ipProtocolField << 16) | (flow->keyId << 8) | flow->keySetId;
// Multiply data with some random primes.
return (ptr[0] * 45684803) ^
(ptr[1] * 198138211) ^
(ptr[2] * 994229) ^
(ptr[3] * 8349871) ^
(ptr[4] * 3294876479) ^
(meta * 663664226587);
}
////////////////////////////////////////////////////////////////////////////////
// The general thread main function used for all StreamID
////////////////////////////////////////////////////////////////////////////////
/**
* Opens a RX stream and received packets until an end-task signal is received.
*/
void rx_task(std::atomic<int>* ready, std::atomic<bool>* end_task,
uint32_t stream_id, std::function<void(const NtNetBuf_t&)> handle)
{
int status;
NtNetStreamRx_t net_stream;
NtNetBuf_t net_buffer;
// Open RX stream
status = NT_NetRxOpen(&net_stream, "flow_learn_span NT_NetRxOpen", NT_NET_INTERFACE_PACKET, stream_id, -1);
handle_error_status(status, "NT_NetRxOpen() failed");
// Tell main function that this is ready to receive packets, and then start the loop.
ready->fetch_add(1);
while (!end_task->load()) {
// Get next packet or continue.
status = NT_NetRxGetNextPacket(net_stream, &net_buffer, 100);
if (status == NT_STATUS_TIMEOUT || status == NT_STATUS_TRYAGAIN) continue;
handle_error_status(status, "NT_NetRxGetNextPacket() failed");
// See the handle functions.
handle(net_buffer);
}
// Close the RX stream.
status = NT_NetRxClose(net_stream);
handle_error_status(status, "NT_NetRxClose() failed");
}
////////////////////////////////////////////////////////////////////////////////
// Global variables
////////////////////////////////////////////////////////////////////////////////
#define BLACKLIST 3
#define WHITELIST 4
uint64_t counter_unhandled = 0;
uint64_t counter_miss = 0;
uint64_t counter_blacklist_hit = 0;
uint64_t counter_whitelist_hit = 0;
std::unordered_multimap<uint64_t, std::unique_ptr<NtFlow_t>> flow_map;
std::atomic<uint64_t> flow_learn_queue_read_index {0};
std::atomic<uint64_t> flow_learn_queue_write_index {0};
std::array<NtFlow_t*, 0x100000> flow_learn_queue;
constexpr uint64_t flow_learn_queue_index_mask = 0xfffff;
////////////////////////////////////////////////////////////////////////////////
// Main thread function for programming flows
////////////////////////////////////////////////////////////////////////////////
/**
* Separate task to program flows
*/
void flow_program_task(std::atomic<int>* ready, std::atomic<bool>* end_task)
{
// Tell main function that this is ready to receive packets, and then start the loop.
ready->fetch_add(1);
while (!end_task->load()) {
// If a new flow has been added to the queue, the pop and learn it.
auto flow_raw_ptr = flow_learn_queue[flow_learn_queue_read_index];
flow_learn_queue_read_index.store((flow_learn_queue_read_index + 1) & flow_learn_queue_index_mask);
// Learn the flow and insert it into the map.
int status = NT_FlowWrite(flow_stream, flow_raw_ptr, -1);
handle_error_status(status, "NT_FlowWrite() failed");
}
}
}
////////////////////////////////////////////////////////////////////////////////
// The handle functions for packets for each StreamID
////////////////////////////////////////////////////////////////////////////////
/**
* Handle used for the unhandled packets RX thread.
*
* In this example, unhandled packets are only counted.
*/
{
counter_unhandled += 1;
}
/**
* Handle used for the missed packets RX thread.
*
* Whenever a missed packet is received, the function calculates the flow learn
* data, and checks if it is a new flow, or a previously known flow. If the flow
* is new, then the flow is programmed.
*/
void handle_stream_miss(const NtNetBuf_t& net_buffer)
{
counter_miss += 1;
// Get the relevant pointers from the packet data.
const NtDyn4Descr_t* dyn4 = NT_NET_GET_PKT_DESCR_PTR_DYN4(net_buffer);
const uint8_t* packet = reinterpret_cast<const uint8_t*>(dyn4) + dyn4->descrLength;
auto flow = std::unique_ptr<NtFlow_t>(new NtFlow_t);
std::memset(flow.get(), 0x0, sizeof(NtFlow_t));
const uint8_t* ipv4_src = packet + dyn4->offset0;
const uint8_t* ipv4_dst = packet + dyn4->offset0 + 4;
const uint8_t* udp_src = packet + dyn4->offset1;
const uint8_t* udp_dst = packet + dyn4->offset1 + 2;
// Sort the IPv4 addresses and UDP ports for the packet. This is only required
// when the option "KeySort=Sorted" is used in the KeyDef NTPL command.
// Because of endianness the comparison is actually not trivial,
// but the std algorithms library makes it a bit easier.
if (std::lexicographical_compare(ipv4_src, ipv4_src + 4, ipv4_dst, ipv4_dst + 4) ||
(std::equal(ipv4_src, ipv4_src + 4, ipv4_dst) &&
std::lexicographical_compare(udp_src, udp_src + 2, udp_dst, udp_dst + 2))) {
// Translates to: ipv4_src < ipv4_dst || (ipv4_src == ipv4_dst && udp_src < udp_dst)
std::memcpy(flow->keyData, ipv4_src, 4);
std::memcpy(flow->keyData + 4, ipv4_dst, 4);
std::memcpy(flow->keyData + 8, udp_src, 2);
std::memcpy(flow->keyData + 10, udp_dst, 2);
}
else {
std::memcpy(flow->keyData, ipv4_dst, 4);
std::memcpy(flow->keyData + 4, ipv4_src, 4);
std::memcpy(flow->keyData + 8, udp_dst, 2);
std::memcpy(flow->keyData + 10, udp_src, 2);
}
// Some packet inspection can be added here to decide whether the new flow
// should be added to the WHITELIST or the BLACKLIST. Set flow->keySetId accordingly.
// Set other relevant fields.
flow->ipProtocolField = 0x11; // Layer 4 is UDP.
flow->keyId = 1; // Value used in the Key-test in the NTPL filter "Key(kd, KeyID=1) == WHITELIST / BLACKLIST".
flow->keySetId = WHITELIST; // Value used to compare the Key-test in the NTPL filter "Key(kd, KeyID=1) == WHITELIST / BLACKLIST".
flow->op = 1; // 1 means learn, 0 means unlearn.
// Calculate a locally used hash. The "id" field is user defined and is used to
// store a hash in this example. For data structures other than std::unordered_multimap
// it could make sense to store the value of the pointer to NtFlow_t; this
// would enable instant and easy access to the locally stored flow data.
auto flow_raw_ptr = flow.get();
flow->id = flow_hash(flow_raw_ptr);
// Check if the unique key-value pair exists, and return if it does.
// The check would be a lot simpler if std::unordered_map was used instead of
// std::unordered_multimap, but when duplicate hashes would not be handled correctly.
auto range = flow_map.equal_range(flow->id);
for (auto it = range.first; it != range.second; ++it) {
if (std::equal(flow->keyData, flow->keyData + 40, it->second->keyData) &&
flow->ipProtocolField == it->second->ipProtocolField &&
flow->keyId == it->second->keyId && flow->keySetId == it->second->keySetId) {
return;
}
}
flow_map.insert(std::pair<uint64_t, std::unique_ptr<NtFlow_t>>{flow->id, std::move(flow)});
// Compared to the number of packets that can be received per second on a RX stream,
// the learn rate is very slow, which can cause packet drops if the RX has to do
// that task as well. Thus it can be a good idea to implement a queue, and perform
// the learning from another thread.
flow_learn_queue[flow_learn_queue_write_index] = flow_raw_ptr;
flow_learn_queue_write_index.store((flow_learn_queue_write_index + 1) & flow_learn_queue_index_mask);
}
/**
* Handle used for the blacklist packets RX thread.
*
* In this example, packets that hit a programmed flow are only counted.
*/
{
counter_blacklist_hit += 1;
}
/**
* Handle used for the whitelist packets RX thread.
*
* In this example, packets that hit a programmed flow are only counted.
*/
{
counter_whitelist_hit += 1;
}
} // unnamed namespace
////////////////////////////////////////////////////////////////////////////////
// Main
////////////////////////////////////////////////////////////////////////////////
int main(int, char**)
{
constexpr uint8_t adapter_no = 0;
// Start NTAPI.
handle_error_status(status, "NT_Init() failed");
// Setup NTPL using a utility function.
// Delete previous filters and programmed flow before starting the new filters.
"Delete=All",
// Macros for simplifying following NTPL commands.
"DefineMacro(\"FilterCheck\", \"Port==$1 and Layer3Protocol==IPv4 and Layer4Protocol==UDP\")",
"DefineMacro(\"KeyTypeProtoSpecs\", \"(layer3header[12]/32, layer3header[16]/32, layer4header[0]/16,layer4header[2]/16)\")",
// Specify the packet field sizes in the 5-tuple. The 5-tuple contains the
// IPv4 addresses, IP protocol field, and the UDP ports. The IP protocol
// field is implicit when the "IpProtocolField=Outer" option is used.
// In this example sorting is used to receive two-way traffic on one port.
"KeyType[Name=kt] = {32, 32, 16, 16}",
"KeyDef[Name=kd; KeyType=kt; IpProtocolField=Outer; KeySort=Sorted] = KeyTypeProtoSpecs",
// Set up filters with a Key test.
"Assign[StreamId=0; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == UNHANDLED",
"Assign[StreamId=1; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == MISS",
"Assign[StreamId=2; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == " STR(BLACKLIST),
"Assign[StreamId=3; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == " STR(WHITELIST),
// Set up a low priority catch-all filter where packets will be dropped if
// they do not match any of the previous three filters.
"Assign[StreamId=Drop; Priority=10]=All"
});
// Start the flow stream for learning and unlearning flows.
NtFlowAttr_t flow_attr;
NT_FlowOpenAttrInit(&flow_attr);
NT_FlowOpenAttrSetAdapterNo(&flow_attr, adapter_no);
status = NT_FlowOpen_Attr(&flow_stream, "flow_learn_span NT_FlowOpen_Attr", &flow_attr);
handle_error_status(status, "NT_FlowOpen_Attr() failed");
// Start RX and flow learn tasks
std::atomic<bool> end_task {false};
std::atomic<int> ready {0};
std::thread rx_task0(rx_task, &ready, &end_task, 0, handle_stream_unhandled);
std::thread rx_task1(rx_task, &ready, &end_task, 1, handle_stream_miss);
std::thread rx_task2(rx_task, &ready, &end_task, 2, handle_stream_blacklist_hit);
std::thread rx_task3(rx_task, &ready, &end_task, 3, handle_stream_whitelist_hit);
std::thread flow_program_task0(flow_program_task, &ready, &end_task);
// Wait for RX tasks to be ready, then wait for user to stop the application.
while(ready.load() < 5) std::this_thread::yield();
std::cout << "Press enter to end application..." << std::endl;
std::cin.get();
// End application.
end_task.store(true);
if (flow_program_task0.joinable()) flow_program_task0.join();
if (rx_task3.joinable()) rx_task3.join();
if (rx_task2.joinable()) rx_task2.join();
if (rx_task1.joinable()) rx_task1.join();
if (rx_task0.joinable()) rx_task0.join();
std::cout << "counter_unhandled: " << counter_unhandled << std::endl;
std::cout << "counter_miss: " << counter_miss << std::endl;
std::cout << "counter_blacklist_hit: " << counter_blacklist_hit << std::endl;
std::cout << "counter_whitelist_hit: " << counter_whitelist_hit << std::endl;
// Unlearn all the programmed flows.
for (auto it = flow_map.begin(); it != flow_map.end(); ++it) {
NtFlow_t* flow = it->second.get();
flow->op = 0;
status = NT_FlowWrite(flow_stream, flow, -1);
handle_error_status(status, "NT_FlowWrite() failed");
}
// Close the flow stream
handle_error_status(status, "NT_FlowClose() failed");
// End NTAPI.
return 0;
}