flow_learn_span_example.cpp Source File

Reference Documentation

Platform
Intel® PAC
Napatech SmartNIC
Content Type
Reference Information
Capture Software Version
Link™ Capture Software 12.10
Napatech Software Suite: examples/flow/flow_learn_span/flow_learn_span_example.cpp Source File
flow_learn_span_example.cpp
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2023 Napatech A/S. All Rights Reserved.
4  *
5  * 1. Copying, modification, and distribution of this file, or executable
6  * versions of this file, is governed by the terms of the Napatech Software
7  * license agreement under which this file was made available. If you do not
8  * agree to the terms of the license do not install, copy, access or
9  * otherwise use this file.
10  *
11  * 2. Under the Napatech Software license agreement you are granted a
12  * limited, non-exclusive, non-assignable, copyright license to copy, modify
13  * and distribute this file in conjunction with Napatech SmartNIC's and
14  * similar hardware manufactured or supplied by Napatech A/S.
15  *
16  * 3. The full Napatech Software license agreement is included in this
17  * distribution, please see "NP-0405 Napatech Software license
18  * agreement.pdf"
19  *
20  * 4. Redistributions of source code must retain this copyright notice,
21  * list of conditions and the following disclaimer.
22  *
23  * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT ANY WARRANTIES, EXPRESS OR
24  * IMPLIED, AND NAPATECH DISCLAIMS ALL IMPLIED WARRANTIES INCLUDING ANY
25  * IMPLIED WARRANTY OF TITLE, MERCHANTABILITY, NONINFRINGEMENT, OR OF
26  * FITNESS FOR A PARTICULAR PURPOSE. TO THE EXTENT NOT PROHIBITED BY
27  * APPLICABLE LAW, IN NO EVENT SHALL NAPATECH BE LIABLE FOR PERSONAL INJURY,
28  * OR ANY INCIDENTAL, SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES WHATSOEVER,
29  * INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF PROFITS, CORRUPTION OR
30  * LOSS OF DATA, FAILURE TO TRANSMIT OR RECEIVE ANY DATA OR INFORMATION,
31  * BUSINESS INTERRUPTION OR ANY OTHER COMMERCIAL DAMAGES OR LOSSES, ARISING
32  * OUT OF OR RELATED TO YOUR USE OR INABILITY TO USE NAPATECH SOFTWARE OR
33  * SERVICES OR ANY THIRD PARTY SOFTWARE OR APPLICATIONS IN CONJUNCTION WITH
34  * THE NAPATECH SOFTWARE OR SERVICES, HOWEVER CAUSED, REGARDLESS OF THE THEORY
35  * OF LIABILITY (CONTRACT, TORT OR OTHERWISE) AND EVEN IF NAPATECH HAS BEEN
36  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. SOME JURISDICTIONS DO NOT ALLOW
37  * THE EXCLUSION OR LIMITATION OF LIABILITY FOR PERSONAL INJURY, OR OF
38  * INCIDENTAL OR CONSEQUENTIAL DAMAGES, SO THIS LIMITATION MAY NOT APPLY TO YOU.
39  *
40  *
41 
42  */
43 
44 /**
45  * @example flow/flow_learn_span/flow_learn_span_example.cpp
46  * @section flow_learn_span_example Description
47  *
48  * This example sets up an application that receives incoming IPv4 UDP packets,
49  * and then programs / learns them as new flows. The flows are learned with sorted
50  * IP addresses and ports, enabling a single physical port to handle packets in
51  * both directions of the flow. See SPAN port.
52  *
53  * This application also maintains a lookup table of programmed flows, to ensure
54  * that the same flow is not learned more than once. The application relies
55  * heavily on the C++11 std library to do this in a concise way.
56  *
57  * This application does not transmit packets, and does not monitor traffic in a
58  * significant way. Running this application alongside the NT tools "pktgen"
59  * and "monitoring" will help make the functionality of this application clear.
60  *
61  * The example program uses the following NTAPI functions:
62  * - @ref NT_Init()
63  * - @ref NT_Done()
64  * - @ref NT_ExplainError()
65  * - @ref NT_ConfigOpen()
66  * - @ref NT_ConfigClose()
67  * - @ref NT_NTPL()
68  * - @ref NT_NetRxOpen()
69  * - @ref NT_NetRxClose()
70  * - @ref NT_NetRxGetNextPacket()
71  * - @ref NT_NET_GET_PKT_DESCR_PTR_DYN4()
72  * - @ref NT_FlowOpenAttrInit()
73  * - @ref NT_FlowOpenAttrSetAdapterNo()
74  * - @ref NT_FlowOpen_Attr()
75  * - @ref NT_FlowClose()
76  * - @ref NT_FlowWrite()
77  *
78  * This example is built for adapters with flowmatcher functionality.
79  */
80 
81 // Include this in order to access the Napatech API
82 #include <nt.h>
83 
84 #include <algorithm>
85 #include <array>
86 #include <atomic>
87 #include <cstdint>
88 #include <cstring>
89 #include <functional>
90 #include <iostream>
91 #include <memory>
92 #include <string>
93 #include <thread>
94 #include <unordered_map>
95 #include <vector>
96 
// Two-step stringification: STR() lets its argument be macro-expanded first,
// then STR_INNER() turns the resulting tokens into a string literal.
#define STR_INNER(TOKENS) #TOKENS
#define STR(TOKENS) STR_INNER(TOKENS)
99 
100 namespace {
101 
102 ////////////////////////////////////////////////////////////////////////////////
103 // Utility functions
104 ////////////////////////////////////////////////////////////////////////////////
105 
106 /**
107  * Handle errors by printing an error and exit the application.
108  */
109 void handle_error_status(int status, const char* message)
110 {
111  if (status != NT_SUCCESS) {
112  char error_buffer[NT_ERRBUF_SIZE];
113  NT_ExplainError(status, error_buffer, sizeof(error_buffer));
114  std::cerr << message << ": " << error_buffer << std::endl;
115  std::exit(EXIT_FAILURE);
116  }
117 }
118 
119 /**
120  * Opens a config stream and programs the input NTPL.
121  * When the NTPL has been programmed, the config stream is closed again.
122  */
123 void ntpl_multicall(const std::vector<std::string>& ntpls)
124 {
125  int status;
126  NtConfigStream_t config_stream;
127 
128  status = NT_ConfigOpen(&config_stream, "flow_learn_span NT_ConfigOpen");
129  handle_error_status(status, "NT_ConfigOpen() failed");
130 
131  for (const auto& ntpl : ntpls) {
132  NtNtplInfo_t ntpl_info;
133  status = NT_NTPL(config_stream, ntpl.c_str(), &ntpl_info, NT_NTPL_PARSER_VALIDATE_NORMAL);
134  handle_error_status(status, "NT_NTPL() failed");
135  }
136 
137  status = NT_ConfigClose(config_stream);
138  handle_error_status(status, "NT_ConfigClose() failed");
139 }
140 
141 /**
142  * Quick and dirty hashing function for some fields in NtFlow_t.
143  * The hash uniqueness in this function is not great, but since std::unordered_multimap
144  * is used to store the flows, the application is able to handle duplicate hashes.
145  */
146 uint64_t flow_hash(const NtFlow_t* flow)
147 {
148  const uint64_t* ptr = reinterpret_cast<const uint64_t*>(flow->keyData);
149  uint64_t meta = (flow->ipProtocolField << 16) | (flow->keyId << 8) | flow->keySetId;
150 
151  // Multiply data with some random primes.
152  return (ptr[0] * 45684803) ^
153  (ptr[1] * 198138211) ^
154  (ptr[2] * 994229) ^
155  (ptr[3] * 8349871) ^
156  (ptr[4] * 3294876479) ^
157  (meta * 663664226587);
158 }
159 
160 ////////////////////////////////////////////////////////////////////////////////
161 // The general thread main function used for all StreamID
162 ////////////////////////////////////////////////////////////////////////////////
163 
164 /**
165  * Opens a RX stream and received packets until an end-task signal is received.
166  */
167 void rx_task(std::atomic<int>* ready, std::atomic<bool>* end_task,
168  uint32_t stream_id, std::function<void(const NtNetBuf_t&)> handle)
169 {
170  int status;
171  NtNetStreamRx_t net_stream;
172  NtNetBuf_t net_buffer;
173 
174  // Open RX stream
175  status = NT_NetRxOpen(&net_stream, "flow_learn_span NT_NetRxOpen", NT_NET_INTERFACE_PACKET, stream_id, -1);
176  handle_error_status(status, "NT_NetRxOpen() failed");
177 
178  // Tell main function that this is ready to receive packets, and then start the loop.
179  ready->fetch_add(1);
180 
181  while (!end_task->load()) {
182  // Get next packet or continue.
183  status = NT_NetRxGetNextPacket(net_stream, &net_buffer, 100);
184  if (status == NT_STATUS_TIMEOUT || status == NT_STATUS_TRYAGAIN) continue;
185  handle_error_status(status, "NT_NetRxGetNextPacket() failed");
186 
187  // See the handle functions.
188  handle(net_buffer);
189  }
190 
191  // Close the RX stream.
192  status = NT_NetRxClose(net_stream);
193  handle_error_status(status, "NT_NetRxClose() failed");
194 }
195 
196 ////////////////////////////////////////////////////////////////////////////////
197 // Global variables
198 ////////////////////////////////////////////////////////////////////////////////
199 
200 #define BLACKLIST 3
201 #define WHITELIST 4
202 
203 uint64_t counter_unhandled = 0;
204 uint64_t counter_miss = 0;
207 
209 std::unordered_multimap<uint64_t, std::unique_ptr<NtFlow_t>> flow_map;
210 
211 std::atomic<uint64_t> flow_learn_queue_read_index {0};
212 std::atomic<uint64_t> flow_learn_queue_write_index {0};
213 std::array<NtFlow_t*, 0x100000> flow_learn_queue;
214 constexpr uint64_t flow_learn_queue_index_mask = 0xfffff;
215 
216 ////////////////////////////////////////////////////////////////////////////////
217 // Main thread function for programming flows
218 ////////////////////////////////////////////////////////////////////////////////
219 
220 /**
221  * Separate task to program flows
222  */
223 void flow_program_task(std::atomic<int>* ready, std::atomic<bool>* end_task)
224 {
225 
226  // Tell main function that this is ready to receive packets, and then start the loop.
227  ready->fetch_add(1);
228 
229  while (!end_task->load()) {
230  // If a new flow has been added to the queue, the pop and learn it.
232  auto flow_raw_ptr = flow_learn_queue[flow_learn_queue_read_index];
233  flow_learn_queue_read_index.store((flow_learn_queue_read_index + 1) & flow_learn_queue_index_mask);
234 
235  // Learn the flow and insert it into the map.
236  int status = NT_FlowWrite(flow_stream, flow_raw_ptr, -1);
237  handle_error_status(status, "NT_FlowWrite() failed");
238  }
239  }
240 }
241 
242 ////////////////////////////////////////////////////////////////////////////////
243 // The handle functions for packets for each StreamID
244 ////////////////////////////////////////////////////////////////////////////////
245 
246 /**
247  * Handle used for the unhandled packets RX thread.
248  *
249  * In this example, unhandled packets are only counted.
250  */
252 {
253  counter_unhandled += 1;
254 }
255 
256 /**
257  * Handle used for the missed packets RX thread.
258  *
259  * Whenever a missed packet is received, the function calculates the flow learn
260  * data, and checks if it is a new flow, or a previously known flow. If the flow
261  * is new, then the flow is programmed.
262  */
263 void handle_stream_miss(const NtNetBuf_t& net_buffer)
264 {
265  counter_miss += 1;
266 
267  // Get the relevant pointers from the packet data.
268  const NtDyn4Descr_t* dyn4 = NT_NET_GET_PKT_DESCR_PTR_DYN4(net_buffer);
269  const uint8_t* packet = reinterpret_cast<const uint8_t*>(dyn4) + dyn4->descrLength;
270 
271  auto flow = std::unique_ptr<NtFlow_t>(new NtFlow_t);
272  std::memset(flow.get(), 0x0, sizeof(NtFlow_t));
273 
274  const uint8_t* ipv4_src = packet + dyn4->offset0;
275  const uint8_t* ipv4_dst = packet + dyn4->offset0 + 4;
276 
277  const uint8_t* udp_src = packet + dyn4->offset1;
278  const uint8_t* udp_dst = packet + dyn4->offset1 + 2;
279 
280  // Sort the IPv4 addresses and UDP ports for the packet. This is only required
281  // when the option "KeySort=Sorted" is used in the KeyDef NTPL command.
282  // Because of endianness the comparison is actually not trivial,
283  // but the std algorithms library makes it a bit easier.
284  if (std::lexicographical_compare(ipv4_src, ipv4_src + 4, ipv4_dst, ipv4_dst + 4) ||
285  (std::equal(ipv4_src, ipv4_src + 4, ipv4_dst) &&
286  std::lexicographical_compare(udp_src, udp_src + 2, udp_dst, udp_dst + 2))) {
287  // Translates to: ipv4_src < ipv4_dst || (ipv4_src == ipv4_dst && udp_src < udp_dst)
288  std::memcpy(flow->keyData, ipv4_src, 4);
289  std::memcpy(flow->keyData + 4, ipv4_dst, 4);
290  std::memcpy(flow->keyData + 8, udp_src, 2);
291  std::memcpy(flow->keyData + 10, udp_dst, 2);
292  }
293  else {
294  std::memcpy(flow->keyData, ipv4_dst, 4);
295  std::memcpy(flow->keyData + 4, ipv4_src, 4);
296  std::memcpy(flow->keyData + 8, udp_dst, 2);
297  std::memcpy(flow->keyData + 10, udp_src, 2);
298  }
299 
300  // Some packet inspection can be added here to decide whether the new flow
301  // should be added to the WHITELIST or the BLACKLIST. Set flow->keySetId accordingly.
302 
303  // Set other relevant fields.
304  flow->ipProtocolField = 0x11; // Layer 4 is UDP.
305  flow->keyId = 1; // Value used in the Key-test in the NTPL filter "Key(kd, KeyID=1) == WHITELIST / BLACKLIST".
306  flow->keySetId = WHITELIST; // Value used to compare the Key-test in the NTPL filter "Key(kd, KeyID=1) == WHITELIST / BLACKLIST".
307  flow->op = 1; // 1 means learn, 0 means unlearn.
308 
309  // Calculate a locally used hash. The "id" field is user defined and is used to
310  // store a hash in this example. For data structures other than std::unordered_multimap
311  // it could make sense to store the value of the pointer to NtFlow_t; this
312  // would enable instant and easy access to the locally stored flow data.
313  auto flow_raw_ptr = flow.get();
314  flow->id = flow_hash(flow_raw_ptr);
315 
316  // Check if the unique key-value pair exists, and return if it does.
317  // The check would be a lot simpler if std::unordered_map was used instead of
318  // std::unordered_multimap, but when duplicate hashes would not be handled correctly.
319  auto range = flow_map.equal_range(flow->id);
320  for (auto it = range.first; it != range.second; ++it) {
321  if (std::equal(flow->keyData, flow->keyData + 40, it->second->keyData) &&
322  flow->ipProtocolField == it->second->ipProtocolField &&
323  flow->keyId == it->second->keyId && flow->keySetId == it->second->keySetId) {
324  return;
325  }
326  }
327 
328  flow_map.insert(std::pair<uint64_t, std::unique_ptr<NtFlow_t>>{flow->id, std::move(flow)});
329 
330  // Compared to the number of packets that can be received per second on a RX stream,
331  // the learn rate is very slow, which can cause packet drops if the RX has to do
332  // that task as well. Thus it can be a good idea to implement a queue, and perform
333  // the learning from another thread.
334  flow_learn_queue[flow_learn_queue_write_index] = flow_raw_ptr;
335  flow_learn_queue_write_index.store((flow_learn_queue_write_index + 1) & flow_learn_queue_index_mask);
336 }
337 
338 /**
339  * Handle used for the blacklist packets RX thread.
340  *
341  * In this example, packets that hit a programmed flow are only counted.
342  */
344 {
345  counter_blacklist_hit += 1;
346 }
347 
348 /**
349  * Handle used for the whitelist packets RX thread.
350  *
351  * In this example, packets that hit a programmed flow are only counted.
352  */
354 {
355  counter_whitelist_hit += 1;
356 }
357 
358 } // unnamed namespace
359 
360 ////////////////////////////////////////////////////////////////////////////////
361 // Main
362 ////////////////////////////////////////////////////////////////////////////////
363 
364 int main(int, char**)
365 {
366  constexpr uint8_t adapter_no = 0;
367 
368  // Start NTAPI.
370  handle_error_status(status, "NT_Init() failed");
371 
372  // Setup NTPL using a utility function.
374  // Delete previous filters and programmed flow before starting the new filters.
375  "Delete=All",
376 
377  // Macros for simplifying following NTPL commands.
378  "DefineMacro(\"FilterCheck\", \"Port==$1 and Layer3Protocol==IPv4 and Layer4Protocol==UDP\")",
379  "DefineMacro(\"KeyTypeProtoSpecs\", \"(layer3header[12]/32, layer3header[16]/32, layer4header[0]/16,layer4header[2]/16)\")",
380 
381  // Specify the packet field sizes in the 5-tuple. The 5-tuple contains the
382  // IPv4 addresses, IP protocol field, and the UDP ports. The IP protocol
383  // field is implicit when the "IpProtocolField=Outer" option is used.
384  // In this example sorting is used to receive two-way traffic on one port.
385  "KeyType[Name=kt] = {32, 32, 16, 16}",
386  "KeyDef[Name=kd; KeyType=kt; IpProtocolField=Outer; KeySort=Sorted] = KeyTypeProtoSpecs",
387 
388  // Set up filters with a Key test.
389  "Assign[StreamId=0; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == UNHANDLED",
390  "Assign[StreamId=1; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == MISS",
391  "Assign[StreamId=2; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == " STR(BLACKLIST),
392  "Assign[StreamId=3; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == " STR(WHITELIST),
393 
394  // Set up a low priority catch-all filter where packets will be dropped if
395  // they do not match any of the previous three filters.
396  "Assign[StreamId=Drop; Priority=10]=All"
397  });
398 
399  // Start the flow stream for learning and unlearning flows.
400  NtFlowAttr_t flow_attr;
401 
402  NT_FlowOpenAttrInit(&flow_attr);
403  NT_FlowOpenAttrSetAdapterNo(&flow_attr, adapter_no);
404 
405  status = NT_FlowOpen_Attr(&flow_stream, "flow_learn_span NT_FlowOpen_Attr", &flow_attr);
406  handle_error_status(status, "NT_FlowOpen_Attr() failed");
407 
408  // Start RX and flow learn tasks
409  std::atomic<bool> end_task {false};
410  std::atomic<int> ready {0};
411 
412  std::thread rx_task0(rx_task, &ready, &end_task, 0, handle_stream_unhandled);
413  std::thread rx_task1(rx_task, &ready, &end_task, 1, handle_stream_miss);
414  std::thread rx_task2(rx_task, &ready, &end_task, 2, handle_stream_blacklist_hit);
415  std::thread rx_task3(rx_task, &ready, &end_task, 3, handle_stream_whitelist_hit);
416  std::thread flow_program_task0(flow_program_task, &ready, &end_task);
417 
418  // Wait for RX tasks to be ready, then wait for user to stop the application.
419  while(ready.load() < 5) std::this_thread::yield();
420 
421  std::cout << "Press enter to end application..." << std::endl;
422  std::cin.get();
423 
424  // End application.
425  end_task.store(true);
426 
427  if (flow_program_task0.joinable()) flow_program_task0.join();
428  if (rx_task3.joinable()) rx_task3.join();
429  if (rx_task2.joinable()) rx_task2.join();
430  if (rx_task1.joinable()) rx_task1.join();
431  if (rx_task0.joinable()) rx_task0.join();
432 
433  std::cout << "counter_unhandled: " << counter_unhandled << std::endl;
434  std::cout << "counter_miss: " << counter_miss << std::endl;
435  std::cout << "counter_blacklist_hit: " << counter_blacklist_hit << std::endl;
436  std::cout << "counter_whitelist_hit: " << counter_whitelist_hit << std::endl;
437 
438  // Unlearn all the programmed flows.
439  for (auto it = flow_map.begin(); it != flow_map.end(); ++it) {
440  NtFlow_t* flow = it->second.get();
441  flow->op = 0;
442 
443  status = NT_FlowWrite(flow_stream, flow, -1);
444  handle_error_status(status, "NT_FlowWrite() failed");
445  }
446 
447  // Close the flow stream
448  status = NT_FlowClose(flow_stream);
449  handle_error_status(status, "NT_FlowClose() failed");
450 
451  // End NTAPI.
452  NT_Done();
453 
454  return 0;
455 }