This example sets up an application that receives incoming IPv4 UDP packets and then programs (learns) them as new flows. The flows are learned with sorted IP addresses and ports, enabling a single physical port to handle packets in both directions of the flow. See SPAN port.
This application also maintains a lookup table of programmed flows to ensure that the same flow is not learned more than once. The application relies heavily on the C++11 standard library to do this in a concise way.
This application does not transmit packets and does not monitor traffic in a significant way. Running this application alongside the NT tools "pktgen" and "monitoring" will help make the functionality of this application clear.
This example is built for adapters with flowmatcher functionality.
#include <algorithm>
#include <array>
#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
// Two-step stringification: STR(A) macro-expands its argument first and then
// STR_INNER turns the result into a string literal. STR(BLACKLIST) therefore
// yields the macro's value as text rather than the text "BLACKLIST".
#define STR_INNER(A) #A
#define STR(A) STR_INNER(A)
namespace {
{
// Terminates the program on any status other than NT_SUCCESS: prints
// `message`, a colon and the contents of `error_buffer` to stderr, then exits
// with EXIT_FAILURE.
// NOTE(review): the enclosing signature and the code that fills error_buffer
// are not visible in this excerpt.
if (status != NT_SUCCESS) {
std::cerr << message << ": " << error_buffer << std::endl;
std::exit(EXIT_FAILURE);
}
}
{
// NOTE(review): this excerpt looks truncated -- the NT_ConfigOpen call below
// has no closing parenthesis or semicolon before the loop, and the loop body
// is empty. Restore the missing statements from the upstream example before
// relying on this block.
status =
NT_ConfigOpen(&config_stream,
"flow_learn_span NT_ConfigOpen");
// Visits every entry of ntpls; the visible code does nothing with `ntpl`.
for (const auto& ntpl : ntpls) {
}
}
{
// Reads flow->keyData as an array of 64-bit words and mixes words 0..4
// (40 bytes) together with `meta`: every term is multiplied by its own
// constant and the products are XOR-ed into the returned value.
// NOTE(review): `meta` is not declared in the visible code.
// NOTE(review): the reinterpret_cast assumes keyData holds at least 40 bytes
// and is suitably aligned for uint64_t access -- confirm against NtFlow_t.
const uint64_t* ptr =
reinterpret_cast<const uint64_t*
>(flow->
keyData);
return (ptr[0] * 45684803) ^
(ptr[1] * 198138211) ^
(ptr[2] * 994229) ^
(ptr[3] * 8349871) ^
(ptr[4] * 3294876479) ^
(meta * 663664226587);
}
// Polling loop for one receive stream.
//
// ready     - incremented once on entry, before the loop starts.
// end_task  - read at the top of every iteration; the loop ends once it is true.
// stream_id - not referenced by the visible code (presumably consumed by a
//             stream-open call missing from this excerpt -- TODO confirm).
// handle    - called with net_buffer on every iteration that is not skipped.
//
// NOTE(review): `status` and `net_buffer` are used but never declared or
// assigned in the visible code; the receive call that produces them appears
// to have been dropped from this excerpt.
void rx_task(std::atomic<int>* ready, std::atomic<bool>* end_task,
uint32_t stream_id, std::function<
void(
const NtNetBuf_t&)> handle)
{
ready->fetch_add(1);
while (!end_task->load()) {
// Timeout and try-again are not errors: go around and re-check end_task.
if (status == NT_STATUS_TIMEOUT || status == NT_STATUS_TRYAGAIN) continue;
handle(net_buffer);
}
}
// Numeric values that this file splices, via STR(), into the right-hand side
// of the "Key(kd, KeyID=1) == ..." NTPL filter expressions for streams 2 and 3.
#define BLACKLIST 3
#define WHITELIST 4
// Table of flows that have already been seen, keyed by a 64-bit value and
// owning each NtFlow_t. It is a multimap because several flows may share one
// key; the lookup code compares keyData, keyId and keySetId within the
// matching range to tell them apart.
std::unordered_multimap<uint64_t, std::unique_ptr<NtFlow_t>>
flow_map;
{
// Signals readiness once, then loops until end_task reads true.
// NOTE(review): the loop body is empty in this excerpt, so as shown this is a
// pure spin; the enclosing signature and the per-iteration work are not
// visible here.
ready->fetch_add(1);
while (!end_task->load()) {
}
}
}
{
// Adds one to counter_unhandled.
// NOTE(review): the counter's declaration and the enclosing signature are not
// visible; presumably this is the handler for the "== UNHANDLED" stream.
counter_unhandled += 1;
}
{
// Handler body for a packet whose key lookup missed: counts it, builds a
// direction-independent flow key from the packet's IPv4 addresses and UDP
// ports, and records the flow in flow_map unless an identical one is there.
// NOTE(review): the enclosing signature, `dyn4` and `counter_miss` are
// declared outside the visible code.
counter_miss += 1;
// Packet bytes start descrLength bytes after the start of the descriptor.
const uint8_t* packet =
reinterpret_cast<const uint8_t*
>(dyn4) + dyn4->
descrLength;
// Fresh, fully zeroed flow record; unset fields therefore compare as 0 below.
auto flow = std::unique_ptr<NtFlow_t>(
new NtFlow_t);
std::memset(flow.get(), 0x0,
sizeof(
NtFlow_t));
// The NTPL in this file sets Offset0=Layer3Header[12] and
// Offset1=Layer4Header[0], so offset0 locates the 4-byte IPv4 source address
// (destination follows at +4) and offset1 the 2-byte UDP source port
// (destination follows at +2).
const uint8_t* ipv4_src = packet + dyn4->
offset0;
const uint8_t* ipv4_dst = packet + dyn4->
offset0 + 4;
const uint8_t* udp_src = packet + dyn4->
offset1;
const uint8_t* udp_dst = packet + dyn4->
offset1 + 2;
// Sort the endpoints so both directions of a flow give the same key: the
// packet's own order is kept when (src IP, src port) compares bytewise below
// (dst IP, dst port), otherwise source and destination are swapped.
// Key layout: bytes 0-3 and 4-7 hold the IPs, 8-9 and 10-11 the ports.
if (std::lexicographical_compare(ipv4_src, ipv4_src + 4, ipv4_dst, ipv4_dst + 4) ||
(std::equal(ipv4_src, ipv4_src + 4, ipv4_dst) &&
std::lexicographical_compare(udp_src, udp_src + 2, udp_dst, udp_dst + 2))) {
std::memcpy(flow->
keyData, ipv4_src, 4);
std::memcpy(flow->
keyData + 4, ipv4_dst, 4);
std::memcpy(flow->
keyData + 8, udp_src, 2);
std::memcpy(flow->
keyData + 10, udp_dst, 2);
}
else {
std::memcpy(flow->
keyData, ipv4_dst, 4);
std::memcpy(flow->
keyData + 4, ipv4_src, 4);
std::memcpy(flow->
keyData + 8, udp_dst, 2);
std::memcpy(flow->
keyData + 10, udp_src, 2);
}
// NOTE(review): flow_raw_ptr is not used by any visible statement; presumably
// it is needed by a call missing from this excerpt once `flow` has been moved.
auto flow_raw_ptr = flow.get();
// NOTE(review): nothing visible assigns flow->id, flow->keyId or
// flow->keySetId after the memset, so as shown they are all 0 and every flow
// lands in the same bucket -- confirm against the upstream example.
auto range = flow_map.equal_range(flow->
id);
// Entries sharing the id may still be different flows; treat the flow as
// already known only if the first 40 bytes of keyData, keyId and keySetId all
// match, and in that case leave without inserting.
for (auto it = range.first; it != range.second; ++it) {
if (std::equal(flow->
keyData, flow->
keyData + 40, it->second->keyData) &&
flow->
keyId == it->second->keyId && flow->
keySetId == it->second->keySetId) {
return;
}
}
// Braced initializers are evaluated left to right, so flow->id is read before
// ownership of `flow` is moved into the map entry.
flow_map.insert(std::pair<uint64_t, std::unique_ptr<NtFlow_t>>{flow->
id, std::move(flow)});
}
{
// Adds one to counter_blacklist_hit.
// NOTE(review): the counter's declaration and the enclosing signature are not
// visible; presumably this is the handler for the "== BLACKLIST" stream.
counter_blacklist_hit += 1;
}
{
// Adds one to counter_whitelist_hit.
// NOTE(review): the counter's declaration and the enclosing signature are not
// visible; presumably this is the handler for the "== WHITELIST" stream.
counter_whitelist_hit += 1;
}
}
{
// NOTE(review): this block is an excerpt -- the enclosing signature, the
// opening of the list closed by "});" below, and the thread objects joined at
// the end are not visible, and the braces do not balance as shown.
constexpr uint8_t adapter_no = 0;
// NTPL statements, in order: clear existing configuration; define a macro
// matching IPv4/UDP traffic on a given port; define the key fields (IPv4
// source and destination, UDP source and destination port) with a sorted key
// definition; route packets to streams 0-3 according to the key result
// (UNHANDLED, MISS, BLACKLIST, WHITELIST); drop everything else.
"Delete=All",
"DefineMacro(\"FilterCheck\", \"Port==$1 and Layer3Protocol==IPv4 and Layer4Protocol==UDP\")",
"DefineMacro(\"KeyTypeProtoSpecs\", \"(layer3header[12]/32, layer3header[16]/32, layer4header[0]/16,layer4header[2]/16)\")",
"KeyType[Name=kt] = {32, 32, 16, 16}",
"KeyDef[Name=kd; KeyType=kt; IpProtocolField=Outer; KeySort=Sorted] = KeyTypeProtoSpecs",
"Assign[StreamId=0; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == UNHANDLED",
"Assign[StreamId=1; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == MISS",
// STR() appends the numeric BLACKLIST / WHITELIST values to the expression.
"Assign[StreamId=2; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == " STR(
BLACKLIST),
"Assign[StreamId=3; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == " STR(
WHITELIST),
"Assign[StreamId=Drop; Priority=10]=All"
});
// NOTE(review): flow_attr is declared without an initializer and is not used
// by any visible statement.
NtFlowAttr_t flow_attr;
// Shared with the worker tasks: end_task asks them to stop, ready counts how
// many have started.
std::atomic<bool> end_task {false};
std::atomic<int> ready {0};
// Wait until ready reaches 5, which matches the five threads joined below
// (their creation is not visible in this excerpt).
while(ready.load() < 5) std::this_thread::yield();
std::cout << "Press enter to end application..." << std::endl;
std::cin.get();
// Ask every task to leave its loop, then wait for each one to finish.
end_task.store(true);
if (flow_program_task0.joinable()) flow_program_task0.join();
if (rx_task3.joinable()) rx_task3.join();
if (rx_task2.joinable()) rx_task2.join();
if (rx_task1.joinable()) rx_task1.join();
if (rx_task0.joinable()) rx_task0.join();
}
return 0;
}