Catching DBEvents
Note
Basic knowledge of asyncio is assumed for this example.
DBEvent notifications are sent to the OpenAPI when it is actively
interacting with the DBMS.
The dbevent_catcher() coroutine shown below retrieves
the notifications.
DBEvents can have an optional message attached.
Refer to Handling VARCHAR and VARBYTE Data to understand how the buffer
for the optional event message is allocated and to see how
a varchar() function can be defined to assist.
For the sake of illustrating a reaction to a notification
dbevent_catcher() places the details of the DBEvent
in an asyncio.Queue(), to be conveniently
available to the main application.
IIapi_catchEvent() is not awaitable.
dbevent_catcher() uses a busy-wait loop to check for notifications.
A global asyncio.Event() called stop is used to terminate
the loop.
To ensure prompt
reaction to events during idle periods when the application is
not actively interacting with the DBMS the dbevent_getter()
coroutine uses a timer to periodically request delivery of notifications.
These coroutines would be run as concurrent tasks together
with the main application task.
When run that way they could disrupt an
OpenAPI Protocol initiated by the main
task unless their operation is coordinated with the main task.
All the tasks need to take and release a global asyncio.Lock()
to ensure safe shared use of the DBMS session. In this example the lock
is called session_mutex.
1import asyncio
2import ctypes
3import struct
4import pyngres.asyncio as py
5...
6
7DBEVENT_POLLING_INTERVAL = 0.1 # seconds
8
9connHandle = None
10tranHandle = None
11...
12
13# instantiate the required asyncio controls
14run_flag = asyncio.Event()
15session_mutex = asyncio.Lock()
16event_queue = asyncio.Queue()
17...
18
19async def dbevent_catcher():
20 '''task to catch DBEvents'''
21
22 # a DBEvent can include an optional message; prepare to buffer it
23 gdp = py.IIAPI_GETDESCRPARM()
24 event_msg = ctypes.c_buffer(258)
25 event_value = py.IIAPI_DATAVALUE()
26 event_value.dv_length = len(event_msg)
27 event_value.dv_value = ctypes.addressof(event_msg)
28 eventData = (py.IIAPI_DATAVALUE * 1)()
29 eventData[0] = event_value
30 gcp = py.IIAPI_GETCOLPARM()
31 gcp.gc_rowCount = 1
32 gcp.gc_columnCount = 1
33 gcp.gc_columnData = eventData
34
35 # get a DBEvent notification eventHandle
36 cep = py.IIAPI_CATCHEVENTPARM()
37 cep.ce_connHandle = connHandle
38 async with session_mutex:
39 py.IIapi_catchEvent(cep)
40 # the eventHandle is returned immediately
41 eventHandle = cep.ce_eventHandle
42
43 # busy-wait loop to check for notification
44 while run_flag.is_set():
45 if cep.ce_genParm.gp_completed:
46 # capture the DBEvent description and queue it
47 event = dict()
48 event['eventDB'] = cep.ce_eventDB[:]
49 event['eventName'] = cep.ce_eventName[:]
50 event['eventOwner'] = cep.ce_eventOwner[:]
51 date = cep.ce_eventTime.dv_value
52 event['eventTime'] = ingresdate_to_str(date)
53 if cep.ce_eventInfoAvail:
54 # fetch the event message
55 async with session_mutex:
56 gdp.gd_stmtHandle = eventHandle
57 await py.IIapi_getDescriptor(gdp)
58 gcp.gc_stmtHandle = eventHandle
59 await py.IIapi_getColumns(gcp)
60 # the message is returned as a VARCHAR so unpack it
61 msg_length,msg_bytes= struct.unpack('h256s',event_msg)
62 msg = msg_bytes[:msg_length].decode()
63 event['eventText'] = msg
64 else:
65 event['eventText'] = None
66 # put the DBEvent notification in the global queue
67 await event_queue.put(event)
68
69 # wait for the next notification, reusing the event handle
70 cep = py.IIAPI_CATCHEVENTPARM()
71 cep.ce_connHandle = connHandle
72 cep.ce_eventHandle = eventHandle
73 async with session_mutex:
74 py.IIapi_catchEvent(cep)
75 # surrender control back to asyncio
76 await asyncio.sleep(0)
77 else:
78 # no event yet; sleep through a polling interval
79 await asyncio.sleep(DBEVENT_POLLING_INTERVAL)
80
81async def dbevent_getter():
82 '''task to solicit DBEvent notifications'''
83
84 # the OpenAPI delivers DBEvents while executing queries. DBEvents
85 # won't be delivered while the application is idle or not using
86 # the DBMS session. Running dbevent_getter() ensures a DBEvent is
87 # delivered within the polling interval even when the DBMS session
88 # is idle
89
90 gvp = py.IIAPI_GETEVENTPARM()
91 gvp.gv_connHandle = connHandle
92 gvp.gv_timeout = 0
93 while run_flag.is_set():
94 async with session_mutex:
95 await py.IIapi_getEvent(gvp)
96 # repeat after half a polling interval
97 await asyncio.sleep(DBEVENT_POLLING_INTERVAL * 0.5)
98...