net/ipfdemo/ipfdemo_example.c

Reference Documentation

Product line: Napatech SmartNIC
Category: Reference Information

Description

This source file is an example of how to write IP fragment reassembly code using the Napatech FPGA IPFMode feature. IPFMode accelerates IP reassembly by making it possible to load balance using a 5-tuple hash algorithm instead of a 2-tuple hash algorithm. For fragments this is done by learning from the first fragment and using that knowledge to calculate the 5-tuple for the remaining fragments of the datagram. The complex case is the situation where the FPGA was not able to learn and identify a fragment, so the fragment is sent to one of the unmatched streams. In this situation the code needs to hand over the unmatched packet and deliver it to the right reassembling thread.

Prerequisites

A Napatech capture accelerator with IPFMode support is needed to run this example. The ntservice.ini file must have enough HostBuffersRx defined to honor the streams requested when running this demo application. Below is an example of a minimal ini file. It will create 20 64 MB RX host buffers on NUMA node 0.

Note that PacketDescriptor is set to Ext9; this is required to run IPFMode at all. If anything less is specified, IPFMode will not be used and the configuration reverts to Hash2Tuple mode.

Note also that TimeSyncReferencePriority is set to OsTime. This is critical when using a fragtimeout value (the default), because the timestamp of each packet is used to calculate its timeout.

  [System]
  TimestampFormat = NATIVE_UNIX

  [Adapter0]
  AdapterType = NT20E2
  BusId = 00:02:00:00
  PacketDescriptor = Ext9
  HostBuffersRx = [20,64,0]
  TimeSyncReferencePriority = OsTime

************************* Overview of the algorithm used ******************************

The FPGA enables you to match IP fragments and distribute them using a multi-CPU buffer splitting scheme based on a 5-tuple hash algorithm when the IPFMode feature is used. If a multi-CPU splitting scheme based on a 2-tuple hash algorithm gives good enough distribution, the IPFMode feature in the FPGA should not be used: with a 2-tuple hash algorithm, all fragments of a datagram already end up in the same stream. IPFMode: how it works:
  • Learns from first fragment

  • First fragment carries the stream ID to which unmatched fragments are delivered

  • Timeout setup for releasing learned entry
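
As a rough illustration of why IPFMode matters, the sketch below (hypothetical hash functions, not the FPGA's actual algorithm) contrasts a 2-tuple key, which every fragment carries, with a 5-tuple key, which also needs the L4 ports found only in the first fragment:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical hash functions for illustration only - not the FPGA's
 * actual algorithm. A 2-tuple key uses only the IP addresses, which every
 * fragment carries. A 5-tuple key also needs the L4 ports, present only
 * in the first fragment - this is what IPFMode "learns" from. */
static uint32_t hash_2tuple(uint32_t src_ip, uint32_t dst_ip, uint32_t streams)
{
    return (src_ip ^ dst_ip) % streams;
}

static uint32_t hash_5tuple(uint32_t src_ip, uint32_t dst_ip,
                            uint16_t src_port, uint16_t dst_port,
                            uint8_t proto, uint32_t streams)
{
    return (src_ip ^ dst_ip ^ src_port ^ dst_port ^ proto) % streams;
}
```

With a 2-tuple hash, all fragments of a datagram land in the same stream regardless of ports; a 5-tuple hash separates flows between the same hosts but cannot be computed for non-first fragments without the learning step.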

Graphical overview:

                       +--------------+
        +------------->|   DOI hash   |------------+
        |              |    tables    |            |
        |              +--------------+            |
        |                                          v
+----------------+     +--------------+     +----------------+
|                |     |    Return    |     |                |
|  Reassembling  |---->|   msg box    |---->|   Unmatched    |
|                |     |    FIFOs     |     |                |
|    threads     |     +--------------+     |    threads     |
|                |     +--------------+     |                |
|                |<----|   Msg box    |<----|                |
+----------------+     |    FIFOs     |     +----------------+
   ^   ^   ^           +--------------+        ^   ^
   |   |   |                                   |   |
 5-tuple hash splitting          2-tuple hash splitting
   |   |   |                                   |   |
+--------------------------------------------------------------+
|                          NTAPI/FPGA                           |
+--------------------------------------------------------------+

Description of algorithm:

Threads:

Unmatched threads – variable number (N) – number of unmatched streams to use (2-tuple hash cpu-splitting)

Each unmatched thread receives the unmatched fragments. When a fragment is received, the DOI tables are checked for a datagram ID match. If a match is found, the NetBuf containing the fragment is sent to the Msg box FIFO of the reassembling thread that owns the matching DOI table. Otherwise, if no match is found (yet), the fragment is temporarily put into a wait list. The wait list is periodically checked against the DOI tables.
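
The wait-list handling described above can be sketched as follows (illustrative types and names; the real example parks NetBufs, uses a larger preallocated pool, and also times entries out by timestamp):

```c
#include <assert.h>
#include <stddef.h>

/* Sketch of the unmatched thread's wait list: fragments with no DOI match
 * yet are parked on a singly linked list and re-scanned periodically.
 * Entries come from a preallocated free list, so no allocation happens on
 * the fast path. */
#define MAX_WAIT 4

typedef struct wait_elem_s {
    int frag_id;                 /* stand-in for the parked NetBuf */
    struct wait_elem_s *next;
} wait_elem_t;

typedef struct {
    wait_elem_t pool[MAX_WAIT];
    wait_elem_t *free_list;
    wait_elem_t *wait_list;
} wait_list_t;

static void wl_init(wait_list_t *wl)
{
    wl->pool[0].next = NULL;
    for (int i = 1; i < MAX_WAIT; i++)
        wl->pool[i].next = &wl->pool[i - 1];
    wl->free_list = &wl->pool[MAX_WAIT - 1];
    wl->wait_list = NULL;
}

static int wl_park(wait_list_t *wl, int frag_id)
{
    wait_elem_t *e = wl->free_list;
    if (e == NULL)
        return 0;                /* full: caller backs off and retries */
    wl->free_list = e->next;
    e->frag_id = frag_id;
    e->next = wl->wait_list;
    wl->wait_list = e;
    return 1;
}

/* Scan the wait list; remove every entry accepted by 'deliver' (the
 * DOI-match-and-send step in the real code). Returns the removed count. */
static int wl_scan(wait_list_t *wl, int (*deliver)(int frag_id))
{
    int removed = 0;
    wait_elem_t **pp = &wl->wait_list;
    while (*pp) {
        if (deliver((*pp)->frag_id)) {
            wait_elem_t *e = *pp;
            *pp = e->next;
            e->next = wl->free_list;
            wl->free_list = e;
            removed++;
        } else {
            pp = &(*pp)->next;
        }
    }
    return removed;
}

/* Demo predicate standing in for a DOI table match. */
static int deliver_even(int frag_id) { return (frag_id % 2) == 0; }
```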

Reassembling threads – variable number (M) – number of streams to use (5-tuple hash cpu-splitting)

When a reassembling thread receives IP fragments, it collects them into complete datagrams. If all fragments arrive in order, or at least the first fragment arrives first, no unmatched-fragment collection is needed and all datagrams are reassembled in the reassembling threads. When a reassembling thread receives a first fragment, it stores that fragment in its local collection hash table (tbl) and publishes the datagram ID in the specific DOI table connecting the reassembling thread with the unmatched thread, signalling its interest in fragments belonging to this datagram. When all fragments of a datagram have been received, they are all released and the DOI table entry is cleared. Each reassembling thread first reads a NetBuf from its Msg boxes and, if none is found, reads a NetBuf from the NTAPI stream. Any fragments received this way are put into the collection table (tbl).
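
The per-datagram bookkeeping can be illustrated with a minimal completeness check (a sketch with illustrative types; the real example additionally tracks first-fragment status, DOI entries and ID clashes):

```c
#include <assert.h>
#include <stdint.h>

/* Sketch of the completeness test a reassembling thread runs on a
 * collection-table entry: a datagram is complete when a last fragment has
 * been seen and the collected payload sizes cover [0, end) without holes.
 * Fragments may arrive in any order; non-overlapping fragments are
 * assumed, as IP reassembly expects. */
typedef struct {
    uint16_t offset;   /* byte offset of this fragment's payload */
    uint16_t size;     /* payload bytes in this fragment */
    uint8_t  last;     /* set on the last fragment */
} frag_t;

static int datagram_complete(const frag_t *frags, int cnt)
{
    uint32_t covered = 0, total = 0;
    int have_last = 0;
    for (int i = 0; i < cnt; i++) {
        covered += frags[i].size;
        if (frags[i].last) {
            have_last = 1;
            total = (uint32_t)frags[i].offset + frags[i].size;
        }
    }
    return have_last && covered == total;
}
```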

Inter-thread communication messages:

Msg boxes (FIFOs):

Used by the unmatched threads to send unmatched fragments to the reassembling threads. There is one group of N msg boxes per reassembling thread, M groups in total:

[[unm1 Msgbox], [unm2 Msgbox], … [unmN Msgbox]]   [[unm1 Msgbox], [unm2 Msgbox], … [unmN Msgbox]]   … M times
                    reasm1                                            reasm2                          … reasmM

The Msg boxes are built as FIFOs and are used by the unmatched threads to send the NetBuf of a received datagram fragment to the corresponding reassembling thread.
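
A minimal sketch of such a single-producer/single-consumer FIFO, modeled on the msg box defined later in the code (power-of-two depth, free-running indices masked on access; in threaded use a compiler/memory barrier sits between the data store and the index update):

```c
#include <assert.h>
#include <stdint.h>

#define BOX_DEPTH 8   /* must be a power of two */

typedef struct {
    volatile uint32_t rd, wr;     /* free-running read/write indices */
    void *data[BOX_DEPTH];        /* element slots (NetBuf pointers) */
} msg_box_t;

static int box_empty(const msg_box_t *b) { return b->wr == b->rd; }
static int box_full(const msg_box_t *b)  { return b->wr - b->rd >= BOX_DEPTH; }

static int box_put(msg_box_t *b, void *elem)
{
    if (box_full(b))
        return 0;
    b->data[b->wr & (BOX_DEPTH - 1)] = elem;
    b->wr++;    /* barrier goes before this store in threaded code */
    return 1;
}

static void *box_get(msg_box_t *b)
{
    void *elem = b->data[b->rd & (BOX_DEPTH - 1)];
    b->rd++;    /* barrier goes before this store in threaded code */
    return elem;
}
```

Because exactly one thread writes and one thread reads each box, the index pair needs no lock; this is what keeps the whole exchange scheme lockless.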

Return Msg boxes (FIFOs):

Used by the reassembling threads to send unmatched-fragment NetBufs back to the unmatched threads that originally received them. There is one group of M msg boxes per unmatched thread, N groups in total:

[[reasm1 Msgbox], [reasm2 Msgbox], … [reasmM Msgbox]]   [[reasm1 Msgbox], [reasm2 Msgbox], … [reasmM Msgbox]]   … N times
                      unm1                                                    unm2                               … unmN

These FIFOs are only needed to make the complete algorithm lockless. The NTAPI cannot handle multi-threaded access to the packet interface without serialization.

DOI tables (Datagram of interest):

The DOI tables are a set of hash tables containing a list of datagram IDs that the reassembling threads are interested in.

The FPGA specifies the unmatched stream ID together with the reception of the first fragment. This information is used by the reassembling threads to inform the specific unmatched thread about where to send unmatched fragments matching the first fragment's datagram ID.

Each reassembling thread has one DOI table allocated for each unmatched thread, so M*N DOI hash tables are used. This way each reassembling thread has exclusive write access to one table for each unmatched thread:

[[reasm1 tbl], [reasm2 tbl], … [reasmM tbl]]   [[reasm1 tbl], [reasm2 tbl], … [reasmM tbl]]   … N times
                 Unmatched1                                      Unmatched2                    … UnmatchedN
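
The indexing scheme can be sketched as follows (names are illustrative; the example code flattens the tables into one array the same way, and hashes into each table with a (src + id) modulo-prime key):

```c
#include <assert.h>
#include <stdint.h>

#define DOI_TBL_SIZE 8081u   /* prime, as in the example code */

/* With M reassembling and N unmatched threads, the M*N tables are
 * flattened so that unmatched thread n reads tables (n, 0..M-1) while
 * reassembling thread m is the only writer of table (n, m) for every n -
 * exclusive write access, hence no locks. */
static int doi_index(int unm_idx, int reasm_idx, int reasm_cnt)
{
    return unm_idx * reasm_cnt + reasm_idx;
}

/* Per-table hash key over source IP and IP Identification. */
static uint32_t doi_key(uint32_t src_ip, uint16_t ip_id)
{
    return (src_ip + ip_id) % DOI_TBL_SIZE;
}
```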

Each unmatched thread only reads information from the DOI tables. When an unmatched fragment matches an entry in a DOI table, the fragment is sent to the owning reassembling thread using the dedicated Msg box FIFO.

Datagram ID calculation:

The fragments belonging to a datagram are identified by a calculated datagram ID, which consists of information from four fields in the IP header (source IP, destination IP, Identification and protocol).
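
A sketch of how such an ID can be packed for cheap comparison, mirroring the example code's pairing of source/Identification and destination/protocol into two 64-bit words (names are illustrative):

```c
#include <assert.h>
#include <stdint.h>

/* The four identifying IP header fields are packed into two 64-bit words,
 * so fragments of the same datagram compare equal with two 64-bit
 * compares. The exact bit layout is an assumption for illustration. */
typedef struct {
    uint64_t src_id;   /* source IP      | IP Identification */
    uint64_t dst_pr;   /* destination IP | protocol */
} dgram_id_t;

static dgram_id_t make_dgram_id(uint32_t src, uint32_t dst,
                                uint16_t ip_id, uint8_t prot)
{
    dgram_id_t d;
    d.src_id = ((uint64_t)src << 32) | ip_id;
    d.dst_pr = ((uint64_t)dst << 32) | prot;
    return d;
}
```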

Code

/*
 * %NT_SOFTWARE_LICENSE%
 */

/**
 * @example net/ipfdemo/ipfdemo_example.c
 * @section ipfdemo_example_description Description
 *
 * This source file is an example of how to write IP fragment reassembly code
 * using the Napatech FPGA IPFMode feature. It accelerates IP reassembly by
 * making it possible to load balance using a 5-tuple hash algorithm instead of
 * a 2-tuple hash algorithm. For fragments this is done by learning from the
 * first fragment and using that knowledge to calculate the 5-tuple for the
 * remaining fragments. The complex case is the situation where the FPGA was
 * not able to learn and identify a fragment, so the fragment is sent to one of
 * the unmatched streams. In this situation the code needs to hand over the
 * unmatched packet and deliver it to the right reassembling thread.
 *
 * The following NTAPI functions are used:
 * - @ref NT_Init()
 * - @ref NT_NetRxOpen()
 * - @ref NT_NTPL()
 * - @ref NT_NetRxGet()
 * - @ref NT_NET_GET_PKT_L2_FRAME_TYPE()
 * - @ref NT_NET_GET_PKT_L3_FRAME_TYPE()
 * - @ref NT_NET_GET_PKT_L3_FRAGMENTED()
 * - @ref NT_NET_GET_PKT_DESCRIPTOR_FORMAT()
 * - @ref NT_NET_GET_PKT_L3_FIRST_FRAG()
 * - @ref NT_NET_GET_PKT_L2_PTR()
 * - @ref NT_NET_GET_PKT_L3_OFFSET()
 * - @ref NT_NET_GET_SEGMENT_TIMESTAMP()
 * - @ref NT_NET_GET_PKT_IPF_LAST_FRAGMENT()
 * - @ref NT_NET_GET_PKT_IPF_UNMATCHED_STREAMID()
 * - @ref NT_NetRxRelease()
 * - @ref NT_NetRxClose()
 * - @ref NT_ExplainError()
 *
 * @section ipfdemo_example_prerequisites Prerequisites
 * A Napatech capture accelerator with IPFMode support is needed to run this
 * example. The ntservice.ini file must have enough HostBuffersRx defined to
 * honor the streams requested when running this demo application.
 * Below is an example of a minimal ini file. It will create 20 64 MB RX
 * host buffers on NUMA node 0.
 *
 * Note that PacketDescriptor is set to Ext9; this is required to run
 * IPFMode at all. If anything less is specified, IPFMode will
 * not be used and the configuration reverts to Hash2Tuple mode.
 *
 * Note also that TimeSyncReferencePriority is set to OsTime. This is critical
 * when using a fragtimeout value (the default), because the timestamp of each
 * packet is used to calculate its timeout.
 * @code
 * [System]
 * TimestampFormat = NATIVE_UNIX
 *
 * [Adapter0]
 * AdapterType = NT20E2
 * BusId = 00:02:00:00
 * PacketDescriptor = Ext9
 * HostBuffersRx = [20,64,0]
 * TimeSyncReferencePriority = OsTime
 * @endcode
 *
 *
 * ************************* Overview of the algorithm used ******************************
 *
 * The FPGA enables you to match IP fragments and distribute them using a
 * multi-CPU buffer splitting scheme based on a 5-tuple hash algorithm when the
 * IPFMode feature is used. If a multi-CPU splitting scheme based on a 2-tuple
 * hash algorithm gives good enough distribution, the IPFMode feature in the
 * FPGA should not be used: with a 2-tuple hash algorithm, all fragments of a
 * datagram already end up in the same stream.
 * IPFMode: how it works:
 * - Learns from first fragment
 * - First fragment carries the stream ID to which unmatched fragments are delivered
 * - Timeout setup for releasing learned entry
 *
 * @code
 * Graphical overview:
 *
 *                        +--------------+
 *         +------------->|   DOI hash   |------------+
 *         |              |    tables    |            |
 *         |              +--------------+            |
 *         |                                          v
 * +----------------+     +--------------+     +----------------+
 * |                |     |    Return    |     |                |
 * |  Reassembling  |---->|   msg box    |---->|   Unmatched    |
 * |                |     |    FIFOs     |     |                |
 * |    threads     |     +--------------+     |    threads     |
 * |                |     +--------------+     |                |
 * |                |<----|   Msg box    |<----|                |
 * +----------------+     |    FIFOs     |     +----------------+
 *    ^   ^   ^           +--------------+        ^   ^
 *    |   |   |                                   |   |
 *  5-tuple hash splitting          2-tuple hash splitting
 *    |   |   |                                   |   |
 * +--------------------------------------------------------------+
 * |                          NTAPI/FPGA                           |
 * +--------------------------------------------------------------+
 *
 * @endcode
 *
 * Description of algorithm:
 *
 *
 * Threads:
 * --------
 *
 * Unmatched threads – variable number (N) – number of unmatched streams to use
 * (2-tuple hash cpu-splitting)
 *
 * Each unmatched thread receives the unmatched fragments. When a fragment is
 * received, the DOI tables are checked for a datagram ID match. If a match is
 * found, the NetBuf containing the fragment is sent to the Msg box FIFO of the
 * reassembling thread that owns the matching DOI table. Otherwise, if no match
 * is found (yet), the fragment is temporarily put into a wait list. The wait
 * list is periodically checked against the DOI tables.
 *
 * Reassembling threads – variable number (M) – number of streams to use
 * (5-tuple hash cpu-splitting)
 *
 * When a reassembling thread receives IP fragments, it collects them into
 * complete datagrams.
 * If all fragments arrive in order, or at least the first fragment arrives
 * first, no unmatched-fragment collection is needed and all datagrams are
 * reassembled in the reassembling threads.
 * When a reassembling thread receives a first fragment, it stores that
 * fragment in its local collection hash table (tbl) and publishes the
 * datagram ID in the specific DOI table connecting the reassembling thread
 * with the unmatched thread, signalling its interest in fragments belonging
 * to this datagram.
 * When all fragments of a datagram have been received, they are all released
 * and the DOI table entry is cleared.
 * Each reassembling thread first reads a NetBuf from its Msg boxes and, if
 * none is found, reads a NetBuf from the NTAPI stream. Any fragments received
 * this way are put into the collection table (tbl).
 *
 *
 * Inter-thread communication messages:
 * ------------------------------------
 *
 * Msg boxes (FIFOs):
 *
 * Used by the unmatched threads to send unmatched fragments to the
 * reassembling threads.
 * [[[unm1 Msgbox],[unm2 Msgbox],…[unmN Msgbox]], [[unm1 Msgbox],[unm2 Msgbox],…[unmN Msgbox]]…M times]
 *            reasm1                                          reasm2                           …reasmM
 *
 * The Msg boxes are built as FIFOs and are used by the unmatched threads to
 * send the NetBuf of a received datagram fragment to the corresponding
 * reassembling thread.
 *
 * Return Msg boxes (FIFOs):
 *
 * Used by the reassembling threads to send unmatched-fragment NetBufs back to
 * the unmatched threads that originally received them.
 * [[[reasm1 Msgbox],[reasm2 Msgbox],…[reasmM Msgbox]], [[reasm1 Msgbox],[reasm2 Msgbox],…[reasmM Msgbox]]…N times]
 *            unm1                                                  unm2                             …unmN
 *
 * These FIFOs are only needed to make the complete algorithm lockless. The
 * NTAPI cannot handle multi-threaded access to the packet interface without
 * serialization.
 * DOI tables (Datagram of interest):
 *
 * The DOI tables are a set of hash tables containing a list of datagram IDs
 * that the reassembling threads are interested in.
 *
 * The FPGA specifies the unmatched stream ID together with the reception of
 * the first fragment. This information is used by the reassembling threads to
 * inform the specific unmatched thread about where to send unmatched
 * fragments matching the first fragment's datagram ID.
 *
 * Each reassembling thread has one DOI table allocated for each unmatched
 * thread, thus M*N DOI hash tables are used. This way each reassembling
 * thread has exclusive write access to one table for each unmatched thread.
 * [[[reasm1 tbl],[reasm2 tbl],…[reasmM tbl]], [[reasm1 tbl],[reasm2 tbl],…[reasmM tbl]]…N times]
 *       Unmatched1                                  Unmatched2                        …UnmatchedN
 *
 * Each unmatched thread only reads information from the DOI tables. When an
 * unmatched fragment matches an entry in a DOI table, the fragment is sent to
 * the reassembling thread using the dedicated Msg box FIFO.
 *
 * Datagram ID calculation:
 *
 * The fragments belonging to a datagram are identified by a calculated
 * datagram ID, which consists of information from four fields in the IP
 * header (source IP, destination IP, Identification and protocol).
*<hr> * @section ipfdemo_example_code Code * @} */ #if defined(__linux__) || defined(__FreeBSD__) #include <unistd.h> #include <signal.h> #include <assert.h> #include <pthread.h> #include <arpa/inet.h> #include <sys/time.h> #elif defined(WIN32) || defined (WIN64) #include <winsock2.h> // ntohs() #include <time.h> #include <sys/timeb.h> #include <process.h> // threading #endif #include <argparse.h> #include <nt.h> #if defined(WIN32) || defined (WIN64) //#define snprintf _snprintf #define snprintf(a, b, c, d) _snprintf_s((a), _countof(a), (b), (c), (d)) #define strcpy(s, a) strcpy_s((s), _countof(s), (a)) // Sleep void sleep(int time) { Sleep(time * 1000); /* From seconds to milliseconds */ } void usleep(unsigned long usec) { Sleep(usec / 1000); /* From useconds to milliseconds */ } // Time struct timezone { int tz_minuteswest; /* minutes west of Greenwich */ int tz_dsttime; /* type of DST correction */ }; int gettimeofday(struct timeval *tv, struct timezone *tz) { struct __timeb64 timebuffer; _ftime64_s(&timebuffer); tv->tv_sec = (long)timebuffer.time; tv->tv_usec = timebuffer.millitm * 1000; return 0; } // Threading typedef HANDLE pthread_t; typedef unsigned (__stdcall *start_address_t)(void *parameter); int pthread_create(HANDLE *thread, DWORD *attr, start_address_t start_routine, void *parameter) { HANDLE handle = (HANDLE)_beginthreadex( NULL, 0, start_routine, parameter, 0, NULL); if (handle == 0) { int status = GetLastError(); fprintf(stderr, "pthread_create() fail with error: 0x%x\n", status); *thread = NULL; return status | NT_SYSTEM_ERRORS; } *thread = handle; return NT_SUCCESS; } int pthread_join(HANDLE thread, void **value_ptr) { WaitForSingleObject(thread, INFINITE); CloseHandle(thread); return NT_SUCCESS; } #endif #pragma pack(push, 1) /* IPv4 header structure */ typedef struct iphdr_s { uint8_t ipHlen:4; // little endian layout uint8_t ipVer:4; // little endian layout uint8_t tos; uint16_t ipTotlen; uint16_t Id; #define FRAG_OFFS_MASK 0x1FFF #define 
LAST_FRAG_BITS 0x00E0 uint16_t fragOffs; uint8_t ttl; uint8_t prot; uint16_t csum; uint32_t srcAddr; uint32_t dstAddr; } iphdr_t; #pragma pack(pop) #define L3_ADDR(_NetBuf_) ((uint8_t *)NT_NET_GET_PKT_L2_PTR(_NetBuf_)+NT_NET_GET_PKT_L3_OFFSET(_NetBuf_)) #define IPVERSION(_NetBuf_) (((iphdr_t *)L3_ADDR(_NetBuf_))->ipVer) /* macros for IPv4 packets */ #define IPV4_HDR_LENGTH(_NetBuf_) ((uint8_t)(((iphdr_t *)L3_ADDR(_NetBuf_))->ipHlen)<<2) #define IPV4_TOT_LEN(_NetBuf_) (ntohs(((iphdr_t *)L3_ADDR(_NetBuf_))->ipTotlen)) #define IPV4_FRAGMENT_ID(_NetBuf_) (ntohs(((iphdr_t *)L3_ADDR(_NetBuf_))->Id)) #define IPV4_FRAGMENT_OFFSET(_NetBuf_) ((ntohs(((iphdr_t *)L3_ADDR(_NetBuf_))->fragOffs)&FRAG_OFFS_MASK)<<3) #define IPV4_LAST_FRAG(_NetBuf_) (((((iphdr_t *)L3_ADDR(_NetBuf_))->fragOffs)&LAST_FRAG_BITS)==0) #define IPV4_SRC_ADDR(_NetBuf_) (((iphdr_t *)L3_ADDR(_NetBuf_))->srcAddr) #define IPV4_DST_ADDR(_NetBuf_) (((iphdr_t *)L3_ADDR(_NetBuf_))->dstAddr) #define IPV4_PROTOCOL(_NetBuf_) (((iphdr_t *)L3_ADDR(_NetBuf_))->prot) #define IPV4_GET_DGRAM_ID(_NetBuf_, _src_, _dst_, _id_, _prot_) \ do {iphdr_t *_ip_=(iphdr_t *)L3_ADDR(_NetBuf_);_src_=_ip_->srcAddr;_dst_=_ip_->dstAddr;\ _id_=_ip_->Id;_prot_=_ip_->prot;}while(0) #define IPV4_DATA_LEN(_NetBuf_) (IPV4_TOT_LEN(_NetBuf_)-IPV4_HDR_LENGTH(_NetBuf_)) /* Help structure to concatenate ID source fields */ typedef union _DOI_dgramId_u { volatile uint64_t src_id; struct { volatile uint32_t src; volatile uint32_t id; }; } DOI_dgramId_u; #define MAX_SRC_ID 8 typedef struct _DOI_dgramTbl_t { volatile uint64_t aSrc_id[MAX_SRC_ID]; // Source and frag ID concatenated into ID1 volatile uint64_t aDst_pr[MAX_SRC_ID]; // Destination and protocol concatenated into ID2 } DOI_dgramTbl_t; #define DOI_FRAG_TBL_SIZE 8081 /* Prime number */ #define DOI_HASH_GET_KEY(_entry_, _src_, _id_) (_entry_ = (_src_+_id_)%DOI_FRAG_TBL_SIZE) /* Re-assembler hash table types and definitions */ struct frag_s { NtNetBuf_t hNetBuf; // hNetBuf containing fragment 
buffer uint16_t offset; // Fragment offset uint16_t size; // Size of this fragment uint8_t firstFrag; // If marked as first fragment uint8_t lastFrag; // If marked as last fragment uint8_t fromUnm; // Is this packet received from an un-matched thread (needed to lock on release) uint8_t unmIndex; // if from unmatched stream, then this tells which unmatched thread index }; #define MAX_FRAG_CNT 18 #define REASSEMBLY_HASH_TBL_SIZE 1021 /* Prime number */ #define INITIAL_HASH_TBL_ENTRY_CNT 1024 typedef struct _hashTbl_entry_s { uint64_t id1; // Src and prot id for ID1 uint64_t id2; // Dest and prot Id for ID2 volatile uint64_t *pDOI_Src_id; // Source and frag ID field volatile uint64_t *pDOI_Dst_pr; // Dest and protocol ID field uint16_t fragCnt; // Number of fragments in aFrag struct frag_s aFrag[MAX_FRAG_CNT]; // Array of all fragments belonging to the same datagram struct _hashTbl_entry_s *pNext; // Pointer to next element in list } tbl_entry_t; /* Hash table macros */ #define LOOKUP_ENTRY(_tbl_base_, _tbl_entry_type_, _tbl_size_, _id1_, _id2_, _tbl_entry_) \ {uint32_t _key_=(uint32_t)((_id1_^_id2_)%_tbl_size_);_tbl_entry_type_ *_tbl_; \ _tbl_ = _tbl_base_[_key_]; while(_tbl_ && (_tbl_->id1 != _id1_ || _tbl_->id2 != _id2_)) _tbl_ = _tbl_->pNext; \ _tbl_entry_=_tbl_;} #define GET_NEW_TBL_ENTRY(_tbl_free_, _tbl_entry_type_, _tbl_entry_) \ {if (!_tbl_free_){int __o__;_tbl_entry_type_ *__plm__; \ for(__o__=0;__o__<64;__o__++){__plm__=calloc(1,sizeof(_tbl_entry_type_)); \ if (!__plm__) assert(0); __plm__->pNext=_tbl_free_;_tbl_free_=__plm__;}} \ _tbl_entry_ = _tbl_free_;_tbl_free_ = _tbl_free_->pNext;tbl_entry->pNext = NULL;} #define RELEASE_TBL_ENTRY(_tbl_free_, _tbl_entry_) \ {_tbl_entry_->pNext = _tbl_free_; _tbl_free_ = _tbl_entry_;} #define ADD_ENTRY_TO_TBL(_tbl_base_, _tbl_entry_type_, _tbl_size_, _tbl_entry_) \ {uint32_t _key_=(uint32_t)((_tbl_entry_->id1^_tbl_entry_->id2)%_tbl_size_);_tbl_entry_type_ *_tbl_; \ _tbl_ = _tbl_base_[_key_]; while (_tbl_ && (_tbl_->id1 
!= _tbl_entry_->id1 || _tbl_->id2 != _tbl_entry_->id2)) _tbl_ = _tbl_->pNext; \ if (_tbl_ == NULL) {_tbl_entry_->pNext = _tbl_base_[_key_]; _tbl_base_[_key_]=_tbl_entry_;} else assert(_tbl_==_tbl_entry_);} #define DEL_ENTRY_FROM_TBL(_tbl_base_, _tbl_entry_type_, _tbl_size_, _tbl_entry_) \ {uint32_t _key_=(uint32_t)((_tbl_entry_->id1^_tbl_entry_->id2)%_tbl_size_);_tbl_entry_type_ *_tbl_, *_prev_ = NULL; \ _tbl_ = _tbl_base_[_key_]; \ while (_tbl_ && (_tbl_->id1 != _tbl_entry_->id1 || _tbl_->id2 != _tbl_entry_->id2)) { \ _prev_ = _tbl_;_tbl_ = _tbl_->pNext;} \ if (_tbl_) {if (_prev_) _prev_->pNext = _tbl_->pNext;else _tbl_base_[_key_] = _tbl_->pNext;}else assert(0);} typedef struct _ipDefrag ipDefrag_t; #if defined(__linux__) || defined(__FreeBSD__) #define compiler_barrier() __asm__ __volatile__("":::"memory") #elif defined(WIN32) || defined (WIN64) #define compiler_barrier() #endif /* Msg box definition and control type and macros */ #define MSG_BOX_DEPTH 0x2000 typedef struct _msg_box_s { volatile uint32_t rdIdx; // Read index of msg box volatile uint32_t wrIdx; // Write index of msg box NtNetBuf_t data[MSG_BOX_DEPTH]; // Msg box elements (pointers to hNetBufs) } msg_box_t; /* Macros to use the msg box */ #define MSG_BOX_EMPTY(_fifo_) ((_fifo_)->wrIdx==(_fifo_)->rdIdx) #define MSG_BOX_FULL(_fifo_) (((_fifo_)->wrIdx-(_fifo_)->rdIdx) >= MSG_BOX_DEPTH) #define MSG_BOX_GET(_fifo_) (_fifo_)->data[(_fifo_)->rdIdx&(MSG_BOX_DEPTH-1)]; compiler_barrier(); (_fifo_)->rdIdx++ #define MSG_BOX_PUT(_fifo_, _elem_) \ do { \ (_fifo_)->data[(_fifo_)->wrIdx&(MSG_BOX_DEPTH-1)]=_elem_ ; \ compiler_barrier(); \ (_fifo_)->wrIdx++; \ } while (0) /* Wait-list type definition */ #define MAX_WAIT_ELEM 1024 typedef struct wait_list_s { NtNetBuf_t hNetBuf; struct wait_list_s *pNext; } wait_list_t; /* Un-matched thread variable definition */ typedef struct _unmThread_s { pthread_t thread; // This thread int streamId; // NT Stream Id to service by this thread DOI_dgramTbl_t *pDOI; // Datagram 
Of Interest tables. For this Unmatched Thread only, indexed for each Reassembly Thread msg_box_t *pMsgboxReturn; // All msg boxes used by the current unmatched thread indexed [reasmCnt] wait_list_t WaitListElem[MAX_WAIT_ELEM]; // All awailable wait-list elements wait_list_t *pFree; // Linked list of free wait-list elements wait_list_t *pWait; // Linked list of waiting wait-list elements ipDefrag_t *pIpDefrag; // Pointer to main instance variable /* Statistics */ int fragRecv; // Counter for all fragments received by this un-matched thread int fragsDeleted; // Counter for all timed out and deleted fragments in wait-list } unmThread_t; /* Re-assembling thread variable definition */ typedef struct _reasmThread_s { pthread_t thread; // This thread int idx; // Index number of this re-assembling thread (first re-assembling thread starts with index 0) int streamId; // NT Stream Id to service by this thread tbl_entry_t *tbl[REASSEMBLY_HASH_TBL_SIZE]; // Hash table for the current re-assembling thread tbl_entry_t *tblFree; // Free elements for the hash table msg_box_t *pMsgbox; // All msg boxes used by the current re-assembling thread indexed [unmCnt] ipDefrag_t *pIpDefrag; // Pointer to main instance variable /* statistics */ int datagramCompleted; // Counter for datagrams sent in fragments and re-assembled sucessfully int fragmentsTimedout; // Counter for all timed out and deleted fragments in hash table int firstFragRcv; // Counter for all first-fragments received by this re-assembling thread int nonFragments; // Counter for all non-fragmented packets received by this re-assembling stream int Src_IdClash; // Counter for all Id clashes encountered (frags with src,dst,prot,fragId identical) int msgboxPackets; // Counter for remaining msg box entries on exit } reasmThread_t; struct _ipDefrag { int Running; // Application running control variable int adapterNo; // Adapter number to run on int unmCnt; // number of un-matched streams int unmStart; // Value of first un-matched 
streamid int reasmCnt; // Number of re-assembling streams int reasmStart; // Value of first re-assembling streamId uint64_t fragTimeout; // Fragment timeout setting in ms unmThread_t *pUnmThreads; // All un-matched thread instance pointers DOI_dgramTbl_t *pDOI_Tbls; // Hash tables to notify interest of fragments to the Unmatched Threads - indexed [unmCnt][reasmCnt] reasmThread_t *pReasmThreads; // All re-assembling thread instance pointers msg_box_t *pReasmMsgboxes; // Pointer to all re-assembling threads message boxes msg_box_t *pReasmReturnMsgboxes; // Pointer to all NetBuf return message boxes int ExtDescrType; // The NT extended descriptor type used by FPGA to access and use the correct macros int allReasmClosed; // To let all Unm-threads/streams close after re-assembling threads are closed }; #define NO_OPTIONS 0 #define OPTION_HELP (1<<1) #define OPTION_ADAPTER (1<<2) #define OPTION_NUM_REASM (1<<3) #define OPTION_NUM_UNM (1<<4) #define OPTION_TIMEOUT (1<<5) #define OPTION_TABLE_PERSIST (1<<6) #define OPTION_TABLE_TIMEOUT (1<<7) static int opt_adapter = -1; static int opt_reasm = -1; static int opt_unm = -1; static int opt_frag = -1; static char *opt_persist = NULL; static int opt_timeout = -1; /** * Table of valid options. 
*/ struct argparse_option arg_options[] = { OPT_HELP(), OPT_INTEGER('a', "adapter", &opt_adapter, "The adapter to run tests on", NULL, 0, 0, "adapter number"), OPT_INTEGER('r', "reasm", &opt_reasm, "Number of concurrent IP fragments re-assembling streams/threads", NULL, 0, 0, "number"), OPT_INTEGER('u', "unm", &opt_unm, "Number of concurrent un-matched streams/threads", NULL, 0, 0, "number"), OPT_INTEGER('f', "fragtimeout", &opt_frag, "How old fragments may get before they are deleted from tables (ms)", NULL, 0, 0, "ms"), OPT_STRING( 'p', "tablepersist", &opt_persist, "FPGA TablePersist", NULL, 0, 0, "timeout|lastfragment"), OPT_INTEGER('t', "tabletimeout", &opt_timeout, "FPGA table timeout", NULL, 0, 0, "value"), OPT_END(), }; static struct _ipDefrag IpDefrag; // The main instance handle static const char *usageText[] = { "Syntax:\n" "\n" "ipfdemo_example [-a <adapter number>] [-r <number>] [-u <number>] [-f <ms>] [-p <timeout|lastfragment>] [-t <value>]\n" "\nCommands:\n", NULL}; /***************************************************************************** Function to get system time in milliseconds ******************************************************************************/ static uint64_t GetSystemTimeNoArg(void) { struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec * 1000000 + tv.tv_usec; } /***************************************************************************** Called from the un-matched threads, to check if any re-assembling threads are waiting for this fragment. It is communicated using the DOI hash table. One table for each re-assembling/un-matched thread combination. This is done to make singular write access to each table (no locks needed). 
******************************************************************************/ static int SendToReasm(unmThread_t *pUnm, NtNetBuf_t *phNetBuf) { int i; int32_t entry; uint32_t src, dst; uint8_t prot; uint16_t id; DOI_dgramId_u val, val1; DOI_dgramTbl_t *pTbl; msg_box_t *pMsgbox; int idx; NtNetBuf_t hNetBuf = *phNetBuf; IPV4_GET_DGRAM_ID(hNetBuf, src, dst, id, prot); val.src = src; val.id = id; val1.src = dst; val1.id = prot; for (i = 0; i < pUnm->pIpDefrag->reasmCnt; i++) { pTbl = &pUnm->pDOI[i * DOI_FRAG_TBL_SIZE]; DOI_HASH_GET_KEY(entry, src, id); pMsgbox = &pUnm->pIpDefrag->pReasmMsgboxes[i * pUnm->pIpDefrag->unmCnt + (pUnm->streamId - pUnm->pIpDefrag->unmStart)]; idx=0; while (idx < MAX_SRC_ID) { if (pTbl[entry].aSrc_id[idx] == val.src_id && pTbl[entry].aDst_pr[idx] == val1.src_id) { if (!MSG_BOX_FULL(pMsgbox)) { MSG_BOX_PUT(pMsgbox, hNetBuf); *phNetBuf = NULL; return 1; } else { /* Msgbox full, we try again later */ return 0; } } idx++; } } return 0; } /***************************************************************************** This is the main un-matched thread routine. One thread for each un-matched stream is spawned to handle all the un-matched streams configured. Upon receival of an IP fragment from hardware, this routine searches the DOI tables (restricted to the ones associated with this stream ID) for interest in this fragment from any re-assembling threads. If one is waiting for fragments belonging to this particular datagram, it is send to that re-assembling thread using the msg box mechanism. If none interested (yet), the fragment is temporarily stored in a wait-list. Frequently the wait-list is scanned together with the DOI tables to make sure the fragments are sent to the corresponding re-assembling thread when it raises interest herein. If the wait-list is full, no more fragments are read until the last hNetBuf can be delivered or added to wait-list. 
******************************************************************************/
#if defined(__linux__) || defined(__FreeBSD__)
static void *_UnMatchedThread(void *arg)
#elif defined(WIN32) || defined (WIN64)
static unsigned __stdcall _UnMatchedThread(void *arg)
#endif
{
  unmThread_t *pUnm = (unmThread_t *)arg; // this pointer for this thread
  char streamName[20];                    // Stream name
  char errorBuffer[NT_ERRBUF_SIZE];       // Error buffer
  NtNetStreamRx_t hNetRx;                 // Handle to the RX stream
  NtNetBuf_t hNetBuf = NULL;              // Net buffer container
  int i;                                  // Loop variable
  int status;                             // Status from NTAPI calls
  int invalidPkt;                         // Invalid packet indicator (we only receive IP fragments on these streams)
  int checkTimeout = 0;                   // Indicator for fragment timeout check
  uint64_t sysTs = 0;                     // System timestamp
  int deleteIt;                           // Set when the entry must be deleted from the wait-list (delivered or timed out)
  wait_list_t *pWait;                     // Help pointer to clean up the wait-list on exit

  sprintf(streamName, "IPFUnm_%i", pUnm->streamId);

  /* Fill the free list with elements for the wait-list */
  pUnm->WaitListElem[0].pNext = NULL;
  for (i = 1; i < MAX_WAIT_ELEM; i++) {
    pUnm->WaitListElem[i].pNext = &pUnm->WaitListElem[i - 1];
  }
  pUnm->pFree = &pUnm->WaitListElem[MAX_WAIT_ELEM - 1];

  // Get a stream handle
  if ((status = NT_NetRxOpen(&hNetRx, streamName, NT_NET_INTERFACE_PACKET, pUnm->streamId, -1)) != NT_SUCCESS) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "[%i] NT_NetRxOpen() failed: %s\n", pUnm->streamId, errorBuffer);
#if defined(__linux__) || defined(__FreeBSD__)
    return (void *)NULL;
#elif defined(WIN32) || defined (WIN64)
    {
      _endthreadex(0);
      return 0;
    }
#endif
  }

  while (pUnm->pIpDefrag->Running) {
    /* Check all return msg boxes for fragment packets to be freed */
    for (i = 0; i < pUnm->pIpDefrag->reasmCnt; i++) {
      while (!MSG_BOX_EMPTY(&pUnm->pMsgboxReturn[i])) {
        NtNetBuf_t hNetBuf1;
        hNetBuf1 = MSG_BOX_GET(&pUnm->pMsgboxReturn[i]);
        // printf("Releasing unmatched fragment %i (%llx) by thread index %i\n", i, hNetBuf1, pUnm->streamId - pUnm->pIpDefrag->unmStart);
        NT_NetRxRelease(hNetRx, hNetBuf1);
      }
    }

    /* Check for entries in the wait-list to be delivered to a re-assembling
       thread that has signaled interest through the DOI hash table */
    if (pUnm->pWait) {
      wait_list_t *pNxt, *pPrev = NULL, *pWaitFrag = pUnm->pWait;
      if (checkTimeout) {
        sysTs = GetSystemTimeNoArg() * 100; // make it 10ns units
      }
      /* Loop over all elements in the list */
      while (pWaitFrag) {
        /* Try to send to an interested re-assembly thread */
        deleteIt = SendToReasm(pUnm, &pWaitFrag->hNetBuf);
        if (!deleteIt && checkTimeout) {
          /* Check for a timed out condition */
          if (sysTs - NT_NET_GET_PKT_TIMESTAMP(pWaitFrag->hNetBuf) > pUnm->pIpDefrag->fragTimeout) {
            deleteIt = 1;
          }
        }
        /* Remove from the list if sent or timed out */
        if (deleteIt) {
          if (pWaitFrag->hNetBuf) {
            NT_NetRxRelease(hNetRx, pWaitFrag->hNetBuf);
            pWaitFrag->hNetBuf = NULL;
            pUnm->fragsDeleted++;
          }
          pNxt = pWaitFrag->pNext;
          /* Move base if needed */
          if (pUnm->pWait == pWaitFrag) {
            pUnm->pWait = pNxt;
          } else if (pPrev) {
            pPrev->pNext = pWaitFrag->pNext;
          }
          /* Put onto the free list */
          pWaitFrag->pNext = pUnm->pFree;
          pUnm->pFree = pWaitFrag;
          /* Step to the next element */
          pWaitFrag = pNxt;
        } else {
          /* Jump over the element */
          pPrev = pWaitFrag;
          pWaitFrag = pWaitFrag->pNext;
        }
      }
    }

    if (hNetBuf == NULL) {
      /* Read new fragments from the stream */
      if ((status = NT_NetRxGet(hNetRx, &hNetBuf, 1)) != NT_SUCCESS) {
        if ((status == NT_STATUS_TIMEOUT) || (status == NT_STATUS_TRYAGAIN)) {
          if (pUnm->pIpDefrag->fragTimeout)
            checkTimeout = 1;
          continue;
        }
        // Get the status code as text
        NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
        fprintf(stderr, "[%i] NT_NetRxGet() failed: %s\n", pUnm->streamId, errorBuffer);
#if defined(__linux__) || defined(__FreeBSD__)
        return (void *)NULL;
#elif defined(WIN32) || defined (WIN64)
        {
          _endthreadex(0);
          return 0;
        }
#endif
      }
    }
    assert(hNetBuf != NULL);

    /* New packet received */
    invalidPkt = 1;
    if (NT_NET_GET_PKT_L2_FRAME_TYPE(hNetBuf) == NT_L2_FRAME_TYPE_ETHER_II &&
        NT_NET_GET_PKT_L3_FRAME_TYPE(hNetBuf) == NT_L3_FRAME_TYPE_IPv4 &&
        NT_NET_GET_PKT_L3_FRAGMENTED(hNetBuf)) {
      /* Packet is fragmented and valid for our use */
      if (NT_NET_GET_PKT_DESCRIPTOR_FORMAT(hNetBuf) == 9) {
        if (NT_NET_GET_PKT_L3_FIRST_FRAG(hNetBuf)) {
          fprintf(stderr, "[%i] ERROR: un-matched streams may never receive a first fragment\n", pUnm->streamId);
          exit(0);
        }
        invalidPkt = 0;
        if (SendToReasm(pUnm, &hNetBuf) == 0) {
          /* No match found. Put the entry into the wait-list */
          if (pUnm->pFree == NULL) {
            /* Wait for available slots */
            usleep(1);
            if (pUnm->pIpDefrag->fragTimeout)
              checkTimeout = 1;
          } else {
            wait_list_t *pNxt;
            pNxt = pUnm->pFree->pNext;
            pUnm->pFree->pNext = pUnm->pWait;
            pUnm->pWait = pUnm->pFree;
            pUnm->pFree = pNxt;
            pUnm->pWait->hNetBuf = hNetBuf;
            /* Buffer delivered to the wait-list */
            hNetBuf = NULL;
          }
        }
        if (hNetBuf == NULL) {
          pUnm->fragRecv++;
        }
      }
    }
    if (invalidPkt && !NT_NET_GET_PKT_L3_FRAGMENTED(hNetBuf)) {
      fprintf(stderr, "[%i] ERROR: invalid non-ipv4-fragment received on an un-matched fragment stream\n", pUnm->streamId);
      exit(0);
    }
  }

  /* Exiting section */
  pWait = pUnm->pWait;
  while (pWait) {
    // Release pending fragment packets
    NT_NetRxRelease(hNetRx, pWait->hNetBuf);
    pWait = pWait->pNext;
  }

  /* All re-assembling threads have to close before we may close the un-matched
     threads. We need these threads to continue running while freeing hNetBufs
     sent from the un-matched streams */
  while (pUnm->pIpDefrag->allReasmClosed == 0) {
    usleep(1000);
  }

  /* Empty all return msg boxes */
  for (i = 0; i < pUnm->pIpDefrag->reasmCnt; i++) {
    while (!MSG_BOX_EMPTY(&pUnm->pMsgboxReturn[i])) {
      NtNetBuf_t buf = MSG_BOX_GET(&pUnm->pMsgboxReturn[i]);
      NT_NetRxRelease(hNetRx, buf);
    }
  }

  /* Close the NT stream */
  if ((status = NT_NetRxClose(hNetRx)) != NT_SUCCESS) {
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "[%i] Un-matched: NT_NetRxClose() failed: %s\n", pUnm->streamId, errorBuffer);
  }
#if defined(__linux__) || defined(__FreeBSD__)
  return (void *)NULL;
#elif defined(WIN32) || defined (WIN64)
  {
    _endthreadex(0);
    return 0;
  }
#endif
}

static void _FreeUnmFragment(reasmThread_t *pReasm, int unmIndex, NtNetBuf_t hNetBuf)
{
  msg_box_t *pMsgbox;
  int waitCnt;
  pMsgbox = &pReasm->pIpDefrag->pReasmReturnMsgboxes[unmIndex * pReasm->pIpDefrag->reasmCnt +
                                                    (pReasm->streamId - pReasm->pIpDefrag->reasmStart)];
  waitCnt = 0;
  while (waitCnt < 100) {
    if (MSG_BOX_FULL(pMsgbox)) {
      usleep(1000);
      waitCnt++;
    } else {
      MSG_BOX_PUT(pMsgbox, hNetBuf);
      break;
    }
  }
  if (waitCnt == 100) {
    fprintf(stderr, "[%i] Return msg box full - failed to put element.\n", pReasm->streamId);
  }
}

/*****************************************************************************
 Free a complete re-assembled datagram
******************************************************************************/
static void _ReleaseDgramTblEntry(reasmThread_t *pReasm, tbl_entry_t *tbl_entry, NtNetStreamRx_t hNetRx)
{
  int i;
  DEL_ENTRY_FROM_TBL(pReasm->tbl, tbl_entry_t, REASSEMBLY_HASH_TBL_SIZE, tbl_entry);
  for (i = 0; i < tbl_entry->fragCnt; i++) {
    if (tbl_entry->aFrag[i].fromUnm) {
      /* Find the return msg box to return the NetBuf to */
      _FreeUnmFragment(pReasm, tbl_entry->aFrag[i].unmIndex, tbl_entry->aFrag[i].hNetBuf);
    } else {
      NT_NetRxRelease(hNetRx, tbl_entry->aFrag[i].hNetBuf);
    }
  }
  /* Zero out the interest in the DOI table */
  if (tbl_entry->pDOI_Src_id) {
    *tbl_entry->pDOI_Src_id = 0;
  }
  if (tbl_entry->pDOI_Dst_pr) {
    *tbl_entry->pDOI_Dst_pr = 0;
  }
  tbl_entry->id1 = 0;
  tbl_entry->id2 = 0;
  RELEASE_TBL_ENTRY(pReasm->tblFree, tbl_entry);
}

/*****************************************************************************
 Check if the datagram entry specified by tbl_entry in the hash table is complete
******************************************************************************/
static int _CheckDatagramComplete(reasmThread_t *pReasm, tbl_entry_t *tbl_entry, NtNetStreamRx_t hNetRx)
{
  int size = 0, i;
  int fstFrag = -1;
  int lstFrag = -1;
  /* Check all fragments in this datagram entry */
  for (i = 0; i < tbl_entry->fragCnt; i++) {
    if (tbl_entry->aFrag[i].firstFrag) {
      fstFrag = i;
    }
    if (tbl_entry->aFrag[i].lastFrag) {
      lstFrag = i;
    }
    size += tbl_entry->aFrag[i].size;
  }
  /* If all fragments have been received, process and release the datagram */
  if (fstFrag >= 0 && lstFrag >= 0) {
    /* Got both the first and the last fragment */
    if (tbl_entry->aFrag[lstFrag].offset + tbl_entry->aFrag[lstFrag].size == size) {
      pReasm->datagramCompleted++;
      /* A complete datagram received and collected. A checksum calculation
         should be done on it to validate correctness */
      _ReleaseDgramTblEntry(pReasm, tbl_entry, hNetRx);
      return 1;
    }
  }
  return 0;
}

/*****************************************************************************
 The main re-assembling thread routine. This function is called for each
 stream specified in the load-balance configuration. It collects packets and
 fragments. On fragment reception, it collects the fragments into one datagram
 collection for re-assembling. The hNetBufs are released. The thread may
 receive the fragments from the stream it reads from, or it receives them from
 a msg box from one of the un-matched threads. On reception of a
 first-fragment, the interest in that datagram is raised in the DOI table
 associated with the un-matched thread these fragments will be sent on. A
 first-fragment will always be sent on the ordinary data streams, never on one
 of the un-matched streams specified.
******************************************************************************/
#if defined(__linux__) || defined(__FreeBSD__)
static void *_ReassemblyThread(void *arg)
#elif defined(WIN32) || defined (WIN64)
static unsigned __stdcall _ReassemblyThread(void *arg)
#endif
{
  reasmThread_t *pReasm = (reasmThread_t *)arg;
  char tmpBuffer[20];               // Buffer used to create NTPL expressions
  char errorBuffer[NT_ERRBUF_SIZE]; // Error buffer
  NtNetStreamRx_t hNetRx;           // Handle to the RX stream
  NtNetBuf_t hNetBuf = NULL;        // Net buffer container
  int i, ii;                        // Loop counters
  int status;                       // NTAPI call return value
  uint8_t fromUnm;                  // Flag to indicate that the hNetBuf was sent from an un-matched thread
  uint8_t unmIndex;
  int timeoutChkCnt;                // Counter to control packet timeout checking
  uint64_t sysTs;                   // Variable to hold the system timestamp
  DOI_dgramId_u val, val1;          // DOI datagram ID variables
  tbl_entry_t *pTbl;                // Hash table traverse helper variable

  sprintf(tmpBuffer, "IPFReasm_%i", pReasm->streamId);
  if ((status = NT_NetRxOpen(&hNetRx, tmpBuffer, NT_NET_INTERFACE_PACKET, pReasm->streamId, -1)) != NT_SUCCESS) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "[%i] NT_NetRxOpen() failed: %s\n", pReasm->streamId, errorBuffer);
#if defined(__linux__) || defined(__FreeBSD__)
    return (void *)NULL;
#elif defined(WIN32) || defined (WIN64)
    {
      _endthreadex(0);
      return 0;
    }
#endif
  }

  timeoutChkCnt = 0;
  while (pReasm->pIpDefrag->Running) {
    hNetBuf = NULL;
    /* Check all msg boxes for fragment packets from the un-matched threads */
    for (i = 0; i < pReasm->pIpDefrag->unmCnt; i++) {
      if (!MSG_BOX_EMPTY(&pReasm->pMsgbox[i])) {
        hNetBuf = MSG_BOX_GET(&pReasm->pMsgbox[i]);
        fromUnm = 1;
        unmIndex = (uint8_t)i;
        break;
      }
    }
    if (hNetBuf == NULL) {
      if ((status = NT_NetRxGet(hNetRx, &hNetBuf, 1)) != NT_SUCCESS) {
        if ((status == NT_STATUS_TIMEOUT) || (status == NT_STATUS_TRYAGAIN)) {
          if (pReasm->pIpDefrag->fragTimeout > 0) {
            if (++timeoutChkCnt >= 100) {
              int deleteIt = 0;
              /* Run through the list of fragments to delete timed out fragments */
              sysTs = GetSystemTimeNoArg() * 100; // make it 10ns units
              for (i = 0; i < REASSEMBLY_HASH_TBL_SIZE; i++) {
                if (pReasm->tbl[i]) {
                  tbl_entry_t *pNxt, *tbl_entry = pReasm->tbl[i];
                  while (tbl_entry) {
                    deleteIt = 0;
                    for (ii = 0; ii < tbl_entry->fragCnt; ii++) {
                      if (sysTs - NT_NET_GET_PKT_TIMESTAMP(tbl_entry->aFrag[ii].hNetBuf) > pReasm->pIpDefrag->fragTimeout) {
                        deleteIt = 1;
                        break;
                      }
                    }
                    if (deleteIt) {
                      pNxt = tbl_entry->pNext;
                      _ReleaseDgramTblEntry(pReasm, tbl_entry, hNetRx);
                      pReasm->fragmentsTimedout++;
                      tbl_entry = pNxt;
                    } else {
                      tbl_entry = tbl_entry->pNext;
                    }
                  }
                }
              }
              timeoutChkCnt = 0;
            }
          }
          continue;
        }
        if (status) {
          // Get the status code as text
          NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
          fprintf(stderr, "[%i] NT_NetRxGet() failed: %s\n", pReasm->streamId, errorBuffer);
          goto EXIT;
        }
      }
      fromUnm = 0;
      unmIndex = 0;
    }
    timeoutChkCnt = 0;

    if (NT_NET_GET_PKT_L2_FRAME_TYPE(hNetBuf) == NT_L2_FRAME_TYPE_ETHER_II &&
        NT_NET_GET_PKT_L3_FRAME_TYPE(hNetBuf) == NT_L3_FRAME_TYPE_IPv4 &&
        NT_NET_GET_PKT_L3_FRAGMENTED(hNetBuf)) {
      uint32_t unmEntry;
      uint32_t src, dst;
      uint8_t prot;
      uint16_t id;
      int offset;
      tbl_entry_t *tbl_entry = NULL;
      DOI_dgramTbl_t *unmTbl;

      if (NT_NET_GET_PKT_L3_FIRST_FRAG(hNetBuf))
        pReasm->firstFragRcv++;
      IPV4_GET_DGRAM_ID(hNetBuf, src, dst, id, prot);
      val.src = src;
      val.id = id;
      val1.src = dst;
      val1.id = prot;
      LOOKUP_ENTRY(pReasm->tbl, tbl_entry_t, REASSEMBLY_HASH_TBL_SIZE, val.src_id, val1.src_id, tbl_entry);
      if (tbl_entry == NULL) {
        GET_NEW_TBL_ENTRY(pReasm->tblFree, tbl_entry_t, tbl_entry);
      }
      if (tbl_entry->id1 == 0) {
        // New datagram fragment received
        tbl_entry->id1 = val.src_id;
        tbl_entry->id2 = val1.src_id;
        tbl_entry->fragCnt = 0;
        tbl_entry->pDOI_Src_id = NULL;
        tbl_entry->pDOI_Dst_pr = NULL;
        ADD_ENTRY_TO_TBL(pReasm->tbl, tbl_entry_t, REASSEMBLY_HASH_TBL_SIZE, tbl_entry);
      } else {
        // Error check
        if (tbl_entry->fragCnt == MAX_FRAG_CNT) {
          fprintf(stderr, "[%i] ERROR - MAX frag cnt reached\n", pReasm->streamId);
          exit(0);
        }
      }
      // Add the fragment to the datagram in the list
      tbl_entry->aFrag[tbl_entry->fragCnt].hNetBuf = hNetBuf;
      tbl_entry->aFrag[tbl_entry->fragCnt].fromUnm = fromUnm;
      tbl_entry->aFrag[tbl_entry->fragCnt].unmIndex = unmIndex;
      /* warning: You may get conversion warnings here caused by ntohs().
         This is a bug in /usr/include/bits/byteswap.h */
      tbl_entry->aFrag[tbl_entry->fragCnt].offset = (uint16_t)IPV4_FRAGMENT_OFFSET(hNetBuf);
      tbl_entry->aFrag[tbl_entry->fragCnt].size = (uint16_t)IPV4_DATA_LEN(hNetBuf);
      tbl_entry->aFrag[tbl_entry->fragCnt].firstFrag = (uint8_t)NT_NET_GET_PKT_L3_FIRST_FRAG(hNetBuf);
      if (pReasm->pIpDefrag->ExtDescrType == 9) {
        tbl_entry->aFrag[tbl_entry->fragCnt].lastFrag = (uint8_t)NT_NET_GET_PKT_IPF_LAST_FRAGMENT(hNetBuf);
      } else {
        tbl_entry->aFrag[tbl_entry->fragCnt].lastFrag = IPV4_LAST_FRAG(hNetBuf);
      }
      tbl_entry->fragCnt++;
      /* Check the datagram for completion */
      if (_CheckDatagramComplete(pReasm, tbl_entry, hNetRx) == 0) {
        if (NT_NET_GET_PKT_DESCRIPTOR_FORMAT(hNetBuf) == 9) {
          if (NT_NET_GET_PKT_L3_FIRST_FRAG(hNetBuf)) {
            int idx = -1;
            /* Find the thread-index of the un-matched stream receiving un-matched fragments */
            offset = NT_NET_GET_PKT_IPF_UNMATCHED_STREAMID(hNetBuf) - pReasm->pIpDefrag->unmStart;
            /* Get a pointer to the DOI table between this re-assembling thread and the un-matched fragments thread */
            unmTbl = &pReasm->pIpDefrag->pDOI_Tbls[(offset * pReasm->pIpDefrag->reasmCnt + pReasm->idx) * DOI_FRAG_TBL_SIZE];
            DOI_HASH_GET_KEY(unmEntry, src, id);
            // On first fragment reception - notify the corresponding un-matched thread
            // about interest in fragments of this datagram
            for (i = 0; i < MAX_SRC_ID; i++) {
              if (unmTbl[unmEntry].aSrc_id[i] == val.src_id && unmTbl[unmEntry].aDst_pr[i] == val1.src_id) {
                pReasm->Src_IdClash++;
                /* OK, already there! */
                idx = i;
                break;
              }
              if (idx < 0 && unmTbl[unmEntry].aSrc_id[i] == 0) {
                idx = i;
              }
            }
            if (idx >= 0) {
              tbl_entry->pDOI_Src_id = &unmTbl[unmEntry].aSrc_id[idx];
              tbl_entry->pDOI_Dst_pr = &unmTbl[unmEntry].aDst_pr[idx];
              *tbl_entry->pDOI_Dst_pr = val1.src_id;
              *tbl_entry->pDOI_Src_id = val.src_id;
            } else {
              fprintf(stderr, "[%i] Too many clashes in DOI (more than %i - raise MAX_SRC_ID)\n", pReasm->streamId, MAX_SRC_ID);
              exit(0);
            }
          }
        }
      }
    } else {
      pReasm->nonFragments++;
      // Release the packet as it is not fragmented.
      if ((status = NT_NetRxRelease(hNetRx, hNetBuf)) != NT_SUCCESS) {
        // Get the status code as text
        NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
        fprintf(stderr, "[%i] NT_NetRxRelease() failed: %s\n", pReasm->streamId, errorBuffer);
        goto EXIT;
      }
    }
  }

EXIT:
  for (i = 0; i < REASSEMBLY_HASH_TBL_SIZE; i++) {
    pTbl = pReasm->tbl[i];
    while (pTbl) {
      // Release all hash table fragment packets
      for (ii = 0; ii < pTbl->fragCnt; ii++) {
        if (pTbl->aFrag[ii].fromUnm) {
          _FreeUnmFragment(pReasm, pTbl->aFrag[ii].unmIndex, pTbl->aFrag[ii].hNetBuf);
        } else {
          NT_NetRxRelease(hNetRx, pTbl->aFrag[ii].hNetBuf);
        }
      }
      pTbl = pTbl->pNext;
    }
  }

  /* Empty the msg boxes */
  for (i = 0; i < pReasm->pIpDefrag->unmCnt; i++) {
    while (!MSG_BOX_EMPTY(&pReasm->pMsgbox[i])) {
      hNetBuf = MSG_BOX_GET(&pReasm->pMsgbox[i]);
      _FreeUnmFragment(pReasm, i, hNetBuf);
      pReasm->msgboxPackets++;
    }
  }

  // Close the stream
  if ((status = NT_NetRxClose(hNetRx)) != NT_SUCCESS) {
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "[%i] Re-asm thread: NT_NetRxClose() failed: %s\n", pReasm->streamId, errorBuffer);
  }
#if defined(__linux__) || defined(__FreeBSD__)
  return (void *)NULL;
#elif defined(WIN32) || defined (WIN64)
  {
    _endthreadex(0);
    return 0;
  }
#endif
}

/*****************************************************************************
 Ctrl-C signal handler routine.
******************************************************************************/
#if defined(WIN32) || defined (WIN64)
static BOOL WINAPI StopApplication(int sig)
#else
static void StopApplication(int sig __attribute__((unused)))
#endif
{
#ifdef WIN32
  IpDefrag.Running = 0;
  return TRUE;
#else
  if (sig == SIGINT)
    IpDefrag.Running = 0;
#endif
}

/* Default parameters */
#define INPUT_ADAPTER 0
#define FRAGMENT_TIMEOUT 2000 // mSec
#define NUM_STREAMS 4
#define NUM_UNM_STREAMS 4
#define TABLE_TIMEOUT 125
#define TABLE_PERSIST_TIMEOUT 0

/*****************************************************************************
 Main routine. It initializes, configures the FPGA, starts the needed threads
 (un-matched and re-assembling threads), and loops while printing out
 statistics. On Ctrl-C it stops looping and begins the cleanup process.
******************************************************************************/
int main(int argc, const char *argv[])
{
  char tmpBuffer[100];              // Buffer to build the filter string
  char errorBuffer[NT_ERRBUF_SIZE]; // Error buffer
  int status;                       // Status variable
  NtNtplInfo_t ntplInfo;            // Return data structure from the NT_NTPL() call
  NtInfoStream_t hInfoStream;       // Info stream handle
  NtInfo_t hInfo;                   // Info handle
  int i, ii;                        // Counter variables
  int inp_1, inp_2;                 // Input port numbers for the selected adapter
  NtConfigStream_t hCfgStream;      // Handle to a config stream
  int hashTblFreed;                 // Counter for re-assembly hash table entries remaining on exit
  int waitTblFreed;                 // Counter for un-matched wait table entries remaining on exit
  int msgboxElmFreed;               // Counter for hNetBuf elements remaining in msg boxes
  int tablePersistTimeout;          // Control parameter for the NTPL IPFMode TablePersist setting
  int tableTimeout;                 // Control parameter for the NTPL IPFMode timeout setting
  struct argparse argparse;
#define NUM_NTPL 3
  const char *ntplExpr[NUM_NTPL] = {
    "Assign[streamid=(%i..%i)]=port==(%i..%i)",
    "HashMode=Hash%iTuple",
    "IPFMode[StreamId=(%i..%i);timeout=%i;TablePersist=%s]=port==(%i..%i)"
  };

  // Set up the Ctrl-C handler
#if defined(WIN32) || defined (WIN64)
  SetConsoleCtrlHandler((PHANDLER_ROUTINE)StopApplication, TRUE);
#else
  struct sigaction newaction; // Ctrl-C handle
  memset(&newaction, 0, sizeof(newaction));
  newaction.sa_handler = StopApplication;
  if (sigaction(SIGINT, &newaction, NULL) < 0) {
    fprintf(stderr, "Failed to register sigaction.\n");
    exit(1);
  }
#endif

  // Set up default parameter settings
  memset(&IpDefrag, 0, sizeof(struct _ipDefrag));
  IpDefrag.adapterNo = INPUT_ADAPTER;
  IpDefrag.reasmCnt = NUM_STREAMS;
  IpDefrag.reasmStart = 0;
  IpDefrag.unmCnt = NUM_UNM_STREAMS;
  IpDefrag.unmStart = IpDefrag.reasmStart + IpDefrag.reasmCnt;
  IpDefrag.fragTimeout = FRAGMENT_TIMEOUT * 100000ULL;
  tableTimeout = TABLE_TIMEOUT * 10;
  tablePersistTimeout = TABLE_PERSIST_TIMEOUT;

  argparse_init(&argparse, arg_options, usageText, 0);
  argparse_parse(&argparse, argc, argv);
  if (opt_adapter != -1) {
    IpDefrag.adapterNo = opt_adapter;
  }
  if (opt_reasm != -1) {
    IpDefrag.reasmCnt = opt_reasm;
  }
  if (opt_unm != -1) {
    IpDefrag.unmCnt = opt_unm;
  }
  if (opt_frag != -1) {
    IpDefrag.fragTimeout = (uint64_t)opt_frag * 100000ULL;
  }
  if (opt_persist != NULL) {
    if (strcmp(opt_persist, "timeout") == 0) {
      tablePersistTimeout = 1;
    }
  }
  if (opt_timeout != -1) {
    tableTimeout = opt_timeout * 10;
  }

  // Initialize the NTAPI library and thereby check if NTAPI_VERSION can be used together with this library
  if ((status = NT_Init(NTAPI_VERSION)) != NT_SUCCESS) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "NT_Init() failed: %s\n", errorBuffer);
    return -1;
  }

  /* Open the info stream */
  if ((status = NT_InfoOpen(&hInfoStream, "IPFExample")) != NT_SUCCESS) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "NT_InfoOpen() failed: %s\n", errorBuffer);
    return -1;
  }

  /* Read info for the selected adapter */
  hInfo.cmd = NT_INFO_CMD_READ_ADAPTER_V6;
  hInfo.u.adapter_v6.adapterNo = (uint8_t)IpDefrag.adapterNo;
  if ((status = NT_InfoRead(hInfoStream, &hInfo)) != 0) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "NT_InfoRead() failed: %s\n", errorBuffer);
    return -1;
  }
  /* Collect the port range for the selected adapter */
  inp_1 = hInfo.u.adapter_v6.data.portOffset;
  inp_2 = hInfo.u.adapter_v6.data.portOffset + hInfo.u.adapter_v6.data.numPorts - 1;

  /* Ensure that Ext9 was configured. */
  if (NT_PACKET_DESCRIPTOR_TYPE_NT_EXTENDED == hInfo.u.adapter_v6.data.descriptorType &&
      9 == hInfo.u.adapter_v6.data.extendedDescriptor) {
    IpDefrag.ExtDescrType = hInfo.u.adapter_v6.data.extendedDescriptor;
  } else {
    fprintf(stderr, "The packet descriptor is not Ext9. Please set 'PacketDescriptor = Ext9' in the ntservice.ini file for the selected adapter.\n");
    return -1;
  }

  /* Get the timesync configuration. Must use OS mode to run timeouts with this demo */
  if (IpDefrag.fragTimeout) {
    hInfo.cmd = NT_INFO_CMD_READ_TIMESYNC_V4;
    hInfo.u.timeSync_v4.adapterNo = (uint8_t)IpDefrag.adapterNo;
    if ((status = NT_InfoRead(hInfoStream, &hInfo)) != NT_SUCCESS) {
      fprintf(stderr, "Failed to read timesync info.\n");
      return -1;
    }
    if (hInfo.u.timeSync_v4.data.timeRef != NT_TIMESYNC_REFERENCE_OSTIME) {
      fprintf(stderr, "The timesync reference clock on the selected adapter is not OS. Please set TimeSyncReferencePriority=OsTime in the ntservice.ini file for the selected adapter\n");
      return -1;
    }
  }

  /* Close the info stream */
  if ((status = NT_InfoClose(hInfoStream)) != NT_SUCCESS) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "NT_InfoClose() failed: %s\n", errorBuffer);
    return -1;
  }

  // Open a config stream to assign a filter to a stream ID.
  if ((status = NT_ConfigOpen(&hCfgStream, "IPFExample")) != NT_SUCCESS) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "NT_ConfigOpen() failed: %s\n", errorBuffer);
    return -1;
  }

  /* Build the final NTPL command strings */
  for (i = 0; i < NUM_NTPL; i++) {
    switch (i) {
    case 0:
      sprintf(tmpBuffer, ntplExpr[i], IpDefrag.reasmStart, IpDefrag.reasmStart + IpDefrag.reasmCnt - 1, inp_1, inp_2);
      break;
    case 1:
      sprintf(tmpBuffer, ntplExpr[i], (IpDefrag.ExtDescrType == 9) ? 5 : 2);
      break;
    case 2:
      if (IpDefrag.ExtDescrType == 9) {
        if (tablePersistTimeout) {
          sprintf(tmpBuffer, ntplExpr[i], IpDefrag.unmStart, IpDefrag.unmStart + IpDefrag.unmCnt - 1, tableTimeout, "TimeoutOnly", inp_1, inp_2);
        } else {
          sprintf(tmpBuffer, ntplExpr[i], IpDefrag.unmStart, IpDefrag.unmStart + IpDefrag.unmCnt - 1, tableTimeout, "LastFragment", inp_1, inp_2);
        }
      } else {
        IpDefrag.unmCnt = 0;
        continue;
      }
      break;
    default:
      strcpy(tmpBuffer, ntplExpr[i]);
      break;
    }
    // Assign the NTPL expressions needed
    if ((status = NT_NTPL(hCfgStream, tmpBuffer, &ntplInfo, NT_NTPL_PARSER_VALIDATE_NORMAL)) != NT_SUCCESS) {
      fprintf(stderr, "ERROR --> %s\n", tmpBuffer);
      // Get the status code as text
      NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
      fprintf(stderr, "NT_NTPL() failed: %s\n", errorBuffer);
      fprintf(stderr, ">>> NTPL errorcode: %X\n", ntplInfo.u.errorData.errCode);
      fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[0]);
      fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[1]);
      fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[2]);
      return -1;
    }
  }

  IpDefrag.Running = 1;
  IpDefrag.allReasmClosed = 0;

  /* Create the IPv4 re-assembly threads */
  IpDefrag.pReasmThreads = calloc(1, sizeof(reasmThread_t) * IpDefrag.reasmCnt);
  if (IpDefrag.pReasmThreads == NULL) {
    fprintf(stderr, "Memory allocation failed\n");
    return -1;
  }

  /* Create the communication message boxes. One msg box for each reasm thread from each unm thread. */
  /* Create twice - one for NetBufs sent from an un-matched thread to a re-assembling thread and one */
  /* for returning the NetBuf again to the un-matched thread */
  IpDefrag.pReasmMsgboxes = calloc(1, sizeof(msg_box_t) * IpDefrag.reasmCnt * 2 *
                                   /* IpDefrag.unmCnt == 0 when IpDefrag.ExtDescrType != 9 */
                                   (IpDefrag.unmCnt == 0 ? 1 : IpDefrag.unmCnt));
  if (IpDefrag.pReasmMsgboxes == NULL) {
    fprintf(stderr, "Memory allocation failed\n");
    return -1;
  }
  IpDefrag.pReasmReturnMsgboxes = (msg_box_t *)((char *)IpDefrag.pReasmMsgboxes +
                                                sizeof(msg_box_t) * IpDefrag.reasmCnt * IpDefrag.unmCnt);

  /* Initialize and start all IP fragment re-assembling threads */
  for (i = 0; i < IpDefrag.reasmCnt; i++) {
    tbl_entry_t *tbl;
    IpDefrag.pReasmThreads[i].pIpDefrag = &IpDefrag;
    IpDefrag.pReasmThreads[i].streamId = IpDefrag.reasmStart + i;
    IpDefrag.pReasmThreads[i].idx = i;
    IpDefrag.pReasmThreads[i].pMsgbox = &IpDefrag.pReasmMsgboxes[i * IpDefrag.unmCnt];
    /* Create the initial hash table elements and fill in the free list. May grow dynamically later */
    IpDefrag.pReasmThreads[i].tblFree = calloc(1, sizeof(tbl_entry_t));
    tbl = IpDefrag.pReasmThreads[i].tblFree;
    if (tbl) {
      for (ii = 1; ii < INITIAL_HASH_TBL_ENTRY_CNT; ii++) {
        tbl->pNext = calloc(1, sizeof(tbl_entry_t));
        tbl = tbl->pNext;
        if (tbl == NULL)
          break;
      }
    }
    if ((status = pthread_create(&IpDefrag.pReasmThreads[i].thread, NULL, _ReassemblyThread, (void *)&IpDefrag.pReasmThreads[i])) != 0) {
      fprintf(stderr, "Unable to create re-assembly stream thread");
      return status;
    }
  }

  if (IpDefrag.unmCnt) {
    /* Create all DOI-tables for all un-matched threads */
    IpDefrag.pDOI_Tbls = calloc(1, sizeof(DOI_dgramTbl_t) * DOI_FRAG_TBL_SIZE * IpDefrag.unmCnt * IpDefrag.reasmCnt);
    /* Create the un-matched fragment threads */
    IpDefrag.pUnmThreads = calloc(1, sizeof(unmThread_t) * IpDefrag.unmCnt);
    if (IpDefrag.pUnmThreads == NULL) {
      fprintf(stderr, "Memory allocation failed\n");
      return -1;
    }
    for (i = 0; i < IpDefrag.unmCnt; i++) {
      IpDefrag.pUnmThreads[i].pIpDefrag = &IpDefrag;
      IpDefrag.pUnmThreads[i].streamId = IpDefrag.unmStart + i;
      IpDefrag.pUnmThreads[i].pDOI = &IpDefrag.pDOI_Tbls[i * DOI_FRAG_TBL_SIZE * IpDefrag.reasmCnt];
      IpDefrag.pUnmThreads[i].pMsgboxReturn = &IpDefrag.pReasmReturnMsgboxes[i * IpDefrag.reasmCnt];
      if ((status = pthread_create(&IpDefrag.pUnmThreads[i].thread, NULL, _UnMatchedThread, (void *)&IpDefrag.pUnmThreads[i])) != 0) {
        fprintf(stderr, "Unable to create un-matched stream thread");
        return status;
      }
    }
  }

  /* Main statistics loop */
  while (IpDefrag.Running) {
    int sum1, sum2, sum3, sum4, sum5;
    printf("Parameters:\n");
    printf("Adapter number to run on %i\n", IpDefrag.adapterNo);
    printf("Number of streams/threads %i\n", IpDefrag.reasmCnt);
    printf("Number of un-matched streams/threads %i\n", IpDefrag.unmCnt);
    printf("Fragment timeout (ms) %i\n", (int)(IpDefrag.fragTimeout / 100000));
    printf("IPFMode table timeout (ms) %i\n", tableTimeout / 10);
    printf("IPFMode TablePersist ");
    if (tablePersistTimeout) {
      printf("TimeoutOnly\n");
    } else {
      printf("LastFragment\n");
    }
    printf("\nRunning with extended descriptor %i\n", IpDefrag.ExtDescrType);
    printf("-------------------------------------------------------------------\n");
    printf("Re-assembling streams:\n");
    printf(" #  Completed  First frag  Deleted partial  Non-fragmented  Id\n");
    printf("                           (timed out)      packets         clashes\n");
    sum1 = sum2 = sum3 = sum4 = sum5 = 0;
    for (i = 0; i < IpDefrag.reasmCnt; i++) {
      printf("%2d:%8d, %8d, %10d, %14d, %12d\n",
             IpDefrag.pReasmThreads[i].streamId,
             IpDefrag.pReasmThreads[i].datagramCompleted,
             IpDefrag.pReasmThreads[i].firstFragRcv,
             IpDefrag.pReasmThreads[i].fragmentsTimedout,
             IpDefrag.pReasmThreads[i].nonFragments,
             IpDefrag.pReasmThreads[i].Src_IdClash);
      sum1 += IpDefrag.pReasmThreads[i].datagramCompleted;
      sum2 += IpDefrag.pReasmThreads[i].firstFragRcv;
      sum3 += IpDefrag.pReasmThreads[i].fragmentsTimedout;
      sum4 += IpDefrag.pReasmThreads[i].nonFragments;
      sum5 += IpDefrag.pReasmThreads[i].Src_IdClash;
    }
    printf("sum:%7d, %8d, %10d, %14d, %12d\n", sum1, sum2, sum3, sum4, sum5);
    if (IpDefrag.unmCnt) {
      printf("Un-matched fragment streams:\n");
      printf(" #  Fragments  Deleted frags\n");
      printf("    received   (timed out)\n");
      for (i = 0; i < IpDefrag.unmCnt; i++) {
        printf("%2d:%8d,%12d\n",
               IpDefrag.pUnmThreads[i].streamId,
               IpDefrag.pUnmThreads[i].fragRecv,
               IpDefrag.pUnmThreads[i].fragsDeleted);
      }
    }
    printf("\n-------------------------------------------------------------------\n");
    fflush(stdout);
    // Sleep 2 sec
    sleep(2);
  }

  /* Cleanup section */
  hashTblFreed = 0;
  waitTblFreed = 0;
  msgboxElmFreed = 0;
  for (i = 0; i < IpDefrag.reasmCnt; i++) {
    tbl_entry_t *tbl;
    pthread_join(IpDefrag.pReasmThreads[i].thread, NULL);
    /* Free all free table entries */
    while (IpDefrag.pReasmThreads[i].tblFree) {
      tbl = IpDefrag.pReasmThreads[i].tblFree->pNext;
      free(IpDefrag.pReasmThreads[i].tblFree);
      IpDefrag.pReasmThreads[i].tblFree = tbl;
    }
    /* Free all hash table entries containing fragments */
    for (ii = 0; ii < REASSEMBLY_HASH_TBL_SIZE; ii++) {
      while (IpDefrag.pReasmThreads[i].tbl[ii]) {
        tbl = IpDefrag.pReasmThreads[i].tbl[ii]->pNext;
        hashTblFreed++; // already released on thread termination
        free(IpDefrag.pReasmThreads[i].tbl[ii]);
        IpDefrag.pReasmThreads[i].tbl[ii] = tbl;
      }
    }
    msgboxElmFreed += IpDefrag.pReasmThreads[i].msgboxPackets;
  }
  free(IpDefrag.pReasmThreads);
  IpDefrag.allReasmClosed = 1;
  for (i = 0; i < IpDefrag.unmCnt; i++) {
    pthread_join(IpDefrag.pUnmThreads[i].thread, NULL);
  }
  for (i = 0; i < IpDefrag.unmCnt; i++) {
    wait_list_t *pWait = IpDefrag.pUnmThreads[i].pWait;
    while (pWait) {
      waitTblFreed++; // already released on thread termination
      pWait = pWait->pNext;
    }
  }
  free(IpDefrag.pUnmThreads);
  free(IpDefrag.pReasmMsgboxes);
  free(IpDefrag.pDOI_Tbls);
  if (msgboxElmFreed || hashTblFreed || waitTblFreed) {
    printf("\n----------------------------------------------------------");
    if (hashTblFreed) {
      printf("\n%i entries remained in hash table on exit", hashTblFreed);
    }
    if (waitTblFreed) {
      printf("\n%i entries remained in un-matched fragments wait-table on exit", waitTblFreed);
    }
    if (msgboxElmFreed) {
      printf("\n%i entries remained in msg boxes on exit", msgboxElmFreed);
    }
    printf("\n----------------------------------------------------------\n");
  }

  // Delete the filter
  snprintf(tmpBuffer, 20, "delete=%d", ntplInfo.ntplId);
  if ((status = NT_NTPL(hCfgStream, tmpBuffer, &ntplInfo, NT_NTPL_PARSER_VALIDATE_NORMAL)) != NT_SUCCESS) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "NT_NTPL() failed: %s\n", errorBuffer);
    fprintf(stderr, ">>> NTPL errorcode: %X\n", ntplInfo.u.errorData.errCode);
    fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[0]);
    fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[1]);
    fprintf(stderr, ">>> %s\n", ntplInfo.u.errorData.errBuffer[2]);
    return -1;
  }

  // Close the config stream
  if ((status = NT_ConfigClose(hCfgStream)) != NT_SUCCESS) {
    // Get the status code as text
    NT_ExplainError(status, errorBuffer, sizeof(errorBuffer));
    fprintf(stderr, "NT_ConfigClose() failed: %s\n", errorBuffer);
    return -1;
  }
  return 0;
}