began working on an abstraction layer for sockets

This commit is contained in:
PS 2021-01-30 13:22:43 -08:00
parent b1d745aa1f
commit 070773437d
7 changed files with 245 additions and 16 deletions

View File

@ -8,27 +8,23 @@
internal void
BlumenLumen_MicListenJob(gs_thread_context* Ctx, u8* UserData)
{
packet_ringbuffer* MicPacketBuffer = (packet_ringbuffer*)UserData;
#if 0
platform_socket* ListenSocket = CreateSocketAndConnect(Ctx->SocketManager, "127.0.0.1", "20185");
mic_listen_job_data* Data = (mic_listen_job_data*)UserData;
while (true)
{
gs_data Data = SocketReceive(Ctx->SocketManager, &ListenSocket, Ctx->Transient);
if (Data.Size > 0)
gs_data Msg = SocketRecieve(Data->SocketManager, Data->ListenSocket, Ctx->Transient);
if (Msg.Size > 0)
{
OutputDebugStringA("Listened");
MicPacketBuffer->Values[MicPacketBuffer->WriteHead++] = Data;
if (MicPacketBuffer->WriteHead >= PACKETS_MAX)
Data->MicPacketBuffer->Values[Data->MicPacketBuffer->WriteHead++] = Msg;
if (Data->MicPacketBuffer->WriteHead >= PACKETS_MAX)
{
MicPacketBuffer->WriteHead = 0;
Data->MicPacketBuffer->WriteHead = 0;
}
}
}
CloseSocket(Ctx->SocketManager, ListenSocket);
#endif
CloseSocket(Data->SocketManager, Data->ListenSocket);
}
internal gs_data
@ -43,7 +39,11 @@ BlumenLumen_CustomInit(app_state* State, context Context)
Result = PushSizeToData(&State->Permanent, sizeof(blumen_lumen_state));
blumen_lumen_state* BLState = (blumen_lumen_state*)Result.Memory;
BLState->MicListenThread = CreateThread(Context.ThreadManager, BlumenLumen_MicListenJob, (u8*)&BLState->MicPacketBuffer);
BLState->MicListenJobData.SocketManager = Context.SocketManager;
BLState->MicListenJobData.MicPacketBuffer = &BLState->MicPacketBuffer;
BLState->MicListenJobData.ListenSocket = CreateSocket(Context.SocketManager, "127.0.0.1", "20185");
BLState->MicListenThread = CreateThread(Context.ThreadManager, BlumenLumen_MicListenJob, (u8*)&BLState->MicListenJobData);
#if 1

View File

@ -24,12 +24,20 @@ struct microphone_packet
};
#pragma pack(pop)
struct mic_listen_job_data
{
platform_socket_manager* SocketManager;
packet_ringbuffer* MicPacketBuffer;
platform_socket_handle_ ListenSocket;
};
struct blumen_lumen_state
{
packet_ringbuffer MicPacketBuffer;
temp_job_req JobReq;
platform_thread_handle MicListenThread;
mic_listen_job_data MicListenJobData;
};

View File

@ -207,6 +207,7 @@ struct context
cleanup_application* CleanupApplication;
platform_thread_manager* ThreadManager;
platform_socket_manager* SocketManager;
// Platform Services
gs_work_queue* GeneralWorkQueue;

View File

