// NOTE(review): this excerpt fuses several original source lines (94, 97, 98, 111) onto the
// single physical line below; the tokens are left exactly as found.
// STR_INNER(A) stringizes its argument with the # operator. STR(A) forwards to STR_INNER so
// that an argument which is itself a macro gets expanded before it is stringized.
// The if statement at the end of that line handles any status other than NT_SUCCESS: the
// visible body writes message and error_buffer to std::cerr and then exits the process.
94 #include <unordered_map> 97 #define STR_INNER(A) #A 98 #define STR(A) STR_INNER(A) 111 if (status != NT_SUCCESS) {
114 std::cerr << message <<
": " << error_buffer << std::endl;
// Failure is fatal: the process terminates with EXIT_FAILURE.
115 std::exit(EXIT_FAILURE);
// Stores the result of NT_ConfigOpen in status. The call receives the address of
// config_stream and a name string.
// NOTE(review): the meaning of each argument comes from the vendor API, which is not shown
// in this excerpt - confirm against its reference documentation.
128 status =
NT_ConfigOpen(&config_stream,
"flow_learn_span NT_ConfigOpen");
// Visits every element of ntpls by const reference. The loop body is not part of this excerpt.
131 for (
const auto& ntpl : ntpls) {
// Views flow->keyData as a sequence of const 64-bit words so that the key can be hashed one
// word at a time.
// NOTE(review): this cast assumes keyData is suitably aligned for uint64_t access and holds
// at least 40 bytes (index 4 is read below) - confirm against the NtFlow_t declaration.
148 const uint64_t* ptr =
reinterpret_cast<const uint64_t*
>(flow->
keyData);
// The hash is the XOR of each word multiplied by its own constant, combined with a term
// derived from meta. Only the terms for words 0, 1 and 4 and for meta are visible here; the
// lines between them (presumably words 2 and 3) are missing from this excerpt.
152 return (ptr[0] * 45684803) ^
153 (ptr[1] * 198138211) ^
156 (ptr[4] * 3294876479) ^
157 (meta * 663664226587);
// Receive task, parameterised by a stream id and a per-packet callback.
//   ready     - shared atomic counter; how this task updates it is not visible in this excerpt.
//   end_task  - shared stop flag; the loop below runs until it reads true.
//   stream_id - identifies the stream this task serves (presumably used when opening the
//               receive stream - confirm in the missing lines).
//   handle    - callback taking a const NtNetBuf_t reference; presumably invoked once per
//               received buffer - the call site is not visible here.
167 void rx_task(std::atomic<int>* ready, std::atomic<bool>* end_task,
168 uint32_t stream_id, std::function<
void(
const NtNetBuf_t&)> handle)
// Main loop: keeps iterating until another thread sets end_task.
181 while (!end_task->load()) {
// A timeout or try-again status is not an error; skip to the next iteration.
184 if (status == NT_STATUS_TIMEOUT || status == NT_STATUS_TRYAGAIN)
continue;
// Flow records owned by this code, keyed by a 64-bit value. A multimap is used, so several
// records may share one key; the lookup further down disambiguates them field by field.
209 std::unordered_multimap<uint64_t, std::unique_ptr<NtFlow_t>>
flow_map;
// Runs until another thread sets end_task.
229 while (!end_task->load()) {
// Statistics counter; the condition guarding this increment is not visible in this excerpt.
253 counter_unhandled += 1;
// The packet data starts descrLength bytes after the start of the dyn4 descriptor.
269 const uint8_t* packet =
reinterpret_cast<const uint8_t*
>(dyn4) + dyn4->
descrLength;
// Allocate a fresh flow record and zero every byte of it, so that any field or key byte not
// written below is zero.
271 auto flow = std::unique_ptr<NtFlow_t>(
new NtFlow_t);
272 std::memset(flow.get(), 0x0,
sizeof(
NtFlow_t));
// offset0 locates the IPv4 source address; the destination address follows 4 bytes later.
// (The NTPL Assign commands in this file set Offset0=Layer3Header[12].)
274 const uint8_t* ipv4_src = packet + dyn4->
offset0;
275 const uint8_t* ipv4_dst = packet + dyn4->
offset0 + 4;
// offset1 locates the UDP source port; the destination port follows 2 bytes later.
// (The NTPL Assign commands in this file set Offset1=Layer4Header[0].)
277 const uint8_t* udp_src = packet + dyn4->
offset1;
278 const uint8_t* udp_dst = packet + dyn4->
offset1 + 2;
// Direction-independent key: when (source address, source port) orders byte-wise before
// (destination address, destination port), the fields are stored source first.
284 if (std::lexicographical_compare(ipv4_src, ipv4_src + 4, ipv4_dst, ipv4_dst + 4) ||
285 (std::equal(ipv4_src, ipv4_src + 4, ipv4_dst) &&
286 std::lexicographical_compare(udp_src, udp_src + 2, udp_dst, udp_dst + 2))) {
// Key layout: bytes 0-3 address, 4-7 address, 8-9 port, 10-11 port.
288 std::memcpy(flow->keyData, ipv4_src, 4);
289 std::memcpy(flow->keyData + 4, ipv4_dst, 4);
290 std::memcpy(flow->keyData + 8, udp_src, 2);
291 std::memcpy(flow->keyData + 10, udp_dst, 2);
// Same layout with source and destination swapped. Presumably this is the else branch of the
// comparison above - the else line itself is missing from this excerpt.
294 std::memcpy(flow->keyData, ipv4_dst, 4);
295 std::memcpy(flow->keyData + 4, ipv4_src, 4);
296 std::memcpy(flow->keyData + 8, udp_dst, 2);
297 std::memcpy(flow->keyData + 10, udp_src, 2);
// 0x11 is 17, the IP protocol number of UDP; the NTPL filter in this file only passes UDP.
304 flow->ipProtocolField = 0x11;
// Non-owning pointer taken before ownership of the record is moved into flow_map below.
313 auto flow_raw_ptr = flow.get();
// Look for an already stored record: first narrow by id, then compare 40 bytes of key data
// plus the protocol field, key id and key set id.
319 auto range = flow_map.equal_range(flow->id);
320 for (
auto it = range.first; it != range.second; ++it) {
321 if (std::equal(flow->keyData, flow->keyData + 40, it->second->keyData) &&
322 flow->ipProtocolField == it->second->ipProtocolField &&
323 flow->keyId == it->second->keyId && flow->keySetId == it->second->keySetId) {
// Hands ownership of the record to flow_map under its id. After this statement flow is
// empty, so only flow_raw_ptr can still reach the record.
328 flow_map.insert(std::pair<uint64_t, std::unique_ptr<NtFlow_t>>{flow->id, std::move(flow)});
// Statistics counters; the code paths that reach these increments are not visible here.
345 counter_blacklist_hit += 1;
355 counter_whitelist_hit += 1;
// Adapter number, fixed to 0; its use is not visible in this excerpt.
366 constexpr uint8_t adapter_no = 0;
// NTPL command strings follow. FilterCheck matches IPv4/UDP traffic on the port given as $1.
378 "DefineMacro(\"FilterCheck\", \"Port==$1 and Layer3Protocol==IPv4 and Layer4Protocol==UDP\")",
// Key fields: 32 bits at layer 3 offsets 12 and 16, 16 bits at layer 4 offsets 0 and 2.
379 "DefineMacro(\"KeyTypeProtoSpecs\", \"(layer3header[12]/32, layer3header[16]/32, layer4header[0]/16,layer4header[2]/16)\")",
// Key type kt has field widths 32, 32, 16, 16; key definition kd uses it with KeySort=Sorted.
385 "KeyType[Name=kt] = {32, 32, 16, 16}",
386 "KeyDef[Name=kd; KeyType=kt; IpProtocolField=Outer; KeySort=Sorted] = KeyTypeProtoSpecs",
// Streams 0 to 3 receive traffic from port 0 according to the key lookup result:
// UNHANDLED, MISS, and the stringized values of the BLACKLIST and WHITELIST macros.
389 "Assign[StreamId=0; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == UNHANDLED",
390 "Assign[StreamId=1; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == MISS",
391 "Assign[StreamId=2; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == " STR(
BLACKLIST),
392 "Assign[StreamId=3; Descriptor=DYN4, Offset0=Layer3Header[12], Offset1=Layer4Header[0]] = FilterCheck(0) and Key(kd, KeyID=1) == " STR(
WHITELIST),
// The next physical line fuses the drop rule for all traffic (priority 10) with the later
// declaration of flow_attr; the tokens are left exactly as found.
396 "Assign[StreamId=Drop; Priority=10]=All" 400 NtFlowAttr_t flow_attr;
// Shared state for the worker threads: a stop flag and a readiness counter.
409 std::atomic<bool> end_task {
false};
410 std::atomic<int> ready {0};
// Spin, yielding the CPU, until the readiness counter reaches 5.
// NOTE(review): presumably one increment per worker thread (five are joined below) - the
// increments themselves are not visible in this excerpt.
419 while(ready.load() < 5) std::this_thread::yield();
421 std::cout <<
"Press enter to end application..." << std::endl;
// Ask every worker loop to stop.
425 end_task.store(
true);
// Wait for the flow-programming thread and the four receive threads to finish.
427 if (flow_program_task0.joinable()) flow_program_task0.join();
428 if (rx_task3.joinable()) rx_task3.join();
429 if (rx_task2.joinable()) rx_task2.join();
430 if (rx_task1.joinable()) rx_task1.join();
431 if (rx_task0.joinable()) rx_task0.join();
// Report the collected statistics; only the counter_miss line is visible in this excerpt.
434 std::cout <<
"counter_miss: " <<
counter_miss << std::endl;