Catching DBEvents

Note

Basic knowledge of asyncio is assumed for this example.

DBEvent notifications are sent to the OpenAPI when it is actively interacting with the DBMS. The dbevent_catcher() coroutine shown below retrieves the notifications.

DBEvents can have an optional message attached. Refer to Handling VARCHAR and VARBYTE Data to understand how the buffer for the optional event message is allocated and to see how a varchar() function can be defined to assist.
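As a rough illustration of the buffer layout involved, the sketch below assumes a VARCHAR value is a 2-byte native-order length followed by up to 256 bytes of text, which is the layout the listing on this page unpacks. The helper name unpack_varchar() and the simulated buffer contents are hypothetical; this is not the varchar() function from Handling VARCHAR and VARBYTE Data.

```python
import ctypes
import struct

MAX_TEXT = 256  # assumed maximum text length, matching a 258-byte buffer


def unpack_varchar(buffer, max_text=MAX_TEXT):
    '''return the text held in a length-prefixed VARCHAR buffer (hypothetical helper)'''
    # the first 2 bytes hold the length, the rest holds the (padded) text
    length, data = struct.unpack_from(f'h{max_text}s', buffer)
    return data[:length].decode()


# simulate what the OpenAPI might write into the buffer
event_msg = ctypes.create_string_buffer(2 + MAX_TEXT)
struct.pack_into(f'h{MAX_TEXT}s', event_msg, 0, 5, b'hello')

text = unpack_varchar(event_msg)
print(text)
```

Only the first `length` bytes are decoded, so any padding left in the buffer from an earlier, longer message is ignored.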

To illustrate a reaction to a notification, dbevent_catcher() places the details of the DBEvent in an asyncio.Queue(), where they are conveniently available to the main application.
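How the main application consumes the queue is left open here. One possible shape, shown as a sketch only, is a coroutine that awaits items from the queue. The handle_events() name, the expected count, and the event names and text are all invented for the demonstration.

```python
import asyncio


async def handle_events(event_queue, expected):
    '''consume queued DBEvent descriptions (hypothetical consumer)'''
    handled = []
    while len(handled) < expected:
        # suspends until a notification has been queued
        event = await event_queue.get()
        handled.append(event['eventName'])
        event_queue.task_done()
    return handled


async def demo():
    event_queue = asyncio.Queue()
    # stand-ins for what a catcher task would enqueue
    await event_queue.put({'eventName': 'stock_low', 'eventText': None})
    await event_queue.put({'eventName': 'order_placed', 'eventText': 'id=42'})
    return await handle_events(event_queue, expected=2)


handled = asyncio.run(demo())
print(handled)
```

Because the queue preserves insertion order, the notifications are handled in the order in which they were caught.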

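IIapi_catchEvent() is not awaitable, so dbevent_catcher() uses a busy-wait loop to check for notifications, sleeping for a polling interval between checks. A global asyncio.Event() called run_flag controls the loop: it runs for as long as run_flag is set, and clearing run_flag terminates it.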
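The polling pattern can be sketched in isolation, without a DBMS. In the sketch below a plain object with a completed attribute stands in for the OpenAPI completion flag, and a timer callback stands in for the arrival of a notification. The names poll_until_completed() and POLLING_INTERVAL are hypothetical.

```python
import asyncio
from types import SimpleNamespace

POLLING_INTERVAL = 0.01  # seconds; shortened for the demonstration


async def poll_until_completed(parm, run_flag):
    '''poll a completion flag without blocking the event loop'''
    while run_flag.is_set():
        if parm.completed:
            return True
        # yield to other tasks for one polling interval
        await asyncio.sleep(POLLING_INTERVAL)
    return False


async def demo():
    run_flag = asyncio.Event()
    run_flag.set()
    parm = SimpleNamespace(completed=False)  # stand-in for the completion flag
    loop = asyncio.get_running_loop()
    # pretend the notification arrives after 50 ms
    loop.call_later(0.05, setattr, parm, 'completed', True)
    return await poll_until_completed(parm, run_flag)


caught = asyncio.run(demo())
print(caught)
```

Sleeping between checks keeps the loop from monopolizing the event loop; the cost is that a notification may be noticed up to one polling interval late.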

To ensure prompt reaction to events during idle periods, when the application is not actively interacting with the DBMS, the dbevent_getter() coroutine uses a timer to periodically request delivery of notifications.

These coroutines would be run as concurrent tasks together with the main application task. When run that way they could disrupt an OpenAPI Protocol initiated by the main task unless their operation is coordinated with the main task. All the tasks need to take and release a global asyncio.Lock() to ensure safe shared use of the DBMS session. In this example the lock is called session_mutex.
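The coordination described above might look like the following sketch. It does not use the DBMS at all: poller() is a hypothetical stand-in for the two DBEvent coroutines, and asyncio.sleep(0) stands in for an OpenAPI call. The point is only that every task, including the main one, takes the same lock around its use of the shared session.

```python
import asyncio


async def poller(name, session_mutex, run_flag, log):
    '''stand-in for a DBEvent task that shares the session'''
    while run_flag.is_set():
        async with session_mutex:
            log.append(f'{name} begin')
            await asyncio.sleep(0)      # stand-in for an OpenAPI call
            log.append(f'{name} end')
        await asyncio.sleep(0.01)


async def main():
    session_mutex = asyncio.Lock()
    run_flag = asyncio.Event()
    run_flag.set()
    log = []
    tasks = [
        asyncio.create_task(poller('catcher', session_mutex, run_flag, log)),
        asyncio.create_task(poller('getter', session_mutex, run_flag, log)),
    ]
    # the main task takes the same lock around its own use of the session
    for _ in range(3):
        async with session_mutex:
            log.append('main begin')
            await asyncio.sleep(0)      # stand-in for a query
            log.append('main end')
        await asyncio.sleep(0.01)
    run_flag.clear()                    # ask the pollers to finish
    await asyncio.gather(*tasks)
    return log


log = asyncio.run(main())
print(log)
```

In the resulting log every "begin" entry is immediately followed by the matching "end" entry, which shows that no task used the session while another held the lock.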


import asyncio
import ctypes
import struct
import pyngres.asyncio as py
...

DBEVENT_POLLING_INTERVAL = 0.1  # seconds

connHandle = None
tranHandle = None
...

# instantiate the required asyncio controls
run_flag = asyncio.Event()
session_mutex = asyncio.Lock()
event_queue = asyncio.Queue()
...

async def dbevent_catcher():
    '''task to catch DBEvents'''

    # a DBEvent can include an optional message; prepare to buffer it
    gdp = py.IIAPI_GETDESCRPARM()
    event_msg = ctypes.c_buffer(258)
    event_value = py.IIAPI_DATAVALUE()
    event_value.dv_length = len(event_msg)
    event_value.dv_value = ctypes.addressof(event_msg)
    eventData = (py.IIAPI_DATAVALUE * 1)()
    eventData[0] = event_value
    gcp = py.IIAPI_GETCOLPARM()
    gcp.gc_rowCount = 1
    gcp.gc_columnCount = 1
    gcp.gc_columnData = eventData

    # get a DBEvent notification eventHandle
    cep = py.IIAPI_CATCHEVENTPARM()
    cep.ce_connHandle = connHandle
    async with session_mutex:
        py.IIapi_catchEvent(cep)
        # the eventHandle is returned immediately
        eventHandle = cep.ce_eventHandle

    # busy-wait loop to check for notification
    while run_flag.is_set():
        if cep.ce_genParm.gp_completed:
            # capture the DBEvent description and queue it
            event = dict()
            event['eventDB'] = cep.ce_eventDB[:]
            event['eventName'] = cep.ce_eventName[:]
            event['eventOwner'] = cep.ce_eventOwner[:]
            date = cep.ce_eventTime.dv_value
            event['eventTime'] = ingresdate_to_str(date)
            if cep.ce_eventInfoAvail:
                # fetch the event message
                async with session_mutex:
                    gdp.gd_stmtHandle = eventHandle
                    await py.IIapi_getDescriptor(gdp)
                    gcp.gc_stmtHandle = eventHandle
                    await py.IIapi_getColumns(gcp)
                    # the message is returned as a VARCHAR so unpack it
                    msg_length, msg_bytes = struct.unpack('h256s', event_msg)
                    msg = msg_bytes[:msg_length].decode()
                    event['eventText'] = msg
            else:
                event['eventText'] = None
            # put the DBEvent notification in the global queue
            await event_queue.put(event)

            # wait for the next notification, reusing the event handle
            cep = py.IIAPI_CATCHEVENTPARM()
            cep.ce_connHandle = connHandle
            cep.ce_eventHandle = eventHandle
            async with session_mutex:
                py.IIapi_catchEvent(cep)
            # surrender control back to asyncio
            await asyncio.sleep(0)
        else:
            # no event yet; sleep through a polling interval
            await asyncio.sleep(DBEVENT_POLLING_INTERVAL)

async def dbevent_getter():
    '''task to solicit DBEvent notifications'''

    # the OpenAPI delivers DBEvents while executing queries. DBEvents
    # won't be delivered while the application is idle or not using
    # the DBMS session. Running dbevent_getter() ensures a DBEvent is
    # delivered within the polling interval even when the DBMS session
    # is idle

    gvp = py.IIAPI_GETEVENTPARM()
    gvp.gv_connHandle = connHandle
    gvp.gv_timeout = 0
    while run_flag.is_set():
        async with session_mutex:
            await py.IIapi_getEvent(gvp)
        # repeat after half a polling interval
        await asyncio.sleep(DBEVENT_POLLING_INTERVAL * 0.5)
...