@ -496,6 +496,9 @@ WinMain (
Context.ThreadManager = PushStruct(&PlatformPermanent, platform_thread_manager);
*Context.ThreadManager = CreatePlatformThreadManager(Win32CreateThread, Win32KillThread);
Context.SocketManager = PushStruct(&PlatformPermanent, platform_socket_manager);
*Context.SocketManager = CreatePlatformSocketManager(Win32CreateSocket, Win32CloseSocket, Win32SocketReceive);
win32_dll_refresh DLLRefresh = InitializeDLLHotReloading(DLLName, WorkingDLLName, DLLLockFileName);
if (!ReloadAndLinkDLL(&DLLRefresh, &Context, &Win32WorkQueue.WorkQueue, true)) { return -1; }
@ -509,10 +512,6 @@ WinMain (
addressed_data_buffer_list OutputData = AddressedDataBufferList_Create(ThreadContext);
temp_job_req* Req = Context.InitializeApplication(Context);
Req->Proc = BlumenLumen_MicListenJob;
Win32_TestCode_SocketReading(ThreadContext, Req);
Running = true;
Context.WindowIsVisible = true;
while (Running)

View File

@ -131,6 +131,101 @@ Win32Socket_ConnectToAddress(char* Address, char* DefaultPort)
return Result;
}
internal bool
Win32CreateSocket(platform_socket* Socket, char* Address, char* DefaultPort)
{
bool Result = false;
addrinfo Hints = {0};
Hints.ai_family = AF_UNSPEC;
Hints.ai_socktype = SOCK_STREAM;
Hints.ai_protocol = IPPROTO_TCP;
addrinfo* PotentialConnections;
s32 Error = getaddrinfo(Address, DefaultPort, &Hints, &PotentialConnections);
if (Error == 0)
{
for (addrinfo* InfoAt = PotentialConnections; InfoAt != NULL; InfoAt = InfoAt->ai_next)
{
SOCKET SocketHandle = socket(InfoAt->ai_family, InfoAt->ai_socktype, InfoAt->ai_protocol);
if (SocketHandle == INVALID_SOCKET)
{
Error = WSAGetLastError();
InvalidCodePath;
}
Error = connect(SocketHandle, InfoAt->ai_addr, (int)InfoAt->ai_addrlen);
if (Error == SOCKET_ERROR)
{
closesocket(SocketHandle);
continue;
}
else
{
Socket->PlatformHandle = (u8*)Win32Alloc(sizeof(SOCKET), 0);
*(SOCKET*)Socket->PlatformHandle = SocketHandle;
Result = true;
break;
}
}
}
else
{
Error = WSAGetLastError();
InvalidCodePath;
}
if (!Result)
{
Assert(Socket->PlatformHandle == 0);
}
freeaddrinfo(PotentialConnections);
return Result;
}
internal bool
Win32CloseSocket(platform_socket* Socket)
{
SOCKET* Win32Sock = (SOCKET*)Socket->PlatformHandle;
closesocket(*Win32Sock);
Win32Free((u8*)Socket->PlatformHandle, sizeof(SOCKET));
*Socket = {};
return true;
}
internal gs_data
Win32SocketReceive(platform_socket* Socket, gs_memory_arena* Storage)
{
// TODO(pjs): Test this first code path when you have data running - it should
// get the actual size of the data packet being sent
#if 0
gs_data Result = {};
s32 BytesQueued = Win32Socket_PeekGetTotalSize(Socket);
if (BytesQueued > 0)
{
Result = PushSizeToData(Storage, BytesQueued);
s32 Flags = 0;
s32 BytesReceived = recv(Socket->Socket, (char*)Result.Memory, Result.Size, Flags);
Assert(BytesReceived == BytesQueued);
}
return Result;
#else
gs_data Result = PushSizeToData(Storage, 1024);
s32 Flags = 0;
SOCKET* Win32Sock = (SOCKET*)Socket->PlatformHandle;
s32 BytesReceived = recv(*Win32Sock, (char*)Result.Memory, Result.Size, Flags);
if (BytesReceived == SOCKET_ERROR)
{
// TODO(pjs): Error logging
s32 Error = WSAGetLastError();
InvalidCodePath;
}
Result.Size = BytesReceived;
return Result;
#endif
}
internal s32
Win32Socket_SetOption(win32_socket* Socket, s32 Level, s32 Option, const char* OptionValue, s32 OptionLength)
{

View File

@ -3345,6 +3345,101 @@ KillThread(platform_thread_manager* Manager, platform_thread_handle Handle)
}
//////////////////////////
//
// Socket Manager
CREATE_SOCKET(PlatformCreateSocket_Stub)
{
return false;
}
CLOSE_SOCKET(PlatformCloseSocket_Stub)
{
return false;
}
SOCKET_RECEIVE(PlatformSocketRecieve_Stub)
{
return {};
}
internal platform_socket_manager
CreatePlatformSocketManager(platform_create_socket* CreateSocketProc,
platform_close_socket* CloseSocketProc,
platform_socket_receive* SocketRecieveProc)
{
platform_socket_manager Result = {};
Result.CreateSocketProc = CreateSocketProc;
Result.CloseSocketProc = CloseSocketProc;
Result.SocketRecieveProc = SocketRecieveProc;
if (!CreateSocketProc)
{
Result.CreateSocketProc = PlatformCreateSocket_Stub;
}
if (!CloseSocketProc)
{
Result.CloseSocketProc = PlatformCloseSocket_Stub;
}
if (!SocketRecieveProc)
{
Result.SocketRecieveProc = PlatformSocketRecieve_Stub;
}
return Result;
}
internal platform_socket_handle_
CreateSocket(platform_socket_manager* Manager, char* Addr, char* Port)
{
platform_socket_handle_ Result = {};
for (u32 i = 1; i < SOCKETS_COUNT_MAX; i++)
{
if (!Manager->SocketsUsed[i])
{
Result.Index = i;
Manager->SocketsUsed[i] = true;
break;
}
}
Assert(Result.Index != 0);
platform_socket* Socket = &Manager->Sockets[Result.Index];
Manager->CreateSocketProc(Socket, Addr, Port);
return Result;
}
internal bool
CloseSocket(platform_socket_manager* Manager, platform_socket_handle_ Handle)
{
bool Result = false;
platform_socket* Socket = &Manager->Sockets[Handle.Index];
if (Manager->CloseSocketProc(Socket))
{
Manager->SocketsUsed[Handle.Index] = false;
*Socket = {};
Result = true;
}
return Result;
}
internal gs_data
SocketRecieve(platform_socket_manager* Manager, platform_socket_handle_ SocketHandle, gs_memory_arena* Storage)
{
gs_data Result = {};
if (Manager->SocketsUsed[SocketHandle.Index])
{
platform_socket* Socket = &Manager->Sockets[SocketHandle.Index];
if (Socket->PlatformHandle != 0)
{
Result = Manager->SocketRecieveProc(Socket, Storage);
}
}
return Result;
}
///////////////////////////
//
// Hashes

View File

@ -1086,5 +1086,36 @@ struct gs_work_queue
complete_queue_work* CompleteQueueWork;
};
// Sockets
typedef struct platform_socket_handle_
{
u32 Index;
} platform_socket_handle_;
typedef struct platform_socket
{
u8* PlatformHandle;
} platform_socket;
#define CREATE_SOCKET(name) bool name(platform_socket* Socket, char* Addr, char* DefaultPort)
typedef CREATE_SOCKET(platform_create_socket);
#define CLOSE_SOCKET(name) bool name(platform_socket* Socket)
typedef CLOSE_SOCKET(platform_close_socket);
#define SOCKET_RECEIVE(name) gs_data name(platform_socket* Socket, gs_memory_arena* Storage)
typedef SOCKET_RECEIVE(platform_socket_receive);
#define SOCKETS_COUNT_MAX 32
typedef struct platform_socket_manager
{
b8 SocketsUsed[SOCKETS_COUNT_MAX];
platform_socket Sockets[SOCKETS_COUNT_MAX];
platform_create_socket* CreateSocketProc;
platform_close_socket* CloseSocketProc;
platform_socket_receive* SocketRecieveProc;
} platform_socket_manager;
#define GS_TYPES_H
#endif // GS_TYPES_H