On 05.05.2016 18:10, Daniel P. Berrange wrote:
On Thu, Apr 28, 2016 at 12:04:47PM +0200, Michal Privoznik wrote:
> <snip/>
While it looks ok-ish in the context of the RecvAll function,
because you have skipFunc + recvFunc both being invoked
asynchronously, this design feels a quite odd when used
in combination with the plain virStreamRecv().
Just for a minute lets consider
st = virStreamNew(conn, 0);
virStreamRegisterSkip(st, skipFunc, &fd);
virStorageVolDownload(st, ...);
while (1) {
char buf[4096];
virStreamRecv(st, buf, sizeof(buf));
..do something with buf..
}
I think it is quite unpleasant to have the skipFunc be called
out of band to the code dealing with the Recv.
I think it is preferrable that we have an explicit synchronous
API for consuming the hole.
The same really applies from POV of the upload side - I think
we should be able to synchronously push the hole in, rather
than having the asynchronous InData callback.
IIUC, the reason you've chosen this async callback approach
is because the virStream{Send,Recv,SendAll,RecvAll} methods
are not extensible to cope with holes.
I'm thinking though that we'd get a more attractive public API
by creating hole friendly versions of virStream{Send,Recv,etc}
instead of going the extra callback route.
The download side could we made to work if we had a flag for
virStreamRecv that instructed it to only read data up to the
next hole, and defined an explicit error code to use to indicate
that we've hit a hole.
This would be used thus:
while (1) {
char buf[4096];
int ret = virStreamRecvFlags(st, buf, len, VIR_STREAM_STOP_AT_HOLE);
if (ret == -1) {
virErrorPtr err = virGetLastError();
if (err.code == VIR_ERR_STREAM_HOLE) {
ret = virStreamHoleSize(st, buf);
Could this be made even more friendlier by returning a special value
(say -2) in case of a hole?
...seek ret bytes in target...
} else {
return -1;
}
} else {
...write buf to target...
}
}
Okay, so imagine we have STREAM_SKIP packet incoming what should happen
if it is processed by bare virStreamRead()? IOW user requests sparse
streams but sticks to calling old virStreamRecv().
One thing that I'm worried here about is that @flags of
virStreamRecvFlags() would not be transferred to the sender side as one
might expect.
To make this work with virStreamRecvAll, we would need to add
a virStreamSparseRecvAll() which had two callbacks eg
typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, unsigned long long len):
virStreamSparseRecvAll(virStreamPtr stream,
virStreamSinkFunc handler,
virStreamSinkHoleFunc holeHandler,
void *opaque) {
while (1) {
char buf[4096];
int ret = virStreamRecvFlags(st, buf, len, VIR_STREAM_STOP_AT_HOLE);
if (ret == -1) {
virErrorPtr err = virGetLastError();
if (err.code == VIR_ERR_STREAM_HOLE) {
ret = virStreamHoleSize(st, buf);
holeHandler(st, ret);
} else {
return -1;
}
} else {
handler(st, buf, ret);
}
}
}
Now considering the upload side of things, the virStreamSend
function doesn't actually need cahnges, though it could do
with a flags parameter for best practice. We just need the
virStreamSkip() function you already add.
while (1) {
char buf[4096];
if (..in hole...) {
..get hole size...
virStreamSkip(st, len);
} else {
...read N bytes...
virStreamSend(st, buf, len);
}
}
The SendAll method is again more complicated as the callback we pass
into it is insufficient to figure out if we have holes. We could
add a virStreamSparseSendAll() which had two callbacks again:
typedef int (*virStreamSourceHoleFunc)(holeHandler);
This returns the length of the current hole, or 0 if not at
a hole.
virStreamSparseSendAll(virStreamPtr stream,
virStreamSourceFunc handler,
virStreamSourceHoleFunc holeHandler,
void *opaque)
while (1) {
char buf[4096];
int ret = holeHandler(st);
if (ret > 0) {
virStreamSkip(st, ret);
} else {
ret = handler(st, buf, sizeof(buf);
virStreamSend(st, buf, ret);
}
}
}
So we would avoid the virStreamInData and virStreamRegisterInData
and virStreamRegisterSkip and virStreamSkipCallback methods
Okay. Makes sense. Let me see if I could reuse my RPC implementation,
because that had been the hardest part to implement of them all.
Michal