net/numa/numa_example.c

Reference Documentation

product_line_custom
Napatech SmartNIC
category
Reference Information

Description

This source file is an example of how to utilize host buffers allocated to a specific NUMA node using NTAPI. NUMA support is currently not available on FreeBSD, so this example only applies to Linux. The example uses host buffers on NUMA node 1 and receives data on port 0.

Prerequisites

A Napatech capture accelerator is need to run this example. The ntservice.ini must have at least one HostBuffersRx defined. Below is an example of a minimum ini-file. It will create a 32MB RX hostbuffer from NUMA node 1. [Adapter0] AdapterType = NT20E BusId = 00:0a:00.00 HostBuffersRx = [1,32,1]

Program flow

The following is required to use a host buffer from a specific NUMA node:
  • #include/nt.h - Applications/Tools only need to include nt.h to obtain prototypes, macros etc. from NTAPI.

  • Binding the current process to NUMA node 1 can be done in several ways:
    • For Linux:
      • Using sched_setaffinity() on Linux. This will require that it is known which CPU cores are located on NUMA node 1.

      • Using numa_bind() from libnuma on Linux to bind to NUMA node 1.

      • Using numactl on the command line to start the example on NUMA node 1. e.g numactl --cpubind=1 --membind=1 ./numa_example

    • For Windows:
      • Using GetNumaHighestNodeNumber, GetProcessAffinityMask and GetNumaNodeProcessorMask to find the numa node that the app/process is able to use. If the system is well known this step can be skipped.

      • Using SetProcessAffinityMask to move the app/process to the wanted processor (numa node).

  • NT_Init(NTAPI_VERSION) - Initialize the NTAPI library. NTAPI_VERSION is a define that describes the version of the API described in the header files included by nt.h. NT_Init() will ask the NTAPI library to convert return data to the NTAPI_VERSION if possible. This will ensure that applications can run on NTAPI libraries of newer versions.

  • NT_ConfigOpen() - Open a configuration stream.

  • NT_NTPL() - Setup stream 1 to use NUMA node 1. This must be done before assigning a filter.

  • NT_NTPL() - Assign traffic to the stream. A stream doesn't return data until traffic is assigned to it by a filter. Stream IDs might be shared between other streams.

  • NT_ConfigClose() - Close the configuration stream.

  • NT_NetRxOpen() - Open the stream using a stream ID. The stream ID must match the one used when creating the filter.

  • NT_NetRxGet() is called with a timeout of 1000ms and will return NT_STATUS_TIMEOUT in case nothing is received within 1000ms and will return NT_SUCCESS if something is returned. Return values different from that is an indication of an error. Packets that are prior to the expected time are released via NT_NetRxRelease().

  • NT_NetRxRelease() - Release a packet.

  • NT_NetRxClose() - Close the stream when terminating.

  • NT_NTPL() - Release the NTPL assignment made on the hostbuffer.

  • NT_Done() - Close down the NTAPI library.

  • NT_ExplainError() - Explain an error code returned by NTAPI functions.

Code

/* * %NT_SOFTWARE_LICENSE% */ /** * @example net/numa/numa_example.c * @section numa_example_description Description * * This source file is an example of how to utilize host buffers * allocated to a specific NUMA node using NTAPI. NUMA support is * currently not available on FreeBSD, so this example only applies to * Linux. The example uses host buffers on NUMA node 1 and receives * data on port 0. * * The following NTAPI functions are used: * - @ref NT_Init() * - @ref NT_ConfigOpen() * - @ref NT_NTPL() * - @ref NT_ConfigClose() * - @ref NT_NetRxOpen() * - @ref NT_NetRxGet() * - @ref NT_NET_GET_PKT_WIRE_LENGTH() * - @ref NT_NetRxRelease() * - @ref NT_NetRxClose() * - @ref NT_Done() * - @ref NT_ExplainError() * * @section numa_example_prerequisites Prerequisites * A Napatech capture accelerator is need to run this example. The ntservice.ini * must have at least one HostBuffersRx defined. Below is an example * of a minimum ini-file. It will create a 32MB RX hostbuffer from * NUMA node 1. * @code * [Adapter0] * AdapterType = NT20E * BusId = 00:0a:00.00 * HostBuffersRx = [1,32,1] * @endcode * * @section numa_example_flow Program flow * @{ * The following is required to use a host buffer from a specific NUMA node: * - \#include/nt.h - Applications/Tools only need to include @ref * nt.h to obtain prototypes, macros etc. from NTAPI. * - Binding the current process to NUMA node 1 can be done in several ways: * - For Linux: * - Using sched_setaffinity() on Linux. This will require that it is known * which CPU cores are located on NUMA node 1. * - Using numa_bind() from libnuma on Linux to bind to NUMA node 1. * - Using numactl on the command line to start the example on NUMA node 1. e.g * @code numactl --cpubind=1 --membind=1 ./numa_example @endcode * - For Windows: * - Using GetNumaHighestNodeNumber, GetProcessAffinityMask and GetNumaNodeProcessorMask * to find the numa node that the app/process is able to use. If the system is well known * this step can be skipped. * - Using SetProcessAffinityMask to move the app/process to the wanted processor (numa node). * - @ref NT_Init(@ref NTAPI_VERSION) - Initialize the NTAPI * library. @ref NTAPI_VERSION is a define that describes the version * of the API described in the header files included by @ref * nt.h. @ref NT_Init() will ask the NTAPI library to convert return data * to the @ref NTAPI_VERSION if possible. This will ensure that * applications can run on NTAPI libraries of newer versions. * - @ref NT_ConfigOpen() - Open a configuration stream. * - @ref NT_NTPL() - Setup stream 1 to use NUMA node 1. This must be * done before assigning a filter. * - @ref NT_NTPL() - Assign traffic to the stream. A stream doesn't * return data until traffic is assigned to it by a filter. Stream * IDs might be shared between other streams. * - @ref NT_ConfigClose() - Close the configuration stream. * - @ref NT_NetRxOpen() - Open the stream using a stream ID. * The stream ID must match the one used when creating the * filter. * - @ref NT_NetRxGet() is called with a timeout of 1000ms and will return * NT_STATUS_TIMEOUT in case nothing is received within 1000ms and * will return NT_SUCCESS if something is returned. Return values * different from that is an indication of an error. Packets that * are prior to the expected time are released via @ref NT_NetRxRelease(). * - @ref NT_NetRxRelease() - Release a packet. * - @ref NT_NetRxClose() - Close the stream when terminating. * - @ref NT_NTPL() - Release the NTPL assignment made on the hostbuffer. * - @ref NT_Done() - Close down the NTAPI library. * - @ref NT_ExplainError() - Explain an error code returned by NTAPI functions. * * *<hr> * @section numa_example_code Code * @} */ #ifdef WIN32 #include <winsock2.h> #endif #include <nt.h> #if defined(WIN32) || defined (WIN64) //#define snprintf _snprintf #define snprintf(a, b, c, d) _snprintf_s((a), _countof(a), (b), (c), (d)) #endif // Define to use sched_setaffinity //#define USE_SCHED_SETAFFINITY #ifdef USE_SCHED_SETAFFINITY #define __USE_GNU #include <sched.h> #endif // Define to use libnuma //#define USE_LIBNUMA #ifdef USE_LIBNUMA #include <numa.h> #endif int main(void) { unsigned int numPackets=0; // The number of packets received unsigned int numBytes=0; // The number of bytes received char tmpBuffer[20]; // Buffer to build filter string char errorBuffer[NT_ERRBUF_SIZE]; // Error buffer int status; // Status variable NtNetStreamRx_t hNetRx; // Handle to the RX stream NtConfigStream_t hCfgStream; // Handle to a config stream NtNtplInfo_t ntplInfo; // Return data structure from the NT_NTPL() call. NtNetBuf_t hNetBuf; // Net buffer container. Packet data is returned in this when calling NT_NetRxGet(). bool firstFound = false; #ifdef USE_SCHED_SETAFFINITY // In this example it is assumed that CPU core 1,3,5,7 are located on NUMA node 1 cpu_set_t mask; CPU_ZERO(&mask); CPU_SET(1, &mask); CPU_SET(3, &mask); CPU_SET(5, &mask); CPU_SET(7, &mask); sched_setaffinity(0, sizeof(cpu_set_t), &mask); #endif #ifdef USE_LIBNUMA // Uses the numa_bind() function from libnuma to bind current process to NUMA node 1. #if (defined LIBNUMA_API_VERSION) && LIBNUMA_API_VERSION == 2 struct bitmask * mask = numa_bitmask_alloc(numa_num_possible_nodes()); numa_bitmask_setbit(mask, 1); numa_bind(mask); numa_bitmask_free(mask); #else nodemask_t mask; nodemask_zero(&mask); nodemask_set(&mask, 1); numa_bind(&mask); #endif #endif #ifdef WIN32 { DWORD_PTR processAffinityMask; DWORD_PTR systemAffinityMask; DWORD_PTR setAffinityMask; ULONGLONG processorMask; ULONG highestNodeNumber; // Is this a numa system if (!GetNumaHighestNodeNumber(&highestNodeNumber)) { fprintf(stderr, "GetNumaHighestNodeNumber failed with errorcode: %08X\n", GetLastError()); return -1; } if (highestNodeNumber == 0) { fprintf(stderr, "This is not a numa system. Cannot run this example.\n"); return -1; } // Find the cores that this process is allowed to use if (!GetProcessAffinityMask(GetCurrentProcess(), &processAffinityMask, &systemAffinityMask)) { fprintf(stderr, "GetProcessAffinityMask failed with errorcode: %08X\n", GetLastError()); return -1; } // Find the cores belonging to numa node 1 if (!GetNumaNodeProcessorMask(1, &processorMask)) { fprintf(stderr, "GetNumaNodeProcessorMask failed with errorcode: %08X\n", GetLastError()); return -1; } // Use only the cores that is allowed for this process setAffinityMask = processAffinityMask & processorMask; // Make this process to run on a processor on numa node 1 if (!SetProcessAffinityMask(GetCurrentProcess(), setAffinityMask)) { fprintf(stderr, "SetProcessAffinityMask failed with errorcode: %08X\n", GetLastError()); return -1; } } #endif // Initialize the NTAPI library and thereby check if NTAPI_VERSION can be used together with this library if((status = NT_Init(NTAPI_VERSION)) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_Init() failed: %s\n", errorBuffer); return -1; } // Open a config stream to assign a filter to a stream ID. if((status = NT_ConfigOpen(&hCfgStream, "TestStream")) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_ConfigOpen() failed: %s\n", errorBuffer); return -1; } // Setup stream 1 to use NUMA node 1. This must be done before assigning a filter. if((status = NT_NTPL(hCfgStream, "Setup[NUMANode=1] = StreamId==1", &ntplInfo, NT_NTPL_PARSER_VALIDATE_NORMAL)) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_NTPL() failed: %s\n", errorBuffer); fprintf(stderr, ">>> NTPL errorcode: %X\n", ntplInfo.u.errorData.errCode); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[0]); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[1]); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[2]); return -1; } // Assign all traffic on port 0 to stream ID 1 if((status = NT_NTPL(hCfgStream, "Assign[StreamId=1] = Port == 0", &ntplInfo, NT_NTPL_PARSER_VALIDATE_NORMAL)) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_NTPL() failed: %s\n", errorBuffer); fprintf(stderr, ">>> NTPL errorcode: %X\n", ntplInfo.u.errorData.errCode); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[0]); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[1]); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[2]); return -1; } // Close the config stream if((status = NT_ConfigClose(hCfgStream)) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_ConfigClose() failed: %s\n", errorBuffer); return -1; } // Get a stream handle with stream ID 1. NT_NET_INTERFACE_PACKET // specify that we will receive data in a packet based matter. if((status = NT_NetRxOpen(&hNetRx, "TestStream", NT_NET_INTERFACE_PACKET, 1, -1)) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_NetRxOpen() failed: %s\n", errorBuffer); return -1; } // Optional step. Wait for the first packet that hit the NTPL assign command printf("Waiting for the first packet\n"); while(!firstFound) { if((status = NT_NetRxGet(hNetRx, &hNetBuf, 1000)) != NT_SUCCESS) { if((status == NT_STATUS_TIMEOUT) || (status == NT_STATUS_TRYAGAIN)) { // Timeouts are ok, we just need to wait a little longer for a packet continue; } // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_NetRxGet() failed: %s\n", errorBuffer); return -1; } // We got a packet. Check if the timestamp is newer than when the NTPL assign command was applied if(NT_NET_GET_PKT_TIMESTAMP(hNetBuf) > ntplInfo.ts) { // We have received a packet that is received after the NTPL assign command was applied // Note that this packet is released again. firstFound = true; } // Release the packet as it is too "old". if((status = NT_NetRxRelease(hNetRx, hNetBuf)) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_NetRxRelease() failed: %s\n", errorBuffer); return -1; } } // Get 10000000 packages #define NUMBER_OF_PACKETS 1000000 while (numPackets < NUMBER_OF_PACKETS) { // Get packet if((status = NT_NetRxGet(hNetRx, &hNetBuf, 1000)) != NT_SUCCESS) { if((status == NT_STATUS_TIMEOUT) || (status == NT_STATUS_TRYAGAIN)) { // Timeouts are ok, we just need to wait a little longer for a packet continue; } fprintf(stderr, "NT_NetRxGet failed\n"); break; } numBytes += NT_NET_GET_PKT_WIRE_LENGTH(hNetBuf); numPackets++; if((numPackets % 1000) == 0) { printf("Packets received %d of %d \r", numPackets, NUMBER_OF_PACKETS); fflush(stdout); } // Release packet after use NT_NetRxRelease(hNetRx, hNetBuf); } printf("\n"); // Close the stream and release the hostbuffer. This will also remove the NTPL assignments performed. NT_NetRxClose(hNetRx); // Open a config stream to delete a filter. if((status = NT_ConfigOpen(&hCfgStream, "TestStream")) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_ConfigOpen() failed: %s\n", errorBuffer); return -1; } // Delete the filter snprintf(tmpBuffer, 20, "delete=%d", ntplInfo.ntplId); if((status = NT_NTPL(hCfgStream, tmpBuffer, &ntplInfo, NT_NTPL_PARSER_VALIDATE_NORMAL)) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_NTPL() failed: %s\n", errorBuffer); fprintf(stderr, ">>> NTPL errorcode: %X\n", ntplInfo.u.errorData.errCode); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[0]); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[1]); fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[2]); return -1; } // Close the config stream if((status = NT_ConfigClose(hCfgStream)) != NT_SUCCESS) { // Get the status code as text NT_ExplainError(status, errorBuffer, sizeof(errorBuffer)); fprintf(stderr, "NT_ConfigClose() failed: %s\n", errorBuffer); return -1; } printf("Done: %16d packets, %16d bytes\n", numPackets, numBytes); // Close down the NTAPI library NT_Done(); return 0; }